Go, Kafka and gRPC clean architecture CQRS microservices with Jaeger tracing 👋🧑‍💻

In this article, let's try to build CQRS microservices that are closer to the real world, with tracing and monitoring, using: 🚀
Kafka as message broker
gRPC Go implementation of gRPC
PostgreSQL as database
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
MongoDB as the read database
Redis for caching, using the type-safe go-redis client for Golang
swag Swagger for Go
Echo web framework

You can find the source code in the GitHub repository.

The main idea here is an implementation of CQRS using Go, Kafka and gRPC.
I won't try to explain what the CQRS pattern is, because that would make the article too big; the best place to read about it is microservices.io.
I found the example CQRS project and blog of Three Dots Labs very interesting and took them as a starting point.
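
To make the shape of the pattern concrete before diving in, here is a minimal sketch of the command and query handler interfaces this project is built around (the names are illustrative, not the repository's exact types):

package cqrs

import "context"

// CreateProductCommand mutates state on the write side.
type CreateProductCommand struct {
	ProductID   string
	Name        string
	Description string
	Price       float64
}

// CreateProductCmdHandler handles exactly one command type.
type CreateProductCmdHandler interface {
	Handle(ctx context.Context, command *CreateProductCommand) error
}

// GetProductByIdQuery reads from the projection on the read side.
type GetProductByIdQuery struct {
	ProductID string
}

// ProductResponse is the read model returned to API clients.
type ProductResponse struct {
	ProductID   string
	Name        string
	Description string
	Price       float64
}

// GetProductByIdHandler answers a single query type and never mutates state.
type GetProductByIdHandler interface {
	Handle(ctx context.Context, query *GetProductByIdQuery) (*ProductResponse, error)
}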

In this example we have three services: an API gateway and read and write services, which communicate via Kafka and gRPC.
Postgres is used as the write database, MongoDB as the read database and Redis for caching.
Like any real-world project, we of course need metrics and tracing: Prometheus and Grafana are used for metrics, and Jaeger for tracing.
This example doesn't implement any interesting business logic and doesn't cover tests, for lack of time.

UI interfaces will be available on ports:

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

Swagger UI: http://localhost:5001/swagger/index.html

Jaeger tracing UI:

Swagger UI:

Prometheus metrics UI:

Grafana metrics UI:

For local development:

make local or make docker_dev  // run docker compose files
make migrate_up                // run sql migrations
make mongo                     // run mongodb scripts
make swagger                   // generate swagger documentation

To run everything in Docker you can run make docker_dev; it has a hot reloading feature.
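
The Makefile itself isn't shown in this article; a sketch of what these targets might look like (file names and paths are assumptions, check the repository for the real ones):

# Assumed layout: docker-compose.yml for infrastructure,
# docker-compose.dev.yml for the services with hot reload.
local:
	docker-compose -f docker-compose.yml up --build -d

docker_dev:
	docker-compose -f docker-compose.dev.yml up --build

migrate_up:
	migrate -database $(POSTGRES_URI) -path migrations up

swagger:
	swag init -g cmd/api_gateway/main.go -o docs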

Docker compose file for this project:

version: "3.8"

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  microservices_postgresql:
    image: postgres:13-alpine
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=products
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  zoo1:
    image: zookeeper:3.4.9
    restart: always
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices
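
The compose file mounts ./monitoring/prometheus.yml; a minimal scrape config for it could look like this (the job names and the services' metrics ports are assumptions):

global:
  scrape_interval: 10s

scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: [ "localhost:9090" ]

  - job_name: node_exporter
    static_configs:
      - targets: [ "node_exporter:9100" ]

  # The Go services are assumed to expose /metrics on their own ports.
  - job_name: microservices
    static_configs:
      - targets:
          - "host.docker.internal:8001" # api_gateway (assumed port)
          - "host.docker.internal:8002" # reader_service (assumed port)
          - "host.docker.internal:8003" # writer_service (assumed port)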

The idea of the API gateway service is to accept HTTP requests: command handlers publish events to Kafka, and query handlers retrieve data from the reader service via gRPC.
This project uses Echo, but I found Gin and Chi to be very good for production too.

Let's look at the code. The create product HTTP handler accepts and validates the request body, then generates a uuid and calls the create product command.
There are many holy wars about passing the id to command or service methods as a separate parameter or in the body, about generating the id inside the command and returning it,
about whether commands can return values and so on; it doesn't really matter, it's up to you and your team which way to use, better to spend the time concentrating on more important business tasks :)

// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.CreateProductHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
        defer span.Finish()

        createDto := &dto.CreateProductDto{}
        if err := c.Bind(createDto); err != nil {
            h.log.WarnMsg("Bind", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        createDto.ProductID = uuid.NewV4()
        if err := h.v.StructCtx(ctx, createDto); err != nil {
            h.log.WarnMsg("validate", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
            h.log.WarnMsg("CreateProduct", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
    }
}
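
Assuming the gateway listens on port 5001 (where the Swagger UI above is served), creating a product would look roughly like this; the exact JSON field names depend on the dto tags:

curl -X POST http://localhost:5001/products \
  -H 'Content-Type: application/json' \
  -d '{"name":"Laptop","description":"Thin and light","price":999.99}'
# expected: 201 Created with the generated product id in the body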

The create product command handler is simple: it marshals the command data and publishes it to Kafka.
Kafka accepts []byte as the value; proto is used here. To pass tracing through Kafka we have to use headers;
in the tracing utils you can find helpers for it (a sketch of such a helper follows the code below).
Go has some good libraries for working with Kafka; I like segmentio/kafka-go.

func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    createDto := &kafkaMessages.ProductCreate{
        ProductID:   command.CreateDto.ProductID.String(),
        Name:        command.CreateDto.Name,
        Description: command.CreateDto.Description,
        Price:       command.CreateDto.Price,
    }

    dtoBytes, err := proto.Marshal(createDto)
    if err != nil {
        return err
    }

    return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreate.TopicName,
        Value:   dtoBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    })
}
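
The GetKafkaTracingHeadersFromSpanCtx helper used above isn't shown in the article; here is a sketch of how such a helper can be written with opentracing's TextMap carrier (an assumption about the real implementation):

import (
	"github.com/opentracing/opentracing-go"
	"github.com/segmentio/kafka-go"
)

// GetKafkaTracingHeadersFromSpanCtx injects the span context into a text map
// carrier and copies every key/value pair into Kafka message headers.
func GetKafkaTracingHeadersFromSpanCtx(spanCtx opentracing.SpanContext) []kafka.Header {
	textMap := opentracing.TextMapCarrier{}
	if err := opentracing.GlobalTracer().Inject(spanCtx, opentracing.TextMap, textMap); err != nil {
		return []kafka.Header{}
	}

	headers := make([]kafka.Header, 0, len(textMap))
	for key, val := range textMap {
		headers = append(headers, kafka.Header{Key: key, Value: []byte(val)})
	}
	return headers
}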

For working with Kafka it's nice to have UI clients for debugging; personally I like to use Conduktor.

The writer service consumes Kafka topics, processes messages by writing to Postgres, and publishes successfully processed messages back to Kafka.
For working with Postgres in Go, in my opinion the best choice is pgx; if you need a query builder, squirrel is a very good library.
Personally I don't like ORMs, but as I have seen, teams often use gorm, so it's up to you.
The ProcessMessages method listens to the Kafka topics and calls a specific method depending on the topic:

func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("workerID: %v, err: %v", workerID, err)
            continue
        }

        s.logProcessMessage(m, workerID)

        switch m.Topic {
        case s.cfg.KafkaTopics.ProductCreate.TopicName:
            s.processCreateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductUpdate.TopicName:
            s.processUpdateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductDelete.TopicName:
            s.processDeleteProduct(ctx, r, m)
        }
    }
}
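
ProcessMessages is written to be run by a pool of workers sharing one consumer-group reader; a sketch of how such a pool could be started (the broker address, group id, topic names and pool size are assumptions):

// Start a pool of consumer workers sharing one consumer-group reader.
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     []string{"localhost:9092"},
	GroupID:     "writer_service_consumer",
	GroupTopics: []string{"product_create", "product_update", "product_delete"},
	MinBytes:    10e3,
	MaxBytes:    10e6,
})
defer r.Close()

wg := &sync.WaitGroup{}
for workerID := 1; workerID <= 16; workerID++ {
	wg.Add(1)
	go processor.ProcessMessages(ctx, r, wg, workerID)
}
wg.Wait()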

The Kafka message processing method deserializes and validates the message body, passes it to the command and commits the offset.
Here we have to choose how to handle errors, which depends on the business logic; for example,
we can use the dead letter queue pattern.

func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    s.metrics.CreateProductKafkaMessages.Inc()

    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
    defer span.Finish()

    var msg kafkaMessages.ProductCreate
    if err := proto.Unmarshal(m.Value, &msg); err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    proUUID, err := uuid.FromString(msg.GetProductID())
    if err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.WarnMsg("validate", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    if err := retry.Do(func() error {
        return s.ps.Commands.CreateProduct.Handle(ctx, command)
    }, append(retryOptions, retry.Context(ctx))...); err != nil {
        s.log.WarnMsg("CreateProduct.Handle", err)
        s.metrics.ErrorKafkaMessages.Inc()
        return
    }

    s.commitMessage(ctx, r, m)
}
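
The retryOptions variable isn't shown above; since retry.Context(ctx) is appended to it, this is presumably the avast/retry-go library, and the options might look like this (an assumed policy, not the repository's exact values):

// Assumed retry policy for command handling (github.com/avast/retry-go).
var retryOptions = []retry.Option{
	retry.Attempts(3),
	retry.Delay(300 * time.Millisecond),
	retry.DelayType(retry.BackOffDelay),
	retry.LastErrorOnly(true),
}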

The writer service's create product command handler saves data to Postgres and publishes a product created event to Kafka:

func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}

    product, err := c.pgRepo.CreateProduct(ctx, productDto)
    if err != nil {
        return err
    }

    msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
    msgBytes, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    message := kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreated.TopicName,
        Value:   msgBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    }

    return c.kafkaProducer.PublishMessage(ctx, message)
}
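
The kafkaProducer.PublishMessage used by both handlers can be a thin wrapper over a shared segmentio/kafka-go Writer; a sketch under that assumption (the Writer's Topic is left empty so each message can carry its own topic, as above):

// Sketch: a producer wrapping one reusable kafka-go Writer.
type producer struct {
	w *kafka.Writer
}

func NewProducer(brokers []string) *producer {
	return &producer{w: &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas before returning
	}}
}

func (p *producer) PublishMessage(ctx context.Context, msgs ...kafka.Message) error {
	return p.w.WriteMessages(ctx, msgs...)
}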

The Postgres repository uses pgx; the code is simple:

func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
    defer span.Finish()

    var created models.Product
    if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
        &created.ProductID,
        &created.Name,
        &created.Description,
        &created.Price,
        &created.CreatedAt,
        &created.UpdatedAt,
    ); err != nil {
        return nil, errors.Wrap(err, "db.QueryRow")
    }

    return &created, nil
}
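
The createProductQuery constant isn't shown in the post; judging by the scanned columns it presumably looks something like this (an assumption):

// Assumed shape of createProductQuery, inferred from the scanned columns.
const createProductQuery = `INSERT INTO products (product_id, name, description, price, created_at, updated_at)
	VALUES ($1, $2, $3, $4, now(), now())
	RETURNING product_id, name, description, price, created_at, updated_at`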

The reader service consumes Kafka messages, saves data to MongoDB and caches it in Redis, then serves the projected data to gRPC calls.
The Kafka listener handlers here look much the same; the product created handler saves data to MongoDB and caches it in Redis:

func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    product := &models.Product{
        ProductID:   command.ProductID,
        Name:        command.Name,
        Description: command.Description,
        Price:       command.Price,
        CreatedAt:   command.CreatedAt,
        UpdatedAt:   command.UpdatedAt,
    }

    created, err := c.mongoRepo.CreateProduct(ctx, product)
    if err != nil {
        return err
    }

    c.redisRepo.PutProduct(ctx, created.ProductID, created)
    return nil
}

MongoDB repository save method:

func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    _, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "InsertOne")
    }

    return product, nil
}

And the Redis caching method:

func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
    defer span.Finish()

    productBytes, err := json.Marshal(product)
    if err != nil {
        r.log.WarnMsg("json.Marshal", err)
        return
    }

    if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
        r.log.WarnMsg("redisClient.HSetNX", err)
        return
    }
    r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
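
The matching read method for this cache isn't shown; a sketch of what it could look like, mirroring PutProduct (an assumed helper, not the repository's exact code):

// GetProduct is the assumed read counterpart of PutProduct:
// HGet from the products hash and unmarshal the cached JSON.
func (r *redisRepository) GetProduct(ctx context.Context, key string) (*models.Product, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.GetProduct")
	defer span.Finish()

	productBytes, err := r.redisClient.HGet(ctx, r.getRedisProductPrefixKey(), key).Bytes()
	if err != nil {
		return nil, err
	}

	var product models.Product
	if err := json.Unmarshal(productBytes, &product); err != nil {
		return nil, err
	}
	return &product, nil
}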

And then the API gateway can request data from the reader service using gRPC.
The reader's gRPC service method:

func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
    s.metrics.GetProductByIdGrpcRequests.Inc()

    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
    defer span.Finish()

    productUUID, err := uuid.FromString(req.GetProductID())
    if err != nil {
        s.log.WarnMsg("uuid.FromString", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    query := queries.NewGetProductByIdQuery(productUUID)
    if err := s.v.StructCtx(ctx, query); err != nil {
        s.log.WarnMsg("validate", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
    if err != nil {
        s.log.WarnMsg("GetProductById.Handle", err)
        return nil, s.errResponse(codes.Internal, err)
    }

    s.metrics.SuccessGrpcRequests.Inc()
    return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}
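
The s.ps.Queries.GetProductById.Handle call above lands in the reader's query handler, which isn't shown; given the repositories above, it plausibly checks the Redis cache first and falls back to MongoDB (the method names here are assumptions):

// Sketch: read-side query handler with cache-aside lookup.
func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*models.Product, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
	defer span.Finish()

	// Cache hit: serve straight from Redis.
	if cached, err := q.redisRepo.GetProduct(ctx, query.ProductID.String()); err == nil && cached != nil {
		return cached, nil
	}

	// Cache miss: read the projection from MongoDB and refresh the cache.
	product, err := q.mongoRepo.GetProductById(ctx, query.ProductID)
	if err != nil {
		return nil, err
	}

	q.redisRepo.PutProduct(ctx, product.ProductID, product)
	return product, nil
}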

Get by id method tracing:

The API gateway's get product by id HTTP handler method:

// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.GetProductByIdHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
        defer span.Finish()

        productUUID, err := uuid.FromString(c.Param(constants.ID))
        if err != nil {
            h.log.WarnMsg("uuid.FromString", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query := queries.NewGetProductByIdQuery(productUUID)
        response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
        if err != nil {
            h.log.WarnMsg("GetProductById", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusOK, response)
    }
}

and the query handler code:

func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
    defer span.Finish()

    ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
    res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
    if err != nil {
        return nil, err
    }

    return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}

Search products method:


func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    filter := bson.D{
        {Key: "$or", Value: bson.A{
            bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
            bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
        }},
    }

    count, err := collection.CountDocuments(ctx, filter)
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "CountDocuments")
    }
    if count == 0 {
        return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
    }

    limit := int64(pagination.GetLimit())
    skip := int64(pagination.GetOffset())
    cursor, err := collection.Find(ctx, filter, &options.FindOptions{
        Limit: &limit,
        Skip:  &skip,
    })
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "Find")
    }
    defer cursor.Close(ctx)

    products := make([]*models.Product, 0, pagination.GetSize())

    for cursor.Next(ctx) {
        var prod models.Product
        if err := cursor.Decode(&prod); err != nil {
            p.traceErr(span, err)
            return nil, errors.Wrap(err, "Find")
        }
        products = append(products, &prod)
    }

    if err := cursor.Err(); err != nil {
        span.SetTag("error", true)
        span.LogKV("error_code", err.Error())
        return nil, errors.Wrap(err, "cursor.Err")
    }

    return models.NewProductListWithPagination(products, count, pagination), nil
}
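
One design note: an unanchored regex like this cannot use a regular index and scans the whole collection. If search has to scale, a MongoDB text index on the queried fields (created once at startup) together with $text queries is one option; a sketch of creating such an index (not in the post, an assumption):

// Sketch: create a text index on the searched fields at service startup.
indexModel := mongo.IndexModel{
	Keys: bson.D{
		{Key: "name", Value: "text"},
		{Key: "description", Value: "text"},
	},
}
if _, err := collection.Indexes().CreateOne(ctx, indexModel); err != nil {
	log.Fatal(err)
}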

More details and the source code you can find here.
Of course, in real-world applications we have to implement many more necessary features,
like circuit breakers, retries, rate limiters, etc.; depending on the project they can be implemented in different ways,
for example you can use Kubernetes and Istio for some of them.
I hope this article was useful and helpful, and I'll be happy to receive any feedback or questions; feel free to contact me by email or any messenger :)

Top comments (3)

Tudor Hulban

Hi,
is

var created models.Product

really needed? Anyway, you pass a pointer to the function.
Why create another variable when you can write into the passed *models.Product?

Alexander

Hi, it's up to you, it depends on what you need. In this example, yes, we can reuse the pointer like in the other methods returning a product dto; or maybe in another case we need to return another dto, for example in postgres we store the id as a uuid but want to return the id as a string, etc., or return only the id or only an error :)

MrByerikbol

Hi,
after calling the update method by product_id, it updates in PostgreSQL but there are no changes in reader_service.
How does the Redis cache retrieve the new product info from the updated pg db?