Introduction
Our Rails application is an old monolith that relies heavily on before/after callbacks to trigger code that has side effects. This usually means that one model can change records of another model. I suspect we have all come across this "callback hell" at least once in our professional lives.
We needed to introduce a new service for search. As we settled on using Meilisearch, we needed a way to sync updates on our models with the records in Meilisearch. We could have continued to use callbacks, but we needed something better.
We settled on a Publish/Subscribe (Pub/Sub) pattern.
Pub/Sub is a design pattern where publishers broadcast messages to topics without specifying recipients, and subscribers listen for specific topics without knowing the source, promoting loose coupling.
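To make the definition concrete, here is a minimal in-memory sketch of the pattern. It is only an illustration and not the implementation described later in this post; the `TinyBus` class and the topic names are made up for the example.

```ruby
# Minimal in-memory Pub/Sub sketch (illustrative only).
class TinyBus
  def initialize
    # topic name => list of callables
    @handlers = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Register a block to run for every message on the topic.
  def subscribe(topic, &handler)
    @handlers[topic] << handler
  end

  # Deliver the message to every subscriber of the topic.
  # The publisher never learns who (if anyone) received it.
  def publish(topic, message)
    @handlers.fetch(topic, []).each { |handler| handler.call(message) }
  end
end

bus = TinyBus.new
received = []
bus.subscribe('task.upsert') { |message| received << [:indexer, message[:id]] }
bus.subscribe('task.upsert') { |message| received << [:audit, message[:id]] }

bus.publish('task.upsert', id: 42) # both subscribers run
bus.publish('task.delete', id: 42) # no subscribers, nothing happens
```

Neither subscriber knows about the other or about the publisher, and the publisher only knows the topic name.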
Exploring Pub/Sub Solutions
While searching for a solution we came across these:
ActiveSupport::Notifications: A Rails component for defining and subscribing to events within an application.
-> ActiveSupport::Notifications is synchronous by default. It does not handle errors and retries out of the box. We would need to use a background job handler (like Sidekiq) to make it asynchronous.
Wisper: A Ruby gem providing a decoupled communication layer between different parts of an application.
-> I personally dislike Wisper. I used it in the past and dislike how subscribers are defined globally. I wanted topics to be arbitrary and each class to define for itself what it subscribes to.
dry-events: A component of the dry-rb ecosystem focused on event publishing and subscription.
-> This gem looks like a framework on top of which I would need to build my own Pub/Sub. It also does not do asynchronous execution out of the box.
We finally settled on Sidekiq!
Sidekiq is a background job processing tool that we already use in our application, and we figured out a way to use it for processing Pub/Sub messages.
Implementation
Implementation is quite straightforward.
require 'singleton'
require 'set'

class PubSub
  # Background job that delivers one message to one subscriber.
  class SubscriberJob < ApplicationJob
    queue_as :pub_sub

    def perform(message:, class_name:, handler:)
      class_name.constantize.public_send(handler, message)
    end
  end

  include Singleton

  # Publish a message to topic
  #
  # @param topic [String]
  # @param message [Hash]
  def self.publish(topic, **message)
    instance.subscribers(topic).each do |subscriber|
      PubSub::SubscriberJob.perform_later(message: message, **subscriber)
    end
  end

  # Subscribe a class + handler to a topic
  #
  # @param topic [String]
  # @param class_name [Class, String]
  # @param handler [Symbol, String]
  # @param async [Boolean] accepted but currently unused
  def self.subscribe(topic, class_name, handler, async: true)
    instance.subscribe(topic, class_name, handler)
  end

  def initialize
    @subscribers = {}
  end

  # Return subscribers for the topic
  #
  # @param topic [String]
  # @return [Set<Hash>] { class_name: String, handler: String }
  def subscribers(topic)
    @subscribers.fetch(topic, Set.new)
  end

  # Subscribe a class + handler to a topic
  #
  # @param topic [String]
  # @param class_name [Class, String]
  # @param handler [Symbol, String]
  # @param async [Boolean] accepted but currently unused
  def subscribe(topic, class_name, handler, async: true)
    @subscribers[topic] ||= Set.new
    # Store plain strings so the job arguments stay easy to serialize.
    @subscribers[topic] << { class_name: class_name.to_s, handler: handler.to_s }
  end
end
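The reason the registry keeps a class name and a handler name rather than a block or a class object is that job arguments have to survive a trip through the queue. The sketch below imitates that trip in plain Ruby, with no Rails or Sidekiq involved. The JSON round trip is only a stand-in for the real job serializer (which differs in details, for example in how hash keys are restored), `DemoIndexer` is a hypothetical subscriber, and `Object.const_get` plays the role of `constantize`.

```ruby
require 'json'

# Hypothetical subscriber class, used only for this illustration.
class DemoIndexer
  def self.on_upsert(message)
    "indexed task #{message['id']}"
  end
end

# What the registry stores: plain strings, which survive serialization.
subscriber = { class_name: DemoIndexer.to_s, handler: 'on_upsert' }

# Simulate the trip through the job queue with a JSON round trip.
payload  = JSON.generate(subscriber.merge(message: { id: 7 }))
job_args = JSON.parse(payload)

# What the job does on the worker: resolve the class by name, then call
# the handler. Object.const_get is a plain-Ruby stand-in for constantize.
klass  = Object.const_get(job_args['class_name'])
result = klass.public_send(job_args['handler'], job_args['message'])
```

After the round trip only strings and numbers are left, and that is enough to find the right class and method again.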
Usage
Usage is also quite straightforward:
class Search::Tasks::Indexer
  PubSub.subscribe('task.upsert', self, :on_upsert)

  def self.on_upsert(message)
    item = message[:item]
    MeilisearchClient.reindex(item.to_meilisearch)
  end
end

class TaskForm < ApplicationForm
  def save
    super
    PubSub.publish('task.upsert', item: self)
  end
end
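Whether an object like `self` can be passed in the message depends on what the job backend is able to serialize. If that turns out to be a problem, one option is to publish only an identifier and let the handler load the record. The following is a sketch of that variation in plain Ruby, not what our code above does: `TASKS`, `IdBasedIndexer` and the `task_id` key are hypothetical, and the JSON round trip again stands in for the queue.

```ruby
require 'json'

# Hypothetical in-memory stand-in for a database table of tasks.
TASKS = { 1 => { id: 1, title: 'Write report' } }.freeze

# Hypothetical handler: receives only an id and loads the record itself.
class IdBasedIndexer
  def self.on_upsert(message)
    task = TASKS.fetch(message['task_id'])
    { indexed: task[:title] }
  end
end

# Publisher side: the message holds only JSON-friendly values.
message = { task_id: 1 }

# The JSON round trip stands in for the job queue.
delivered = JSON.parse(JSON.generate(message))
outcome   = IdBasedIndexer.on_upsert(delivered)
```

A side effect of this variation is that the handler always works with the current state of the record, not with a snapshot taken at publish time.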
Experience
After 6 months of using this method, I can safely say that it works as intended.
- It handles unexpected errors, such as network errors, gracefully, because failed jobs are retried in the background
- It was easy to use where we needed it
- Classes that handle the incoming messages are separated from the rest of the models and are easy to unit test
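To illustrate the last point, here is a sketch of how a handler class can be tested on its own, without the queue or a search server. It assumes a handler with an injectable client, which is not how the `Search::Tasks::Indexer` shown earlier is written; `TestableIndexer` and `FakeSearchClient` are hypothetical names. Plain `raise` is used in place of a test framework to keep the example dependency-free.

```ruby
# Hypothetical handler with an injectable client.
class TestableIndexer
  class << self
    attr_writer :client

    def client
      @client or raise 'no search client configured'
    end

    # Same shape as a Pub/Sub handler: one message hash in.
    def on_upsert(message)
      client.reindex(message[:item])
    end
  end
end

# Fake client that records what it was asked to reindex.
class FakeSearchClient
  attr_reader :documents

  def initialize
    @documents = []
  end

  def reindex(document)
    @documents << document
  end
end

fake = FakeSearchClient.new
TestableIndexer.client = fake

# Call the handler directly, exactly as the subscriber job would.
TestableIndexer.on_upsert(item: { id: 1, title: 'Write report' })

expected = [{ id: 1, title: 'Write report' }]
raise 'expected one reindexed document' unless fake.documents == expected
```

Because the handler is just a class method that takes a hash, the test needs no model callbacks and no running worker.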
📖 Sidestory: Rethinking Pub-Sub
Recently Buha wrote a blog post about how he gave up on the Pub/Sub approach. It is a good read if you want to see the downsides of this approach.
Conclusion
We solved the problem of Pub/Sub messaging in our product with the help of Sidekiq. It proved to be reliable and of high quality, and the implementation itself is not complicated.
What are your experiences with Sidekiq? Are you using something else?
Top comments (3)
Thanks for your post!
Can you tell us about deserialization?
Sidekiq 7+ prefers hash objects as JSON, and in your example you are passing self in the item named parameter. Do you handle the object in publish or in another part?
Hi! Thanks for sharing this. We use Wisper on a daily basis and it really feels overwhelming. Your approach looks very simple and easy to use. Have you encountered any issues?
On the execution side, no. We have been running this for the last 6 months and haven't touched that code since :)
The problem we encountered is adoption. People just like after_commit hooks better for some reason...