Suppose you need to store documents in MongoDB from an external source such as a RabbitMQ or Kafka queue. The order in which these messages are processed is not guaranteed, especially if there are several processes running in parallel.
To ensure that the latest version of a document is always available, and that it is not overwritten by an older version from another message that was processed later, a version number can be used. The higher the version number, the more recent the document.
The external data source must then include the document version number with each message. This version number can be an increment or simply the timestamp at which the data source generated the message.
In MongoDB, we use the updateOne
method to update a document, but to ensure that we don't overwrite a more recent version, we add a condition that checks that the version number of the document in the database is lower than that of the document we wish to insert.
use MongoDB\Collection;
/**
* @param Collection $collection The MongoDB collection
* @param int $id The ID of the document to update
* @param int $version The version of the new document
* @param array $document The new document
* @return bool True if the new version have been saved, false if it was outdated
*/
function optimisticUpsert(Collection $collection, int $id, int $version, array $document): bool
{
// The document id and version are stored in the document itself
$document['_id'] = $id;
$document['_version'] = $version;
$result = $collection->updateOne(
['_id' => $id],
[
['$replaceWith' => [
'$cond' => [
'if' => ['$lt' => ['$_version', $version]],
'then' => $document,
'else' => '$$ROOT',
],
]],
],
['upsert' => true],
);
return $result->getUpsertedCount() > 0 || $result->getModifiedCount() > 0;
}
The MongoDB server is smart enough to know that there are no changes
when the stored version was already more recent than the one you wanted to insert.
If $result->getUpsertedCount()
is greater than 0, this means that the document did not exist and has been inserted. If $result->getModifiedCount()
is greater than 0, this means that the document already existed and has been updated. Otherwise, it means
that the document version was already more recent than the one to be inserted.
More information on update operations can be found in the documentation :
- Expression $replaceWith.
- Operator $cond.
- Method updateOne.
Top comments (0)