One of the ways to consume EventBus events is implementing GenStage consumers. GenStage handles backpressure easily with configurable workers. event_bus_postgres
library uses GenStage to persist event_bus
events to postgres DB with batch insert.
How it works
+-----+
| | GEN STAGE
| | EVENTBUS +------------------------------------------+
| | CONSUMER | +---+ |
| | +-----+ | | | |
| | | | | | | +---+ |
| | | E | | | | | | |
| | | v | | | | | | |
| | | e | | | | | | |
| E | | n | | | E | | | |
| l | | t | | +-------+ | v | | | |
| i | topic | B | topic | | | e | | |
| x | + | u | + | Q | | n | | B | +--+
| i |event_id| s | event_id | u | ask | t | ask | u | | |
| r |------->| . |--------->| e |<-------| | <--------| c | BATCH | |
| | | P | | u |------->| M | -------->| k |------>|DB|
| E | | o | | e | pull | a | pull | e | INSERT| |
| v | | s | | | | p | | t | | |
| e | | t | | +-------+ | p | | | | +--+
| n | | g | | GENSTAGE | e | | | |
| t | | r | | PRODUCER | r | | | |
| B | | e | | | | | | |
| u | | s | | | | | | |
| s | +-----+ | | | | | |
| |<-----------------------------------------| | +---+ |
| | | fetch_event/1 | | CONSUMER |
| | | | | |
+-----+ | +---+ |
| CONSUMER |
| PRODUCER |
+------------------------------------------+
Components
EventBus
Message bus for Elixir; it publishes event_id
and topic
data to topic
subscribers.
EventBus.Postgres
Message bus event consumer; it pushes event_id
and topic
to the EventBus.Postgres.Queue
Queue
GenStage
producer; it is a simple queue implementaion
EventMapper
GenStage
producer-consumer; it pulls/dequeues from EventBus.Postgres.Queue
, and fetch original event from EventBus
and then convert data into Ecto
model.
Bucket
GenStage
consumer; it pulls/dequeues from EventBus.Postgres.EventMapper
and batch insert data to Postgres DB.
Source code: https://github.com/otobus/event_bus_postgres
Top comments (0)