DEV Community

Leapcell

A Small Tool to Understand How Spam Filtering Works

What's most important when sending an email? The sender and the recipient, but what else?

Beyond that, the most crucial element is undoubtedly spam filtering. Every email client has this feature integrated, and its judgment can determine the fate of an email—whether you ever get to see it.

This article will walk you through the principles of how spam filtering works and guide you in building a small tool to detect spam.

Or—alternatively—you can use this tool to proactively check and revise your own emails to prevent them from being flagged as spam ;-).

How Spam Filtering Works

Spam filtering often relies on a program called Apache SpamAssassin.

Apache SpamAssassin is an open-source spam detection platform maintained by the Apache Software Foundation. Many email systems and email filtering tools use it to classify messages as spam.

It combines a large set of rules, Bayesian filtering, and network tests to assign a spam “score” to a given email. Each rule that matches adds (or subtracts) points, and by default a message whose total reaches 5.0 is flagged as spam.
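The additive scoring idea can be sketched in a few lines of Python. This is only an illustration of the model, not how SpamAssassin is implemented (it is written in Perl and does far more). The function name and the tuple layout are assumptions made for this example, and the sample points are borrowed from the example report shown later in this article.

```python
# Illustrative sketch of additive scoring; not SpamAssassin's actual code.
DEFAULT_REQUIRED_SCORE = 5.0  # SpamAssassin's default threshold

def classify(rule_hits, required=DEFAULT_REQUIRED_SCORE):
    """Sum the points of every rule that fired and compare with the threshold."""
    total = round(sum(points for _name, points in rule_hits), 1)
    return {"score": total, "required": required, "is_spam": total >= required}

# (rule name, points) pairs for the rules that matched a message
hits = [("MISSING_MID", 0.1), ("HTML_MESSAGE", 0.0), ("MIME_HTML_ONLY", 0.1)]
print(classify(hits))
```

A single high-scoring rule rarely decides the verdict on its own; it is the sum that matters.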

Installing SpamAssassin Locally to Detect Spam

SpamAssassin is written in Perl and is most commonly run on Linux, so the steps below assume a Linux operating system or a Docker container.

On Debian/Ubuntu systems, use the following commands to install SpamAssassin:

apt-get update && apt-get install -y spamassassin
sa-update

The sa-update command is used to update SpamAssassin's detection rules to the latest version.

Once installed, we can use it to detect spam. The usage is as follows:

spamassassin -t < input_email.txt > results.txt

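This command passes input_email.txt to SpamAssassin and writes the detection results into results.txt. The -t flag runs SpamAssassin in test mode, which appends a detailed report to the end of the output.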

The content of results.txt will look something like this, with SpamAssassin's score and the reasoning listed at the very end.

X-Spam-Checker-Version: SpamAssassin 4.0.0 (2022-12-13) on 254.254.254.254
X-Spam-Level: 
X-Spam-Status: No, score=0.2 required=5.0 tests=HTML_MESSAGE,
    MIME_HTML_ONLY,MISSING_MID,NO_RECEIVED,
    NO_RELAYS autolearn=no autolearn_force=no version=4.0.0

// ...

Content analysis details:   (0.2 points, 5.0 required)

 pts rule name              description
---- ---------------------- --------------------------------------------------
 0.1 MISSING_MID            Missing Message-Id: header
-0.0 NO_RECEIVED            Informational: message has no Received headers
-0.0 NO_RELAYS              Informational: message was not relayed via SMTP
 0.0 HTML_MESSAGE           BODY: HTML included in message
 0.1 MIME_HTML_ONLY         BODY: Message only has text/html MIME parts
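If you only need the verdict and the total, the X-Spam-Status header shown above is enough. Here is a minimal sketch that pulls those values out with a regular expression. The function name and the returned dictionary keys are assumptions for this example, and the pattern is written against the sample output above, so other SpamAssassin configurations may need adjustments.

```python
import re

# Matches e.g. "X-Spam-Status: No, score=0.2 required=5.0 tests=..."
STATUS_RE = re.compile(
    r"^X-Spam-Status:\s*(?P<verdict>Yes|No),\s*"
    r"score=(?P<score>-?\d+(?:\.\d+)?)\s+"
    r"required=(?P<required>-?\d+(?:\.\d+)?)",
    re.MULTILINE,
)

def parse_spam_status(report):
    """Return verdict, score and threshold from the header, or None if absent."""
    match = STATUS_RE.search(report)
    if match is None:
        return None
    return {
        "is_spam": match.group("verdict") == "Yes",
        "score": float(match.group("score")),
        "required": float(match.group("required")),
    }

sample = (
    "X-Spam-Level: \n"
    "X-Spam-Status: No, score=0.2 required=5.0 tests=HTML_MESSAGE,\n"
    "    MIME_HTML_ONLY,MISSING_MID,NO_RECEIVED,\n"
)
print(parse_spam_status(sample))
```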

Wrapping SpamAssassin in an API

To allow non-Linux devices to use SpamAssassin or to integrate it with other workflows, we can wrap it in an API.

For example, a typical use case for this API would be: before you click the "Send" button on an email, the content is first sent to the SpamAssassin API. The email is only allowed to be sent if it meets the non-spam criteria.

Next, we'll use Python to create a simple API that accepts the following email fields: subject, html_body, and text_body. It will pass these fields to SpamAssassin and return the validation result.

from fastapi import FastAPI
from datetime import datetime, timezone
from email.utils import format_datetime
from pydantic import BaseModel
import subprocess

def extract_analysis_details(text):
    lines = text.splitlines()

    start_index = None
    for i, line in enumerate(lines):
        if line.strip().startswith("pts rule"):
            start_index = i
            break

    if start_index is None:
        print("No content analysis details found.")
        return []

    data_lines = lines[start_index+2:]
    parsed_lines = []
    for line in data_lines:
        if line.strip() == "":
            break
        parsed_lines.append(line)

    results = []
    current_entry = None

    split_line = lines[start_index+1]
    pts_split, rule_split, *rest = split_line.strip().split(" ")

    pts_start = 0
    pts_end = pts_start + len(pts_split)

    rule_start = pts_end + 1
    rule_end = rule_start + len(rule_split)

    desc_start = rule_end + 1

    for line in parsed_lines:
        pts_str = line[pts_start:pts_end].strip()
        rule_name_str = line[rule_start:rule_end].strip()
        description_str = line[desc_start:].strip()

        if pts_str == "" and rule_name_str == "" and description_str:
            if current_entry:
                current_entry["description"] += " " + description_str
        else:
            current_entry = {
                "pts": pts_str,
                "rule_name": rule_name_str,
                "description": description_str
            }
            results.append(current_entry)

    return results

app = FastAPI()

class Email(BaseModel):
    subject: str
    html_body: str
    text_body: str

@app.post("/spam_check")
def spam_check(email: Email):
    # assemble the full email
    message = f"""From: example@example.com
To: recipient@example.com
Subject: {' '.join(email.subject.split())}
Date: {format_datetime(datetime.now(timezone.utc))}
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="__SPAM_ASSASSIN_BOUNDARY__"

--__SPAM_ASSASSIN_BOUNDARY__
Content-Type: text/plain; charset="utf-8"

{email.text_body}

--__SPAM_ASSASSIN_BOUNDARY__
Content-Type: text/html; charset="utf-8"

{email.html_body}

--__SPAM_ASSASSIN_BOUNDARY__--"""

    # Run SpamAssassin and capture the output directly
    output = subprocess.run(["spamassassin", "-t"],
                            input=message.encode('utf-8'),
                            capture_output=True)

    output_str = output.stdout.decode('utf-8', errors='replace')
    details = extract_analysis_details(output_str)
    return {"result": details}

In the code above, we define a helper function extract_analysis_details to extract the reasons for the score from the full output. You can further modify this function, for instance, to filter out certain rules from the results.
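As an alternative to assembling the message by hand, Python's standard email package can build the same multipart/alternative structure and take care of boundaries and header encoding. The sketch below is one possible variant, not part of the original tool: build_message is a hypothetical helper, the addresses are the same placeholders used above, and the example.com domain for the Message-ID is an assumption.

```python
from email.message import EmailMessage
from email.utils import formatdate, make_msgid

def build_message(subject, text_body, html_body):
    """Assemble a multipart/alternative email and return it as bytes."""
    msg = EmailMessage()
    # Placeholder addresses, as in the API code above.
    msg["From"] = "example@example.com"
    msg["To"] = "recipient@example.com"
    # Collapse line breaks so the subject cannot smuggle in extra headers.
    msg["Subject"] = " ".join(subject.split())
    msg["Date"] = formatdate(usegmt=True)
    # Assumed domain; a real sender would use its own.
    msg["Message-ID"] = make_msgid(domain="example.com")
    msg.set_content(text_body)                      # text/plain part
    msg.add_alternative(html_body, subtype="html")  # text/html part
    return msg.as_bytes()
```

The returned bytes can be passed straight to subprocess.run(["spamassassin", "-t"], input=..., capture_output=True). Note that adding a Message-ID changes what is being tested: the MISSING_MID rule should no longer fire, which may or may not be what you want when checking a draft.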

Let's test this API. Pass the following parameters:

subject

Claim Your Prize

html_body

<h2>Claim Your Prize</h2>
<p>Dear Winner:</p>
<p>Click the link below to claim your prize.</p>

text_body

Claim Your Prize

Dear Winner:
Click the link below to claim your prize.

The result field of the response contains the following:

[
  {
    "pts": "0.1",
    "rule_name": "MISSING_MID",
    "description": "Missing Message-Id: header"
  },
  {
    "pts": "-0.0",
    "rule_name": "NO_RECEIVED",
    "description": "Informational: message has no Received headers"
  },
  {
    "pts": "3.1",
    "rule_name": "DEAR_WINNER",
    "description": "BODY: Spam with generic salutation of \"dear winner\""
  },
  {
    "pts": "-0.0",
    "rule_name": "NO_RELAYS",
    "description": "Informational: message was not relayed via SMTP"
  },
  {
    "pts": "0.0",
    "rule_name": "HTML_MESSAGE",
    "description": "BODY: HTML included in message"
  }
]

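As you can see, the phrase "Dear winner" is highly suspicious: the DEAR_WINNER rule alone contributes 3.1 points, more than half of the default 5.0 threshold, because this salutation is frequently used in spam emails.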
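Most entries in that list are informational and carry no points, so it can help to surface only the rules that actually move the score. A small sketch of such a filter follows. The function name and the min_points cutoff are assumptions for this example, and the input is the list of dictionaries produced by extract_analysis_details.

```python
def significant_rules(details, min_points=0.1):
    """Keep rules whose points reach min_points, highest first."""
    scored = [(float(entry["pts"]), entry) for entry in details]
    kept = [pair for pair in scored if pair[0] >= min_points]
    kept.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _points, entry in kept]

# Trimmed copy of the result above (descriptions omitted for brevity)
details = [
    {"pts": "0.1", "rule_name": "MISSING_MID"},
    {"pts": "-0.0", "rule_name": "NO_RECEIVED"},
    {"pts": "3.1", "rule_name": "DEAR_WINNER"},
    {"pts": "-0.0", "rule_name": "NO_RELAYS"},
    {"pts": "0.0", "rule_name": "HTML_MESSAGE"},
]

for rule in significant_rules(details):
    print(rule["pts"], rule["rule_name"])
```

This cutoff also hides rules with negative points, which lower the score. If you want to see those too, filter on the absolute value instead.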

Deploying This API Tool Online

By deploying this small tool online, you can detect spam anytime.

Since this tool is written in Python and requires Apache SpamAssassin to be installed beforehand, it might seem like you can only deploy it using services like AWS EC2 or DigitalOcean. However, these can be expensive and the deployment process is complex.

Are there more suitable tools?

Yes, you can use Leapcell to deploy it.


Leapcell supports the deployment of various languages, including Python, Go, and Rust. It uses a Docker architecture, enabling the installation of various underlying libraries. Most importantly, Leapcell charges only based on the actual number of API calls, meaning it's completely free to have a project sitting idle. This makes it significantly cheaper than platforms like AWS and DigitalOcean.

The deployment steps are simple:

  1. Push the project to GitHub.

  2. In Leapcell, click "Create Service" and select this GitHub project.

  3. Fill in the following commands in the "Build Command" field to install SpamAssassin and the Python dependencies:

apt-get update && apt-get install -y spamassassin
sa-update
pip install -r requirements.txt

  4. Click "Submit."

Once deployed, you'll have a spam-checking API! Whenever the API is invoked, it will run SpamAssassin, score the email, and return the rules that contributed to the score.
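Calling the API before sending an email could look roughly like the sketch below, which uses only the Python standard library. The URL is a hypothetical placeholder for wherever your service ends up, the function names are made up for this example, and the gate assumes the response shape returned by the /spam_check endpoint above together with the default threshold of 5.0.

```python
import json
import urllib.request

# Hypothetical address; replace with the URL of your deployed service.
API_URL = "http://localhost:8000/spam_check"

def build_spam_check_request(subject, html_body, text_body, url=API_URL):
    """Build the POST request carrying the three email fields as JSON."""
    payload = json.dumps({
        "subject": subject,
        "html_body": html_body,
        "text_body": text_body,
    }).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def allowed_to_send(response_json, threshold=5.0):
    """Return True when the summed rule points stay below the threshold."""
    total = sum(float(rule["pts"]) for rule in response_json["result"])
    return total < threshold

def check_before_send(subject, html_body, text_body):
    """Call the API and decide whether the email may be sent."""
    request = build_spam_check_request(subject, html_body, text_body)
    with urllib.request.urlopen(request, timeout=30) as response:
        return allowed_to_send(json.load(response))
```

A mail client or sending script would call check_before_send(...) and only proceed when it returns True.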


Follow us on X: @LeapcellHQ

