Diogo Daniel Soares Ferreira

Posted on • Originally published at diogodanielsoaresferreira.github.io

Enterprise Integration Patterns With Apache Camel

Hi there! I want to tell you about a great open-source tool that is AWESOME and does not get the love it deserves: Apache Camel.

Apache Camel is an integration framework. What does that mean? Let’s suppose you are working on a project that consumes data from Kafka and RabbitMQ, reads and writes from and to various databases, transforms data, logs everything to files, and outputs the processed data to another Kafka topic. You also have to implement error handling for the service (retries, dead letter channel, etc.) so that everything runs flawlessly. It sounds hard.

Apache Camel helps you integrate with many components, such as databases, files, brokers, and much more, while keeping things simple and promoting enterprise integration patterns. Let’s look at some examples based on those patterns. You can find the code in this repository.

We will start by consuming events from a Kafka topic and outputting them to another one, taking advantage of the Event-Driven Consumer pattern. The events are representations of text messages sent by a user.

{
  "id": 1,
  "text": "Hello Peter!",
  "emitter": "John",
  "type": "chat",
  "devices": [
    "smartphone",
    "pc"
  ]
}
public class CamelRoutes extends EndpointRouteBuilder {
    public void configure() {
        from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
            .log("${body}")
            .to("kafka:output_topic?brokers=localhost:9092");
    }
}

That’s about it! We also added a log step so we can see the message body in the logs. The log argument is written in the Simple language, an Apache Camel language used to evaluate expressions.
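For reference, here are a few Simple expressions and what they evaluate to (the header name is just a hypothetical example):

// A few Simple expressions (illustrative; "myHeader" is a hypothetical header name)
.log("body: ${body}")               // the full message body
.log("header: ${header.myHeader}")  // a named header
.log("exchange: ${exchangeId}")     // the unique exchange id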

Now let’s implement a message filter. This pattern filters out the messages that do not match certain conditions. In our case, we will only process those that have the type “chat”.

public void configure() {
    from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
        .unmarshal().json(UserMessage.class)
        .filter(simple("${body.type} == 'chat'"))
        .log("${body}")
        .marshal().json()
        .to("kafka:output_topic?brokers=localhost:9092");
}

Easy, right? We unmarshal the message from JSON into the UserMessage POJO so we can filter by type, and marshal it back to JSON before sending it to the other Kafka topic. The POJO uses Lombok’s @Data to generate getters and setters, and @RegisterForReflection (from Quarkus) so it can still be accessed reflectively in native builds.

@Data
@RegisterForReflection
public class UserMessage {
    private int id;
    private String text;
    private String emitter;
    private String type;
    private List<String> devices;
}

Now suppose we want to store all messages in a file, and, for testing purposes, store the messages whose emitter is “John Doe” in a different file. For that, we can use the Content-Based Router pattern.

// ...
.to("kafka:output_topic?brokers=localhost:9092")
.unmarshal().json(UserMessage.class)
.choice()
    .when(simple("${body.emitter} == 'John Doe'"))
        .marshal().json().to("file://YOUR_PATH/john-doe-message?filename=events.json&fileExist=Append&appendChars=\\n")
    .otherwise()
        .marshal().json().to("file://YOUR_PATH/user-message?filename=events.json&fileExist=Append&appendChars=\\n")
.end();

If the file already exists, we append the events, adding a newline at the end of each one. For other emitters, we do the same but store them in another file. It does look like an ‘if’ construct, right?

We can see a list of “devices” in the event, and we want to log them one by one. How can we do that? Using the Splitter pattern, we can iterate through any list, either sequentially or in parallel. Let’s do it sequentially in this example.

// ...
.filter(simple("${body.type} == 'chat'"))
.log("${body}")
.split(simple("${body.devices}"))
    .log("${body}")
.end()
.marshal().json()
.to("kafka:output_topic?brokers=localhost:9092")
// ...

We can split on any field that is an Iterable. As you can see, we are again using the Simple language to access the content of the event.
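If sequential processing becomes a bottleneck, the splitter can also hand the parts to a thread pool. Here is a minimal sketch of the same split running in parallel, using Camel’s default thread pool:

// Same split, but processing the devices concurrently
.split(simple("${body.devices}"))
    .parallelProcessing()
    .log("${body}")
.end()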

Let’s try something harder. We are receiving messages with text from various emitters, but we want to aggregate multiple text messages and create a new message with all the messages for an emitter. To do that, we can use the Aggregator pattern. The Aggregator pattern buffers events while waiting for related ones; each time another event is received, a custom aggregation can be performed, based on our needs. A new event is sent when a completion condition is met: that condition can be based on the number of events received, a timeout, or any other custom condition.
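As a sketch, here is how other completion conditions look (the values are arbitrary examples):

// Alternative completion conditions (values are arbitrary)
.aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
    .completionSize(10)       // complete after 10 messages for the same emitter
    .completionTimeout(5000)  // or when no new message arrives for 5 seconds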

In our case, we will create a new POJO that aggregates the text messages from an emitter. With completionInterval, the aggregator flushes the combined events on a repeating schedule, every 5 seconds.

//...
from("kafka:input_topic?brokers=localhost:9092&groupId=group2")
    .unmarshal().json(UserMessage.class)
    .aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
        .completionInterval(5000)
    .log("${body}");
//...

We are using in-memory aggregation, but we could use other data stores, such as Postgres or Redis. We use a Simple expression on the emitter as the correlation key, and we created a custom aggregation strategy, shown below.

In the custom aggregation strategy, for the first event (oldExchange==null), we create a new CombinedUserMessage with the text of the message. For all other events, we add the text of the message to the combined event.

public class CombinedUserMessagesAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First event for this emitter: start a new combined message
        if (oldExchange == null) {
            final UserMessage body = newExchange.getIn().getBody(UserMessage.class);
            final CombinedUserMessage newEventBody = CombinedUserMessage.builder()
                .emitter(body.getEmitter())
                .text(List.of(body.getText()))
                .build();
            newExchange.getIn().setBody(newEventBody);
            return newExchange;
        }
        // Subsequent events: append the new text to the combined message
        final UserMessage newUserMessage = newExchange.getIn().getBody(UserMessage.class);
        final CombinedUserMessage oldCombinedUserMessage = oldExchange.getIn().getBody(CombinedUserMessage.class);
        final List<String> newTexts = new ArrayList<>(oldCombinedUserMessage.getText());
        newTexts.add(newUserMessage.getText());
        final CombinedUserMessage newCombinedUserMessage = CombinedUserMessage.builder()
            .emitter(newUserMessage.getEmitter())
            .text(newTexts)
            .build();
        oldExchange.getIn().setBody(newCombinedUserMessage);
        return oldExchange;
    }
}
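CombinedUserMessage itself is not shown in the post; a minimal version consistent with the builder calls above (assuming the same Lombok style as UserMessage) would be:

// Assumed shape of CombinedUserMessage, inferred from the builder calls above
@Data
@Builder
@RegisterForReflection
public class CombinedUserMessage {
    private String emitter;
    private List<String> text;
}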

This is all great, but how do we apply transformations to a field? We now have a combined event; it would be great if we could somehow process it into plain text by joining the individual text messages. We can do that using the Message Translator pattern.

from("kafka:input_topic?brokers=localhost:9092&groupId=group2")
.unmarshal().json(UserMessage.class)
.aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
.completionInterval(5000)
.bean(NLPUtils.class, "createUserMessages")
.log("${body}");
public class NLPUtils {
public static UserMessages createUserMessages(final CombinedUserMessage event) {
return UserMessages.builder()
.emitter(event.getEmitter())
.text(String.join(". ", event.getText()))
.build();
}
}

We can call bean methods directly from a Camel route and perform all the transformations we need using plain Java code. Neat!
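UserMessages is not shown either; assuming the same Lombok style, it could look like this:

// Assumed shape of UserMessages, inferred from the builder calls above
@Data
@Builder
@RegisterForReflection
public class UserMessages {
    private String emitter;
    private String text; // the joined, plain-text messages
}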

We can see that our Camel routes are getting bigger. What do we do if we want, for example, to split them across files? Two in-memory components allow us to do that: Direct and SEDA.

Direct is a synchronous endpoint that works like a method call from one route to another. Let’s use it to split out the route that stores the event in a file.

//...
from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
    .unmarshal().json(UserMessage.class)
    .filter(simple("${body.type} == 'chat'"))
    .log("${body}")
    .split(simple("${body.devices}"))
        .log("${body}")
    .end()
    .marshal().json()
    .to("kafka:output_topic?brokers=localhost:9092")
    .to("direct:store_in_file");

from("direct:store_in_file")
    .unmarshal().json(UserMessage.class)
    .choice()
        .when(simple("${body.emitter} == 'John Doe'"))
            .marshal().json().to("file:///home/diogoferreira/Desktop/Apache Camel Demo/getting-started/john-doe-message?filename=events.json&fileExist=Append&appendChars=\\n")
        .otherwise()
            .marshal().json().to("file:///home/diogoferreira/Desktop/Apache Camel Demo/getting-started/user-message?filename=events.json&fileExist=Append&appendChars=\\n")
    .end();
//...

Great! There is another in-memory component that will be useful for us: SEDA. SEDA works like Direct but is asynchronous, which means it puts the message in a queue for another thread to process. Let’s use SEDA to decouple receiving the message from Kafka from the routes that consume it.

public class CamelRoutes extends EndpointRouteBuilder {
    public void configure() {
        from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
            .unmarshal().json(UserMessage.class)
            .filter(simple("${body.type} == 'chat'"))
            .to("seda:incoming_event");

        from("seda:incoming_event?multipleConsumers=true")
            .log("${body}")
            .split(simple("${body.devices}"))
                .log("${body}")
            .end()
            .marshal().json()
            .to("kafka:output_topic?brokers=localhost:9092")
            .to("direct:store_in_file");

        from("direct:store_in_file")
            .unmarshal().json(UserMessage.class)
            .choice()
                .when(simple("${body.emitter} == 'John Doe'"))
                    .marshal().json().to("file:///home/diogoferreira/Desktop/Apache Camel Demo/getting-started/john-doe-message?filename=events.json&fileExist=Append&appendChars=\\n")
                .otherwise()
                    .marshal().json().to("file:///home/diogoferreira/Desktop/Apache Camel Demo/getting-started/user-message?filename=events.json&fileExist=Append&appendChars=\\n")
            .end();

        from("seda:incoming_event?multipleConsumers=true")
            .aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
                .completionInterval(5000)
            .bean(NLPUtils.class, "createUserMessages")
            .log("${body}");
    }
}

Now our routes are much simpler. Suppose we need to perform a periodic task, such as a cleanup. We can take advantage of the Timer endpoint. Let’s exemplify it by creating a route that runs every 5 seconds.

//...
from("timer://foo?period=5000").log("timer");
//...
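The timer component supports more options than period; for example (the endpoint name and values here are illustrative):

// Fire after an initial 1 s delay, every 5 s, at most 10 times
from("timer://cleanup?delay=1000&period=5000&repeatCount=10")
    .log("running periodic cleanup");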

Now that our application is almost ready for production, we have to improve its fault tolerance. What happens if, for some reason, a message fails while in a route? Let’s implement the Dead Letter Channel pattern: when there is an error in a route, the message is sent to another Kafka topic so that it can be reprocessed later.

//...
errorHandler(deadLetterChannel("kafka:dead_letter?brokers=localhost:9092")
.useOriginalMessage());
//...

And that’s it! The error handler configuration applies to all routes in the class. We send the original message (the one first received by the route) to the topic. We could also configure retry policies, with timeouts and other common fault-tolerance settings, but since we don’t need them here, we will leave it as is.
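For reference, a retry policy on the same dead letter channel could look like this (the values are arbitrary examples):

// Retry up to 3 times, waiting 1 s between attempts, before dead-lettering
errorHandler(deadLetterChannel("kafka:dead_letter?brokers=localhost:9092")
    .maximumRedeliveries(3)
    .redeliveryDelay(1000)
    .useOriginalMessage());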

Now that we are reaching the end of this article, I also wanted to show you something: it is possible to configure REST endpoints as Camel routes.

//...
rest("/api").get("/hello").to("direct:hello");
from("direct:hello").transform().constant("Hello world!");
//...

As simple as that! We just configured a GET endpoint at /api/hello that answers with “Hello world!”.
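Depending on your runtime, you may also need to tell Camel which HTTP component and port the REST DSL should use; a minimal sketch (the component name and port are assumptions that depend on your setup):

// Example REST configuration; adjust the component and port to your runtime
restConfiguration().component("platform-http").port(8080);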

As you can see, Apache Camel is a framework that simplifies the integration with other components, supporting the enterprise integration patterns and making it easier to create data pipelines.

I hope you have liked it! Thank you!
