DEV Community

Mustafa Turan

Persisting event data to Postgres using GenStage and EventBus

One way to consume EventBus events is to implement GenStage consumers. GenStage handles backpressure through demand-driven stages and a configurable number of workers. The event_bus_postgres library uses GenStage to persist event_bus events to a Postgres database with batch inserts.

How it works

+-----+
|     |                                         GEN STAGE
|     |        EVENTBUS      +------------------------------------------+
|     |        CONSUMER      |                   +---+                  |
|     |        +-----+       |                   |   |                  |
|     |        |     |       |                   |   |          +---+   |
|     |        |  E  |       |                   |   |          |   |   |
|     |        |  v  |       |                   |   |          |   |   |
|     |        |  e  |       |                   |   |          |   |   |
|  E  |        |  n  |       |                   | E |          |   |   |
|  l  |        |  t  |       |  +-------+        | v |          |   |   |
|  i  | topic  |  B  |  topic   |       |        | e |          |   |
|  x  |   +    |  u  |    +     |   Q   |        | n |          | B |       +--+
|  i  |event_id|  s  | event_id |   u   |   ask  | t |    ask   | u |       |  |
|  r  |------->|  .  |--------->|   e   |<-------|   | <--------| c | BATCH |  |
|     |        |  P  |          |   u   |------->| M | -------->| k |------>|DB|
|  E  |        |  o  |          |   e   |   pull | a |    pull  | e | INSERT|  |
|  v  |        |  s  |          |       |        | p |          | t |       |  |
|  e  |        |  t  |       |  +-------+        | p |          |   |   |   +--+
|  n  |        |  g  |       |  GENSTAGE         | e |          |   |   |
|  t  |        |  r  |       |  PRODUCER         | r |          |   |   |
|  B  |        |  e  |       |                   |   |          |   |   |
|  u  |        |  s  |       |                   |   |          |   |   |
|  s  |        +-----+       |                   |   |          |   |   |
|     |<-----------------------------------------|   |          +---+   |
|     |                      |    fetch_event/1  |   |         CONSUMER |
|     |                      |                   |   |                  |
+-----+                      |                   +---+                  |
                             |                  CONSUMER                |
                             |                  PRODUCER                |
                             +------------------------------------------+
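
The "ask" and "pull" arrows in the diagram are where backpressure comes from: nothing leaves the queue until a downstream stage asks for it. Here is a minimal sketch of that idea in plain Python. It is only a toy model, not how GenStage or event_bus_postgres is implemented; the `Queue` class, its `push`/`pull` methods and the `user_created` topic are names made up for illustration.

```python
from collections import deque


class Queue:
    """Toy stand-in for the producer stage: buffers (topic, event_id) pairs."""

    def __init__(self):
        self._items = deque()

    def push(self, topic, event_id):
        # Called by the bus subscriber; nothing is sent downstream yet.
        self._items.append((topic, event_id))

    def pull(self, demand):
        # Hand out at most `demand` items, oldest first.
        count = min(demand, len(self._items))
        return [self._items.popleft() for _ in range(count)]

    def __len__(self):
        return len(self._items)


queue = Queue()
for event_id in range(1, 8):
    queue.push("user_created", event_id)  # hypothetical topic name

first = queue.pull(3)   # downstream asks for 3 items
second = queue.pull(3)  # and for 3 more
third = queue.pull(3)   # only 1 item is left to hand out
```

The consumer decides how fast events move, so a slow database slows down the pulls instead of flooding the next stage. One simplification to keep in mind: this toy queue forgets a request it could not fully serve, so the consumer would have to ask again later.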

Components

EventBus
Message bus for Elixir; it publishes event_id and topic data to topic subscribers.

EventBus.Postgres
Message bus event consumer; it pushes the event_id and topic to EventBus.Postgres.Queue.

Queue
GenStage producer; it is a simple queue implementation.

EventMapper
GenStage producer-consumer; it pulls/dequeues from EventBus.Postgres.Queue, fetches the original event from EventBus, and then converts the data into an Ecto model.

Bucket
GenStage consumer; it pulls/dequeues from EventBus.Postgres.EventMapper and batch-inserts the data into the Postgres DB.
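
To make the last two components more concrete, here is a rough Python sketch of the mapping and batching steps. Everything in it is an assumption made for illustration: `EVENT_STORE` stands in for EventBus, `Row` stands in for the Ecto model, and the batch size and `flush` method are invented, since the post does not describe the library's actual batching policy. The `insert_all` callback just records batches in a list instead of talking to Postgres.

```python
import json
from dataclasses import dataclass


@dataclass
class Row:
    """Hypothetical stand-in for the Ecto model."""
    id: int
    topic: str
    data: str


# Hypothetical in-memory event store standing in for EventBus.
EVENT_STORE = {
    ("user_created", n): {"name": f"user{n}"} for n in range(1, 6)
}


def fetch_event(shadow):
    # Takes one (topic, event_id) pair, like fetch_event/1 in the diagram.
    return EVENT_STORE[shadow]


def map_events(shadows):
    # Mapper step: look up each original event and turn it into a row.
    rows = []
    for topic, event_id in shadows:
        payload = fetch_event((topic, event_id))
        rows.append(Row(id=event_id, topic=topic, data=json.dumps(payload)))
    return rows


class Bucket:
    """Collects rows and hands them to `insert_all` in fixed-size batches."""

    def __init__(self, batch_size, insert_all):
        self.batch_size = batch_size  # assumed setting, not the library's
        self._insert_all = insert_all
        self._pending = []

    def handle_rows(self, rows):
        self._pending.extend(rows)
        # Insert full batches only; leftovers wait for more rows or a flush.
        while len(self._pending) >= self.batch_size:
            batch = self._pending[:self.batch_size]
            del self._pending[:self.batch_size]
            self._insert_all(batch)

    def flush(self):
        # Insert whatever is left, even if it is smaller than a full batch.
        if self._pending:
            self._insert_all(self._pending)
            self._pending = []


inserted_batches = []  # records each "batch insert" instead of using a DB
bucket = Bucket(batch_size=2, insert_all=inserted_batches.append)

shadows = [("user_created", n) for n in range(1, 6)]
bucket.handle_rows(map_events(shadows))
sizes_before_flush = [len(batch) for batch in inserted_batches]
bucket.flush()
sizes_after_flush = [len(batch) for batch in inserted_batches]
```

Only the event_id and topic travel through the queue; the full event is looked up at mapping time, and the database sees a few multi-row writes instead of one write per event.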

Source code: https://github.com/otobus/event_bus_postgres
