Some time ago, I was looking for alternatives to Microsoft’s Asynchronous Agents Library. I had developed a Windows application that interfaced with industrial cameras and performed multiple tasks such as visualization and object recognition, and I needed it to run on other platforms. It was designed around message passing, as a pipeline: a series of components that exchange messages. Every component – usually called “agent” or “actor” – performs a certain task when receiving a message, and possibly passes the results to others in the network.
For example, the physical camera is managed by a “producer agent” which grabs frames from the device and sends them to a sort of “message queue” called “raw images”. Other agents can receive images from that queue: the “visualizer agent” displays the image stream on the screen and the “computer vision agent” runs a neural network on each image and delivers the results to another queue named “results”.
A remarkable model leveraging message passing is the actor model, which treats an actor (or “agent”) as the basic building block of concurrent computation. In practice, actors may modify their own private state, but can only affect each other indirectly through messaging. This removes the need for explicit synchronization in application code and avoids data races on the actors’ state, making concurrent applications more robust and often easier to maintain.
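To make the idea concrete, here is a minimal sketch of an actor written with the standard library only. It is not SObjectizer code, and the names `counter_actor` and `sum_via_actor` are made up for illustration. The actor owns a private total that only its worker thread touches; other threads can influence it only by enqueuing messages. The mutex lives inside the mailbox, which is exactly the kind of "system code" a library would hide.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical toy actor: it sums the integers it receives.
class counter_actor {
public:
    counter_actor() : m_worker([this] { run(); }) {}

    ~counter_actor() { stop_and_join(); }

    // The only way to affect the actor from outside: enqueue a message.
    void send(int amount) {
        {
            std::lock_guard lock(m_mutex);
            m_mailbox.push(amount);
        }
        m_ready.notify_one();
    }

    // Lets the worker drain pending messages, joins it and returns the final state.
    int stop_and_join() {
        {
            std::lock_guard lock(m_mutex);
            m_stop = true;
        }
        m_ready.notify_one();
        if (m_worker.joinable()) m_worker.join();
        return m_total; // safe: the worker thread has been joined
    }

private:
    void run() {
        for (;;) {
            std::unique_lock lock(m_mutex);
            m_ready.wait(lock, [this] { return m_stop || !m_mailbox.empty(); });
            if (m_mailbox.empty()) return; // stop requested and nothing pending
            const int amount = m_mailbox.front();
            m_mailbox.pop();
            lock.unlock();
            m_total += amount; // private state, touched by the worker thread only
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<int> m_mailbox;
    bool m_stop = false;
    int m_total = 0;
    std::thread m_worker; // declared last: the other members exist before run() starts
};

// Usage example: sends 1..10 to the actor and returns what it accumulated.
int sum_via_actor() {
    counter_actor actor;
    for (int i = 1; i <= 10; ++i) actor.send(i);
    return actor.stop_and_join();
}
```

Messages are handled one at a time, so the code that updates `m_total` needs no locking of its own.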
My application took advantage of this model only in the same process but the more general pattern is at the foundations of truly distributed systems.
When embracing a model like this, mixing “application code” (e.g. the domain) with “system code” (e.g. thread and entity management) complicates development and maintenance. For this reason, we often rely on a library (either 3rd party or developed by us) that manages actor synchronization and message delivery, letting us focus on the application logic only.
In particular, such a library usually provides a few essential elements:
- a way to define actors and how they react to messages;
- data structures and functions to send and receive messages;
- an “environment” that keeps things together and manages low-level details such as threading, lifetime and message delivery, without bothering the user too much but also providing settings and customization points.
In Microsoft’s Asynchronous Agents, for example, agents maintain the state, perform tasks, and communicate through message-passing functions with asynchronous blocks which hold and transmit messages. Everything is auto-magically managed by a scheduler which runs agents and manages message delivery.
When it came time to find a replacement, I had decent experience with libraries like CAF and TBB; however, I thought they were not a good fit for my application. Oversimplifying, the former is more suitable for truly distributed systems over the network, the latter is for parallel computing. Another option was to just replace blocks with lock-free queues (e.g. this excellent implementation) and agents with std::threads (possibly upgraded to std::jthreads now).
Spending some more time to explore other options was worth it.
Meeting SObjectizer
When I found SObjectizer, I quickly realized it was a mature library designed for easing concurrent application development. SObjectizer wisely revisits the classical actor model but without being formally stuck to that. Simplicity is attractive – for me “a punch is just a punch” – and SObjectizer captured my attention primarily for supporting subscriptions (the Publish–subscribe pattern).
Microsoft’s agents have to actively receive from message blocks. Imagine something like this (pseudo-code):
void some_agent::run()
{
    Image image;
    while (!stop)
    {
        if (receive(input, image))
        {
            // process image and eventually send to destination block
        }
    }
}
On the other hand, SObjectizer works differently: agents make subscriptions. Message delivery and dequeuing is managed by the library.
Another detail concerned agent shutdown. In Microsoft’s library, stopping a running agent is not a “native” operation. Instead, it requires you to insert a “custom” piece of code in the main loop of every agent. For example:
Image image;
while (!stop)
{
    auto source = make_choice(&stop_source, &input); // a special source combining two others
    // if stopped
    if (receive(source) == 0) // index of the first receiving source is stop_source
    {
        stop = true;
    }
    else if (source.has_value())
    {
        image = source.value<Image>();
        // process image and eventually send to destination block
    }
}
Admittedly, the code snippet above is not totally obscure, but it would be more elegant to delegate this task to the library (since literally every agent will need it at some point).
On the other hand, in SObjectizer every agent is automatically stopped at the shutdown. This means, again, less boilerplate and consistent management of the shutdown process.
These two features, apparently very basic, caught my attention and I decided to give SObjectizer a chance. I didn’t regret it.
Taking the first steps
After building and installing the library (e.g. using vcpkg), the first thing I did was to write a simple agent that processed images somehow. Every agent inherits from so_5::agent_t:
class some_agent final : public so_5::agent_t
{
    // ...
};
Then, the work of an agent is, basically, reacting to messages. For this, we define all the needed subscriptions in a special agent member function called so_define_agent. A subscription is tied to a certain message source (we’ll talk about it in a moment) and is expressed as a function object:
void some_agent::so_define_agent() // declared with `override` inside the class
{
    // assume we have a MEMBER VARIABLE named "m_input" that holds our message source
    so_subscribe(m_input).event([](Image image) {
        // ... code to process one image and, eventually, send the results to the destination block
    });
}
As you might imagine, SObjectizer deduces the type of the message from the parameter of the lambda. We have defined a subscription for the type Image on a certain source m_input.
By default, every agent only processes one message at a time. In other words, if multiple images are delivered to m_input, the lambda won’t be called in parallel. This is called non-reentrancy and enables agents to easily maintain their internal state without any need for synchronization.
Also, there is no code for stopping the agent. Indeed, every agent is automatically requested to stop at the shutdown. Practically speaking, the agent finishes the pending work and then stops.
All agents receiving messages (e.g. “visualizer agent” and “computer vision agent”) could be implemented this way easily. However, my application hosted an agent running the frame grabbing loop. You can think about it as the source or producer agent. It does not react to messages but, rather, produces data. Imagine something like this (pseudo-code):
void image_producer::run()
{
    Camera camera;
    camera.start();
    while (!m_stop)
    {
        auto image = camera.get();
        send(destination, image); // production
    }
    camera.stop();
}
void image_producer::stop()
{
    m_stop = true;
}
Imagine that run() is executed on a dedicated thread (we’ll get to that later).
Encapsulating this logic into an so_5::agent_t can be done through the special member function so_evt_start, which is executed just after the agent starts:
void image_producer_agent::so_evt_start() // declared with `override` inside the class
{
    Camera camera;
    camera.start();
    while (!m_stop)
    {
        auto image = camera.get();
        send(destination, image);
    }
    camera.stop();
}
void image_producer_agent::stop()
{
    m_stop = true;
}
As you see, in this case agent stop must be handled “by hand”. Is that the only way? We’ll talk about that in another post.
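One detail the pseudo-code glosses over: stop() is meant to be called from a different thread than the one running the loop, so a plain bool flag would be a data race. A minimal sketch of the same loop with std::atomic<bool> follows. Everything here is hypothetical and independent of SObjectizer: `fake_camera` stands in for the real device and the `sink` callback stands in for the destination.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a camera: returns increasing frame numbers.
struct fake_camera {
    int next = 0;
    int get() { return next++; }
};

class frame_producer {
public:
    explicit frame_producer(std::function<void(int)> sink) : m_sink(std::move(sink)) {}

    // Grabs frames and pushes them to the sink until stop() is called.
    void run() {
        fake_camera camera;
        while (!m_stop.load()) {
            m_sink(camera.get());
        }
    }

    // Safe to call from any thread thanks to the atomic flag.
    void stop() { m_stop.store(true); }

private:
    std::function<void(int)> m_sink;
    std::atomic<bool> m_stop{false};
};

// Usage example: the sink requests the stop after three frames.
// (Done from the same thread here only to keep the example deterministic.)
std::vector<int> grab_three_frames() {
    std::vector<int> frames;
    frame_producer* self = nullptr;
    frame_producer producer([&](int frame) {
        frames.push_back(frame);
        if (frames.size() == 3) self->stop();
    });
    self = &producer;
    producer.run();
    return frames;
}
```

The flag is checked once per iteration, so the loop finishes the frame in flight before exiting.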
Think about how this works: when the image_producer_agent starts, it grabs frames and sends them to some destination until it gets stopped. The other agents which subscribed to the same destination will receive frames.
How to use multiple threads?
On which thread is so_evt_start executed?
Every agent in SObjectizer is bound to a worker thread. Message handlers are then executed on this worker thread. How this thread is assigned depends on a fundamental entity whose task is to manage worker threads: the dispatcher. By default, SObjectizer creates and runs a dispatcher that assigns the same thread to every agent. The type of this dispatcher is one_thread. We – programmers – can create and use other dispatchers as needed. In practice, a dispatcher controls the strategy that assigns agent handlers to threads.
SObjectizer provides some dispatcher types ready for use, and also the proper abstractions to roll our own. For example, a simple one that binds an agent to a dedicated worker thread (not shared with anyone else) is called active_obj. Another one is the active_group dispatcher that assigns the very same worker thread to a group of agents. This is useful, for instance, to decompose some work into self-contained pieces of logic without resorting to multiple threads.
Back to the original question – on which thread is so_evt_start executed? It depends on the dispatcher. In this elegant agent model, in fact, everything is a subscription. so_evt_start is a predefined handler for a special message, sent by SObjectizer to the agent after successfully creating and registering it, so it is executed on whichever worker thread the dispatcher assigns.
By default, all agents are bound to the same worker thread. We’ll see in a moment how to create another type of dispatcher that assigns a dedicated thread per agent. This is a simple way to avoid that an agent blocks others.
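The difference between the two binding strategies can be sketched with a toy worker thread that runs queued handlers one at a time. This is not how SObjectizer implements dispatchers; `worker` and `handlers_share_thread` are hypothetical names used only to show "one shared worker" versus "one worker per agent".

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical worker thread that runs queued handlers one at a time.
class worker {
public:
    worker() : m_thread([this] { run(); }) {}

    // Drains the pending handlers, then joins the thread.
    ~worker() {
        {
            std::lock_guard lock(m_mutex);
            m_done = true;
        }
        m_ready.notify_one();
        m_thread.join();
    }

    void post(std::function<void()> handler) {
        {
            std::lock_guard lock(m_mutex);
            m_handlers.push(std::move(handler));
        }
        m_ready.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::unique_lock lock(m_mutex);
            m_ready.wait(lock, [this] { return m_done || !m_handlers.empty(); });
            if (m_handlers.empty()) return; // done and nothing left to run
            auto handler = std::move(m_handlers.front());
            m_handlers.pop();
            lock.unlock();
            handler();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<std::function<void()>> m_handlers;
    bool m_done = false;
    std::thread m_thread; // declared last: the other members exist before run() starts
};

// Runs one handler for each of two "agents" and reports whether
// both handlers were executed on the same thread.
bool handlers_share_thread(bool dedicated_thread_per_agent) {
    std::thread::id first, second;
    {
        auto worker_a = std::make_shared<worker>();
        // Shared strategy: both agents use worker_a. Dedicated: agent B gets its own.
        auto worker_b = dedicated_thread_per_agent ? std::make_shared<worker>() : worker_a;
        worker_a->post([&] { first = std::this_thread::get_id(); });
        worker_b->post([&] { second = std::this_thread::get_id(); });
    } // workers drain their queues and join here
    return first == second;
}
```

With a shared worker, a handler that never returns would keep every other handler waiting in the queue, which is why a blocking agent usually wants a thread of its own.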
Where do messages go?
Now that we have an intuition of how agents can be developed, we need to understand where to send and receive the images. In other words, we are wondering what the type of m_input is here:
void some_agent::so_define_agent() // declared with `override` inside the class
{
    // assume we have a MEMBER VARIABLE named "m_input" that holds our message source
    so_subscribe(m_input).event([](Image image) {
        // ... code to process one image and, eventually, send the results to the destination block
    });
}
Practically speaking, that entity enables us to connect agents together. Indeed, images are not sent to agents directly, instead they are delivered to this sort of “message queue” that holds and transmits messages to listeners. And you might imagine that lifetime issues are also addressed nicely because agents don’t hold any references to others. This paradigm distinguishes SObjectizer (and Microsoft’s Agents Library) from the “pure” actor model in which messages are sent to actors directly. In this case, the sender needs to keep some reference to the receiver (maybe just an address, but still a reference).
In SObjectizer, this data structure is called a message box, which you can imagine as a data channel. To receive from a message box, an agent has to make a subscription to it for a certain message type. There is no other way. Message boxes can have one or more subscribers, just like one or more senders. Indeed, SObjectizer distinguishes MPSC (multi-producer/single-consumer) and MPMC (multi-producer/multi-consumer) message boxes, even though both implement the same interface. Every agent has its own MPSC message box, as a shorthand for sending messages directly to it (semantically speaking). We will see some usages of this in the future.
An MPMC message box was just what I needed for my application.
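To illustrate the idea of an untyped channel with typed subscriptions, here is a toy sketch built on std::any and std::type_index. It is only an illustration: `toy_mbox` is a hypothetical class, and it delivers synchronously on the sender's thread, whereas a real library would hand each message over to the subscriber's worker thread.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <typeindex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical channel: any message type can transit, handlers are selected by type.
class toy_mbox {
public:
    template <typename Message>
    void subscribe(std::function<void(const Message&)> handler) {
        m_handlers[std::type_index(typeid(Message))].push_back(
            [h = std::move(handler)](const std::any& payload) {
                h(std::any_cast<const Message&>(payload));
            });
    }

    template <typename Message>
    void send(Message message) {
        const auto it = m_handlers.find(std::type_index(typeid(Message)));
        if (it == m_handlers.end()) return; // nobody subscribed to this type
        const std::any payload(std::move(message));
        for (const auto& handler : it->second) handler(payload); // every subscriber gets it
    }

private:
    std::unordered_map<std::type_index,
                       std::vector<std::function<void(const std::any&)>>> m_handlers;
};

// Usage example: two consumers of int, one consumer of std::string.
std::pair<int, std::size_t> demo_typed_delivery() {
    toy_mbox channel;
    int int_total = 0;
    std::size_t text_length = 0;

    channel.subscribe<int>([&](const int& value) { int_total += value; });
    channel.subscribe<int>([&](const int& value) { int_total += value; });
    channel.subscribe<std::string>([&](const std::string& text) { text_length += text.size(); });

    channel.send(20);                   // reaches both int subscribers
    channel.send(1);                    // reaches both int subscribers
    channel.send(std::string("frame")); // reaches the string subscriber
    channel.send(1.5);                  // no subscriber for double: dropped

    return {int_total, text_length};
}
```

Senders and receivers only share the channel, never a reference to each other, which is the decoupling described above.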
A low-hanging fruit
When things were taking the shape of a testable application, I realized that I was not only replacing a library with another but I had something more powerful in my hands.
Let me take a step back.
Some months before meeting SObjectizer, I was imagining a disruptive feature for the users: the ability to specify how to connect components together. At the time, indeed, every component could only receive from the “raw images” channel, holding the images coming directly from the camera. The user could specify any number of components to use (e.g. a visualizer and a save-to-disk), but the output of each component was fixed in code. For example, the “computer vision component” performed a certain task (e.g. recognition of patterns) and dropped the results into a predetermined buffer, eventually propagated to another destination (fixed as well).
This was fine at that point but, in the months that followed, my board brimmed with features that would have benefited from something a bit more flexible. For example, I needed to design a way to produce “enhanced” images from raw images and use the same components for visualizing and saving them.
Thus, sending the output of the new “enhancer” component to the “visualizer” component was a solution, as long as the output of the former was compatible with the input of the latter.
To introduce this feature, I imagined named channels: every buffer is a channel with a name. Some channels are automatically created by the program and come with fixed names (e.g. the raw images buffer might be called “main”) and those components able to produce results can send data to a configurable output channel.
For example, in an imaginary piece of YAML, a "computer-vision agent" takes images from a channel named "main", draws the bounding boxes of the recognized patterns on top of them, and sends the "enhanced" images to a channel named "patterns". A "visualizer agent" and a "saver agent" subscribe to the channel "patterns" to perform their task:
agents:
  - name: computer-vision
    input: main
    output: patterns
  - name: visualizer
    input: patterns
  - name: saver
    input: patterns
Implementing this with Microsoft's library was doable but not free. For example some effort would be needed to keep channels indexed and to manage ownership properly (in order to destroy buffers at the right time). Also, message buffers are typed.
On the other hand, with SObjectizer this feature was already in place. Indeed, message boxes can be created (and retrieved) by name and are also untyped, allowing any message type to transit.
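For comparison, this is roughly the bookkeeping I would have had to write by hand to keep channels indexed by name and destroy them at the right time. It is a sketch under my own assumptions, with hypothetical names (`channel`, `channel_registry`), and says nothing about how SObjectizer handles named message boxes internally.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical placeholder for a message buffer.
struct channel {};

// Returns the same channel for the same name; a channel is destroyed
// as soon as its last user releases it.
class channel_registry {
public:
    std::shared_ptr<channel> get_or_create(const std::string& name) {
        std::lock_guard lock(m_mutex);
        auto& slot = m_channels[name];
        auto existing = slot.lock();
        if (!existing) {
            existing = std::make_shared<channel>();
            slot = existing; // the registry observes, the users own
        }
        return existing;
    }

private:
    std::mutex m_mutex;
    std::map<std::string, std::weak_ptr<channel>> m_channels;
};

// Usage example: components configured with the same name end up on the same channel.
bool same_name_yields_same_channel() {
    channel_registry registry;
    const auto visualizer_input = registry.get_or_create("patterns");
    const auto saver_input = registry.get_or_create("patterns");
    const auto vision_input = registry.get_or_create("main");
    return visualizer_input == saver_input && visualizer_input != vision_input;
}
```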
It was going places.
Collecting all the pieces of the puzzle
In order to assemble and run the agents, I got into a few additional concepts.
First of all, the environment, that is the container where all SObjectizer entities are created and managed. You remember? The thing that does the magic, behind the scenes. All the details are not important here. Creating and starting the environment can be done by using either so_5::launch or a special class called so_5::wrapped_env_t. The former is a self-contained launch function, whereas the latter gives a little bit more flexibility:
int main()
{
    const so_5::wrapped_env_t sobjectizer; // <<- the magic is here!
    // ...
    // shutdown performed automatically
}
An environment is automatically created and started in the constructor of wrapped_env_t, and there is no need to shut it down explicitly since it will be done in the destructor.
The second important concept at this point is the creation of agents. Sometimes agents have logical dependencies on each other, and a certain feature may not work unless several agents run together as a whole. This would lead to complex initialization logic: for example, if one agent has been created but another one fails, then the first must be destroyed. If you are familiar with Kubernetes pods, which are the smallest deployable units of computing, you can find some similarities. For example, imagine a payment system where several components need to work together, otherwise the overall pipeline can’t work. The “payment pod” is composed of multiple services that work as a whole.
When dealing with “distributed” entities, this is a common issue.
For this reason, SObjectizer provides an elegant abstraction: agents are not created in isolation, instead they can be grouped into something called a cooperation that guarantees a transactional creation process. Either all the agents of a cooperation are successfully created, or none. The term “creation” here is loose. Underneath, there are some more details that, again, are not important at this point. Also, consider that a cooperation containing a single agent is totally fine.
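The all-or-nothing idea can be sketched in a few lines of standard C++. This is a toy model of the concept, not SObjectizer's registration procedure; `toy_agent`, `toy_environment` and `register_group` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical agent: just a name.
struct toy_agent {
    std::string name;
};

class toy_environment {
public:
    using factory = std::function<std::unique_ptr<toy_agent>()>;

    // Registers all the agents of the group or none of them.
    bool register_group(const std::vector<factory>& factories) {
        std::vector<std::unique_ptr<toy_agent>> staged;
        try {
            for (const auto& make : factories) staged.push_back(make());
            m_agents.reserve(m_agents.size() + staged.size());
        } catch (const std::exception&) {
            return false; // staged agents are destroyed, nothing was registered
        }
        // Capacity is reserved, so moving the staged agents in cannot fail halfway.
        for (auto& agent : staged) m_agents.push_back(std::move(agent));
        return true;
    }

    std::size_t agent_count() const { return m_agents.size(); }

private:
    std::vector<std::unique_ptr<toy_agent>> m_agents;
};

// Usage example: the first group fails as a whole, the second one succeeds.
std::size_t count_after_failed_and_successful_groups() {
    toy_environment env;

    const std::vector<toy_environment::factory> broken_group{
        [] { return std::make_unique<toy_agent>(toy_agent{"producer"}); },
        []() -> std::unique_ptr<toy_agent> { throw std::runtime_error("camera not found"); }};
    env.register_group(broken_group); // returns false: "producer" is rolled back

    const std::vector<toy_environment::factory> working_group{
        [] { return std::make_unique<toy_agent>(toy_agent{"viewer"}); },
        [] { return std::make_unique<toy_agent>(toy_agent{"saver"}); }};
    env.register_group(working_group);

    return env.agent_count();
}
```

Staging the agents first and committing them only at the end is what spares the caller from writing cleanup logic for partially created groups.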
Introducing only one cooperation was enough for the spike:
sobjectizer.environment().introduce_coop([](so_5::coop_t& c) {
    // ...
});
That lambda is a sort of “recipe” to add agents to the cooperation.
And now, let’s see the producer agent. For the sake of simplicity, let me use C++20’s stop_token, which I didn’t have at that time (I had a custom implementation of the very same concept available):
class image_producer final : public so_5::agent_t
{
public:
    image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
    {
    }
    void so_evt_start() override
    {
        Camera camera;
        camera.start();
        while (!m_stop.stop_requested())
        {
            so_5::send<Image>(m_channel, camera.get());
        }
        camera.stop();
    }
private:
    so_5::mbox_t m_channel;
    std::stop_token m_stop;
};
This is clearly a minimal implementation that omits lots of details (error handling, image data type involved, etc), but it’s ok for the purpose of this post.
Another basic agent useful for debugging was a simple “visualizer”:
class image_viewer final : public so_5::agent_t
{
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }
    void so_define_agent() override
    {
        // The handler's parameter type must match the type sent by image_producer:
        // here Image is assumed to be an alias of cv::Mat.
        so_subscribe(m_channel).event([](cv::Mat image) {
            cv::imshow("Live!", image);
            cv::waitKey(25);
        });
    }
private:
    so_5::mbox_t m_channel;
};
You can guess this is based on OpenCV and just shows one image after the other. More details in other posts, no need to worry about that now.
Then we create the “main” channel, that is the message box that will hold the “raw” stream of images:
auto main_channel = sobjectizer.environment().create_mbox("main");
At this point, we need a stop_token to pass to image_producer. Imagine we bind to a stop_token that triggers when a termination signal (e.g. SIGTERM or SIGINT) is sent to the application. This detail is not important here and will be materialized in the next post. Let’s pretend we have this:
auto ctrl_c = get_ctrl_c_token();
Finally, we introduce a cooperation to hold and manage our agents:
sobjectizer.environment().introduce_coop([&](so_5::coop_t& c) {
    c.make_agent<image_producer>(main_channel, ctrl_c);
    c.make_agent<image_viewer>(main_channel);
});
Can you spot any issues here?
We have to pay attention to the dispatcher. Agents, by default, are handled by a one_thread dispatcher that assigns the very same worker thread to every agent (even those belonging to different cooperations). So, in the snippet above, both image_viewer and image_producer share the same worker thread. This could be done on purpose in other scenarios but we have a problem here: image_producer::so_evt_start blocks until explicitly stopped. This means that image_viewer won’t have any chance to execute its message handler when an image arrives. A simple solution consists in using an active_obj dispatcher which always assigns a new worker thread to every agent:
sobjectizer.environment().introduce_coop([&](so_5::coop_t& c) {
    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    c.make_agent_with_binder<image_producer>(dispatcher.binder(), main_channel, ctrl_c);
    c.make_agent_with_binder<image_viewer>(dispatcher.binder(), main_channel);
});
Thus, both the agents now have their own worker thread. It’s ok for the spike.
A few additional details here: every dispatcher has a convenient make_dispatcher function that creates an instance of that type of dispatcher. However, the dispatcher is not passed directly to the agent, instead there is another small abstraction for that: the dispatcher binder, which is obtained by calling binder() on the dispatcher instance. Since the output of this function is reference counted, it's idiomatic to pass around copies of the binder to express that several agents are "managed" by the same dispatcher.
An alternative syntax is:
auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
    c.make_agent<image_producer>(main_channel, ctrl_c);
    c.make_agent<image_viewer>(main_channel);
});
Passing the dispatcher binder to the cooperation makes it the default for the whole cooperation.
Here is the complete code (still a few details missing):
int main()
{
    auto ctrl_c = get_ctrl_c_token(); // we will get back to this in the next post
    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer>(main_channel, ctrl_c);
        c.make_agent<image_viewer>(main_channel);
    });
    wait_for_stop(ctrl_c); // we will get back to this in the next post
}
get_ctrl_c_token() and wait_for_stop() are not coming from SObjectizer and will be explained in the next post.
When I launched the experiment for the first time, I got the expected results. Functionally speaking, some unit and regression tests passed, and others needed me to implement more agents to work. Similarly, performance tests showed very good results.
The spike was successful and porting the application to SObjectizer was promising.
Then I committed to the activity, adding all the agents to the party, and discovering many more features and capabilities offered by the library. Sometimes I had questions and the SObjectizer community – in particular the project leader Yauheni Akhotnikau – was always very helpful. Also, I was glad to host this meetup with Nicolai Grodzitski – a SObjectizer developer – during the pandemic.
Some time later, after having heavily tested the new version of my software and having compared all the relevant metrics with the stable version, it was finally released. Shortly after, the low-hanging fruit “composing components” (discussed above) unleashed all its power, enabling the software to be adopted in some other unforeseen scenarios and making it easier (both from the development and usage point of view) to introduce some new features.
SObjectizer Tales
The story was successful and I have decided to start this series primarily to give something back to the SObjectizer community because I think the library is valuable. My idea is to incrementally develop a very simplified version of my real application, and to discuss (real) micro and macro problems encountered. I will focus on SObjectizer, leaving aside all those details that pad out. For example, I won’t detail how the application reads the config, or how the camera disconnection events can be handled. Instead, topics closer to SObjectizer will be described, like “how to split work on multiple workers” or “how to retry sending data over the network”.
This article was very long, so bear with me. This won’t be the norm, though. I will keep the theory short and simple, referring to the official documentation or a relevant discussion where needed. Concepts will be added on demand. When possible, I will contextualize the problem by bringing back the original conversation I had with the development team. I am grateful to Yauheni Akhotnikau – SObjectizer’s project leader – who is going to review this series. This way, the contents will be more accurate and idiomatic.
One post, one problem. We’ll learn by doing. This is not a tutorial on SObjectizer and we won’t cover everything. This is not a tutorial on the actor model or multithreading either. But some of the problems and solutions discussed in the series, along with the exposure to message passing, should help you step into the topic.
Show me the code! This series goes hand in hand with this repository, which will be tagged on every new post. Each installment will leave the repository in a working state, and you will be able to compile and run the code yourself. Working code starts from the next post.
You are encouraged to create your own agents and combine them as you like!
Feel like sharing your creations with the rest of the community? Open a pull request!
I racked my brain for days to decide if the “camera” application was appealing. The alternative was to create something different from scratch. You know, the tool is appropriate for a bunch of scenarios and I came up with ideas like Alexa-like software, a car infotainment system, or a classical web server. However, I have come to the conclusion that taking the cue from an application I really developed and put into production is more valuable. Almost surely, some digression posts are possible and, in that case, you will be informed.
Clearly, I’ll take some shortcuts and omit lots of details. For instance, I won’t refer to the cameras used in production, instead, I will grab frames from the webcam with OpenCV. However, adapting the code to any physical (or virtual) devices should be easy.
We are all set for starting this journey!
Let me just sum up what you are going to find in the series:
- incrementally design and develop a message passing-styled application;
- learn how to use the most important abstractions provided by SObjectizer;
- discussions about real problems, solutions and trade-offs;
- idiomatic content reviewed by SObjectizer’s project leader;
- image processing domain, hopefully fun for you;
- working source code and unrestricted freedom to add and combine your own image processors.
In the next installment we’ll rework the code presented in this post and make something that compiles and runs...
Thanks to Yauheni Akhotnikau and Davide Di Gennaro for having reviewed this post.