Pedro Santos
Part 3 - Agents That Diagnose, Plan, and Query a Distributed Saga

In the previous posts, I set up LangChain4j and connected AI agents to 5 microservices via MCP. The plumbing was done. Now for the actual agents, the part that made me rethink how I approach operations in distributed systems.

I built 3 agents, each with a different trigger and a different job. None of them are chatbots. They’re background workers and query interfaces that use LLMs to reason over real system data.

Agent 1: OperationsAgent (Auto-Diagnosis on Failure)

Trigger: Kafka consumer on notify-ending topic (only when status = FAIL)
Job: Figure out why a saga failed, find similar past incidents, write a diagnostic report
Storage: pgvector (embeddings) + PostgreSQL (diagnostics table)

This was the first agent I built, and it’s the one that surprised me the most.

How It Works

Every saga, whether it succeeds or fails, ends with a notify-ending event on Kafka. My agent listens to that topic:

@KafkaListener(
    topics = "${spring.kafka.topic.notify-ending}",
    groupId = "ai-agent-group")
public void onSagaEnded(String payload) throws JsonProcessingException {
    Event event = objectMapper.readValue(payload, Event.class);

    // Vectorize ALL events, builds the historical base
    String historyText = buildHistoryText(event);
    vectorize(event, historyText);

    // Diagnose only failures
    if (event.getStatus() == FAIL) {
        diagnose(event, historyText);
    }
}

Two things happen here. First, every event gets vectorized, converted to an embedding and stored in pgvector. This builds up a knowledge base over time. Second, failures get diagnosed.
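The `buildHistoryText` helper isn't shown above. A minimal sketch of what it could look like, with hypothetical stand-in types (the real `Event` class may differ) — the goal is one flat text blob per saga that the embedding model can digest:

```java
import java.util.List;

// Minimal stand-ins for the real classes; names are assumptions.
record StepHistory(String source, String status, String message) {}
record SagaEvent(String orderId, String status, List<StepHistory> history) {}

class HistoryText {
    // One flat text blob per saga: this is what gets embedded and, for
    // failures, pasted into the diagnosis prompt.
    static String build(SagaEvent event) {
        StringBuilder sb = new StringBuilder()
            .append("orderId=").append(event.orderId())
            .append(" finalStatus=").append(event.status());
        for (StepHistory h : event.history()) {
            sb.append("\n").append(h.source())
              .append(" -> ").append(h.status())
              .append(": ").append(h.message());
        }
        return sb.toString();
    }
}
```

The exact format matters less than consistency: similar failures should produce similar text, so their embeddings land close together in pgvector.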

The RAG Pipeline

The diagnosis uses RAG. Before asking the LLM anything, I search for similar past incidents:

private String findSimilarIncidents(String historyText) {
    var queryEmbedding = embeddingModel.embed(historyText).content();
    var results = embeddingStore.search(
        EmbeddingSearchRequest.builder()
            .queryEmbedding(queryEmbedding)
            .maxResults(3)
            .minScore(0.75)
            .build());

    if (results.matches().isEmpty())
        return "No similar incidents found in history.";

    return results.matches().stream()
        .map(m -> "--- Similar incident (score=" +
            String.format("%.2f", m.score()) + ") ---\n" + m.embedded().text())
        .collect(Collectors.joining("\n\n"));
}

The embedding model is Ollama’s nomic-embed-text. Runs locally and costs nothing. The vector store is pgvector on PostgreSQL. Nothing exotic.
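The `minScore(0.75)` cutoff is a relevance threshold on vector similarity. A toy cosine-similarity function shows what's actually being compared under the hood (the store reports a normalized variant of this, so the exact scale depends on the implementation — 0.75 is a fairly strict cutoff in practice):

```java
class Cosine {
    // Toy cosine similarity between two embedding vectors: 1.0 means
    // identical direction, 0.0 means unrelated.
    static double similarity(float[] a, float[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }
}
```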

Then I build a prompt with the saga history + RAG context and pass it to the agent:

private void diagnose(Event event, String historyText) {
    String ragContext = findSimilarIncidents(historyText);
    String prompt = """
        SAGA FAILED, DIAGNOSE
        OrderId: %s | TransactionId: %s
        Final status: %s | Total amount: R$ %.2f

        SAGA HISTORY:
        %s

        SIMILAR INCIDENTS (RAG):
        %s
        """.formatted(
            event.getOrderId(), event.getTransactionId(),
            event.getStatus(), event.getTotalAmount(), historyText, ragContext);

    String diagnosis = operationsAgent.analyze(prompt);

    diagnosticRepository.save(SagaDiagnostic.builder()
        .orderId(event.getOrderId())
        .diagnosis(diagnosis)
        .createdAt(LocalDateTime.now())
        .build());
}

The Agent Definition

The agent itself is minimal, just a system prompt defining the output format:

public interface OperationsAgent {

    @SystemMessage("""
        You are a failure diagnosis specialist for distributed sagas.
        You receive the full history of a FAIL saga and similar past incidents.

        Required format:
        ROOT CAUSE: <service and reason>
        AFFECTED SERVICES: <list>
        FINANCIAL IMPACT: <based on totalAmount>
        HISTORICAL PATTERN: <if RAG found similar cases>
        RECOMMENDATION: <corrective action>

        Rules:
        1. Only use the provided context, never invent data.
        2. If no similar incidents found, say so.
        3. Be concise; the output is consumed by a monitoring system.
        """)
    String analyze(@UserMessage String context);
}

No tools here. The OperationsAgent doesn’t need to query anything. All the data arrives via the Kafka event + RAG. It just needs to reason over the context and produce a structured report.

What It Catches

After running this for a while, it started finding patterns I hadn’t noticed. Payment failures from new customers during late hours. Inventory rollbacks always hitting the same product. Fraud scores spiking for a specific order amount range. The RAG context gets better as more events accumulate. The agent learns from your system’s history.

Agent 2: SagaComposerAgent (Dynamic Saga Planning)

Trigger: Scheduled, every 60 seconds in dev, every 30 minutes in production
Job: Decide the optimal execution order for each customer profile
Storage: Redis with TTL (saga-plan:{profile})

This is the weird one. Instead of hardcoding the saga step order, I let the AI decide it based on actual failure data and system metrics.

The Idea

My saga has a default order: Product Validation → Payment → Inventory. If Payment is failing 40% of the time, it’d be smarter to run it first. Fail fast, avoid unnecessary validation calls.

Same logic applies to fraud. A “new customer + high value order” profile with a 30% fraud block rate probably needs a Fraud Validation step before Payment.
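The fail-fast intuition can be stated in plain code. This is not what the agent runs — the LLM decides, weighing more context — but it's the deterministic baseline of the same idea, with made-up failure rates:

```java
import java.util.*;

class FailFast {
    // Reorder saga steps so the most failure-prone runs first (fail fast).
    // Failure rates here are hypothetical; the agent derives them from
    // real metrics and failure history.
    static List<String> order(List<String> defaultOrder, Map<String, Double> failureRate) {
        List<String> ordered = new ArrayList<>(defaultOrder);
        ordered.sort(Comparator.comparingDouble(
            (String step) -> failureRate.getOrDefault(step, 0.0)).reversed());
        return ordered;
    }
}
```

With `PAYMENT` failing 40% of the time, it jumps to the front and the cheap validation calls only run for orders that can actually pay.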

How It Works

Every minute, the agent runs for each customer profile:

@Scheduled(fixedDelayString = "${saga.composer.interval:60000}")
public void recomputePlans() {
    for (String profile : profiles) {
        String ragContext = findHistoricalPatterns(profile);
        String metrics = queryMetrics(dataAnalystAgent);
        String stockAlerts = queryStockAlerts(dataAnalystAgent);

        String prompt = buildCompositionPrompt(profile, metrics, stockAlerts, ragContext);
        String planJson = sagaComposerAgent.compose(prompt);

        redis.opsForValue().set("saga-plan:" + profile, planJson, 35, MINUTES);
    }
}

Notice something: the SagaComposerAgent uses the DataAnalystAgent to get current metrics. Agents calling agents.
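The `queryMetrics` helper isn't shown; conceptually it's nothing more than one agent invoking another's interface. A sketch with a stubbed analyst (the real one is backed by an LLM and MCP tools, and the question text here is an assumption):

```java
class ComposerWiring {
    // Agent 3's interface, trimmed of LangChain4j annotations for brevity.
    interface DataAnalystAgent {
        String analyze(String question);
    }

    // Hypothetical queryMetrics: the composer asks the analyst a
    // plain-language question, like any other caller would.
    static String queryMetrics(DataAnalystAgent analyst) {
        return analyst.analyze("Summarize per-service failure rates for the last 30 minutes.");
    }
}
```

That's the whole trick: because agents are just interfaces, composing them is ordinary dependency injection.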

The Agent Definition

The system prompt is very specific about the output format and decision rules:

public interface SagaComposerAgent {

    @SystemMessage("""
        You are a saga plan architect. Respond ONLY with raw JSON.
        First character MUST be '{', last MUST be '}'.

        Response format:
        {
          "steps": ["PRODUCT_VALIDATION", "FRAUD_VALIDATION", "PAYMENT", "INVENTORY"],
          "reasoning": "reason for the chosen order"
        }

        Decision rules:
        1. Place high-failure services earlier to fail fast.
        2. If INVENTORY failure rate > 30%, place before PAYMENT.
        3. Include FRAUD_VALIDATION for new + high-value or high fraud rate.
        4. Skip FRAUD_VALIDATION for VIP with < 5% fraud and long positive history.
        5. If data is insufficient, use default order.
        """)
    String compose(@UserMessage String profileContext);
}

The Orchestrator Reads the Plan

On the orchestrator side, when a saga starts, it checks Redis:

public String getFirstTopicForOrder(Order order) {
    String profile = classifyProfile(order);  // e.g., "new:high-value"
    String json = redis.opsForValue().get("saga-plan:" + profile);

    if (json == null) return DEFAULT_FIRST_TOPIC;  // fallback

    var steps = objectMapper.readTree(json).get("steps");
    return resolveTopicFromStep(steps.get(0).asText());
}

If Redis has a plan, use it. If not, fall back to the default order. Redis could be down. The plan could be expired. The agent might not have run yet. Doesn’t matter. The AI layer is additive. It never breaks the existing flow.
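The `classifyProfile` method is the one piece not shown above. A minimal sketch, assuming a hypothetical `Order` shape and a made-up R$1000 high-value threshold:

```java
class ProfileClassifier {
    // Hypothetical order shape; field names are assumptions.
    record Order(boolean newCustomer, boolean vip, double totalAmount) {}

    // The R$1000 high-value threshold is invented for illustration;
    // the real cutoff would come from business rules.
    static String classify(Order order) {
        if (order.vip()) return "vip:any";
        String tenure = order.newCustomer() ? "new" : "regular";
        String value = order.totalAmount() > 1000.00 ? "high-value" : "low-value";
        return tenure + ":" + value;
    }
}
```

The profile key must be deterministic and low-cardinality: every profile string the orchestrator can produce needs a matching `saga-plan:{profile}` entry for the composer to fill.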

Example Output

For a new:high-value profile with recent payment failures:

{
  "steps": ["PRODUCT_VALIDATION", "FRAUD_VALIDATION", "PAYMENT", "INVENTORY"],
  "reasoning": "New high-value customer profile. Historical fraud rate 18% warrants early fraud check. Payment placed after fraud validation to avoid unnecessary payment attempts on blocked orders."
}

For a vip:any profile with clean history:

{
  "steps": ["PRODUCT_VALIDATION", "PAYMENT", "INVENTORY"],
  "reasoning": "FRAUD_SKIP_REASON: VIP customer with 2% fraud rate and 98% success rate over 47 orders. No night order patterns detected."
}

Agent 3: DataAnalystAgent (Natural Language Queries)

Trigger: HTTP GET request to /api/agent/chat?question=...
Job: Answer operational questions by querying all microservices via MCP tools
Output: Human-readable analysis

This is the agent that uses MCP most heavily. It connects to all 4 microservices and has 12+ tools available.

The Agent Definition

public interface DataAnalystAgent {

    @SystemMessage("""
        You are a data analyst for distributed sagas. Answer using exclusively
        the available MCP tools. Never invent data.

        Workflow for finding failed sagas:
        1. Extract N from the question, default to 5.
        2. Call listRecentEvents(limit = N + 10) to get enough FAIL events.
        3. Filter where status=FAIL, take only the first N.
        4. For each failed saga:
           a. Call getOrderById(orderId) to get clientType, totalAmount.
           b. Extract hourOfDay from the event timestamp.
           c. Call getFraudRiskScore with the order data.
        5. Report only the N requested sagas.
        """)
    String analyze(@UserMessage String question);
}

The Critical Lesson: Workflow Instructions Beat Tool Descriptions

Look at the workflow steps in the system prompt. That’s the most important thing I learned building these agents.

At first, I just described the tools and let the model figure out the workflow. It worked… sometimes. Other times it would call tools in the wrong order, forget to filter by FAIL status, or process 15 sagas when I asked for 5.

Once I wrote explicit step-by-step instructions in the system prompt, the reliability jumped. I told the agent HOW to use the tools, not just WHAT they do. The model still decides which tools to call, but it follows the prescribed workflow.

Example Interaction

Question: “List the 5 most recent failed sagas and assess their fraud risk.”

The agent:

  1. Calls listRecentEvents(limit=15) on order-service
  2. Filters for FAIL status, takes first 5
  3. For each: calls getOrderById() then getFraudRiskScore()
  4. Returns a structured report:

Order 69b6c29f → Payment rejected (R$932.80 > R$500 limit). Fraud score: 12/100 APPROVED. No compensation needed, payment was never processed.

Lessons Learned (the Hard Way)

After building all 3 agents, here are the things I wish someone had told me:

1. MCP beats @Tool for microservices. Not even close. The decoupling alone is worth it. Any agent can connect to any service without code changes.

2. SystemMessage alignment is critical. If your system prompt mentions a tool that doesn’t exist, the agent fails silently. It tries to call it, gets no result, and gives a vague answer. I spent hours debugging this before I realized the prompt referenced getTransactionStatus but the tool was actually named getPaymentStatus.

3. JSON responses from tools win over key=value. I started with "status=SUCCESS | amount=150.00" and switched to ObjectMapper.writeValueAsString(). One line of code, zero parsing bugs on the model side.
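The difference is easy to see side by side. A hypothetical tool result is shown below; the real services serialize their actual DTOs with `ObjectMapper.writeValueAsString()`, which is hand-rolled here only to keep the example dependency-free:

```java
class ToolOutput {
    // Hypothetical tool result; field names are assumptions.
    record PaymentResult(String status, double amount) {}

    // The format I started with: ambiguous delimiters the model must guess at.
    static String asKeyValue(PaymentResult r) {
        return "status=" + r.status() + " | amount=" + r.amount();
    }

    // The shape ObjectMapper.writeValueAsString(r) would produce:
    // unambiguous structure the model parses reliably.
    static String asJson(PaymentResult r) {
        return "{\"status\":\"" + r.status() + "\",\"amount\":" + r.amount() + "}";
    }
}
```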

4. maxOutputTokens matters more than you think. I set it to 1024 initially. Asking for 5 sagas + fraud scores was consistently truncated. Bumped it to 4096 and the problem disappeared.

5. Virtual threads are not optional. When an agent calls 5 MCP tools, those are blocking HTTP calls, and every concurrent saga multiplies them. Without virtual threads, each call pins a scarce platform thread and throughput craters. One line in application.yml:

spring:
  threads:
    virtual:
      enabled: true

Cheap concurrent MCP calls, at zero code cost.
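To be precise, the property makes blocking cheap; actually running several tool calls in parallel still needs a fan-out somewhere in the stack. A sketch of that fan-out on a virtual-thread executor (Java 21+; the `Callable`s stand in for HTTP calls to MCP servers):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class ToolFanOut {
    // Run blocking tool calls in parallel, one virtual thread each.
    static List<String> call(List<Callable<String>> toolCalls) throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> c : toolCalls) futures.add(executor.submit(c));

            // Collect in submission order; results arrive as calls finish.
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) results.add(f.get());
            return results;
        }
    }
}
```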

6. The AI layer should always be additive. Every piece of AI in my system has a fallback. Redis plan not found? Use default saga order. Diagnosis fails? The saga still completes normally. The AI improves operations but never blocks them.

The Full Picture

Here’s what the system looks like with all 3 agents running.

An order comes in. The orchestrator checks Redis for an AI-generated plan and executes the saga. If the saga fails, the OperationsAgent diagnoses the failure using RAG and saves it to the database. Every minute, the SagaComposerAgent reads metrics and failure patterns, then writes new plans to Redis. And at any time, a developer can ask the DataAnalystAgent “why are payments failing for new customers?” and get a grounded answer.

The agents feed each other. The OperationsAgent’s vectorized events improve the SagaComposerAgent’s RAG context. The DataAnalystAgent’s metrics help the SagaComposerAgent make better plans. It’s a flywheel.

Try It Yourself

The entire project is open source with setup instructions.

You’ll need Docker Compose for the infrastructure (Kafka, PostgreSQL, Redis, MongoDB). You’ll also need Ollama for local embeddings and a Gemini API key (free tier works fine for testing).

The README has step-by-step instructions and a pre-configured Bruno collection with all the API requests.

If you have questions or find issues, open an issue on the repo or drop a comment here. I’m still iterating on the system prompts. They’re never really “done.”


This is part 3 of a 3-part series on integrating AI into a distributed saga system:

  1. Part 1 - Why I Picked LangChain4j Over Spring AI
  2. Part 2 - Connecting AI Agents to Microservices with MCP
  3. Part 3 - Agents That Diagnose, Plan, and Query a Distributed Saga ← you are here
