Introduction
RSocket translates to Reactive socket is a messaging protocol that works over TCP or Websockets. Communication modes this protocol provides are fire-and-forget, request-response and streaming. Since RSocket is fully reactive, it's ideal for high-throughput applications.
In this post, we will explore three communications modes which are fire-and-forget
, request-response
and streaming
and test with RSocket Client CLI (RSC)
a postman type application but to test application over the socket.
Setup
Install Java preferably >8.
Spring boot skeleton project
Navigate to start.spring.io
, select RSocket
as a dependency and a stable version of spring boot version at the point of time and click on Generate
should give a zip file with skeleton project which good to get going.
Initial project structure looks as
- Application configuration
Next, setting up the port on which this application runs by modifying the file application.properties
spring.rsocket.server.port=7000
spring.main.lazy-initialization=true
- Client to test the server application:
Here I am using RSC client created by Toshiaki Maki. Set-up instructions are on his GitHub page.
Data model:
We create a new POJO class representing the data being exchanged by client and server. This class will consist of two member variables for now which are message
created
package com.example.rsocket;
import java.time.Instant;
public class Message {
private String message;
private long created = Instant.now().getEpochSecond();
public Message(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public long getCreated() {
return created;
}
public void setCreated(long created) {
this.created = created;
}
}
This should get the base project set-up and ready for writing code and expose via Socket.
Communication mode 1: Request-Response
Development
Request-Response is a classic way of communication where each request made by the client will get a response back. This is achieved by including below snippet (full code is available in the Github repository: https://github.com/rockey5520/rsocket )
@MessageMapping("request-response")
Mono<Message> requestResponse(final Message message) {
System.out.println("Received request-response message: {}"+message);
return Mono.just(new Message("You said: " + message.getMessage()));
}
The key part of the above code is @MessageMapping("request-response")
where we are letting spring boot know that this method requestResponse
should be invoked when client makes a call using request-response
communication mode. Here we are using the POJO class Message
as the payload for both receiving from client and responding to client.
Since this is a request and response model where server sends response to requests made by client Mono
of Spring Reactor reactor.core
is perfect where there is only need to send response once for each request.
Testing
using the RSC client installed if you pass below instruction
rsc --debug --request --data "{\"message\":\"Hello\"}" --route request-response --stacktrace tcp://localhost:7000
should it run successfully you would see an output as below
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 49
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 11 10 72 65 71 75 65 73 74 2d 72 65 73 |.....request-res|
|00000010| 70 6f 6e 73 65 |ponse |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d |o"} |
+--------+-------------------------------------------------+----------------+
2020-10-11 10:15:29.750 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 56
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 59 6f 75 20 |{"message":"You |
|00000010| 73 61 69 64 3a 20 48 65 6c 6c 6f 22 2c 22 63 72 |said: Hello","cr|
|00000020| 65 61 74 65 64 22 3a 31 36 30 32 34 31 31 33 32 |eated":160241132|
|00000030| 39 7d |9} |
+--------+-------------------------------------------------+----------------+
{"message":"You said: Hello","created":1602411329}
This response received is constructed in 3 message frames
Frame 1
First frame is labelled as metadata shows the routing metadata ( request-response) sent to the server
Frame 2
Second frame shows the data we sent to the server ( here in this example it is "Hello") a JSON string
Frame 3
Third frame shows the server response sent back to the client.
Communication mode 2: Fire-and-Forget
Development
Fire and forget is another way of communication where request made by client but wont get a response back from server. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )
@MessageMapping("fire-and-forget")
public Mono<Void> fireAndForget(final Message message) {
System.out.println("Received fire-and-forget request: {}"+ message);
return Mono.empty();
}
The key part of the above code is @MessageMapping("fire-and-forget")
where we are letting spring boot know that this method fireAndForget
should be invoked when client makes a call using fire-and-forget communication mode. Here we are using the POJO class
Message` as the payload for both receiving from client.
Testing
using the RSC client installed if you pass below instruction
bash
rsc --debug --fnf --data "{\"message\":\"Hello\"}" --route fire-and-forget --stacktrace tcp://localhost:7000
should it run successfully you would see an output as below
bash
2020-10-11 10:31:28.398 DEBUG --- [ parallel-2] i.r.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_FNF Flags: 0b100000000 Length: 48
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 10 0f 66 69 72 65 2d 61 6e 64 2d 66 6f |.....fire-and-fo|
|00000010| 72 67 65 74 |rget |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d |o"} |
+--------+-------------------------------------------------+----------------+
This response received is constructed in 2 message frames
Frame 1
First frame is labelled as metadata shows the routing metadata ( request-response) sent to the server
Frame 2
Second frame shows the data we sent to the server ( here in this example it is "Hello") a JSON string
Communication mode 3: Request-Stream
Development
This communication mode is for the communication where a client makes a single request and server responds with stream of responses. This is achieved by including below snippet (full code is available in the github repository : https://github.com/rockey5520/rsocket )
java
@MessageMapping("request-stream")
Flux<Message> stream(final Message message) {
return Flux
// create a new indexed Flux emitting one element every second
.interval(Duration.ofSeconds(1))
// create a Flux of new Messages using the indexed Flux
.map(index -> new Message("You said: " + message.getMessage() + ". Response #" + index))
// show what's happening
.log();
}
The key part of the above code is @MessageMapping("request-stream")
where we are letting spring boot know that this method stream
should be invoked when client makes a call using request-stream communication mode. Here we are using the POJO class
Message as the payload for both receiving from client and send a stream of responses of type Message every 1 second which is achieved by using Flux
.
Testing
using the RSC client installed if you pass below instruction
bash
rsc --debug --stream --data "{\"message\":\"Hello\"}" --route request-stream --stacktrace tcp://localhost:7000
should it run successfully you would see an output as below
`bash
2020-10-11 10:42:40.638 DEBUG --- [ parallel-2] i.r.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 51 InitialRequestN: 9223372036854775807
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0f 0e 72 65 71 75 65 73 74 2d 73 74 72 |.....request-str|
|00000010| 65 61 6d |eam |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 48 65 6c 6c |{"message":"Hell|
|00000010| 6f 22 7d |o"} |
+--------+-------------------------------------------------+----------------+
2020-10-11 10:42:41.749 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 69
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 59 6f 75 20 |{"message":"You |
|00000010| 73 61 69 64 3a 20 48 65 6c 6c 6f 2e 20 52 65 73 |said: Hello. Res|
|00000020| 70 6f 6e 73 65 20 23 30 22 2c 22 63 72 65 61 74 |ponse #0","creat|
|00000030| 65 64 22 3a 31 36 30 32 34 31 32 39 36 31 7d |ed":1602412961} |
+--------+-------------------------------------------------+----------------+
{"message":"You said: Hello. Response #0","created":1602412961}
2020-10-11 10:42:42.707 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 69
Data:
`
This response received is a stream of responses back to rsc client.
Communication mode 4: Channel
Development
This communication mode is for the communication where client and server can send a stream of messages, essentially streaming messages in both directions. This is achieved by including below snippet (full code is available in the Github repository: https://github.com/rockey5520/rsocket )
java
@MessageMapping("stream-stream")
Flux<Message> channel(final Flux<Integer> input) {
System.out.println("Received stream-stream (channel) request...");
return settings
.doOnNext(input -> System.out.println("Requested interval is {} seconds."+ setting))
.doOnCancel(() -> System.out.println("The client cancelled the channel."))
.switchMap(input -> Flux.interval(Duration.ofSeconds(input))
.map(index -> new Message("Stream Response #" + index)))
.log();
}
The key part of the above code is @MessageMapping("stream-stream")
where we are letting spring boot know that this method channel
should be invoked when client makes a call using stream-stream
communication mode.
Here we are returning Flux
for each request(input
) sent as part of stream creating a new outbound flux using the Duration sent in input payload thereby achieving the back pressure
where client controls the speed server response stream should be. This is a very good feature for the functionalities such as Video streaming where user controls the speed at which server should stream videos based on the client internet speed
Testing
using the RSC client installed if you pass below instruction
bash
rsc --debug --channel --data - --route stream-stream --stacktrace tcp://localhost:7000
now command line waits for user input for interval in seconds, Given provided 1 server responds stream of messages every 1 second and while it's streaming you could change to 10 and see server changes its streaming speed from 1 second to 10 seconds which is amazing when you need to build an application where client needs to apply back pressure and let server know the speed at which it expects the responses.
`bash
rockey@ubuntu:~/projects/rsocket$ rsc --debug --channel --data - --route stream-stream --stacktrace tcp://localhost:7000
3
2020-10-11 11:06:39.843 DEBUG --- [ parallel-2] i.r.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_CHANNEL Flags: 0b100000000 Length: 32 InitialRequestN: 9223372036854775807
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0e 0d 73 74 72 65 61 6d 2d 73 74 72 65 |.....stream-stre|
|00000010| 61 6d |am |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 33 |3 |
+--------+-------------------------------------------------+----------------+
2020-10-11 11:06:39.847 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 9223372036854775807
Data:
2020-10-11 11:06:42.849 DEBUG --- [actor-tcp-nio-1] i.r.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 59
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 22 6d 65 73 73 61 67 65 22 3a 22 53 74 72 65 |{"message":"Stre|
|00000010| 61 6d 20 52 65 73 70 6f 6e 73 65 20 23 30 22 2c |am Response #0",|
|00000020| 22 63 72 65 61 74 65 64 22 3a 31 36 30 32 34 31 |"created":160241|
|00000030| 34 34 30 32 7d |4402} |
+--------+-------------------------------------------------+----------------+
{"message":"Stream Response #0","created":1602414402}
10
2020-10-11 11:06:45.216 DEBUG --- [oundedElastic-1] i.r.FrameLogger : sending ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b100100000 Length: 29
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 0e 0d 73 74 72 65 61 6d 2d 73 74 72 65 |.....stream-stre|
|00000010| 61 6d |am |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 30 |10 |
+--------+-------------------------------------------------+----------------+
`
This response received is a stream of responses back to rsc client.
Additional Reading material:
https://github.com/benwilcock/springone-2020-rsocket-talk/tree/master/rsocket-server
https://benwilcock.wordpress.com/2020/06/25/getting-started-with-rsocket-on-spring-boot/
Top comments (0)