In our previous session, we delved into the process of managing commands to oversee the acquisition. This journey was notably extensive, given the necessity of updating all our producers along the way. The positive takeaway from this endeavor is that the process of sending commands is now significantly simplified.
To illustrate this improvement, we manually sent start and stop commands from the main function, which is admittedly not very exciting. In this post, we will explore several ways of generating commands from different parts of the program.
Our producers are now ready to accept commands via the dedicated “commands” message box. It is up to the other components of the program to send signals through this channel. When start_acquisition_command and stop_acquisition_command are sent, any of our producers will respond as required. This opens up many possibilities: we could develop a keyboard listener that translates keystrokes into signals, or create a REST API that receives commands from the network, among other options.
For example, here is a minimal keyboard listener based on OpenCV that shows a tiny frame where you can press Enter or Escape to start and stop the acquisition:
class remote_control : public so_5::agent_t
{
    // Internal signal used to keep polling the keyboard.
    struct keep_on final : so_5::signal_t {};
public:
    remote_control(so_5::agent_context_t ctx, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(commands))
    {
    }

    void so_evt_start() override
    {
        cv::Mat frame = cv::Mat::ones(100, 200, CV_8UC3);
        putText(frame, "Start (Enter)", { 2, 20 }, cv::FONT_HERSHEY_COMPLEX_SMALL, 1, { 240, 200, 1 });
        putText(frame, "Stop (Escape)", { 2, 50 }, cv::FONT_HERSHEY_COMPLEX_SMALL, 1, { 240, 200, 1 });
        imshow("Remote Control", frame);
        so_5::send<keep_on>(*this);
    }

    void so_define_agent() override
    {
        so_subscribe_self()
            .event([this](so_5::mhood_t<keep_on>) {
                switch (cv::waitKey(1))
                {
                case 13: // Enter
                    so_5::send<start_acquisition_command>(m_channel);
                    break;
                case 27: // Escape
                    so_5::send<stop_acquisition_command>(m_channel);
                    break;
                default:
                    break;
                }
                // Poll the keyboard again in 100 milliseconds.
                so_5::send_delayed<keep_on>(*this, 100ms);
            });
    }
private:
    so_5::mbox_t m_channel;
};
Technical detail: the frame must be kept responsive by repeatedly calling either waitKey or pollKey. For this reason, we use the keep_on signal. As said in a previous discussion, the command channel might be retrieved by name, but here we prefer to inject it.
As usual, we add that agent to the party as follows:
int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<image_viewer>(main_channel);
        c.make_agent<remote_control>(commands_channel);
    });

    wait_for_stop(ctrl_c);
}
Here is a live demo:
Now, let’s consider adding some excitement to the situation. Imagine we need to capture frames for a specific duration, such as 10 seconds. How can we achieve this? One approach is to initiate the process with a “start” command, wait for 10 seconds, and then issue a “stop” command:
// somewhere
send<start_acquisition_command>(commands);
this_thread::sleep_for(10s);
send<stop_acquisition_command>(commands);
This method is functional, and as you may be aware, the “stop” command will not be executed until at least 10 seconds have passed (potentially even longer due to scheduling and other operating system intricacies). However, this approach can be somewhat inconvenient because it might require relocating the operation to a different context, like a separate thread or handling it within a coroutine, to prevent it from blocking the current thread.
An alternative way is using SObjectizer’s timers.
Introducing timed send functions
Typical applications developed with SObjectizer use timers quite extensively. For this reason, SObjectizer uses a dedicated timer thread that can efficiently handle a very large number of timers (tens or hundreds of millions, even billions).
Timers come in the form of two additional ways of sending messages:
- delayed messages
- periodic messages
The former is a manner of postponing the delivery of a message. For example:
so_5::send_delayed<stop_acquisition_command>(commands, 10s);
sends stop_acquisition_command after 10 seconds.
Messages sent by send_delayed can’t be cancelled. On the other hand, messages sent by send_periodic can. We’ll see how in a moment.
As one would anticipate, periodic messages provide a means of transmitting the same message multiple times at specific intervals. In practice, the message instance is not deallocated but, instead, it is delivered again and again, until cancellation happens (either automatic or explicit).
For instance, consider a scenario in which our program is employed for video surveillance, and we require capturing images for a duration of 5 minutes every half an hour. In such a case, we can dispatch a pair of periodic messages:
// Note: the standard chrono literal for minutes is `min` (std::chrono_literals).
auto receipt_start = so_5::send_periodic<start_acquisition_command>(commands,
    30min,  // first delivery delay
    30min); // repetition period
auto receipt_stop = so_5::send_periodic<stop_acquisition_command>(commands,
    35min,  // first delivery delay
    30min); // repetition period
receipt_start and receipt_stop are instances of so_5::timer_id_t, a class that identifies a timed message. Often such instances are simply referred to as timers.
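As a sanity check on the schedule, here is a small sketch in plain standard C++ (no SObjectizer involved) that lists when a periodic message would fire, given its first delivery delay and its repetition period. The helper name delivery_times is made up for this illustration and is not part of any library.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of SObjectizer): offsets from "now" of the
// first `count` deliveries of a periodic message.
std::vector<std::chrono::minutes> delivery_times(std::chrono::minutes first_delay,
                                                 std::chrono::minutes period,
                                                 std::size_t count)
{
    std::vector<std::chrono::minutes> times;
    times.reserve(count);
    for (std::size_t i = 0; i < count; ++i)
    {
        // The i-th delivery happens after the initial delay plus i full periods.
        times.push_back(first_delay + period * static_cast<int>(i));
    }
    return times;
}
```

Calling it with std::chrono::minutes{30} for both arguments gives the "start" offsets, and with a first delay of std::chrono::minutes{35} the "stop" offsets. Each "stop" lands 5 minutes after the corresponding "start", which is the acquisition window we wanted.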
Storing receipt_start and receipt_stop is crucial: timer_id_t works like a shared pointer, and destroying the last instance cancels the periodic message, so it won’t be sent again. In other words, this code:
so_5::send_periodic<stop_acquisition_command>(commands, 35min, 30min);
causes the message to be immediately cancelled!
Also, send_periodic supports delayed messages in case the third parameter (period) is 0. In this special case, the message will be delivered only once, after the specified delay:
auto delayed = so_5::send_periodic<start_acquisition_command>(commands,
    30min, // delivery delay
    0min); // repetition period is zero -> it's a delayed message
Clearly, in this case destroying the receipt causes the delayed message to be cancelled.
Cancelling a message
As said, periodic messages can be cancelled, whether they are created with a period greater than zero or not. Cancellation happens automatically when the last instance of timer_id_t returned from send_periodic is destroyed. However, we can also cancel a timer explicitly by means of release:
auto receipt = so_5::send_periodic<start_acquisition_command>(commands, 30min, 30min);
// ...
receipt.release(); // cancellation
This method cancels the message regardless of how many other timer_id_t objects still refer to the same timer.
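To make these ownership rules concrete, here is a toy model written in plain standard C++. It is only an analogy built on std::shared_ptr: toy_timer_id and toy_send_periodic are hypothetical names and say nothing about how so_5::timer_id_t is actually implemented. The bool flag stands in for "the timer has been cancelled".

```cpp
#include <cassert>
#include <memory>

// Toy stand-in for a timer handle. NOT so_5::timer_id_t: just a model of the
// rules described in the text (shared ownership plus explicit release).
class toy_timer_id
{
    struct timer_state
    {
        bool& cancelled;
        explicit timer_state(bool& flag) : cancelled(flag) {}
        ~timer_state() { cancelled = true; } // last owner gone -> cancel
    };

    std::shared_ptr<timer_state> m_state;
public:
    toy_timer_id() = default;
    explicit toy_timer_id(bool& cancelled)
        : m_state(std::make_shared<timer_state>(cancelled))
    {
    }

    // Explicit cancellation: takes effect even if other copies are still alive.
    void release()
    {
        if (m_state)
        {
            m_state->cancelled = true;
            m_state.reset();
        }
    }

    bool is_active() const { return m_state && !m_state->cancelled; }
};

// Hypothetical counterpart of send_periodic: "schedules" a timer and hands
// back the only handle that keeps it alive.
toy_timer_id toy_send_periodic(bool& cancelled)
{
    cancelled = false;
    return toy_timer_id{ cancelled };
}
```

In this model, calling toy_send_periodic(flag) and discarding the result sets the flag straight away, because the temporary handle is the last owner. Copies keep the timer alive until the last one goes away, while release() on any copy cancels it for all of them.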
Often timers are stored into agents as member variables. For example:
class some_agent : public so_5::agent_t
{
    so_5::timer_id_t m_timer;
    //...
    void some_function()
    {
        m_timer = so_5::send_periodic<some_message>(...);
        // ...
    }
};
In this case, when the agent is destroyed, so is m_timer, and the timer is cancelled along with it (provided no other copy of it exists).
Have you ever tried to cancel a timer on Alexa…while it was about to ring?! In SObjectizer, message cancellation is not strictly guaranteed. Indeed, a delayed message won’t be revoked if it has already left the timer thread. This happens if you try cancelling the message too late: in that case, chances are it will be delivered anyway. For example, if you send a message with a delay of 100ms and you try cancelling it after 100ms, it might have left the timer thread already. There are ways to prevent this problem, but they won’t be discussed here. If you are interested, have a look at this.
What about virtual_image_producer?
You make a good observation! In the previous post, we left out upgrading virtual_image_producer to react to commands properly, because we needed timed messages first. Let’s do it now.
The state machine closely resembles that of image_producer_recursive, with two key distinctions:
- the images are retrieved from the file system;
- the grab_image signal is dispatched as a delayed message to replicate the delay between two successive frames.
Here is the full implementation:
class virtual_image_producer final : public so_5::agent_t
{
    struct grab_image final : so_5::signal_t {};
public:
    virtual_image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands, std::filesystem::path path)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_path(std::move(path))
    {
    }

    void so_define_agent() override
    {
        st_started.event([this](so_5::mhood_t<grab_image>) {
            // Skip anything that is not a regular file (e.g. subdirectories).
            m_current_it = std::find_if(m_current_it, {}, [](const auto& e) {
                return e.is_regular_file();
            });
            so_5::send<cv::Mat>(m_channel, cv::imread(m_current_it->path().string()));
            // Simulate the delay between two successive frames.
            so_5::send_delayed<grab_image>(*this, std::chrono::milliseconds(20));
            // Restart from the first entry once the end of the directory is reached.
            if (++m_current_it == std::filesystem::directory_iterator{})
            {
                m_current_it = std::filesystem::directory_iterator{ m_path };
            }
        }).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
            st_stopped.activate();
        });

        st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
            st_started.activate();
            so_5::send<grab_image>(*this);
        });

        st_stopped.activate();
    }

    void so_evt_start() override
    {
        if (!is_directory(m_path))
        {
            throw std::runtime_error("Can't open virtual device directory");
        }
        m_current_it = std::filesystem::directory_iterator{ m_path };
    }
private:
    so_5::state_t st_stopped{ this };
    so_5::state_t st_started{ this };
    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    std::filesystem::path m_path;
    std::filesystem::directory_iterator m_current_it;
};
To provide a more accurate adjustment, we should modify the pause duration by subtracting the milliseconds that were “consumed” while processing the last frame, such as the time required to read a frame from the disk:
// ... as before
void so_define_agent() override
{
    st_started.event([this](so_5::mhood_t<grab_image>) {
        const auto tic = std::chrono::steady_clock::now();
        cv::Mat mat;
        m_current_it = std::find_if(m_current_it, {}, [](const auto& e) {
            return e.is_regular_file();
        });
        so_5::send<cv::Mat>(m_channel, cv::imread(m_current_it->path().string()));
        // Time spent so far, capped at 20ms so that the delay never goes negative.
        const auto elapsed = std::min(20ms, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - tic));
        so_5::send_delayed<grab_image>(*this, 20ms - elapsed);
        if (++m_current_it == std::filesystem::directory_iterator{})
        {
            m_current_it = std::filesystem::directory_iterator{ m_path };
        }
    }). //... as before
While there are more sophisticated strategies available, we can stick with this simplified implementation for the time being. After all, this is merely a mock device!
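The clamping logic used for the pause can be isolated into a tiny function, which makes the edge cases easy to check. This is just a sketch in plain standard C++; the name remaining_delay is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Hypothetical helper: how long to wait before grabbing the next frame so that
// frames are spaced roughly `frame_period` apart, given the time already spent
// on the current one. The result never goes below zero.
constexpr std::chrono::milliseconds remaining_delay(std::chrono::milliseconds frame_period,
                                                    std::chrono::milliseconds elapsed)
{
    return frame_period - std::min(frame_period, elapsed);
}
```

For a 20ms frame period, 5ms of processing leaves a 15ms pause, while anything taking 20ms or longer leaves no pause at all, so the next grab_image is scheduled immediately.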
Takeaway
In this episode we have learned:
- SObjectizer supports timers through timed send functions;
- send_delayed delivers a message after a delay, and such a message can’t be revoked;
- send_periodic delivers a message again and again, until cancelled;
- cancellation of periodic messages can happen implicitly or explicitly;
- send_periodic with a zero period behaves like a delayed message but can be cancelled;
- cancelling a message too late (around or after its deadline) might not revoke it, and the message may be delivered anyway.
As usual, calico is updated and tagged.
What’s next?
We can unleash our imagination and craft any command sources. We can also create functions and components to send commands automatically on the basis of time.
However, playing a bit with start and stop, we notice a subtle problem in the behavior of image_viewer: it seems that after stopping the acquisition, the window hangs and stays unresponsive. When we start the acquisition again, it reawakens…what’s happening?
In the next post we’ll deal with that issue and provide a solution.
Thanks to Yauheni Akhotnikau for having reviewed this post.