Streams are a relatively new feature in Redis. If you've already used them it's possible that your use case is similar to the following one:
- Events are flowing into your system and you store them in a Redis stream.
- Your application loads oldest N events and processes them.
- Once your application finishes processing the events you want to delete the N oldest events you've just processed.
The problem arises in the last step - what's the best way to delete the processed records?
One option is to use the XDEL command with ids of the events as the argument. This might be quite a heavy task for Redis, especially when removing many entries at once.
An alternative here is the XTRIM. It can work in two modes:
MINID- "Evicts entries with IDs lower than threshold, where threshold is a stream ID.". This may work for you if you're using Redis >= 6.2 - you'd just use the ID of the newest processed record as an argument and you're done. However, if you're using Redis 5 (like myself) you're unlucky...
MAXLEN- "Evicts entries as long as the stream's length exceeds the specified threshold". It doesn't really fit our use case, because we want to do the complete opposite. We want to evict only
Nentries and we don't care how many items will be left after this operation.
Now, even though we can't use the
XTRIM command in
MAXLEN mode we can still make use of it indirectly...
Notice that if we subtract
N from the total number of items in the stream we'll know how many items should remain in the stream after our desired operation. If we could feed
MAXLEN mode with that value we're done.
The problem is how to do it atomically to make sure that the number of items in the stream doesn't change between
XTRIM commands are executed?
Redis provides the EVAL command that allows executing any program atomicaly.
Quoting Redis' docs: "Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed."
By running a following script we can achieve the desired effect:
eval 'return redis.call("XTRIM", "mystream", "MAXLEN", redis.call("XLEN", "mystream") - KEYS)' 1 N.
The only argument is number of stream entries to delete.