Hi Ian, interesting conundrum. The query plan is much more complicated than it seems from the worker's code; I guess that `.contacts` isn't that straightforward.
At first glance there's a lot going on in that worker.
Let's see if we can tackle it a piece at a time:
the global lock
`message.with_lock do`
this sets an exclusive lock on the row for the duration of the transaction opened by `.with_lock`. So we know that this blocks all other transactions from accessing that row, but it also waits if the row is being accessed by another transaction.
I'm not sure you need to lock the row for so long, as the state machine methods (let's call them that :D) seem to be clustered at the top. Could it make sense to only lock the row around those methods and prepare the batches outside the lock?
Basically, by the time you start preparing the batches you're done with `message`, so I think you can release the lock by then.
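To make the shape concrete, here's a minimal runnable sketch with a stubbed `with_lock` (no real DB; `start_delivery!` and `prepare_batches` are hypothetical stand-ins for the state machine methods and the batching code):

```ruby
# Stubbed sketch: hold the lock only for the state transition,
# then prepare batches after the lock has been released.
class Message
  attr_reader :events

  def initialize
    @events = []
  end

  # Stand-in for ActiveRecord's with_lock, which opens a transaction
  # and issues SELECT ... FOR UPDATE around the block.
  def with_lock
    @events << :lock_acquired
    yield
    @events << :lock_released
  end

  def start_delivery! # hypothetical state machine method
    @events << :state_changed
  end

  def prepare_batches # hypothetical batching step
    @events << :batches_prepared
  end
end

message = Message.new
message.with_lock { message.start_delivery! } # short critical section
message.prepare_batches                       # runs outside the lock
message.events
# => [:lock_acquired, :state_changed, :lock_released, :batches_prepared]
```

The point is only the ordering: the batching work lands after `:lock_released`, so other transactions touching that row aren't stuck waiting for it.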
the count
`if message.contacts.count == 0`
this issues a `SELECT COUNT(*)`, which seems fine, but it can probably be optimized by using `if message.contacts.none?`, which issues a `SELECT ... LIMIT 1`. You may want to test both speeds as I'm uncertain about the schema.
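As a rough in-memory analogy (plain Ruby, no SQL) of why the difference matters: counting has to consume every row, while a `LIMIT 1`-style check can stop at the first one.

```ruby
# Each "row" bumps a counter when it is actually produced.
scanned = 0
contacts = -> { (1..10_000).lazy.map { |i| scanned += 1; i } }

contacts.call.count      # like SELECT COUNT(*): touches every row
count_scan = scanned     # 10_000 rows scanned

scanned = 0
contacts.call.first(1)   # like SELECT ... LIMIT 1: stops after one row
limit_scan = scanned     # 1 row scanned
```

On tens of thousands of contacts that gap is exactly what you'd hope the database avoids with the `none?` version.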
the batching
```ruby
seen = Set.new({})
message.contacts.select(:id).find_in_batches do |contact_batch|
  args = contact_batch.pluck(:id).map do |contact_id|
    next unless seen.add?(contact_id) # add? returns nil if the object is already in the set
    [message_guid, contact_id]
  end
  Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
end
```
Here you're loading contacts in batches, using an in-memory set to make sure you haven't seen the `contact_id` already. I have a question: why can there be duplicates in the first place, if the contact ids come from `message.contacts`?
I had immediately thought of … but you said that's too slow as well.
Another issue with that set is that you're storing all the IDs in memory for the entire duration of the job. According to the plan you have tens of thousands of them.
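For reference, the `Set#add?` dedup in that loop boils down to this in plain Ruby (the ids and the guid here are made up):

```ruby
require 'set'

seen = Set.new
ids  = [10, 20, 20, 30, 10] # pretend pluck(:id) returned duplicates

args = ids.map do |contact_id|
  next unless seen.add?(contact_id) # add? returns nil for duplicates
  ['message-guid', contact_id]      # hypothetical job arguments
end

args.compact
# => [["message-guid", 10], ["message-guid", 20], ["message-guid", 30]]
```

Note that by the end `seen` holds every distinct id it ever saw, which is exactly the memory cost described above.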
I then re-read the code and thought about a different approach to the whole thing (aside from the long locking).
job #1 counts the total and splits it into evenly sized ranges (basically its goal would be to create a bunch of `start` and `finish` parameters to pass to `find_in_batches`). It then enqueues a bunch of job #2s
job #2 takes those parameters and does the selecting and the preparation of the bulk pushes for Sidekiq
job #3 is what actually sends the emails
This, as you say, would reduce the running time by splitting the work into multiple concurrent jobs running smaller `SELECT`s.
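Job #1's splitting could be as simple as this (hypothetical helper; the batch size would need tuning against your schema):

```ruby
# Turn a total row count into [start, finish] offset pairs,
# one pair per job #2.
def batch_ranges(total, batch_size)
  (0...total).step(batch_size).map do |start|
    [start, [start + batch_size, total].min - 1]
  end
end

batch_ranges(10, 4) # => [[0, 3], [4, 7], [8, 9]]
```

Each pair would then be pushed as the arguments of one job #2.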
You still have to solve the duplication problem. But there might be a solution if you use Redis and its atomic transactions: redis.io/topics/transactions
The other idea, untested, is to have job #1 split the ids and queue job #2 N times (one for each pair of start and finish offsets). Job #2 doesn't directly call the "send email" jobs but prepares data in Redis. When the batch of job #2s is complete, you fire off the batches of job #3. When job #3 starts, it checks with Redis whether it actually has to send the email or not (the "not" comes from Redis basically telling it "someone else already sent an email to this contact id").
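That "check with Redis" step in job #3 is essentially a claim-once operation. Here's the idea simulated with a plain Hash (real code would use redis-rb's `redis.set(key, value, nx: true)`, i.e. `SET ... NX`, which atomically succeeds only for the first caller):

```ruby
# Simulated SETNX: the claim returns true only for the first caller
# of a given key; everyone after gets false and skips sending.
store = {}
claim = lambda do |key|
  next false if store.key?(key) # already claimed by someone else
  store[key] = 1
  true
end

claim.call('delivered:42') # => true  (this worker sends the email)
claim.call('delivered:42') # => false (already sent, skip)
```

With real Redis the check-and-set is a single atomic command, so two concurrent job #3s can never both get `true` for the same contact id.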
Let me know if any of this makes sense :D
Just found this Sidekiq wiki page called Really Complex Workflows with Batches.