Async book - Chapter 6 - Exercise #74

@chenyo-17

I think the solution to the exercise (Build a RetryFuture) can be improved, for the following reasons:

  • The implementation requires Unpin for both F and Fut, which is unnecessary and more restrictive than it needs to be.
  • The future returned by http_get in the example usage is most likely !Unpin, so it cannot be passed to RetryFuture.
  • The solution could use richer comments that refer back to earlier topics such as Pin and cx.
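
To illustrate the first two points, here is a minimal sketch using only the standard library. `poll_once_unpin`, `answer`, `demo` and `NoopWake` are hypothetical names made up for this example; `poll_once_unpin` stands in for any API that demands `Unpin`. The future of an `async fn` is `!Unpin`, so it is rejected by such a bound unless the caller boxes and pins it first.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for an API with an `Unpin` bound.
fn poll_once_unpin<Fut: Future + Unpin>(fut: &mut Fut) -> Poll<Fut::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // `Pin::new` is only available because `Fut: Unpin`.
    Pin::new(fut).poll(&mut cx)
}

async fn answer() -> u32 {
    42
}

fn demo() -> Poll<u32> {
    // poll_once_unpin(&mut answer()); // rejected: the future of an `async fn` is `!Unpin`

    // Workaround forced on the caller: `Pin<Box<_>>` is always `Unpin`.
    let mut boxed = Box::pin(answer());
    poll_once_unpin(&mut boxed)
}
```

Dropping the `Unpin` bound from the retry type removes the need for this boxing at every call site.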

Here is an alternative solution that compiles, proposed for reference:

//! Build a RetryFuture<F, Fut> that takes a closure F: Fn() -> Fut
//! and retries up to N times if the inner future returns Err.
//! It should return the first Ok result or the last Err.

use std::{
    future::Future, // already in the prelude since edition 2024; needed on older editions
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

struct RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    remaining_retry_attempts: usize,
    current_fut: Fut,
    closure: F,
}

impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    fn new(closure: F, max_retry_attempts: usize) -> Self {
        Self {
            remaining_retry_attempts: max_retry_attempts,
            current_fut: closure(),
            closure,
        }
    }
}

impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `this` is only used to access fields in place; nothing is
        // moved out of the pinned `Self`.
        let this = unsafe { self.get_unchecked_mut() };

        loop {
            // SAFETY: `current_fut` lives inside the pinned `Self` and is never
            // moved; on retry it is dropped in place before being overwritten.
            let fut = unsafe { Pin::new_unchecked(&mut this.current_fut) };

            // `cx` carries the waker; the inner future registers it whenever
            // it returns `Pending`
            match fut.poll(cx) {
                Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
                Poll::Ready(Err(err)) => {
                    if this.remaining_retry_attempts == 0 {
                        return Poll::Ready(Err(err));
                    }

                    // a finished future must not be polled again; the assignment
                    // drops it in place and stores a fresh one
                    this.current_fut = (this.closure)();

                    this.remaining_retry_attempts -= 1;
                    println!("remaining attempts: {}", this.remaining_retry_attempts);

                    // continue the loop to poll the new future immediately, so
                    // that it registers the waker before we return `Pending`
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    use anyhow::{Result, bail};

    // simulate a multi-state future
    async fn maybe_fail() -> Result<usize> {
        if rand::random() {
            Ok(async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                17
            }
            .await)
        } else {
            tokio::time::sleep(Duration::from_secs(2)).await;
            bail!("failed")
        }
    }

    let retry_future = RetryFuture::new(maybe_fail, 5);
    println!("{:?}", retry_future.await);
}
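
For comparison, the `unsafe` blocks can be avoided at the cost of one heap allocation per attempt. The following is only a sketch of that trade-off, not a replacement for the solution above: `BoxedRetry`, `NoopWake` and `run_demo` are hypothetical names, and the attempts in `run_demo` complete immediately so the behaviour can be checked without tokio or rand.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical safe variant: each attempt lives in its own heap allocation.
struct BoxedRetry<F, Fut> {
    remaining: usize,
    current: Pin<Box<Fut>>,
    make: F,
}

// `Unpin` is a safe trait; this type never pins `make` in place, and the
// attempt itself is pinned on the heap, so moving `BoxedRetry` is harmless.
impl<F, Fut> Unpin for BoxedRetry<F, Fut> {}

impl<F: Fn() -> Fut, Fut: Future> BoxedRetry<F, Fut> {
    fn new(make: F, max_retries: usize) -> Self {
        Self { remaining: max_retries, current: Box::pin(make()), make }
    }
}

impl<F, Fut, T, E> Future for BoxedRetry<F, Fut>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    type Output = Result<T, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // safe because `Self: Unpin`
        loop {
            match this.current.as_mut().poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
                Poll::Ready(Err(err)) => {
                    if this.remaining == 0 {
                        return Poll::Ready(Err(err));
                    }
                    this.remaining -= 1;
                    // Replace the finished attempt, then loop to poll the new one.
                    this.current = Box::pin((this.make)());
                }
            }
        }
    }
}

// A waker that does nothing; enough to poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Attempts fail `failures` times, then succeed.
/// Returns the final result and the number of attempts created.
fn run_demo(failures: usize, max_retries: usize) -> (Result<usize, &'static str>, usize) {
    let calls = Cell::new(0usize);
    let make = || {
        let n = calls.get();
        calls.set(n + 1);
        async move { if n < failures { Err("failed") } else { Ok(17) } }
    };
    let mut retry = BoxedRetry::new(make, max_retries);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Every attempt completes immediately, so a single poll is enough.
    let result = match Pin::new(&mut retry).poll(&mut cx) {
        Poll::Ready(result) => result,
        Poll::Pending => unreachable!("attempts never yield"),
    };
    (result, calls.get())
}
```

With `run_demo(2, 5)` the third attempt succeeds; with `run_demo(3, 2)` the retries run out and the last error is returned. The version above with `unsafe` avoids the allocation, which is presumably why the exercise is framed around `Pin`.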
