Stella Achar Oiro
How to Build Real-Time Analytics Systems with Go: A Complete Guide

Real-time analytics has become essential for businesses seeking to extract immediate insights from streaming data. In this guide, we'll walk through how to build a scalable, high-performance real-time analytics system using Go, with a special focus on AI applications.

By the end of this guide, you'll understand how to:

  • Set up efficient data streaming pipelines in Go
  • Implement key stream processing patterns
  • Connect your system to time-series databases
  • Apply memory-efficient algorithms for online learning
  • Optimize your system for maximum performance

Prerequisites

Before getting started, make sure you have:

  • Go 1.16+ installed on your system
  • Basic familiarity with Go programming
  • A development environment set up (VS Code, GoLand, or your preferred IDE)
  • Docker (optional, for running databases locally)

1. Setting Up Your Go Environment for Real-Time Streaming

1.1 Create Your Project Structure

Start by creating a new Go module:

mkdir go-streaming-analytics
cd go-streaming-analytics
go mod init github.com/yourusername/go-streaming-analytics

1.2 Adding Essential Dependencies

go get -u github.com/influxdata/influxdb-client-go/v2
go get -u github.com/gorilla/websocket
go get -u github.com/prometheus/client_golang/prometheus

1.3 Creating a Basic Stream Processor

Create a file named main.go with this skeleton:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Setup signal handling for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    // Start your streaming components here, passing ctx so they can react
    // to cancellation (the blank assignment keeps this skeleton compiling)
    _ = ctx

    log.Println("Stream processor started. Press Ctrl+C to exit.")

    // Wait for termination signal
    <-signalChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")

    // Add a timeout for shutdown
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer shutdownCancel()

    // Perform cleanup here, bounded by shutdownCtx (the blank assignment
    // keeps this skeleton compiling until real cleanup is added)
    _ = shutdownCtx

    log.Println("Shutdown complete")
}

2. Implementing Concurrent Data Streams with Go

Go's concurrency model makes it ideal for handling multiple data streams. Let's implement a basic streaming pipeline.

2.1 Defining Your Data Models

Create a file called models.go:

package main

import "time"

// DataPoint represents a single measurement from a data source
type DataPoint struct {
    Timestamp time.Time
    SourceID  string
    Value     float64
    Metadata  map[string]string
}

// ProcessedResult represents an analyzed data point
type ProcessedResult struct {
    OriginalPoint DataPoint
    AnomalyScore  float64
    Prediction    float64
    ProcessedAt   time.Time
}

2.2 Building a Basic Pipeline

Now let's implement a simple processing pipeline in pipeline.go:

package main

import (
    "context"
    "log"
    "sync"
)

// Pipeline stage function types
type SourceFunc func(ctx context.Context, out chan<- DataPoint)
type ProcessFunc func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult)
type SinkFunc func(ctx context.Context, in <-chan ProcessedResult)

// Pipeline represents a complete data processing pipeline
type Pipeline struct {
    source      SourceFunc
    processors  []ProcessFunc
    sink        SinkFunc
    bufferSize  int
    concurrency int
}

// NewPipeline creates a new processing pipeline
func NewPipeline(source SourceFunc, sink SinkFunc, bufferSize, concurrency int) *Pipeline {
    return &Pipeline{
        source:      source,
        sink:        sink,
        bufferSize:  bufferSize,
        concurrency: concurrency,
        processors:  make([]ProcessFunc, 0),
    }
}

// AddProcessor adds a processing stage to the pipeline
func (p *Pipeline) AddProcessor(processor ProcessFunc) {
    p.processors = append(p.processors, processor)
}

// Run starts the pipeline and blocks until the context is canceled
func (p *Pipeline) Run(ctx context.Context) error {
    // Create channels
    sourceChannel := make(chan DataPoint, p.bufferSize)

    // Create intermediate channels for each processor
    channels := make([]chan ProcessedResult, len(p.processors)+1)
    for i := range channels {
        channels[i] = make(chan ProcessedResult, p.bufferSize)
    }

    // WaitGroup to track all goroutines
    var wg sync.WaitGroup

    // Start source
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(sourceChannel)
        p.source(ctx, sourceChannel)
    }()

    // Start processors (with fan-out for each stage)
    for i, processor := range p.processors {
        // Input is either source channel (for first processor) or previous processor's output
        var input chan DataPoint
        if i == 0 {
            input = sourceChannel
        } else {
            // We'd need to convert ProcessedResult back to DataPoint in a real implementation
            // This is simplified for the example
        }

        output := channels[i]

        // Fan out each processor to multiple goroutines
        for j := 0; j < p.concurrency; j++ {
            wg.Add(1)
            go func(proc ProcessFunc, in <-chan DataPoint, out chan<- ProcessedResult) {
                defer wg.Done()
                proc(ctx, in, out)
            }(processor, input, output)
        }
    }

    // Start sink
    wg.Add(1)
    go func() {
        defer wg.Done()
        p.sink(ctx, channels[len(channels)-1])
    }()

    // Wait for completion or cancellation
    go func() {
        wg.Wait()
        // Close the final channel when all processing is done
        close(channels[len(channels)-1])
    }()

    // Wait for context cancellation
    <-ctx.Done()
    log.Println("Pipeline shutdown initiated")

    return ctx.Err()
}

2.3 Creating a Sample Data Source

Let's implement a simple data generator in source.go:

package main

import (
    "context"
    "math/rand"
    "time"
)

// RandomDataSource generates random data points at a specified interval
func RandomDataSource(interval time.Duration, sourceID string) SourceFunc {
    return func(ctx context.Context, out chan<- DataPoint) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case t := <-ticker.C:
                out <- DataPoint{
                    Timestamp: t,
                    SourceID:  sourceID,
                    Value:     rand.Float64() * 100.0, // Random value between 0-100
                    Metadata: map[string]string{
                        "type": "random",
                        "unit": "percentage",
                    },
                }
            }
        }
    }
}

2.4 Implementing a Processor

Now, let's create a simple anomaly detector in processor.go:

package main

import (
    "context"
    "math"
    "time"
)

// SimpleAnomalyDetector detects values that deviate significantly from recent history
func SimpleAnomalyDetector(windowSize int, threshold float64) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Store recent values in a circular buffer
        buffer := make([]float64, windowSize)
        bufferIndex := 0
        bufferFilled := 0

        for {
            select {
            case <-ctx.Done():
                return
            case dataPoint, ok := <-in:
                if !ok {
                    // Channel closed
                    return
                }

                // Add value to buffer
                buffer[bufferIndex] = dataPoint.Value
                bufferIndex = (bufferIndex + 1) % windowSize
                if bufferFilled < windowSize {
                    bufferFilled++
                }

                // Calculate average and standard deviation
                var sum, sumSquared float64
                for i := 0; i < bufferFilled; i++ {
                    sum += buffer[i]
                    sumSquared += buffer[i] * buffer[i]
                }

                mean := sum / float64(bufferFilled)
                variance := (sumSquared / float64(bufferFilled)) - (mean * mean)
                stdDev := math.Sqrt(variance)

                // Calculate z-score (how many standard deviations from mean)
                zScore := 0.0
                if stdDev > 0 {
                    zScore = math.Abs(dataPoint.Value - mean) / stdDev
                }

                // Output result
                out <- ProcessedResult{
                    OriginalPoint: dataPoint,
                    AnomalyScore:  zScore,
                    Prediction:    mean, // Simple prediction is just the mean
                    ProcessedAt:   time.Now(),
                }
            }
        }
    }
}

2.5 Creating a Data Sink

Finally, let's implement a simple sink that logs the results in sink.go:

package main

import (
    "context"
    "log"
)

// LoggingSink logs processed results to stdout
func LoggingSink() SinkFunc {
    return func(ctx context.Context, in <-chan ProcessedResult) {
        for {
            select {
            case <-ctx.Done():
                return
            case result, ok := <-in:
                if !ok {
                    // Channel closed
                    return
                }

                // Log results
                if result.AnomalyScore > 2.0 {
                    log.Printf("ANOMALY DETECTED: Source: %s, Value: %.2f, Score: %.2f",
                        result.OriginalPoint.SourceID, result.OriginalPoint.Value, result.AnomalyScore)
                } else {
                    log.Printf("Data: Source: %s, Value: %.2f, Prediction: %.2f",
                        result.OriginalPoint.SourceID, result.OriginalPoint.Value, result.Prediction)
                }
            }
        }
    }
}

2.6 Putting It All Together

Now, update main.go to use our pipeline components:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Setup signal handling for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    // Create the pipeline
    pipeline := NewPipeline(
        RandomDataSource(100*time.Millisecond, "sensor-1"),
        LoggingSink(),
        100,  // Buffer size
        3,    // Concurrency level
    )

    // Add processors
    pipeline.AddProcessor(SimpleAnomalyDetector(50, 2.0))

    // Run the pipeline in a goroutine
    go func() {
        if err := pipeline.Run(ctx); err != nil && err != context.Canceled {
            log.Printf("Pipeline error: %v", err)
        }
    }()

    log.Println("Stream processor started. Press Ctrl+C to exit.")

    // Wait for termination signal
    <-signalChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")

    // Cancel the context to signal all components to shut down
    cancel()

    // Give components time to shut down gracefully
    time.Sleep(2 * time.Second)

    log.Println("Shutdown complete")
}
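
With all of these files in the same main package, you can run the pipeline from the module root. You should see a steady stream of log lines, with occasional ANOMALY DETECTED entries whenever a value lands far from the recent mean:

go run .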

3. Implementing Key Stream Processing Patterns

3.1 Window Operations for Time-Based Analysis

Create a file called window.go to implement time-based windowing:

package main

import (
    "context"
    "time"
)

// Window represents a time window of data points
type Window struct {
    StartTime time.Time
    EndTime   time.Time
    Points    []DataPoint
}

// WindowedProcessor buffers data into time windows and processes them as a batch
func WindowedProcessor(windowDuration time.Duration, processFn func(Window) []ProcessedResult) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        var currentWindow Window
        currentWindow.StartTime = time.Now()
        currentWindow.EndTime = currentWindow.StartTime.Add(windowDuration)

        ticker := time.NewTicker(windowDuration)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                // Process any remaining points before shutting down
                results := processFn(currentWindow)
                for _, result := range results {
                    out <- result
                }
                return

            case <-ticker.C:
                // Window time is up, process points and create new window
                results := processFn(currentWindow)
                for _, result := range results {
                    out <- result
                }

                now := time.Now()
                currentWindow = Window{
                    StartTime: now,
                    EndTime:   now.Add(windowDuration),
                    Points:    make([]DataPoint, 0),
                }

            case point, ok := <-in:
                if !ok {
                    return
                }

                // Add point to current window
                currentWindow.Points = append(currentWindow.Points, point)
            }
        }
    }
}
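
To use this stage you need a window function to pass in. As a minimal sketch (WindowAverage is an illustrative helper, not part of the code above), here's one that emits a single result per window carrying the window's mean value:

package main

import "time"

// WindowAverage returns a window function for WindowedProcessor that emits
// one ProcessedResult per window, carrying the mean of the window's values.
func WindowAverage() func(Window) []ProcessedResult {
    return func(w Window) []ProcessedResult {
        if len(w.Points) == 0 {
            return nil
        }

        var sum float64
        for _, p := range w.Points {
            sum += p.Value
        }
        mean := sum / float64(len(w.Points))

        // Attribute the aggregate to the most recent point in the window
        last := w.Points[len(w.Points)-1]
        return []ProcessedResult{{
            OriginalPoint: last,
            Prediction:    mean,
            ProcessedAt:   time.Now(),
        }}
    }
}

You would then register the windowed stage like any other processor, for example pipeline.AddProcessor(WindowedProcessor(10*time.Second, WindowAverage())).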

3.2 Implementing the Fan-Out/Fan-In Pattern

Create a file called fanout.go:

package main

import (
    "context"
    "sync"
)

// FanOutFanIn distributes work across multiple workers and combines results
func FanOutFanIn(workerFunc func(DataPoint) ProcessedResult, workerCount int) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Create a WaitGroup to track workers
        var wg sync.WaitGroup

        // Create a channel for collecting results from workers
        results := make(chan ProcessedResult, workerCount)

        // Start the result collector; done is closed once it has finished
        // forwarding everything downstream
        done := make(chan struct{})
        go func() {
            defer close(done)
            for result := range results {
                select {
                case <-ctx.Done():
                    return
                case out <- result:
                    // Result sent downstream
                }
            }
        }()

        // Start workers
        for i := 0; i < workerCount; i++ {
            wg.Add(1)
            go func(workerID int) {
                defer wg.Done()

                for {
                    select {
                    case <-ctx.Done():
                        return
                    case data, ok := <-in:
                        if !ok {
                            return
                        }

                        // Process the data and send the result to the collector
                        result := workerFunc(data)

                        select {
                        case <-ctx.Done():
                            return
                        case results <- result:
                            // Result sent to collector
                        }
                    }
                }
            }(i)
        }

        // Block until all workers finish, then close the results channel and
        // wait for the collector to drain it. Returning only after that keeps
        // the stage's output channel safe for the caller to close.
        wg.Wait()
        close(results)
        <-done
    }
}
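
To plug this pattern into the pipeline, wrap a pure per-point function and register it as a stage. ThresholdScorer below is a made-up example worker, assuming the DataPoint and ProcessedResult types defined earlier:

package main

import "time"

// ThresholdScorer flags values above a fixed threshold. Because it is a pure
// per-point function, it fans out cleanly across workers.
func ThresholdScorer(threshold float64) func(DataPoint) ProcessedResult {
    return func(p DataPoint) ProcessedResult {
        score := 0.0
        if p.Value > threshold {
            score = 1.0
        }

        return ProcessedResult{
            OriginalPoint: p,
            AnomalyScore:  score,
            Prediction:    p.Value,
            ProcessedAt:   time.Now(),
        }
    }
}

Registering it is then a one-liner: pipeline.AddProcessor(FanOutFanIn(ThresholdScorer(90.0), 4)).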

4. Integrating with Time-Series Databases

4.1 Setting Up InfluxDB

First, let's create a Docker Compose file for InfluxDB:

# docker-compose.yml
version: '3'
services:
  influxdb:
    image: influxdb:2.0
    ports:
      - "8086:8086"
    volumes:
      - influxdb-data:/var/lib/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=mypassword123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=metrics
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=myadmintoken

volumes:
  influxdb-data:

Start InfluxDB with:

docker-compose up -d

4.2 Creating an InfluxDB Sink

Now, let's create a sink that writes to InfluxDB in influxdb_sink.go:

package main

import (
    "context"
    "log"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// InfluxDBSink writes processed results to InfluxDB
func InfluxDBSink(serverURL, token, org, bucket string, batchSize int) SinkFunc {
    return func(ctx context.Context, in <-chan ProcessedResult) {
        // Create InfluxDB client
        client := influxdb2.NewClient(serverURL, token)
        defer client.Close()

        // Get non-blocking write API
        writeAPI := client.WriteAPI(org, bucket)

        // Listen for write errors
        errorsCh := writeAPI.Errors()
        go func() {
            for err := range errorsCh {
                log.Printf("InfluxDB write error: %s", err.Error())
            }
        }()

        // Process incoming results
        batch := make([]ProcessedResult, 0, batchSize)
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        flushBatch := func() {
            for _, result := range batch {
                // Create InfluxDB point
                point := influxdb2.NewPoint(
                    "sensor_data",
                    map[string]string{
                        "source_id": result.OriginalPoint.SourceID,
                    },
                    map[string]interface{}{
                        "value":         result.OriginalPoint.Value,
                        "anomaly_score": result.AnomalyScore,
                        "prediction":    result.Prediction,
                    },
                    result.OriginalPoint.Timestamp,
                )

                // Write asynchronously
                writeAPI.WritePoint(point)
            }

            // Clear the batch
            batch = batch[:0]
        }

        for {
            select {
            case <-ctx.Done():
                // Flush any remaining points
                if len(batch) > 0 {
                    flushBatch()
                }
                // Ensure pending writes are flushed
                writeAPI.Flush()
                return

            case result, ok := <-in:
                if !ok {
                    // Channel closed
                    if len(batch) > 0 {
                        flushBatch()
                    }
                    writeAPI.Flush()
                    return
                }

                // Add to batch
                batch = append(batch, result)

                // Flush if batch is full
                if len(batch) >= batchSize {
                    flushBatch()
                }

            case <-ticker.C:
                // Time-based flush
                if len(batch) > 0 {
                    flushBatch()
                }
            }
        }
    }
}
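
To route the pipeline's output into InfluxDB, swap the logging sink for this one when constructing the pipeline in main.go. The connection values below mirror the Docker Compose setup; adjust them to match your own instance:

pipeline := NewPipeline(
    RandomDataSource(100*time.Millisecond, "sensor-1"),
    InfluxDBSink("http://localhost:8086", "myadmintoken", "myorg", "metrics", 100),
    100, // Buffer size
    3,   // Concurrency level
)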

4.3 Querying Data from InfluxDB

Create a file called query.go with functions to retrieve analytics data:

package main

import (
    "context"
    "fmt"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// TimeSeriesData represents a time series of values
type TimeSeriesData struct {
    Times  []time.Time
    Values []float64
}

// QueryRecentData retrieves recent data from InfluxDB
func QueryRecentData(serverURL, token, org, bucket, measurement string, duration time.Duration) (*TimeSeriesData, error) {
    // Create client
    client := influxdb2.NewClient(serverURL, token)
    defer client.Close()

    // Get query client
    queryAPI := client.QueryAPI(org)

    // Create Flux query
    query := fmt.Sprintf(`
        from(bucket: "%s")
          |> range(start: -%s)
          |> filter(fn: (r) => r._measurement == "%s")
          |> filter(fn: (r) => r._field == "value")
    `, bucket, duration.String(), measurement)

    // Run query
    result, err := queryAPI.Query(context.Background(), query)
    if err != nil {
        return nil, fmt.Errorf("query error: %w", err)
    }
    defer result.Close()

    // Process results
    data := &TimeSeriesData{
        Times:  make([]time.Time, 0),
        Values: make([]float64, 0),
    }

    for result.Next() {
        data.Times = append(data.Times, result.Record().Time())
        value := result.Record().Value()
        if v, ok := value.(float64); ok {
            data.Values = append(data.Values, v)
        }
    }

    // Check for query errors
    if result.Err() != nil {
        return nil, fmt.Errorf("result error: %w", result.Err())
    }

    return data, nil
}
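
As a rough usage sketch, you could call this from main or a small CLI to print the last 15 minutes of sensor data (again using the Docker Compose credentials as placeholders):

data, err := QueryRecentData(
    "http://localhost:8086", "myadmintoken", "myorg", "metrics",
    "sensor_data", 15*time.Minute,
)
if err != nil {
    log.Fatalf("query failed: %v", err)
}

for i := range data.Values {
    log.Printf("%s -> %.2f", data.Times[i].Format(time.RFC3339), data.Values[i])
}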

5. Implementing Memory-Efficient Algorithms for Online Learning

5.1 Creating an Online Learning Model

Create a file called online_learning.go for incremental model updates:

package main

import (
    "math"
)

// OnlineStats keeps statistics that update incrementally
type OnlineStats struct {
    Count  int64
    Mean   float64
    M2     float64 // Sum of squared differences from the mean
    Min    float64
    Max    float64
}

// NewOnlineStats creates a new online statistics tracker
func NewOnlineStats() *OnlineStats {
    return &OnlineStats{
        Min: math.Inf(1),
        Max: math.Inf(-1),
    }
}

// Update adds a value and updates statistics
func (s *OnlineStats) Update(value float64) {
    // Update count
    s.Count++

    // Update min/max
    if value < s.Min {
        s.Min = value
    }
    if value > s.Max {
        s.Max = value
    }

    // For the first value, just set the mean
    if s.Count == 1 {
        s.Mean = value
        return
    }

    // Update mean and variance using Welford's algorithm
    delta := value - s.Mean
    s.Mean += delta / float64(s.Count)
    delta2 := value - s.Mean
    s.M2 += delta * delta2
}

// Variance returns the current variance
func (s *OnlineStats) Variance() float64 {
    if s.Count < 2 {
        return 0
    }
    return s.M2 / float64(s.Count)
}

// StdDev returns the current standard deviation
func (s *OnlineStats) StdDev() float64 {
    return math.Sqrt(s.Variance())
}

// ZScore calculates how many standard deviations a value is from the mean
func (s *OnlineStats) ZScore(value float64) float64 {
    stdDev := s.StdDev()
    if stdDev == 0 {
        return 0
    }
    return (value - s.Mean) / stdDev
}

// Normalize returns a value scaled to [0,1] range based on observed min/max
func (s *OnlineStats) Normalize(value float64) float64 {
    if s.Max == s.Min {
        return 0.5
    }
    return (value - s.Min) / (s.Max - s.Min)
}
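
A quick sanity check of the tracker, for example in a scratch main or a test, might look like this:

stats := NewOnlineStats()
for _, v := range []float64{10, 12, 11, 13, 12} {
    stats.Update(v)
}

// A value of 25 sits many standard deviations above the observed mean,
// so it would score as a clear anomaly.
log.Printf("mean=%.2f stddev=%.2f zscore(25)=%.2f",
    stats.Mean, stats.StdDev(), stats.ZScore(25))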

5.2 Creating an Online Learning Processor

Now, let's use our online learning model in a processor:

package main

import (
    "context"
    "math"
    "sync"
    "time"
)

// OnlineLearningProcessor uses online statistics to detect anomalies
func OnlineLearningProcessor() ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Map to track stats for each source
        statsBySource := make(map[string]*OnlineStats)
        var mu sync.RWMutex

        for {
            select {
            case <-ctx.Done():
                return
            case point, ok := <-in:
                if !ok {
                    return
                }

                // Get or create stats tracker for this source
                mu.RLock()
                stats, exists := statsBySource[point.SourceID]
                mu.RUnlock()

                if !exists {
                    stats = NewOnlineStats()
                    mu.Lock()
                    statsBySource[point.SourceID] = stats
                    mu.Unlock()
                }

                // Calculate anomaly score before updating model
                zScore := stats.ZScore(point.Value)
                anomalyScore := math.Abs(zScore)

                // Update the model with the new data point
                stats.Update(point.Value)

                // Create result
                result := ProcessedResult{
                    OriginalPoint: point,
                    AnomalyScore:  anomalyScore,
                    Prediction:    stats.Mean,
                    ProcessedAt:   time.Now(),
                }

                out <- result
            }
        }
    }
}
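
Swapping this learner in for the fixed-window detector is a one-line change in main.go:

// Replace SimpleAnomalyDetector(50, 2.0) with the online learner
pipeline.AddProcessor(OnlineLearningProcessor())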

6. Optimizing Performance for Real-Time Analytics

6.1 Implementing a Memory Pool

Create a file called memory_pool.go:

package main

import "sync"

// DataBuffer represents a reusable buffer
type DataBuffer struct {
    Data []byte
}

// BufferPool manages a pool of reusable buffers
type BufferPool struct {
    pool       sync.Pool
    bufferSize int
}

// NewBufferPool creates a new buffer pool
func NewBufferPool(bufferSize int) *BufferPool {
    return &BufferPool{
        pool: sync.Pool{
            New: func() interface{} {
                return &DataBuffer{
                    Data: make([]byte, bufferSize),
                }
            },
        },
        bufferSize: bufferSize,
    }
}

// Get retrieves a buffer from the pool
func (p *BufferPool) Get() *DataBuffer {
    return p.pool.Get().(*DataBuffer)
}

// Put returns a buffer to the pool
func (p *BufferPool) Put(buffer *DataBuffer) {
    // Clear for security (optional)
    for i := range buffer.Data {
        buffer.Data[i] = 0
    }
    p.pool.Put(buffer)
}
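
Here's a minimal sketch of how the pool might be used on a hot path, for example when copying incoming payloads before handing them to a processing step (handlePayload and its process callback are illustrative, not part of the pipeline above):

package main

// payloadPool is shared by hot-path handlers so each message reuses a buffer
// instead of allocating a fresh slice.
var payloadPool = NewBufferPool(4096)

// handlePayload copies an incoming payload into a pooled buffer, passes it to
// the supplied processing step, and returns the buffer to the pool.
// Note: copy truncates payloads larger than the pooled buffer size.
func handlePayload(raw []byte, process func([]byte)) {
    buf := payloadPool.Get()
    defer payloadPool.Put(buf)

    n := copy(buf.Data, raw)
    process(buf.Data[:n])
}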

6.2 Implementing a Ring Buffer

Create a file called ring_buffer.go:

package main

import (
    "errors"
    "sync"
)

var (
    ErrBufferFull  = errors.New("ring buffer: full")
    ErrBufferEmpty = errors.New("ring buffer: empty")
)

// RingBuffer is a thread-safe circular buffer
type RingBuffer struct {
    buffer []interface{}
    size   int
    mu     sync.Mutex
    read   int // Read position
    write  int // Write position
    count  int // Number of elements in buffer
}

// NewRingBuffer creates a new ring buffer with the given size
func NewRingBuffer(size int) *RingBuffer {
    return &RingBuffer{
        buffer: make([]interface{}, size),
        size:   size,
    }
}

// Push adds an item to the buffer
func (rb *RingBuffer) Push(item interface{}) error {
    rb.mu.Lock()
    defer rb.mu.Unlock()

    if rb.count == rb.size {
        return ErrBufferFull
    }

    rb.buffer[rb.write] = item
    rb.write = (rb.write + 1) % rb.size
    rb.count++

    return nil
}

// Pop removes and returns an item from the buffer
func (rb *RingBuffer) Pop() (interface{}, error) {
    rb.mu.Lock()
    defer rb.mu.Unlock()

    if rb.count == 0 {
        return nil, ErrBufferEmpty
    }

    item := rb.buffer[rb.read]
    rb.buffer[rb.read] = nil // Help GC
    rb.read = (rb.read + 1) % rb.size
    rb.count--

    return item, nil
}

// Len returns the current number of items in the buffer
func (rb *RingBuffer) Len() int {
    rb.mu.Lock()
    defer rb.mu.Unlock()
    return rb.count
}

// Cap returns the capacity of the buffer
func (rb *RingBuffer) Cap() int {
    return rb.size
}
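
In practice the ring buffer works as a bounded staging area between a bursty producer and a slower consumer, and it forces an explicit decision when it fills up. Here's a sketch of both sides (enqueueOrDrop and drainBuffer are illustrative helpers):

package main

import "log"

// enqueueOrDrop stages a point and logs when the buffer is full, making the
// backpressure policy (drop the newest point) explicit.
func enqueueOrDrop(rb *RingBuffer, dp DataPoint) {
    if err := rb.Push(dp); err != nil {
        log.Println("ring buffer full, dropping point")
    }
}

// drainBuffer pops everything currently staged and forwards it downstream.
func drainBuffer(rb *RingBuffer, forward func(DataPoint)) {
    for {
        item, err := rb.Pop()
        if err != nil {
            return // buffer is empty
        }
        if dp, ok := item.(DataPoint); ok {
            forward(dp)
        }
    }
}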

6.3 Setting Up Monitoring

Create a file called monitoring.go:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics holds Prometheus metrics for the application
type Metrics struct {
    ProcessedCounter   *prometheus.CounterVec
    AnomalyCounter     *prometheus.CounterVec
    ProcessingDuration *prometheus.HistogramVec
    QueueSize          *prometheus.GaugeVec
}

// NewMetrics creates and registers metrics
func NewMetrics(registry *prometheus.Registry) *Metrics {
    m := &Metrics{
        ProcessedCounter: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stream_processed_total",
                Help: "Total number of processed data points",
            },
            []string{"source_id"},
        ),
        AnomalyCounter: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stream_anomalies_total",
                Help: "Total number of detected anomalies",
            },
            []string{"source_id"},
        ),
        ProcessingDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "stream_processing_duration_seconds",
                Help:    "Duration of processing operations",
                Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // From 1ms to ~1s
            },
            []string{"operation"},
        ),
        QueueSize: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "stream_queue_size",
                Help: "Current number of items in processing queues",
            },
            []string{"queue"},
        ),
    }

    registry.MustRegister(
        m.ProcessedCounter,
        m.AnomalyCounter,
        m.ProcessingDuration,
        m.QueueSize,
    )

    return m
}

// StartMetricsServer starts an HTTP server for Prometheus metrics
func StartMetricsServer(addr string, registry *prometheus.Registry) {
    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    go func() {
        if err := http.ListenAndServe(addr, nil); err != nil {
            log.Printf("metrics server error: %v", err)
        }
    }()
}
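
Wiring this up might look like the snippet below: create a registry in main.go, expose it on the metrics port, and increment counters from your sink or processors (the result variable stands for a ProcessedResult, as in the sinks above):

registry := prometheus.NewRegistry()
metrics := NewMetrics(registry)
StartMetricsServer(":9090", registry)

// Inside a sink or processor:
metrics.ProcessedCounter.WithLabelValues(result.OriginalPoint.SourceID).Inc()
if result.AnomalyScore > 2.0 {
    metrics.AnomalyCounter.WithLabelValues(result.OriginalPoint.SourceID).Inc()
}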

7. Deploying Your System

7.1 Building the Application

Create a Dockerfile:

FROM golang:1.18-alpine AS builder

WORKDIR /app

# Copy go mod files
COPY go.mod go.sum ./
RUN go mod download

# Copy source code
COPY *.go ./

# Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -o /go-streaming-analytics

# Create final minimal image
FROM alpine:latest

WORKDIR /app

COPY --from=builder /go-streaming-analytics /app/

CMD ["/app/go-streaming-analytics"]

Build and run with Docker:

docker build -t go-streaming-analytics .
docker run -p 8080:8080 -p 9090:9090 go-streaming-analytics

7.2 Configuration Management

Create a file called config.go:

package main

import (
    "encoding/json"
    "os"
    "time"
)

// Config holds application configuration
type Config struct {
    ServerPort       string        `json:"server_port"`
    MetricsPort      string        `json:"metrics_port"`
    BufferSize       int           `json:"buffer_size"`
    ConcurrencyLevel int           `json:"concurrency_level"`
    WindowDuration   time.Duration `json:"window_duration"`
    InfluxDB         struct {
        URL    string `json:"url"`
        Token  string `json:"token"`
        Org    string `json:"org"`
        Bucket string `json:"bucket"`
    } `json:"influxdb"`
}

// LoadConfig loads configuration from a JSON file
func LoadConfig(path string) (*Config, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    config := &Config{}
    decoder := json.NewDecoder(file)
    if err := decoder.Decode(config); err != nil {
        return nil, err
    }

    return config, nil
}

// DefaultConfig returns a default configuration
func DefaultConfig() *Config {
    config := &Config{
        ServerPort:       "8080",
        MetricsPort:      "9090",
        BufferSize:       1000,
        ConcurrencyLevel: 4,
        WindowDuration:   10 * time.Second,
    }

    config.InfluxDB.URL = "http://localhost:8086"
    config.InfluxDB.Token = "myadmintoken"
    config.InfluxDB.Org = "myorg"
    config.InfluxDB.Bucket = "metrics"

    return config
}
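
One way to use this in main.go is to start from the defaults and override them when a config file is provided; the CONFIG_PATH environment variable here is just an example convention:

cfg := DefaultConfig()
if path := os.Getenv("CONFIG_PATH"); path != "" {
    loaded, err := LoadConfig(path)
    if err != nil {
        log.Fatalf("failed to load config from %s: %v", path, err)
    }
    cfg = loaded
}

log.Printf("starting with buffer size %d and concurrency %d",
    cfg.BufferSize, cfg.ConcurrencyLevel)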

Conclusion and Next Steps

You've now built a complete real-time analytics pipeline in Go that can:

  • Process streaming data concurrently
  • Implement various stream processing patterns
  • Connect to time-series databases
  • Apply online learning algorithms
  • Optimize for performance

To extend this system, consider:

  1. Adding more sophisticated ML models
  2. Implementing stream joins for correlating multiple data sources
  3. Adding a real-time visualization layer
  4. Implementing fault tolerance with save points
  5. Scaling out to a distributed architecture

How Berrijam AI Leverages These Principles

While Berrijam AI uses a different technology stack, the principles demonstrated in this guide are similar to those that power their real-time analytics platform. Berrijam's system incorporates:

  • Efficient stream processing for immediate insights
  • Memory-optimized algorithms for online learning and prediction
  • Seamless integration with time-series data storage
  • Highly concurrent processing architecture

These capabilities allow Berrijam AI to provide instantaneous insights from streaming data, enabling businesses to make data-driven decisions based on the most current information available.

To learn more about how Berrijam AI can help your organization implement real-time analytics solutions without the complexity of building them yourself, visit Berrijam or contact their solutions team for a personalized demonstration.
