DEV Community

Ishan Soni


Introduction to Spring Cloud Function and Spring Cloud Stream

What is Streaming and Stream processing?

Event Stream Processing (ESP) is the practice of taking action on a series of data points/events (stream) that originate from a system that continuously creates data.

Stream processing can typically be represented using a DAG:

[Image: Stream Processing DAG]

This is a very high-level abstraction. Can we model it using something we already have in Java? Yes: the functional interfaces introduced in Java 8 (Supplier, Function and Consumer)!

[Image: Stream Processing DAG mapped to the Java functional toolkit]
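As a rough illustration of that mapping, the three node types of the DAG line up with `Supplier` (source), `Function` (processor) and `Consumer` (sink). The sketch below wires one such path by hand in plain Java. The class and method names (`StreamDagSketch`, `runOnce`) are made up for this example, and no Spring is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: a single source -> processor -> sink path of a DAG.
class StreamDagSketch {

    // Pulls one event from the source, transforms it, and hands the result to the sink.
    static <I, O> void runOnce(Supplier<I> source, Function<I, O> processor, Consumer<O> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        // Source emits a padded string, processor trims it, sink collects it.
        runOnce(() -> "  event-1  ", String::trim, received::add);
        System.out.println(received); // prints [event-1]
    }
}
```

In a real stream the source keeps producing events and the framework does the wiring; here a single event is pushed through manually to keep the shape visible.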

Spring Cloud Function

Spring Cloud Function is a framework that promotes implementing business logic as plain Java functions, and then instruments those functions to execute in the context of different runtimes (HTTP, messaging, etc.).

Example — Let’s create a function that turns a string into uppercase:

public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}
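Nothing in this function depends on Spring yet, so it can be called directly like any other `java.util.function.Function`. A minimal sketch (the wrapper class `ToUpperSketch` exists only to make the snippet self-contained):

```java
import java.util.function.Function;

// Hypothetical wrapper class around the article's toUpper function.
class ToUpperSketch {

    static Function<String, String> toUpper() {
        return str -> {
            System.out.println("Original: " + str);
            return str.toUpperCase();
        };
    }

    public static void main(String[] args) {
        // Invoke the function by hand; no framework is involved here.
        String result = toUpper().apply("ishan");
        System.out.println(result); // prints ISHAN (after the "Original: ishan" line)
    }
}
```

This is also what makes such functions easy to unit test: the business logic stays independent of how it is eventually exposed.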

If you are using Spring Initializr and add only the Spring Cloud Function dependency, it adds the spring-cloud-function-context dependency. If your project also has the Spring Web dependency, it adds the spring-cloud-function-web dependency instead, which lets us expose the function above as a REST endpoint (i.e. the function is automatically bound to an HTTP context). For that, the function has to be registered as a Spring bean. Let’s see it in action!

// Returns a Function (a processor node: it has both an input and an output)
@Bean
public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}

Start your application and invoke this function at:

localhost:<port>/toUpper/ishan (Spring Boot listens on port 8080 unless server.port is set)

Output:

[Image: Spring Cloud Function output]

With Spring Cloud Function, we were successfully able to bind this function to an HTTP context!

Spring Cloud Stream

Spring Cloud Stream is a framework that instruments your Spring Cloud Functions as event handlers bound to destinations, using binders specific to a particular messaging system. Also see: why do you need a messaging system?

For example, you may want your Spring Cloud Function to become a message handler for a RabbitMQ destination (queue).

Setup

Add the spring-cloud-stream-binder-rabbit dependency.

[Image: spring-cloud-stream-binder-rabbit dependency tree]

Bring up RabbitMQ using docker-compose

---
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.11-management-alpine
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

[Image: RabbitMQ before]

Connect RabbitMQ to your spring application (application.properties/yaml)

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

When you bring the application up, you’ll see two new exchanges!

[Image: RabbitMQ after]

If you drop a message into the toUpper-in-0 exchange, your toUpper function is invoked and its output is pushed to the toUpper-out-0 exchange. Note: REST access to this function still works if you also have the spring-cloud-function-web dependency.

Note: a Function is a processor, which is why two exchanges were created. It listens to toUpper-in-0 and sends its output to toUpper-out-0.
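These exchange names follow Spring Cloud Stream's naming convention for functional bindings: `<functionName>-in-<index>` for an input and `<functionName>-out-<index>` for an output. The helper below is purely illustrative (`BindingNamesSketch` and `defaultBindings` are not part of any Spring API, and only the single-input, single-output case with index 0 is covered). It just spells out which default binding names each kind of functional bean ends up with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the default binding naming convention.
class BindingNamesSketch {

    enum Kind { SUPPLIER, FUNCTION, CONSUMER }

    static List<String> defaultBindings(String functionName, Kind kind) {
        List<String> names = new ArrayList<>();
        // Anything that takes input (Function, Consumer) gets an "-in-0" binding.
        if (kind != Kind.SUPPLIER) {
            names.add(functionName + "-in-0");
        }
        // Anything that produces output (Function, Supplier) gets an "-out-0" binding.
        if (kind != Kind.CONSUMER) {
            names.add(functionName + "-out-0");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(defaultBindings("toUpper", Kind.FUNCTION)); // prints [toUpper-in-0, toUpper-out-0]
    }
}
```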

Let’s create a consumer for toUpper-out-0 exchange:

@Bean
public Consumer<String> consumeUpper() {
    return (upper) -> {
        System.out.println("Consumed: " + upper);
    };
}

And the corresponding configuration in application.properties:

# Spring Cloud Functions defined in your application
# Separate multiple function definitions with ;
spring.cloud.function.definition=toUpper;consumeUpper

# Based on the Function definitions, Spring Cloud Stream will create bindings
# which connects the Function's input and/or output to destinations! 
# The toUpper-in-0 (the toUpper function listens to this)
# and toUpper-out-0 (the toUpper function produces results to this) were 
# automatically created

# If you do not do any configuration for the consumeUpper function,
# it will automatically create a consumeUpper-in-0 exchange and listen 
# to that. But we want this function to listen to toUpper-out-0
# What we are saying below is ->
# Bind consumeUpper-in-0 binding to listen to the toUpper-out-0 destination
# Since this is a Consumer function, the binding that is created is consumeUpper-in-0
spring.cloud.stream.bindings.consumeUpper-in-0.destination=toUpper-out-0
spring.cloud.stream.bindings.consumeUpper-in-0.group=school-service

The concept of groups is similar to consumer groups in Kafka. But more on that in another article.

Let’s drop the message “Ishan” into the exchange “toUpper-in-0” using the RabbitMQ admin console. You’ll see the following in your application logs:

[Image: log output from the toUpper function]

[Image: log output from the consumeUpper function]

Producer

You can directly produce data to a destination using the StreamBridge class:

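import org.springframework.cloud.stream.function.StreamBridge;

...

@Autowired
private StreamBridge streamBridge;

...

// destination: name of the binding/destination to send to, e.g. "toUpper-in-0"
streamBridge.send(destination, data);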

Alternatively, you can use a Supplier.

A Supplier is a little different from a Function or a Consumer. Functions and Consumers are triggered whenever there is input, but a Supplier has no such trigger. Spring Cloud Stream therefore provides a polling mechanism that invokes the supplier every second by default (you can override the interval).

@Bean
public Supplier<String> createData() {
    return () -> {
        System.out.println("Creating some data");
        return "Ishan-" +
                ThreadLocalRandom.current().nextInt(0, 10000) +
                "-" +
                LocalDateTime.now();
    };
}

And in application.properties:

spring.cloud.function.definition=toUpper;consumeUpper;createData

...

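# Modify the poll interval of the supplier (in milliseconds; here, every 10 seconds)
# Note: newer Spring Cloud Stream versions deprecate this property in favor of
# spring.integration.poller.fixed-delay
spring.cloud.stream.poller.fixed-delay=10000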

# The binding destination of the supplier function.
# If we had provided no configuration, spring cloud stream would have
# automatically created a createData-out-0 exchange and the data this
# supplier creates will be pushed to that exchange. But we want this
# supplier to push data to the toUpper-in-0 exchange
spring.cloud.stream.bindings.createData-out-0.destination=toUpper-in-0
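To picture what "polling a supplier with a fixed delay" means, here is a plain-Java sketch using a `ScheduledExecutorService`. It is not how Spring Cloud Stream implements its poller; `SupplierPollerSketch`, `poll` and the `polls` limit are invented for this example so that it terminates on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a fixed-delay poller around a Supplier.
class SupplierPollerSketch {

    // Invokes the supplier `polls` times, waiting fixedDelayMillis between invocations,
    // and forwards every produced value to the sink (standing in for the output binding).
    static <T> void poll(Supplier<T> supplier, Consumer<T> sink, long fixedDelayMillis, int polls) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(polls);
        scheduler.scheduleWithFixedDelay(() -> {
            if (remaining.getCount() > 0) {
                sink.accept(supplier.get());
                remaining.countDown();
            }
        }, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await(); // block until the requested number of polls has happened
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // Produce three values, 100 ms apart, and print each one.
        poll(() -> "Ishan-" + counter.incrementAndGet(), System.out::println, 100, 3);
    }
}
```

The key point is that the supplier itself stays passive: something external decides when to call `get()`, and the poll interval is a property of that caller, not of the supplier.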

Restart your application again:

[Image: application log output]

Sending and Consuming data based on specific binding and routing keys

If you are unfamiliar with RabbitMQ routing and binding keys, see this article first.

Example — You are using StreamBridge to send messages to a students exchange. How do you set the routing key of the message?

spring.cloud.stream.rabbit.bindings.students.producer.routingKeyExpression=headers['type']

This tells the binder to use the value of the message's type header as the routing key:

import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

...

Message<StudentCreated> message = MessageBuilder.withPayload(studentCreated)
        .setHeader("type", "student.created").build();
streamBridge.send("students", message);

Now, how do you bind the consumer below, and the queue it listens to, to a specific binding key on an exchange? (i.e. this consumer only wants StudentCreated events from the students exchange)

@Bean
public Consumer<StudentCreated> studentCreated() {
  ...
}

And in application.properties:

spring.cloud.function.definition=studentCreated
spring.cloud.stream.bindings.studentCreated-in-0.destination=students
spring.cloud.stream.bindings.studentCreated-in-0.group=school-service
# Important: the routing key(s) used to bind this consumer's queue to the exchange
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKey=student.created
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKeyDelimiter=,
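To make the interplay of the two settings concrete, the sketch below imitates both sides in plain Java: the producer derives the routing key from the type header, and the broker decides whether a queue bound with the configured key(s) would receive the message. It assumes the exchange is a topic exchange (which, as far as I know, is the Rabbit binder's default), where `*` matches exactly one word and `#` matches zero or more words. `RoutingSketch` and its methods are hypothetical and are not part of Spring or the RabbitMQ client.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of header-based routing keys and topic-style binding keys.
class RoutingSketch {

    // Producer side: mimics routingKeyExpression=headers['type'].
    static String routingKey(Map<String, String> headers) {
        return headers.get("type");
    }

    // Consumer side: true if any of the delimiter-separated binding keys matches the routing key.
    static boolean isDelivered(String bindingRoutingKey, String delimiter, String routingKey) {
        String[] words = routingKey.split("\\.");
        return Arrays.stream(bindingRoutingKey.split(Pattern.quote(delimiter)))
                .anyMatch(bindingKey -> topicMatch(bindingKey.split("\\."), 0, words, 0));
    }

    // Topic-exchange matching: '*' is exactly one word, '#' is zero or more words.
    static boolean topicMatch(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // Try letting '#' swallow 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (topicMatch(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        return (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && topicMatch(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String key = routingKey(Collections.singletonMap("type", "student.created"));
        System.out.println(isDelivered("student.created", ",", key)); // prints true
        System.out.println(isDelivered("student.deleted", ",", key)); // prints false
    }
}
```

With the delimiter set to `,`, a value such as `student.created,student.updated` would bind the same queue with two keys, which is what the any-match over the split keys stands for here.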
