We received a request from our boss, who will be showcasing the program at a significant customer meeting. She strongly believes that the key to selling this product lies in its flexibility. Consequently, she requires additional agents to allow customers to interact with the system.
In this segment, we will introduce some new agents and delve deeper into the concept of composition.
Spoiler alert: read through the conclusion to see the demo…
What is composition?
In basic terms, an agent has the capability to receive data from single or multiple input channels and, if needed, can transmit data to single or multiple output channels. Composition is the ability to acquire functionalities by combining agents together using channels.
Consider this example:
The channel “raw images” ties the “producer agent” with the “visualizer agent”.
Suppose that we want to display smaller images by resizing them before. Then we introduce a new agent that takes an image from the input channel, performs resize, and finally outputs the resulting image into another channel. The new agent might be added as follows:
Assuming the same image type is involved (e.g. cv::Mat
), accommodating that change didn’t required to modify the producer nor the visualizer, but only to reorganize the agents. Indeed, agents are weakly coupled together: they depend only on the type of messages rather than senders and receivers. The “pipeline” can even be generated at run-time, from a configuration file or whatever. The advantage is quite evident.
The resizer agent is a good candidate for the purpose of this article:
class image_resizer final : public so_5::agent_t
{
public:
image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output, double factor)
: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output)), m_factor(factor)
{
}
void so_define_agent() override
{
so_subscribe(m_input).event([this](const cv::Mat& image) {
cv::Mat resized;
resize(image, resized, {}, m_factor, m_factor);
so_5::send<cv::Mat>(m_output, std::move(resized));
});
}
private:
so_5::mbox_t m_input;
so_5::mbox_t m_output;
double m_factor;
};
OpenCV’s resize
comes to the rescue here:
-
resized
contains the resulting image; - passing
size = {}
is a trick for letting OpenCV calculate the final size by applying scale factor (passed in construction).
You can might be tempted to make the code shorter by rewriting the message handler as follows:
so_subscribe(m_input).event([this](cv::Mat image) {
resize(image, image, {}, m_factor, m_factor);
so_5::send<cv::Mat>(m_output, std::move(image));
});
However, you would fall into a hidden trap. Indeed, under the hood cv::Mat
is reference counted and does not perform a deep copy by default. This means that changing a local instance of cv::Mat
would instead tread on someone else’s toes! In particular, all subscribers to the main channel would be affected in an undefined way, resulting in undefined behavior. A way to mitigate this problem won’t be discussed here but consists in wrapping cv::Mat
into a “smarter” type that makes image modifications more explicit and acknowledged.
At this point, image_resizer
can be added to the cooperation:
int main()
{
auto ctrl_c = get_ctrl_c_token();
const so_5::wrapped_env_t sobjectizer;
auto main_channel = sobjectizer.environment().create_mbox("main");
auto commands_channel = sobjectizer.environment().create_mbox("commands");
auto resized_images = sobjectizer.environment().create_mbox("resized");
auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<image_producer>(main_channel, commands_channel);
c.make_agent<image_resizer>(main_channel, resized_images, 0.5);
c.make_agent<image_viewer_live>(resized_images);
c.make_agent<remote_control>(commands_channel, ctrl_c);
}
wait_for_stop(ctrl_c);
}
resized_images
is a named message box globally available and, consequently, can be subscribed by any agents inside the program (technically speaking, by any holding a reference to the wrapped_env_t
that owns that message box). This feature is possibly more useful when several agents need to subscribe to that channel or when we simply keep the door open for new subscriptions. On the other hand, in case only one subscriber is interested in that output (and will always be), using either a named or an anonymous message box is just a matter of taste or style.
However, if we use several image_resizer
s (e.g. one resizes for mobile and another for desktop) then each of them needs its own output channel. This merits attention when named message boxes are involved because it requires finding reasonable (and unique) names:
Anyway, “output agents” might be equipped with another constructor that automatically creates an anonymous output message box and a member function output()
:
Thus, image_resizer
might be changed as follows:
class image_resizer final : public so_5::agent_t
{
public:
image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output , double factor)
: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output)), m_factor(factor)
{
}
image_resizer(so_5::agent_context_t ctx, so_5::mbox_t input, double factor)
: image_resizer(ctx, std::move(input), ctx.environment().create_mbox(), factor)
{
}
void so_define_agent() override
{
so_subscribe(m_input).event([this](const cv::Mat& image) {
cv::Mat resized;
resize(image, resized, {}, m_factor, m_factor);
so_5::send<cv::Mat>(m_output, std::move(resized));
});
}
so_5::mbox_t output() const
{
return m_output;
}
private:
so_5::mbox_t m_input;
so_5::mbox_t m_output;
double m_factor;
};
This allows creating the cooperation more succinctly:
auto resized_images = c.make_agent(main_channel, 0.5)->output();
c.make_agent(resized_images);
Beware we can’t just return image_resizer
‘s message box (so_direct_mbox()
) because it can’t be subscribed by any other agents but itself. For this reason, we create an anonymous message box (so_environment().create_mbox()
).
Forwarding data
Another interesting pattern comes into play when we just need to forward some data from one channel to another. Consider stream_detector
we introduced in the last episode and suppose we need its output to be sent to an additional channel. If we can’t change stream_detector
, the only approach is introducing a proxy agent that subscribes to the output of stream_detector
and redirects data to the new channel. In general, not only writing such proxies is a boring and error-prone task, but also a bit inefficient because these require to run in the context of some dispatcher. However, this was the only way to deal with this until SObjectizer 5.8.
Recently, SObjectizer introduced a new abstraction that solves this problem elegantly and open doors to new forms of message distribution: message sinks.
A message sink represents the destination of a message. Behind that can be entities like agents, message boxes or even custom types. Details are not important and can be a bit convoluted at this point, but we can exploit message sinks to create redirections. For example, semantically speaking, message sinks enable a message box to subscribe to another message box.
Consider this situation:
auto main_channel = sobjectizer.environment().create_mbox("main");
auto stream_notifications = sobjectizer.environment().create_mbox("stream");
auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<stream_detector>(main_channel, stream_notifications);
//...
}
Suppose we need stream_detector::stream_up
to be forwarded from stream_notifications
to main_channel
.
SObjectizer provides a couple of helpers to create forwardings from a message box to an arbitrary message sink. The concept of forwarding here is called binding. A binding can be seen as a tuple (message type, message source, message destination).
SObjectizer provides single_sink_binding_t
that can host only one binding and multi_sink_binding_t
that can host any number of bindings.
Both provide the function bind
to add a binding. For example:
so_5::single_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, some_destination);
This means: forward to some_destination
any message of type stream_detector::stream_up
(in this case, it's a signal) delivered to stream_notification
.
You might wonder if some_destination
can be the main_channel
. Well, there is a technical detail here: a message box is not a message sink but it can be turned into it by using a core function: wrap_to_msink(message_box)
. Thus, the example can be completed as follows:
so_5::single_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
At this point, any stream_up
sent to stream_notifications
is automatically propagated to main_channel
as well. Semantically, we have attained a message box (main_channel
) subscribing to another message box (stream_notifications
). That was not possible before SObjectizer 5.8.
Note that adding an existing subscription would replace the old one:
so_5::single_sink_binding_t binding;
// 1
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
// this replaces "1"
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(main_channel));
so_5::wrap_to_msink(another_channel));
On the other hand, multi_sink_binding_t
is a bit more sophisticated (even from an implementation point of view) as it supports multiple bindings:
so_5::multi_sink_binding_t binding;
binding.bind<stream_detector::stream_up>(stream_notifications, so_5::wrap_to_msink(main_channel));
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(main_channel));
binding.bind<stream_detector::stream_down>(stream_notifications, so_5::wrap_to_msink(another_channel));
Here, any stream_down
sent to stream_notifications
is automatically propagated to both main_channel
and another_channel
as well. stream_up only to main_channel.
The final important bit about bindings regards lifetime. Both single_sink_binding_t
and multi_sink_binding_t
drop all subscriptions in the destructor. Thus, it’s crucial to keep binding instances alive as long as we need them.
In addition to usual ways like making the binding object part of an agent, another approach takes advantage of a tiny feature of cooperations:
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<stream_detector>(main_channel, stream_notifications);
//...
auto binding = c.take_under_control(std::make_unique<multi_sink_binding_t>());
binding->bind<stream_detector::stream_up>(stream_notifications, wrap_to_msink(main_channel));
binding->bind<stream_detector::stream_down>(stream_notifications, wrap_to_msink(main_channel));
}
take_under_control
transfers ownership of any user resource in the form of unique_ptr<T>
(not necessarily part of SObjectizer) to the cooperation.
In a future post we’ll discuss an interesting use case where message sinks are essential.
An extra agent
For experimenting with composition, in addition to image_resizer
, we incorporate a fancier agent such as a basic face detector using OpenCV’s Cascade Classifier, which employs a machine learning approach to identify objects within an image. Specifically, we’ll utilize a classifier file (haarcascade_frontalface_default.xml) optimized for recognizing human faces. To streamline our process, this file has been added to the repository, and will be copied into the output build directory for accessibility during runtime:
class face_detector final : public so_5::agent_t
{
public:
face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
{
}
void so_define_agent() override
{
so_subscribe(m_input).event([this](const cv::Mat& src) {
cv::Mat gray;
cvtColor(src, gray, cv::COLOR_BGR2GRAY);
std::vector<cv::Rect> faces;
m_classifier.detectMultiScale(gray, faces, 1.1, 4);
auto cloned = src.clone();
for (const auto& [x, y, w, h] : faces)
{
rectangle(cloned, { x, y }, { x + w, y + h }, { 255, 0, 0 }, 2);
}
so_5::send<cv::Mat>(m_output, std::move(cloned));
});
}
void so_evt_start() override
{
if (!m_classifier.load("haarcascade_frontalface_default.xml"))
{
throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
}
}
private:
so_5::mbox_t m_input; so_5::mbox_t m_output;
cv::CascadeClassifier m_classifier;
};
This new agent has the ability to overlay a rectangle onto each identified face within the frames. It’s important to highlight the utilization of so_evt_start()
for loading the classifier file, which might be a time-consuming operation, executed on the agent’s worker thread. If this operation encounters failure, it will throw an exception that will be managed according to the cooperation’s reaction policies discussed in a previous episode.
When employing machine learning (and deep learning) utilizing smaller images can offer advantages such as quicker computation and reduced resource utilization. Although in this simple scenario it might not be essential, let’s combine our image_resizer
with the face_detector
to emulate the case:
That is:
int main()
{
auto ctrl_c = get_ctrl_c_token();
const so_5::wrapped_env_t sobjectizer;
auto main_channel = sobjectizer.environment().create_mbox("main");
auto commands_channel = sobjectizer.environment().create_mbox("commands");
auto decorated_with_faces = sobjectizer.environment().create_mbox("faces");
auto resized_images = sobjectizer.environment().create_mbox("resized");
auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<image_producer>(main_channel, commands_channel);
c.make_agent<image_resizer>(main_channel, resized_images, 0.5);
c.make_agent<edge_detector>(resized_images, decorated_with_faces);
c.make_agent<image_viewer_live>(decorated_with_faces);
c.make_agent<remote_control>(commands_channel, ctrl_c);
}
wait_for_stop(ctrl_c);
}
As done before for the image_resizer
, we might equip face_detector
with another constructor that automatically creates an anonymous output message box and with a member function output()
. On the repository, you will find this version used as an example.
Finally, the live demo. As expected, the live viewer appears smaller than usual due to the input image being resized beforehand:
What about your own agents?!
By now, you’ve likely discovered that incorporating your own agent is quite straightforward! We encourage you to conduct experiments and create combinations of both existing and new agents according to your preferences.
Would you be interested in sharing some of these outcomes with the community? Please feel free to open a pull request or create an issue on the repository!
Fixing image viewer duplicated title
At this point, it can make sense to use multiple image viewers to experiment with the system. However, since the window title is statically defined, using many instances of the same image viewer class is problematic. For this reason, we make a small change to generate unique window titles:
class image_viewer final : public so_5::agent_t
{
struct call_waitkey final : so_5::signal_t {};
so_5::state_t st_handling_images{ this };
so_5::state_t st_stream_down{ this };
public:
image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
: agent_t(std::move(ctx)), m_channel(std::move(channel))
{
}
void so_define_agent() override
{
st_handling_images
.event(m_channel, [this](so_5::mhood_t<cv::Mat> image) {
imshow(m_title, *image);
cv::waitKey(25);
st_handling_images.time_limit(500ms, st_stream_down);
});
st_stream_down
.on_enter([this] {
so_5::send<call_waitkey>(*this);
})
.event([this](so_5::mhood_t<call_waitkey>) {
cv::waitKey(25);
so_5::send<call_waitkey>(*this);
}).transfer_to_state<cv::Mat>(m_channel, st_handling_images);
st_handling_images.activate();
}
private:
static inline int global_id = 0;
so_5::mbox_t m_channel;
std::string m_title = std::format("image viewer {}", global_id++);
};
Takeaway
In this episode we have learned:
- either named and anonymous message boxes enable agent composition;
- message sinks are a new feature of SObjectizer that enables to redirect data succinctly from any message source to one or many others (e.g. from a message box to another message box);
- cooperations provide a generic mechanism to take ownership of user data in the form of
unique_ptr<T>
.
What’s next?
Our manager left a positive impact, sparking the customer’s interest in testing the alpha version of the program. Nevertheless, she expresses concern about potential webcam errors and insists that we address them before releasing the program.
Prepare for our upcoming post, where we’ll take on errors directly!
Thanks to Yauheni Akhotnikau for having reviewed this post.
Top comments (0)