打开tokio-timer可以看到timout
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
value: T,
delay: Delay,
}
很简单的结构,value就是我们要加timeout的future或者stream,delay就是多久之后就timeout(:
看看他实现的Future和Stream trait
impl<T> Future for Timeout<T>
where
T: Future,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.value.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
// Now check the timer
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Err(Error::elapsed()),
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Stream for Timeout<T>
where
T: Stream,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// First, try polling the future
match self.value.poll() {
Ok(Async::Ready(v)) => {
if v.is_some() {
self.delay.reset_timeout();
}
return Ok(Async::Ready(v));
}
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
// Now check the timer
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
self.delay.reset_timeout();
Err(Error::elapsed())
}
Err(e) => Err(Error::timer(e)),
}
}
都是正常去poll value,如果ready了就直接返回,如果没有ready,那么再去poll dealy看看是否timeout,如果没有timeout返回notready,如果timeout了那就返回time elapsed error.
非常简单的逻辑实现了timeout(:
如果我要方便的使用timeout可能需要use tokio::prelude::FutureExt;
fn timeout(self, timeout: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, timeout)
}
其实自己用Timeout::new也可以,但是xxx.timeout看着舒服一些
Top comments (0)