DynamoDB offers DynamoDB Streams, a way to capture item-level changes in a table. Although DynamoDB Streams give valuable insight into how your data changes, managing and processing these streams can be challenging when you are not using AWS Lambda or Kinesis.
Anyway, I wanted to use DynamoDB Streams together with Flux in Java, and that's what inspired me to write this wrapper around the low-level DynamoDB Streams API (which you can find here).
First of all, what's difficult about the existing API, you ask? Well, here is what you need to do to listen to changes in your DynamoDB table:
- First, call the DescribeStream API, which takes your StreamARN as input and returns the stream's Shards.
- Next, get a ShardIterator for each of these Shards using the GetShardIterator API.
- Now that you have an iterator, call the GetRecords API to get the actual DynamoDB records (which can contain the new and old images of each item).
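To make those three steps concrete, here is a minimal, dependency-free sketch of a single pass over a stream. It deliberately avoids the AWS SDK: StreamsApi, Page, readAll and the in-memory fake are hypothetical names used only for this illustration, and the real request/response types carry much more (sequence numbers, iterator types, full records).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ThreeStepReadSketch {

    // Hypothetical, simplified stand-in for the three DynamoDB Streams calls
    // (not the AWS SDK; the real request/response types carry much more).
    public interface StreamsApi {
        List<String> describeStream(String streamArn);             // DescribeStream -> shard IDs
        String getShardIterator(String streamArn, String shardId); // GetShardIterator
        Page getRecords(String shardIterator);                     // GetRecords
    }

    // One GetRecords response: a page of records plus the iterator for the next call.
    public record Page(List<String> records, String nextShardIterator) {}

    // Reads each shard once by following its iterator chain.
    public static List<String> readAll(StreamsApi api, String streamArn) {
        List<String> out = new ArrayList<>();
        for (String shardId : api.describeStream(streamArn)) {            // step 1
            String iterator = api.getShardIterator(streamArn, shardId);  // step 2
            while (iterator != null) {                                   // step 3, repeated
                Page page = api.getRecords(iterator);
                // An open shard keeps handing out iterators, so this one-pass sketch
                // stops at the first empty page; a real reader would keep polling.
                if (page.records().isEmpty()) break;
                out.addAll(page.records());
                iterator = page.nextShardIterator(); // null once a closed shard is drained
            }
        }
        return out;
    }

    // In-memory fake so the sketch runs without AWS; iterators look like "shardId#pageIndex".
    public static StreamsApi fake(Map<String, List<List<String>>> pages) {
        return new StreamsApi() {
            public List<String> describeStream(String arn) { return new ArrayList<>(pages.keySet()); }
            public String getShardIterator(String arn, String shardId) { return shardId + "#0"; }
            public Page getRecords(String iterator) {
                int hash = iterator.lastIndexOf('#');
                String shardId = iterator.substring(0, hash);
                int i = Integer.parseInt(iterator.substring(hash + 1));
                List<List<String>> shardPages = pages.get(shardId);
                if (i >= shardPages.size()) return new Page(List.of(), iterator);
                return new Page(shardPages.get(i), shardId + "#" + (i + 1));
            }
        };
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> pages = new LinkedHashMap<>();
        pages.put("shard-1", List.of(List.of("a", "b"), List.of("c")));
        pages.put("shard-2", List.of(List.of("d")));
        System.out.println(readAll(fake(pages), "arn:example")); // [a, b, c, d]
    }
}
```

Each shard's iterator chain is followed page by page, so even this toy version needs nested loops and per-shard state before a single record reaches your code.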
On top of all this, you need to handle expired shards and repeat these steps in a polling loop, since the API does not provide a push-based stream.
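The bookkeeping this implies might look roughly like the following sketch, assuming a hypothetical StreamsApi interface that mirrors DescribeStream, GetShardIterator and GetRecords (again not the AWS SDK types). It keeps one iterator per shard, re-runs DescribeStream each round to pick up new shards, and drops a shard once GetRecords returns no next iterator, which is how a closed shard signals that it has been fully read. Expired iterators (which real code would have to replace with fresh ones) are not handled here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ShardPollerSketch {

    // Hypothetical stand-in for DescribeStream / GetShardIterator / GetRecords (not the AWS SDK types).
    public interface StreamsApi {
        List<String> describeStream(String streamArn);
        String getShardIterator(String streamArn, String shardId);
        Page getRecords(String shardIterator);
    }

    public record Page(List<String> records, String nextShardIterator) {}

    private final StreamsApi api;
    private final String streamArn;
    private final Consumer<String> sink;
    private final Map<String, String> iterators = new LinkedHashMap<>(); // shardId -> current iterator
    private final Set<String> finished = new HashSet<>();                // closed and fully read

    public ShardPollerSketch(StreamsApi api, String streamArn, Consumer<String> sink) {
        this.api = api;
        this.streamArn = streamArn;
        this.sink = sink;
    }

    // One polling round: pick up new shards, read one page per shard, drop drained closed shards.
    public synchronized void pollOnce() {
        for (String shardId : api.describeStream(streamArn)) {
            if (!finished.contains(shardId) && !iterators.containsKey(shardId)) {
                iterators.put(shardId, api.getShardIterator(streamArn, shardId));
            }
        }
        Iterator<Map.Entry<String, String>> it = iterators.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            Page page = api.getRecords(entry.getValue());
            page.records().forEach(sink);
            if (page.nextShardIterator() == null) { // closed shard, nothing more to read
                finished.add(entry.getKey());
                it.remove();
            } else {
                entry.setValue(page.nextShardIterator());
            }
        }
    }

    public synchronized int activeShards() { return iterators.size(); }

    // Repeats pollOnce() because the API has no push-based stream. Note: an uncaught
    // exception in pollOnce() suppresses all later runs, so real code should catch it.
    public ScheduledFuture<?> start(ScheduledExecutorService scheduler, long delayMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // In-memory fake for trying this offline; iterators look like "shardId#pageIndex".
    public static StreamsApi fake(Map<String, List<List<String>>> pages, Set<String> closed) {
        return new StreamsApi() {
            public List<String> describeStream(String arn) { return new ArrayList<>(pages.keySet()); }
            public String getShardIterator(String arn, String shardId) { return shardId + "#0"; }
            public Page getRecords(String iterator) {
                int hash = iterator.lastIndexOf('#');
                String shardId = iterator.substring(0, hash);
                int i = Integer.parseInt(iterator.substring(hash + 1));
                List<List<String>> shardPages = pages.get(shardId);
                if (i >= shardPages.size()) return new Page(List.of(), iterator); // open shard, nothing new
                boolean lastOfClosed = i + 1 == shardPages.size() && closed.contains(shardId);
                return new Page(shardPages.get(i), lastOfClosed ? null : shardId + "#" + (i + 1));
            }
        };
    }
}
```

Reading one page per shard per round keeps a busy shard from starving the others; the trade-off is that a backlog drains more slowly than it would if each shard were read to the end.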
So I created dynamo-streams, which takes care of all of these steps for you: you just listen to the changes, which are delivered through a Flux.
So, how do we get started?
- First, add the Maven or Gradle dependency for dynamo-streams.
Maven:
<dependency>
    <groupId>io.github.jica98</groupId>
    <artifactId>aws-java-dynamo-streams</artifactId>
    <version>0.0.5</version>
</dependency>
Gradle:
implementation group: 'io.github.jica98', name: 'aws-java-dynamo-streams', version: '0.0.5'
- If you are using Spring, you can declare a bean for DynamoStreams in one of your config classes:
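import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// ...plus DynamoStreams and StreamConfig from dynamo-streams, and your own DataRoot class
@Configuration
public class DynamoStreamsConfig {

    // Placeholder: use your table's stream ARN (it ends in /stream/<label>)
    private static final String STREAM_ARN = "arn:aws:dynamodb:us-east-1:your-dynamo-db-stream";

    // Low-level DynamoDB Streams client (AWS SDK for Java v1)
    @Bean(destroyMethod = "shutdown")
    protected AmazonDynamoDBStreams streamsClient() {
        return AmazonDynamoDBStreamsClientBuilder
                .standard()
                .withRegion(Regions.US_EAST_1)
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();
    }

    // The wrapper, typed to the class each record's image is mapped to
    @Bean(destroyMethod = "shutdown")
    protected DynamoStreams<DataRoot> dynamoStreams(AmazonDynamoDBStreams dynamoDBStreams) {
        return new DynamoStreams<>(
                StreamConfig.<DataRoot>builder()
                        .clazz(DataRoot.class)
                        .dynamoDBStreams(dynamoDBStreams)
                        .streamARN(STREAM_ARN)
                        .build());
    }
}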
- Finally, autowire it in your controller and subscribe to the changes through a Flux. There are utility methods such as newImages and oldImages, which return a Flux of only the new images or only the old images, respectively.
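import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct on Spring Boot 2
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
// ...plus DynamoStreams from dynamo-streams
@RestController
public class DataStreamController {
    @Autowired
    private DynamoStreams<DataRoot> dynamoStreams;

    @PostConstruct
    void postConstruct() {
        // Initialize here to start streaming events
        dynamoStreams.initialize();
    }

    // Pushes each new image to the client as a Server-Sent Event
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<DataRoot>> streamData() {
        return dynamoStreams.stream()
                .newImages()
                .map(data -> ServerSentEvent.<DataRoot>builder()
                        .data(data)
                        .id(UUID.randomUUID().toString())
                        .build());
    }
}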
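Which images a record carries depends on the event type: an INSERT has only a new image, a REMOVE only an old image, and a MODIFY both, provided the stream's view type includes them (NEW_AND_OLD_IMAGES covers both; KEYS_ONLY covers neither). The sketch below illustrates that filtering idea with plain java.util.stream; the Change record and its string maps are hypothetical simplifications, and this is not how dynamo-streams implements newImages or oldImages.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ImageFilterSketch {

    // Hypothetical, simplified change record; real stream records hold DynamoDB attribute values.
    public record Change(String eventName, Map<String, String> newImage, Map<String, String> oldImage) {}

    // INSERT and MODIFY events carry a new image; REMOVE events do not.
    public static List<Map<String, String>> newImages(List<Change> changes) {
        return changes.stream().map(Change::newImage).filter(Objects::nonNull).collect(Collectors.toList());
    }

    // MODIFY and REMOVE events carry an old image; INSERT events do not.
    public static List<Map<String, String>> oldImages(List<Change> changes) {
        return changes.stream().map(Change::oldImage).filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change("INSERT", Map.of("id", "1", "v", "a"), null),
                new Change("MODIFY", Map.of("id", "1", "v", "b"), Map.of("id", "1", "v", "a")),
                new Change("REMOVE", null, Map.of("id", "1", "v", "b")));
        System.out.println(newImages(changes).size() + " new, " + oldImages(changes).size() + " old"); // 2 new, 2 old
    }
}
```

With a Reactor Flux the order matters: filter on the image being non-null before mapping, because Reactor does not allow a map function to emit null.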
I hope this makes DynamoDB Streams easier to use in your code, although I acknowledge that the proper way to subscribe to these events will always be through a Lambda.
You can find the project over here.
Changes I plan to make in the future:
- Poll only while there is at least one subscriber (right now it polls at a configurable interval regardless of the subscriber count).
- Add more utility methods.
- Add support for something other than Flux.
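One possible shape for subscriber-aware polling is sketched below. To stay dependency-free it uses the JDK's java.util.concurrent.SubmissionPublisher rather than Reactor, and pollBatch is a hypothetical supplier standing in for one round of GetRecords calls; it is an illustration of the idea, not a plan for the library. Within Reactor itself, share() (equivalent to publish().refCount()) is one existing operator that keeps the upstream subscribed only while at least one subscriber is present.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;

public class SubscriberAwarePollerSketch implements AutoCloseable {

    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    private final Supplier<List<String>> pollBatch; // hypothetical: one round of GetRecords calls
    private int skippedRounds = 0;

    public SubscriberAwarePollerSketch(Supplier<List<String>> pollBatch) {
        this.pollBatch = pollBatch;
    }

    public SubmissionPublisher<String> publisher() { return publisher; }

    // Called on every tick of the polling interval; does no AWS work while nobody is listening.
    public void tick() {
        if (!publisher.hasSubscribers()) {
            skippedRounds++;
            return;
        }
        pollBatch.get().forEach(publisher::submit);
    }

    public int skippedRounds() { return skippedRounds; }

    // Completes subscribers after already-submitted items are delivered.
    @Override
    public void close() { publisher.close(); }

    public static void main(String[] args) {
        SubscriberAwarePollerSketch poller =
                new SubscriberAwarePollerSketch(() -> List.of("change-1", "change-2"));
        poller.tick(); // no subscribers yet, so this round is skipped
        CompletableFuture<Void> done = poller.publisher().consume(System.out::println);
        poller.tick(); // a subscriber exists, so this round polls and publishes
        poller.close();
        done.join();
    }
}
```

Checking for subscribers at each tick, rather than starting and stopping a scheduler, keeps the sketch simple; the cost is one cheap check per interval even when idle.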