DEV Community

FrankNPC

How to trace across a message queue (Kafka) without writing a log and trace

Once a system sends messages through a queue or another asynchronous service as part of its business logic, it becomes hard to trace who sent them, when, and where they went, and likewise who received them, when, and where they came from.

Now we have bitryon logger to connect the puzzle pieces into a workflow and stack trace. Following "how to automate log and trace without writing a log and trace", we can configure and initialize bitryon logger with Spring to cover more traces with logs.

Take Kafka as an example. We assume a Kafka server is already installed.

Here is a taste of the trace logs.

2025-11-20 15:42:24.785|http-nio-80-exec-1#38|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|4|JSON|
MedicService.java#io.bitryon.example.web.service.MedicService#callSelfInvocation#68#|
[{
    "testString": "68ssTfeP43IeSVqFWx1jH1VigFuEdUbt"
}]
2025-11-20 15:42:24.787|http-nio-80-exec-1#38|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|13|JSON|
MedicService.java#io.bitryon.example.web.service.MedicService#callSelfInvocation#68#R|
["1763653344787"]
2025-11-20 15:42:26.073|org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1#51|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|14-1|JSON|
UserService.java#io.bitryon.example.web.service.rpc.UserService#getBySessionId#0#|
[{
    "sessionId": "68ssTfeP43IeSVqFWx1jH1VigFuEdUbt"
}]
2025-11-20 15:42:26.693|http-nio-80-exec-4#41|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|14-1-2|JSON|
UserServiceImpl.java#io.bitryon.example.web.service.rpc.UserServiceImpl#getBySessionId#45#|
[{
    "sessionId": "68ssTfeP43IeSVqFWx1jH1VigFuEdUbt"
}]
2025-11-20 15:42:26.694|http-nio-80-exec-4#41|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|14-1-3|JSON|
UserServiceImpl.java#io.bitryon.example.web.service.rpc.UserServiceImpl#getBySessionId#45#R|
[{
    "name": "ra***ly",
    "id": 456,
    "driverLisenceId": "****************************",
    "age": null
}]
2025-11-20 15:42:26.918|org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1#51|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|14-2|JSON|
UserService.java#io.bitryon.example.web.service.rpc.UserService#getBySessionId#0#R|
[{
    "name": "ra***ly",
    "id": 456,
    "driverLisenceId": "****************************",
    "age": null
}]
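
To make the sample easier to read: each entry appears to start with a pipe-delimited header line (timestamp, thread, trace id, step id, payload format), followed by a source-location line and the JSON payload. The field names below are inferred from the sample itself, not taken from bitryon documentation, so treat this as a rough sketch. `TraceHeaderLine` is a hypothetical helper and not part of bitryon logger.

```java
import java.util.Objects;

/** Hypothetical helper: splits the first line of a trace entry into its apparent fields. */
final class TraceHeaderLine {
    final String timestamp;
    final String thread;
    final String traceId;
    final String stepId;
    final String payloadFormat;

    private TraceHeaderLine(String[] fields) {
        this.timestamp = fields[0];
        this.thread = fields[1];
        this.traceId = fields[2];
        this.stepId = fields[3];
        this.payloadFormat = fields[4];
    }

    /** Parses a header line such as "time|thread|traceId|stepId|JSON|". */
    static TraceHeaderLine parse(String line) {
        Objects.requireNonNull(line, "line");
        // String.split drops trailing empty strings, so the final '|' adds no extra field.
        String[] fields = line.split("\\|");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 fields but got " + fields.length);
        }
        return new TraceHeaderLine(fields);
    }

    public static void main(String[] args) {
        TraceHeaderLine header = parse(
                "2025-11-20 15:42:26.073|org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1#51"
                        + "|7G5HPZyA3QWSF2f1SMRN5qV2U95tjOLi|14-1|JSON|");
        System.out.println(header.traceId + " step " + header.stepId + " on " + header.thread);
    }
}
```

In the sample, the same trace id appears on both the `http-nio-80-exec-*` threads and the Kafka listener thread, which is what ties the producer and consumer sides into one trace.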

bitryon logger is both a tracer and a logger.

Steps to trace everywhere

  1. Write the next step log id to the metadata/header of the message
  2. Read the step log id from the metadata/header of the message
  3. Configure beans for the metadata/header interceptors
  4. Send and receive messages through the queue (Kafka in this case)
  5. Upload the logs to the web portal and access the log traces

Now, let me show you the code.

Write the next step log id to the metadata/header of the message

public class ProducerLoggerInterceptor implements ProducerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // CRITICAL: use getNextStepLogId() so the consumer side starts a new step of the same trace
        String nextStepLogId = logger.getNextStepLogId();
        record.headers().add(PreDefinition.HTTP_HEADER_STEP_LOG_ID, nextStepLogId.getBytes(PreDefinition.CharsetEncoding));
        return record;
    }
    // other code (ProducerInterceptor also requires onAcknowledgement, close and configure)
}

The difference between connecting a trace across a service call and across a message queue is getStepLogId versus getNextStepLogId: the former continues the trace inside the service invocation, while the latter starts a new step.

Read the step log id from the metadata/header of the message

public class ConsumerLoggerInterceptor implements ConsumerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger();

    // Kafka calls onConsume once per polled batch, before the records reach the listener
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            Header header = record.headers().lastHeader(PreDefinition.HTTP_HEADER_STEP_LOG_ID);
            if (header != null) {
                // continue the trace started by the producer
                String stepLogId = new String(header.value(), PreDefinition.CharsetEncoding);
                logger.setStepLogId(stepLogId);
            } else {
                logger.reset(); // no header: reset to decouple each consume
            }
        });

        return records;
    }
    // other code (ConsumerInterceptor also requires onCommit, close and configure)
}
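
One thing to keep in mind: Kafka calls `ConsumerInterceptor.onConsume` once per polled batch, so when a batch holds several records the loop leaves the logger holding whatever the last record set. How bitryon logger stores the step log id is not stated in this post. Assuming it is per-thread state, a per-record scope might look like the sketch below. `PerRecordStepScope`, `CURRENT_STEP` and `Message` are hypothetical stand-ins written in plain Java, not bitryon or Kafka APIs.

```java
import java.util.List;
import java.util.function.Consumer;

final class PerRecordStepScope {
    /** Hypothetical stand-in for the logger's per-thread step log id. */
    static final ThreadLocal<String> CURRENT_STEP = new ThreadLocal<>();

    /** Minimal message: a payload plus the step log id read from its header (null if absent). */
    static final class Message {
        final String stepLogId;
        final String payload;

        Message(String stepLogId, String payload) {
            this.stepLogId = stepLogId;
            this.payload = payload;
        }
    }

    /** Handles each message with its own step log id in scope and clears it afterwards. */
    static void dispatch(List<Message> batch, Consumer<Message> handler) {
        for (Message message : batch) {
            if (message.stepLogId != null) {
                CURRENT_STEP.set(message.stepLogId);
            } else {
                CURRENT_STEP.remove(); // no header: start from a clean state
            }
            try {
                handler.accept(message);
            } finally {
                CURRENT_STEP.remove(); // do not leak this record's id into the next one
            }
        }
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
                new Message("step-A", "m1"),
                new Message(null, "m2"),
                new Message("step-B", "m3"));
        dispatch(batch, m -> System.out.println(m.payload + " -> " + CURRENT_STEP.get()));
    }
}
```

In a Spring Kafka application, a per-record hook such as `RecordInterceptor` is one place this kind of logic could live. Whether that fits bitryon logger is untested here.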

Configure beans for the metadata/header interceptors

@Configuration
public class KafkaBeansConfiguration {
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory(KafkaProperties properties) {
        // start from the Spring Boot Kafka properties, then register the producer interceptor
        Map<String, Object> config = new HashMap<>(properties.buildProducerProperties());
        config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerLoggerInterceptor.class.getName());
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties) {
        // same idea on the consumer side: register the interceptor that reads the header
        Map<String, Object> config = new HashMap<>(properties.buildConsumerProperties());
        config.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerLoggerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

Send and receive messages through the queue (Kafka in this case)

@Service
public class KafkaProducer {

    @Resource
    MedicService medicService;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

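    public void sendMessage(String message) {
        // a traced service call before sending
        medicService.callSelfInvocation(message);
        // the producer interceptor adds the step log id header to this record
        kafkaTemplate.send("demo-topic", message);
    }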
}

@Service
public class KafkaConsumer {

    @Resource
    UserService userService;

    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    public void listen(String message) {
        userService.getBySessionId(message);
    }
}

Upload the logs to the web portal and access the log traces

Please check out the section "Upload logs through the agent or OpenTelemetry" in "how to automate log and trace without writing a log and trace".

Moreover

  • The approach is not specific to Kafka; it works for other message queue systems as well.
  • The part of the trace that runs through messaging becomes part of the full trace.
  • Keep an eye on bitryon-logging-examples for more use cases.
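
To make the first point concrete: the technique only needs a transport that carries per-message metadata. Below is a minimal, queue-agnostic sketch of the header round trip, with a plain Java list standing in for message headers. The header name `x-step-log-id` and the id value are hypothetical. The Kafka code above uses `PreDefinition.HTTP_HEADER_STEP_LOG_ID` and ids produced by bitryon logger instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StepLogIdHeaderSketch {
    // Hypothetical header name, used only for this illustration.
    static final String HEADER = "x-step-log-id";

    /** Sending side: append the id as UTF-8 bytes, as byte-valued headers require. */
    static void write(List<Map.Entry<String, byte[]>> headers, String stepLogId) {
        headers.add(Map.entry(HEADER, stepLogId.getBytes(StandardCharsets.UTF_8)));
    }

    /** Receiving side: return the most recently added value, or null if the header is absent. */
    static String read(List<Map.Entry<String, byte[]>> headers) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            Map.Entry<String, byte[]> header = headers.get(i);
            if (HEADER.equals(header.getKey())) {
                return new String(header.getValue(), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        write(headers, "example-step-id"); // hypothetical id value
        System.out.println(read(headers));
    }
}
```

Reading the last matching entry mirrors the `lastHeader` lookup in the consumer interceptor, so a re-sent message that gained a second header would resolve to the newest id.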

Conclusion

Now developers can see the TRUE trace everywhere: across microservices and message queues such as Kafka.

#java
#message-queue
#microservice
#startup
#observation
#logging
#log
#logger
#trace
#tracer
#tracing
#workflow
