We love async Rust at Ditto. We were early adopters, using futures and streams in our networking code long before async/await became stable. Our product is a perfect use case. It's I/O-heavy, there are numerous peer-to-peer connections synchronizing data concurrently, and we need to use all cores efficiently on low-powered devices like mobile phones.
A common challenge in async is writing good tests. Take a simple example: below we create a hypothetical Connection. If there is no activity for 10 seconds it should time out internally and close its events channel.
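The details of Connection don't matter much, but for concreteness, a minimal version might look something like this. This is only a sketch, assuming a tokio unbounded channel for the events; the type and its internals here are hypothetical.

use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError};

#[derive(Debug)]
pub struct Event; // placeholder for whatever the connection reports

pub struct Connection {
    events: mpsc::UnboundedReceiver<Event>,
}

impl Connection {
    pub async fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        // Background task owning the sender. A real connection would reset
        // this timer on network activity; here it simply expires after 10
        // seconds, the task ends, the sender is dropped, and the receiving
        // end sees the channel as closed.
        tokio::spawn(async move {
            let _tx = tx;
            tokio::time::sleep(Duration::from_secs(10)).await;
        });
        Connection { events: rx }
    }

    pub fn try_recv(&mut self) -> Result<Event, TryRecvError> {
        self.events.try_recv()
    }
}

We can write a test verifying this: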
#[tokio::test]
async fn test_timeout_occurs() {
    // Create the Connection
    let mut conn = Connection::new().await;
    // Wait for a while
    tokio::time::sleep(Duration::from_secs(11)).await;
    // The receive channel should have closed
    assert_eq!(conn.try_recv().unwrap_err(), TryRecvError::Disconnected);
}
One problem with this test is that it always takes 11 seconds to execute: 10 seconds to wait for the timeout plus a safety factor. Slow tests are a huge hassle for both developers and CI/CD. Imagine if it was 30 minutes instead. Your CI runs would take at least 30 minutes, all because of this one test!
A related challenge is testing for the absence of a timer expiry. Here we ensure the events channel is still open at 5 seconds:
#[tokio::test]
async fn test_timeout_does_not_occur() {
    // Create the Connection
    let mut conn = Connection::new().await;
    // Wait less than in the other test
    tokio::time::sleep(Duration::from_secs(5)).await;
    // The receive channel should still be open (but no value)
    assert_eq!(conn.try_recv().unwrap_err(), TryRecvError::Empty);
}
This test is also slow, weighing in at 5 seconds, but there is a more insidious problem. Imagine our Connection had a bug which caused it to close the channel after only 4 seconds. Would this test catch it? Probably… but not necessarily.
When that internal 4-second timer expires, a series of steps must occur. A tokio worker thread must be unparked so it can poll the task that was waiting on that timer. Having noticed that the timer has expired, the task drops the events sender, which indicates to the receiving end that the channel is closed. It's only a small amount of work but it's not instantaneous, and it has to run concurrently with the main test task.
If you run this on your high-end development machine (which is what Rust developers use) then that expiry-and-channel-close process will occur in a matter of milliseconds. By the time the test task performs the assertion 1 second later, the channel will almost certainly be closed and the bug will be detected. On a heavily-loaded CI server, however, there may be fierce contention between threads. Processing the inner timeout might be delayed by 1–2 real-world seconds and the test would appear to pass. The previous test has the same problem in reverse; if the inner timeout gets delayed it will fail, even though the code being tested is correct.
These tests are non-deterministic and will result in flaky CI: intermittent build failures where you're not sure whether you can trust the results. This is suboptimal.
Conventional wisdom says that you shouldn't structure your code this way. Any logic you want to test should be expressed as synchronous code which is fully deterministic and abstracted away from the system clock. This is okay for small unit tests but it becomes a headache in larger integration tests. Some of our business logic is tied up in async behavior. Why can't we test that, the way our customers will experience it? If we test only the sync parts we're losing coverage.
What we want is a way to test our time-driven code with fewer compromises.
- It should be possible to test async code "as-is" in unit tests.
- In a test context, passage of time should be abstracted so that tests execute near-instantly no matter how long the delays are.
- Tests should be fully deterministic: an earlier timer always runs to completion before a timer scheduled later.
- As a unit test advances mock time, Instant::now() should return the correct intermediate time during the triggering of each timer.
- The processing work attached to a timer should allow new timers to be registered along the way.
Ditto has built an internal library to address all of these problems. This crate, ditto_time, abstracts over std::time and the tokio timer functions. Here is the first test again, except now using this library.
#[test]
fn test_timeout_occurs_fast() {
    let (time_control, _guard) = register_new_control();
    let rt = build_instrumented_runtime(&time_control);
    rt.block_on(async {
        let mut conn = Connection::new().await;
        time_control.advance(Duration::from_secs(10)).await; // <----
        assert_eq!(conn.try_recv().unwrap_err(), TryRecvError::Disconnected);
    });
}
This test is reliable and completes immediately. First there is a little boilerplate which ensures the code under test will use mock timers instead of real ones. For now, focus on the call to time_control.advance(). Notice that it moves time ahead precisely 10 seconds. Then on the next line, the channel is guaranteed to be closed.
This is the most difficult part. It is not sufficient to trigger the timer. Somehow we have to know when the code which received that timer event has finished doing all of its associated work. This isn't possible using conventional timer futures. We don't know when the associated task will get scheduled, let alone when it's finished. Therefore the futures in ditto_time are different: they output a FrozenTimeControlGuard.
{
    let _guard = ditto_time::delay_for(Duration::from_secs(10)).await.unwrap();
    // handle timer event
    // guard dropped; time can advance
}
It turns out that in almost all situations, timer-driven code does all of the relevant processing in a single block immediately following the await. Therefore all we have to do is create a binding like _guard, and now we have a value that can notify us when the end of the block has been reached, at which point time can continue. In unusual cases the developer can preserve the guard as long as needed.
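For instance, a handler with more than one step of follow-up work could keep the guard in a named binding and drop it only once everything relevant has run. This is a hypothetical fragment: handle_timer_event and flush_state are stand-in names, not part of ditto_time.

let guard = ditto_time::delay_for(Duration::from_secs(10)).await.unwrap();
handle_timer_event();
// Further work that must also finish before mock time is allowed to advance.
flush_state().await;
drop(guard); // only now may advance() move on to the next timer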
pub struct FrozenTimeControlGuard(Option<oneshot::Sender<()>>);
In unit test code the guard contains a oneshot sender. On drop, the channel closes and the call to advance() knows that it can move on to the next timer. advance() is an async loop over all the currently-registered timers, triggering them one at a time and waiting for the corresponding channel to close. In non-test code there is no channel at all, so no extra processing is required.
The last trick is correctly selecting real vs mock timers, and ensuring that they are associated with the correct TimeControl. Rust runs tests in parallel, so there may be many tokio runtimes executing tests simultaneously. If we tried to use global storage, different tests would clobber each other.
The solution is thread-local storage (TLS). Tokio offers the ability to run code when each worker thread is started, which is an opportunity to store an Arc<TimeControl>. This is the purpose of the build_instrumented_runtime boilerplate in the test. When the code under test calls delay_for() it will dynamically create a Delay future of the correct variant:
thread_local!(
    static MOCK_CONTROL: RefCell<Option<Arc<TimeControl>>> = RefCell::new(None)
);

pub fn delay_for(duration: std::time::Duration) -> Delay {
    MOCK_CONTROL.with(|it| match it.borrow().as_ref() {
        // A TimeControl is registered for this thread: create a mock timer.
        Some(control) => {
            let deadline = control.now() + duration;
            Delay::new_mock(control, deadline)
        }
        // No TimeControl: fall back to a real tokio timer.
        None => Delay::Real(Box::pin(tokio::time::sleep(duration))),
    })
}
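For completeness, here is one way the remaining test boilerplate could sit on top of that thread-local. register_new_control and build_instrumented_runtime are the names used in the tests above, but the bodies below are only a sketch built on tokio's runtime Builder and its on_thread_start hook; ControlRegistration is our own stand-in for whatever guard register_new_control actually returns.

use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};

// Clears the current thread's MOCK_CONTROL slot when the test finishes.
pub struct ControlRegistration;

impl Drop for ControlRegistration {
    fn drop(&mut self) {
        MOCK_CONTROL.with(|it| *it.borrow_mut() = None);
    }
}

pub fn register_new_control() -> (Arc<TimeControl>, ControlRegistration) {
    let control = Arc::new(TimeControl::new());
    // Register on the test's own thread too: block_on polls the top-level
    // future on the calling thread, not on a worker thread.
    MOCK_CONTROL.with(|it| *it.borrow_mut() = Some(Arc::clone(&control)));
    (control, ControlRegistration)
}

pub fn build_instrumented_runtime(control: &Arc<TimeControl>) -> Runtime {
    let control = Arc::clone(control);
    Builder::new_multi_thread()
        .enable_all()
        // Runs on every worker thread as it starts, so spawned tasks also
        // find the mock TimeControl in their thread-local storage.
        .on_thread_start(move || {
            MOCK_CONTROL.with(|it| *it.borrow_mut() = Some(Arc::clone(&control)));
        })
        .build()
        .expect("failed to build tokio runtime")
}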
With all that explanation, let's review the test in full.
#[test]
fn test_timeout_occurs_fast() {
    let (time_control, _guard) = register_new_control();
    let rt = build_instrumented_runtime(&time_control);
    rt.block_on(async {
        let mut conn = Connection::new().await;
        time_control.advance(Duration::from_secs(10)).await;
        assert_eq!(conn.try_recv().unwrap_err(), TryRecvError::Disconnected);
    });
}
- A TimeControl instance is created; the test will use it to advance time, and it also collects registrations of mock timers.
- A custom Runtime is created which injects the Arc<TimeControl> into TLS.
- Inside that runtime, the Connection is created and a 10-second mock timer is registered.
- advance() is called with a 10-second duration. This advances time to 10 seconds, permitting all of the code inside Connection to run. The channel gets closed.
- The channel is definitely closed and reports Disconnected.
The second test can be adapted similarly. If the channel closes in 5 seconds or less, we're guaranteed to catch it.
#[test]
fn test_timeout_does_not_occur_fast() {
    let (time_control, _guard) = register_new_control();
    let rt = build_instrumented_runtime(&time_control);
    rt.block_on(async {
        let mut conn = Connection::new().await;
        time_control.advance(Duration::from_secs(5)).await;
        assert_eq!(conn.try_recv().unwrap_err(), TryRecvError::Empty);
    });
}
This technique has enabled Ditto to build fast-running unit tests on exactly the same async code our customers use, with only modest code modifications to account for the guards. Testing gives us the confidence to include business logic directly in our async layer, avoiding the overhead of structuring timer-driven code into sync and async components. We believe async Rust has a bright future and our customers are already seeing the benefits.
Visit us at www.ditto.live for more content