In the previous episode, we covered the process of sending commands and utilizing timers. Nevertheless, we encountered an issue that impacts our image viewer. Specifically, when the acquisition process is halted, the image window becomes unresponsive. However, once the acquisition resumes, it regains responsiveness and functions smoothly.
Can you reproduce the issue? What’s going on?
The behavior depends on how we handle OpenCV’s window. After displaying an image with imshow, it is our responsibility to call waitKey to enable GUI-related tasks like responding to mouse and keyboard interactions. This process is performed for each frame by image_viewer. However, when the image stream pauses, the opportunity to call waitKey also vanishes. Consequently, the window, although still present, becomes unresponsive.
When the agent no longer receives frames, it would be more suitable to either close the window or continue invoking waitKey. In this post, we will explore a more advanced utilization of agent states to implement both of these strategies.
Observing when the stream goes idle
The central challenge revolves around detecting when the producer comes to a halt. One potential solution could involve having the image_viewer subscribe to the stop command. However, this approach has its drawbacks because the producer may take some time to process the stop command and send the final frames. Consequently, the image_viewer could potentially miss the last set of images.
Furthermore, in a more general scenario, the stop command might not even be available, or the producer might cease its operation for various other reasons, including temporary issues, hardware errors, and the like.
An alternative, middle-ground solution involves enhancing the communicativeness of producers. They could transmit notifications about the status of the stream and the camera, indicating when the acquisition commences, when it concludes, or if any problems arise. However, this approach is effective only when the image_viewer directly subscribes to the producer’s output. On the flip side, if the image_viewer is employed to visualize a different channel, such as the output of another agent, implementing this approach would necessitate additional logic, as notifications might not be part of the output of such an agent.
Hence, it is imperative to ensure that the image_viewer can function seamlessly with any image channel.
One universal approach to determine the stream’s current status is by implementing a time-based criterion: if a specific duration elapses without any images being transmitted through the channel, we can infer that the stream has experienced either a definitive interruption or pause.
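To make the time-based criterion concrete, here is a minimal, framework-free sketch. The idle_detector class and its member names are hypothetical (they are not part of SObjectizer or calico); it only assumes that the caller can provide the current time whenever a frame arrives or a check is made.

```cpp
#include <chrono>

// Hypothetical helper: remembers when the last frame arrived and reports the
// stream as idle once `threshold` has elapsed without a new one.
class idle_detector
{
public:
    using clock = std::chrono::steady_clock;

    explicit idle_detector(clock::duration threshold)
        : m_threshold(threshold)
    {
    }

    // Call this every time a frame is received.
    void on_frame(clock::time_point now)
    {
        m_last_frame = now;
        m_seen_frame = true;
    }

    // Idle means: no frame seen yet, or the last one is at least `threshold` old.
    bool is_idle(clock::time_point now) const
    {
        return !m_seen_frame || (now - m_last_frame) >= m_threshold;
    }

private:
    clock::duration m_threshold;
    clock::time_point m_last_frame{};
    bool m_seen_frame = false;
};
```

Passing the time explicitly keeps the logic easy to test. In the agents discussed in this post, the same role is played by the state's time limit rather than by manual polling.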
We could derive the threshold from the camera’s frame rate, but there are situations where the device is externally triggered, frame by frame, by a physical entity like an encoder, so the output rate changes dynamically. In such cases, more advanced solutions may involve continuously estimating and adjusting this threshold over time. However, it’s important to note that the image_viewer is primarily intended as a troubleshooting tool and won’t be frequently used in production. Therefore, using a static value may suffice for the time being.
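For the curious, one possible way to estimate the threshold over time is an exponential moving average of the gaps between frames. This is only a sketch of one technique, not something used in the project: adaptive_threshold, alpha, factor and the minimum threshold are all illustrative assumptions.

```cpp
#include <algorithm>
#include <chrono>

// Hypothetical helper: tracks the average gap between frames with an
// exponential moving average (EMA) and derives the idle threshold from it.
class adaptive_threshold
{
public:
    using duration = std::chrono::duration<double, std::milli>;

    adaptive_threshold(duration initial_gap, double alpha, double factor, duration min_threshold)
        : m_avg_gap(initial_gap), m_alpha(alpha), m_factor(factor), m_min(min_threshold)
    {
    }

    // Feed the gap measured between two consecutive frames.
    void on_gap(duration gap)
    {
        m_avg_gap = m_alpha * gap + (1.0 - m_alpha) * m_avg_gap;
    }

    // The stream is considered idle after `factor` average gaps,
    // but never sooner than the configured minimum.
    duration threshold() const
    {
        return std::max(m_min, m_factor * m_avg_gap);
    }

private:
    duration m_avg_gap;
    double m_alpha;
    double m_factor;
    duration m_min;
};
```

A larger alpha reacts faster to rate changes but is more sensitive to jitter; the minimum threshold avoids declaring the stream idle too eagerly at high frame rates.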
Approach #1: just closing the window
The initial approach involves closing the window when the stream ends.
At present, the image_viewer operates exclusively in its default state. Upon receiving the first frame, it creates a window that is subsequently closed when the program concludes. Subsequent frames are then displayed within this window. As previously mentioned, one approach to identify when the stream becomes inactive involves monitoring the absence of incoming frames for a specified period.
SObjectizer’s agent states are quite sophisticated and provide some utilities that might be useful for developing a working solution. First of all, image_viewer can be modeled as a two-state agent:
- handling images: as before, just display images as they arrive;
- stream down: do nothing.
The main point is how to express transitions between the two:
- when in handling images, if no frames arrive for a certain time limit, the agent transitions to stream down;
- when exiting from handling images, the agent closes the window;
- if an image arrives when in stream down, the agent transitions to handling images.
Here is the code:
// Assumes <so_5/all.hpp>, the OpenCV headers, and
// `using namespace std::chrono_literals;` (needed for 500ms).
class image_viewer final : public so_5::agent_t
{
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .event(m_channel, [this](const cv::Mat& image) {
                imshow(m_title, image);
                cv::waitKey(25);
                // restart the countdown after every frame
                st_handling_images.time_limit(500ms, st_stream_down);
            })
            // leaving the state (stream down) closes the window
            .on_exit([this] { cv::destroyWindow(m_title); });

        st_stream_down
            // the first frame moves the agent back to st_handling_images
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:
    so_5::mbox_t m_channel;
    std::string m_title = "image viewer";
};
As you can see, states provide functions for expressing additional properties and transitions.
Some details:
- on_exit takes a function that is executed when the agent leaves the state. It’s the opportunity to call destroyWindow;
- transfer_to_state allows the agent to switch from the current state to another when a certain message arrives. In other words, while being in st_stream_down, if cv::Mat arrives, the agent transitions to st_handling_images and cv::Mat is handled there;
- time_limit sets the maximum amount of time the agent may remain in a certain state.
It’s crucial to understand why time_limit is called inside the message handler. One might think that doing this would be equivalent:
st_handling_images
    .event(m_channel, [this](const cv::Mat& image) {
        imshow(m_title, image);
        cv::waitKey(25);
    })
    .time_limit(500ms, st_stream_down)
    .on_exit([this] { cv::destroyWindow(m_title); });
However, in this case the timer is not reset when a new image arrives. The effect is seeing the window blinking while the stream is active, because it gets destroyed and created every 500ms! On the other hand, when time_limit is called inside the handler, its timer is reset after every image (that is the intended behavior, because we set a time limit after the last received frame).
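The difference between the two variants can be illustrated with a small simulation that uses explicit timestamps instead of real timers. This is a simplified model, not SObjectizer code: count_expirations is a hypothetical function that counts how many times the limit would fire (that is, how many times the window would be destroyed) for a given sequence of frame arrival times.

```cpp
#include <chrono>
#include <vector>

using ms = std::chrono::milliseconds;

// `reset_on_frame == true` mimics calling time_limit inside the handler;
// `false` mimics a limit armed only when the state is entered.
// The state is assumed to be entered at t = 0.
int count_expirations(const std::vector<ms>& frame_times, ms limit, bool reset_on_frame)
{
    int expirations = 0;
    ms deadline = limit;
    for (const ms t : frame_times)
    {
        if (t >= deadline)
        {
            // The limit elapsed before this frame: the state was left and this
            // frame re-enters it, arming the limit again.
            ++expirations;
            deadline = t + limit;
        }
        else if (reset_on_frame)
        {
            // The frame arrived in time: push the deadline forward.
            deadline = t + limit;
        }
    }
    return expirations;
}
```

With one frame every 100ms for two seconds and a 500ms limit, the variant without reset fires four times even though the stream never stops, while the variant with reset never fires.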
As you can imagine, there is also on_enter that executes a function when transitioning into a state. Just for your information, both on_enter and on_exit mustn’t throw exceptions.
Note that this solution is not bulletproof: cv::waitKey(25) suffers from the very same problem as estimating how long to wait between two frames. However, as said, image_viewer is just for troubleshooting, so this can be enough for now. Feel free to change this number at will.
Here is a demo:
Approach #2: keeping the window responsive
Sometimes it can make sense to display the last acquired frame and keep the window responsive.
Basically, when the stream goes down, we should keep on calling waitKey until a new frame arrives. This should remind you of the post where we discussed how to model an infinite loop with messages. Indeed, the pattern fits the bill.
Also, since we have just given a taste of agent state utilities, we can use our new tools to express transitions succinctly.
We can still model the image viewer as a two-state machine, adding a signal to the party. When images arrive (that is, in st_handling_images) the agent behaves exactly as before. When it enters st_stream_down, this time the agent sends call_waitkey to itself to start calling waitKey until cv::Mat arrives and the state turns into st_handling_images again. Don’t forget to remove on_exit from st_handling_images since we don’t need to destroy the window anymore:
class image_viewer_live final : public so_5::agent_t
{
    struct call_waitkey final : so_5::signal_t {};

    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    image_viewer_live(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .event(m_channel, [this](const cv::Mat& image) {
                imshow(m_title, image);
                cv::waitKey(25);
                st_handling_images.time_limit(500ms, st_stream_down);
            });

        st_stream_down
            .on_enter([this] {
                so_5::send<call_waitkey>(*this);
            })
            .event([this](so_5::mhood_t<call_waitkey>) {
                cv::waitKey(25);
                so_5::send<call_waitkey>(*this);
            })
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_handling_images.activate();
    }
private:
    so_5::mbox_t m_channel;
    std::string m_title = "image viewer live";
};
Note that we activate st_handling_images first; otherwise, call_waitkey would be sent without any existing window (OpenCV would not complain though).
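The self-sent signal loop may be easier to follow in a framework-free model. The sketch below is a single-threaded simulation with a plain queue; viewer_sim, message and the counters are hypothetical names, and the increments only stand in for the actual waitKey calls.

```cpp
#include <deque>

// Hypothetical stand-in for the messages the agent receives.
enum class message { frame, call_waitkey };

struct viewer_sim
{
    enum class state { handling_images, stream_down };

    std::deque<message> queue;
    state current = state::handling_images;
    int waitkey_calls = 0;

    // Mimics the time limit expiring: enter stream_down and start the loop.
    void on_stream_idle()
    {
        current = state::stream_down;
        queue.push_back(message::call_waitkey); // on_enter: first self-sent signal
    }

    // Processes at most `max_messages` queued messages.
    void run(int max_messages)
    {
        while (max_messages-- > 0 && !queue.empty())
        {
            const message m = queue.front();
            queue.pop_front();
            if (m == message::frame)
            {
                current = state::handling_images; // like transfer_to_state
                ++waitkey_calls;                  // imshow + waitKey for the frame
            }
            else if (current == state::stream_down)
            {
                ++waitkey_calls;                        // keep the window responsive
                queue.push_back(message::call_waitkey); // re-arm the loop
            }
            // A call_waitkey received in handling_images is ignored,
            // which is what terminates the loop.
        }
    }
};
```

The loop stops by itself: once a frame moves the agent back to the handling state, the pending signal finds no handler and is not re-sent.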
Here is a demo:
Stream heartbeat logger
A variation of this pattern can be adopted for crafting another utility agent that logs the uptime of the stream, let’s say every 5 seconds. When the stream is active, we expect such logs:
[Heartbeat] Uptime: 00:00:05
[Heartbeat] Uptime: 00:00:10
[Heartbeat] Uptime: 00:00:15
[Heartbeat] Uptime: 00:00:20
...
When the stream goes down, the agent shuts up. And if the stream goes up again, so does the agent (restarting from 0).
This time, we combine state transitions with periodic messages. Like image_viewer_live, the agent sends a signal to itself, but not at breakneck speed, instead every 5 seconds:
class stream_heartbeat final : public so_5::agent_t
{
    struct log_heartbeat final : so_5::signal_t {};

    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    stream_heartbeat(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .on_enter([this] {
                m_start_time = std::chrono::steady_clock::now();
                m_timer = so_5::send_periodic<log_heartbeat>(so_direct_mbox(), 5s, 5s);
            })
            .event(m_channel, [this](const cv::Mat&) {
                st_handling_images.time_limit(500ms, st_stream_down);
            })
            .event([this](so_5::mhood_t<log_heartbeat>) {
                std::osyncstream(std::cout) << std::format("[Heartbeat] Uptime: {:%H:%M:%S}\n", std::chrono::floor<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time));
            })
            .on_exit([this] {
                m_timer.release();
            });

        st_stream_down
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:
    so_5::mbox_t m_channel;
    std::chrono::time_point<std::chrono::steady_clock> m_start_time;
    so_5::timer_id_t m_timer;
};
A few details:
- log_heartbeat indicates when it’s time to log the message on the console;
- m_timer holds the periodic message and it’s released (aka: cancelled) when the stream goes down;
- m_timer is (re)created when the stream goes up, along with m_start_time which represents the streaming start time;
- the uptime is calculated as the number of whole seconds elapsed since the start time, rounded down.
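If the chrono support of std::format is not available on your toolchain, the uptime string can be produced by hand. The following is a hedged sketch: format_uptime is a hypothetical helper, and it truncates to whole seconds, which matches flooring for the non-negative durations we deal with here.

```cpp
#include <chrono>
#include <cstdio>
#include <string>

// Formats an elapsed (non-negative) duration as HH:MM:SS,
// dropping any fraction of a second.
std::string format_uptime(std::chrono::steady_clock::duration elapsed)
{
    using namespace std::chrono;
    const long long total = duration_cast<seconds>(elapsed).count();
    char buffer[32];
    std::snprintf(buffer, sizeof buffer, "%02lld:%02lld:%02lld",
        total / 3600, (total / 60) % 60, total % 60);
    return buffer;
}
```

For example, an elapsed time of 5.9 seconds is logged as 00:00:05, consistent with the expected output shown earlier.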
As usual, you can play with these new agents by adding them to the cooperation. For example:
int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<image_viewer_live>(main_channel);
        c.make_agent<remote_control>(commands_channel);
        c.make_agent<stream_heartbeat>(main_channel);
    });

    wait_for_stop(ctrl_c);
}
Here is a demo:
Just a design digression
Since stream_heartbeat, image_viewer and image_viewer_live share the very same inner logic, we can think of moving that into a dedicated agent that sends stream notifications:
class stream_detector final : public so_5::agent_t
{
    so_5::state_t st_handling_images{ this };
    so_5::state_t st_stream_down{ this };
public:
    struct stream_up final : so_5::signal_t{};
    struct stream_down final : so_5::signal_t{};

    stream_detector(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t output)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_out_channel(std::move(output))
    {
    }

    void so_define_agent() override
    {
        st_handling_images
            .on_enter([this] {
                so_5::send<stream_up>(m_out_channel);
            })
            .event(m_channel, [this](const cv::Mat&) {
                st_handling_images.time_limit(500ms, st_stream_down);
            })
            .on_exit([this] {
                so_5::send<stream_down>(m_out_channel);
            });

        st_stream_down
            .transfer_to_state<cv::Mat>(m_channel, st_handling_images);

        st_stream_down.activate();
    }
private:
    so_5::mbox_t m_channel;
    so_5::mbox_t m_out_channel;
};
At this point, stream_heartbeat boils down to:
class stream_heartbeat_with_detector final : public so_5::agent_t
{
    struct log_heartbeat final : so_5::signal_t {};
public:
    stream_heartbeat_with_detector(so_5::agent_context_t ctx, so_5::mbox_t detector_channel)
        : agent_t(std::move(ctx)), m_channel(std::move(detector_channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_channel)
            .event([this](so_5::mhood_t<stream_detector::stream_up>) {
                m_start_time = std::chrono::steady_clock::now();
                m_timer = so_5::send_periodic<log_heartbeat>(so_direct_mbox(), 5s, 5s);
            })
            .event([this](so_5::mhood_t<stream_detector::stream_down>) {
                m_timer.release();
            });

        so_subscribe_self().event([this](so_5::mhood_t<log_heartbeat>) {
            std::osyncstream(std::cout) << std::format("[Heartbeat] Uptime: {:%H:%M:%S}\n", std::chrono::floor<std::chrono::seconds>(std::chrono::steady_clock::now() - m_start_time));
        });
    }
private:
    so_5::mbox_t m_channel;
    std::chrono::time_point<std::chrono::steady_clock> m_start_time;
    so_5::timer_id_t m_timer;
};
Here is an example of usage:
int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");
    auto stream_notifications = sobjectizer.environment().create_mbox("stream");
    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());

    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<stream_detector>(main_channel, stream_notifications);
        c.make_agent<stream_heartbeat_with_detector>(stream_notifications);
        c.make_agent<image_viewer_live>(main_channel);
        c.make_agent<remote_control>(commands_channel);
    });

    wait_for_stop(ctrl_c);
}
Clearly, rather than adding a new channel, main_channel can host output notifications as well:
c.make_agent<stream_detector>(main_channel, main_channel); // output to main_channel
c.make_agent<stream_heartbeat_with_detector>(main_channel);
Adding an extra channel might be not just a matter of style or taste. We tend to keep some channels separate, especially to avoid mixing “raw” data with “derived” data. In the next post we’ll get back to this a bit more.
Another design argument regards stream_heartbeat_with_detector.
It’s evident that stream_heartbeat_with_detector is less generic than stream_heartbeat because it can’t simply work with any “image channel”, but only with those carrying stream_up and stream_down. This creates an implicit dependency on stream_detector, and we can further solidify this relationship by placing both stream_detector and stream_heartbeat_with_detector into the same cooperation.
On the other hand, stream_heartbeat_with_detector has gained two interesting features: firstly, it can now function seamlessly with any image type, eliminating its previous dependency on cv::Mat. For example, if we introduce a new SDKImage and integrate it with stream_detector, stream_heartbeat_with_detector will continue to work without modification. Secondly, it no longer handles the “500 milliseconds” threshold, which was previously used to determine if a stream becomes idle. This responsibility is now solely within the domain of stream_detector (about this threshold, however, the central point discussed earlier in this article still stands: the most effective approach to determine if the stream has started and concluded is to have the producer send appropriate signals).
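To illustrate why consumers of the notifications become independent of the image type, here is a framework-free model of the detector. It is only a sketch: detector_sim, stream_event and sdk_image_stub are hypothetical names, time is passed in explicitly, and the real layout of any vendor image type is unknown here.

```cpp
#include <chrono>
#include <vector>

enum class stream_event { up, down };

// Hypothetical detector, templated on the image type: it turns frames and
// clock ticks into stream_event notifications, never exposing the image.
template <typename Image>
class detector_sim
{
public:
    using ms = std::chrono::milliseconds;

    explicit detector_sim(ms limit) : m_limit(limit) {}

    void on_frame(const Image&, ms now)
    {
        if (!m_up)
        {
            m_up = true;
            m_events.push_back(stream_event::up);
        }
        m_deadline = now + m_limit; // every frame postpones the deadline
    }

    void on_tick(ms now)
    {
        if (m_up && now >= m_deadline)
        {
            m_up = false;
            m_events.push_back(stream_event::down);
        }
    }

    const std::vector<stream_event>& events() const { return m_events; }

private:
    ms m_limit;
    ms m_deadline{0};
    bool m_up = false;
    std::vector<stream_event> m_events;
};

// Stand-in for a vendor image type; purely illustrative.
struct sdk_image_stub {};
```

Whatever consumes events() only sees up and down, so swapping the image type changes the detector instantiation and nothing else.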
It’s important to highlight that the first feature has a wider applicability. In the scenario where we have numerous agents designed to work exclusively with cv::Mat (such as image_viewer and image_tracer), if, at some point, we introduce support for SDKImage into the program, we can simplify the transition by introducing an adapter agent that converts SDKImage into cv::Mat (assuming it’s feasible). This adaptation would enable the current agents to seamlessly continue their operations without requiring any extra modifications.
The good news is that we do have a choice. Wanna opt for putting all the logic into a few agents, minimizing message exchange and type proliferation? Deal. Or do you prefer fine-grained agents and more modularity? Deal.
Takeaway
In this episode we have learned:
- agent states provide functions to express transitions and particular conditions that must be guaranteed;
- on_enter executes a function when the agent transitions to a certain state;
- on_exit executes a function when the agent leaves a certain state;
- on_enter and on_exit handlers mustn’t throw exceptions;
- time_limit sets the maximum time the agent can be in a certain state (and can be reset from a message handler);
- combining agent states with periodic messages can be powerful.
As usual, calico is updated and tagged.
What’s next?
Our manager is highly impressed with the project’s advancements and is eager to put it to use. However, with an important customer demonstration on the horizon, she has a new request:
“The customer will be captivated by the system’s flexibility. Could you add more agents and provide examples showcasing the power of composition?”
In the forthcoming post, we will introduce a few more agents, highlighting the dynamic potential of composition in action!
Thanks to Yauheni Akhotnikau for having reviewed this post.