What are we doing?
We're trying to atomically pop and remove multiple items from a list in Redis.
Consider a list:
0 1 2 3 4 5
I want to return the first 3 items and then remove them from the list. I also need to guarantee that the list does not change in the time between returning the results and removing them from the list.
Consider this example:
> RPUSH mykey 0 1 2 3 4 5
(integer) 6
> LRANGE mykey 0 2
1) "0"
2) "1"
3) "2"
> LTRIM mykey 3 -1
OK
> LRANGE mykey 0 99
1) "3"
2) "4"
3) "5"
This works well if you are the only one manipulating your list. But what if somebody else is manipulating the list at the same time?
> RPUSH mykey 0 1 2 3 4 5
(integer) 6
> LRANGE mykey 0 2
1) "0"
2) "1"
3) "2"
> LTRIM mykey 3 -1
OK
> LRANGE mykey 0 99
(empty array)
Uh-oh. Another consumer, who we will call B, ran LRANGE mykey 0 2 and received 0 1 2. We ran the same command immediately after them and received the same results. Consumer B now runs LTRIM mykey 3 -1, and we do too. Now the list is empty, but both consumers only received the first 3 items of the list. The final 3 are gone.
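To make the interleaving concrete, here is a small sketch that replays it step by step. It uses a plain Python list as a stand-in for the Redis list (an assumption for illustration only, no Redis server is involved), with slices playing the role of LRANGE and LTRIM.

```python
# Plain Python list standing in for the Redis list "mykey" (no server involved).
mykey = ["0", "1", "2", "3", "4", "5"]

got_b = mykey[0:3]    # consumer B: LRANGE mykey 0 2
got_us = mykey[0:3]   # us:         LRANGE mykey 0 2 (list unchanged, same items)
del mykey[:3]         # consumer B: LTRIM mykey 3 -1
del mykey[:3]         # us:         LTRIM mykey 3 -1 (drops "3" "4" "5" unread)

print(got_b)   # items consumer B received
print(got_us)  # items we received
print(mykey)   # what is left in the list
```

Both consumers end up holding the same three items, and the list is empty even though nobody ever read the last three.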
I need these two commands, LRANGE and LTRIM, to be atomic. That means that either they both execute, or neither does, and that no other client's command can run in between them. In Redis, we can achieve this using transactions. All the commands in a transaction are serialized and executed sequentially. It can never happen that a request issued by another client is served in the middle of the execution of a Redis transaction. This guarantees that the commands are executed as a single isolated operation.
If you were using the Redis CLI, you would alter your code to look like this:
> RPUSH mykey 0 1 2 3 4 5
(integer) 6
> MULTI
OK
> LRANGE mykey 0 2
QUEUED
> LTRIM mykey 3 -1
QUEUED
> EXEC
1) 1) "0"
   2) "1"
   3) "2"
2) OK
Since nobody actually uses CLIs, let's do this in Python. This script uses redis-py's Pipeline feature. Pipelines are a subclass of the base Redis class that provide support for buffering multiple commands to the server in a single request. By default, redis-py also wraps the buffered commands in MULTI and EXEC, so they run as a single transaction.
>>> import redis
>>> my_key = 'pop_trim_test'
>>> r = redis.Redis(host='localhost', port=6379)
>>> r.rpush(my_key, *[x for x in range(10)])
>>> pipe = r.pipeline()
>>> pipe.lrange(my_key, 0, 3)
>>> pipe.ltrim(my_key, 4, -1)
>>> pipe.execute()
[[b'0', b'1', b'2', b'3'], True]
pipe.execute() returns the results of your pipe commands as a list, in the order the commands were queued. Therefore, the results of our lrange are the first item in the list.
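If you do this in more than one place, the LRANGE and LTRIM pair can be wrapped in a small helper. The sketch below is one possible shape, not a definitive implementation: `pop_many` is a hypothetical name, and `client` is assumed to be a redis-py client such as `redis.Redis(host='localhost', port=6379)`.

```python
def pop_many(client, key, count):
    """Return the first `count` items of the list at `key` and remove them.

    `client` is assumed to be a redis-py client. Both commands are queued
    on a transactional pipeline, so they run inside MULTI/EXEC.
    """
    if count <= 0:
        # LRANGE key 0 -1 would return the whole list, so bail out early.
        return []
    pipe = client.pipeline(transaction=True)
    pipe.lrange(key, 0, count - 1)   # read items 0..count-1 (end is inclusive)
    pipe.ltrim(key, count, -1)       # keep everything from index `count` on
    items, _trimmed = pipe.execute() # results come back in queue order
    return items

# Hypothetical usage against a local server:
#   import redis
#   r = redis.Redis(host='localhost', port=6379)
#   first_three = pop_many(r, 'pop_trim_test', 3)
```

The guard on `count` matters because Redis treats -1 as "the last element", so a count of zero would otherwise read the entire list.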
If you're working with a consumer model where applications wait for items in a queue, the above code can be problematic because the consumer has to keep querying Redis, even when the list is empty. A better behavior is to use Redis' blocking pop (BLPOP) command. With BLPOP, if the list is empty, the client blocks the connection until another client performs an LPUSH or RPUSH operation against the key. It's a great resource for handling multiple consumers. You can work this in by running the above code after the consumer successfully executes a BLPOP. Keep in mind that BLPOP itself removes and returns the first item, so that item counts as part of your batch.
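One way to combine the two ideas is sketched below. `blocking_pop_many` is a hypothetical helper name, `client` is assumed to be a redis-py client, and the 5-second default timeout is an arbitrary choice for illustration. The function waits on BLPOP for the first item, then takes up to `count - 1` more with the transactional pipeline.

```python
def blocking_pop_many(client, key, count, timeout=5):
    """Wait for at least one item on `key`, then return up to `count` items.

    `client` is assumed to be a redis-py client. Returns an empty list if
    nothing arrives within `timeout` seconds.
    """
    if count <= 0:
        return []
    popped = client.blpop(key, timeout=timeout)
    if popped is None:
        # Timed out: the list stayed empty.
        return []
    _key, first = popped             # BLPOP returns a (key, value) pair
    if count == 1:
        return [first]
    pipe = client.pipeline(transaction=True)
    pipe.lrange(key, 0, count - 2)   # up to count-1 further items
    pipe.ltrim(key, count - 1, -1)   # drop the items we just read
    rest, _trimmed = pipe.execute()
    return [first] + rest
```

Note that the BLPOP and the transaction are two separate steps, so another consumer may take items in between. Each item still goes to exactly one consumer, but a batch is not guaranteed to be a contiguous run of the list.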