I can see why you would think that. But I'm not using notifications the way most event stores use them, i.e. to push events to listeners. For one thing, Postgres NOTIFY payloads are too small for that to work (by default, a payload must be shorter than 8000 bytes). For another, I don't like the push model for listeners; I prefer a pull model.
Using these notifications, you can set up the listener either way: to process all events, or to only listen for the latest happenings.
To process all events, the listener uses notifications simply to learn that a new event exists and what its position (SequenceNum) is. If that sequence number is higher than the last one it processed, it fetches all new events starting after its last processed SequenceNum. The listener tracks its own last processed position and loads events sequentially; it does not depend on notifications for tracking, so no events are missed.
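A minimal sketch of that catch-up logic, in Python for illustration only. `Event`, `InMemoryEventStore`, `CatchUpListener` and `on_notify` are hypothetical names, and the in-memory list stands in for a query along the lines of `SELECT ... FROM Event WHERE Position > $last ORDER BY Position`. The notification carries only a position and acts purely as a wake-up signal.

```python
from dataclasses import dataclass


@dataclass
class Event:
    position: int
    type: str


class InMemoryEventStore:
    """Hypothetical stand-in for the event table."""

    def __init__(self) -> None:
        self._events = []

    def append(self, event: Event) -> None:
        self._events.append(event)

    def read_after(self, position: int) -> list:
        # Roughly: SELECT ... WHERE Position > position ORDER BY Position
        return sorted((e for e in self._events if e.position > position),
                      key=lambda e: e.position)


class CatchUpListener:
    """Tracks its own last processed position; notifications only wake it up."""

    def __init__(self, store: InMemoryEventStore) -> None:
        self.store = store
        self.last_position = 0
        self.processed = []

    def on_notify(self, notified_position: int) -> None:
        if notified_position <= self.last_position:
            return  # already caught up to (or past) this position
        for event in self.store.read_after(self.last_position):
            self.processed.append(event.position)
            self.last_position = event.position


store = InMemoryEventStore()
listener = CatchUpListener(store)
for pos in (1, 2, 3):
    store.append(Event(pos, "EmptyEvent"))
# Even if the notifications for 1 and 2 were lost, the one for 3 triggers a full catch-up.
listener.on_notify(3)
print(listener.processed)  # [1, 2, 3]
```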
But there are cases where the listener doesn't care about processing every event, only the most recent ones. In that case, the listener loads just the specific event referenced in the notification. Some events will be missed (e.g. when a listener node fails), but this kind of listener isn't concerned about that.
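For contrast, a sketch of a latest-only listener under similar assumptions: `LatestOnlyListener` and the event strings are hypothetical, and a dict stands in for a lookup by position. It never fills gaps, so a lost notification simply means a skipped event.

```python
from typing import Optional


class LatestOnlyListener:
    """Hypothetical listener that only cares about the most recent event."""

    def __init__(self, events_by_position: dict) -> None:
        # Stand-in for: SELECT ... FROM Event WHERE Position = $1
        self.events_by_position = events_by_position
        self.latest: Optional[str] = None

    def on_notify(self, notified_position: int) -> None:
        # Load only the event named in the notification; no catch-up.
        event = self.events_by_position.get(notified_position)
        if event is not None:
            self.latest = event


events = {1: "PriceChanged:10", 2: "PriceChanged:12", 3: "PriceChanged:11"}
listener = LatestOnlyListener(events)
listener.on_notify(3)   # the notifications for 1 and 2 were never seen
print(listener.latest)  # PriceChanged:11
```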
P.S. As far as notifications and concurrency go: with Postgres NOTIFY, notifications are delivered only when the transaction commits. They are never sent when the transaction aborts, such as on a unique key constraint (concurrency) violation.
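That commit-only behaviour can be modeled as below. This is a toy model, not Postgres itself: `FakeDatabase`, `UniqueViolation` and the simplified `position/stream/version` payloads are hypothetical, and the (stream, version) check mimics a unique key on those columns. Notifications queued while a transaction runs are delivered only if it commits.

```python
class UniqueViolation(Exception):
    """Stands in for a unique-constraint error that aborts the transaction."""


class FakeDatabase:
    def __init__(self) -> None:
        self.rows = set()      # committed (stream_id, version) keys
        self.delivered = []    # notifications actually sent to listeners

    def append_batch(self, events) -> bool:
        pending_rows = set()
        pending_notifications = []
        try:
            for stream_id, version, payload in events:
                key = (stream_id, version)
                if key in self.rows or key in pending_rows:
                    raise UniqueViolation(key)
                pending_rows.add(key)                  # INSERT ...
                pending_notifications.append(payload)  # AFTER INSERT trigger queues pg_notify
        except UniqueViolation:
            return False  # ROLLBACK: rows and queued notifications are discarded
        self.rows |= pending_rows                      # COMMIT: rows become visible
        self.delivered.extend(pending_notifications)   # ...and notifications go out
        return True


db = FakeDatabase()
ok1 = db.append_batch([("a", 1, "1/a/1"), ("a", 2, "2/a/2")])
# A concurrent writer also wrote ("b", 1) before conflicting on version 2 of "a".
ok2 = db.append_batch([("b", 1, "3/b/1"), ("a", 2, "4/a/2")])
print(ok1, ok2, db.delivered)  # True False ['1/a/1', '2/a/2']
```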
Yes, I understood that. The error is in this part: "To process all events, the listener uses notifications simply to be informed of a new event and its position (SequenceNum). If the sequence number is higher than its last processed one, it will fetch all new events starting from its last processed SequenceNum."
Consider two transactions T1 and T2:
T1 starts, obtains next sequence number N from Postgres.
T1 stalls for whatever reason.
T2 starts, obtains next sequence number N+1 from Postgres.
T2 commits, sends notification.
Listener sees N+1 is higher than $last, retrieves all events since $last including N+1, sets $last := N+1.
T1 revives, commits, sends notification.
Listener sees N, not higher than $last (= N+1), ignores.
=> missed event.
(AFAIK the only way to avoid this with this architecture is to have a single writer per partition...).
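The interleaving above can be replayed deterministically. This sketch assumes positions are handed out when the insert runs but become visible only at commit, as with a Postgres SEQUENCE; `simulate_race` and its local names are hypothetical.

```python
import itertools


def simulate_race():
    """Replays the T1/T2 interleaving against a catch-up listener."""
    sequence = itertools.count(1)  # non-transactional position source
    committed = []                 # positions visible to readers
    processed = []
    last = 0

    def notify(position: int) -> None:
        nonlocal last
        if position <= last:
            return  # "not higher than $last, ignores"
        for p in sorted(p for p in committed if p > last):
            processed.append(p)
            last = p

    n = next(sequence)           # T1 starts, obtains N; then T1 stalls
    n_plus_1 = next(sequence)    # T2 starts, obtains N+1
    committed.append(n_plus_1)   # T2 commits
    notify(n_plus_1)             # listener reads everything after $last, $last := N+1
    committed.append(n)          # T1 revives and commits
    notify(n)                    # N is not higher than $last, so it is ignored
    return committed, processed


committed, processed = simulate_race()
missed = sorted(set(committed) - set(processed))
print(missed)  # [1]
```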
Ah, I see. Excellent point.
The case of "T1 stalls for whatever reason" is certainly possible, but unlikely for us so far. Aside from us not yet reaching the IOPS to make this likely, our transactions are literally BEGIN ... INSERT ... INSERT ... COMMIT, all sent as a single (maybe even prepared) statement. That said, I'm not an expert on what effects autovacuum pauses can have.
If I run into this problem, I will have to give it more thought. A temporary workaround that would work for us is to rebuild the read models if this happens.
Anyway, I'm glad you mentioned this. Thank you!
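A minimal sketch of that workaround, assuming a hypothetical read model that counts events per type: throw it away and replay every event in position order from the start, which picks up anything a live listener skipped, at the cost of reprocessing everything.

```python
from collections import Counter


def rebuild_read_model(events) -> Counter:
    """Hypothetical read model: number of events per type, rebuilt from scratch."""
    model = Counter()
    for _position, event_type in sorted(events):  # replay in position order
        model[event_type] += 1
    return model


# The live model missed position 1 (the late-committing T1 in the scenario above).
live = Counter({"OrderPlaced": 1})
all_events = [(2, "OrderPlaced"), (1, "OrderPlaced")]
rebuilt = rebuild_read_model(all_events)
print(live["OrderPlaced"], rebuilt["OrderPlaced"])  # 1 2
```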
Yes, aside from that issue, Postgres is a great choice, and we are using a setup similar to the one you describe here. We currently work around this with a single-threaded writer, which is still fast enough because, as you mentioned, transactions are really fast for us; the single writer is certainly not the bottleneck :)
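A rough sketch of the single-writer idea, with Python threads standing in for command handlers. `SingleWriter` is hypothetical: one thread both assigns positions and "commits", so positions become visible in strictly increasing order no matter how many handlers enqueue work.

```python
import queue
import threading


class SingleWriter:
    """Hypothetical single-threaded writer fed by a queue."""

    def __init__(self) -> None:
        self.requests = queue.Queue()
        self.log = []  # (position, event) pairs in commit order
        self._next_position = 1
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self) -> None:
        while True:
            event = self.requests.get()
            if event is None:
                return  # shutdown signal
            # Assign the position and "commit" as one step on the writer thread.
            self.log.append((self._next_position, event))
            self._next_position += 1

    def close(self) -> None:
        self.requests.put(None)
        self._thread.join()


writer = SingleWriter()


def command_handler(name: str) -> None:
    for i in range(100):
        writer.requests.put(f"{name}-{i}")


handlers = [threading.Thread(target=command_handler, args=(f"h{k}",)) for k in range(4)]
for t in handlers:
    t.start()
for t in handlers:
    t.join()
writer.close()
positions = [p for p, _ in writer.log]
print(positions == sorted(positions), len(positions))  # True 400
```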
Keep in mind, however, that even if it's very, very unlikely, given that you might store tens of millions of events per year (which is not uncommon even for a moderate business software application with a couple of users), even a 0.0001% chance per event means you are likely to encounter that race condition about 10,000,000 * 0.0001% = 10 times a year. From my own negative experience, I can tell you that a) this will happen, unfortunately, and b) this is one of those bugs that are very hard to find :-)
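The arithmetic, plus the chance of hitting the race at least once in a year. The second figure assumes every event independently hits the race with the same probability, which is a simplification.

```python
import math

events_per_year = 10_000_000   # the example volume from the comment
p_per_event = 0.0001 / 100     # 0.0001 % as a probability, i.e. 1e-6

expected_occurrences = events_per_year * p_per_event
# P(at least once) = 1 - (1 - p)^n, computed stably via log1p/expm1.
p_at_least_once = -math.expm1(events_per_year * math.log1p(-p_per_event))

print(round(expected_occurrences, 6))  # 10.0
print(round(p_at_least_once, 5))       # 0.99995
```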
Anyway, keep up the good work; this is a very nice use of the NOTIFY feature in Postgres that I hadn't considered before.
I really appreciate your feedback!
On a quick search, I found this article on gapless sequences. Using what it describes would also enforce a totally ordered sequence number. This will have a performance impact, though I'm not sure how much. Perhaps a single writer is the better solution overall for now.
I am concerned that business logic (command processing) will need to scale out its compute resources. Currently I plan to colocate the writer with command processing, which means multiple concurrent writers as command processing scales. I guess those are good problems to have, and to figure out when I get there. But I don't like the idea of the single writer being its own separate service.
Here is a quick sketch of a solution that supports multiple concurrent writers, using the strategy from that link. It works by making sequence number generation part of the parent transaction: because NextPosition() updates a single counter row, a second writer blocks on that row lock until the first one commits or rolls back, and a rollback also undoes the increment, so positions have no gaps. I also added some rules to make the Event table append-only and the sequence counter update-only, to prevent maintenance mistakes; this was also inspired by that link.
Here, I also renamed SequenceNum to Position.
--DROP TABLE IF EXISTS PositionCounter CASCADE;
--DROP FUNCTION IF EXISTS NextPosition();
--DROP TABLE IF EXISTS Event CASCADE;
--DROP TRIGGER IF EXISTS trg_EventRecorded ON Event;
--DROP FUNCTION IF EXISTS NotifyEvent();

-- transactional sequence number
CREATE TABLE IF NOT EXISTS PositionCounter
(
    Position bigint NOT NULL
);

INSERT INTO PositionCounter VALUES (0);

-- prevent removal / additional rows
CREATE RULE rule_positioncounter_noinsert AS
ON INSERT TO PositionCounter DO INSTEAD NOTHING;
CREATE RULE rule_positioncounter_nodelete AS
ON DELETE TO PositionCounter DO INSTEAD NOTHING;

-- function to get next sequence number
CREATE FUNCTION NextPosition() RETURNS bigint AS $$
DECLARE
    nextPos bigint;
BEGIN
    UPDATE PositionCounter
       SET Position = Position + 1;
    SELECT INTO nextPos Position FROM PositionCounter;
    RETURN nextPos;
END;
$$ LANGUAGE plpgsql;

-- event table
CREATE TABLE IF NOT EXISTS Event
(
    Position bigint NOT NULL,
    TenantId uuid NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    Data jsonb,
    LogDate timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT pk_event_position PRIMARY KEY (TenantId, Position),
    CONSTRAINT uk_event_streamid_version UNIQUE (TenantId, StreamId, Version)
) PARTITION BY LIST (TenantId);

-- Append only
CREATE RULE rule_event_nodelete AS
ON DELETE TO Event DO INSTEAD NOTHING;
CREATE RULE rule_event_noupdate AS
ON UPDATE TO Event DO INSTEAD NOTHING;

-- notification
CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$
DECLARE
    payload text;
BEGIN
    -- { position }/{ tenantId }/{ streamId }/{ version }/{ event type }
    SELECT CONCAT_WS( '/'
                    , NEW.Position
                    , NEW.TenantId
                    , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                    , NEW.Version
                    , NEW.Type
                    )
      INTO payload;
    -- using lower case channel name or else LISTEN would require quoted identifier.
    PERFORM pg_notify('eventrecorded', payload);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_EventRecorded
    AFTER INSERT ON Event
    FOR EACH ROW
    EXECUTE PROCEDURE NotifyEvent();
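On the listener side, the payload built by NotifyEvent() can be split back into its fields. A sketch in Python, where `EventRecorded` and `parse_payload` are hypothetical names. It relies on Postgres rendering TenantId as a hyphenated UUID while the trigger strips the hyphens from StreamId; `uuid.UUID` accepts both forms.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class EventRecorded:
    position: int
    tenant_id: uuid.UUID
    stream_id: uuid.UUID
    version: int
    event_type: str


def parse_payload(payload: str) -> EventRecorded:
    """Parse '{position}/{tenantId}/{streamId}/{version}/{event type}'.

    maxsplit=4 keeps any '/' inside the event type intact.
    """
    position, tenant, stream, version, event_type = payload.split("/", 4)
    return EventRecorded(int(position), uuid.UUID(tenant), uuid.UUID(stream),
                         int(version), event_type)


msg = parse_payload("42/3791b536-77c8-40fc-81ca-65bfdaf34f1f/"
                    "a88f94db6e7a439e9861485f63cc8a13/1/EmptyEvent")
print(msg.position, msg.version, msg.event_type)  # 42 1 EmptyEvent
```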
And here's how to append an event with the transactional sequence number.
-- inside a transaction
INSERT INTO Event
    (Position, TenantId, StreamId, Version, Type, Meta, Data)
VALUES
    (NextPosition(), '3791B53677C840FC81CA65BFDAF34F1F', 'A88F94DB6E7A439E9861485F63CC8A13', 1, 'EmptyEvent', '{}', NULL);
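Why the counter closes the T1/T2 race: the UPDATE in NextPosition() takes a row lock that is held until the transaction ends, so a second writer cannot obtain a position until the first has committed or rolled back. The toy model below uses a `threading.Lock` as a stand-in for that row lock (an assumption, not Postgres itself; `CounterRowLock` and `append_event` are hypothetical) and checks that positions become visible in order even when a writer stalls while holding it.

```python
import random
import threading
import time


class CounterRowLock:
    """Hypothetical model of PositionCounter: the row lock lasts until commit."""

    def __init__(self) -> None:
        self.value = 0
        self.row_lock = threading.Lock()


counter = CounterRowLock()
commit_order = []  # positions in the order they became visible


def append_event() -> None:
    with counter.row_lock:                     # NextPosition(): UPDATE blocks other writers
        counter.value += 1
        position = counter.value
        time.sleep(random.uniform(0, 0.002))   # the writer "stalls" while holding the lock
        commit_order.append(position)          # COMMIT makes the row visible...
    # ...and only then is the lock released to the next writer


writers = [threading.Thread(target=append_event) for _ in range(50)]
for w in writers:
    w.start()
for w in writers:
    w.join()

print(commit_order == sorted(commit_order))  # True
```

The price is the one mentioned above: writers serialize on the counter row, so throughput is bounded much like a single writer, but the writers can live in separate processes.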
Wouldn't it be simpler to use an explicit table lock when inserting events, instead of PositionCounter? It limits performance, but I believe the result is the same as with the PositionCounter table, since that also needs a row-level lock.
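begin;
-- with no IN ... MODE clause, LOCK TABLE takes ACCESS EXCLUSIVE, which also blocks readers until commit
lock table event;
insert into event ...
commit;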
Simpler? It would be less to set up than a PositionCounter table. In usage, it's adding a lock statement vs. calling NextPosition() in the insert. I suppose you will just have to test the performance difference between the two.
Thanks for putting this together, great read!
Working on porting this to YugaByteDB.