DEV Community

Akshit Zatakia
Akshit Zatakia

Posted on

3

Dynamically Updating Streaming Rules in Golang – A Production-Ready Approach

Introduction

Kafka is a powerhouse for real-time data streaming, but what if your processing logic needs to change dynamically? Manually redeploying your service every time a rule changes isn’t practical.

Imagine a system where rules can be updated in real-time without stopping the consumer. In this blog, we'll explore how to consume Kafka messages, update processing rules dynamically, and handle high throughput efficiently in Golang.


📌 Problem Statement

We need a Kafka consumer that:

  • Reads messages from a rules topic (to update streaming rules dynamically).
  • Reads messages from a data topic (to process data based on the latest rules).
  • Applies arithmetic operations (sum, minus, multiply, divide) on selected JSON fields.
  • Supports high throughput using goroutines.

Example Use Case 🔥

Consider an e-commerce platform where:

  • The rules-topic updates the calculation method for order prices.
  • The test-topic contains order details (price, tax, discount, etc.).
  • The consumer processes orders in real-time based on the current rules.
  • The output is sent to another topic for further analytics.

⚙️ Implementation

1️⃣ Define the Rule Struct
We use a mutex to ensure thread-safe updates of the rule structure.

import (
    "sync"
)

// RuleStruct holds the rule that is currently applied to data messages:
// the arithmetic operation to run and the message fields it runs over.
//
// It embeds a lock, so share it by pointer; a sync.RWMutex must not be
// copied after first use.
type RuleStruct struct {
    // mu guards Operation and KeyNames; SetRule holds it while replacing both.
    mu        sync.RWMutex
    Operation string   // sum, minus, multiply, divide
    KeyNames  []string // Keys to process in test-topic messages
}
Enter fullscreen mode Exit fullscreen mode

2️⃣ Convert Values to Float64
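Since the Kafka messages carry JSON, a numeric field may arrive either as a JSON number (which encoding/json decodes into float64 when the target is interface{}) or as a numeric string such as "12.50". We normalize both forms to float64 before applying calculations.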

// Collect the numeric value of every requested key, in the order the keys are
// listed. Keys that are missing from the message, or whose value is neither a
// float64 nor a parseable numeric string, are skipped without an error.
var numbers []float64
    for _, key := range keys {
        if val, exists := data[key]; exists {
            if num, ok := val.(float64); ok {
                // Already a float64: use it as-is.
                numbers = append(numbers, num)
            } else if numStr, ok := val.(string); ok {
                // Numeric text: keep it only when it parses as a 64-bit float.
                if parsedNum, err := strconv.ParseFloat(numStr, 64); err == nil {
                    numbers = append(numbers, parsedNum)
                }
            }
        }
    }
Enter fullscreen mode Exit fullscreen mode

3️⃣ Processing Kafka Messages Efficiently
Each worker goroutine runs the loop below to process messages from a shared channel.

// processMessages drains messageChan, applies the current rule to each JSON
// payload, and publishes the outcome to destinationTopic through producer.
//
// The loop ends when messageChan is closed. A message is logged and skipped,
// never published, when its payload is not valid JSON or when the computed
// result cannot be encoded as JSON.
func processMessages(messageChan chan []byte, producer sarama.SyncProducer) {
    for raw := range messageChan {
        var data map[string]interface{}
        if err := json.Unmarshal(raw, &data); err != nil {
            fmt.Println("Invalid JSON in test-topic:", err)
            continue
        }

        // Fetch the rule for every message rather than once up front, so a
        // rule change applies to the very next message.
        operation, keyNames := ruleConfig.GetRule()

        // Apply operation on specified keys
        result := applyOperation(data, operation, keyNames)

        // json.Marshal rejects non-finite floats (NaN, +Inf, -Inf), which the
        // arithmetic can produce (overflow, or inputs such as "NaN"). Without
        // this check an empty payload would be published.
        jsonResult, err := json.Marshal(result)
        if err != nil {
            fmt.Println("Error encoding result for destination topic:", err)
            continue
        }

        // ByteEncoder wraps the marshalled bytes directly; no string copy.
        out := &sarama.ProducerMessage{
            Topic: destinationTopic,
            Value: sarama.ByteEncoder(jsonResult),
        }
        if _, _, err := producer.SendMessage(out); err != nil {
            fmt.Println("Error sending message to destination topic:", err)
            continue
        }
        fmt.Println("Published processed data:", string(jsonResult))
    }
}

// applyOperation combines the numeric values stored under keys in data using
// operation and returns the outcome as a JSON-ready map.
//
// Keys are visited in order. A key is skipped when it is absent from data or
// when its value is not numeric. Accepted values are float64, float32, any Go
// integer type, numeric strings, and values with a Float64() (float64, error)
// method such as encoding/json.Number.
//
// Supported operations:
//   - "sum":      adds all values
//   - "minus":    subtracts every later value from the first
//   - "multiply": multiplies all values
//   - "divide":   divides the first value by every later value
//
// On success the map holds "operation", "keys" and "result". On failure it
// holds a single "error" entry: no numeric value was found, a divisor was
// zero, or the operation is unknown.
func applyOperation(data map[string]interface{}, operation string, keys []string) map[string]interface{} {
    numbers := make([]float64, 0, len(keys))
    for _, key := range keys {
        // A missing key yields nil, which numericValue reports as non-numeric.
        if num, ok := numericValue(data[key]); ok {
            numbers = append(numbers, num)
        }
    }

    if len(numbers) == 0 {
        return map[string]interface{}{"error": "No valid numeric values found"}
    }

    var result float64
    switch operation {
    case "sum":
        for _, num := range numbers {
            result += num
        }
    case "minus":
        result = numbers[0]
        for _, num := range numbers[1:] {
            result -= num
        }
    case "multiply":
        result = 1
        for _, num := range numbers {
            result *= num
        }
    case "divide":
        result = numbers[0]
        for _, num := range numbers[1:] {
            if num == 0 {
                return map[string]interface{}{"error": "Division by zero"}
            }
            result /= num
        }
    default:
        return map[string]interface{}{"error": "Unknown operation"}
    }

    return map[string]interface{}{
        "operation": operation,
        "keys":      keys,
        "result":    result,
    }
}

// numericValue converts val to float64 and reports whether that was possible.
func numericValue(val interface{}) (float64, bool) {
    switch v := val.(type) {
    case float64:
        return v, true
    case float32:
        return float64(v), true
    case int:
        return float64(v), true
    case int8:
        return float64(v), true
    case int16:
        return float64(v), true
    case int32:
        return float64(v), true
    case int64:
        return float64(v), true
    case uint:
        return float64(v), true
    case uint8:
        return float64(v), true
    case uint16:
        return float64(v), true
    case uint32:
        return float64(v), true
    case uint64:
        return float64(v), true
    case string:
        parsed, err := strconv.ParseFloat(v, 64)
        return parsed, err == nil
    case interface{ Float64() (float64, error) }:
        // Matches encoding/json.Number (seen when a decoder uses UseNumber)
        // without importing the package here.
        parsed, err := v.Float64()
        return parsed, err == nil
    default:
        return 0, false
    }
}
Enter fullscreen mode Exit fullscreen mode

4️⃣ Dynamic Rule Updates from Kafka
When a new message arrives in the rules-topic, we update our RuleStruct dynamically.

// SetRule replaces the active rule with newOperation and newKeys while
// holding the write lock, so both fields change together.
//
// newKeys is copied before it is stored. The caller may therefore reuse or
// modify its slice afterwards without changing the stored rule behind the
// lock. A nil newKeys is stored as nil.
func (r *RuleStruct) SetRule(newOperation string, newKeys []string) {
    var keys []string
    if newKeys != nil {
        keys = make([]string, len(newKeys))
        copy(keys, newKeys)
    }

    r.mu.Lock()
    defer r.mu.Unlock()
    r.Operation = newOperation
    r.KeyNames = keys
}

// ConsumeClaim reads rule updates from the claimed partition and installs
// each valid one as the active rule in ruleConfig.
//
// A rule message is a JSON object with a string "operation" and an array
// "keynames"; non-string entries in the array are ignored. Messages that are
// not valid JSON or lack either field are logged and skipped without being
// marked.
//
// It returns nil when the message channel is closed or when the session
// context is cancelled. Watching the context matters: sarama expects the
// handler to return promptly once a rebalance or shutdown begins, and a loop
// that only ranges over claim.Messages() can hold up the rebalance.
func (h *RuleTopicConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case <-session.Context().Done():
            return nil

        case message, open := <-claim.Messages():
            if !open {
                return nil
            }

            var ruleData map[string]interface{}
            if err := json.Unmarshal(message.Value, &ruleData); err != nil {
                fmt.Println("Invalid JSON in rules-topic:", err)
                continue
            }

            // Extract rule operation and key names
            operation, opExists := ruleData["operation"].(string)
            keysInterface, keysExist := ruleData["keynames"].([]interface{})
            if !opExists || !keysExist {
                fmt.Println("Invalid rule format in rules-topic:", ruleData)
                continue
            }

            var keyNames []string
            for _, key := range keysInterface {
                if k, ok := key.(string); ok {
                    keyNames = append(keyNames, k)
                }
            }

            // Update global rule struct dynamically
            ruleConfig.SetRule(operation, keyNames)
            fmt.Println("Updated Rule => Operation:", operation, "Keys:", keyNames)

            // Mark message as processed
            session.MarkMessage(message, "")
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

🔬 Performance Benchmarking – Sending 100 Messages per Second

We create a producer script that sends test messages at a fixed rate, controlled by messagesPerSec (100 messages per second for this benchmark).

// Load generator: on every tick, build batchSize random order messages and
// hand them to the async producer's input channel.
//
// NOTE(review): nothing visible here stops the loop, so the deferred
// wg.Done() and ticker Stop never run — confirm callers do not block on
// wg.Wait() expecting this goroutine to finish.
go func() {
    defer wg.Done()

    tick := time.NewTicker(time.Second / messagesPerSec)
    defer tick.Stop()

    for range tick.C {
        // Generate batch of messages
        batch := make([]*sarama.ProducerMessage, batchSize)
        for idx := range batch {
            orderID := rand.Intn(10000)
            price := rand.Float64() * 100
            tax := rand.Float64() * 10
            quantity := rand.Intn(10) + 1
            costPerItem := rand.Float64() * 50

            payload := fmt.Sprintf(`{"order_id": "%d", "price": %.2f, "tax": %.2f, "quantity": %d, "cost_per_item": %.2f}`,
                orderID, price, tax, quantity, costPerItem)

            batch[idx] = &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.StringEncoder(payload),
            }
        }

        // Send batch messages asynchronously
        for _, pm := range batch {
            producer.Input() <- pm
        }
    }
}()
Enter fullscreen mode Exit fullscreen mode

📌 Why Is This Approach Production-Ready?

✅ Real-time Rule Updates (No restart required)
✅ Parallel Processing with Goroutines
✅ High Throughput
✅ Handles JSON Data Type Variations
✅ Kafka Producer-Consumer Optimized for Performance


🌟 Conclusion

Dynamically updating rules in Kafka consumers without service restarts is a game-changer for real-time applications. By using Golang, Kafka, and worker pools, we built a system that is fast, efficient, and scalable.

🔹 Check out the Full Code on GitHub: Dynamic Kafka Processing in Golang

Got questions or improvements? Drop a comment below! 🚀

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs