From 4a05925003eac31a3e45bed94f1d8b0eac0dd6a1 Mon Sep 17 00:00:00 2001 From: Wuelle Date: Wed, 8 May 2024 23:11:43 +0200 Subject: [PATCH 1/3] add abstraction traits for different wakers --- src/lib.rs | 2 + src/waker.rs | 121 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 src/waker.rs diff --git a/src/lib.rs b/src/lib.rs index 366f27a..6c83d11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,6 +176,8 @@ use alloc::boxed::Box; use loombox::Box; mod errors; +mod waker; + pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError}; /// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`]. diff --git a/src/waker.rs b/src/waker.rs new file mode 100644 index 0000000..37950da --- /dev/null +++ b/src/waker.rs @@ -0,0 +1,121 @@ +#[cfg(all(feature = "std", not(loom)))] +use std::thread; + +#[cfg(all(feature = "std", loom))] +use loom::thread; + +#[cfg(feature = "async")] +use core::task; + +pub trait Waker: private::Sealed { + fn wake(self); +} + +/// A waker that can be used in synchronous operations +/// +/// It is assumed that the [`wake`](Waker::wake) method +/// of a [SyncWaker] will always call [`thread::unpark()`](std::thread::unpark). +pub trait SyncWaker: Waker { + fn new() -> Self; +} + +#[cfg(feature = "async")] +/// A waker that can be used in async operations +pub trait AsyncWaker: Waker { + fn new(ctx: &task::Context<'_>) -> Self; +} + +#[cfg(feature = "std")] +impl Waker for thread::Thread { + fn wake(self) { + self.unpark() + } +} + +#[cfg(feature = "std")] +impl SyncWaker for thread::Thread { + fn new() -> Self { + thread::current() + } +} + +#[cfg(feature = "async")] +impl Waker for task::Waker { + fn wake(self) { + self.wake() + } +} + +#[cfg(feature = "async")] +impl AsyncWaker for task::Waker { + fn new(ctx: &task::Context<'_>) -> Self { + ctx.waker().clone() + } +} + +/// A Waker that can be used in both sync and async contexts +#[cfg(all(feature = "std", feature = "async"))] +pub enum GenericWaker { + Sync(thread::Thread), + Async(task::Waker), +} + +#[cfg(all(feature = "std", feature = "async"))] +impl Waker for GenericWaker { + fn wake(self) { + match self { + Self::Sync(sync_waker) => sync_waker.unpark(), + Self::Async(async_waker) => async_waker.wake(), + } + } +} + +#[cfg(all(feature = "std", feature = "async"))] +impl SyncWaker for GenericWaker { + fn new() -> Self { + Self::Sync(thread::current()) + } +} + +#[cfg(all(feature = "std", feature = "async"))] +impl AsyncWaker for GenericWaker { + fn new(ctx: &task::Context<'_>) -> Self { + Self::Async(ctx.waker().clone()) + } +} + +/// A waker that cannot be waited on +pub struct DummyWaker; + +impl Waker for DummyWaker { + fn wake(self) {} +} + +// FIXME: Once cfg_match! is within MSRV, use it here! + +#[cfg(all(feature = "std", feature = "async"))] +pub type DefaultWaker = GenericWaker; + +#[cfg(all(not(feature = "std"), feature = "async"))] +pub type DefaultWaker = task::Waker; + +#[cfg(all(feature = "std", not(feature = "async")))] +pub type DefaultWaker = thread::Thread; + +#[cfg(all(not(feature = "std"), not(feature = "async")))] +pub type DefaultWaker = DummyWaker; + +mod private { + pub trait Sealed {} +} + +impl private::Sealed for DummyWaker {} + +#[cfg(feature = "std")] +impl private::Sealed for thread::Thread {} + +#[cfg(feature = "async")] +impl private::Sealed for task::Waker {} + +#[cfg(all(feature = "std", feature = "async"))] +impl private::Sealed for GenericWaker {} From dda7b27196e78de20b8cca7f1c9a9dc55cf3f375 Mon Sep 17 00:00:00 2001 From: Wuelle Date: Wed, 8 May 2024 23:14:47 +0200 Subject: [PATCH 2/3] abstract Sender/Receiver over different wakers --- src/errors.rs | 24 ++-- src/lib.rs | 321 ++++++++++++++++++++++++++++---------------------- 2 files changed, 192 insertions(+), 153 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 1fd0de1..b42c9a9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,4 +1,4 @@ -use super::{dealloc, Channel}; +use super::{dealloc, Channel, DefaultWaker}; use core::fmt; use core::mem; use core::ptr::NonNull; @@ -8,21 +8,21 @@ use core::ptr::NonNull; /// has already been dropped. /// /// The message that could not be sent can be retreived again with [`SendError::into_inner`]. -pub struct SendError { - channel_ptr: NonNull>, +pub struct SendError { + channel_ptr: NonNull>, } -unsafe impl Send for SendError {} -unsafe impl Sync for SendError {} +unsafe impl Send for SendError {} +unsafe impl Sync for SendError {} -impl SendError { +impl SendError { /// # Safety /// /// By calling this function, the caller semantically transfers ownership of the /// channel's resources to the created `SendError`. Thus the caller must ensure that the /// pointer is not used in a way which would violate this ownership transfer. Moreover, /// the caller must assert that the channel contains a valid, initialized message. - pub(crate) const unsafe fn new(channel_ptr: NonNull>) -> Self { + pub(crate) const unsafe fn new(channel_ptr: NonNull>) -> Self { Self { channel_ptr } } @@ -35,7 +35,7 @@ impl SendError { mem::forget(self); // SAFETY: we have ownership of the channel - let channel: &Channel = unsafe { channel_ptr.as_ref() }; + let channel: &Channel = unsafe { channel_ptr.as_ref() }; // SAFETY: we know that the message is initialized according to the safety requirements of // `new` @@ -54,7 +54,7 @@ impl SendError { } } -impl Drop for SendError { +impl Drop for SendError { fn drop(&mut self) { // SAFETY: we have ownership of the channel and require that the message is initialized // upon construction @@ -65,20 +65,20 @@ impl Drop for SendError { } } -impl fmt::Display for SendError { +impl fmt::Display for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { "sending on a closed channel".fmt(f) } } -impl fmt::Debug for SendError { +impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "SendError<{}>(_)", stringify!(T)) } } #[cfg(feature = "std")] -impl std::error::Error for SendError {} +impl std::error::Error for SendError {} /// An error returned from the blocking [`Receiver::recv`](crate::Receiver::recv) method. /// diff --git a/src/lib.rs b/src/lib.rs index 6c83d11..b2dd50f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,10 +151,10 @@ use std::time::{Duration, Instant}; #[cfg(feature = "std")] mod thread { #[cfg(not(loom))] - pub use std::thread::{current, park, park_timeout, Thread}; + pub use std::thread::{park, park_timeout}; #[cfg(loom)] - pub use loom::thread::{current, park, Thread}; + pub use loom::thread::park; // loom does not support parking with a timeout. So we just // yield. This means that the "park" will "spuriously" wake up @@ -179,6 +179,13 @@ mod errors; mod waker; pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError}; +pub use waker::{DefaultWaker, SyncWaker, Waker}; + +#[cfg(feature = "async")] +pub use waker::AsyncWaker; + +#[cfg(all(feature = "std", feature = "async"))] +pub use waker::GenericWaker; /// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`]. pub fn channel() -> (Sender, Receiver) { @@ -201,8 +208,11 @@ pub fn channel() -> (Sender, Receiver) { } #[derive(Debug)] -pub struct Sender { - channel_ptr: NonNull>, +pub struct Sender +where + W: Waker, +{ + channel_ptr: NonNull>, // In reality we want contravariance, however we can't obtain that. // // Consider the following scenario: @@ -223,17 +233,23 @@ pub struct Sender { } #[derive(Debug)] -pub struct Receiver { +pub struct Receiver +where + W: Waker, +{ // Covariance is the right choice here. Consider the example presented in Sender, and you'll // see that if we replaced `rx` instead then we would get the expected behavior - channel_ptr: NonNull>, + channel_ptr: NonNull>, } -unsafe impl Send for Sender {} -unsafe impl Send for Receiver {} -impl Unpin for Receiver {} +unsafe impl Send for Sender where W: Waker {} +unsafe impl Send for Receiver where W: Waker {} +impl Unpin for Receiver where W: Waker {} -impl Sender { +impl Sender +where + W: Waker, +{ /// Sends `message` over the channel to the corresponding [`Receiver`]. /// /// Returns an error if the receiver has already been dropped. The message can @@ -247,7 +263,7 @@ impl Sender { /// depends on your executor. If this method returns a `SendError`, please mind that dropping /// the error involves running any drop implementation on the message type, and freeing the /// channel's heap allocation, which might or might not be lock-free. - pub fn send(self, message: T) -> Result<(), SendError> { + pub fn send(self, message: T) -> Result<(), SendError> { let channel_ptr = self.channel_ptr; // Don't run our Drop implementation if send was called, any cleanup now happens here @@ -304,7 +320,7 @@ impl Sender { // that allocation, and freeing the channel does not drop the waker since the // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of // whether or not the receive has completed by this point. - waker.unpark(); + waker.wake(); Ok(()) } @@ -338,13 +354,16 @@ impl Sender { /// Constructing multiple Senders from the same raw pointer leads to undefined behavior. pub unsafe fn from_raw(raw: *mut ()) -> Self { Self { - channel_ptr: NonNull::new_unchecked(raw as *mut Channel), + channel_ptr: NonNull::new_unchecked(raw as *mut Channel), _invariant: PhantomData, } } } -impl Drop for Sender { +impl Drop for Sender +where + W: Waker, +{ fn drop(&mut self) { // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or // DISCONNECTED states. If we are in the MESSAGE state, then we called @@ -380,7 +399,7 @@ impl Drop for Sender { // The Acquire ordering above ensures that the write of the DISCONNECTED state // happens-before unparking the receiver. - waker.unpark(); + waker.wake(); } // The receiver was already dropped. We are responsible for freeing the channel. DISCONNECTED => { @@ -395,7 +414,10 @@ impl Drop for Sender { } } -impl Receiver { +impl Receiver +where + W: Waker, +{ /// Checks if there is a message in the channel without blocking. Returns: /// * `Ok(message)` if there was a message in the channel. /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message. @@ -433,6 +455,41 @@ impl Receiver { } } + /// Begins the process of receiving on the channel by reference. If the message is already + /// ready, or the sender has disconnected, then this function will return the appropriate + /// Result immediately. Otherwise, it will write the waker to memory, check to see if the + /// sender has finished or disconnected again, and then will call `finish`. `finish` is + /// thus responsible for cleaning up the channel's resources appropriately before it returns, + + /// Consumes the Receiver, returning a raw pointer to the channel on the heap. + /// + /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing + /// to do with the returned pointer is to later reconstruct the Receiver with + /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed. + pub fn into_raw(self) -> *mut () { + let raw = self.channel_ptr.as_ptr() as *mut (); + mem::forget(self); + raw + } + + /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver. + /// + /// # Safety + /// + /// This pointer must have come from [`Receiver::into_raw`] with the same message type, `T`. + /// At most one Receiver must exist for a channel at any point in time. + /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior. + pub unsafe fn from_raw(raw: *mut ()) -> Self { + Self { + channel_ptr: NonNull::new_unchecked(raw as *mut Channel), + } + } +} + +impl Receiver +where + W: SyncWaker, +{ /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is /// disconnected. /// @@ -482,7 +539,7 @@ impl Receiver { // Write our waker instance to the channel. // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not // try to access the waker until it sees the state set to RECEIVING below - unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + unsafe { channel.write_waker(W::new()) }; // Switch the state to RECEIVING. We need to do this in one atomic step in case the // sender disconnected or sent the message while we wrote the waker to memory. We @@ -624,6 +681,89 @@ impl Receiver { }) } + /// such as destroying the waker, for instance. + #[cfg(feature = "std")] + #[inline] + fn start_recv_ref( + &self, + disconnected_error: E, + finish: impl FnOnce(&Channel) -> Result, + ) -> Result { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + unsafe { channel.write_waker(W::new()) }; + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match channel + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we delegate to the callback to finish the receive + // operation + Ok(_) => finish(channel), + // The sender sent the message while we prepared to finish + Err(MESSAGE) => { + // See comments in `recv` for ordering and safety + + fence(Acquire); + + unsafe { channel.drop_waker() }; + + // ORDERING: the sender has been `mem::forget`-ed so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized + // message + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything while we prepared to park. + Err(DISCONNECTED) => { + // See comments in `recv` for safety + unsafe { channel.drop_waker() }; + Err(disconnected_error) + } + _ => unreachable!(), + } + } + // The sender sent the message. We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the message state so the message is valid + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => Err(disconnected_error), + // The receiver must have been `Future::poll`ed prior to this call. + #[cfg(feature = "async")] + RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), + _ => unreachable!(), + } + } + /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns: /// * `Ok(message)` if there was a message in the channel before the timeout was reached. /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached. @@ -666,7 +806,7 @@ impl Receiver { /// If the sender is unparking us after a message send, the message must already have been /// written to the channel and an acquire memory barrier issued before calling this function #[cold] - unsafe fn wait_for_unpark(channel: &Channel) -> Result { + unsafe fn wait_for_unpark(channel: &Channel) -> Result { loop { thread::park(); @@ -756,122 +896,13 @@ impl Receiver { } }) } - - /// Begins the process of receiving on the channel by reference. If the message is already - /// ready, or the sender has disconnected, then this function will return the appropriate - /// Result immediately. Otherwise, it will write the waker to memory, check to see if the - /// sender has finished or disconnected again, and then will call `finish`. `finish` is - /// thus responsible for cleaning up the channel's resources appropriately before it returns, - /// such as destroying the waker, for instance. - #[cfg(feature = "std")] - #[inline] - fn start_recv_ref( - &self, - disconnected_error: E, - finish: impl FnOnce(&Channel) -> Result, - ) -> Result { - // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver - // is still alive, meaning that even if the sender was dropped then it would have observed - // the fact that we're still alive and left the responsibility of deallocating the - // channel to us, so `self.channel` is valid - let channel = unsafe { self.channel_ptr.as_ref() }; - - // ORDERING: synchronize with the write of the message - match channel.state.load(Acquire) { - // The sender is alive but has not sent anything yet. We prepare to park. - EMPTY => { - // Conditionally add a delay here to help the tests trigger the edge cases where - // the sender manages to be dropped or send something before we are able to store - // our waker object in the channel. - #[cfg(oneshot_test_delay)] - std::thread::sleep(std::time::Duration::from_millis(10)); - - // Write our waker instance to the channel. - // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not - // try to access the waker until it sees the state set to RECEIVING below - unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; - - // ORDERING: we use release ordering on success so the sender can synchronize with - // our write of the waker. We use relaxed ordering on failure since the sender does - // not need to synchronize with our write and the individual match arms handle any - // additional synchronization - match channel - .state - .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) - { - // We stored our waker, now we delegate to the callback to finish the receive - // operation - Ok(_) => finish(channel), - // The sender sent the message while we prepared to finish - Err(MESSAGE) => { - // See comments in `recv` for ordering and safety - - fence(Acquire); - - unsafe { channel.drop_waker() }; - - // ORDERING: the sender has been `mem::forget`-ed so this update only - // needs to be visible to us - channel.state.store(DISCONNECTED, Relaxed); - - // SAFETY: The MESSAGE state tells us there is a correctly initialized - // message - Ok(unsafe { channel.take_message() }) - } - // The sender was dropped before sending anything while we prepared to park. - Err(DISCONNECTED) => { - // See comments in `recv` for safety - unsafe { channel.drop_waker() }; - Err(disconnected_error) - } - _ => unreachable!(), - } - } - // The sender sent the message. We take the message and mark the channel disconnected. - MESSAGE => { - // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be - // visible to us - channel.state.store(DISCONNECTED, Relaxed); - - // SAFETY: we are in the message state so the message is valid - Ok(unsafe { channel.take_message() }) - } - // The sender was dropped before sending anything, or we already received the message. - DISCONNECTED => Err(disconnected_error), - // The receiver must have been `Future::poll`ed prior to this call. - #[cfg(feature = "async")] - RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), - _ => unreachable!(), - } - } - - /// Consumes the Receiver, returning a raw pointer to the channel on the heap. - /// - /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing - /// to do with the returned pointer is to later reconstruct the Receiver with - /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed. - pub fn into_raw(self) -> *mut () { - let raw = self.channel_ptr.as_ptr() as *mut (); - mem::forget(self); - raw - } - - /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver. - /// - /// # Safety - /// - /// This pointer must have come from [`Receiver::into_raw`] with the same message type, `T`. - /// At most one Receiver must exist for a channel at any point in time. - /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior. - pub unsafe fn from_raw(raw: *mut ()) -> Self { - Self { - channel_ptr: NonNull::new_unchecked(raw as *mut Channel), - } - } } #[cfg(feature = "async")] -impl core::future::Future for Receiver { +impl core::future::Future for Receiver +where + W: AsyncWaker, +{ type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { @@ -964,7 +995,10 @@ impl core::future::Future for Receiver { } } -impl Drop for Receiver { +impl Drop for Receiver +where + W: Waker, +{ fn drop(&mut self) { // SAFETY: since the receiving side is still alive the sender would have observed that and // left deallocating the channel allocation to us. @@ -1026,13 +1060,13 @@ use states::*; /// * The message in the channel. This memory is uninitialized until the message is sent. /// * The waker instance for the thread or task that is currently receiving on this channel. /// This memory is uninitialized until the receiver starts receiving. -struct Channel { +struct Channel { state: AtomicU8, message: UnsafeCell>, - waker: UnsafeCell>, + waker: UnsafeCell>, } -impl Channel { +impl Channel { pub fn new() -> Self { Self { state: AtomicU8::new(EMPTY), @@ -1074,7 +1108,7 @@ impl Channel { #[cfg(any(feature = "std", feature = "async"))] unsafe fn with_waker_mut(&self, op: F) where - F: FnOnce(&mut MaybeUninit), + F: FnOnce(&mut MaybeUninit), { #[cfg(loom)] { @@ -1112,12 +1146,12 @@ impl Channel { #[cfg(any(feature = "std", feature = "async"))] #[inline(always)] - unsafe fn write_waker(&self, waker: ReceiverWaker) { + unsafe fn write_waker(&self, waker: W) { self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker)); } #[inline(always)] - unsafe fn take_waker(&self) -> ReceiverWaker { + unsafe fn take_waker(&self) -> W { #[cfg(loom)] { self.waker.with(|ptr| ptr::read(ptr)).assume_init() @@ -1134,17 +1168,22 @@ impl Channel { unsafe fn drop_waker(&self) { self.with_waker_mut(|slot| slot.assume_init_drop()); } +} +#[cfg(feature = "async")] +impl Channel +where + W: AsyncWaker, +{ /// # Safety /// /// * `Channel::waker` must not have a waker stored in it when calling this method. /// * Channel state must not be RECEIVING or UNPARKING when calling this method. - #[cfg(feature = "async")] unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll> { // Write our thread instance to the channel. // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not // try to access the waker until it sees the state set to RECEIVING below - self.write_waker(ReceiverWaker::task_waker(cx)); + self.write_waker(W::new(cx)); // ORDERING: we use release ordering on success so the sender can synchronize with // our write of the waker. We use relaxed ordering on failure since the sender does @@ -1240,6 +1279,6 @@ const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str = "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled"; #[inline] -pub(crate) unsafe fn dealloc(channel: NonNull>) { +pub(crate) unsafe fn dealloc(channel: NonNull>) { drop(Box::from_raw(channel.as_ptr())) } From 639584d5884e4af9b5c03c3bba22df3e97d97c11 Mon Sep 17 00:00:00 2001 From: Wuelle Date: Wed, 8 May 2024 23:45:21 +0200 Subject: [PATCH 3/3] remove old ReceiverWaker --- src/lib.rs | 47 ----------------------------------------------- 1 file changed, 47 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b2dd50f..6580a45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1227,53 +1227,6 @@ where } } -enum ReceiverWaker { - /// The receiver is waiting synchronously. Its thread is parked. - #[cfg(feature = "std")] - Thread(thread::Thread), - /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`. - #[cfg(feature = "async")] - Task(task::Waker), - /// A little hack to not make this enum an uninhibitable type when no features are enabled. - #[cfg(not(any(feature = "async", feature = "std")))] - _Uninhabited, -} - -impl ReceiverWaker { - #[cfg(feature = "std")] - pub fn current_thread() -> Self { - Self::Thread(thread::current()) - } - - #[cfg(feature = "async")] - pub fn task_waker(cx: &task::Context<'_>) -> Self { - Self::Task(cx.waker().clone()) - } - - pub fn unpark(self) { - match self { - #[cfg(feature = "std")] - ReceiverWaker::Thread(thread) => thread.unpark(), - #[cfg(feature = "async")] - ReceiverWaker::Task(waker) => waker.wake(), - #[cfg(not(any(feature = "async", feature = "std")))] - ReceiverWaker::_Uninhabited => unreachable!(), - } - } -} - -#[cfg(not(loom))] -#[test] -fn receiver_waker_size() { - let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) { - (false, false) => 0, - (false, true) => 16, - (true, false) => 8, - (true, true) => 16, - }; - assert_eq!(mem::size_of::(), expected); -} - #[cfg(all(feature = "std", feature = "async"))] const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str = "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";