Introduction
Kafka is a powerhouse for real-time data streaming, but what if your processing logic needs to change dynamically? Manually redeploying your service every time a rule changes isn’t practical.
Imagine a system where rules can be updated in real-time without stopping the consumer. In this blog, we'll explore how to consume Kafka messages, update processing rules dynamically, and handle high throughput efficiently in Golang.
📌 Problem Statement
We need a Kafka consumer that:
- Reads messages from a rules topic (to update streaming rules dynamically).
- Reads messages from a data topic (to process data based on the latest rules).
- Applies arithmetic operations (sum, minus, multiply, divide) on selected JSON fields.
- Supports high throughput using goroutines.
Example Use Case 🔥
Consider an e-commerce platform where:
- The rules-topic updates the calculation method for order prices.
- The test-topic contains order details (price, tax, discount, etc.).
- The consumer processes orders in real-time based on the current rules.
- The output is sent to another topic for further analytics.
⚙️ Implementation
1️⃣ Define the Rule Struct
We use a mutex to ensure thread-safe updates of the rule structure.
import (
    "sync"
)

type RuleStruct struct {
    mu        sync.RWMutex
    Operation string   // sum, minus, multiply, divide
    KeyNames  []string // Keys to process in test-topic messages
}
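The processing code later calls ruleConfig.GetRule(), which the post doesn't show explicitly. Here is a minimal read-side counterpart to SetRule, using the read lock so concurrent readers don't block each other (the method and field names match the struct above; the defensive slice copy is our addition):

```go
package main

import (
	"fmt"
	"sync"
)

// RuleStruct mirrors the definition above.
type RuleStruct struct {
	mu        sync.RWMutex
	Operation string
	KeyNames  []string
}

// GetRule returns a snapshot of the current rule under a read lock,
// copying KeyNames so callers cannot mutate the shared slice.
func (r *RuleStruct) GetRule() (string, []string) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	keys := make([]string, len(r.KeyNames))
	copy(keys, r.KeyNames)
	return r.Operation, keys
}

// SetRule atomically replaces the rule (shown again later in the post).
func (r *RuleStruct) SetRule(op string, keys []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.Operation = op
	r.KeyNames = keys
}

func main() {
	r := &RuleStruct{}
	r.SetRule("sum", []string{"price", "tax"})
	op, keys := r.GetRule()
	fmt.Println(op, keys)
}
```

Returning a copy of the slice is a small safety margin: a worker holding the old key list can keep iterating it even while a rule update swaps in a new one.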
2️⃣ Convert Values to Float64
Since Kafka messages store data as JSON, fields may arrive as string, int, or float64. We normalize them all to float64 before applying calculations.
var numbers []float64
for _, key := range keys {
    if val, exists := data[key]; exists {
        if num, ok := val.(float64); ok {
            numbers = append(numbers, num)
        } else if numStr, ok := val.(string); ok {
            if parsedNum, err := strconv.ParseFloat(numStr, 64); err == nil {
                numbers = append(numbers, parsedNum)
            }
        }
    }
}
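The conversion above can be factored into a small reusable helper. This is a sketch, not part of the original code; toFloat64 is a name of our choosing, and the json.Number case is included only for completeness in case the decoder is configured with UseNumber:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// toFloat64 normalizes a decoded JSON value to float64.
// encoding/json decodes all JSON numbers as float64 by default;
// the string and json.Number cases cover the other shapes we expect.
func toFloat64(val interface{}) (float64, bool) {
	switch v := val.(type) {
	case float64:
		return v, true
	case string:
		n, err := strconv.ParseFloat(v, 64)
		return n, err == nil
	case json.Number:
		n, err := v.Float64()
		return n, err == nil
	default:
		return 0, false
	}
}

func main() {
	for _, v := range []interface{}{12.5, "3.5", json.Number("2"), true} {
		n, ok := toFloat64(v)
		fmt.Println(n, ok)
	}
}
```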
3️⃣ Processing Kafka Messages Efficiently
Worker goroutines read messages from a shared channel, apply the current rule, and publish the result to the destination topic.
func processMessages(messageChan chan []byte, producer sarama.SyncProducer) {
    for msg := range messageChan {
        var data map[string]interface{}
        if err := json.Unmarshal(msg, &data); err != nil {
            fmt.Println("Invalid JSON in test-topic:", err)
            continue
        }

        // Get the latest rule
        operation, keyNames := ruleConfig.GetRule()

        // Apply the operation to the specified keys
        result := applyOperation(data, operation, keyNames)

        // Publish the result to test-destination-topic
        jsonResult, err := json.Marshal(result)
        if err != nil {
            fmt.Println("Error marshaling result:", err)
            continue
        }
        producerMsg := &sarama.ProducerMessage{
            Topic: destinationTopic,
            Value: sarama.StringEncoder(jsonResult),
        }
        if _, _, err := producer.SendMessage(producerMsg); err != nil {
            fmt.Println("Error sending message to destination topic:", err)
        } else {
            fmt.Println("Published processed data:", string(jsonResult))
        }
    }
}
// Function to apply the rule logic
func applyOperation(data map[string]interface{}, operation string, keys []string) map[string]interface{} {
    var numbers []float64
    for _, key := range keys {
        if val, exists := data[key]; exists {
            if num, ok := val.(float64); ok {
                numbers = append(numbers, num)
            } else if numStr, ok := val.(string); ok {
                if parsedNum, err := strconv.ParseFloat(numStr, 64); err == nil {
                    numbers = append(numbers, parsedNum)
                }
            }
        }
    }

    if len(numbers) == 0 {
        return map[string]interface{}{"error": "No valid numeric values found"}
    }

    var result float64
    switch operation {
    case "sum":
        for _, num := range numbers {
            result += num
        }
    case "minus":
        result = numbers[0]
        for i := 1; i < len(numbers); i++ {
            result -= numbers[i]
        }
    case "multiply":
        result = 1
        for _, num := range numbers {
            result *= num
        }
    case "divide":
        result = numbers[0]
        for i := 1; i < len(numbers); i++ {
            if numbers[i] == 0 {
                return map[string]interface{}{"error": "Division by zero"}
            }
            result /= numbers[i]
        }
    default:
        return map[string]interface{}{"error": "Unknown operation"}
    }

    return map[string]interface{}{
        "operation": operation,
        "keys":      keys,
        "result":    result,
    }
}
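The arithmetic core of applyOperation can be exercised on its own, without any Kafka plumbing. This condensed sketch (compute is a helper name of our choosing) reproduces the same left-to-right semantics, including the division-by-zero guard:

```go
package main

import (
	"errors"
	"fmt"
)

// compute applies the rule's operation left-to-right over the values,
// mirroring the switch statement in applyOperation above.
func compute(operation string, numbers []float64) (float64, error) {
	if len(numbers) == 0 {
		return 0, errors.New("no valid numeric values found")
	}
	result := numbers[0]
	switch operation {
	case "sum":
		for _, n := range numbers[1:] {
			result += n
		}
	case "minus":
		for _, n := range numbers[1:] {
			result -= n
		}
	case "multiply":
		for _, n := range numbers[1:] {
			result *= n
		}
	case "divide":
		for _, n := range numbers[1:] {
			if n == 0 {
				return 0, errors.New("division by zero")
			}
			result /= n
		}
	default:
		return 0, errors.New("unknown operation")
	}
	return result, nil
}

func main() {
	r, err := compute("sum", []float64{10, 2.5, 0.5})
	fmt.Println(r, err)
}
```

Returning an error instead of an error map keeps the helper testable; the caller can wrap the error into the JSON result shape used above.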
4️⃣ Dynamic Rule Updates from Kafka
When a new message arrives in the rules-topic, we update our RuleStruct dynamically.
func (r *RuleStruct) SetRule(newOperation string, newKeys []string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.Operation = newOperation
    r.KeyNames = newKeys
}
func (h *RuleTopicConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        var ruleData map[string]interface{}
        if err := json.Unmarshal(message.Value, &ruleData); err != nil {
            fmt.Println("Invalid JSON in rules-topic:", err)
            continue
        }

        // Extract rule operation and key names
        operation, opExists := ruleData["operation"].(string)
        keysInterface, keysExist := ruleData["keynames"].([]interface{})
        if !opExists || !keysExist {
            fmt.Println("Invalid rule format in rules-topic:", ruleData)
            continue
        }

        var keyNames []string
        for _, key := range keysInterface {
            if k, ok := key.(string); ok {
                keyNames = append(keyNames, k)
            }
        }

        // Update global rule struct dynamically
        ruleConfig.SetRule(operation, keyNames)
        fmt.Println("Updated Rule => Operation:", operation, "Keys:", keyNames)

        // Mark message as processed
        session.MarkMessage(message, "")
    }
    return nil
}
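The payload-extraction logic inside ConsumeClaim can be isolated and unit-tested without a running Kafka cluster. Here is a sketch of that extraction as a standalone function (parseRule is a hypothetical name; the payload shape matches the rules-topic format above):

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// parseRule decodes a rules-topic payload into an operation and key list,
// rejecting payloads that are missing or mistype either field.
func parseRule(payload []byte) (string, []string, error) {
	var ruleData map[string]interface{}
	if err := json.Unmarshal(payload, &ruleData); err != nil {
		return "", nil, err
	}
	operation, opOK := ruleData["operation"].(string)
	rawKeys, keysOK := ruleData["keynames"].([]interface{})
	if !opOK || !keysOK {
		return "", nil, errors.New("invalid rule format")
	}
	var keyNames []string
	for _, k := range rawKeys {
		if s, ok := k.(string); ok {
			keyNames = append(keyNames, s)
		}
	}
	return operation, keyNames, nil
}

func main() {
	op, keys, err := parseRule([]byte(`{"operation": "sum", "keynames": ["price", "tax"]}`))
	fmt.Println(op, keys, err)
}
```

With this split, ConsumeClaim shrinks to decode-then-SetRule, and the parsing rules can be covered by plain table-driven tests.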
🔬 Performance Benchmarking – Sending 1,000 Messages per Second
We create a producer script that sends 1,000 messages per second (configurable via messagesPerSec and batchSize) to load-test the consumer.
go func() {
    defer wg.Done()
    // messagesPerSec is a time.Duration-typed rate, so
    // time.Second / messagesPerSec yields the interval between ticks.
    ticker := time.NewTicker(time.Second / messagesPerSec)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // Generate a batch of synthetic order messages
            messages := make([]*sarama.ProducerMessage, batchSize)
            for i := 0; i < batchSize; i++ {
                message := fmt.Sprintf(`{"order_id": "%d", "price": %.2f, "tax": %.2f, "quantity": %d, "cost_per_item": %.2f}`,
                    rand.Intn(10000), rand.Float64()*100, rand.Float64()*10, rand.Intn(10)+1, rand.Float64()*50)
                messages[i] = &sarama.ProducerMessage{
                    Topic: topic,
                    Value: sarama.StringEncoder(message),
                }
            }
            // Send the batch asynchronously via the async producer's input channel
            for _, msg := range messages {
                producer.Input() <- msg
            }
        }
    }
}()
📌 Why Is This Approach Production-Ready?
✅ Real-time Rule Updates (No restart required)
✅ Parallel Processing with Goroutines
✅ High Throughput
✅ Handles JSON Data Type Variations
✅ Kafka Producer-Consumer Optimized for Performance
🌟 Conclusion
Dynamically updating rules in Kafka consumers without service restarts is a game-changer for real-time applications. By using Golang, Kafka, and worker pools, we built a system that is fast, efficient, and scalable.
🔹 Check out the Full Code on GitHub: Dynamic Kafka Processing in Golang
Got questions or improvements? Drop a comment below! 🚀