DEV Community

Cover image for Go, Kafka, gRPC and MongoDB microservice with metrics and tracing 👋
Alexander
Alexander

Posted on

Go, Kafka, gRPC and MongoDB microservice with metrics and tracing 👋

This article about tries to implement of clean architecture microservice using: 🚀
Kafka as messages broker
gRPC Go implementation of gRPC
MongoDB as database
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for to compose observability dashboards with everything from Prometheus

Source code you can find in GitHub repository

I want the make the accent on Kafka, because i try it for first time, so it's learning by doing 👨‍💻

Monitoring and tracing of course is required for any microservice so it's included ⚡️

Kafka is high throughtput distributed messaging system.
At any time only one broker can be a leader for a given partition and only that leader can receive serve data for a partition,
other brokers syncronize the data.

Producers:

Producers write data to topics and automatically know to which broker and partition to write to.

In case of broker failure, producers automatically will recover.

Producers can choose to receive acknowledgement of data writes:

  • acks = 0. Producer won't for acknowledgement, it's mean possible data loss.
  • acks = 1. Producer will wait for leader acknowledgement, possible limited data loss in some cases too.
  • acks = all. Leader + replicas acknowledgement, it requires acks from number of brokers specified in min.insync.replicas broker-side config key, and if less are currently in sync, the produce will fail. You may specify min.insync.replicas as low as 1 (acks==1 equivalent), or as high as your replication factor, or somewhere in between, so you can finely control the tradeoff b/w availability and consistency. Here is good article

Producers can choose to send a key with the message.

  • If no key data is sent by round robin.
  • If a key is sent, than all messages for that key will always go to the same partition.

Consumers:

Consumers read data from a topic and know which broker to read from.
Data are read in order within each partition.

Consumers Groups:

Consumers read data in consumer groups. Each consumer within a group reads from exclusive partitions.
So if you have more consumers than partitions, some consumers will be inactive.

Consumers offsets:

Kafka stores the offsets at which a consumer group has been reading.
When a consumer in a group has processed data, it should be committing the offsets.
If consumer down, it will be able to read back from where it left off.
Consumers choose when commit offsets.

There are 3 delivery cases:

At most once:

  • offsets are committed as soon as message received
  • if a process fails, the message will be lost and won't be read again.

At least once:

  • offsets are committed after the message is processed
  • if a process fails, the message will be read again.
  • this can result in duplicate processing of messages, so here is important to be sure your processing is idempotent and won't impact your system.

Exactly once:

  • can be achived for kakfa to kafka communication using kafka streams api.
  • for external systems usage we need use idempotent consumer.

Compression
And one more very important feature is Compression
Compression is enabled at the Producer level and doesn't require any configuration change in the Brokers or Consumers.
By default it is none, for the starting point good choice is snappy or lz4.

For local development:

make local // runs docker-compose.local.yml
make crate_topics // create kafka topics
make mongo // load js init script to mongo docker container
make make_cert // generate local SLL certificates
make swagger // generate swagger documentation
Enter fullscreen mode Exit fullscreen mode

UI interfaces will be available on ports:

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

Kafka UI: http://localhost:9000

Swagger UI by default will run on: https://localhost:5007/swagger/index.html




In Grafana you need to chose prometheus as metrics source and then create dashboard.

Good kafka docker setup with enclouded UI is confluent, but it's had huge images size and will download a half of world wide internet to your local pc.🤖 For this reason here as UI client i used kafdrop

Docker-compose.local.yml:

version: "3.8"

services:
  zookeeper:
    container_name: zookeeper
    restart: always
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
    networks:
      - products_network


  kafka1:
    container_name: kafka1
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafka2:
    container_name: kafka2
    restart: always
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafka3:
    container_name: kafka3
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafdrop:
    container_name: kafdrop
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - products_network

  redis:
    image: redis:6-alpine
    container_name: user_redis
    ports:
      - "6379:6379"
    restart: always
    networks:
      - products_network

  prometheus:
    container_name: prometheus_container
    restart: always
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks:
      - products_network


  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks:
      - products_network

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks:
      - products_network

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - 5775:5775/udp
      - 6831:6831/udp
      - 6832:6832/udp
      - 5778:5778
      - 16686:16686
      - 14268:14268
      - 14250:14250
      - 9411:9411
    networks:
      - products_network

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - 27017:27017
    volumes:
      - mongodb_data_container:/data/db

volumes:
  mongodb_data_container:

networks:
  products_network:
    driver: bridge
Enter fullscreen mode Exit fullscreen mode

For Go client in production usually used segmentio and sarama,
both is good and up to you which one to chose, for this project i used segmentio.
I didn't implement any interesting business logic here and didn't cover tests, because of not enough time at this moment.
Our microservice can communicate by kafka, gRPC and REST.

In Makefile you can find all helpful commands.
For create kafka topics in docker:

docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2
Enter fullscreen mode Exit fullscreen mode

For MongoDB we can load javascript files, this one creates collection and indexes:

mongo admin -u admin -p admin < init.js
Enter fullscreen mode Exit fullscreen mode

Segmentio library api gives us reader and writer.
Create reader first:

func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:                kafkaURL,
        GroupID:                groupID,
        Topic:                  topic,
        MinBytes:               minBytes,
        MaxBytes:               maxBytes,
        QueueCapacity:          queueCapacity,
        HeartbeatInterval:      heartbeatInterval,
        CommitInterval:         commitInterval,
        PartitionWatchInterval: partitionWatchInterval,
        Logger:                 kafka.LoggerFunc(pcg.log.Debugf),
        ErrorLogger:            kafka.LoggerFunc(pcg.log.Errorf),
        MaxAttempts:            maxAttempts,
        Dialer: &kafka.Dialer{
            Timeout: dialTimeout,
        },
    })
}
Enter fullscreen mode Exit fullscreen mode

and writer:

func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
    w := &kafka.Writer{
        Addr:         kafka.TCP(pcg.Brokers...),
        Topic:        topic,
        Balancer:     &kafka.LeastBytes{},
        RequiredAcks: writerRequiredAcks,
        MaxAttempts:  writerMaxAttempts,
        Logger:       kafka.LoggerFunc(pcg.log.Debugf),
        ErrorLogger:  kafka.LoggerFunc(pcg.log.Errorf),
        Compression:  compress.Snappy,
        ReadTimeout:  writerReadTimeout,
        WriteTimeout: writerWriteTimeout,
    }
    return w
}
Enter fullscreen mode Exit fullscreen mode

Then create consumers using Worker Pools

func (pcg *ProductsConsumerGroup) consumeCreateProduct(
    ctx context.Context,
    cancel context.CancelFunc,
    groupID string,
    topic string,
    workersNum int,
) {
    r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
    defer cancel()
    defer func() {
        if err := r.Close(); err != nil {
            pcg.log.Errorf("r.Close", err)
            cancel()
        }
    }()

    w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
    defer func() {
        if err := w.Close(); err != nil {
            pcg.log.Errorf("w.Close", err)
            cancel()
        }
    }()

    pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)

    wg := &sync.WaitGroup{}
    for i := 0; i <= workersNum; i++ {
        wg.Add(1)
        go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
    }
    wg.Wait()
}
Enter fullscreen mode Exit fullscreen mode

Workers validate message body then call usecase, if it's returns error, try for retry, good library for retry is retry-go,
if again fails, publish error message to very simple Dead Letter Queue as i said, didn't implement here any interesting business logic, so in real production we have to handle error cases in the better way.
And after message success processed commit it.

func (pcg *ProductsConsumerGroup) createProductWorker(
    ctx context.Context,
    cancel context.CancelFunc,
    r *kafka.Reader,
    w *kafka.Writer,
    wg *sync.WaitGroup,
    workerID int,
) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
    defer span.Finish()

    span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))

    defer wg.Done()
    defer cancel()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            pcg.log.Errorf("FetchMessage", err)
            return
        }

        pcg.log.Infof(
            "WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
            workerID,
            m.Topic,
            m.Partition,
            m.Offset,
            string(m.Key),
            string(m.Value),
        )
        incomingMessages.Inc()

        var prod models.Product
        if err := json.Unmarshal(m.Value, &prod); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("json.Unmarshal", err)
            continue
        }

        if err := pcg.validate.StructCtx(ctx, prod); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("validate.StructCtx", err)
            continue
        }

        if err := retry.Do(func() error {
            created, err := pcg.productsUC.Create(ctx, &prod)
            if err != nil {
                return err
            }
            pcg.log.Infof("created product: %v", created)
            return nil
        },
            retry.Attempts(retryAttempts),
            retry.Delay(retryDelay),
            retry.Context(ctx),
        ); err != nil {
            errorMessages.Inc()

            if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
                pcg.log.Errorf("publishErrorMessage", err)
                continue
            }
            pcg.log.Errorf("productsUC.Create.publishErrorMessage", err)
            continue
        }

        if err := r.CommitMessages(ctx, m); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("CommitMessages", err)
            continue
        }

        successMessages.Inc()
    }
}
Enter fullscreen mode Exit fullscreen mode

In repository layer use mongo-go-driver for interreacting with database

// Create Create new product
func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
    defer span.Finish()

    collection := p.mongoDB.Database(productsDB).Collection(productsCollection)

    product.CreatedAt = time.Now().UTC()
    product.UpdatedAt = time.Now().UTC()

    result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        return nil, errors.Wrap(err, "InsertOne")
    }

    objectID, ok := result.InsertedID.(primitive.ObjectID)
    if !ok {
        return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
    }

    product.ProductID = objectID

    return product, nil
}
Enter fullscreen mode Exit fullscreen mode

Here is gRPC service implementation os create handler, and full code you can find in github repository:

// Create create new product
func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
    defer span.Finish()
    createMessages.Inc()

    catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
    if err != nil {
        errorMessages.Inc()
        p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    prod := &models.Product{
        CategoryID:  catID,
        Name:        req.GetName(),
        Description: req.GetDescription(),
        Price:       req.GetPrice(),
        ImageURL:    &req.ImageURL,
        Photos:      req.GetPhotos(),
        Quantity:    req.GetQuantity(),
        Rating:      int(req.GetRating()),
    }

    created, err := p.productUC.Create(ctx, prod)
    if err != nil {
        errorMessages.Inc()
        p.log.Errorf("productUC.Create: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    successMessages.Inc()
    return &productsService.CreateRes{Product: created.ToProto()}, nil
}
Enter fullscreen mode Exit fullscreen mode

and REST API handler using echo:

// CreateProduct Create product
// @Tags Products
// @Summary Create new product
// @Description Create new single product
// @Accept json
// @Produce json
// @Success 201 {object} models.Product
// @Router /products [post]
func (p *productHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
        defer span.Finish()
        createRequests.Inc()

        var prod models.Product
        if err := c.Bind(&prod); err != nil {
            p.log.Errorf("c.Bind: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := p.validate.StructCtx(ctx, &prod); err != nil {
            p.log.Errorf("validate.StructCtx: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
            p.log.Errorf("productUC.PublishCreate: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        successRequests.Inc()
        return c.NoContent(http.StatusCreated)
    }
}
Enter fullscreen mode Exit fullscreen mode

On top layer of our app handling, logging errors and process metrics for Prometheus.
Repository with the source code and list of all used tools u can find here 👨‍💻 :)
I hope this article is usefully and helpfully, I'll be happy to receive any feedbacks or questions :)

Top comments (2)

Collapse
 
trueneu profile image
Pavel Gurkov

I'm sorry I'm nitpicking, but there's no "acks==2" mode. It's "all" or "-1", and it's a bit more complex, as opposed to just leader + replica acknowledgments: it requires acks from number of brokers specified in min.insync.replicas broker-side config key, and if less are currently in sync, the produce will fail.
You may specify min.insync.replicas as low as 1 (acks==1 equivalent), or as high as your replication factor, or somewhere in between, so you can finely control the tradeoff b/w availability and consistency.

Collapse
 
aleksk1ng profile image
Alexander

Big thanks, i have edit and add right as you said 👍🙂