DEV Community

Cover image for Implementation of an LLM Agent in Go and Rust for Market Anomaly Analysis
Zmey56
Zmey56

Posted on

Implementation of an LLM Agent in Go and Rust for Market Anomaly Analysis

Introduction

Classic algorithmic HFT bots live in a vacuum. For them, the market is just a stream of numbers: price, volume, spread, and Z-score. But these metrics know nothing about the real world.

Imagine this: on a crypto exchange or on Polymarket, there is a sudden spike in volume and the asset's Z-score shoots beyond three sigma (|Z| > 3). From the perspective of a classical arbitrage bot, this is an inefficiency to trade, expecting mean reversion.

But why did the spike happen? Was a devastating regulatory report released? Did hackers drain liquidity? Or was it a cascading liquidation of a single whale's positions?

Without understanding the context (news, tweets, on-chain activity), a mathematical spike is not equal to a trading opportunity. Entering a trade aggressively without context is a direct path to becoming exit liquidity for insiders.

Feeding every tick into a neural network is architectural madness (too expensive, and latency is measured in seconds). What we need is a hybrid system:

Mathematical detector: A lightweight service listens to streaming ticks via WebSocket and computes statistics on the fly.

Trigger: As soon as the detector catches an anomaly, it freezes a snapshot of the data.

LLM agent: In the same millisecond, an agent is engaged. It parses external context (news headlines, Twitter) for the specific ticker.

Verdict: The agent returns a conclusion: "The spike was caused by a smart contract exploit. Ignore." or "No negative news detected; stop orders were triggered. Recommendation: trade."

In this paradigm, the LLM does not replace mathematics. The model acts purely as a contextual filter in front of the decision-making module.

System Architecture

If you have ever taken a trading bot out of a local sandbox and deployed it into real production, you know one thing: architecture determines everything. The market does not forgive memory leaks, goroutine blocking, or spaghetti code when an exchange suddenly starts flooding you with 10,000 ticks per second.

Our system is built as a strictly separated hybrid. We have a high-throughput number-crunching stream processor (the Detection Engine) operating in real time, and a heavy, intelligent but slower contextual layer (LLM Agent).

The key rule of this architecture is: the LLM is not involved in every tick. A language model is expensive and slow. It sleeps until the mathematical detector wakes it up with a statistical anomaly event.

To prevent the system from turning into a monolithic monster, we use Clean Architecture. The delivery layer (WebSockets) knows nothing about databases, and the business logic (anomaly detection) is isolated from specific implementations of LLM clients. If we decide to switch from Binance to Polymarket, we simply write a new adapter in delivery without touching the core.

Let's walk through the data flow step by step.

Ingestion (Data Collection)

Everything starts with connections to data sources. We open WebSocket connections to exchanges. The purpose of this layer is simply to read frames, deserialize JSON into our domain structures, and push them into a channel for further processing. No mathematics happens here.

package websocket

import (
 "context"
 "encoding/json"
 "log/slog"

 "github.com/Zmey56/market-anomaly-llm/internal/domain"
 "github.com/gorilla/websocket"
)

type BinanceStream struct {
 url       string
 ticksChan chan<- domain.Tick
 logger    *slog.Logger
}

func NewBinanceStream(url string, ch chan<- domain.Tick, l *slog.Logger) *BinanceStream {
 return &BinanceStream{url: url, ticksChan: ch, logger: l}
}

func (s *BinanceStream) Start(ctx context.Context) error {
 conn, _, err := websocket.DefaultDialer.Dial(s.url, nil)
 if err != nil {
  return err
 }
 defer conn.Close()

 for {
  select {
  case <-ctx.Done():
   return nil
  default:
   _, msg, err := conn.ReadMessage()
   if err != nil {
    s.logger.Error("websocket read error", "error", err)
    continue
   }

   var raw map[string]interface{} // Simplified parsing for example
   json.Unmarshal(msg, &raw)

   // Normalize and send to core
   s.ticksChan <- domain.Tick{
    Symbol: "BTCUSDT",
    Price:  100500.0, // Parse from raw
    Volume: 1.5,
   }
  }
 }
}
Enter fullscreen mode Exit fullscreen mode

Detection Engine (Anomaly Detector)

This is the heart of the system. The usecase layer reads ticks from the channel and runs them through a mathematical engine. We need to determine whether the current price or volume movement is anomalous relative to a historical window.

We use a moving average and standard deviation to compute a Z-score: Z = (X — μ) / σ

If |Z| > 3, the price deviates more than three standard deviations from the norm. This is a mathematical spike. A domain entity Anomaly is generated.

package domain

import "time"

type Tick struct {
 Symbol    string
 Price     float64
 Volume    float64
 Timestamp time.Time
}

type Anomaly struct {
 Symbol      string
 TriggerTick Tick
 ZScore      float64
 WindowMean  float64
 Type        string // "PRICE_SPIKE" or "VOLUME_SPIKE"
}
Enter fullscreen mode Exit fullscreen mode

Here is what the hot analysis loop looks like in analyzer.go. It is important to minimize allocations here (GC pauses are a trader's enemy). If we were chasing nanoseconds, we would move this calculation module to Rust via FFI or a separate gRPC microservice, but for crypto exchanges and probability aggregators Go performs perfectly well.

package usecase

import (
 "context"

 "github.com/Zmey56/market-anomaly-llm/internal/domain"
)

type MathEngine interface {
 PushAndCheck(price float64) (zScore float64, mean float64, isAnomaly bool)
}

type Analyzer struct {
 mathEngine MathEngine
 llmAdvisor LLMAdvisor // Interface for calling the agent
}

func (a *Analyzer) ProcessStream(ctx context.Context, ticks <-chan domain.Tick) {
 for {
  select {
  case <-ctx.Done():
   return
  case tick := <-ticks:
   // In reality, the engine should be separated by Symbol
   zScore, mean, isAnomaly := a.mathEngine.PushAndCheck(tick.Price)

   if isAnomaly {
    anomaly := domain.Anomaly{
     Symbol:      tick.Symbol,
     TriggerTick: tick,
     ZScore:      zScore,
     WindowMean:  mean,
     Type:        "PRICE_SPIKE",
    }
    // Async call to LLM advisor
    go a.llmAdvisor.Analyze(ctx, anomaly)
   }
  }
 }
}
Enter fullscreen mode Exit fullscreen mode

LLM Agent (Contextual Layer)

An anomaly has occurred. The math said: "Look, Bitcoin just jumped 5% in one second." At this point the llm_advisor.go layer kicks in.

Its job is to assemble context (e.g., the last N ticks, possibly fetching recent news via API) and construct a tightly structured prompt for the LLM. We're not asking the model to "chat" — we're demanding it classify the event: news-driven, technical spike, manipulation, or ignore.

The external API call is encapsulated in openai_client.go:

package llm

import (
 "context"
 "fmt"
 "market-anomaly-llm/internal/domain"
 // openai sdk import
)

type OpenAIClient struct {
 apiKey string
}

func (c *OpenAIClient) GetVerdict(ctx context.Context, a domain.Anomaly) (string, error) {
 prompt := fmt.Sprintf(
  "Act as a quantitative analyst. Anomaly detected on %s. "+
  "Trigger price: %.2f, Z-Score: %.2f (Mean: %.2f). "+
  "Determine if this is a technical spike or news-driven based on current market context. "+
  "Return ONLY one of: [TECHNICAL_SPIKE, NEWS_DRIVEN, MANIPULATION, IGNORE].",
  a.Symbol, a.TriggerTick.Price, a.ZScore, a.WindowMean,
 )

 // OpenAI API call...
 return "TECHNICAL_SPIKE", nil // Mocked response
}
Enter fullscreen mode Exit fullscreen mode

Data Storage: Two Databases for Two Tasks

Do not try to push everything into a single database. In algorithmic trading, storage requirements differ dramatically depending on the workload profile.

1. ClickHouse (Analytical Storage)

Purpose: All raw ticks are written here. ClickHouse is a column-oriented database capable of ingesting millions of rows per second without choking. This becomes the foundation for future backtesting. If you want to re-evaluate your Z-score window parameters on a year of historical data, ClickHouse will return aggregations in milliseconds.

2. PostgreSQL (Transactional Storage)

Purpose: Only alert states, user configurations, and LLM verdicts are stored here. Was an alert processed? Was a trading decision made? Here we critically depend on ACID guarantees and strict transactional boundaries (for example, when concurrently updating an order status), which PostgreSQL handles perfectly.

Entry Point and Dependency Injection (Wiring)

The file main.go is where the entire clean architecture is assembled. Here we initialize dependencies (Dependency Injection), pass repositories into use cases, and pass use cases into delivery handlers.

The most important feature for a production-ready service is graceful shutdown. If we kill the container (for example, during a deployment of a new version), we must properly close WebSocket connections, wait for active LLM agent goroutines to finish, and close database connection pools.

package main

import (
 "context"
 "log/slog"
 "os"
 "os/signal"
 "syscall"

 "market-anomaly-llm/internal/delivery/websocket"
 "market-anomaly-llm/internal/domain"
 "market-anomaly-llm/internal/repository/postgres"
 "market-anomaly-llm/internal/repository/llm"
 "market-anomaly-llm/internal/usecase"
 "market-anomaly-llm/pkg/math"
)

func main() {
 logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()

 // 1. Init Repositories
 alertRepo := postgres.NewAlertRepo("postgres://...")
 llmClient := llm.NewOpenAIClient("sk-...")

 // 2. Init Usecases (DI)
 mathEngine := math.NewZScoreEngine(1000) // Window of 1000 ticks
 advisor := usecase.NewLLMAdvisor(llmClient, alertRepo)
 analyzer := usecase.NewAnalyzer(mathEngine, advisor)

 // 3. Init Channels & Delivery
 ticksChan := make(chan domain.Tick, 10000)
 binanceWSS := websocket.NewBinanceStream("wss://...", ticksChan, logger)

 // 4. Start processing
 go analyzer.ProcessStream(ctx, ticksChan)
 go binanceWSS.Start(ctx)

 logger.Info("Market Anomaly Agent started")

 // 5. Graceful Shutdown
 quit := make(chan os.Signal, 1)
 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 <-quit

 logger.Info("Shutting down gracefully...")
 cancel() // Cancel context, cascading goroutine shutdown
 // Add wg.Wait() here to guarantee processing completion
}
Enter fullscreen mode Exit fullscreen mode

This structure makes the system predictable. You always know where the mathematics lives, where the network I/O is handled, and where the business rules for AI integration are defined.

Go vs Rust

If you've ever built infrastructure for algorithmic trading or statistical arbitrage, you know the core pain of this domain: there are no universal tools. Trying to build an HFT engine in Python ends in suffering from the GIL and slow computations, while implementing all networking, database interaction, and REST APIs in C++ or Rust sends your time-to-market to infinity.

In our system, there are two fundamentally different classes of problems that require opposite approaches.

I/O-bound (Networking & Orchestration)

  • Maintaining dozens of WebSocket connections to exchanges (Binance, Polymarket)
  • Routing massive inbound message streams (fan-in / fan-out)
  • Calling external APIs (LLM inference, context retrieval)
  • Asynchronous logging and tick storage in ClickHouse and Postgres

CPU-bound (Math & Detection)

  • Continuous rolling window calculations over tens of thousands of ticks per second
  • Real-time computation of variance, standard deviation, and Z-score
  • Potential extension into feature extraction and micro-predictive algorithms

Trying to solve everything with a single language is an architectural compromise. That's why we take a hybrid approach.

When it comes to the Detection Engine, our primary metric is not just throughput, but deterministic latency.

In garbage-collected languages, you never fully control the system. Even Go's modern GC — extremely fast as it is — can occasionally introduce microsecond (or, with poor memory profiles, millisecond) pauses. By Murphy's law, that stop-the-world event will happen exactly when the market experiences an abnormal liquidity spike you needed to capture.

Rust gives us zero-cost abstractions and full memory control. We can implement the hot path without a single heap allocation.

Example: a ring buffer in Rust for rolling mean and variance. This code stays CPU-cache friendly and executes in nanoseconds:

pub struct FastRollingStat {
    window: Vec<f64>,
    capacity: usize,
    head: usize,
    count: usize,
    sum: f64,
    sum_sq: f64,
}

impl FastRollingStat {
    pub fn new(capacity: usize) -> Self {
        Self {
            window: vec![0.0; capacity], // Preallocation, no runtime allocations
            capacity,
            head: 0,
            count: 0,
            sum: 0.0,
            sum_sq: 0.0,
        }
    }

    #[inline(always)]
    pub fn push(&mut self, val: f64) {
        if self.count == self.capacity {
            let old_val = self.window[self.head];
            self.sum -= old_val;
            self.sum_sq -= old_val * old_val;
        } else {
            self.count += 1;
        }

        self.window[self.head] = val;
        self.sum += val;
        self.sum_sq += val * val;
        self.head = (self.head + 1) % self.capacity;
    }

    #[inline]
    pub fn z_score(&self, current_val: f64) -> f64 {
        if self.count < 2 { return 0.0; }
        let mean = self.sum / self.count as f64;
        let variance = (self.sum_sq / self.count as f64) - (mean * mean);
        if variance <= 0.0 { return 0.0; }
        (current_val - mean) / variance.sqrt()
    }
}
Enter fullscreen mode Exit fullscreen mode

Such a module can be compiled with aggressive SIMD optimizations and process millions of ticks per core without a single latency spike.

If Rust is a scalpel for math, Go is the best Swiss army knife for backend systems.

For I/O workloads, we need cheap concurrency. In crypto trading, connections drop constantly. Exchanges change formats on the fly, APIs timeout. We need patterns like Circuit Breakers, Retries, and Graceful Degradation. Writing all of this in Rust (with tokio and complex async lifetime management) can take weeks, while in Go it takes hours.

Goroutines and channels allow us to build clean pipelines — from tick ingestion to LLM calls:

// Orchestration layer: coordinating WSS, detector, and LLM
func (a *AppOrchestrator) RunPipeline(ctx context.Context, ticks <-chan domain.Tick) {
 llmJobs := make(chan domain.Anomaly, 10)

 for i := 0; i < 5; i++ {
  go a.llmWorker(ctx, llmJobs)
 }

 for {
  select {
  case <-ctx.Done():
   return
  case tick := <-ticks:
   anomaly, ok := a.computeEngine.CheckTick(tick)

   if ok {
    select {
    case llmJobs <- anomaly:
    default:
     a.logger.Warn("LLM queue is full, dropping anomaly", "symbol", tick.Symbol)
    }
   }
  }
 }
}

func (a *AppOrchestrator) llmWorker(ctx context.Context, jobs <-chan domain.Anomaly) {
 for anomaly := range jobs {
  verdict, err := a.llmClient.Analyze(ctx, anomaly)
  if err == nil {
   a.db.SaveVerdict(ctx, anomaly.ID, verdict)
  }
 }
Enter fullscreen mode Exit fullscreen mode

How do we combine both worlds?

Options:

  • CGO (FFI): Call Rust directly from Go as a C library. Tempting, but it breaks the goroutine scheduler and adds overhead. You also lose Go's cross-compilation simplicity.
  • IPC / Shared Memory: Great for hardcore HFT setups.
  • gRPC / TCP streams: Run Rust as a separate microservice (sidecar) communicating over a lightweight stream.

We choose a dedicated Compute Layer (gRPC/TCP).

Go maintains WebSocket connections and streams raw data over a persistent gRPC connection to a Rust service. Rust processes data and returns only alerts (asset ID + Z-score).

From a system perspective, you have to be pragmatic. The final product is an LLM-powered agent.

No matter how fast the Rust detector is, once an anomaly is detected, we make an HTTP call to OpenAI or Claude. The LLM response takes 500 ms to several seconds.

In this paradigm, squeezing 10 nanoseconds out of the networking stack (by rewriting orchestration in Rust) is meaningless — Amdahl's law in action. The real bottleneck is model inference.

So:

  • Go is the backbone — handling networking, integrations, routing, and state. It maximizes development speed and maintainability.
  • Rust is surgical — isolated for heavy math, protecting the main pipeline from performance degradation.

This architecture makes the system fast enough to catch anomalies and flexible enough to quickly integrate new AI models and exchanges.

Anomaly Detector in Rust

On architecture diagrams, streaming data processing looks elegant: a WSS box connects to a detector box, and from there an arrow goes to an LLM. But in reality, when a liquidation hits the market and the exchange dumps tens of thousands of events per second into your WebSocket, that "beautiful architecture" starts to choke.

In this chapter, we drop down to the level of memory and CPU to implement the core of our system — a Z-score detector in Rust.

The anomaly detector is the hottest part of our pipeline. Every single tick (trade) received from the exchange passes through it.

Three things are critical here:

  1. Predictable latency: We cannot afford microsecond pauses from garbage collection (GC), which are inevitable in Go or Java. During high volatility, a GC pause means a missed signal.
  2. No allocations in the loop: Heap allocations per tick will kill performance. We need in-place memory operations.
  3. Cache locality: The CPU reads contiguous data in L1/L2 cache much faster than chasing pointers in memory.

Even small string copies or temporary object allocations at HFT scale quickly become bottlenecks. Rust gives guarantees that the code will run in strictly defined time without doing anything behind our back.

To determine whether the current price is anomalous, we compare it to recent history using a fixed-size sliding window.

The key optimization rule: avoid recomputation from scratch. If the window holds 10,000 elements, recomputing the sum each tick is wasteful. Instead, we update incrementally: add the new value and subtract the one that falls out of the window. This reduces mean calculation from O(N) to O(1).

As a data structure, we use VecDeque (a double-ended queue) from the standard library. It's effectively a ring buffer, allowing O(1) push to the back and pop from the front.

Trade-off: VecDeque may split data into two segments in memory when wrapping, which is slightly worse for cache locality than a flat array, but acceptable for a baseline implementation.

Below is the core math engine. It is initialized once and then operates entirely on preallocated memory.

use std::collections::VecDeque;

pub struct AnomalyDetector {
    window: VecDeque<f64>,
    window_size: usize,
    sum: f64,
}

impl AnomalyDetector {
    pub fn new(size: usize) -> Self {
        AnomalyDetector { 
            // Preallocate exactly the window size
            window: VecDeque::with_capacity(size), 
            window_size: size, 
            sum: 0.0 
        }
    }

    pub fn push_and_check(&mut self, price: f64) -> bool {
        // If full, evict oldest value and adjust sum
        if self.window.len() == self.window_size {
            self.sum -= self.window.pop_front().unwrap();
        }

        // Insert new value
        self.window.push_back(price);
        self.sum += price;

        // Wait until window is filled
        if self.window.len() < self.window_size { return false; }

        // Mean (O(1))
        let mean = self.sum / self.window_size as f64;

        // Variance (O(N) — weak point)
        let variance: f64 = self.window.iter()
            .map(|&x| (x - mean).powi(2))
            .sum::<f64>() / self.window_size as f64;

        let std_dev = variance.sqrt();

        // Z-score
        let z_score = if std_dev > 0.0 {
            (price - mean) / std_dev
        } else {
            0.0
        };

        z_score.abs() > 3.0 // anomaly trigger
    }
}
Enter fullscreen mode Exit fullscreen mode

Where we win:
The key line is VecDeque::with_capacity(size). Memory is allocated once at startup. After that, pop_front and push_back only move internal pointers — no allocations in the hot loop (zero allocations). The sum is updated incrementally and instantly.

Where we lose (and will fix next):
Look at the variance calculation. We iterate over the entire window each tick — O(N). With a window of 10,000 and 5,000 ticks/sec, that's 50 million iterations per second. Rust can handle it, but it's still wasted CPU.

In production, this must be replaced with Welford's online algorithm, which updates variance incrementally in O(1) without iterating the full window. The O(N) version is kept here for clarity.

How does this Rust engine interact with the Go orchestrator (which writes to ClickHouse and calls the LLM)?

In the architecture (/Users/alekstut/Project/market-anomaly-llm/internal/usecase/analyzer.go), the detector is isolated as a compute worker. We avoid FFI (CGO), since calling C from Go can block OS threads and interfere with the goroutine scheduler.

Instead, the Rust component runs as a lightweight gRPC service or communicates via a fast TCP socket / ZeroMQ. The Go service streams raw [price, volume] batches continuously. Rust processes them and emits only alerts:

{"symbol": "BTCUSDT", "z_score": 4.1}
Enter fullscreen mode Exit fullscreen mode

Upon receiving an alert, the Go orchestrator pauses the stream, queries llm_advisor.go for context analysis, and writes the verdict to Postgres.

For ultra-low latency, this can be pushed further:

  • Custom ring buffer: Replace VecDeque with a flat [f64; SIZE] and manual head/tail indexing for perfect cache locality.
  • Incremental variance: Implement Welford's algorithm (O(1)).
  • SIMD: Use AVX instructions for vectorized computations (e.g., correlations across multiple pairs).

The detector math is ready.

Implementing the Core of an LLM Agent in Go

So, we already have a math engine in Rust. It sits on a WSS stream, processes tens of thousands of ticks per second, computes sliding windows, and uses memory efficiently. As soon as a spike occurs — the Z-score shoots through the roof — Rust yells: "Anomaly!"

But Rust doesn't know what it was. Is it a real breakout? A short squeeze? Or did Elon Musk tweet about a dog again?

This is where the LLM agent comes in. Its job is to act as a context layer: take raw numbers, query a language model, and return a meaningful verdict.

The key architectural rule: the agent does not participate in per-tick processing. A network call to OpenAI or Anthropic takes seconds. The agent wakes up only on a trigger (alert) from the math detector.

In simple terms, it works like this:

  1. The math engine generates an Anomaly event and pushes it into a Go channel (chan).
  2. A separate goroutine (worker) continuously listens to this channel.
  3. When an anomaly arrives, the goroutine builds a text prompt, injecting the ticker and metrics.
  4. An HTTP call is made to an API (e.g., GPT-4o).
  5. The response is parsed and stored in a database (Postgres).

To avoid spaghetti code, we strictly separate responsibilities using Clean Architecture. We need three files:

  • internal/domain/anomaly.go — data structures. The Anomaly entity (what happened) and Verdict (LLM response).
  • internal/repository/llm/openai_client.go — infrastructure layer. Handles JSON formatting for OpenAI API, HTTP requests, and response parsing. No knowledge of trading.
  • internal/usecase/llm_advisor.go — business logic layer (our Agent). Knows what prompt to build, how to enrich it, and what to do with the response.

Let's look at a basic implementation. For brevity, this shows the core integration (in a real project, it's split across layers).

package usecase

import (
 "context"
 "fmt"

 "market-anomaly-llm/internal/domain"
 // In production, the client is injected via an interface,
 // but here we use the SDK directly for clarity
 "github.com/sashabaranov/go-openai" 
)

// Agent - business logic
type Agent struct {
 client *openai.Client
}

func NewAgent(apiKey string) *Agent {
 return &Agent{
  client: openai.NewClient(apiKey),
 }
}

// AnalyzeAnomaly builds the prompt and queries the LLM
func (a *Agent) AnalyzeAnomaly(ctx context.Context, ticker string, currentPrice, zScore float64) (string, error) {
 prompt := fmt.Sprintf(
  "You are a financial analyst. An anomaly detected for asset %s. "+
  "Current price: %.2f, Z-score: %.2f (strong deviation). "+
  "Describe possible reasons for this movement.", 
  ticker, currentPrice, zScore,
 )

 req := openai.ChatCompletionRequest{
  Model: openai.GPT4o,
  Messages: []openai.ChatCompletionMessage{
   {Role: openai.ChatMessageRoleSystem, Content: "Be concise and to the point."},
   {Role: openai.ChatMessageRoleUser, Content: prompt},
  },
 }

 resp, err := a.client.CreateChatCompletion(ctx, req)
 if err != nil {
  return "", err
 }

 return resp.Choices[0].Message.Content, nil
}
Enter fullscreen mode Exit fullscreen mode

Client: Ideally, openai.NewClient and CreateChatCompletion are hidden behind an interface in internal/repository/llm/openai_client.go. This allows switching to Claude 3.5 Sonnet or a local Llama 3 later without changing the use case layer.

Agent: Prompt construction and orchestration logic live in internal/usecase/llm_advisor.go.

Now the critical part: running the agent without blocking the system. Entry point: cmd/agent/main.go.

We create a buffered channel and start a worker.

package main

import (
 "context"
 "log"
 "time"

 "market-anomaly-llm/internal/domain"
 "market-anomaly-llm/internal/usecase"
)

func main() {
 agent := usecase.NewAgent("sk-your-openai-api-key")

 anomaliesChan := make(chan domain.Anomaly, 100)

 go startLLMWorker(agent, anomaliesChan)

 // ... WSS + detector logic ...

 select {} // block main (replace with graceful shutdown)
}

func startLLMWorker(agent *usecase.Agent, ch <-chan domain.Anomaly) {
 for anomaly := range ch {
  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

  log.Printf("Starting agent for %s...", anomaly.Ticker)

  verdict, err := agent.AnalyzeAnomaly(ctx, anomaly.Ticker, anomaly.Price, anomaly.ZScore)
  if err != nil {
   log.Printf("LLM error for %s: %v", anomaly.Ticker, err)
  } else {
   log.Printf("Verdict for %s: %s", anomaly.Ticker, verdict)
   // persist to Postgres here
  }

  cancel()
 }
}
Enter fullscreen mode Exit fullscreen mode

Writing this takes 15 minutes. Making it production-grade is a different problem. Key considerations:

Avoid sync calls: Never call LLM APIs in the same thread as tick processing. Models may take 5–10 seconds. Meanwhile, the exchange may generate 50,000 new events.

Anomaly bursts (cascade triggers): During market crashes, the detector may emit dozens of anomalies per second. Sending all to OpenAI will hit HTTP 429. Use a worker pool (2–3 goroutines) or a rate limiter.

Timeouts: Always use context.WithTimeout. If no response in 10 seconds — drop it. In HFT, stale context is useless.

Retry policies: Implement exponential backoff (e.g., 3 retries) for 5xx errors only.

The initial prompt is weak. The model will respond with something generic like "maybe positive news". To get real value, feed it data.

Example of an enriched prompt:

prompt := fmt.Sprintf(`
Context: You are an HFT analyst at a quant fund.
Event: Statistical anomaly in volume and price for %s.
Data: 
- Current price: %.2f
- Z-score: %.2f (strong deviation from moving average)
- Volume change: increased by %.1fx over the last 5 minutes
- Global market trend (BTC): falling

Task: Estimate the probability that this is a liquidation cascade (squeeze) vs a fundamental news event. Respond strictly in JSON:
{"type": "SQUEEZE" | "NEWS", "confidence": 0-100, "reasoning": "brief explanation"}
`, ticker, currentPrice, zScore, volumeMultiplier)
Enter fullscreen mode Exit fullscreen mode

We added volume, market context, and forced structured JSON output. Now Go can parse it (json.Unmarshal) and pass it to a decision module, enabling automated position entry.

Build, Run, and Results

Writing clean code is only half the job. In trading, a system doesn't exist until it's deployed and processing real market chaos. We won't force users to install databases locally or manually compile Rust libraries. In 2026, the industry standard is containerization.

The entire hybrid system is brought up with a single command.

We defined the full infrastructure in deployments/docker-compose.yml, which includes:

  • Agent: Our Go application (with the Rust engine statically linked or running alongside it)
  • ClickHouse: For streaming ingestion of raw tick data
  • Postgres: For transactional storage of alerts and LLM responses
services:
  # --- Rust compute engine (gRPC :50051) ---
  compute-engine:
    build:
      context: .
      dockerfile: compute-engine/Dockerfile
    environment:
      LISTEN_ADDR: "0.0.0.0:50051"
      WINDOW_SIZE: "${WINDOW_SIZE:-1000}"
      RUST_LOG: "info"
    ports:
      - "50051:50051"
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "echo > /dev/tcp/127.0.0.1/50051 || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 5

  # --- Go agent (orchestrator) ---
  agent:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      COMPUTE_ENGINE_ADDR: "compute-engine:50051"
      OPENAI_API_KEY: "${OPENAI_API_KEY}"
      BINANCE_WSS_URL: "${BINANCE_WSS_URL:-wss://stream.binance.com:9443/ws}"
      POSTGRES_DSN: "postgres://market:market@postgres:5432/market_anomaly?sslmode=disable"
      CLICKHOUSE_DSN: "clickhouse://default:@clickhouse:9000/market_anomaly"
    depends_on:
      compute-engine:
        condition: service_healthy
      postgres:
        condition: service_healthy
      clickhouse:
        condition: service_healthy
    restart: unless-stopped

  # --- PostgreSQL (alerts, verdicts) ---
  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_USER: market
      POSTGRES_PASSWORD: market
      POSTGRES_DB: market_anomaly
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./deployments/init_postgres.sql:/docker-entrypoint-initdb.d/init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U market -d market_anomaly"]
      interval: 5s
      timeout: 3s
      retries: 5

  # --- ClickHouse (time-series tick data) ---
  clickhouse:
    image: clickhouse/clickhouse-server:24-alpine
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - clickhouse_data:/var/lib/clickhouse
      - ./deployments/init_clickhouse.sql:/docker-entrypoint-initdb.d/init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "clickhouse-client --query 'SELECT 1'"]
      interval: 5s
      timeout: 3s
      retries: 5

volumes:
  postgres_data:
  clickhouse_data:
Enter fullscreen mode Exit fullscreen mode

To run the project, simply clone the repository, add your OpenAI/Claude API key to .env, and execute:

docker-compose -f deployments/docker-compose.yml up -d
Enter fullscreen mode Exit fullscreen mode

Once the containers turn "green," the system comes to life:

  1. The Go agent establishes a WebSocket connection to Binance (or Polymarket)
  2. The data stream begins. The Rust detector processes ticks continuously, updating rolling windows without GC pauses
  3. In the background, ClickHouse ingests raw historical data in batches for future backtesting
  4. Suddenly, a spike occurs. A whale dumps a large volume into the market. Rust detects a deviation of |Z| > 3 and emits a signal
  5. A Go goroutine captures the alert, wraps it into a prompt, and sends it to the LLM API
  6. Within 1–2 seconds, a meaningful verdict is returned and stored in Postgres
[INFO] 14:21:10 connected to wss://stream.binance.com:9443/ws/ethusdt@trade
[INFO] 14:21:10 starting rust mathematical engine (window_size=10000)
...
[INFO] 14:25:01 tick received: symbol=ETH/USDT price=3124.55 vol=1.2
[INFO] 14:25:01 tick received: symbol=ETH/USDT price=3124.50 vol=4.5
...
[WARN] 14:32:14 ANOMALY DETECTED: symbol=ETH/USDT trigger_price=3180.00 z_score=4.12
[INFO] 14:32:14 generating prompt and calling LLM (GPT-4o)...
[INFO] 14:32:16 LLM VERDICT [SQUEEZE]: "A sharp 1.7% price increase within 30 seconds is not supported by news flow. Order book and volume analysis indicate a cascade of short liquidations (short squeeze) above the $3150 resistance level. Mean reversion is expected."
[INFO] 14:32:16 alert ID 4092 saved to Postgres with status=RESOLVED
Enter fullscreen mode Exit fullscreen mode

This is not just a "ChatGPT toy," but a robust infrastructure pattern. The LLM does not trade blindly — it acts as an intelligent filter:

  • Smart alerting: No more waking up to false bot alerts caused by stop-loss cascades
  • Trading signal: Structured JSON outputs from Postgres can be fed directly into your execution bot to decide whether to enter against the crowd or skip
  • Analytical tool: With ClickHouse, you can later analyze when the LLM was correct and when it hallucinated, and refine prompts accordingly

Mathematics detects the anomaly. AI explains it. The engineer captures the profit.

Conclusion

Let's be honest. Language models cannot trade. They do not replace rigorous mathematics, Z-score, statistical arbitrage, or years of careful backtesting. If you try to feed an LLM a continuous stream of market data and have it directly manage a portfolio, you will simply lose money quickly.

However, when paired with a strict mathematical core, an LLM becomes an ideal "co-pilot." It provides what classical quantitative algorithms lack — the ability to understand context.

Our hybrid pipeline solves a concrete pain point. Mathematics detects the anomaly, while AI explains its nature. You no longer have to guess, staring at a chart, why a one-minute volume spike occurred. Was it a cascade of liquidations from overleveraged traders? A reaction to macroeconomic news? A large on-chain transfer or a smart contract exploit? The agent extracts this context in seconds and stores a ready-made verdict in the database.

There are no perfect systems in production, and ours has its limitations:

  • Hallucinations: LLMs sometimes infer causal relationships where none exist.
  • Latency: A network request to an external API (OpenAI/Claude) takes seconds. For HFT execution, that is an eternity.
  • Provider dependency: If the API goes down during high market volatility, you are left without context.

That is why the model must never be allowed to make trading decisions blindly. Its responsibility is noise filtering, log enrichment, and alerting. The final decision to execute an order must always remain with a deterministic algorithm.

The future of algorithmic trading lies neither in "pure AI" nor in "pure mathematics." It lies in hybrid architectures. Rust handles ruthless microsecond-level tick processing, Go ensures reliable routing and orchestration, and the LLM translates market chaos into human-understandable meaning.

There is no "trading magic" in such systems. It is simply pragmatic software engineering. Successful builds and green PnL!

All source code, SQL migrations, and Makefile for local development are available open-source:

🔗 GitHub: https://github.com/Zmey56/market-anomaly-llm

Top comments (0)