What is Streaming and Stream processing?
Event Stream Processing (ESP) is the practice of taking action on a series of data points/events (stream) that originate from a system that continuously creates data.
Stream processing can typically be represented using a DAG:
This is a very high-level abstraction. Can we model it using something we already have in Java? Yes: Java 8 Functions!
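To make the idea concrete, here is a minimal plain-Java sketch (no Spring involved) of a three-node graph: a source, a processor, and a sink, modelled with `Supplier`, `Function`, and `Consumer`. The class and method names (`StreamDagSketch`, `runOnce`, `demo`) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical class: models a tiny DAG of source -> processor -> sink.
final class StreamDagSketch {

    // Pulls one event from the source, transforms it, and hands the result to the sink.
    static <I, O> void runOnce(Supplier<I> source, Function<I, O> processor, Consumer<O> sink) {
        sink.accept(processor.apply(source.get()));
    }

    // Runs the pipeline for a single event and returns what the sink collected.
    static List<String> demo(String event) {
        List<String> collected = new ArrayList<>();
        Supplier<String> source = () -> event;                    // source node: output only
        Function<String, String> processor = String::toUpperCase; // processor node: input and output
        Consumer<String> sink = collected::add;                   // sink node: input only
        runOnce(source, processor, sink);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(demo("ishan")); // prints [ISHAN]
    }
}
```

Each node type maps onto one functional interface, which is the same mapping the rest of this article relies on.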
Spring Cloud Function
Spring Cloud Function is a framework that promotes implementing business logic as plain Java functions, and then instruments them to execute in the context of different runtimes (HTTP, messaging, etc.).
Example — Let’s create a function that turns a string into uppercase:
public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}
If you are using Spring Initializr and add only the Spring Cloud Function dependency, it adds the spring-cloud-function-context dependency. If your project also has the Spring Web dependency, it adds the spring-cloud-function-web dependency instead, which means the function above can be exposed as a REST endpoint (i.e. the function is automatically bound to an HTTP context). Let’s see it in action!
// Returns a Function (a processor node: it has both an input and an output)
@Bean
public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}
Start your application and invoke this function at:
localhost:<port>/toUpper/ishan (Spring Boot listens on port 8080 unless configured otherwise)
Output:
With Spring Cloud Function, we were successfully able to bind this function to an HTTP context!
Spring Cloud Stream
Spring Cloud Stream is a framework that instruments your Spring Cloud Functions as event handlers bound to destinations, using binders specific to a particular messaging system. Also see: why do you need a messaging system?
For example, you want your Spring Cloud Function to become a message handler for a RabbitMQ destination (queue).
Setup
Add the spring-cloud-stream-binder-rabbit dependency.
Bring up RabbitMQ using docker-compose
---
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.11-management-alpine
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /
Connect RabbitMQ to your spring application (application.properties/yaml)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
When you bring the application up, you’ll see two new exchanges!
If you drop a message in toUpper-in-0 exchange, your toUpper function will get invoked and the output will be pushed to toUpper-out-0 exchange! Note — The REST access to this function will still work if you also have the spring-cloud-function-web dependency!
Note — A Function is a processor. That is why it created 2 exchanges! It listens to toUpper-in-0 and sends the output to toUpper-out-0
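The binding names follow a simple convention: the bean name, then `-in-` or `-out-`, then an index. The sketch below only derives those names for a single-input, single-output bean; it is not part of Spring Cloud Stream, and `BindingNameSketch`, `Kind`, and `bindingsFor` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives default binding names from a bean's kind and name.
final class BindingNameSketch {

    enum Kind { SUPPLIER, FUNCTION, CONSUMER }

    // A Supplier has only an output, a Consumer only an input, a Function has both.
    static List<String> bindingsFor(Kind kind, String beanName) {
        List<String> names = new ArrayList<>();
        if (kind != Kind.SUPPLIER) {
            names.add(beanName + "-in-0");
        }
        if (kind != Kind.CONSUMER) {
            names.add(beanName + "-out-0");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(bindingsFor(Kind.FUNCTION, "toUpper"));      // prints [toUpper-in-0, toUpper-out-0]
        System.out.println(bindingsFor(Kind.CONSUMER, "consumeUpper")); // prints [consumeUpper-in-0]
        System.out.println(bindingsFor(Kind.SUPPLIER, "createData"));   // prints [createData-out-0]
    }
}
```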
Let’s create a consumer for toUpper-out-0 exchange:
@Bean
public Consumer<String> consumeUpper() {
    return (upper) -> {
        System.out.println("Consumed: " + upper);
    };
}
# Spring Cloud Functions defined in your application
# Define multiple functions definitions by using ;
spring.cloud.function.definition=toUpper;consumeUpper
# Based on the Function definitions, Spring Cloud Stream will create bindings
# which connects the Function's input and/or output to destinations!
# The toUpper-in-0 (the toUpper function listens to this)
# and toUpper-out-0 (the toUpper function produces results to this) were
# automatically created
# If you do not do any configuration for the consumeUpper function,
# it will automatically create a consumeUpper-in-0 exchange and listen
# to that. But we want this function to listen to toUpper-out-0
# What we are saying below is ->
# Bind consumeUpper-in-0 binding to listen to the toUpper-out-0 destination
# Since this is a Consumer function, the binding that is created is consumeUpper-in-0
spring.cloud.stream.bindings.consumeUpper-in-0.destination=toUpper-out-0
spring.cloud.stream.bindings.consumeUpper-in-0.group=school-service
The concept of groups is similar to consumer groups in Kafka. But more on that in another article.
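To build intuition for what this configuration wires together, the flow can be approximated in plain Java with an in-memory map of destinations to handlers. This is only a sketch under simplifying assumptions: there is no broker, no groups, and no serialization, and `InMemoryBindingSketch` and its methods are hypothetical names, not Spring Cloud Stream APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a broker: destinations are just map keys.
final class InMemoryBindingSketch {

    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    final List<String> log = new ArrayList<>(); // collects what the handlers would print

    // Registers a handler that listens to a destination.
    void subscribe(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    // Delivers a payload to every handler subscribed to the destination.
    void send(String destination, String payload) {
        for (Consumer<String> handler : subscribers.getOrDefault(destination, List.of())) {
            handler.accept(payload);
        }
    }

    // A processor listens on one destination and publishes its result to another.
    void bindFunction(String in, Function<String, String> fn, String out) {
        subscribe(in, payload -> send(out, fn.apply(payload)));
    }

    // Wires toUpper (in-0 -> out-0) and points consumeUpper at toUpper-out-0.
    static InMemoryBindingSketch wire() {
        InMemoryBindingSketch broker = new InMemoryBindingSketch();
        broker.bindFunction("toUpper-in-0", str -> {
            broker.log.add("Original: " + str);
            return str.toUpperCase();
        }, "toUpper-out-0");
        broker.subscribe("toUpper-out-0", upper -> broker.log.add("Consumed: " + upper));
        return broker;
    }

    public static void main(String[] args) {
        InMemoryBindingSketch broker = wire();
        broker.send("toUpper-in-0", "Ishan");
        broker.log.forEach(System.out::println); // prints "Original: Ishan" then "Consumed: ISHAN"
    }
}
```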
Let’s drop the message “Ishan” into the exchange “toUpper-in-0” using the RabbitMQ admin console. You’ll see the following in your application logs:
Producer
You can directly produce data to a destination using the StreamBridge class:
@Autowired
private StreamBridge streamBridge;
...
streamBridge.send(Destination, data);
Or you can use a Supplier.
A Supplier is a little different from a Function or a Consumer. Functions and Consumers are triggered whenever there is an input, but a Supplier has no trigger. Spring Cloud Stream therefore provides an automated polling mechanism that invokes the Supplier every second by default (which you can override).
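import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
...
@Bean
public Supplier<String> createData() {
    // Invoked by the poller; each call produces one new message payload
    return () -> {
        System.out.println("Creating some data");
        return "Ishan-" +
                ThreadLocalRandom.current().nextInt(0, 10000) +
                "-" +
                LocalDateTime.now();
    };
}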
spring.cloud.function.definition=toUpper;consumeUpper;createData
...
# Modify the poll interval of the supplier
spring.cloud.stream.poller.fixed-delay=10000
# The binding destination of the supplier function.
# If we had provided no configuration, spring cloud stream would have
# automatically created a createData-out-0 exchange and the data this
# supplier creates will be pushed to that exchange. But we want this
# supplier to push data to the toUpper-in-0 exchange
spring.cloud.stream.bindings.createData-out-0.destination=toUpper-in-0
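The polling behaviour can be pictured with a small plain-Java sketch: a scheduler calls the Supplier at a fixed delay and collects whatever it returns. This is an approximation for intuition only, not how the framework is implemented; `SupplierPollerSketch` and `poll` are hypothetical names, and the sketch stops after a fixed number of values so that it terminates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical poller: invokes a Supplier with a fixed delay between invocations.
final class SupplierPollerSketch {

    // Polls the supplier every delayMillis until `count` values have been emitted.
    static <T> List<T> poll(Supplier<T> supplier, long delayMillis, int count) {
        List<T> emitted = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {          // ignore any run scheduled after we are done
                emitted.add(supplier.get());
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();                        // block until enough values were produced
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // stop early but keep the interrupt flag
        } finally {
            scheduler.shutdownNow();
        }
        return emitted;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // 100 ms delay here, chosen only to keep the demo short
        System.out.println(poll(() -> "Ishan-" + counter.incrementAndGet(), 100, 3));
        // prints [Ishan-1, Ishan-2, Ishan-3]
    }
}
```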
Restart your application again:
Sending and Consuming data based on specific binding and routing keys
If you are unfamiliar with RabbitMQ routing and binding keys, see this article first.
Example — You are using StreamBridge to send messages to a students exchange. How do you set the routing key of the message?
spring.cloud.stream.rabbit.bindings.students.producer.routingKeyExpression=headers['type']
This tells the binder to use the value of the type header as the routing key.
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
...
Message<StudentCreated> message = MessageBuilder.withPayload(studentCreated)
        .setHeader("type", "student.created")
        .build();
streamBridge.send("students", message);
Now, how do you bind the consumer below, and the queue it listens to, to a specific binding key on an exchange? (That is, this consumer only wants to receive StudentCreated events from the students exchange.)
@Bean
public Consumer<StudentCreated> studentCreated() {
...
}
spring.cloud.function.definition=studentCreated
spring.cloud.stream.bindings.studentCreated-in-0.destination=students
spring.cloud.stream.bindings.studentCreated-in-0.group=school-service
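# Important: the binding key(s) for this consumer's queue.
# With a delimiter set, bindingRoutingKey may hold several keys in one value.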
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKey=student.created
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKeyDelimiter=,
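How the two sides fit together can be sketched in plain Java: the producer derives the routing key from the type header, and the consumer's configured value is split on the delimiter into individual binding keys. This is a simplified illustration, not the binder's implementation: it matches keys exactly and deliberately ignores the topic wildcards (* and #), and `RoutingKeySketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical sketch of routing-key selection and binding-key matching.
final class RoutingKeySketch {

    // Mirrors routingKeyExpression=headers['type']: read the key from the "type" header.
    static String routingKey(Map<String, ?> headers) {
        return Objects.toString(headers.get("type"), "");
    }

    // Mirrors bindingRoutingKey plus bindingRoutingKeyDelimiter: one value, several keys.
    static List<String> bindingKeys(String bindingRoutingKey, String delimiter) {
        return Arrays.asList(bindingRoutingKey.split(Pattern.quote(delimiter)));
    }

    // Exact match only; wildcard binding keys are not handled in this sketch.
    static boolean delivered(String routingKey, List<String> bindingKeys) {
        return bindingKeys.contains(routingKey);
    }

    public static void main(String[] args) {
        // "student.updated" is a made-up second key, used to show the delimiter at work
        List<String> keys = bindingKeys("student.created,student.updated", ",");
        String key = routingKey(Map.of("type", "student.created"));
        System.out.println(delivered(key, keys));               // prints true
        System.out.println(delivered("student.deleted", keys)); // prints false
    }
}
```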