If you know architecture of your code in advance, you can easily manage to make the code asynchronous!
Let's examine such an example: you want a function that downloads data from given URL, caches it for future use, and has a timeout for loading (so if remote server hangs the program can keep running). There are basically two solutions:
- Create just an async caching-downloading function, then race it against timeout. This creates problem when function is not written in cancellation-safe way: it can be terminated at any
.await
point, which may be after data loading but before putting it into cache. - Apply timeout exactly to downloading part! We can do this even without factoring out a separate function - that is, without overhead on creating additional
Future
objects.
Coding
Let's suppose we have a separate cache for each user. If so, the downloader struct will look as below:
use tokio::time::{sleep, Duration};
use std::collections::HashMap;
mod mock;
use mock::{AsyncInChannel, AsyncOutChannel, Url};
struct CacheForUser {
cache: HashMap<Url, String>,
channel: AsyncOutChannel,
}
Then, we write the function we actually needed. Since downloading can timeout, it will return Result
.
impl CacheForUser {
async fn get_response(&mut self, url: Url) -> Result<(), &'static str> {
if let Some(cached_response) = self.cache.get(&url) {
self.channel.send(cached_response).await;
return Ok(());
}
...
Here comes the part where the deadline applies! Let's create the appropriate timeout-tracking Future
.
tokio::select!
awaits Futures (Promises in JS terminology) it gets and, like match
block, processes the first one ready.
let response = {
tokio::pin! {let response_deadline = sleep(Duration::from_millis(600));}
let in_channel;
tokio::select! {
c = AsyncInChannel::connect(url) => {in_channel = c;}
_ = &mut response_deadline => {return Err("timeout on connection");}
}
let data;
tokio::select! {
d = in_channel.read() => {data = d;}
_ = &mut response_deadline => {return Err("timeout on reading data");}
}
data
}; // timeout no longer applies
From the end of the block, we can be sure that timeout will not interrupt our work. Then, we don't need special consideration for order of cache write and await
for sending data out.
...
self.channel.send(&response).await;
self.cache.insert(url, response);
Ok(())
}
}
Mocking
The easiest way to mock this is to have outbound channel write to Stdout
, and inbound channel read some constant value (but with delay configured by Url
). We don't even need to make Url
a string during tests, we may just pass pair of millisecond delays there.
// mock.rs
use tokio::io::{stdout, Stdout, AsyncWriteExt};
use tokio::time::{sleep, Duration};
pub type Url = (u64, u64);
pub struct AsyncOutChannel {
stdout: Stdout
}
impl AsyncOutChannel {
pub fn new() -> AsyncOutChannel {
AsyncOutChannel {stdout: stdout()}
}
pub async fn send(&mut self, s: &str) {
self.stdout.write_all(s.as_bytes()).await
.expect("IO error on stdout::write_all");
self.stdout.flush().await
.expect("IO error on stdout::flush");
}
}
pub struct AsyncInChannel {
read_time: u64
}
impl AsyncInChannel {
pub async fn connect(url: Url) -> AsyncInChannel {
sleep(Duration::from_millis(url.0)).await;
AsyncInChannel {read_time: url.1}
}
pub async fn read(&self) -> String {
sleep(Duration::from_millis(self.read_time)).await;
"read complete\n".to_owned()
}
}
Testing
Everything is quite straightforward here!
// main.rs
...
#[tokio::main]
async fn main() {
let mut cfu = CacheForUser {cache: HashMap::new(),
channel: AsyncOutChannel::new()};
println!("100,700ms: {:?}", cfu.get_response( (100,700) ).await);
println!("100,550ms: {:?}", cfu.get_response( (100,550) ).await);
println!("500,150ms: {:?}", cfu.get_response( (500,150) ).await);
println!("800,150ms: {:?}", cfu.get_response( (800,150) ).await);
println!("100,150ms: {:?}", cfu.get_response( (100,150) ).await);
}
We'll see the following results:
100,700ms: Err("timeout on reading data")
100,550ms: Err("timeout on reading data")
500,150ms: Err("timeout on reading data")
800,150ms: Err("timeout on connection")
read complete
100,150ms: Ok(())
I highly wonder whether it's possible to write such code in languages like Python or JS and be sure about timeout working correctly!
Top comments (0)