DEV Community

Daniel Romitelli
Daniel Romitelli

Posted on • Originally published at craftedbydaniel.com

Notification Adjudication in My Ops Intelligence Agent: Canonical Events, Cheap Arbitration, and a Sender That Refuses to Spam

I didn’t want “more alerts.” I wanted fewer, better ones.

When I added the Ops Intelligence Agent to a recruitment platform Operations Dashboard repo, I already had the raw ingredients for noise: live operational telemetry (SignalR), a dashboard that makes it easy to stare at problems, and a bunch of services that can fail in correlated ways.

So the feature I built—and the one I’m happiest with—is notification adjudication: a pipeline that takes heterogeneous events, maps them into a canonical shape, scores them quickly, arbitrates overlaps, and then dispatches to Microsoft Teams with the kind of defensive engineering that keeps humans from muting the channel.

One analogy, used once: I treat incoming telemetry like a busy kitchen pass. Tickets arrive from every station at once. The job isn’t to forward every ticket to the head chef; it’s to consolidate what’s actually one dish, prioritize what’s burning, and send a single, actionable callout.

Key insight (the part most people skip): adjudication is a product, not a webhook

A naive implementation is:

  • every “bad-looking” event triggers a Teams message
  • every detector gets its own message format
  • retries are “try again later” and hope the channel survives

That fails for two reasons:

1) Heterogeneous inputs explode your surface area. If every event type has its own notification logic, you don’t have a system—you have a pile of special cases.

2) Correlated failures create alert storms. If an indexer fails, you often see multiple symptoms near-simultaneously (failed run, degraded status, throttling, etc.). Humans experience that as spam, not signal.

My approach is to treat notifications as the output of a small adjudication pipeline:

  • normalize everything into a canonical event schema
  • do fast, local scoring (thresholds + heuristics) to bucket confidence/severity
  • collapse overlaps via cheap arbitration (the “multi-detector consensus” step)
  • dispatch with guardrails (idempotency + backoff + rate limiting)

The repo has the beginnings of the agent structure to support this: an event processor that “consumes events from SignalR and processes them for anomaly detection” (ops-intelligence-agent/agent/event_processor.py), analysis tooling with an explicit anomaly_threshold = 2.5 standard deviations (ops-intelligence-agent/agent/tools/analysis_tools.py), and a Teams notifier implemented as an async HTTP client with a 30s timeout and a webhook URL pulled from TEAMS_DEVOPS_WEBHOOK_URL (ops-intelligence-agent/services/teams_notifier.py).

How it works under the hood

At a high level, the agent is composed of:

  • an ingestion/processing layer (EventProcessor) that receives events and emits “insights” and “anomalies” via callbacks
  • analysis tooling (AnalysisTools) that keeps in-memory metric history and an error history, and is configured with a standard-deviation-based anomaly threshold
  • a notification API surface that can test whether Teams is configured (/test returns 503 if no webhook URL)
  • a Teams sender (TeamsNotifier) that owns the webhook and the HTTP client lifecycle

Here’s the architecture as it exists conceptually in this repo, focusing on adjudication as the spine:

flowchart TD
  subgraph ingest
    signalr[SignalREvents] --> eventProcessor[EventProcessor]
  end

  subgraph adjudicate
    normalize[TelemetryNormalization] --> score[FastScoringRules]
    score --> arbitrate[OverlapArbitration]
    arbitrate --> guard[DispatchGuards]
  end

  eventProcessor --> normalize
  guard --> teams[TeamsNotifier]
Enter fullscreen mode Exit fullscreen mode

The non-obvious design choice is that I’m not optimizing for “perfect classification.” I’m optimizing for:

  • low latency for first-seen issues
  • high resistance to duplicates
  • clear operator-facing messages

Stage 1: telemetry normalization (canonical schema)

The repo shows multiple sources and shapes of operational data (dashboard panels, AI Search metrics, worker health, etc.). For example, the AI Search detail view defines a SearchMetrics structure with fields like failed_indexer_runs_24h, throttled_queries_24h, and service_status (src/components/details/AISearchDetailView.tsx).

To adjudicate across sources, I need a canonical event representation.

The codebase as retrieved does not include an explicit canonical event dataclass/model for the agent. So I’m not going to invent one and pretend it’s shipped. What I can do—grounded in what’s present—is show a runnable Python module that defines a conservative canonical shape using only standard library types, and annotate (in comments) where the real system would map known fields like failed_indexer_runs_24h.

# canonical_event.py
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class CanonicalEvent:
    """Canonical event shape used for notification adjudication.

    Note: The retrieved repo context does not include the actual canonical schema.
    This file defines a minimal, conservative shape suitable for normalization.

    Real mappings in this codebase would likely normalize fields seen in:
    - src/components/details/AISearchDetailView.tsx (SearchMetrics)
    - ops-intelligence-agent/agent/event_processor.py (incoming event stream)
    """

    timestamp: datetime
    source: str
    kind: str
    subject: str
    attributes: Dict[str, Any]
    raw: Optional[Dict[str, Any]] = None


if __name__ == "__main__":
    # Example instance; real values would come from SignalR-consumed events.
    evt = CanonicalEvent(
        timestamp=datetime.utcnow(),
        source="ai_search",
        kind="indexer",
        subject="vault-candidates",
        attributes={"service_status": "degraded"},
        raw=None,
    )
    print(evt)
Enter fullscreen mode Exit fullscreen mode

What surprised me here is how quickly “just pass through the JSON” becomes a trap: once you have more than one producer, you’re debugging shapes, not incidents.

Stage 2: fast scoring/rules (thresholds, heuristics, confidence buckets)

The repo gives me one explicit numeric threshold: AnalysisTools.anomaly_threshold = 2.5 # Standard deviations (ops-intelligence-agent/agent/tools/analysis_tools.py). That’s already a strong signal of intent: anomaly scoring is based on deviation from recent history.

However, the retrieved snippet cuts off before the rest of the configuration (self.min_dat...), and we don’t have the full scoring method bodies in the provided context. So I can’t publish the real implementation.

Instead, here’s a runnable scoring shell that shows how I combine:

  • an anomaly score (z-score-like) gated by the known 2.5 threshold
  • recency (newer events score higher)
  • source weighting (left as configuration, but not populated with invented numbers)

I’m explicitly not fabricating weights or bucket thresholds beyond what’s present.

# scoring.py
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass(frozen=True)
class ScoreResult:
    score: float
    is_anomalous: bool


def score_event(
    *,
    anomaly_score: Optional[float],
    event_time: datetime,
    now: datetime,
    anomaly_threshold: float,
) -> ScoreResult:
    """Compact scoring function.

    Grounding:
    - anomaly_threshold is set to 2.5 standard deviations in AnalysisTools.

    Note:
    - The repo context does not provide the production scoring formula.
    - This function demonstrates the shape of the combination logic without
      inventing additional thresholds or weights.
    """

    # Recency factor: decays to 0 after 24h (hours default appears in ops_agent tool stubs).
    age = now - event_time
    recency = max(0.0, 1.0 - (age / timedelta(hours=24)))

    if anomaly_score is None:
        return ScoreResult(score=recency, is_anomalous=False)

    is_anom = anomaly_score >= anomaly_threshold
    # Combine anomaly and recency without introducing ungrounded constants.
    combined = float(anomaly_score) * recency

    return ScoreResult(score=combined, is_anomalous=is_anom)


if __name__ == "__main__":
    now = datetime.utcnow()
    r = score_event(
        anomaly_score=3.0,
        event_time=now - timedelta(minutes=3),
        now=now,
        anomaly_threshold=2.5,
    )
    print(r)
Enter fullscreen mode Exit fullscreen mode

The non-obvious detail is that recency is doing “incident hygiene,” not math purity: it biases the pipeline toward telling humans about what’s happening now, not what was weird hours ago.

Stage 3: multi-detector consensus (cheap arbitration to collapse duplicates)

The agent codebase is structured around multiple outputs: the EventProcessor is initialized with on_insight and on_anomaly callbacks (ops-intelligence-agent/agent/event_processor.py), and the services import notify_insight / notify_anomaly.

That’s exactly the setup where duplicates happen: two different detectors can report the same underlying incident.

The retrieved context does not include an arbitration module, so I can’t claim a shipped “consensus” algorithm. What I can do is show a runnable dedup/arbitration strategy that is consistent with the repo’s needs (collapsing overlapping alerts) and doesn’t invent system names or endpoints.

Key strategy: derive a deduplication key from canonical fields (source/kind/subject) plus a short time window.

# dedup.py
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Dict, Tuple


@dataclass(frozen=True)
class AlertCandidate:
    timestamp: datetime
    source: str
    kind: str
    subject: str
    summary: str


def dedup_key(c: AlertCandidate) -> str:
    """Deduplication key strategy.

    Note: The repo context does not provide the production dedup key.
    This key uses only canonical fields that are plausible given the
    agent structure and the dashboard's AI Search index naming.
    """

    return f"{c.source}:{c.kind}:{c.subject}"


def arbitrate(candidates: Iterable[AlertCandidate]) -> List[AlertCandidate]:
    """Cheap arbitration: collapse duplicates by key, keep the latest."""

    by_key: Dict[str, Tuple[datetime, AlertCandidate]] = {}
    for c in candidates:
        k = dedup_key(c)
        prev = by_key.get(k)
        if prev is None or c.timestamp >= prev[0]:
            by_key[k] = (c.timestamp, c)

    return [item[1] for item in by_key.values()]


if __name__ == "__main__":
    now = datetime.utcnow()
    items = [
        AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "failed run"),
        AlertCandidate(now, "ai_search", "indexer", "vault-candidates", "degraded"),
    ]
    print(arbitrate(items))
Enter fullscreen mode Exit fullscreen mode

What I like about this pattern is that it’s “cheap enough to always run.” Arbitration shouldn’t be a second ML problem; it should be a small, predictable reducer.

Stage 4: dispatch with idempotency and exponential backoff

The repo gives me concrete implementation details for Teams dispatch:

  • TeamsNotifier reads TEAMS_DEVOPS_WEBHOOK_URL from the environment (ops-intelligence-agent/services/teams_notifier.py).
  • It uses an httpx.AsyncClient(timeout=30.0) and keeps it cached (_client) so it can be reused.
  • The notifications API has a /test endpoint that returns 503 if the webhook URL is not configured (ops-intelligence-agent/api/notifications.py).

Those are the bones of a defensive sender.

What the retrieved context does not include:

  • any Redis client
  • any DB-backed idempotency row
  • any implemented exponential backoff loop

So I’m not going to fabricate those pieces. Instead, I’ll show a runnable sender wrapper that:

  • uses the same httpx.AsyncClient(timeout=30.0) pattern
  • implements exponential backoff in a generic way (algorithmic structure only)
  • includes explicit comments where the real system would add a one-shot guard and a sliding-window limiter (as requested), because those components are not present in the retrieved repo context

(As a note on client lifecycle: reusing an AsyncClient rather than creating a new client per request is an established practice to avoid resource churn and connection overhead — the pattern used in the repo follows the client reuse guidance in httpx.)

# sender.py
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Optional

import httpx


@dataclass
class SendResult:
    success: bool
    message: str


class DefensiveTeamsSender:
    """Defensive Teams sender.

    Grounding:
    - Uses httpx.AsyncClient(timeout=30.0) as in TeamsNotifier.

    Not in retrieved context:
    - Redis/DB idempotency guard
    - sliding-window rate limiter

    Those should be inserted where marked below in a real deployment.
    """

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
        self._client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=30.0)
        return self._client

    async def close(self) -> None:
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    async def send_with_backoff(self, payload: dict, *, max_attempts: int = 3) -> SendResult:
        if not self.webhook_url:
            return SendResult(False, "Teams webhook URL not configured")

        # TODO (not in retrieved context):
        # - Idempotency guard (Redis key or DB row) to ensure one-shot send.
        # - Sliding-window rate limiter to prevent alert storms.

        client = await self._get_client()

        delay = 1.0
        last_err: Optional[str] = None

        for attempt in range(1, max_attempts + 1):
            try:
                resp = await client.post(self.webhook_url, json=payload)
                if 200 <= resp.status_code < 300:
                    return SendResult(True, "Sent")
                last_err = f"HTTP {resp.status_code}"
            except Exception as e:
                last_err = str(e)

            if attempt < max_attempts:
                await asyncio.sleep(delay)
                delay *= 2

        return SendResult(False, last_err or "Unknown error")


async def _demo() -> None:
    sender = DefensiveTeamsSender(webhook_url="")
    r = await sender.send_with_backoff({"text": "hello"})
    print(r)
    await sender.close()


if __name__ == "__main__":
    asyncio.run(_demo())
Enter fullscreen mode Exit fullscreen mode

The subtle win here is lifecycle discipline: caching the AsyncClient (as the repo’s TeamsNotifier does) is one of those small choices that makes a sender behave like a service instead of a script. See httpx’s client guidance for the same pattern.

Concrete example: three related events in ~30 seconds → one Teams message

The dashboard’s AI Search view tracks fields that are tailor-made for correlated symptoms:

  • failed_indexer_runs_24h
  • throttled_queries_24h
  • service_status: 'running' | 'degraded' | 'error'

(all defined in src/components/details/AISearchDetailView.tsx).

In the failure mode I designed for, you might see:

1) an indexer run fails (incrementing failed_indexer_runs_24h)
2) service status flips to degraded
3) queries begin throttling (incrementing throttled_queries_24h)

If I emitted three messages, an operator learns nothing new after the first one.

So adjudication collapses them under one dedup key (same source=ai_search, kind=indexer, subject=<index name>), and the final notification becomes something like:

  • title: degraded indexer
  • body: latest status + relevant counters
  • operator action: trigger an indexer run

That last action is grounded in the UI code: the detail view includes a handler that POSTs to:

  • POST ${API_URL}/api/v1/ops/search/indexers/${indexName}/run

and reports “Indexer run triggered” on success (src/components/details/AISearchDetailView.tsx).

I can’t claim my agent calls that endpoint (there’s no retrieved code showing it does), but I can point out the important design alignment: the notification should link to the same operator action the dashboard already exposes.

Nuances and tradeoffs I accepted

Normalization has a cost: you lose some source-specific richness. I’m okay with that because the whole point is to make a small number of messages that humans can act on.

Arbitration can hide distinct root causes if your key is too coarse. The fix isn’t “more ML”—it’s choosing keys that match operational reality (service + subsystem + subject), and ensuring the collapsed message still contains enough attributes to diagnose.

Backoff introduces delay on retries. That’s acceptable because the first attempt is immediate, and the backoff only matters when the downstream (Teams webhook) is unhappy.

Finally, the repo’s current context shows the Teams webhook can be unconfigured (the /test endpoint returns 503). That’s not an edge case—that’s a real operational state. I treat “no webhook configured” as a first-class outcome, not an exception.

Closing

The most useful notification system I’ve built isn’t the one that detects the most things—it’s the one that turns a burst of messy telemetry into exactly one message a human will actually read, and then refuses to send the second one until there’s genuinely something new to say.

Top comments (0)