Table Of Contents
- Introduction
- Business logic implementation
- Test implementation
- How can I play with the code?
- What’s in it for me?
- Want to go deeper?
- Links
1. Introduction
How can I easily test my MongoDB multi-document transaction code without setting up MongoDB on my device? One might argue that MongoDB has to be set up first, because a multi-document transaction runs within a session and requires a replica set. Thankfully, there is no need to create a 3-node replica set: we can run these transactions against a single-node replica set.
To achieve this, we may do the following:
- Run a MongoDB container of version 4.0 or higher with the --replSet option;
- Initialize a single-node replica set by executing the proper command (rs.initiate());
- Wait for the initialization to complete;
- Connect to the node directly, without specifying the replica set name in the connection string, so that we do not have to add the container's hostname to the OS hosts file.
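For comparison, the sketch below expresses these manual steps as Docker CLI invocations. Each one is built as an argument list that could be handed to ProcessBuilder; nothing is executed here. The container name mongo-rs, the replica set name rs0 and the port are hypothetical. The mongo:4.2.8 image tag is the one used later in the tests. Waiting for initialization would mean repeating the isMaster check until it prints true.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the manual steps that MongoDBContainer automates.
 * Names ("mongo-rs", "rs0") and ports are illustrative; nothing is executed.
 */
class ManualReplicaSetSteps {

    // Step 1: start MongoDB 4.x with the --replSet option.
    static List<String> runContainer(String name, String replSet, int hostPort) {
        return Arrays.asList("docker", "run", "-d", "--name", name,
                "-p", hostPort + ":27017", "mongo:4.2.8", "--replSet", replSet);
    }

    // Step 2: initialize a single-node replica set.
    static List<String> initiate(String name) {
        return Arrays.asList("docker", "exec", name, "mongo", "--eval", "rs.initiate()");
    }

    // Step 3: repeat this check until it prints "true" (the node became primary).
    static List<String> waitForPrimary(String name) {
        return Arrays.asList("docker", "exec", name, "mongo", "--quiet", "--eval",
                "db.runCommand({ isMaster: 1 }).ismaster");
    }

    // Step 4: connect directly, with no replicaSet=... parameter, so the driver
    // does not try to resolve the member's container hostname.
    static String connectionString(String host, int hostPort, String db) {
        return String.format("mongodb://%s:%d/%s", host, hostPort, db);
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", runContainer("mongo-rs", "rs0", 27017)));
        System.out.println(String.join(" ", initiate("mongo-rs")));
        System.out.println(String.join(" ", waitForPrimary("mongo-rs")));
        System.out.println(connectionString("localhost", 27017, "test"));
    }
}
```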
It is worth mentioning that a replica set is not the only option: MongoDB 4.2 introduced distributed transactions in sharded clusters, which are beyond the scope of this article.
There are many ways to initialize a replica set, including Docker Compose, bash scripts, services in a CI/CD pipeline, etc. However, they take some extra work in terms of scripting, handling random ports, and making it all part of the CI/CD process. Fortunately, starting from Testcontainers version 1.14.2, we can delegate all the heavy lifting to the MongoDB Module.
Let us try it out on a small warehouse management system based on Spring Boot 2.3. In the recent past, one had to use ReactiveMongoOperations and its inTransaction method, but since Spring Data MongoDB 2.2 M4 we have been able to leverage the good old @Transactional annotation or the more advanced TransactionalOperator.
Our application should expose a REST API that reports the successfully processed files, including the number of documents modified. Files that cause errors along the way should be skipped so that the remaining files are still processed.
Note that even though duplicate article-and-size pairs within a single file are rare, they are quite possible and therefore should be handled as well.
According to the business requirements, we already have some products in our database, and we upload a batch of Excel (xlsx) files to update some fields of the matching documents. Data is expected only on the first sheet of each workbook. Each file is processed in a separate multi-document transaction to prevent simultaneous modifications of the same documents. For example, Figure 1 shows how colliding transactions can end up, apart from the scenario in which they happen to run sequentially (the JSON representation is shortened for simplicity). Transactional behavior helps us avoid data clashes and guarantees consistency.
In the product collection, the article field has a unique index. At the same time, each article is bound to a specific size. Therefore, our application must verify that both the article and its size are in the database before updating. Figure 2 gives an insight into this collection.
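The following sketch shows why both fields matter. It uses a hypothetical ArticleSizeKey value class whose equals and hashCode cover both the article and the size, so a file row matches only if the exact pair exists. The class mirrors the idea of the ProductArticleSizeDto projection used later, but it is not that class's actual source. The size values "M" and "L" are made up.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical (article, size) key; equality requires both fields to match. */
final class ArticleSizeKey {
    private final long article;
    private final String size;

    ArticleSizeKey(long article, String size) {
        this.article = article;
        this.size = Objects.requireNonNull(size, "size");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ArticleSizeKey)) return false;
        ArticleSizeKey other = (ArticleSizeKey) o;
        return article == other.article && size.equals(other.size);
    }

    @Override
    public int hashCode() {
        return Objects.hash(article, size);
    }

    /** A file row may be updated only if its exact (article, size) pair is in the database. */
    static boolean existsInDb(Set<ArticleSizeKey> dbKeys, ArticleSizeKey fromFile) {
        return dbKeys.contains(fromFile);
    }

    public static void main(String[] args) {
        Set<ArticleSizeKey> db = new HashSet<>();
        db.add(new ArticleSizeKey(120589L, "M"));
        System.out.println(existsInDb(db, new ArticleSizeKey(120589L, "M"))); // true
        System.out.println(existsInDb(db, new ArticleSizeKey(120589L, "L"))); // false: same article, other size
    }
}
```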
2. Business logic implementation
Let us elaborate on the major points of the above-mentioned business logic and start with ProductController as the entry point for the processing. You can find the complete project on GitHub. The prerequisites are Java 8+ and Docker.
@PatchMapping(
consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
produces = MediaType.APPLICATION_STREAM_JSON_VALUE
)
public ResponseEntity<Flux<FileUploadDto>> patchProductQuantity(
@RequestPart("file") Flux<FilePart> files,
@AuthenticationPrincipal Principal principal
) {
log.debug("shouldPatchProductQuantity");
return ResponseEntity.accepted().body(
uploadProductService.patchProductQuantity(files, principal.getName())
);
}
1) Wrap the response in a ResponseEntity and return a Flux of FileUploadDto;
2) Get the current authentication principal, which comes in handy later on;
3) Pass the Flux of FilePart on for processing.
Here is the patchProductQuantity method of UploadProductServiceImpl:
public Flux<FileUploadDto> patchProductQuantity(
final Flux<FilePart> files,
final String userName
) {
return Mono.fromRunnable(() -> initRootDirectory(userName))
// subscribeOn (rather than publishOn) makes the blocking runnable itself run on a bounded elastic thread
.subscribeOn(Schedulers.boundedElastic())
.log(String.format("cleaning-up directory: %s", userName))
.thenMany(files.flatMap(f ->
saveFileToDiskAndUpdate(f, userName)
.subscribeOn(Schedulers.boundedElastic())
)
);
}
1) Use the name of the user as the root directory name;
2) Do the blocking initialization of the root directory on a separate elastic thread;
3) For each Excel file:
3.1) Save it to disk;
3.2) Then update the quantity of the products on a separate elastic thread, since the file is processed in a blocking manner.
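The article does not show initRootDirectory itself. Judging by the "cleaning-up directory" log message, a blocking implementation might look like the hypothetical sketch below. It wipes any previous per-user folder under pathToStorage and then recreates it. Because it uses blocking java.nio calls, it belongs on an elastic thread rather than on an event-loop thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical blocking initRootDirectory; the real method is not shown in the article. */
class RootDirectoryInit {

    static Path initRootDirectory(String pathToStorage, String userName) {
        Path root = Paths.get(pathToStorage, userName);
        try {
            if (Files.exists(root)) {
                // Delete children before their parents: reverse order of the walk.
                try (Stream<Path> walk = Files.walk(root)) {
                    walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                        try {
                            Files.delete(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
                }
            }
            return Files.createDirectories(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a temp storage, "uploads" one file, re-initializes and counts what is left. */
    static long demo() {
        try {
            Path storage = Files.createTempDirectory("storage");
            Path root = initRootDirectory(storage.toString(), "admin");
            Files.createFile(root.resolve("products1.xlsx"));
            initRootDirectory(storage.toString(), "admin"); // wipes the previous upload
            try (Stream<Path> files = Files.list(root)) {
                return files.count();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0
    }
}
```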
The saveFileToDiskAndUpdate method implements the following logic:
private Mono<FileUploadDto> saveFileToDiskAndUpdate(
final FilePart file,
final String userName
) {
final String fileName = file.filename();
final Path path = Paths.get(pathToStorage, userName, fileName);
return Mono.just(path)
.log(String.format("A file: %s has been uploaded", fileName))
.flatMap(file::transferTo)
.log(String.format("A file: %s has been saved", fileName))
.then(processExcelFile(fileName, userName, path));
}
1) Copy the content of the file to the user's directory;
2) After the copy stage is completed, call the processExcelFile method.
At this point, we split the logic according to the size of the file:
private Mono<FileUploadDto> processExcelFile(
final String fileName,
final String userName,
final Path path
) {
return Mono.fromCallable(() -> Files.size(path))
.flatMap(size -> {
if (size >= bigFileSizeThreshold) {
return processBigExcelFile(fileName, userName);
} else {
return processSmallExcelFile(fileName, userName);
}
});
}
1) Wrap the blocking Files.size(path) call in Mono.fromCallable;
2) bigFileSizeThreshold is injected from the application.yml file via @Value("${upload-file.bigFileSizeThreshold}").
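Outside Reactor, the dispatch boils down to a single comparison. The sketch below is a plain Java illustration. The enum names and the 1 MiB threshold in main are assumptions, while the >= comparison mirrors processExcelFile.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Plain Java sketch of the size-based dispatch; names and threshold are illustrative. */
class ExcelFileRouter {

    enum Strategy { SMALL_WITH_DB_VALIDATION, BIG_LIGHTWEIGHT }

    static Strategy choose(long sizeInBytes, long bigFileSizeThreshold) {
        // ">=" as in processExcelFile: a file exactly at the threshold counts as big.
        return sizeInBytes >= bigFileSizeThreshold
                ? Strategy.BIG_LIGHTWEIGHT
                : Strategy.SMALL_WITH_DB_VALIDATION;
    }

    static Strategy choose(Path path, long bigFileSizeThreshold) {
        try {
            return choose(Files.size(path), bigFileSizeThreshold); // blocking call
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long threshold = 1_048_576L; // hypothetical 1 MiB threshold
        System.out.println(choose(512L, threshold));      // SMALL_WITH_DB_VALIDATION
        System.out.println(choose(threshold, threshold)); // BIG_LIGHTWEIGHT
    }
}
```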
Before going into detail on processing Excel files of different sizes, let us take a look at the getProducts method of ExcelFileDaoImpl:
@Override
public Flux<Product> getProducts(
final String pathToStorage,
final String fileName,
final String userName
) {
return Flux.defer(() -> {
FileInputStream is;
Workbook workbook;
try {
final File file = Paths.get(pathToStorage, userName, fileName).toFile();
verifyFileAttributes(file);
is = new FileInputStream(file);
workbook = StreamingReader.builder()
.rowCacheSize(ROW_CACHE_SIZE)
.bufferSize(BUFFER_SIZE)
.open(is);
} catch (IOException e) {
return Mono.error(new UploadProductException(
String.format("An exception occurred while parsing the file: %s", fileName), e));
}
try {
final Sheet datatypeSheet = workbook.getSheetAt(0);
final Iterator<Row> iterator = datatypeSheet.iterator();
final AtomicInteger rowCounter = new AtomicInteger();
if (iterator.hasNext()) {
final Row currentRow = iterator.next();
rowCounter.incrementAndGet();
verifyExcelFileHeader(fileName, currentRow);
}
return Flux.<Product>create(fluxSink -> fluxSink.onRequest(value -> {
try {
for (int i = 0; i < value; i++) {
if (!iterator.hasNext()) {
fluxSink.complete();
return;
}
final Row currentRow = iterator.next();
final Product product = Objects.requireNonNull(getProduct(
FileRow.builder()
.fileName(fileName)
.currentRow(currentRow)
.rowCounter(rowCounter.incrementAndGet())
.build()
), "product is not supposed to be null");
fluxSink.next(product);
}
} catch (Exception e1) {
fluxSink.error(e1);
}
})).doFinally(signalType -> {
try {
is.close();
workbook.close();
} catch (IOException e1) {
log.error("Error has occurred while releasing {} resources: {}", fileName, e1);
}
});
} catch (Exception e) {
return Mono.error(e);
}
});
}
1) Defer the whole logic until there is a new subscriber;
2) Verify the Excel file header;
3) Create a Flux that emits the requested number of products;
4) Convert each Excel row into a Product domain object;
5) Finally, close all the opened resources.
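The core of the Flux.create plus onRequest loop can be shown without Reactor. On each request(n), the source emits at most n rows from the iterator and completes when the iterator is exhausted. It also releases resources exactly once, which is the role doFinally plays in getProducts. DemandDrivenRowSource below is a hypothetical stand-in for FluxSink and is not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical demand-driven source: emits only what was requested, cleans up once. */
class DemandDrivenRowSource<T> {
    private final Iterator<T> rows;
    private final Consumer<T> onNext;
    private final Runnable cleanup;
    private boolean done;

    DemandDrivenRowSource(Iterator<T> rows, Consumer<T> onNext, Runnable cleanup) {
        this.rows = rows;
        this.onNext = onNext;
        this.cleanup = cleanup;
    }

    /** Emits up to n rows; returns false once the source has terminated. */
    boolean request(long n) {
        try {
            for (long i = 0; i < n && !done; i++) {
                if (!rows.hasNext()) {
                    finish(); // completion: the iterator is exhausted
                    break;
                }
                onNext.accept(rows.next());
            }
        } catch (RuntimeException e) {
            finish(); // error: release resources before propagating
            throw e;
        }
        return !done;
    }

    private void finish() {
        done = true;
        cleanup.run(); // e.g. close the InputStream and the Workbook
    }

    static String demo() {
        List<String> received = new ArrayList<>();
        int[] closed = {0};
        DemandDrivenRowSource<String> source = new DemandDrivenRowSource<>(
                Arrays.asList("row2", "row3", "row4").iterator(),
                received::add,
                () -> closed[0]++);
        source.request(2); // emits row2 and row3
        source.request(5); // emits row4, sees the end and completes
        source.request(1); // no-op: already terminated
        return received + " closed=" + closed[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [row2, row3, row4] closed=1
    }
}
```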
Getting back to the processing of the Excel files in UploadProductServiceImpl, we use MongoDB's bulkWrite method on the collection to update products in bulk, which requires an eagerly evaluated list of UpdateOneModel. In practice, collecting such a list consumes a lot of memory, especially for big files.
For small Excel files, we provide more detailed logging and an additional validation check:
private Mono<FileUploadDto> processSmallExcelFile(
final String fileName,
final String userName
) {
log.debug("processSmallExcelFile: {}", fileName);
return excelFileDao.getProducts(pathToStorage, fileName, userName)
.reduce(new ConcurrentHashMap<ProductArticleSizeDto, Tuple2<UpdateOneModel<Document>, BigInteger>>(),
(indexMap, product) -> {
final BigInteger quantity = product.getQuantity();
indexMap.merge(
new ProductArticleSizeDto(product.getArticle(), product.getSize()),
Tuples.of(
updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
quantity
),
(oldValue, newValue) -> {
final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
return Tuples.of(
updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
mergedQuantity
);
}
);
return indexMap;
})
.filterWhen(productIndexFile ->
productDao.findByArticleIn(extractArticles(productIndexFile.keySet()))
.<ProductArticleSizeDto>handle(
(productArticleSizeDto, synchronousSink) -> {
if (productIndexFile.containsKey(productArticleSizeDto)) {
synchronousSink.next(productArticleSizeDto);
} else {
synchronousSink.error(new UploadProductException(
String.format(
"A file %s does not have an article: %d with size: %s",
fileName,
productArticleSizeDto.getArticle(),
productArticleSizeDto.getSize()
)
));
}
})
.count()
.handle((sizeDb, synchronousSink) -> {
final int sizeFile = productIndexFile.size();
if (sizeDb == sizeFile) {
synchronousSink.next(Boolean.TRUE);
} else {
synchronousSink.error(new UploadProductException(
String.format(
"Inconsistency between total element size in MongoDB: %d and a file %s: %d",
sizeDb,
fileName,
sizeFile
)
));
}
})
).onErrorResume(e -> {
log.debug("Exception while processExcelFile fileName: {}: {}", fileName, e);
return Mono.empty();
}).flatMap(productIndexFile ->
productPatcherService.incrementProductQuantity(
fileName,
productIndexFile.values().stream().map(Tuple2::getT1).collect(Collectors.toList()),
userName
)
).map(bulkWriteResult -> FileUploadDto.builder()
.fileName(fileName)
.matchedCount(bulkWriteResult.getMatchedCount())
.modifiedCount(bulkWriteResult.getModifiedCount())
.build()
);
}
1) reduce helps us handle duplicate products whose quantities should be summed up;
2) Collect a map from ProductArticleSizeDto to a pair of an UpdateOneModel and the total quantity of the product. The ProductArticleSizeDto key is used to match an article and its size in the file against those in the database, which are fetched via the ProductArticleSizeDto projection;
3) Use the atomic merge method of ConcurrentMap to sum up the quantities of identical products and create a new UpdateOneModel;
4) Filter the products in the file by the articles that are in the database;
5) Each ProductArticleSizeDto found in the storage must match a ProductArticleSizeDto from the file (with quantities summed up);
6) Then count the results after filtering; the count should equal the number of distinct products in the file;
7) Use the onErrorResume method to continue when any error occurs, because we need to process all the files, as stated in the requirements;
8) Extract the list of UpdateOneModel from the map collected earlier for further use in the incrementProductQuantity method;
9) Then run the incrementProductQuantity method as a sub-process within flatMap and map its result to the FileUploadDto that our business users need.
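The duplicate handling in steps 1 to 3 and the count check in step 6 can be sketched with the JDK alone. In this illustration, String keys such as "120589/M" replace ProductArticleSizeDto, and only the summed quantities are kept. In the article, the remapping function also rebuilds the UpdateOneModel with the merged quantity.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: collapse duplicate (article, size) rows and check the distinct count. */
class DuplicateRowMerger {

    /** Each row is {article, size, quantity}; duplicates have their quantities summed. */
    static Map<String, BigInteger> mergeQuantities(List<String[]> rows) {
        Map<String, BigInteger> index = new ConcurrentHashMap<>();
        for (String[] row : rows) {
            String key = row[0] + "/" + row[1];          // article/size
            BigInteger quantity = new BigInteger(row[2]);
            index.merge(key, quantity, BigInteger::add); // sum duplicates atomically
        }
        return index;
    }

    /** The number of pairs found in the database must equal the distinct pairs in the file. */
    static boolean consistentWithDb(Map<String, BigInteger> fileIndex, long matchedInDb) {
        return matchedInDb == fileIndex.size();
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"120589", "M", "3"},
                new String[]{"120590", "L", "5"},
                new String[]{"120589", "M", "4"}); // duplicate of the first row
        Map<String, BigInteger> index = mergeQuantities(rows);
        System.out.println(index.get("120589/M"));      // 7
        System.out.println(consistentWithDb(index, 2)); // true
    }
}
```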
Even though filterWhen and the subsequent productDao.findByArticleIn call allow us to do some additional validation at an early stage, they come at a price, which becomes especially noticeable when processing big files. However, the incrementProductQuantity method already compares the number of modified documents with the number of distinct products in the file. Knowing that, we can implement a more lightweight option for processing big files:
private Mono<FileUploadDto> processBigExcelFile(
final String fileName,
final String userName
) {
log.debug("processBigExcelFile: {}", fileName);
return excelFileDao.getProducts(pathToStorage, fileName, userName)
.reduce(new ConcurrentHashMap<Product, Tuple2<UpdateOneModel<Document>, BigInteger>>(),
(indexMap, product) -> {
final BigInteger quantity = product.getQuantity();
indexMap.merge(
product,
Tuples.of(
updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
quantity
),
(oldValue, newValue) -> {
final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
return Tuples.of(
updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
mergedQuantity
);
}
);
return indexMap;
})
.map(indexMap -> indexMap.values().stream().map(Tuple2::getT1).collect(Collectors.toList()))
.onErrorResume(e -> {
log.debug("Exception while processExcelFile: {}: {}", fileName, e);
return Mono.empty();
}).flatMap(dtoList ->
productPatcherService.incrementProductQuantity(
fileName,
dtoList,
userName
)
).map(bulkWriteResult -> FileUploadDto.builder()
.fileName(fileName)
.matchedCount(bulkWriteResult.getMatchedCount())
.modifiedCount(bulkWriteResult.getModifiedCount())
.build()
);
}
Here is the ProductAndUserNameToUpdateOneModelConverter that we use to create an UpdateOneModel:
@Component
public class ProductAndUserNameToUpdateOneModelConverter implements
Converter<Tuple3<Product, BigInteger, String>, UpdateOneModel<Document>> {
@Override
@NonNull
public UpdateOneModel<Document> convert(@NonNull Tuple3<Product, BigInteger, String> source) {
Objects.requireNonNull(source);
final Product product = source.getT1();
final BigInteger quantity = source.getT2();
final String userName = source.getT3();
return new UpdateOneModel<>(
Filters.and(
Filters.eq(Product.SIZE_DB_FIELD, product.getSize().name()),
Filters.eq(Product.ARTICLE_DB_FIELD, product.getArticle())
),
Document.parse(
String.format(
"{ $inc: { %s: %d } }",
Product.QUANTITY_DB_FIELD,
quantity
)
).append(
"$set",
new Document(
Product.LAST_MODIFIED_BY_DB_FIELD,
userName
)
),
new UpdateOptions().upsert(false)
);
}
}
1) First, find a document by article and size. Figure 2 shows that the product collection has a compound index on the size and article fields to facilitate such a search;
2) Increment the quantity of the found document and set the name of the user in the lastModifiedBy field;
3) It is also possible to upsert a document here, but we are interested only in modifying the existing documents in the storage.
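The sketch below renders the same { $inc, $set } update as JSON text, for readers unfamiliar with the update syntax. The field names quantity and lastModifiedBy stand in for Product.QUANTITY_DB_FIELD and Product.LAST_MODIFIED_BY_DB_FIELD and are assumptions. The sketch also shows that %d accepts a BigInteger, which the converter relies on. Unlike the converter, which appends $set as a Document, this string version does not escape quotes in the user name.

```java
import java.math.BigInteger;

/** Sketch of the update document as JSON text; field names are assumed. */
class IncUpdateJson {

    static String updateJson(String quantityField, BigInteger quantity,
                             String lastModifiedByField, String userName) {
        // %d formats BigInteger directly, so no conversion to long is needed.
        return String.format("{ $inc: { %s: %d }, $set: { %s: \"%s\" } }",
                quantityField, quantity, lastModifiedByField, userName);
    }

    public static void main(String[] args) {
        System.out.println(updateJson("quantity", BigInteger.valueOf(7), "lastModifiedBy", "admin"));
        // { $inc: { quantity: 7 }, $set: { lastModifiedBy: "admin" } }
    }
}
```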
Now we are ready to implement the central part of our processing, the incrementProductQuantity method of ProductPatcherDaoImpl:
@Override
public Mono<BulkWriteResult> incrementProductQuantity(
final String fileName,
final List<UpdateOneModel<Document>> models,
final String userName
) {
return transactionalOperator.execute(
action -> reactiveMongoOperations.getCollection(Product.COLLECTION_NAME)
.flatMap(collection ->
Mono.from(collection.bulkWrite(models, new BulkWriteOptions().ordered(true)))
).<BulkWriteResult>handle((bulkWriteResult, synchronousSink) -> {
final int fileCount = models.size();
if (Objects.equals(bulkWriteResult.getModifiedCount(), fileCount)) {
synchronousSink.next(bulkWriteResult);
} else {
synchronousSink.error(
new IllegalStateException(
String.format(
"Inconsistency between modified doc count: %d and file doc count: %d. Please, check file: %s",
bulkWriteResult.getModifiedCount(), fileCount, fileName
)
)
);
}
}).onErrorResume(
e -> Mono.fromRunnable(action::setRollbackOnly)
.log("Exception while incrementProductQuantity: " + fileName + ": " + e)
.then(Mono.empty())
)
).singleOrEmpty();
}
1) Use a transactionalOperator to roll back the transaction manually. As mentioned before, our goal is to process all the files while skipping those that cause exceptions;
2) Run a single sub-process that bulk writes the modifications to the database sequentially (ordered(true)), for fail-fast and less resource-intensive behavior. The word "single" is of paramount importance here, because it avoids the dangerous "N+1 Query Problem" of spawning many sub-processes on a Flux within flatMap;
3) Handle the situation in which the number of modified documents does not match the number of distinct products in the file;
4) The onErrorResume method rolls back the transaction and then returns Mono.empty() to skip the current file;
5) Expect either a single item or an empty Mono as the result of the transactionalOperator.execute method.
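The commit-or-roll-back decision can be modeled in memory. The model applies every increment to a working copy and compares the number of modified entries with the number of distinct products in the file. It publishes the copy only when the two numbers match. This is a simplified sketch: it treats every matched key as modified, and it does not model MongoDB's locking or write-conflict detection. AllOrNothingIncrement is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory model of the all-or-nothing check; not MongoDB's transaction engine. */
class AllOrNothingIncrement {

    /** Returns the modified count on commit, or empty when the batch is rolled back. */
    static Optional<Integer> apply(Map<String, Long> store, Map<String, Long> increments) {
        Map<String, Long> working = new HashMap<>(store); // the "transaction" snapshot
        int modified = 0;
        for (Map.Entry<String, Long> inc : increments.entrySet()) {
            Long current = working.get(inc.getKey());
            if (current != null) { // upsert(false): unknown keys are not inserted
                working.put(inc.getKey(), current + inc.getValue());
                modified++;
            }
        }
        if (modified != increments.size()) {
            return Optional.empty(); // like setRollbackOnly followed by Mono.empty()
        }
        store.clear();
        store.putAll(working); // commit
        return Optional.of(modified);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        store.put("120589/M", 12L);
        store.put("120590/L", 11L);
        Map<String, Long> file = new LinkedHashMap<>();
        file.put("120589/M", 4L);
        file.put("999999/S", 1L); // unknown product: the whole file is rolled back
        System.out.println(apply(store, file));    // Optional.empty
        System.out.println(store.get("120589/M")); // 12, unchanged
    }
}
```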
One might ask: "You called collection.bulkWrite(models, new BulkWriteOptions().ordered(true)); what about setting a session?" The thing is that the SessionAwareMethodInterceptor of Spring Data MongoDB does it via reflection:
ReflectionUtils.invokeMethod(targetMethod.get(), target,
prependSessionToArguments(session, methodInvocation));
Here is the prependSessionToArguments method:
private static Object[] prependSessionToArguments(ClientSession session, MethodInvocation invocation) {
Object[] args = new Object[invocation.getArguments().length + 1];
args[0] = session;
System.arraycopy(invocation.getArguments(), 0, args, 1, invocation.getArguments().length);
return args;
}
1) Get the arguments of the MethodInvocation;
2) Add the session as the first element of the args array.
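The same argument shifting can be run on its own. In this sketch, a plain Object[] replaces MethodInvocation and a String stands in for the ClientSession. It shows how the arguments of bulkWrite(models, options) become those of bulkWrite(session, models, options).

```java
import java.util.Arrays;

/** Stand-alone version of the argument rewriting, without Spring or the driver. */
class PrependSession {

    static Object[] prependSessionToArguments(Object session, Object[] arguments) {
        Object[] args = new Object[arguments.length + 1];
        args[0] = session; // the session becomes the first argument
        System.arraycopy(arguments, 0, args, 1, arguments.length); // shift the rest by one
        return args;
    }

    public static void main(String[] args) {
        Object[] original = {"models", "options"};
        System.out.println(Arrays.toString(prependSessionToArguments("session", original)));
        // [session, models, options]
    }
}
```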
In fact, the following method of MongoCollectionImpl is called:
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
final List<? extends WriteModel<? extends TDocument>> requests,
final BulkWriteOptions options) {
return Publishers.publish(
callback -> wrapped.bulkWrite(clientSession.getWrapped(), requests, options, callback));
}
3. Test implementation
So far, so good: now we can create integration tests to cover our logic.
To begin with, we create ProductControllerITTest to test our public API via Spring's WebTestClient, and we initialize a MongoDB instance to run the tests against:
private static final MongoDBContainer MONGO_DB_CONTAINER =
new MongoDBContainer("mongo:4.2.8");
1) Use a static field so that a single Testcontainers MongoDBContainer serves all test methods in ProductControllerITTest;
2) We use the MongoDB 4.2.8 image from Docker Hub, the latest stable version at the time of writing; otherwise, MongoDBContainer defaults to 4.0.10.
Then, in the static methods setUpAll and tearDownAll, we start and stop the MongoDBContainer respectively. Even though we do not use the Testcontainers reusable feature here, we leave open the possibility of enabling it. This is why we call MONGO_DB_CONTAINER.stop() only if the reusable feature is turned off.
@BeforeAll
static void setUpAll() {
MONGO_DB_CONTAINER.start();
}
@AfterAll
static void tearDownAll() {
if (!MONGO_DB_CONTAINER.isShouldBeReused()) {
MONGO_DB_CONTAINER.stop();
}
}
tearDown is responsible for cleaning up the modifications that each test makes:
@AfterEach
void tearDown() {
StepVerifier.create(productDao.deleteAll()).verifyComplete();
}
Next, we set spring.data.mongodb.uri to the value of MONGO_DB_CONTAINER.getReplicaSetUrl() in an ApplicationContextInitializer:
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
TestPropertyValues.of(
String.format("spring.data.mongodb.uri: %s", MONGO_DB_CONTAINER.getReplicaSetUrl())
).applyTo(configurableApplicationContext);
}
}
Now we are ready to write the first test, which involves no transaction collisions, because our test files (see Figure 3) contain products whose articles do not clash with one another.
@WithMockUser(
username = SecurityConfig.ADMIN_NAME,
password = SecurityConfig.ADMIN_PAS,
authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantity() {
//GIVEN
insertMockProductsIntoDb(Flux.just(product1, product2, product3));
final BigInteger expected1 = BigInteger.valueOf(16);
final BigInteger expected2 = BigInteger.valueOf(27);
final BigInteger expected3 = BigInteger.valueOf(88);
final String fileName1 = "products1.xlsx";
final String fileName3 = "products3.xlsx";
final String[] fileNames = {fileName1, fileName3};
final FileUploadDto fileUploadDto1 = ProductTestUtil.mockFileUploadDto(fileName1, 2);
final FileUploadDto fileUploadDto3 = ProductTestUtil.mockFileUploadDto(fileName3, 1);
//WHEN
final WebTestClient.ResponseSpec exchange = webClient
.patch()
.uri(BASE_URL)
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData(fileNames)))
.exchange();
//THEN
exchange.expectStatus().isAccepted();
exchange.expectBodyList(FileUploadDto.class)
.hasSize(2)
.contains(fileUploadDto1, fileUploadDto3);
StepVerifier.create(productDao.findAllByOrderByQuantityAsc())
.assertNext(product -> assertEquals(expected1, product.getQuantity()))
.assertNext(product -> assertEquals(expected2, product.getQuantity()))
.assertNext(product -> assertEquals(expected3, product.getQuantity()))
.verifyComplete();
}
Finally, let us test a transaction collision in action, keeping in mind Figure 1 and Figure 4, which show such files:
@WithMockUser(
username = SecurityConfig.ADMIN_NAME,
password = SecurityConfig.ADMIN_PAS,
authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantityConcurrently() {
//GIVEN
TransactionUtil.setMaxTransactionLockRequestTimeoutMillis(
20,
MONGO_DB_CONTAINER.getReplicaSetUrl()
);
insertMockProductsIntoDb(Flux.just(product1, product2));
final String fileName1 = "products1.xlsx";
final String fileName2 = "products2.xlsx";
final String[] fileNames = {fileName1, fileName2};
final BigInteger expected120589Sum = BigInteger.valueOf(19);
final BigInteger expected120590Sum = BigInteger.valueOf(32);
final BigInteger expected120589T1 = BigInteger.valueOf(16);
final BigInteger expected120589T2 = BigInteger.valueOf(12);
final BigInteger expected120590T1 = BigInteger.valueOf(27);
final BigInteger expected120590T2 = BigInteger.valueOf(11);
final FileUploadDto fileUploadDto1 = ProductTestUtil.mockFileUploadDto(fileName1, 2);
final FileUploadDto fileUploadDto2 = ProductTestUtil.mockFileUploadDto(fileName2, 2);
//WHEN
final WebTestClient.ResponseSpec exchange = webClient
.patch()
.uri(BASE_URL)
.contentType(MediaType.MULTIPART_FORM_DATA)
.accept(MediaType.APPLICATION_STREAM_JSON)
.body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData(fileNames)))
.exchange();
//THEN
exchange.expectStatus().isAccepted();
assertThat(
extractBodyArray(exchange),
either(arrayContaining(fileUploadDto1))
.or(arrayContaining(fileUploadDto2))
.or(arrayContainingInAnyOrder(fileUploadDto1, fileUploadDto2))
);
final List<Product> list = productDao.findAll(Sort.by(Sort.Direction.ASC, "article"))
.toStream().collect(Collectors.toList());
assertThat(list.size(), is(2));
assertThat(
list.stream().map(Product::getQuantity).toArray(BigInteger[]::new),
either(arrayContaining(expected120589T1, expected120590T1))
.or(arrayContaining(expected120589T2, expected120590T2))
.or(arrayContaining(expected120589Sum, expected120590Sum))
);
TransactionUtil.setMaxTransactionLockRequestTimeoutMillis(
5,
MONGO_DB_CONTAINER.getReplicaSetUrl()
);
}
1) We can specify the maximum amount of time, in milliseconds, that multi-document transactions should wait to acquire the locks required by the operations in the transaction (by default, multi-document transactions wait 5 milliseconds);
2) As an example, we use a helper method to change this value from 5 ms to 20 ms (see the implementation details below).
Note that the maxTransactionLockRequestTimeoutMillis setting makes no real difference in this particular test case and serves only as an example. After running this test 120 times via the script ./load_test.sh 120 ProductControllerITTest.shouldPatchProductQuantityConcurrently in the tools directory of the project, I got the following figures:
indicator | 20 ms, times | 5 ms (default), times |
---|---|---|
T1 successes | 61 | 56 |
T2 successes | 57 | 63 |
T1 and T2 successes | 2 | 1 |
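As a quick sanity check on the table, each column sums to the 120 runs. Both transactions succeeded in under 2% of the runs, so in nearly every run one of the colliding transactions did not commit. The arithmetic is spelled out below.

```java
/** Worked arithmetic for the load-test table (counts copied from the table). */
class LoadTestShares {

    static int total(int... counts) {
        int sum = 0;
        for (int c : counts) sum += c;
        return sum;
    }

    static double percent(int count, int runs) {
        return 100.0 * count / runs;
    }

    public static void main(String[] args) {
        int[] timeout20ms = {61, 57, 2};
        int[] timeout5ms = {56, 63, 1};
        System.out.println(total(timeout20ms) + " " + total(timeout5ms)); // 120 120
        System.out.printf("both succeeded: %.2f%% vs %.2f%%%n",
                percent(2, 120), percent(1, 120)); // both succeeded: 1.67% vs 0.83%
    }
}
```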
While going through the logs, we may come across something like:
Exception while incrementProductQuantity: products1.xlsx: com.mongodb.MongoCommandException: Command failed with error 112 (WriteConflict): 'WriteConflict' on server…
Initiating transaction rollback...
Initiating transaction commit...
About to abort transaction for session...
About to commit transaction for session...
Then, let us test the processing of a big file containing 1 million products in a separate PatchProductLoadITTest:
@WithMockUser(
username = SecurityConfig.ADMIN_NAME,
password = SecurityConfig.ADMIN_PAS,
authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantityBigFile() {
//GIVEN
unzipClassPathFile("products_1M.zip");
final String fileName = "products_1M.xlsx";
final int count = 1000000;
final long totalQuantity = 500472368779L;
final List<Document> products = getDocuments(count);
TransactionUtil.setTransactionLifetimeLimitSeconds(
900,
MONGO_DB_CONTAINER.getReplicaSetUrl()
);
StepVerifier.create(
reactiveMongoTemplate.remove(new Query(), Product.COLLECTION_NAME)
.then(reactiveMongoTemplate.getCollection(Product.COLLECTION_NAME))
.flatMapMany(c -> c.insertMany(products))
.switchIfEmpty(Mono.error(new RuntimeException("Cannot insertMany")))
.then(getTotalQuantity())
).assertNext(t -> assertEquals(totalQuantity, t)).verifyComplete();
//WHEN
final Instant start = Instant.now();
final WebTestClient.ResponseSpec exchange = webClient
.patch()
.uri(BASE_URL)
.contentType(MediaType.MULTIPART_FORM_DATA)
.accept(MediaType.APPLICATION_STREAM_JSON)
.body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData("products_1M.xlsx")))
.exchange();
//THEN
exchange
.expectStatus()
.isAccepted()
.expectBodyList(FileUploadDto.class)
.contains(ProductTestUtil.mockFileUploadDto(fileName, count));
StepVerifier.create(getTotalQuantity())
.assertNext(t -> assertEquals(totalQuantity * 2, t))
.verifyComplete();
log.debug(
"============= shouldPatchProductQuantityBigFile elapsed {}minutes =============",
Duration.between(start, Instant.now()).toMinutes()
);
}
1) The general setup is similar to that of ProductControllerITTest;
2) Unzip a JSON file containing 1 million products, which takes about 254 MB of disk space;
3) Transactions have a lifetime limit, specified by transactionLifetimeLimitSeconds, which is 60 seconds by default. We need to increase it here, because processing such a file generally takes more than 60 s. For this, we use a helper method to raise the limit to 900 s (see the implementation details below). For your information, the REST call with this file takes about 9-12 minutes on GitHub Actions;
4) Before processing, we clean up the product collection, insert 1 million products from the JSON file, and then get the total quantity;
5) Given that the products in the JSON file and in the big Excel file are the same, we assert that the total quantity of the products doubles after processing.
Such a test requires a relatively big heap of about 4 GB (see Figure 6) and about 6 GB of memory allocated to Docker (see Figure 7).
As we can see, it is sensible to configure the maximum amount of disk space allowed for file parts and the maximum number of parts allowed in a given multipart request. This is why I added these properties to the application.yml file and then set them in the configureHttpMessageCodecs method of the implemented WebFluxConfigurer. However, adding a rate limiter and configuring Schedulers might be a better solution in a production environment. Note that we use Schedulers.boundedElastic() here, which has a pool of 10 * Runtime.getRuntime().availableProcessors() threads by default.
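The default thread cap can be reproduced in a few lines. This sketch assumes Reactor's behavior in the 3.3/3.4 line, where the reactor.schedulers.defaultBoundedElasticSize system property, if set, overrides 10 * availableProcessors(). Check it against the Reactor version in use.

```java
/** Sketch of how the default boundedElastic thread cap is derived (assumed Reactor 3.3/3.4 behavior). */
class BoundedElasticCap {

    static int defaultCap(String propertyValue, int availableProcessors) {
        // An explicit system property wins; otherwise 10 threads per available core.
        return propertyValue != null
                ? Integer.parseInt(propertyValue)
                : 10 * availableProcessors;
    }

    public static void main(String[] args) {
        int cap = defaultCap(
                System.getProperty("reactor.schedulers.defaultBoundedElasticSize"),
                Runtime.getRuntime().availableProcessors());
        System.out.println("boundedElastic thread cap: " + cap);
    }
}
```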
Here is TransactionUtil, which contains the above-mentioned helper methods:
public class TransactionUtil {
private TransactionUtil() {
}
public static void setTransactionLifetimeLimitSeconds(
final int duration,
final String replicaSetUrl
) {
setMongoParameter("transactionLifetimeLimitSeconds", duration, replicaSetUrl);
}
public static void setMaxTransactionLockRequestTimeoutMillis(
final int duration,
final String replicaSetUrl
) {
setMongoParameter("maxTransactionLockRequestTimeoutMillis", duration, replicaSetUrl);
}
private static void setMongoParameter(
final String param,
final int duration,
final String replicaSetUrl
) {
try (final MongoClient mongoReactiveClient = MongoClients.create(
ConnectionUtil.getMongoClientSettingsWithTimeout(replicaSetUrl)
)) {
StepVerifier.create(mongoReactiveClient.getDatabase("admin").runCommand(
new Document("setParameter", 1).append(param, duration)
)).expectNextCount(1)
.verifyComplete();
}
}
}
4. How can I play with the code?
An HTTP API for a small WMS (warehouse management system) based on Spring Boot 2 Reactive WebFlux and MongoDB 4, demonstrating the use of multi-document transactions.
Prerequisites
- Java 8
- Docker (was tested on version 18.09.2)
Article on DEV Community
The Testcontainers’ MongoDB Module and Spring Data MongoDB in Action
General info
The application allows you to:
- create new products with article verification (a "duplicate key error collection: wms.product index: article" error occurs when trying to insert products with an identical id);
- find product DTOs (the user's representation) by either name or brand (full equality; this may be relaxed in ProductDao via a clear DSL);
- find all products (entity representation);
- upload xlsx files (other formats are not currently supported) to update current product quantities. Matching is done by article and size. The app reports inconsistencies between the products that are supposed to be patched and MongoDB. After uploading, files are kept in a storage.bulk-upload-path/userName folder and then removed…
5. What’s in it for me?
1) MongoDBContainer takes care of the complexity of MongoDB replica set initialization, allowing the developer to focus on testing. Now we can simply make MongoDB transaction testing part of our CI/CD process;
2) While processing data, it is sensible to favor MongoDB's bulk methods, reducing the number of sub-processes within the flatMap method of a Flux and thus avoiding the "N+1 Query Problem". However, this also comes at a price, because we need to collect a list of UpdateOneModel and keep it in memory, which sacrifices some reactive flexibility;
3) When it comes to skipping failed processing, one might employ onErrorResume instead of the dangerous onErrorContinue, as discussed in the following GitHub issue:
onErrorContinue() design #2184
Not a bug per-se, but a discussion - is onErrorContinue() (and associated onErrorStop()) the right solution, and if not, can the same functionality be designed in a more obvious way?
I keep noticing people tripping up on it, and I'd be lying if I said it also hadn't caught me out a few times. There's two main issues I keep seeing with onErrorContinue().
Friendly name
There's no hint from the name or Javadoc that it's a specialist operator that should be used with care - it seems analogous to onErrorResume(). The name seems very "friendly" - as in "hey, that name looks like what I want to do, so I'll use it!" This is in contrast to (for example) block(), which makes it obvious it's not something you want to do.
The Javadoc doesn't help much either - there's only a small note below the diagram that "this error handling mode is not necessarily implemented by all operators". I've therefore understandably seen people use onErrorContinue() in complete ignorance that it's a specialist operator they should use with care (onErrorResume() would have been a better choice in almost all those cases.)
Complex reasoning
However, I think the problem goes deeper than the above. Even for an admittedly "specialist" operator that needs to be used with care, and even for someone who knows their way around Reactor reasonably well, it still makes a stream incredibly hard to reason about in a whole bunch of scenarios:
Swallowing unexpected exceptions
There are certain exceptions (or exceptions that occur in certain places) that you wouldn't necessarily expect it to "swallow", but it does (such as retry exceptions, as per an issue raised a while ago).
Going "too far" up the chain
This one is a bit looser, but library writers returning a Flux will often assume, as per the Reactive Streams spec, that an error will always cause the Flux to halt, where they can either let the error propagate or resume to take corrective action. If someone has called onErrorContinue() downstream, this can mess up the library code in a way the author didn't intend, causing unexpected consequences.
This could of course be fixed by all library writers appending onErrorStop() to each publisher at the end of the chain, but in reality no one seems to do that.
No operator chaining
Since onErrorContinue() only affects operators above it, and doesn't chain with other onErrorContinue() calls, you wind up with counter-intuitive situations like this:
Flux.range(1,10).flatMap(x -> Mono.error(new RuntimeException()))
.onErrorContinue(IllegalArgumentException.class, (t,o)-> System.out.println("A")) //Not called
.onErrorContinue(RuntimeException.class, (t,o)-> System.out.println("B")) //Not called
...which, of course, won't cause either of those blocks to be executed, since the IllegalArgumentException block is declared first.
Inconsistent continuing with a single value
If the Flux just contains a single value through use of Flux.just(), continuing doesn't seem to always work reliably:
Flux.just(1,1)
.flatMap(ctx -> Flux.push((sink)->{ throw new RuntimeException();}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught"); //Called, prints twice
})
...but:
Flux.just(1)
.flatMap(ctx -> Flux.push((sink)->{ throw new RuntimeException();}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught"); //Not called
})
...but:
Flux.just(1)
.flatMap(x -> Mono.error(new RuntimeException()))
.onErrorContinue((ex, obj)->{
System.err.println("Caught"); //Called
})
(I can't quite reason about what's causing this - it feels like a bug more than expected behaviour.)
Unclear "cause" - inner or outer?
The cause object isn't necessarily unambiguous - what's the cause here? I've seen cases where people would expect it to be 1 (the top-level object), but of course it's actually A (the inner object).
Flux.just(1)
.flatMap(i -> Flux.just("A").flatMap(x -> Mono.error(new IllegalArgumentException())))
.onErrorContinue((ex, obj) -> {
System.out.println(obj); //A
})
Unclear "cause" #2
...but what about when there is no "cause"?
Flux.just(1,1)
.flatMap(i -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
.onErrorContinue((ex, obj) -> {
System.out.println(obj); //null
})
...it's null, which follows the "inner" pattern of the above, but that isn't necessarily obvious - I've seen some who would expect it to fall back to the outer cause (i) rather than being null.
I'm sure there are a bunch of other non-obvious behaviour situations - and I'm deliberately omitting the more "obvious" caveats that apply (such as operator support having to be explicit, it affecting the stream above it rather than below it, etc.). My point is that while most operators are reasonably simple to reason about, onErrorContinue() is a big exception to this rule that can make writing & maintaining reactive code far harder than it needs to be.
Possible solutions
I'm not sure on this, really. Changes go from "nothing" to "light touch" to "redesign somehow". Some possible solutions might be:
- Mostly leave everything as-is, but make it much clearer in the Javadoc that onErrorContinue() is a specialist operator that should be avoided wherever possible;
- Rename the method to something a bit more "warning-inducing", such as ignoreUpstreamErrorsWherePossible() (just an "off the cuff" example);
- Do something with types - maybe:
  - Introduce a different type of Flux that supports onErrorContinue(), and that type only works with supported operators;
  - Introduce a type that forces only one "in-bound" onErrorContinue in the whole chain at compile time (can convert to the type by means of using onErrorContinue() and back by using onErrorStop());
- Introduce more overloaded methods for supported operators that take an "error handler" for continuing rather than setting it globally in the chain;
- Remove onErrorContinue() entirely (not feasible I suspect; are there any situations where this behaviour simply can't be emulated by using other operators?)
Thanks for reading the ramble - would be interesting to hear anyone's thoughts...!
4) Even though we are allowed to set maxTransactionLockRequestTimeoutMillis and transactionLifetimeLimitSeconds as start-up parameters of mongod, we may achieve the same effect at runtime by calling MongoDB's adminCommand via helper methods;
5) Processing big files is resource-consuming and is thus better limited.
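Point 2) above trades streaming for a single round trip. The sketch below shows that trade-off. It assumes the MongoDB Reactive Streams driver and Reactor are on the classpath. The ProductRow class, the field names article, size and quantity, and the method names are hypothetical and are not the author's code.

```java
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoCollection;
import org.bson.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class BulkPatchSketch {

    // Hypothetical row parsed from an uploaded file.
    public static final class ProductRow {
        final String article;
        final String size;
        final int quantity;

        public ProductRow(String article, String size, int quantity) {
            this.article = article;
            this.size = size;
            this.quantity = quantity;
        }
    }

    // One UpdateOneModel per row: match by article and size, set the new quantity.
    public static WriteModel<Document> toUpdate(ProductRow row) {
        return new UpdateOneModel<>(
                Filters.and(Filters.eq("article", row.article), Filters.eq("size", row.size)),
                Updates.set("quantity", row.quantity));
    }

    // Collects every model in memory (the price mentioned in point 2), then sends
    // a single bulkWrite instead of issuing one update per element inside flatMap.
    public static Mono<BulkWriteResult> patch(MongoCollection<Document> products, Flux<ProductRow> rows) {
        return rows.map(BulkPatchSketch::toUpdate)
                .collectList()
                // bulkWrite rejects an empty list, so skip the call entirely.
                .filter(models -> !models.isEmpty())
                .flatMap(models -> Mono.from(products.bulkWrite(models)));
    }
}
```

In the article's setting, this write would run inside a transaction (via @Transactional or TransactionalOperator). The sketch leaves that part out.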
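Point 3) above favors onErrorResume for skipping failed files. The sketch below attaches the fallback to each inner Mono, so only the failing element is dropped and the outer Flux keeps going. This does not rely on the upstream-operator support that onErrorContinue needs. It assumes Reactor is on the classpath. The processFile method, the ".csv means unsupported" rule and the use of the name length as a "modified count" are hypothetical stand-ins for real file processing.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public final class SkipFailedFilesSketch {

    // Hypothetical per-file processing: rejects ".csv" to mimic an unsupported
    // format, otherwise returns a fake "number of documents modified".
    static Mono<Integer> processFile(String fileName) {
        if (fileName.endsWith(".csv")) {
            return Mono.error(new IllegalArgumentException("Unsupported format: " + fileName));
        }
        return Mono.just(fileName.length());
    }

    // The error handler is scoped to one file: a failure becomes an empty Mono,
    // so the outer Flux simply continues with the next file.
    public static Flux<Integer> processAll(List<String> fileNames) {
        return Flux.fromIterable(fileNames)
                .concatMap(name -> processFile(name)
                        .onErrorResume(IllegalArgumentException.class, e -> {
                            System.err.println("Skipping: " + e.getMessage());
                            return Mono.empty();
                        }));
    }
}
```

concatMap is used here only to keep the output order deterministic. flatMap works the same way if order does not matter.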
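Point 4) above mentions setting transaction parameters through adminCommand. A minimal sketch of such a helper is below. It assumes the synchronous MongoDB Java driver (com.mongodb.client) for brevity, even though the article's application is reactive. The class and method names are hypothetical and are not the author's helper methods. The parameter names are the ones from the text, and the values passed in are up to the caller.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public final class TransactionParamsSketch {

    // Builds { setParameter: 1, <name>: <value> } to be run against the admin database.
    public static Document setParameterCommand(String name, Object value) {
        return new Document("setParameter", 1).append(name, value);
    }

    // Applies both transaction-related parameters at runtime, e.g. using the URL
    // returned by MongoDBContainer#getReplicaSetUrl(). One command per parameter.
    public static void applyTransactionLimits(String connectionString,
                                              int lockRequestTimeoutMillis,
                                              int lifetimeLimitSeconds) {
        try (MongoClient client = MongoClients.create(connectionString)) {
            client.getDatabase("admin").runCommand(
                    setParameterCommand("maxTransactionLockRequestTimeoutMillis", lockRequestTimeoutMillis));
            client.getDatabase("admin").runCommand(
                    setParameterCommand("transactionLifetimeLimitSeconds", lifetimeLimitSeconds));
        }
    }
}
```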
6. Want to go deeper?
To construct a multi-node MongoDB replica set for testing complicated failover cases, consider:
silaev / mongodb-replica-set
Java8 MongoDbReplicaSet to construct a full-featured MongoDB cluster for integration testing, reproducing production issues, learning distributed systems by the example of MongoDB
Prerequisite
- Java 8+
- Docker Desktop
- Chart shows local and remote docker support for replicaSetNumber

replicaSetNumber | local docker host | local docker host, running tests from inside a container with the Docker socket mapped | remote docker daemon | availability of an arbiter node
1 | + | + | + | -
from 2 to 7 (inclusive) | only if 127.0.0.1 dockerhost is added to the OS host file | + | + | +
Tip: A single-node replica set is the fastest option. That is the default mode for MongoDbReplicaSet. However, if a single node is all you need, consider the Testcontainers MongoDB module on GitHub.
Getting it
- Gradle:
dependencies {
testImplementation("com.github.silaev:mongodb-replica-set:${LATEST_RELEASE}")
}
- Maven:
<dependencies>
<dependency>
<groupId>com.github.silaev</groupId>
<artifactId>mongodb-replica-set</artifactId>
<version>${LATEST_RELEASE}</version>
…
Top comments (5)
Does MongoDBContainer work with Reusable containers?
Yes, it does. If need be, you can separate data via a database name.
If you don't use a framework, you are free to go with MongoClient#getDatabase("my-db"). However, a framework (for instance, Spring Data MongoDB) might get a db name from a URL, which one can get via MongoDBContainer#getReplicaSetUrl(). There are 2 choices:
1) as a workaround, add your db name to the URL, like MongoDBContainer#getReplicaSetUrl() + "-my-db", which, for example, results in mongodb://localhost:32880/test-my-db. Just make sure that such a db name is unique for each test;
2) wait for this PR to be merged.
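Workaround 1) above is plain string concatenation. The helper below is hypothetical. It also makes the suffix unique per call, so tests sharing a reused container never collide. It assumes the URL ends with the database name and has no query string, as in the mongodb://localhost:32880/test example. It also assumes testName contains only characters MongoDB allows in database names (letters, digits, hyphens are safe).

```java
import java.util.UUID;

public final class UniqueDbUrl {

    // Appends "-<testName>-<8 random hex chars>" to the database name at the end
    // of a replica set URL such as "mongodb://localhost:32880/test".
    public static String withUniqueDb(String replicaSetUrl, String testName) {
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return replicaSetUrl + "-" + testName + "-" + suffix;
    }
}
```

The result could then be handed to the framework, for instance as the spring.data.mongodb.uri property.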
Tried your workaround solution on your PatchProductLoadITTest and ProductControllerITTest by adding:
Put simply, my tests stop each MongoDBContainer in @AfterAll methods. I've added an if (!MONGO_DB_CONTAINER.isShouldBeReused()) statement to stop the MongoDBContainer only if we do not use Testcontainers' reusable feature. See this commit for more details. Thanks for your comment.
Thanks for this detailed article, very helpful!