Last time we left with a new requirement:
A camera vendor has just released a new SDK that works through callbacks. Basically, the user sets a function that will get called by the SDK on every new available frame. How can we integrate this into a producer agent?
If you have some experience with camera drivers, you know that SDKs usually work in two ways:
- polling/pull model: to retrieve the next available frame, we must explicitly call a GetNextFrame-like function;
- callback-based/push model: every time a new frame is available, the SDK calls a function we have set in advance.
In our simple scenario where we just continuously extract frames from the camera, they are interchangeable, at least from a functional point of view. However, each has its own peculiarities that might be more suitable than the other in some scenarios. Indeed, some SDKs support both the flavors, others only one. We have already covered the pull model in the previous post:
class image_producer final : public so_5::agent_t
{
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
{
}
void so_evt_start() override
{
cv::VideoCapture cap(0);
if (!cap.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
cv::Mat image;
while (!m_stop.stop_requested())
{
cap >> image; // we pull here the next frame
so_5::send<cv::Mat>(m_channel, std::move(image));
}
}
private:
so_5::mbox_t m_channel;
std::stop_token m_stop;
};
In this post, we are going to cover the push model, but unfortunately OpenCV’s VideoCapture
does not support a callback-based API. How can we do? Well, we can emulate it by resorting to the classical reader thread approach:
class observable_videocapture
{
public:
observable_videocapture()
: m_capture(0, cv::CAP_DSHOW)
{
}
void on_next_image(std::stop_token st, auto on_next)
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
// reader thread
m_worker = std::jthread{ [this, st, f = std::move(on_next)] {
cv::Mat image;
while (!st.stop_requested())
{
m_capture >> image;
f(std::move(image)); // callback
}
} };
}
private:
cv::VideoCapture m_capture;
std::jthread m_worker;
};
observable_videocapture
is a minimal callback-based replacement for VideoCapture
. Using this into an agent is smooth as glass:
class image_producer_callback final : public so_5::agent_t
{
public:
image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
{
}
void so_evt_start() override
{
m_device.on_next_image(m_stop, [this](cv::Mat image) {
so_5::send<cv::Mat>(m_channel, std::move(image));
});
}
private:
so_5::mbox_t m_channel;
observable_videocapture m_device;
std::stop_token m_stop;
};
on_next_image
does not engage image_producer_callback
in a tight loop, instead it returns immediately after spawning an internal worker thread. If the webcam can’t be open, on_next_image
throws (and so does so_evt_start
, as before). In case you are wondering, VideoCapture
manages a configurable buffer of frames that prevents frame loss in case the caller is not fast enough (it’s a common practice).
An interesting observation: image_producer_callback
can be slightly changed in order to automatically stop the acquisition on shutdown. Indeed, as we have seen in the previous episode, agents can execute some code before stopping by overriding so_evt_finish()
that is a special event handler installed on our behalf by SObjectizer. We can simply keep a stop_source
whose stop_token
is passed to on_next_image
and call request_stop()
from that is the shutdown handler:
class image_producer_callback final : public so_5::agent_t
{
public:
image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel)
: agent_t(std::move(ctx)), m_channel(std::move(channel))
{
}
void so_evt_start() override
{
m_device.on_next_image(m_stop_source.get_token(), [this](cv::Mat image) {
so_5::send<cv::Mat>(m_channel, std::move(image));
});
}
void so_evt_finish() override
{
m_stop_source.request_stop();
}
private:
so_5::mbox_t m_channel;
observable_videocapture m_device;
std::stop_source m_stop_source;
};
This is a minimal implementation and some details are missing. Bear in mind that observable_videocapture
is conceptually similar to some real camera SDKs. For example, Spinnaker SDK spawns an image event processing thread that polls an internal GetNextImage-like function, and calls the supplied callback(s) on other threads (using boost::thread_group
).
Anyway, our task was not to implement observable_videocapture
, instead it was to provide an effective way to use a callback-based API from an agent. observable_videocapture
is just a fake implementation that allows us to emulate this case.
Thus, we end up with two scenarios:
- if we need/want to use polling (e.g.
VideoCapture
) then we useimage_producer
; - if we need/want to use callbacks (e.g.
observable_videocapture
) then we useimage_producer_callback
.
Possibly, some more code is needed to use a real SDK. For example, it might be required to configure the camera or to call some initialization functions. Such details can be hidden into so_define_agent
or so_evt_start
.
This new agent is a drop-in replacement for image_producer
, so we can add it to the cooperation as before:
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<image_producer_callback>(main_channel, ctrl_c);
// ... other agents
}
The task has been accomplished, yet it’s interesting to make a small step further that is preparatory to the next post.
The tight loop problem
image_producer_callback
has an advantage compared to image_producer
: it is not stuck in a tight loop, instead it can handle other messages while the camera is streaming. After all, image_producer
is not a “pure” agent but it’s more of an execution thread. Using agent_t
here is not superior to jthread
.
The question is: how can image_producer
handle other requests in between? It’s reasonable to assume that both image_producer
and image_producer_callback
are the only classes that are coupled with the device class. Suppose, just for a moment, that we use another more realistic device class – let’s call it SomeRealLifeDevice
– that provides a function GetBatteryLevel()
that returns the amount of available battery. Maybe, the application needs to call this function from time to time.
image_producer_callback
could simply subscribe to some message box and invoke that function when a certain message is received:
class image_producer_callback final : public so_5::agent_t
{
// ...
void so_define_agent() override
{
so_subscribe(..somewhere..).event([this](const PrintBatteryLevel&){
std::cout << m_device.GetBatteryLevel();
});
}
//...
private:
so_5::mbox_t m_channel;
SomeRealLifeDevice m_device;
std::stop_token m_stop;
};
Here above, we assume that SomeRealLifeDevice
allows calling GetBatteryLevel()
while the device is grabbing frames. We can assume, also, that SomeRealLifeDevice
supports both callbacks and polling. These are both fair assumptions, considering how real SDKs often work.
What about image_producer
?
Well, there exist more than one solution to this problem, but one consists in introducing an extra level of indirection. After all, which computer problems can’t be solved this way?! Since image_producer
can’t really react to messages then another agent will do it on its behalf. Superior to a thread-based solution, image_producer
and this broker can be tied in a special way, a sort of “relationship by birth”.
Suppose we use this SomeRealLifeDevice
into image_producer
:
class image_producer final : public so_5::agent_t
{
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
{
}
void so_evt_start() override
{
while (!m_stop.stop_requested())
{
so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
}
}
void print_battery_level()
{
std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
}
private:
so_5::mbox_t m_channel;
std::stop_token m_stop;
SomeRealLifeDevice m_device;
};
Naively, we might think that print_battery_level
can be called this way:
image_producer* ptr = nullptr;
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
ptr = c.make_agent<image_producer>(main_channel, ctrl_c);
// ... other agents
}
// even from another thread
ptr->print_battery_level();
However, this is a bad idea because SObjectizer manages the lifetime of agents in an unspecified way, therefore ptr
can be dangling anytime.
Instead, SObjectizer provides one guarantee regarding agent pointers: an agent can safely store and use pointers to other agents as long as they belong to its parent cooperation. This introduces cooperation parent-child relationships: a cooperation can be created as a child of another cooperation. In this case, SObjectizer guarantees that all child cooperations will be deregistered and destroyed before their parent cooperation. This means, agents of the parent cooperation will be destroyed after agents of the child cooperation.
Thus, image_producer
can introduce a child cooperation holding a sort of broker agent that calls print_battery_level
when a certain message arrives (somewhere that is not important now).
class image_producer final : public so_5::agent_t
{
class image_producer_broker : public agent_t
{
public:
image_producer_broker(so_5::agent_context_t c, image_producer* parent)
: agent_t(std::move(c)), m_parent(parent)
{
}
void so_define_agent() override
{
so_subscribe(...somewhere...).event([this](const PrintBatteryLevel&) {
m_parent->print_battery_level();
});
}
private:
image_producer* m_parent;
};
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
{
}
void so_evt_start() override
{
// create an image_producer_broker inside a new child cooperation
introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
c.make_agent<image_producer_broker>(this);
});
while (!m_stop.stop_requested())
{
so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
}
}
private:
so_5::mbox_t m_channel;
std::stop_token m_stop;
SomeRealLifeDevice m_device;
void print_battery_level()
{
std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
}
};
A few details here: there exist three ways to introduce child cooperations. Here we used the handy introduce_child_coop
:
- the first parameter may be either the parent cooperation or an agent of the parent cooperation;
- the second parameter is the binder to use;
- the third parameter is the usual lambda that fills the cooperation with agents;
Note that the second parameter can be omitted (e.g. introduce_child_coop(*this, lambda)
) and in that case the default dispatcher will be used. In our case, we want to be sure the broker handlers will be executed in a dedicated thread and then we simply used an active_obj
.
The only difference with introduce_coop
is the parent-child relationship.
Semantically, it makes sense that image_producer_broker
dies before image_producer
, otherwise it would be possible that it receives a message when image_producer
is dead.
Cooperation parent-child relationships are useful in other scenarios and we’ll use them again in the future. Just for completeness, consider that we can take advantage of the parent-child relationship between image_producer
and its child to handle the producer stop without injecting a stop_token
:
class image_producer final : public so_5::agent_t
{
class image_producer_broker : public agent_t
{
public:
image_producer_broker(so_5::agent_context_t c, image_producer* parent, std::stop_source stop_source)
: agent_t(std::move(c)), m_parent(parent), m_stop_source(std::move(stop_source))
{
}
void so_define_agent() override
{
so_subscribe(...somewhere...).event([this](const PrintBatteryLevel&) {
m_parent->print_battery_level();
});
}
void so_evt_finish() override
{
m_stop_source.request_stop();
}
private:
image_producer* m_parent;
std::stop_source m_stop_source;
};
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel)
: agent_t(std::move(ctx)), m_channel(std::move(channel))
{
}
void so_evt_start() override
{
// create an image_producer_broker inside a new child cooperation
introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
c.make_agent<image_producer_broker>(this, m_stop_source);
});
const auto st = m_stop_source.get_token();
while (!st.stop_requested())
{
so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
}
}
private:
so_5::mbox_t m_channel;
std::stop_source m_stop_source;
SomeRealLifeDevice m_device;
void print_battery_level()
{
std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
}
};
Indeed, image_producer_broker::so_evt_finish()
will be called strictly before finishing image_producer
.
Even though this solution is not encouraged because it implies state sharing that is not consistent with the actor model, this pattern can make some sense to glue SObjectizer code with other components that work with a totally different paradigm. By the way, it’s essential that such borderline cases are well isolated.
An alternative solution that works with message passing only will be discussed in the next post.
Takeaway
In this episode we have learned:
- camera SDKs often support a callback-based manner of grabbing frames;
- agents are functionally easier to use together with callback-based SDKs;
- cooperations parent-child relationships: we can introduce a cooperation that is child of another one, with the guarantee that children cooperations are destroyed before the parent;
- keeping a pointer to an agent created with
make_agent
(or such) is a bad idea; - state sharing is not in line with the actor model paradigm but sometimes is difficult to avoid.
As usual, calico
is updated and tagged.
What’s next?
In the next post we’ll rework image_producer
in order to use message passing style only.
Thanks to Yauheni Akhotnikau for having reviewed this post.
Top comments (0)