One of the tools that are rarely covered in the Ruby/Rails world are the message brokers (probably because they mostly written in Java). Everyone are familiar mostly with background jobs processing, but message brokers offer a more flexible approach to asynchronous execution. For example, you can create a message from one application and process it in another and continue executing without waiting for the response.
Some benefits that you get at the architectural level: Fault Tolerance, Guaranteed delivery, Asynchronous communication(through Publish/Subscribe pattern), Loosely coupling, etc.
One of the messaging brokers that I used is ActiveMQ. ActiveMQ provides most of these features and I will consider building communication using this broker as an example.
ActiveMQ
Open source multi-protocol Messaging Broker.
Advantages:
- Supports many Cross Language Clients and Protocols
- High availability using shared storage (master/slave)
- KahaDB & JDBC options for persistence
Installing
ActiveMQ requires Java 7 to run and to build.
Brew (on MacOS)
The easier way to install
brew install apache-activemq
activemq start
Unix Binary Installation
Download the latest version here and follow up the documentation.
Docker
Unfortunately, the ActiveMQ doesn’t have official docker image. One of me
checked that I can recommend is https://hub.docker.com/r/rmohr/activemq
docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
CLI commands
Here are three most useful commands for beginning:
activemq start
— Creates and starts a broker using a configuration file.
activemq stop
— Stops a running broker.
activemq restart
— Restarts a running broker.
To see all the commands just call activemq
into terminal.
Start ActiveMQ
activemq
INFO: Loading '/usr/local/Cellar/activemq/5.15.9/libexec//bin/env'INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java'INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get detailsINFO: pidfile created : '/usr/local/Cellar/activemq/5.15.9/libexec//data/activemq.pid' (pid '61388')
Monitoring ActiveMQ
You can monitor ActiveMQ using the Web Console by pointing your browser at http://localhost:8161/admin
. Default credentials: login: admin, pass: admin
Messaging Patterns
The use of messaging brokers is simple and consists of only two concepts —
Topics and Queues. But modern tools also provide a combination of these approaches and provide additional features such as implementations of Publish/Subscribe pattern, failover, etc. But first, let’s get to know the main concepts.
Queue
Queues are the base messaging pattern. They provide direct
communication between a publisher and a subscriber. The publisher creates
messages, while the consumer reads one after another. After a message was read, it’s gone from the Queue. If the queue has multiple subscribers, only one of them will get the message.
Topic
Topic implements one-to-many communication. Unlike a queue, every subscriber will receive a message sent by the publisher. And the main problem is that the message cannot be recovered for a single listener(for example, if the service is disconnected from reading Topic).
Virtual Topics
Virtual topics combine both approaches. While the publisher sends messages to a topic, subscribers will receive a copy of the message on their own related queue.
Protocols
ActiveMQ supports most of the communication protocols such as MQTT,
OpenWire, REST, RSS and Atom, Stomp, WSIF, WebSocket and XMPP.
Getting Started
The easiest way is to start a feature review with a familiar protocol HTTP.
REST
ActiveMQ implements a RESTful API to messaging which allows any web capable device to publish messages using a regular HTTP POST or GET.
Publish to Queue
curl -u admin:admin -d "body=order_id" http://localhost:8161/api/message/shop?type=queue
Publish to Topic
curl -u admin:admin -d "body=order_id” http://localhost:8161/api/message/shop?type=topic
Integration with Ruby
The protocol that I will consider — STOMP (The Simple Text Oriented
Messaging Protocol). STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms, and brokers.
There is a great gem to work with this protocol for sending and receiving messages from a Stomp protocol compliant message queue. Includes: failover logic, SSL support.
gem 'stomp'
bundle install
Initialize Connection
def config_hash
{
hosts: [
{
login: 'admin',
passcode: 'admin',
host: '0.0.0.0',
port: 61613,
ssl: false
}
]
}
end
client = Stomp::Client.new(config_hash)
Queues
Interface to Queues with STOMP is pretty simple. Just initialize the connection with the list of configurations and close after publishing the message.
Send a Message to Queue
client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/queue/user-notifications', data.to_json)
client.close
Receive a Message from Queue
client = Stomp::Client.new(config_hash)
Thread.new do
client.subscribe('/queue/user-notifications') do |msg|
begin
msg = JSON.parse(msg.body)
# message processing...
rescue StandardError => e
Raven.capture_exception(e)
end
end
end
Note: Use exception handling to respond to them in time. For example, here I use Raven — Sentry wrapper.
Topics
Topics have a similar interface to Queues. Some examples below.
Send a Message to Topic
client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/topic/user-notifications', data.to_json)
client.close
Receive a Message from Topic
client = Stomp::Client.new(config_hash)
Thread.new do
client.subscribe('/topic/user-notifications') do |msg|
begin
msg = JSON.parse(msg.body)
# message processing...
rescue StandardError => e
Raven.capture_exception(e)
end
end
end
Integration with Rails
ActiveMessaging — Attempt to bring the simplicity and elegance of rails development to the world of messaging.
Add a gem to Gemfile
version for Rails 5+
gem 'activemessaging', github: 'kookster/activemessaging', branch: 'feat/rails5'
And then execute:
bundle install
Initializing
After adding ActiveMessaging the following command (to add a base class for defining listeners and polling server)
rails g active_messaging:install
create app/processors/application_processor.rb
create script/poller
chmod script/poller
create script/threaded_poller
chmod script/threaded_poller
create lib/poller.rb
create config/broker.yml
gemfile daemons
Generate a listener
rails g active_messaging:processor RailsQueue
create app/processors/rails_queue_processor.rb
create config/messaging.rb
invoke rspec
create spec/functional/rails_queue_processor_spec.rb
Processor
Here you specify which will be listened by subscribes_to
. When the message is published to RailsQueue
then on_message
executes with first arguments as the message body.
class RailsQueueProcessor < ApplicationProcessor
subscribes_to :rails_queue
def on_message(message)
logger.debug 'RailsQueueProcessor received: ' + message
end
end
Destination config
In initializer, we describe the destination of queue.
ActiveMessaging::Gateway.define do |s|
s.destination :rails_queue, '/queue/RailsQueue'
end
Run Application
script/poller run
Now you can publish to RailsQueue
and Rails instance will receive messages.
Production
First of all, you should think about deployment and maintenance. So, for small teams (without DevOps/SRE/System Administrator Role) I suggest look into Cloud Solutions.
AmazonMQ
https://aws.amazon.com/amazon-mq
Some features:
- Uses Apache KahaDB as its data store. Other data stores, such as JDBC and LevelDB, aren't supported
- Offers low latency messaging, often as low as single digit milliseconds
- Persistence out of the box
- Backups
No need to use extra API, just follow STOMP protocol (or any other) which Amazon provides to you.
FIFO (First In, First Out)
To enable saving the order of delivery of messages, you need to add in the broker configuration total ordering.
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<!--
The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other
consumers by limiting the number of messages that are retained
For more information, see:
-->
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
Reference
Post related to my last talk on Ruby Wine#1 about Event-Driven Architecture and Messaging Patterns for Ruby Microservices.
Slides available on Speaker Deck.
Conclusion
For simple projects, the use of queues may not be justified, since adding an additional architectural layer responsible for queuing messages is not an easy task. And before use, you must weigh the pros and cons(for e.g. you are ready to spend time for support). Queuing allows you to scale the application more flexible and solve most performance problems that are not related to language, but to architecture.
Top comments (2)
Thanks for this article. It's a roadmap for exactly what I'm trying to do, which is to connect my Rails 5.1 app to ActiveMQ. However, the ActiveMessaging gem you link to seems to be abandonware- there's been no commits for 2 years and people are creating issues like 'nothing works?' and receiving no answer. I'd guess that unless this article brought them there then they may not have tried the feature branch of the gem- I guess you've tested all this code works? - but the fact it isn't master and isn't being updated doesn't inspire confidence.
Can I ask if in the last year you're using this gem in production, and if not have you found any other ways of achieving this integration?
I was working with 'feat/rails5' branch ActiveMessaging, core features works fine, but I'm strongly recommend just use Stomp gem and write wrapper for your needs.