<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Joseph Joshua</title>
    <description>The latest articles on DEV Community by Joseph Joshua (@izzyjosh).</description>
    <link>https://dev.to/izzyjosh</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1238224%2Fb670f1b9-c00a-4a4f-b603-5bea064372da.png</url>
      <title>DEV Community: Joseph Joshua</title>
      <link>https://dev.to/izzyjosh</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/izzyjosh"/>
    <language>en</language>
    <item>
      <title>Detecting &amp; Blocking Anomalous Traffic with Cloud Anomaly Detector</title>
      <dc:creator>Joseph Joshua</dc:creator>
      <pubDate>Tue, 28 Apr 2026 02:38:47 +0000</pubDate>
      <link>https://dev.to/izzyjosh/detecting-blocking-anomalous-traffic-with-cloud-anomaly-detector-2i7n</link>
      <guid>https://dev.to/izzyjosh/detecting-blocking-anomalous-traffic-with-cloud-anomaly-detector-2i7n</guid>
      <description>&lt;p&gt;&lt;strong&gt;A lightweight, containerized anomaly detection system that monitors traffic in real time, detects abuse patterns, and automatically blocks malicious IPs at the host firewall level.&lt;/strong&gt;&lt;/p&gt;




&lt;p&gt;I built a real-time anomaly detection system that monitors nginx access logs, computes adaptive rolling baselines per time window, detects traffic anomalies using statistical methods (z-score + spike multipliers), and automatically blocks malicious IPs using host-level &lt;code&gt;iptables&lt;/code&gt; rules. The system includes Slack alerts and a live dashboard for observability and debugging.&lt;/p&gt;




&lt;h2&gt;
  
  
  Background / Motivation
&lt;/h2&gt;

&lt;p&gt;Modern systems face constant threats such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;DDoS attacks
&lt;/li&gt;
&lt;li&gt;Credential stuffing
&lt;/li&gt;
&lt;li&gt;API abuse and scraping bots
&lt;/li&gt;
&lt;li&gt;Sudden traffic spikes that degrade service
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Most production solutions rely on expensive managed WAFs or cloud security tools. I wanted to build a &lt;strong&gt;low-cost, self-hosted anomaly detection engine&lt;/strong&gt; that runs entirely on a VPS using logs, statistics, and system-level enforcement.&lt;/p&gt;

&lt;h3&gt;
  
  
  Constraints:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Must be containerized (Docker-based)&lt;/li&gt;
&lt;li&gt;Must run on low-cost VPS infrastructure&lt;/li&gt;
&lt;li&gt;Must use logs (not packet inspection tools)&lt;/li&gt;
&lt;li&gt;Must enforce bans at host level (not only inside containers)&lt;/li&gt;
&lt;li&gt;Must provide real-time visibility and debugging&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  What I Built
&lt;/h2&gt;

&lt;p&gt;A full-stack anomaly detection pipeline composed of:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Detector Service (Python)&lt;/li&gt;
&lt;li&gt;Baseline Engine (rolling statistical model)&lt;/li&gt;
&lt;li&gt;Blocker Service (iptables enforcement on host)&lt;/li&gt;
&lt;li&gt;Dashboard (real-time monitoring UI)&lt;/li&gt;
&lt;li&gt;Slack Alerting System (incident notifications)&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  How It Works
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Nginx logs every request in structured JSON format.
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"ip"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1.2.3.4"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"endpoint"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"status"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1710000000&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  From Logs to Detection
&lt;/h2&gt;

&lt;p&gt;Once nginx writes request logs, the detector continuously processes them in real time.&lt;/p&gt;

&lt;p&gt;Each incoming log entry goes through the following pipeline:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Parse JSON log entry
&lt;/li&gt;
&lt;li&gt;Extract IP, timestamp, and status code
&lt;/li&gt;
&lt;li&gt;Update per-second counters
&lt;/li&gt;
&lt;li&gt;Feed values into rolling baseline engine
&lt;/li&gt;
&lt;li&gt;Evaluate anomaly conditions
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This pipeline runs continuously with minimal latency, ensuring near real-time detection.&lt;/p&gt;




&lt;h2&gt;
  
  
  Rolling Baseline Behavior
&lt;/h2&gt;

&lt;p&gt;The system does not rely on fixed thresholds. Instead, it learns traffic behavior over time.&lt;/p&gt;

&lt;p&gt;For each time window, the baseline tracks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Average request rate (mean)&lt;/li&gt;
&lt;li&gt;Variance (standard deviation)&lt;/li&gt;
&lt;li&gt;Traffic distribution per second&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This allows the system to adapt dynamically to traffic changes.&lt;/p&gt;

&lt;h3&gt;
  
  
  Example behavior:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Normal traffic period → stable baseline&lt;/li&gt;
&lt;li&gt;Gradual increase → baseline adjusts slightly&lt;/li&gt;
&lt;li&gt;Sudden spike → deviation becomes statistically significant&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Anomaly Decision Process
&lt;/h2&gt;

&lt;p&gt;Every second, the detector evaluates:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Current request rate vs baseline mean&lt;/li&gt;
&lt;li&gt;Z-score deviation&lt;/li&gt;
&lt;li&gt;Spike multiplier threshold&lt;/li&gt;
&lt;li&gt;Error rate deviation&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If any condition exceeds configured thresholds, the IP or system state is flagged.&lt;/p&gt;

&lt;p&gt;This ensures:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Low false positives during normal usage&lt;/li&gt;
&lt;li&gt;Fast reaction to sudden abuse patterns&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Blocking Execution Flow
&lt;/h2&gt;

&lt;p&gt;When an anomaly is confirmed, the system does not block immediately inside the application layer.&lt;/p&gt;

&lt;p&gt;Instead, it uses a &lt;strong&gt;decoupled enforcement pipeline&lt;/strong&gt;:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;IP is added to a shared ban queue
&lt;/li&gt;
&lt;li&gt;Host worker process reads queue
&lt;/li&gt;
&lt;li&gt;Firewall rule is applied at kernel level
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This ensures:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Separation of detection and enforcement&lt;/li&gt;
&lt;li&gt;Reliability even if app crashes&lt;/li&gt;
&lt;li&gt;Immediate packet-level blocking&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Why Host-Level Blocking Matters
&lt;/h2&gt;

&lt;p&gt;Blocking inside containers or application code is not sufficient because:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Traffic may already be routed through Docker bridge&lt;/li&gt;
&lt;li&gt;App-level blocking still consumes resources&lt;/li&gt;
&lt;li&gt;Reverse proxies may already forward requests&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Using &lt;code&gt;iptables DOCKER-USER&lt;/code&gt; ensures:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Traffic is dropped before it reaches the container network stack&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;This makes enforcement fast and reliable.&lt;/p&gt;




&lt;h2&gt;
  
  
  Observability Layer
&lt;/h2&gt;

&lt;p&gt;To ensure visibility, the system exposes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Live request rate graphs
&lt;/li&gt;
&lt;li&gt;Current baseline values
&lt;/li&gt;
&lt;li&gt;Active banned IP list
&lt;/li&gt;
&lt;li&gt;Recent anomaly events
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The dashboard updates in real time based on detector outputs.&lt;/p&gt;




&lt;h2&gt;
  
  
  Testing Strategy (k6)
&lt;/h2&gt;

&lt;p&gt;The system is validated using controlled load testing:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Gradual ramp-up tests&lt;/li&gt;
&lt;li&gt;Sudden spike injection&lt;/li&gt;
&lt;li&gt;Sustained high traffic simulation&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This ensures:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Baseline accuracy&lt;/li&gt;
&lt;li&gt;Proper Z-score calibration&lt;/li&gt;
&lt;li&gt;Reliable ban triggering&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  System Reliability Design
&lt;/h2&gt;

&lt;p&gt;Several mechanisms improve stability:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Warm-up period (prevents early noise)&lt;/li&gt;
&lt;li&gt;Duplicate ban suppression&lt;/li&gt;
&lt;li&gt;Rolling window smoothing&lt;/li&gt;
&lt;li&gt;Queue-based enforcement (decoupled architecture)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These ensure the system remains stable under continuous load.&lt;/p&gt;




&lt;h2&gt;
  
  
  Summary of Flow
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Nginx logs requests
&lt;/li&gt;
&lt;li&gt;Detector parses logs
&lt;/li&gt;
&lt;li&gt;Baseline is updated
&lt;/li&gt;
&lt;li&gt;Anomaly detected using statistical rules
&lt;/li&gt;
&lt;li&gt;IP is queued for blocking
&lt;/li&gt;
&lt;li&gt;Host worker applies firewall rule
&lt;/li&gt;
&lt;li&gt;Slack alert is sent
&lt;/li&gt;
&lt;li&gt;Dashboard reflects updated state
&lt;/li&gt;
&lt;/ol&gt;




&lt;p&gt;&lt;strong&gt;Visit repo for code workflow&lt;/strong&gt;: &lt;a href="https://github.com/izzyjosh/cloud-anomaly-detector" rel="noopener noreferrer"&gt;https://github.com/izzyjosh/cloud-anomaly-detector&lt;/a&gt;&lt;/p&gt;

</description>
      <category>programming</category>
      <category>python</category>
      <category>devops</category>
      <category>aws</category>
    </item>
    <item>
      <title>FastAPI + PydanticAI + a2a-protocol</title>
      <dc:creator>Joseph Joshua</dc:creator>
      <pubDate>Mon, 03 Nov 2025 19:48:59 +0000</pubDate>
      <link>https://dev.to/izzyjosh/fastapi-pydanticai-a2a-protocol-2fkb</link>
      <guid>https://dev.to/izzyjosh/fastapi-pydanticai-a2a-protocol-2fkb</guid>
      <description>&lt;p&gt;This post give a full implementation of AI agent with pydantic AI.&lt;/p&gt;

&lt;p&gt;An AI agent combines the function of a LLM with tools that helps the AI interact with the real world. &lt;/p&gt;

&lt;p&gt;Firstly, create your AI agent implemention&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Python import
import os
from typing import List, Optional
from uuid import uuid4

# Library import 
from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.providers.google import GoogleProvider
from dotenv import load_dotenv
from fastapi.exceptions import HTTPException

# Module import
from models import A2AMessage, GrammarResponse, MessageConfiguration, MessagePart, TaskResult, TaskStatus

load_dotenv()


class GrammarAgent:
    SYSTEM_INSTRUCTIONS = (
            "You are a specialized assistant that helps users correct grammar, spelling, "
            "and phrasing mistakes in text"
            "Your goal is to return correct sentence and explanation"
            "If users provides unrelated topics, politely state that you can only help with grammar or writing task"
            )

    def __init__(self):

        provider = GoogleProvider(api_key=os.getenv("GOOGLE_API_KEY", "no Key"))

        model = GoogleModel("gemini-2.0-flash", provider=provider)

        self.agent = Agent(
                model=model,
                output_type=GrammarResponse,
                system_prompt=self.SYSTEM_INSTRUCTIONS
                )

    async def run(self, message: A2AMessage, context_id: Optional[str] = None, task_id: Optional[str] = None, config: Optional[MessageConfiguration] = None):

        context_id = context_id or str(uuid4())
        task_id = task_id or str(uuid4())

        user_messages = message.parts

        if not user_messages:
            raise ValueError("No message provided")

        # handle last message part
        last_part = user_messages[-1]

        user_text = ""

        if hasattr(last_part, "kind") and last_part.kind == "text":
            user_text = getattr(last_part, "text", "")
        elif hasattr(last_part, "data") and last_part.data:
            data_part = last_part.data[-1]
            if isinstance(data_part, dict) and data_part.get("kind") == "text":
                user_text = data_part.get("text", "").strip()
        else:
            user_text = ""

        if not user_text:
            raise ValueError("No text provided")

        try:
            response = await self.agent.run(user_prompt=user_text)

            response_message = A2AMessage(
                    role="agent",
                    parts=[MessagePart(kind="text", text=response.output.model_dump_json())],
                    taskId=task_id
                    )
            history = [message, response_message]

            task_result = TaskResult(
                    id=task_id,
                    contextId=context_id,
                    status=TaskStatus(state="completed", message=response_message),
                    history=history
                    )

            return task_result
        except Exception as e:
            print(e)
            raise HTTPException(status_code=500, detail=f"internal server error: {str(e)}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;then implement the API endpoint that expose the agent&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import uvicorn
import os
from models import A2AMessage, JSONRPCRequest, JSONRPCResponse
from agent import GrammarAgent

grammar_agent = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global grammar_agent

    grammar_agent = GrammarAgent()

    yield

    if grammar_agent:
        grammar_agent = None

app = FastAPI(title="Grammar Agent", description="Ai agent for grammatical correction", version="1.0.0", lifespan=lifespan)

@app.post("/a2a/grammar-check")
async def grammar_check(request: Request):
    try:
        body = await request.json()

        if body.get("jsonrpc") != "2.0" or "id" not in body:
            return JSONResponse(
                status_code=400,
                content={
                    "jsonrpc": "2.0",
                    "id": body.get("id"),
                    "error": {
                        "code": -32600,
                        "message": "Invalid Request: jsonrpc must be '2.0' and id is required"
                    }
                }
            )
        rpc_request = JSONRPCRequest(**body)

        messages = []
        context_id = None
        task_id = None
        config = None


        if rpc_request.method == "message/send":
            messages = rpc_request.params.message
            config = rpc_request.params.configuration

        elif rpc_request.method == "execute":
            messages = rpc_request.params.messages
            context_id = rpc_request.params.contextId
            task_id = rpc_request.params.taskId

        result = await grammar_agent.run(
            message=messages,
            context_id=context_id,
            task_id=task_id,
            config=config
        )

        response = JSONRPCResponse(
            id=rpc_request.id,
            result=result
        )

        return response.model_dump()

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32000,
                    "message": str(e)
                }
            }
        )


if __name__ == "__main__":
    port = int(os.getenv("PORT", 5000))
    uvicorn.run("main:app", host="127.0.0.1", port=port, reload=True)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then set up all necessary schemas for validation&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pydantic import BaseModel, Field
from typing import Literal, Optional, List, Dict, Any
from datetime import datetime
from uuid import uuid4

class GrammarResponse(BaseModel):
    response: str
    explanation: str

class MessagePart(BaseModel):
    kind: Literal["text", "data"]
    text: Optional[str] = None
    data: Optional[List[Dict[str, Any]]] = None

class A2AMessage(BaseModel):
    kind: Literal["message"] = "message"
    role: Literal["user", "agent", "system"]
    parts: List[MessagePart]
    messageId: str = Field(default_factory=lambda: str(uuid4()))
    taskId: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

class PushNotificationConfig(BaseModel):
    url: str
    token: Optional[str] = None
    authentication: Optional[Dict[str, Any]] = None

class MessageConfiguration(BaseModel):
    blocking: bool = True
    acceptedOutputModes: List[str] = ["text/plain", "image/png", "image/svg+xml"]
    pushNotificationConfig: Optional[PushNotificationConfig] = None

class MessageParams(BaseModel):
    message: A2AMessage
    configuration: MessageConfiguration = Field(default_factory=MessageConfiguration)

class ExecuteParams(BaseModel):
    contextId: Optional[str] = None
    taskId: Optional[str] = None
    messages: List[A2AMessage]

class JSONRPCRequest(BaseModel):
    jsonrpc: Literal["2.0"]
    id: str
    method: Literal["message/send", "execute"]
    params: MessageParams | ExecuteParams

class TaskStatus(BaseModel):
    state: Literal["working", "completed", "input-required", "failed"]
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    message: Optional[A2AMessage] = None


class Artifact(BaseModel):
    artifactId: str = Field(default_factory=lambda: str(uuid4()))
    name: str
    parts: List[MessagePart]

class TaskResult(BaseModel):
    id: str
    contextId: str
    status: TaskStatus
    artifacts: List[Artifact] = []
    history: List[A2AMessage] = []
    kind: Literal["task"] = "task"

class JSONRPCResponse(BaseModel):
    jsonrpc: Literal["2.0"] = "2.0"
    id: str
    result: Optional[TaskResult] = None
    error: Optional[Dict[str, Any]] = None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Move forward to set up .env variable&lt;br&gt;
&lt;code&gt;GOOGLE_API_KEY=YOUAPIKEY&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Then you can run your code and enjoy after importing all required dependencies.&lt;/p&gt;

</description>
      <category>agents</category>
      <category>gemini</category>
      <category>ai</category>
      <category>python</category>
    </item>
  </channel>
</rss>
