diff --git a/CHANGELOG.md b/CHANGELOG.md index 46cb902..00b19ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. This makes the library more minimal by default and only contains the functionality that the user explicitly enables. This is a breaking change. - Upgrade to Rust 2024 edition. This also bumps the MSRV to 1.85.0 +- Add a separate type `AsyncReceiver` that implements `Future` instead of implementing it + directly on the `Receiver` type. Now the `Receiver` implements `IntoFuture` instead. + This is a breaking change. This change removes the possible panics in many recv* methods, + and it simplifies some code a bit. ### Fixed - Make Debug impl on SendError include the channel message type, instead of just saying diff --git a/Cargo.toml b/Cargo.toml index c3e36bc..0b0fbd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ rust-version = "1.85.0" # Enables usage of libstd. Adds support for thread blocking receive methods. std = [] -# Enables async receiving by implementing Future +# Enables async receiving via the AsyncReceiver receiver async = [] # Only used for internal correctness testing. diff --git a/README.md b/README.md index 2432619..8b82a87 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ So you are guaranteed, at the type level, that there can only be one message sen The sender's send method is non-blocking, and potentially lock- and wait-free. See documentation on [Sender::send] for situations where it might not be fully wait-free. The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time -limited thread blocking receive operations. The receiver also implements `Future` and +limited thread blocking receive operations. The receiver also implements `IntoFuture` and supports asynchronously awaiting the message. @@ -90,10 +90,10 @@ task, or the other way around. If message passing is the way you are communicati that should work smoothly between the sync and async parts of the program! This library achieves that by having a fast and cheap send operation that can -be used in both sync threads and async tasks. The receiver has both thread blocking -receive methods for synchronous usage, and implements `Future` for asynchronous usage. +be used in both regular threads and async tasks. The receiver has both thread blocking +receive methods for synchronous usage, and implements `IntoFuture` for asynchronous usage. -The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +The receiving endpoint of this channel implements Rust's `IntoFuture` trait and can be waited on in an asynchronous task. This implementation is completely executor/runtime agnostic. It should be possible to use this library with any executor, or even pass messages between tasks running in different executors. diff --git a/src/lib.rs b/src/lib.rs index 1cc4091..f05f013 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ //! The sender's send method is non-blocking, and potentially lock- and wait-free. //! See documentation on [Sender::send] for situations where it might not be fully wait-free. //! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time -//! limited thread blocking receive operations. The receiver also implements `Future` and +//! limited thread blocking receive operations. The receiver also implements `IntoFuture` and //! supports asynchronously awaiting the message. //! //! @@ -90,10 +90,10 @@ //! that should work smoothly between the sync and async parts of the program! //! //! This library achieves that by having a fast and cheap send operation that can -//! be used in both sync threads and async tasks. The receiver has both thread blocking -//! receive methods for synchronous usage, and implements `Future` for asynchronous usage. +//! be used in both regular threads and async tasks. The receiver has both thread blocking +//! receive methods for synchronous usage, and implements `IntoFuture` for asynchronous usage. //! -//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +//! The receiving endpoint of this channel implements Rust's `IntoFuture` trait and can be waited on //! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should //! be possible to use this library with any executor, or even pass messages between tasks running //! in different executors. @@ -142,6 +142,8 @@ mod sender; pub use sender::Sender; mod receiver; +#[cfg(feature = "async")] +pub use receiver::AsyncReceiver; pub use receiver::Receiver; mod states; @@ -178,6 +180,21 @@ pub fn channel() -> (Sender, Receiver) { ) } +/// Ergonomic shorthand for creating a channel and immediately convert the [`Receiver`] into +/// a future. +/// +/// This can be useful when you need to pass the receiver to a function that expects a +/// type implementing [`Future`] directly. Using this function is not necessary when +/// you are going to use `.await` on the receiver, as that will automatically call +/// [`IntoFuture::into_future`] in the background. +#[cfg(feature = "async")] +#[inline(always)] +pub fn async_channel() -> (Sender, AsyncReceiver) { + let (sender, receiver) = channel(); + let async_receiver = core::future::IntoFuture::into_future(receiver); + (sender, async_receiver) +} + /// Deallocates the channel's heap allocation (created in `oneshot::channel()`). /// /// # Safety diff --git a/src/receiver.rs b/src/receiver.rs index aed5973..21b6897 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,8 +1,8 @@ use core::{mem, ptr::NonNull}; -#[cfg(all(any(feature = "std", feature = "async"), not(oneshot_loom)))] +#[cfg(all(feature = "async", not(oneshot_loom)))] use core::hint; -#[cfg(all(any(feature = "std", feature = "async"), oneshot_loom))] +#[cfg(all(feature = "async", oneshot_loom))] use loom::hint; #[cfg(feature = "async")] @@ -37,7 +37,7 @@ use crate::waker::ReceiverWaker; /// Can be used to receive a message from the corresponding [`Sender`](crate::Sender). How the message /// can be received depends on what features are enabled. /// -/// This type implement [`IntoFuture`] when the `async` feature is enabled. +/// This type implements [`IntoFuture`](core::future::IntoFuture) when the `async` feature is enabled. /// This allows awaiting it directly in an async context. #[derive(Debug)] pub struct Receiver { @@ -46,6 +46,20 @@ pub struct Receiver { channel_ptr: NonNull>, } +/// A version of [`Receiver`] that implements [`Future`](core::future::Future), for awaiting the +/// message in an async context. +/// +/// This type is automatically created and polled in the background when awaiting a [`Receiver`]. +/// But it can also be created explicitly with the [`async_channel`](crate::async_channel) function or by calling +/// [`IntoFuture::into_future`](core::future::IntoFuture::into_future) on the [`Receiver`]. +#[cfg(feature = "async")] +#[derive(Debug)] +pub struct AsyncReceiver { + // Covariance is the right choice here. Consider the example presented in Sender, and you'll + // see that if we replaced `rx` instead then we would get the expected behavior + channel_ptr: NonNull>, +} + // SAFETY: The core functionality of this library is to be able to pass channel ends to different // threads to then be able to pass messages between threads or tasks. // The receiver only contains a pointer to the channel, and the entire library revolves around @@ -57,6 +71,12 @@ unsafe impl Send for Receiver {} impl Unpin for Receiver {} +// SAFETY: See documentation on Send impl on Receiver. +#[cfg(feature = "async")] +unsafe impl Send for AsyncReceiver {} +#[cfg(feature = "async")] +impl Unpin for AsyncReceiver {} + impl Receiver { /// # Safety /// @@ -104,8 +124,6 @@ impl Receiver { } EMPTY => Err(TryRecvError::Empty), DISCONNECTED => Err(TryRecvError::Disconnected), - #[cfg(feature = "async")] - RECEIVING | UNPARKING => Err(TryRecvError::Empty), _ => unreachable!(), } } @@ -123,10 +141,6 @@ impl Receiver { /// /// If a sent message has already been extracted from this channel this method will return an /// error. - /// - /// # Panics - /// - /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv(self) -> Result { // Note that we don't need to worry about changing the state to disconnected or setting the @@ -277,9 +291,6 @@ impl Receiver { Err(RecvError) } - // The receiver must have been `Future::poll`ed prior to this call. - #[cfg(feature = "async")] - RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), _ => unreachable!(), } } @@ -291,10 +302,6 @@ impl Receiver { /// /// If a message is returned, the channel is disconnected and any subsequent receive operation /// using this receiver will return an error. - /// - /// # Panics - /// - /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv_ref(&self) -> Result { self.start_recv_ref(RecvError, |channel| { @@ -335,10 +342,6 @@ impl Receiver { /// /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point /// in the future this falls back to an indefinitely blocking receive operation. - /// - /// # Panics - /// - /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv_timeout(&self, timeout: Duration) -> Result { match Instant::now().checked_add(timeout) { @@ -355,10 +358,6 @@ impl Receiver { /// /// If a message is returned, the channel is disconnected and any subsequent receive operation /// using this receiver will return an error. - /// - /// # Panics - /// - /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv_deadline(&self, deadline: Instant) -> Result { /// # Safety @@ -588,9 +587,6 @@ impl Receiver { } // The sender was dropped before sending anything, or we already received the message. DISCONNECTED => Err(disconnected_error), - // The receiver must have been `Future::poll`ed prior to this call. - #[cfg(feature = "async")] - RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), _ => unreachable!(), } } @@ -621,7 +617,23 @@ impl Receiver { } #[cfg(feature = "async")] -impl core::future::Future for Receiver { +impl core::future::IntoFuture for Receiver { + type Output = Result; + type IntoFuture = AsyncReceiver; + + #[inline(always)] + fn into_future(self) -> Self::IntoFuture { + let Receiver { channel_ptr } = self; + + // Don't run our Drop implementation, since the receiver lives on as an async receiver. + mem::forget(self); + + AsyncReceiver { channel_ptr } + } +} + +#[cfg(feature = "async")] +impl core::future::Future for AsyncReceiver { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { @@ -743,11 +755,47 @@ impl Drop for Receiver { // that signal to the sender to deallocate the channel. So the channel pointer is valid let channel = unsafe { self.channel_ptr.as_ref() }; + // Set the channel state to disconnected and read what state the channel was in + // ORDERING: Release is required so that in the states where the sender becomes responsible + // for deallocating the channel, they can synchronize with this final state swap store from + // us. Acquire is required by most branches to synchronize with writes in the sender. + // See each branch for details. + match channel.state().swap(DISCONNECTED, AcqRel) { + // The sender has not sent anything, nor is it dropped. The sender is responsible for + // deallocating the channel. + EMPTY => (), + // The sender already sent something. We must drop the message, and free the channel. + MESSAGE => { + // SAFETY: The MESSAGE state plus acquire ordering guarantees the sender has + // written a message and that it has a happens-before relationship with this drop. + unsafe { channel.drop_message() }; + + // SAFETY: The acquire ordering of the swap above synchronize with the sender's + // final write of the state. So we can safely deallocate it. + unsafe { dealloc(self.channel_ptr) }; + } + // The sender was already dropped. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: The acquire ordering of the swap above synchronize with the sender's + // final write of the state. So we can safely deallocate it. + unsafe { dealloc(self.channel_ptr) }; + } + _ => unreachable!(), + } + } +} + +#[cfg(feature = "async")] +impl Drop for AsyncReceiver { + fn drop(&mut self) { + // SAFETY: since the receiving side is still alive the sender would have observed that and + // left deallocating the channel allocation to us. + let channel = unsafe { self.channel_ptr.as_ref() }; + // If this receiver was previously polled, but was not polled to completion, then the // channel is in the RECEIVING state and with a waker written. We must tell the sender // we are no longer receiving, and then drop the waker. We must first move away from // the RECEIVING state in order to not race with the sender around taking the waker. - #[cfg(feature = "async")] if channel.state().load(Relaxed) == RECEIVING && channel .state() @@ -787,7 +835,6 @@ impl Drop for Receiver { // state. But the sender has observed the RECEIVING state and is currently reading the // waker to wake us up. We need to loop here until we observe the MESSAGE or // DISCONNECTED state. We busy loop here since we know the sender is done very soon. - #[cfg(any(feature = "std", feature = "async"))] UNPARKING => { loop { hint::spin_loop(); @@ -818,7 +865,3 @@ impl Drop for Receiver { } } } - -#[cfg(all(feature = "std", feature = "async"))] -const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str = - "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled"; diff --git a/tests/async.rs b/tests/async.rs index 6c0464d..b8b0841 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -88,37 +88,9 @@ async fn await_before_send_then_drop_sender_async_std() { t.await; } -// Tests that the Receiver handles being used synchronously even after being polled -#[tokio::test] -async fn poll_future_and_then_try_recv() { - use core::future::Future; - use core::pin::Pin; - use core::task::{self, Poll}; - - struct StupidReceiverFuture(oneshot::Receiver<()>); - - impl Future for StupidReceiverFuture { - type Output = Result<(), oneshot::RecvError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let poll_result = Future::poll(Pin::new(&mut self.0), cx); - self.0.try_recv().expect_err("Should never be a message"); - poll_result - } - } - - let (sender, receiver) = oneshot::channel(); - let t = tokio::spawn(async { - tokio::time::sleep(Duration::from_millis(20)).await; - mem::drop(sender); - }); - StupidReceiverFuture(receiver).await.unwrap_err(); - t.await.unwrap(); -} - #[tokio::test] async fn poll_receiver_then_drop_it() { - let (sender, receiver) = oneshot::channel::<()>(); + let (sender, receiver) = oneshot::async_channel::<()>(); // This will poll the receiver and then give up after 100 ms. tokio::time::timeout(Duration::from_millis(100), receiver) .await diff --git a/tests/future.rs b/tests/future.rs index 199bfda..8a3c7af 100644 --- a/tests/future.rs +++ b/tests/future.rs @@ -45,7 +45,7 @@ fn multiple_receiver_polls_keeps_only_latest_waker() { let waker1 = unsafe { task::Waker::from_raw(raw_waker1) }; let mut context1 = task::Context::from_waker(&waker1); - let (_sender, mut receiver) = oneshot::channel::<()>(); + let (_sender, mut receiver) = oneshot::async_channel::<()>(); let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context1); assert_eq!(poll_result, task::Poll::Pending); diff --git a/tests/loom.rs b/tests/loom.rs index 9de4da7..e26d467 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -5,7 +5,7 @@ use oneshot::TryRecvError; use loom::hint; use loom::thread; #[cfg(feature = "async")] -use std::future::Future; +use std::future::{Future, IntoFuture}; #[cfg(feature = "async")] use std::pin::Pin; #[cfg(feature = "async")] @@ -75,7 +75,7 @@ fn async_recv() { let t1 = thread::spawn(move || { sender.send(987).unwrap(); }); - assert_eq!(loom::future::block_on(receiver), Ok(987)); + assert_eq!(loom::future::block_on(receiver.into_future()), Ok(987)); t1.join().unwrap(); }) } @@ -84,7 +84,7 @@ fn async_recv() { #[test] fn send_then_poll() { loom::model(|| { - let (sender, mut receiver) = oneshot::channel::(); + let (sender, mut receiver) = oneshot::async_channel::(); sender.send(1234).unwrap(); let (waker, waker_handle) = helpers::waker::waker(); @@ -105,7 +105,7 @@ fn send_then_poll() { #[test] fn poll_then_drop_receiver_during_send() { loom::model(|| { - let (sender, mut receiver) = oneshot::channel::(); + let (sender, mut receiver) = oneshot::async_channel::(); let (waker, _waker_handle) = helpers::waker::waker(); let mut context = task::Context::from_waker(&waker); @@ -130,7 +130,7 @@ fn poll_then_drop_receiver_during_send() { #[test] fn poll_then_send() { loom::model(|| { - let (sender, mut receiver) = oneshot::channel::(); + let (sender, mut receiver) = oneshot::async_channel::(); let (waker, waker_handle) = helpers::waker::waker(); let mut context = task::Context::from_waker(&waker); @@ -159,7 +159,7 @@ fn poll_then_send() { #[test] fn poll_with_different_wakers() { loom::model(|| { - let (sender, mut receiver) = oneshot::channel::(); + let (sender, mut receiver) = oneshot::async_channel::(); let (waker1, waker_handle1) = helpers::waker::waker(); let mut context1 = task::Context::from_waker(&waker1); @@ -192,60 +192,3 @@ fn poll_with_different_wakers() { assert_eq!(waker_handle2.wake_count(), 1); }) } - -#[cfg(feature = "async")] -#[test] -fn poll_then_try_recv() { - loom::model(|| { - let (_sender, mut receiver) = oneshot::channel::(); - - let (waker, waker_handle) = helpers::waker::waker(); - let mut context = task::Context::from_waker(&waker); - - assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); - assert_eq!(waker_handle.clone_count(), 1); - assert_eq!(waker_handle.drop_count(), 0); - assert_eq!(waker_handle.wake_count(), 0); - - assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); - - assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); - assert_eq!(waker_handle.clone_count(), 2); - assert_eq!(waker_handle.drop_count(), 1); - assert_eq!(waker_handle.wake_count(), 0); - }) -} - -#[cfg(feature = "async")] -#[test] -fn poll_then_try_recv_while_sending() { - loom::model(|| { - let (sender, mut receiver) = oneshot::channel::(); - - let (waker, waker_handle) = helpers::waker::waker(); - let mut context = task::Context::from_waker(&waker); - - assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); - assert_eq!(waker_handle.clone_count(), 1); - assert_eq!(waker_handle.drop_count(), 0); - assert_eq!(waker_handle.wake_count(), 0); - - let t = thread::spawn(move || { - sender.send(1234).unwrap(); - }); - - let msg = loop { - match receiver.try_recv() { - Ok(msg) => break msg, - Err(TryRecvError::Empty) => hint::spin_loop(), - Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"), - } - }; - assert_eq!(msg, 1234); - assert_eq!(waker_handle.clone_count(), 1); - assert_eq!(waker_handle.drop_count(), 1); - assert_eq!(waker_handle.wake_count(), 1); - - t.join().unwrap(); - }) -} diff --git a/tests/miri.rs b/tests/miri.rs index a9c0d5e..d3f4413 100644 --- a/tests/miri.rs +++ b/tests/miri.rs @@ -143,7 +143,7 @@ fn tx_send_rx_try_recv_then_drop() { #[cfg(feature = "async")] #[test] fn tx_send_rx_poll_to_completion() { - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::async_channel::(); let rx_thread = spawn_named("rx_thread", move || { let mut rx = pin!(rx); @@ -168,7 +168,7 @@ fn tx_send_rx_poll_to_completion() { #[cfg(feature = "async")] #[test] fn tx_send_rx_poll_then_drop() { - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::async_channel::(); let rx_thread = spawn_named("rx_thread", move || { let mut rx = pin!(rx); @@ -305,7 +305,7 @@ fn tx_drop_rx_try_recv_then_drop() { #[cfg(feature = "async")] #[test] fn tx_drop_rx_poll_to_completion() { - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::async_channel::(); let rx_thread = spawn_named("rx_thread", move || { let mut rx = pin!(rx); @@ -330,7 +330,7 @@ fn tx_drop_rx_poll_to_completion() { #[cfg(feature = "async")] #[test] fn tx_drop_rx_poll_then_drop() { - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::async_channel::(); let rx_thread = spawn_named("rx_thread", move || { let mut rx = pin!(rx);