Elixir EventBus is a library that allows different modules to communicate with each other without knowing about each other. A module/function can create an Event struct, and deliver to the EventBus without knowing which modules will consume.
Modules can also listen to events on the EventBus, without knowing who sent the events. Thus, modules can communicate without depending on each other. Moreover, it is very easy to substitute a consumer module. As long as the new module understands the Event
struct that is being sent and received, the other modules will never know.
Get started with decoupled modules in 4 steps
Decoupled modules help us to write clear code, focus on only one thing at a time. With EventBus library, you can get benefit of real event bus system and write decoupled modules easily.
Registering event topics
On your app init, or anytime before delivering the events we need to register our topics. For example, when we receive a payment, we will create checkout_completed
event.
EventBus.register_topic(:checkout_completed)
EventBus.register_topic(:checkout_failed)
Delivering events
EventBus.EventSource
module provides notify helper to deliver events without modifying your current code.
defmodule Order do
...
use EventBus.EventSource
...
def checkout(params) do
event_params = %{topic: :checkout_completed, error_topic: :checkout_failed}
EventSource.notify(event_params) do
... # process the payment as usual in here and if errors then return {:error, _} tuple
end
end
end
Subscribing consumers to topics
Assume that we have several operations waiting on success and failure conditions. Let's subscribe the consumers to the event topics.
EventBus.subscribe({StockUpdateService, ["^checkout_completed$", "^new_stock_arrived$", ...]})
EventBus.subscribe({CargoService, ["^checkout_completed$, "^cargo_dispatched$", "^new_stock_arrived$", ...]})
EventBus.subscribe({EmailService, ["^user_registered$", "^checkout_completed$", ...]})
EventBus.subscribe({EventBus.Logger, [".*"]})
Consuming events
EventBus consumers are just modules so they can be implemented by any kind of consumer strategy including GenStage(event_bus_postgres as sample), pooling, DynamicSupervisors, global consumers, spawn et al. So, depending on your use case, implement your consumers with the best suitable strategy. (But any kind of blocker strategies are not recommended!)
defmodule CargoService do
use GenServer
...
def process({topic, id}) do
GenServer.cast(__MODULE__, event_shadow)
end
def handle_cast({topic, id}, state) do
payment_data = EventBus.fetch_event_data({topic, id})
# do sth with payment_data
...
# mark event as completed for this consumer
EventBus.mark_as_completed({CargoService, topic, id})
{:noreply, state}
end
end
Here is another consumer that needs all event structure and consumes with spawning:
defmodule StockUpdateService do
...
def process({topic, id}) do
spawn(fn ->
event = EventBus.fetch_event({topic, id})
# do sth with event.data and/or any other event attributes
...
# mark event as completed for this consumer
EventBus.mark_as_completed({StockUpdateService, topic, id})
end)
end
end
About the EventBus library
EventBus library is a traceable, extendible, fast, and memory friendly pure Elixir implementation without any external dependency in the pocket.
Traceable
EventBus library comes with good optional attributes to provide traceability. When a consumer receive the topic and event_id, it can fetch structure with fetch_event/1
function. And if you use EventBus.EventSource helper, the optional fields are set automatically for you. Here is the structure of an Event model:
%EventBus.Model.Event{
id: String.t | integer(), # required
transaction_id: String.t | integer(), # optional
topic: atom(), # required
data: any() # required,
initialized_at: integer(), # optional
occurred_at: integer(), # optional
source: String.t(), # optional
ttl: integer() # optional
}
Extendible
EventBus library allows subscribing events with regex patterns, which allows consumers to subscribe new topics automatically. With this feature, it allows extending the system asynchronously with generic consumers like event_bus_logger, generic Postgresql event store, event_bus_metrics UI, and so on.
Fast by design
EventBus library uses builtin memory store ETS effectively to get benefits of concurrent reads and writes. It doesn't block majority of the read operations, and allows concurrent reads to fetch Event data.
It applies queueing theory to handle inputs. And almost all implementation data accesses have O(1) complexity.
Memory friendly
EventBus library delivers only topic and event_id to the consumers and keeps the original event data in the topic's Event table. So, only when consumers are able to process the event, then they fetch the event data.
Distributed?
Even though the main intention to create this library is to use for internal module communications, it is possible to deliver events across nodes. One of the ways to delivery is sending message to connected other nodes as usual Elixir/Erlang way:
for node <- Node.list() do
Node.spawn(node, EventBus, :register_topic, [:checkout_completed])
Node.spawn(node, EventBus, :notify, [%{EventBus.Model.Event{..}}])
end
Note: EventBus library is not in the same class distributed solutions like Apache Kafka, RabbitMQ, or Redis Streams.
Original story on Medium: https://medium.com/elixirlabs/decoupled-modules-with-elixir-eventbus-a709b1479411
The source code of EventBus library: https://github.com/otobus/event_bus
Top comments (0)