DEV Community

Sergey Chin
Sergey Chin

Posted on

Spring WebFlux + DynamoDB + S3

Introduction

Disclaimer: the code presented in the article is not suitable for a production environment.

This article will cover an example of creating a simple API on Spring WebFlux to manage a list of audio tracks in DynamoDB and store audio files in AWS S3.

Setting up AWS services

Setting up IAM

To access AWS services from Java, we need a user under which the application will be authorized.

Let’s go to the IAM settings and create a user named aws-tracks-user.

Then you need to add Permission Policies — AmazonDynamoDBFullAccess and AmazonS3FullAccess. These policies are needed so that the application has access to DynamoDB and S3.

Also, in the section Security credentials → Access keys, you need to add an access key. This access key and the corresponding secret key will be used in application.properties.

Setting up DynamoDB

Let’s create a table aws_tutorials.tracks in DynamoDB, where information about tracks will be stored. In the Partition key field, specify name uid and the String type. Leave the rest of the settings by default.

Setting up S3

Let’s create a bucket called aws-tutorial-tracks, this bucket will store audio files. Also, in the Object Ownership section, select ACLs enabled so that the bucket takes into account our IAM settings for the aws-tracks-user.

Creating a Spring WebFlux Application

We will use Spring Boot 3.3.3 and Maven. The full pom.xml setup is below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.3.3</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <groupId>com.example</groupId>
 <artifactId>aws-sound-service</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>aws-sound-service</name>
 <description>aws-sound-service</description>
 <properties>
  <java.version>21</java.version>
 </properties>
 <dependencyManagement>
  <dependencies>
   <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>bom</artifactId>
    <version>2.27.22</version>
    <type>pom</type>
    <scope>import</scope>
   </dependency>
  </dependencies>
 </dependencyManagement>
 <dependencies>
  <!-- Spring dependencies -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <!-- Depenency to work with DynamoDB -->
  <dependency>
   <groupId>software.amazon.awssdk</groupId>
   <artifactId>dynamodb</artifactId>
  </dependency>
  <!-- Depenencies to work with S3 -->
  <dependency>
   <groupId>software.amazon.awssdk</groupId>
   <artifactId>s3</artifactId>
  </dependency>
  <dependency>
   <groupId>software.amazon.awssdk</groupId>
   <artifactId>netty-nio-client</artifactId>
  </dependency>
  <!-- OpenAPI dependency -->
  <dependency>
   <groupId>org.springdoc</groupId>
   <artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
   <version>2.3.0</version>
  </dependency>

  <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-test</artifactId>
   <scope>test</scope>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <excludes>
      <exclude>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
      </exclude>
     </excludes>
    </configuration>
   </plugin>
  </plugins>
 </build>

</project>
Enter fullscreen mode Exit fullscreen mode

Let’s add settings to application.properties:

spring.application.name=aws-sound-service
aws.region=<your_region>
aws.access-key=<your_access_key>
aws.secret-key=<your_secred_key>
aws.s3.track-bucket=aws-tutorial-tracks
Enter fullscreen mode Exit fullscreen mode

In the aws.region parameter, specify the name of the region where the DynamoDB and S3 services are located, for example, eu-north-1. We created values for the aws.access-key and aws.secret-key parameters at the IAM setup section.

Let’s create the config for DynamoDB. In the configuration, you need to create an asynchronous client, since we use Spring WebFlux.

package com.example.awssoundservice.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;

@Configuration
public class DynamoDbConfig {

    @Value("${aws.region}")
    private String region;

    @Value("${aws.access-key}")
    private String accessKey;

    @Value("${aws.secret-key}")
    private String secretKey;

    @Bean
    public DynamoDbAsyncClient dynamoDbAsyncClient() {
        return DynamoDbAsyncClient.builder()
                .region(Region.of(region))
                .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return accessKey;
                    }

                    @Override
                    public String secretAccessKey() {
                        return secretKey;
                    }
                }))
                .build();
    }

}

Enter fullscreen mode Exit fullscreen mode

And we will also create an asynchronous S3 client:

package com.example.awssoundservice.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;

import java.time.Duration;

@Configuration
public class S3ClientConfig {

    @Value("${aws.region}")
    private String region;

    @Value("${aws.access-key}")
    private String accessKey;

    @Value("${aws.secret-key}")
    private String secretKey;

    @Bean
    public S3AsyncClient s3AsyncClient() {
        SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
                .writeTimeout(Duration.ZERO)
                .maxConcurrency(64)
                .build();
        S3Configuration serviceConfiguration = S3Configuration.builder()
                .checksumValidationEnabled(false)
                .chunkedEncodingEnabled(true)
                .build();
        return S3AsyncClient.builder()
                .httpClient(httpClient)
                .region(Region.of(region))
                .credentialsProvider(() -> AwsBasicCredentials.create(accessKey, secretKey))
                .serviceConfiguration(serviceConfiguration)
                .build();
    }

}
Enter fullscreen mode Exit fullscreen mode

Let’s create a class that will store information about a track:

package com.example.awssoundservice.model;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.List;
import java.util.Map;

public record Track(
        String uid, // unique track identifier
        String name, // track name
        String fileKey, // unique track file key
        String fileUrl // URL to download the track
) {
    // Data mapping from DynamoDB values to a Track object
    public static Track from(Map<String, AttributeValue> valueMap) {
        return new Track(
                valueMap.get("uid").s(),
                valueMap.get("name").s(),
                valueMap.get("fileKey").s(),
                valueMap.get("fileUrl").s()
        );
    }

    public static List<Track> fromList(List<Map<String, AttributeValue>> valueMaps) {
        return valueMaps.stream().map(Track::from).toList();
    }

    // Data mapping from a Track object to DynamoDB values
    public Map<String, AttributeValue> toMap() {
        return Map.of(
                "uid", AttributeValue.builder().s(this.uid).build(),
                "name", AttributeValue.builder().s(this.name).build(),
                "fileKey", AttributeValue.builder().s(this.fileKey).build(),
                "fileUrl", AttributeValue.builder().s(this.fileUrl).build()
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

The library we use for DynamoDB represents data as a Map. The map key is the column name, the value of the column is stored in an object of the AttributeValue class. For simplicity, only the String type is used here.

Detailed information about what data can be stored in AttributeValue can be found here https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html.

We will also create a small class to represent the request to create and update a track:

package com.example.awssoundservice.request;

public record TrackCreateOrUpdateRequest(String name) {
}
Enter fullscreen mode Exit fullscreen mode

We will store the list of DynamoDB table names in a separate class DynamoDbTables:

package com.example.awssoundservice.common;

public class DynamoDbTables {
    public static final String TRACKS = "aws_sound.tracks";
}

Now lets create a service with business logic. First, lets create methods for reading tracks:

package com.example.awssoundservice.service;

import com.example.awssoundservice.common.DynamoDbTables;
import com.example.awssoundservice.common.utils.AwsSdkUtils;
import com.example.awssoundservice.common.utils.FileUtils;
import com.example.awssoundservice.model.Track;
import com.example.awssoundservice.request.TrackCreateOrUpdateRequest;
import com.example.awssoundservice.response.FileResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.*;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
@RequiredArgsConstructor
public class TrackService {

    private final S3AsyncClient s3AsyncClient;
    private final DynamoDbAsyncClient dynamoDb;

    @Value("${aws.s3.track-bucket}")
    private String bucket;

    public Mono<Track> getById(String trackId) {
        GetItemRequest getItemRequest = GetItemRequest.builder()
                .tableName(DynamoDbTables.TRACKS)
                .key(Map.of("uid", AttributeValue.builder().s(trackId).build()))
                .build();
        CompletableFuture<GetItemResponse> future = dynamoDb.getItem(getItemRequest);
        return Mono.fromFuture(future)
                .map(getItemResponse -> {
                    AwsSdkUtils.checkSdkResponse(getItemResponse);
                    return Track.from(getItemResponse.item());
                });
    }

    public Mono<List<Track>> getAll() {
        ScanRequest scanRequest = ScanRequest.builder()
                .tableName(DynamoDbTables.TRACKS)
                .build();
        CompletableFuture<ScanResponse> future = dynamoDb.scan(scanRequest);
        return Mono.fromFuture(future)
                .map(scanResponse -> {
                    AwsSdkUtils.checkSdkResponse(scanResponse);
                    return Track.fromList(scanResponse.items());
                });
    }

}
Enter fullscreen mode Exit fullscreen mode

The DynamoDB asynchronous client returns a CompletableFuture as its result. The Reactor library allows us to create Mono objects from a CompletableFuture using the Mono::fromFuture method.

Next, we implement methods for creating, updating and deleting tracks:

public Mono<Track> create(TrackCreateOrUpdateRequest request) {
    // fields fileKey and FileUrl are empty, 
    // because audio file is not uploaded yet
    Track track = new Track(
            UUID.randomUUID().toString(),
            request.name(),
            "",
            ""
    );
    PutItemRequest putItemRequest = PutItemRequest.builder()
            .tableName(DynamoDbTables.TRACKS)
            .item(track.toMap())
            .build();
    CompletableFuture<PutItemResponse> responseFuture = dynamoDb.putItem(putItemRequest);
    return Mono.fromFuture(responseFuture)
            .map(putItemResponse -> {
                AwsSdkUtils.checkSdkResponse(putItemResponse);
                return track;
            });
}

public Mono<Track> update(String trackId, TrackCreateOrUpdateRequest request) {
    Mono<Track> trackMono = this.getById(trackId);
    return trackMono.flatMap(currentTrack -> {
        Track updatedTrack = new Track(
                currentTrack.uid(),
                request.name(),
                currentTrack.fileKey(),
                currentTrack.fileUrl()
        );
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(DynamoDbTables.TRACKS)
                .item(updatedTrack.toMap())
                .build();
        return Mono.fromFuture(dynamoDb.putItem(putItemRequest));
    }).flatMap(response -> {
        AwsSdkUtils.checkSdkResponse(response);
        return this.getById(trackId);
    });
}

public Mono<Track> delete(String trackId) {
    Mono<Track> trackMono = this.getById(trackId);
    DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
            .tableName(DynamoDbTables.TRACKS)
            .key(Map.of("uid", AttributeValue.builder().s(trackId).build()))
            .build();
    CompletableFuture<DeleteItemResponse> deleteFuture = dynamoDb.deleteItem(deleteItemRequest);
    return trackMono.zipWith(Mono.fromFuture(deleteFuture))
            .map(objects -> {
                Track currentTrack = objects.getT1();
                DeleteItemResponse deleteResponse = objects.getT2();
                AwsSdkUtils.checkSdkResponse(deleteResponse);
                return currentTrack;
            });
}
Enter fullscreen mode Exit fullscreen mode

Now let’s implement the method that will upload audio files to S3.

public Mono<Track> uploadTrackFile(String trackId, FilePart filePart) {
    String filename = filePart.filename();
    String fileKey = trackId + "/" + filename;
    PutObjectRequest putObjectRequest = PutObjectRequest.builder()
            .bucket(bucket)
            .key(fileKey)
            .contentType(Objects.requireNonNull(filePart.headers().getContentType()).toString())
            .build();
    // convert Flux<DataBuffer> to Mono<ByteBuffer>,
    // to send the entire file data to S3
    Mono<ByteBuffer> fileByteBuffer = FileUtils.dataBuffersToByteBuffer(filePart.content());
    return fileByteBuffer
            .flatMap(buffer -> Mono.fromFuture(
                    s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromByteBuffer(buffer))
            ))
            .map(putObjectResponse -> {
                AwsSdkUtils.checkSdkResponse(putObjectResponse);
                GetUrlRequest getUrlRequest = GetUrlRequest.builder().bucket(bucket).key(fileKey).build();
                String fileUrl = s3AsyncClient.utilities().getUrl(getUrlRequest).toExternalForm();
                return new FileResponse(
                        fileKey,
                        fileUrl
                );
            })
            .flatMap(fileResponse -> this.addFileInfoToTrack(trackId, fileResponse));
}

// save the file info (fileKey and fileUrl) to DynamoDB
private Mono<Track> addFileInfoToTrack(String trackId, FileResponse fileResponse) {
    Mono<Track> track = this.getById(trackId);
    return track.flatMap(currentTrack -> {
        Track updatedTrack = new Track(
                currentTrack.uid(),
                currentTrack.name(),
                fileResponse.fileKey(),
                fileResponse.fileUrl()
        );
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(DynamoDbTables.TRACKS)
                .item(updatedTrack.toMap())
                .build();
        return Mono.fromFuture(dynamoDb.putItem(putItemRequest));
    }).flatMap(putItemResponse -> {
        AwsSdkUtils.checkSdkResponse(putItemResponse);
        return this.getById(trackId);
    });
}
Enter fullscreen mode Exit fullscreen mode

In this case, we used a small utility to convert Flux to Mono. The code of the utility is below:

package com.example.awssoundservice.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;

@Slf4j
public class FileUtils {

    public static Mono<ByteBuffer> dataBuffersToByteBuffer(Flux<DataBuffer> buffers) {
        return DataBufferUtils.join(buffers).map(dataBuffer -> {
            ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
            dataBuffer.toByteBuffer(buffer);
            return buffer;
        });
    }

}
Enter fullscreen mode Exit fullscreen mode

Information about the uploaded file is saved in the intermediate FileResponse object.

package com.example.awssoundservice.response;

public record FileResponse (
        String fileKey,
        String fileUrl
) {
}
Enter fullscreen mode Exit fullscreen mode

We also used a utility to check the response from AWS. This utility throws an exception if we receive an unsuccessful response.

package com.example.awssoundservice.common.utils;

import software.amazon.awssdk.core.SdkResponse;

import java.text.MessageFormat;

public class AwsSdkUtils {

    public static boolean isErrorSdkHttpResponse(SdkResponse sdkResponse) {
        return sdkResponse.sdkHttpResponse() == null || !sdkResponse.sdkHttpResponse().isSuccessful();
    }

    public static void checkSdkResponse(SdkResponse sdkResponse) {
        if (AwsSdkUtils.isErrorSdkHttpResponse(sdkResponse)){
            if (Objects.nonNull(sdkResponse.sdkHttpResponse())) {
                throw new IllegalStateException(
                        MessageFormat.format(
                                "{0} - {1}",
                                sdkResponse.sdkHttpResponse().statusCode(),
                                sdkResponse.sdkHttpResponse().statusText()
                        )
                );    
            } else {
                throw new IllegalStateException("Unknown error in AWS SDK");
            }
        }
    }

}
Enter fullscreen mode Exit fullscreen mode

Now let’s write the REST API:

package com.example.awssoundservice.controller;

import com.example.awssoundservice.model.Track;
import com.example.awssoundservice.request.TrackCreateOrUpdateRequest;
import com.example.awssoundservice.response.GeneralResponse;
import com.example.awssoundservice.service.TrackService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

import java.util.List;

@RestController
@RequestMapping("/tracks")
@RequiredArgsConstructor
public class TrackController {

    private final TrackService trackService;

    @GetMapping
    public Mono<GeneralResponse<List<Track>>> getAll() {
        return trackService.getAll().map(GeneralResponse::successResponse);
    }

    @GetMapping("/{uid}")
    public Mono<GeneralResponse<Track>> getById(@PathVariable String uid) {
        return trackService.getById(uid).map(GeneralResponse::successResponse);
    }

    @PostMapping
    public Mono<GeneralResponse<Track>> create(@RequestBody TrackCreateOrUpdateRequest request) {
        return trackService.create(request).map(GeneralResponse::successResponse);
    }

    @PutMapping("/{uid}")
    public Mono<GeneralResponse<Track>> update(@PathVariable String uid, @RequestBody TrackCreateOrUpdateRequest request) {
        return trackService.update(uid, request).map(GeneralResponse::successResponse);
    }

    @DeleteMapping("/{uid}")
    public Mono<GeneralResponse<Track>> delete(@PathVariable String uid) {
        return trackService.delete(uid).map(GeneralResponse::successResponse);
    }

    @PostMapping("/{uid}/upload")
    public Mono<GeneralResponse<Track>> uploadTrackFile(
            @PathVariable String uid, @RequestPart(name = "file") Mono<FilePart> file
    ) {
        return file
                .flatMap(filePart -> trackService.uploadTrackFile(uid, filePart))
                .map(GeneralResponse::successResponse);
    }

}
Enter fullscreen mode Exit fullscreen mode

The controller code is very simple, there is just one more class that wraps all responses from trackService — GeneralResponse. The code for this class is below:

package com.example.awssoundservice.response;

public record GeneralResponse<T>(
        Integer statusCode,
        String message,
        T data
) {

    public static <T> GeneralResponse<T> successResponse(T data) {
        return new GeneralResponse<>(200, "OK", data);
    }

}
Enter fullscreen mode Exit fullscreen mode

A global error handler has also been added:

package com.example.awssoundservice.common;

import com.example.awssoundservice.response.GeneralResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.server.ResponseStatusException;

@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(Exception.class)
    public ResponseEntity<GeneralResponse<Void>> handleException(Exception ex) {
        log.error(ex.getMessage(), ex);
        return ResponseEntity.internalServerError().body(new GeneralResponse<>(
                500, ex.getMessage(), null
        ));
    }

    @ExceptionHandler(ResponseStatusException.class)
    public ResponseEntity<GeneralResponse<Void>> handleResponseStatusException(ResponseStatusException ex) {
        log.error(ex.getMessage(), ex);
        return ResponseEntity.status(ex.getStatusCode()).body(new GeneralResponse<>(
                ex.getStatusCode().value(), ex.getMessage(), null
        ));
    }

}

Enter fullscreen mode Exit fullscreen mode

We have written the application, now we can open the Swagger interface and call the endpoints (http://localhost:8080/swagger-ui.html).

Deploy to EC2

The explanation of deploying the application is quite long, so I will just leave a link to a detailed explanation of how to deploy a Spring Boot application to AWS EC2 (https://learnaws.io/blog/java-ec2-deployment).

Conclusion

We have written a simple application that integrates with AWS services — DynamoDB and S3. This application is educational and is not suitable for a production environment. To improve the code, I suggest you the following:

  1. The low-level DynamoDB library is quite difficult to use and you have to write a lot of additional code. You can use the DynamoDB Enhanced Client library (https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/dynamodb-enhanced-client.html).
  2. The file upload code is not optimized for large files and high service load. An example for uploading large files can be found in the S3 SDK documentation (https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java_s3_code_examples.html) and adapted for Spring WebFlux.
  3. We used ACCESS_KEY_ID and SECRET_ACCESS_KEY directly in the application, but role-based access is preferable.

The entire application code is here — https://github.com/SergiusAC/webflux-dynamodb-s3-example.

Top comments (0)