From 150ad52a5b180ca2153cac4ec611b8dd34a9ad62 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 1 Mar 2023 15:52:20 +0200 Subject: [PATCH 1/8] Switch from `futures-channel` to `async-channel` in `utils/mpsc` --- Cargo.lock | 13 +- client/utils/Cargo.toml | 2 +- client/utils/src/mpsc.rs | 166 +++++------------- client/utils/src/notification.rs | 2 +- client/utils/src/pubsub.rs | 4 +- .../src/pubsub/tests/normal_operation.rs | 3 +- client/utils/src/status_sinks.rs | 8 +- 7 files changed, 61 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91ae9ab422c3e..5e078683c06b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-io" version = "1.12.0" @@ -9293,7 +9304,7 @@ dependencies = [ name = "sc-utils" version = "4.0.0-dev" dependencies = [ - "backtrace", + "async-channel", "futures", "futures-timer", "lazy_static", diff --git a/client/utils/Cargo.toml b/client/utils/Cargo.toml index e80588453597e..5f1fabeb59dd2 100644 --- a/client/utils/Cargo.toml +++ b/client/utils/Cargo.toml @@ -10,7 +10,7 @@ description = "I/O for Substrate runtimes" readme = "README.md" [dependencies] -backtrace = "0.3.67" +async-channel = "1.8.0" futures = "0.3.21" futures-timer = "3.0.2" lazy_static = "1.4.0" diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index df45e33ff8ee5..10af755312f20 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -21,51 +21,43 @@ #[cfg(not(feature = "metered"))] mod inner { // just aliased, non performance implications - use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; - pub type TracingUnboundedSender = UnboundedSender; - pub type TracingUnboundedReceiver = UnboundedReceiver; + pub type TracingUnboundedSender = async_channel::Sender; + pub type TracingUnboundedReceiver = async_channel::Receiver; - /// Alias `mpsc::unbounded` + /// Alias `async_channel::unbounded` pub fn tracing_unbounded( _key: &'static str, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { - mpsc::unbounded() + async_channel::unbounded() } } #[cfg(feature = "metered")] mod inner { // tracing implementation + use async_channel::{Receiver, Sender, TrySendError, TryRecvError}; use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; - use backtrace::Backtrace; use futures::{ - channel::mpsc::{ - self, SendError, TryRecvError, TrySendError, UnboundedReceiver, UnboundedSender, - }, - sink::Sink, stream::{FusedStream, Stream}, task::{Context, Poll}, }; use log::error; use std::{ + backtrace::Backtrace, pin::Pin, sync::{ - atomic::{AtomicBool, AtomicI64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, }; - /// Wrapper Type around `UnboundedSender` that increases the global + /// Wrapper Type around `async_channel::Sender` that increases the global /// measure when a message is added #[derive(Debug)] pub struct TracingUnboundedSender { - inner: UnboundedSender, + inner: Sender, name: &'static str, - // To not bother with ordering and possible underflow errors of the unsigned counter - // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. - // It can turn < 0 though. - queue_size: Arc, - queue_size_warning: i64, + queue_size_warning: usize, warning_fired: Arc, creation_backtrace: Arc, } @@ -76,7 +68,6 @@ mod inner { Self { inner: self.inner.clone(), name: self.name, - queue_size: self.queue_size.clone(), queue_size_warning: self.queue_size_warning, warning_fired: self.warning_fired.clone(), creation_backtrace: self.creation_backtrace.clone(), @@ -84,95 +75,71 @@ mod inner { } } - /// Wrapper Type around `UnboundedReceiver` that decreases the global + /// Wrapper Type around `async_channel::Receiver` that decreases the global /// measure when a message is polled #[derive(Debug)] pub struct TracingUnboundedReceiver { - inner: UnboundedReceiver, + inner: Receiver, name: &'static str, - queue_size: Arc, } - /// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via + /// Wrapper around `async_channel::unbounded` that tracks the in- and outflow via /// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows /// above the warning threshold. pub fn tracing_unbounded( name: &'static str, - queue_size_warning: i64, + queue_size_warning: usize, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { - let (s, r) = mpsc::unbounded(); - let queue_size = Arc::new(AtomicI64::new(0)); + let (s, r) = async_channel::unbounded(); let sender = TracingUnboundedSender { inner: s, name, - queue_size: queue_size.clone(), queue_size_warning, warning_fired: Arc::new(AtomicBool::new(false)), - creation_backtrace: Arc::new(Backtrace::new_unresolved()), + creation_backtrace: Arc::new(Backtrace::force_capture()), }; - let receiver = TracingUnboundedReceiver { inner: r, name, queue_size }; + let receiver = TracingUnboundedReceiver { inner: r, name }; (sender, receiver) } impl TracingUnboundedSender { - /// Proxy function to mpsc::UnboundedSender - pub fn poll_ready(&self, ctx: &mut Context) -> Poll> { - self.inner.poll_ready(ctx) - } - - /// Proxy function to mpsc::UnboundedSender + /// Proxy function to `async_channel::Sender` pub fn is_closed(&self) -> bool { self.inner.is_closed() } - /// Proxy function to mpsc::UnboundedSender - pub fn close_channel(&self) { - self.inner.close_channel() - } - - /// Proxy function to mpsc::UnboundedSender - pub fn disconnect(&mut self) { - self.inner.disconnect() - } - - pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - // The underlying implementation of [`UnboundedSender::start_send`] is the same as - // [`UnboundedSender::unbounded_send`], so we just reuse the message counting and - // error reporting code from `unbounded_send`. - self.unbounded_send(msg).map_err(TrySendError::into_send_error) + /// Proxy function to `async_channel::Sender` + pub fn close(&self) -> bool { + self.inner.close() } - /// Proxy function to mpsc::UnboundedSender - pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.inner.unbounded_send(msg).map(|s| { + /// Proxy function to `async_channel::Sender` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + self.inner.try_send(msg).map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); - let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed); - if queue_size == self.queue_size_warning && + if self.inner.len() >= self.queue_size_warning && self.warning_fired .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - // `warning_fired` and `queue_size` are not synchronized, so it's possible + // `warning_fired` and `len()` are not synchronized, so it's possible // that the warning is fired few times before the `warning_fired` is seen // by all threads. This seems better than introducing a mutex guarding them. - let mut backtrace = (*self.creation_backtrace).clone(); - backtrace.resolve(); error!( "The number of unprocessed messages in channel `{}` reached {}.\n\ - The channel was created at:\n{:?}", - self.name, self.queue_size_warning, backtrace, + The channel was created at:\n{}\n + Last message was sent from:\n{}", + self.name, + self.queue_size_warning, + self.creation_backtrace, + Backtrace::force_capture(), ); } s }) } - - /// Proxy function to mpsc::UnboundedSender - pub fn same_receiver(&self, other: &UnboundedSender) -> bool { - self.inner.same_receiver(other) - } } impl TracingUnboundedReceiver { @@ -184,8 +151,8 @@ mod inner { break } - match self.try_next() { - Ok(Some(..)) => count += 1, + match self.try_recv() { + Ok(_) => count += 1, _ => break, } } @@ -197,21 +164,18 @@ mod inner { } } - /// Proxy function to mpsc::UnboundedReceiver + /// Proxy function to `async_channel::Receiver` /// that consumes all messages first and updates the counter - pub fn close(&mut self) { + pub fn close(&mut self) -> bool { self.consume(); self.inner.close() } - /// Proxy function to mpsc::UnboundedReceiver + /// Proxy function to `async_channel::Receiver` /// that discounts the messages taken out - pub fn try_next(&mut self) -> Result, TryRecvError> { - self.inner.try_next().map(|s| { - if s.is_some() { - let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); - } + pub fn try_recv(&mut self) -> Result { + self.inner.try_recv().map(|s| { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); s }) } @@ -233,7 +197,6 @@ mod inner { match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - let _ = s.queue_size.fetch_sub(1, Ordering::Relaxed); UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); } Poll::Ready(msg) @@ -248,55 +211,6 @@ mod inner { self.inner.is_terminated() } } - - impl Sink for TracingUnboundedSender { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - TracingUnboundedSender::poll_ready(&*self, cx) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - TracingUnboundedSender::start_send(&mut *self, msg) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - self.disconnect(); - Poll::Ready(Ok(())) - } - } - - impl Sink for &TracingUnboundedSender { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - TracingUnboundedSender::poll_ready(*self, cx) - } - - fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg).map_err(TrySendError::into_send_error) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - // The difference with `TracingUnboundedSender` is intentional. The underlying - // implementation differs for `UnboundedSender` and `&UnboundedSender`: - // the latter closes the channel completely with `close_channel()`, while the former - // only closes this specific sender with `disconnect()`. - self.close_channel(); - Poll::Ready(Ok(())) - } - } } pub use inner::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; diff --git a/client/utils/src/notification.rs b/client/utils/src/notification.rs index 7d866ffc8c019..dabb85d613cc9 100644 --- a/client/utils/src/notification.rs +++ b/client/utils/src/notification.rs @@ -79,7 +79,7 @@ impl NotificationStream { } /// Subscribe to a channel through which the generic payload can be received. - pub fn subscribe(&self, queue_size_warning: i64) -> NotificationReceiver { + pub fn subscribe(&self, queue_size_warning: usize) -> NotificationReceiver { let receiver = self.hub.subscribe((), queue_size_warning); NotificationReceiver { receiver } } diff --git a/client/utils/src/pubsub.rs b/client/utils/src/pubsub.rs index 8136aa0692c26..69cae60ffd145 100644 --- a/client/utils/src/pubsub.rs +++ b/client/utils/src/pubsub.rs @@ -164,7 +164,7 @@ impl Hub { /// Subscribe to this Hub using the `subs_key: K`. /// /// A subscription with a key `K` is possible if the Registry implements `Subscribe`. - pub fn subscribe(&self, subs_key: K, queue_size_warning: i64) -> Receiver + pub fn subscribe(&self, subs_key: K, queue_size_warning: usize) -> Receiver where R: Subscribe + Unsubscribe, { @@ -197,7 +197,7 @@ impl Hub { registry.dispatch(trigger, |subs_id, item| { if let Some(tx) = sinks.get_mut(subs_id) { - if let Err(send_err) = tx.unbounded_send(item) { + if let Err(send_err) = tx.try_send(item) { log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::(), std::any::type_name::(), std::any::type_name::()); diff --git a/client/utils/src/pubsub/tests/normal_operation.rs b/client/utils/src/pubsub/tests/normal_operation.rs index d4b614d7a8889..a3ea4f7ddee69 100644 --- a/client/utils/src/pubsub/tests/normal_operation.rs +++ b/client/utils/src/pubsub/tests/normal_operation.rs @@ -37,9 +37,8 @@ fn positive_rx_receives_relevant_messages_and_terminates_upon_hub_drop() { // Hub is disposed. The rx_01 should be over after that. std::mem::drop(hub); - assert!(!rx_01.is_terminated()); - assert_eq!(None, rx_01.next().await); assert!(rx_01.is_terminated()); + assert_eq!(None, rx_01.next().await); }); } diff --git a/client/utils/src/status_sinks.rs b/client/utils/src/status_sinks.rs index 51d78aa497752..6547f8952df7a 100644 --- a/client/utils/src/status_sinks.rs +++ b/client/utils/src/status_sinks.rs @@ -70,7 +70,7 @@ impl StatusSinks { /// /// The `interval` is the time period between two pushes on the sender. pub fn push(&self, interval: Duration, sender: TracingUnboundedSender) { - let _ = self.entries_tx.unbounded_send(YieldAfter { + let _ = self.entries_tx.try_send(YieldAfter { delay: Delay::new(interval), interval, sender: Some(sender), @@ -132,8 +132,8 @@ impl<'a, T> ReadySinkEvent<'a, T> { /// Sends an element on the sender. pub fn send(mut self, element: T) { if let Some(sender) = self.sender.take() { - if sender.unbounded_send(element).is_ok() { - let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + if sender.try_send(element).is_ok() { + let _ = self.sinks.entries_tx.try_send(YieldAfter { // Note that since there's a small delay between the moment a task is // woken up and the moment it is polled, the period is actually not // `interval` but `interval + `. We ignore this problem in @@ -154,7 +154,7 @@ impl<'a, T> Drop for ReadySinkEvent<'a, T> { return } - let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + let _ = self.sinks.entries_tx.try_send(YieldAfter { delay: Delay::new(self.interval), interval: self.interval, sender: Some(sender), From 9f272358df9428a254d57149bda33abb7f555d00 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 1 Mar 2023 17:19:12 +0200 Subject: [PATCH 2/8] Update client code to match new `utils/mpsc` API --- Cargo.lock | 1 + client/api/src/client.rs | 2 +- .../common/src/import_queue/basic_queue.rs | 71 ++++++++++--------- .../common/src/import_queue/buffered_link.rs | 12 ++-- .../grandpa/src/communication/gossip.rs | 2 +- .../grandpa/src/communication/periodic.rs | 2 +- .../grandpa/src/communication/tests.rs | 20 +++--- client/consensus/grandpa/src/import.rs | 11 ++- .../consensus/grandpa/src/until_imported.rs | 8 +-- client/network/src/service.rs | 68 +++++++----------- client/network/sync/src/service/chain_sync.rs | 20 +++--- client/network/sync/src/service/network.rs | 6 +- client/network/transactions/src/lib.rs | 4 +- client/offchain/src/api/http.rs | 11 ++- client/peerset/src/lib.rs | 18 ++--- client/rpc/src/system/mod.rs | 20 +++--- client/service/src/client/client.rs | 10 +-- client/service/src/task_manager/mod.rs | 2 +- client/service/test/Cargo.toml | 1 + client/service/test/src/client/mod.rs | 20 +++--- client/telemetry/src/lib.rs | 4 +- client/transaction-pool/src/graph/watcher.rs | 2 +- client/transaction-pool/src/revalidation.rs | 2 +- client/utils/src/mpsc.rs | 4 +- 24 files changed, 152 insertions(+), 169 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e078683c06b2..3e75e98984a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9101,6 +9101,7 @@ name = "sc-service-test" version = "2.0.0" dependencies = [ "array-bytes", + "async-channel", "fdlimit", "futures", "log", diff --git a/client/api/src/client.rs b/client/api/src/client.rs index e334f2f9fb4f6..1a9d0f9e65381 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -290,7 +290,7 @@ impl UnpinHandleInner { impl Drop for UnpinHandleInner { fn drop(&mut self) { - if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) { + if let Err(err) = self.unpin_worker_sender.try_send(self.hash) { log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err); }; } diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 06122491eedce..3b42d7b5b538a 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -118,8 +118,8 @@ impl BasicQueueHandle { } pub fn close(&mut self) { - self.justification_sender.close_channel(); - self.block_import_sender.close_channel(); + self.justification_sender.close(); + self.block_import_sender.close(); } } @@ -130,9 +130,7 @@ impl ImportQueueService for BasicQueueHandle { } trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len()); - let res = self - .block_import_sender - .unbounded_send(worker_messages::ImportBlocks(origin, blocks)); + let res = self.block_import_sender.try_send(worker_messages::ImportBlocks(origin, blocks)); if res.is_err() { log::error!( @@ -150,9 +148,12 @@ impl ImportQueueService for BasicQueueHandle { justifications: Justifications, ) { for justification in justifications { - let res = self.justification_sender.unbounded_send( - worker_messages::ImportJustification(who, hash, number, justification), - ); + let res = self.justification_sender.try_send(worker_messages::ImportJustification( + who, + hash, + number, + justification, + )); if res.is_err() { log::error!( @@ -597,11 +598,11 @@ mod tests { fn prioritizes_finality_work_over_block_import() { let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); - let (worker, mut finality_sender, mut block_import_sender) = + let (worker, finality_sender, block_import_sender) = BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); futures::pin_mut!(worker); - let mut import_block = |n| { + let import_block = |n| { let header = Header { parent_hash: Hash::random(), number: n, @@ -612,35 +613,37 @@ mod tests { let hash = header.hash(); - block_on(block_import_sender.send(worker_messages::ImportBlocks( - BlockOrigin::Own, - vec![IncomingBlock { - hash, - header: Some(header), - body: None, - indexed_body: None, - justifications: None, - origin: None, - allow_missing_state: false, - import_existing: false, - state: None, - skip_execution: false, - }], - ))) - .unwrap(); + block_import_sender + .try_send(worker_messages::ImportBlocks( + BlockOrigin::Own, + vec![IncomingBlock { + hash, + header: Some(header), + body: None, + indexed_body: None, + justifications: None, + origin: None, + allow_missing_state: false, + import_existing: false, + state: None, + skip_execution: false, + }], + )) + .unwrap(); hash }; - let mut import_justification = || { + let import_justification = || { let hash = Hash::random(); - block_on(finality_sender.send(worker_messages::ImportJustification( - libp2p::PeerId::random(), - hash, - 1, - (*b"TEST", Vec::new()), - ))) - .unwrap(); + finality_sender + .try_send(worker_messages::ImportJustification( + libp2p::PeerId::random(), + hash, + 1, + (*b"TEST", Vec::new()), + )) + .unwrap(); hash }; diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index 5c95cbc4260e1..6fcbff7ab2703 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -53,7 +53,7 @@ use super::BlockImportResult; /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer /// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size. pub fn buffered_link( - queue_size_warning: i64, + queue_size_warning: usize, ) -> (BufferedLinkSender, BufferedLinkReceiver) { let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning); let tx = BufferedLinkSender { tx }; @@ -97,7 +97,7 @@ impl Link for BufferedLinkSender { ) { let _ = self .tx - .unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results)); + .try_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results)); } fn justification_imported( @@ -108,13 +108,11 @@ impl Link for BufferedLinkSender { success: bool, ) { let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, success); - let _ = self.tx.unbounded_send(msg); + let _ = self.tx.try_send(msg); } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self - .tx - .unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number)); + let _ = self.tx.try_send(BlockImportWorkerMsg::RequestJustification(*hash, number)); } } @@ -166,7 +164,7 @@ impl BufferedLinkReceiver { } /// Close the channel. - pub fn close(&mut self) { + pub fn close(&mut self) -> bool { self.rx.get_mut().close() } } diff --git a/client/consensus/grandpa/src/communication/gossip.rs b/client/consensus/grandpa/src/communication/gossip.rs index cf476f8e32222..f35b6963d555f 100644 --- a/client/consensus/grandpa/src/communication/gossip.rs +++ b/client/consensus/grandpa/src/communication/gossip.rs @@ -1424,7 +1424,7 @@ impl GossipValidator { } fn report(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); + let _ = self.report_sender.try_send(PeerReport { who, cost_benefit }); } pub(super) fn do_validate( diff --git a/client/consensus/grandpa/src/communication/periodic.rs b/client/consensus/grandpa/src/communication/periodic.rs index f3f7572864e5c..d0c441484b679 100644 --- a/client/consensus/grandpa/src/communication/periodic.rs +++ b/client/consensus/grandpa/src/communication/periodic.rs @@ -47,7 +47,7 @@ impl NeighborPacketSender { who: Vec, neighbor_packet: NeighborPacket>, ) { - if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) { + if let Err(err) = self.0.try_send((who, neighbor_packet)) { debug!(target: LOG_TARGET, "Failed to send neighbor packet: {:?}", err); } } diff --git a/client/consensus/grandpa/src/communication/tests.rs b/client/consensus/grandpa/src/communication/tests.rs index 21e2e978c8767..604dfd95abbb9 100644 --- a/client/consensus/grandpa/src/communication/tests.rs +++ b/client/consensus/grandpa/src/communication/tests.rs @@ -74,7 +74,7 @@ impl NetworkPeers for TestNetwork { } fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); + let _ = self.sender.try_send(Event::Report(who, cost_benefit)); } fn disconnect_peer(&self, _who: PeerId, _protocol: ProtocolName) {} @@ -136,14 +136,14 @@ impl NetworkEventStream for TestNetwork { _name: &'static str, ) -> Pin + Send>> { let (tx, rx) = tracing_unbounded("test", 100_000); - let _ = self.sender.unbounded_send(Event::EventStream(tx)); + let _ = self.sender.try_send(Event::EventStream(tx)); Box::pin(rx) } } impl NetworkNotification for TestNetwork { fn write_notification(&self, target: PeerId, _protocol: ProtocolName, message: Vec) { - let _ = self.sender.unbounded_send(Event::WriteNotification(target, message)); + let _ = self.sender.try_send(Event::WriteNotification(target, message)); } fn notification_sender( @@ -157,7 +157,7 @@ impl NetworkNotification for TestNetwork { impl NetworkBlock> for TestNetwork { fn announce_block(&self, hash: Hash, _data: Option>) { - let _ = self.sender.unbounded_send(Event::Announce(hash)); + let _ = self.sender.try_send(Event::Announce(hash)); } fn new_best_block_imported(&self, _hash: Hash, _number: NumberFor) { @@ -365,14 +365,14 @@ fn good_commit_leads_to_relay() { let send_message = tester.filter_network_events(move |event| match event { Event::EventStream(sender) => { // Add the sending peer and send the commit - let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { remote: sender_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, role: ObservedRole::Full, }); - let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { + let _ = sender.try_send(NetworkEvent::NotificationsReceived { remote: sender_id, messages: vec![( grandpa_protocol_name::NAME.into(), @@ -382,7 +382,7 @@ fn good_commit_leads_to_relay() { // Add a random peer which will be the recipient of this message let receiver_id = PeerId::random(); - let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { remote: receiver_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, @@ -400,7 +400,7 @@ fn good_commit_leads_to_relay() { let msg = gossip::GossipMessage::::Neighbor(update); - sender.unbounded_send(NetworkEvent::NotificationsReceived { + sender.try_send(NetworkEvent::NotificationsReceived { remote: receiver_id, messages: vec![( grandpa_protocol_name::NAME.into(), @@ -514,13 +514,13 @@ fn bad_commit_leads_to_report() { let sender_id = id; let send_message = tester.filter_network_events(move |event| match event { Event::EventStream(sender) => { - let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { remote: sender_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, role: ObservedRole::Full, }); - let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { + let _ = sender.try_send(NetworkEvent::NotificationsReceived { remote: sender_id, messages: vec![( grandpa_protocol_name::NAME.into(), diff --git a/client/consensus/grandpa/src/import.rs b/client/consensus/grandpa/src/import.rs index c1f6cb566dae9..c30b4572e3355 100644 --- a/client/consensus/grandpa/src/import.rs +++ b/client/consensus/grandpa/src/import.rs @@ -497,9 +497,7 @@ where .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; let new_set = NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities }; - let _ = self - .send_voter_commands - .unbounded_send(VoterCommand::ChangeAuthorities(new_set)); + let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new_set)); Ok(ImportResult::Imported(aux)) }, Ok(r) => Ok(r), @@ -608,7 +606,7 @@ where // Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message. if do_pause { - let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause( + let _ = self.send_voter_commands.try_send(VoterCommand::Pause( "Forced change scheduled after inactivity".to_string(), )); } @@ -628,8 +626,7 @@ where // they should import the block and discard the justification, and they will // then request a justification from sync if it's necessary (which they should // then be able to successfully validate). - let _ = - self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new)); + let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new)); // we must clear all pending justifications requests, presumably they won't be // finalized hence why this forced changes was triggered @@ -808,7 +805,7 @@ where ); // send the command to the voter - let _ = self.send_voter_commands.unbounded_send(command); + let _ = self.send_voter_commands.try_send(command); }, Err(CommandOrError::Error(e)) => return Err(match e { diff --git a/client/consensus/grandpa/src/until_imported.rs b/client/consensus/grandpa/src/until_imported.rs index 14f32ecc88366..d047772abbf51 100644 --- a/client/consensus/grandpa/src/until_imported.rs +++ b/client/consensus/grandpa/src/until_imported.rs @@ -596,7 +596,7 @@ mod tests { let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000); self.known_blocks.lock().insert(hash, number); self.sender - .unbounded_send(BlockImportNotification::::new( + .try_send(BlockImportNotification::::new( hash, BlockOrigin::File, header, @@ -692,7 +692,7 @@ mod tests { None, ); - global_tx.unbounded_send(msg).unwrap(); + global_tx.try_send(msg).unwrap(); let work = until_imported.into_future(); @@ -720,7 +720,7 @@ mod tests { None, ); - global_tx.unbounded_send(msg).unwrap(); + global_tx.try_send(msg).unwrap(); // NOTE: needs to be cloned otherwise it is moved to the stream and // dropped too early. @@ -930,7 +930,7 @@ mod tests { || voter::CommunicationIn::Commit(0, unknown_commit.clone(), voter::Callback::Blank); // we send the commit message and spawn the until_imported stream - global_tx.unbounded_send(unknown_commit()).unwrap(); + global_tx.try_send(unknown_commit()).unwrap(); let threads_pool = futures::executor::ThreadPool::new().unwrap(); threads_pool.spawn_ok(until_imported.into_future().map(|_| ())); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index fd0b965b77372..cf1b91eccae2b 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -713,7 +713,7 @@ impl NetworkService { let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx }); + .try_send(ServiceToWorkerMsg::NetworkState { pending_response: tx }); match rx.await { Ok(v) => v.map_err(|_| ()), @@ -730,7 +730,7 @@ impl NetworkService { let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx }); + .try_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx }); // The channel can only be closed if the network worker no longer exists. rx.await.map_err(|_| ()) @@ -744,7 +744,7 @@ impl NetworkService { let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); + .try_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); // The channel can only be closed if the network worker no longer exists. rx.await.map_err(|_| ()) @@ -844,7 +844,7 @@ where /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an /// item on the [`NetworkWorker`] stream. fn get_value(&self, key: &KademliaKey) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone())); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::GetValue(key.clone())); } /// Start putting a value in the DHT. @@ -852,7 +852,7 @@ where /// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an /// item on the [`NetworkWorker`] stream. fn put_value(&self, key: KademliaKey, value: Vec) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::PutValue(key, value)); } } @@ -883,7 +883,7 @@ where let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); + .try_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); match rx.await { Ok(v) => v.map_err(|_| ()), @@ -899,19 +899,15 @@ where H: ExHashT, { fn set_authorized_peers(&self, peers: HashSet) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReserved(peers)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReserved(peers)); } fn set_authorized_only(&self, reserved_only: bool) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); } fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { @@ -919,15 +915,15 @@ where } fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol)); } fn accept_unreserved_peers(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(false)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(false)); } fn deny_unreserved_peers(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(true)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(true)); } fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> { @@ -938,13 +934,13 @@ where let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer.peer_id, peer.multiaddr)); - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddReserved(peer.peer_id)); + .try_send(ServiceToWorkerMsg::AddKnownAddress(peer.peer_id, peer.multiaddr)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddReserved(peer.peer_id)); Ok(()) } fn remove_reserved_peer(&self, peer_id: PeerId) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); } fn set_reserved_peers( @@ -965,15 +961,11 @@ where peers.insert(peer_id); if !addr.is_empty() { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } } - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::SetPeersetReserved(protocol, peers)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetPeersetReserved(protocol, peers)); Ok(()) } @@ -992,13 +984,11 @@ where } if !addr.is_empty() { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::AddSetReserved(protocol.clone(), peer_id)); + .try_send(ServiceToWorkerMsg::AddSetReserved(protocol.clone(), peer_id)); } Ok(()) @@ -1008,7 +998,7 @@ where for peer_id in peers.into_iter() { let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::RemoveSetReserved(protocol.clone(), peer_id)); + .try_send(ServiceToWorkerMsg::RemoveSetReserved(protocol.clone(), peer_id)); } } @@ -1026,13 +1016,11 @@ where } if !addr.is_empty() { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::AddToPeersSet(protocol.clone(), peer_id)); + .try_send(ServiceToWorkerMsg::AddToPeersSet(protocol.clone(), peer_id)); } Ok(()) @@ -1042,7 +1030,7 @@ where for peer_id in peers.into_iter() { let _ = self .to_worker - .unbounded_send(ServiceToWorkerMsg::RemoveFromPeersSet(protocol.clone(), peer_id)); + .try_send(ServiceToWorkerMsg::RemoveFromPeersSet(protocol.clone(), peer_id)); } } @@ -1058,7 +1046,7 @@ where { fn event_stream(&self, name: &'static str) -> Pin + Send>> { let (tx, rx) = out_events::channel(name, 100_000); - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::EventStream(tx)); Box::pin(rx) } } @@ -1161,7 +1149,7 @@ where tx: oneshot::Sender, RequestFailure>>, connect: IfDisconnected, ) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { + let _ = self.to_worker.try_send(ServiceToWorkerMsg::Request { target, protocol: protocol.into(), request, @@ -1177,13 +1165,11 @@ where H: ExHashT, { fn announce_block(&self, hash: B::Hash, data: Option>) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); } fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); + let _ = self.to_worker.try_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); } } diff --git a/client/network/sync/src/service/chain_sync.rs b/client/network/sync/src/service/chain_sync.rs index 4d47899a31727..84ae89b1b21e2 100644 --- a/client/network/sync/src/service/chain_sync.rs +++ b/client/network/sync/src/service/chain_sync.rs @@ -55,7 +55,7 @@ impl ChainSyncInterfaceHandle { /// Notify ChainSync about finalized block pub fn on_block_finalized(&self, hash: B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(ToServiceCommand::BlockFinalized(hash, number)); + let _ = self.tx.try_send(ToServiceCommand::BlockFinalized(hash, number)); } /// Get sync status @@ -63,7 +63,7 @@ impl ChainSyncInterfaceHandle { /// Returns an error if `ChainSync` has terminated. pub async fn status(&self) -> Result, ()> { let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(ToServiceCommand::Status { pending_response: tx }); + let _ = self.tx.try_send(ToServiceCommand::Status { pending_response: tx }); rx.await.map_err(|_| ()) } @@ -81,9 +81,7 @@ impl NetworkSyncForkRequest> /// /// Passing empty `peers` set effectively removes the sync request. fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - let _ = self - .tx - .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); + let _ = self.tx.try_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); } } @@ -93,11 +91,11 @@ impl JustificationSyncLink for ChainSyncInterfaceHandle { /// On success, the justification will be passed to the import queue that was part at /// initialization as part of the configuration. fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); + let _ = self.tx.try_send(ToServiceCommand::RequestJustification(*hash, number)); } fn clear_justification_requests(&self) { - let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests); + let _ = self.tx.try_send(ToServiceCommand::ClearJustificationRequests); } } @@ -108,9 +106,7 @@ impl Link for ChainSyncInterfaceHandle { count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { - let _ = self - .tx - .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results)); + let _ = self.tx.try_send(ToServiceCommand::BlocksProcessed(imported, count, results)); } fn justification_imported( @@ -122,10 +118,10 @@ impl Link for ChainSyncInterfaceHandle { ) { let _ = self .tx - .unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success)); + .try_send(ToServiceCommand::JustificationImported(who, *hash, number, success)); } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); + let _ = self.tx.try_send(ToServiceCommand::RequestJustification(*hash, number)); } } diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs index de7e255f5a895..8495eea6ba704 100644 --- a/client/network/sync/src/service/network.rs +++ b/client/network/sync/src/service/network.rs @@ -73,12 +73,12 @@ impl NetworkServiceHandle { /// Report peer pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit)); + let _ = self.tx.try_send(ToServiceCommand::ReportPeer(who, cost_benefit)); } /// Disconnect peer pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { - let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol)); + let _ = self.tx.try_send(ToServiceCommand::DisconnectPeer(who, protocol)); } /// Send request to peer @@ -92,7 +92,7 @@ impl NetworkServiceHandle { ) { let _ = self .tx - .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect)); + .try_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect)); } } diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index d4d08d2ab7545..2a5590bc8274e 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -207,7 +207,7 @@ impl TransactionsHandlerController { /// All transactions will be fetched from the `TransactionPool` that was passed at /// initialization as part of the configuration and propagated to peers. pub fn propagate_transactions(&self) { - let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions); + let _ = self.to_handler.try_send(ToHandler::PropagateTransactions); } /// You must call when new a transaction is imported by the transaction pool. @@ -215,7 +215,7 @@ impl TransactionsHandlerController { /// This transaction will be fetched from the `TransactionPool` that was passed at /// initialization as part of the configuration and propagated to peers. pub fn propagate_transaction(&self, hash: H) { - let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash)); + let _ = self.to_handler.try_send(ToHandler::PropagateTransaction(hash)); } } diff --git a/client/offchain/src/api/http.rs b/client/offchain/src/api/http.rs index e3872614eae4d..dd3a949f953c4 100644 --- a/client/offchain/src/api/http.rs +++ b/client/offchain/src/api/http.rs @@ -237,9 +237,8 @@ impl HttpApi { HttpApiRequest::NotDispatched(request, sender) => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk"); // If the request is not dispatched yet, dispatch it and loop again. - let _ = self - .to_worker - .unbounded_send(ApiToWorker::Dispatch { id: request_id, request }); + let _ = + self.to_worker.try_send(ApiToWorker::Dispatch { id: request_id, request }); HttpApiRequest::Dispatched(Some(sender)) }, @@ -348,7 +347,7 @@ impl HttpApi { _ => unreachable!("we checked for NotDispatched above; qed"), }; - let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { id: *id, request }); + let _ = self.to_worker.try_send(ApiToWorker::Dispatch { id: *id, request }); // We also destroy the sender in order to forbid writing more data. self.requests.insert(*id, HttpApiRequest::Dispatched(None)); @@ -665,7 +664,7 @@ impl Future for HttpWorker { }, Poll::Ready(Ok(response)) => response, Poll::Ready(Err(error)) => { - let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error }); + let _ = me.to_api.try_send(WorkerToApi::Fail { id, error }); continue // don't insert the request back }, }; @@ -675,7 +674,7 @@ impl Future for HttpWorker { let (status_code, headers) = (head.status, head.headers); let (body_tx, body_rx) = mpsc::channel(3); - let _ = me.to_api.unbounded_send(WorkerToApi::Response { + let _ = me.to_api.try_send(WorkerToApi::Response { id, status_code, headers, diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index e5393acbaa32f..9706c20df5e06 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -131,47 +131,47 @@ impl PeersetHandle { /// > **Note**: Keep in mind that the networking has to know an address for this node, /// > otherwise it will not be able to connect to it. pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id)); + let _ = self.tx.try_send(Action::AddReservedPeer(set_id, peer_id)); } /// Remove a previously-added reserved peer. /// /// Has no effect if the node was not a reserved peer. pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id)); + let _ = self.tx.try_send(Action::RemoveReservedPeer(set_id, peer_id)); } /// Sets whether or not the peerset only has connections with nodes marked as reserved for /// the given set. pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) { - let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved)); + let _ = self.tx.try_send(Action::SetReservedOnly(set_id, reserved)); } /// Set reserved peers to the new set. pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet) { - let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids)); + let _ = self.tx.try_send(Action::SetReservedPeers(set_id, peer_ids)); } /// Reports an adjustment to the reputation of the given peer. pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) { - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + let _ = self.tx.try_send(Action::ReportPeer(peer_id, score_diff)); } /// Add a peer to a set. pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id)); + let _ = self.tx.try_send(Action::AddToPeersSet(set_id, peer_id)); } /// Remove a peer from a set. pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id)); + let _ = self.tx.try_send(Action::RemoveFromPeersSet(set_id, peer_id)); } /// Returns the reputation value of the peer. pub async fn peer_reputation(self, peer_id: PeerId) -> Result { let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); + let _ = self.tx.try_send(Action::PeerReputation(peer_id, tx)); // The channel can only be closed if the peerset no longer exists. rx.await.map_err(|_| ()) @@ -678,7 +678,7 @@ impl Peerset { // We don't immediately perform the adjustments in order to have state consistency. We // don't want the reporting here to take priority over messages sent using the // `PeersetHandle`. - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + let _ = self.tx.try_send(Action::ReportPeer(peer_id, score_diff)); } /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. diff --git a/client/rpc/src/system/mod.rs b/client/rpc/src/system/mod.rs index 0da4f8d0e211c..6643b52e9905d 100644 --- a/client/rpc/src/system/mod.rs +++ b/client/rpc/src/system/mod.rs @@ -106,19 +106,19 @@ impl SystemApiServer::Number> async fn system_health(&self) -> RpcResult { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::Health(tx)); + let _ = self.send_back.try_send(Request::Health(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_local_peer_id(&self) -> RpcResult { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::LocalPeerId(tx)); + let _ = self.send_back.try_send(Request::LocalPeerId(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_local_listen_addresses(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::LocalListenAddresses(tx)); + let _ = self.send_back.try_send(Request::LocalListenAddresses(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } @@ -127,21 +127,21 @@ impl SystemApiServer::Number> ) -> RpcResult::Number>>> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::Peers(tx)); + let _ = self.send_back.try_send(Request::Peers(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_network_state(&self) -> RpcResult { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::NetworkState(tx)); + let _ = self.send_back.try_send(Request::NetworkState(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_add_reserved_peer(&self, peer: String) -> RpcResult<()> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::NetworkAddReservedPeer(peer, tx)); + let _ = self.send_back.try_send(Request::NetworkAddReservedPeer(peer, tx)); match rx.await { Ok(Ok(())) => Ok(()), Ok(Err(e)) => Err(JsonRpseeError::from(e)), @@ -152,7 +152,7 @@ impl SystemApiServer::Number> async fn system_remove_reserved_peer(&self, peer: String) -> RpcResult<()> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::NetworkRemoveReservedPeer(peer, tx)); + let _ = self.send_back.try_send(Request::NetworkRemoveReservedPeer(peer, tx)); match rx.await { Ok(Ok(())) => Ok(()), Ok(Err(e)) => Err(JsonRpseeError::from(e)), @@ -162,19 +162,19 @@ impl SystemApiServer::Number> async fn system_reserved_peers(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::NetworkReservedPeers(tx)); + let _ = self.send_back.try_send(Request::NetworkReservedPeers(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_node_roles(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::NodeRoles(tx)); + let _ = self.send_back.try_send(Request::NodeRoles(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_sync_state(&self) -> RpcResult::Number>> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.unbounded_send(Request::SyncState(tx)); + let _ = self.send_back.try_send(Request::SyncState(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 4b5822ae0e017..a2ce5229619e2 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1024,7 +1024,7 @@ where "best" => ?notification.hash, ); - sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + sinks.retain(|sink| sink.try_send(notification.clone()).is_ok()); Ok(()) } @@ -1068,24 +1068,24 @@ where trigger_storage_changes_notification(); self.import_notification_sinks .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + .retain(|sink| sink.try_send(notification.clone()).is_ok()); self.every_import_notification_sinks .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + .retain(|sink| sink.try_send(notification.clone()).is_ok()); }, ImportNotificationAction::RecentBlock => { trigger_storage_changes_notification(); self.import_notification_sinks .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + .retain(|sink| sink.try_send(notification.clone()).is_ok()); self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, ImportNotificationAction::EveryBlock => { self.every_import_notification_sinks .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + .retain(|sink| sink.try_send(notification.clone()).is_ok()); self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 8dc4748b089ee..afccee9033e36 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -281,7 +281,7 @@ impl SpawnEssentialTaskHandle { let essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task).catch_unwind().map(move |_| { log::error!("Essential task `{}` failed. Shutting down service.", name); - let _ = essential_failed.close_channel(); + let _ = essential_failed.close(); }); let _ = self.inner.spawn_inner(name, group, essential_task, task_type); diff --git a/client/service/test/Cargo.toml b/client/service/test/Cargo.toml index 488fe76c333bc..42255c0f6cac1 100644 --- a/client/service/test/Cargo.toml +++ b/client/service/test/Cargo.toml @@ -12,6 +12,7 @@ repository = "https://github.com/paritytech/substrate/" targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-channel = "1.8.0" array-bytes = "4.1" fdlimit = "0.2.1" futures = "0.3.21" diff --git a/client/service/test/src/client/mod.rs b/client/service/test/src/client/mod.rs index f8988c5a454c3..c7b19ca8ba27f 100644 --- a/client/service/test/src/client/mod.rs +++ b/client/service/test/src/client/mod.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use async_channel::TryRecvError; use futures::executor::block_on; use parity_scale_codec::{Decode, Encode, Joiner}; use sc_block_builder::BlockBuilderProvider; @@ -175,16 +176,17 @@ fn finality_notification_check( finalized: &[Hash], stale_heads: &[Hash], ) { - match notifications.try_next() { - Ok(Some(notif)) => { + match notifications.try_recv() { + Ok(notif) => { let stale_heads_expected: HashSet<_> = stale_heads.iter().collect(); let stale_heads: HashSet<_> = notif.stale_heads.iter().collect(); assert_eq!(notif.tree_route.as_ref(), &finalized[..finalized.len() - 1]); assert_eq!(notif.hash, *finalized.last().unwrap()); assert_eq!(stale_heads, stale_heads_expected); }, - Ok(None) => panic!("unexpected notification result, client send channel was closed"), - Err(_) => assert!(finalized.is_empty()), + Err(TryRecvError::Closed) => + panic!("unexpected notification result, client send channel was closed"), + Err(TryRecvError::Empty) => assert!(finalized.is_empty()), } } @@ -983,7 +985,7 @@ fn import_with_justification() { finality_notification_check(&mut finality_notifications, &[a1.hash(), a2.hash()], &[]); finality_notification_check(&mut finality_notifications, &[a3.hash()], &[]); - assert!(finality_notifications.try_next().is_err()); + assert!(matches!(finality_notifications.try_recv().unwrap_err(), TryRecvError::Empty)); } #[test] @@ -1038,7 +1040,7 @@ fn importing_diverged_finalized_block_should_trigger_reorg() { assert_eq!(client.chain_info().finalized_hash, b1.hash()); finality_notification_check(&mut finality_notifications, &[b1.hash()], &[a2.hash()]); - assert!(finality_notifications.try_next().is_err()); + assert!(matches!(finality_notifications.try_recv().unwrap_err(), TryRecvError::Empty)); } #[test] @@ -1124,7 +1126,7 @@ fn finalizing_diverged_block_should_trigger_reorg() { finality_notification_check(&mut finality_notifications, &[b1.hash()], &[]); finality_notification_check(&mut finality_notifications, &[b2.hash(), b3.hash()], &[a2.hash()]); - assert!(finality_notifications.try_next().is_err()); + assert!(matches!(finality_notifications.try_recv().unwrap_err(), TryRecvError::Empty)); } #[test] @@ -1227,7 +1229,7 @@ fn finality_notifications_content() { finality_notification_check(&mut finality_notifications, &[a1.hash(), a2.hash()], &[c1.hash()]); finality_notification_check(&mut finality_notifications, &[d3.hash(), d4.hash()], &[b2.hash()]); - assert!(finality_notifications.try_next().is_err()); + assert!(matches!(finality_notifications.try_recv().unwrap_err(), TryRecvError::Empty)); } #[test] @@ -1437,7 +1439,7 @@ fn doesnt_import_blocks_that_revert_finality() { finality_notification_check(&mut finality_notifications, &[a3.hash()], &[b2.hash()]); - assert!(finality_notifications.try_next().is_err()); + assert!(matches!(finality_notifications.try_recv().unwrap_err(), TryRecvError::Empty)); } #[test] diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index 113d8303a20f6..9487adbe53435 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -409,7 +409,7 @@ impl Telemetry { let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?; self.register_sender - .unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message }) + .try_send(Register::Telemetry { id: self.id, endpoints, connection_message }) .map_err(|_| Error::TelemetryWorkerDropped) } @@ -469,7 +469,7 @@ pub struct TelemetryConnectionNotifier { impl TelemetryConnectionNotifier { fn on_connect_stream(&self) -> ConnectionNotifierReceiver { let (message_sender, message_receiver) = connection_notifier_channel(); - if let Err(err) = self.register_sender.unbounded_send(Register::Notifier { + if let Err(err) = self.register_sender.try_send(Register::Notifier { addresses: self.addresses.clone(), connection_notifier: message_sender, }) { diff --git a/client/transaction-pool/src/graph/watcher.rs b/client/transaction-pool/src/graph/watcher.rs index fc440771d7bbc..886f7aac0222f 100644 --- a/client/transaction-pool/src/graph/watcher.rs +++ b/client/transaction-pool/src/graph/watcher.rs @@ -129,6 +129,6 @@ impl Sender { } fn send(&mut self, status: TransactionStatus) { - self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok()) + self.receivers.retain(|sender| sender.try_send(status.clone()).is_ok()) } } diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index bd8f3dd6498f3..2b7c9c77590ba 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -340,7 +340,7 @@ where } if let Some(ref to_worker) = self.background { - if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) { + if let Err(e) = to_worker.try_send(WorkerPayload { at, transactions }) { log::warn!(target: LOG_TARGET, "Failed to update background worker: {:?}", e); } } else { diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index 10af755312f20..e7696d9b7d3fa 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -35,8 +35,8 @@ mod inner { #[cfg(feature = "metered")] mod inner { // tracing implementation - use async_channel::{Receiver, Sender, TrySendError, TryRecvError}; use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; + use async_channel::{Receiver, Sender, TryRecvError, TrySendError}; use futures::{ stream::{FusedStream, Stream}, task::{Context, Poll}, @@ -127,7 +127,7 @@ mod inner { // that the warning is fired few times before the `warning_fired` is seen // by all threads. This seems better than introducing a mutex guarding them. error!( - "The number of unprocessed messages in channel `{}` reached {}.\n\ + "The number of unprocessed messages in channel `{}` exceeded {}.\n\ The channel was created at:\n{}\n Last message was sent from:\n{}", self.name, From aeaf93fe226073a6085e3052822c0f4d1cb98600 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Mar 2023 11:10:40 +0200 Subject: [PATCH 3/8] Rename `TracingUnboundedSender::try_send` back to `unbounded_send` --- client/utils/src/mpsc.rs | 55 ++++++++++++++++++++++---------- client/utils/src/pubsub.rs | 2 +- client/utils/src/status_sinks.rs | 8 ++--- 3 files changed, 43 insertions(+), 22 deletions(-) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index e7696d9b7d3fa..b82789f0ad95f 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -20,15 +20,36 @@ #[cfg(not(feature = "metered"))] mod inner { - // just aliased, non performance implications - pub type TracingUnboundedSender = async_channel::Sender; + /// Simple wrapper around [`async_channel::Sender`], no performance implications. + pub struct TracingUnboundedSender(async_channel::Sender); + + /// Just alias for [`async_channel::Receiver`], no performance implications. pub type TracingUnboundedReceiver = async_channel::Receiver; - /// Alias `async_channel::unbounded` + /// Proxy to [`async_channel::unbounded`]. pub fn tracing_unbounded( _key: &'static str, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { - async_channel::unbounded() + let (tx, rx) = async_channel::unbounded(); + (TracingUnboundedSender(tx), rx) + } + + // The only reason this impl is needed is because we rename `try_send` -> `unbounded_send` + impl TracingUnboundedSender { + /// Proxy function to [`async_channel::Sender`]. + pub fn is_closed(&self) -> bool { + self.0.is_closed() + } + + /// Proxy function to [`async_channel::Sender`]. + pub fn close(&self) -> bool { + self.0.close() + } + + /// Proxy function to `async_channel::Sender::try_send`. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { + self.0.try_send(msg) + } } } @@ -51,8 +72,8 @@ mod inner { }, }; - /// Wrapper Type around `async_channel::Sender` that increases the global - /// measure when a message is added + /// Wrapper Type around [`async_channel::Sender`] that increases the global + /// measure when a message is added. #[derive(Debug)] pub struct TracingUnboundedSender { inner: Sender, @@ -75,15 +96,15 @@ mod inner { } } - /// Wrapper Type around `async_channel::Receiver` that decreases the global - /// measure when a message is polled + /// Wrapper Type around [`async_channel::Receiver`] that decreases the global + /// measure when a message is polled. #[derive(Debug)] pub struct TracingUnboundedReceiver { inner: Receiver, name: &'static str, } - /// Wrapper around `async_channel::unbounded` that tracks the in- and outflow via + /// Wrapper around [`async_channel::unbounded`] that tracks the in- and outflow via /// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows /// above the warning threshold. pub fn tracing_unbounded( @@ -103,18 +124,18 @@ mod inner { } impl TracingUnboundedSender { - /// Proxy function to `async_channel::Sender` + /// Proxy function to [`async_channel::Sender`]. pub fn is_closed(&self) -> bool { self.inner.is_closed() } - /// Proxy function to `async_channel::Sender` + /// Proxy function to [`async_channel::Sender`]. pub fn close(&self) -> bool { self.inner.close() } - /// Proxy function to `async_channel::Sender` - pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + /// Proxy function to `async_channel::Sender::try_send`. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.inner.try_send(msg).map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); @@ -164,15 +185,15 @@ mod inner { } } - /// Proxy function to `async_channel::Receiver` - /// that consumes all messages first and updates the counter + /// Proxy function to [`async_channel::Receiver`] + /// that consumes all messages first and updates the counter. pub fn close(&mut self) -> bool { self.consume(); self.inner.close() } - /// Proxy function to `async_channel::Receiver` - /// that discounts the messages taken out + /// Proxy function to [`async_channel::Receiver`] + /// that discounts the messages taken out. pub fn try_recv(&mut self) -> Result { self.inner.try_recv().map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); diff --git a/client/utils/src/pubsub.rs b/client/utils/src/pubsub.rs index 69cae60ffd145..5293fa42ed94c 100644 --- a/client/utils/src/pubsub.rs +++ b/client/utils/src/pubsub.rs @@ -197,7 +197,7 @@ impl Hub { registry.dispatch(trigger, |subs_id, item| { if let Some(tx) = sinks.get_mut(subs_id) { - if let Err(send_err) = tx.try_send(item) { + if let Err(send_err) = tx.unbounded_send(item) { log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::(), std::any::type_name::(), std::any::type_name::()); diff --git a/client/utils/src/status_sinks.rs b/client/utils/src/status_sinks.rs index 6547f8952df7a..51d78aa497752 100644 --- a/client/utils/src/status_sinks.rs +++ b/client/utils/src/status_sinks.rs @@ -70,7 +70,7 @@ impl StatusSinks { /// /// The `interval` is the time period between two pushes on the sender. pub fn push(&self, interval: Duration, sender: TracingUnboundedSender) { - let _ = self.entries_tx.try_send(YieldAfter { + let _ = self.entries_tx.unbounded_send(YieldAfter { delay: Delay::new(interval), interval, sender: Some(sender), @@ -132,8 +132,8 @@ impl<'a, T> ReadySinkEvent<'a, T> { /// Sends an element on the sender. pub fn send(mut self, element: T) { if let Some(sender) = self.sender.take() { - if sender.try_send(element).is_ok() { - let _ = self.sinks.entries_tx.try_send(YieldAfter { + if sender.unbounded_send(element).is_ok() { + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { // Note that since there's a small delay between the moment a task is // woken up and the moment it is polled, the period is actually not // `interval` but `interval + `. We ignore this problem in @@ -154,7 +154,7 @@ impl<'a, T> Drop for ReadySinkEvent<'a, T> { return } - let _ = self.sinks.entries_tx.try_send(YieldAfter { + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { delay: Delay::new(self.interval), interval: self.interval, sender: Some(sender), From 281120af74e20df1c1f2d9fbb3f4a1896ee43c66 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Mar 2023 11:18:40 +0200 Subject: [PATCH 4/8] Partially revert "Update client code to match new `utils/mpsc` API" --- client/api/src/client.rs | 2 +- .../common/src/import_queue/basic_queue.rs | 17 +++-- .../common/src/import_queue/buffered_link.rs | 8 ++- .../grandpa/src/communication/gossip.rs | 2 +- .../grandpa/src/communication/periodic.rs | 2 +- .../grandpa/src/communication/tests.rs | 20 +++--- client/consensus/grandpa/src/import.rs | 11 +-- .../consensus/grandpa/src/until_imported.rs | 8 +-- client/network/src/service.rs | 68 +++++++++++-------- client/network/sync/src/service/chain_sync.rs | 20 +++--- client/network/sync/src/service/network.rs | 6 +- client/network/transactions/src/lib.rs | 4 +- client/offchain/src/api/http.rs | 11 +-- client/peerset/src/lib.rs | 18 ++--- client/rpc/src/system/mod.rs | 20 +++--- client/service/src/client/client.rs | 10 +-- client/telemetry/src/lib.rs | 4 +- client/transaction-pool/src/graph/watcher.rs | 2 +- client/transaction-pool/src/revalidation.rs | 2 +- 19 files changed, 129 insertions(+), 106 deletions(-) diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 1a9d0f9e65381..e334f2f9fb4f6 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -290,7 +290,7 @@ impl UnpinHandleInner { impl Drop for UnpinHandleInner { fn drop(&mut self) { - if let Err(err) = self.unpin_worker_sender.try_send(self.hash) { + if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) { log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err); }; } diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 3b42d7b5b538a..6404708151f00 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -130,7 +130,9 @@ impl ImportQueueService for BasicQueueHandle { } trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len()); - let res = self.block_import_sender.try_send(worker_messages::ImportBlocks(origin, blocks)); + let res = self + .block_import_sender + .unbounded_send(worker_messages::ImportBlocks(origin, blocks)); if res.is_err() { log::error!( @@ -148,12 +150,9 @@ impl ImportQueueService for BasicQueueHandle { justifications: Justifications, ) { for justification in justifications { - let res = self.justification_sender.try_send(worker_messages::ImportJustification( - who, - hash, - number, - justification, - )); + let res = self.justification_sender.unbounded_send( + worker_messages::ImportJustification(who, hash, number, justification), + ); if res.is_err() { log::error!( @@ -614,7 +613,7 @@ mod tests { let hash = header.hash(); block_import_sender - .try_send(worker_messages::ImportBlocks( + .unbounded_send(worker_messages::ImportBlocks( BlockOrigin::Own, vec![IncomingBlock { hash, @@ -637,7 +636,7 @@ mod tests { let import_justification = || { let hash = Hash::random(); finality_sender - .try_send(worker_messages::ImportJustification( + .unbounded_send(worker_messages::ImportJustification( libp2p::PeerId::random(), hash, 1, diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index 6fcbff7ab2703..c23a4b0d5d0ab 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -97,7 +97,7 @@ impl Link for BufferedLinkSender { ) { let _ = self .tx - .try_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results)); + .unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results)); } fn justification_imported( @@ -108,11 +108,13 @@ impl Link for BufferedLinkSender { success: bool, ) { let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, success); - let _ = self.tx.try_send(msg); + let _ = self.tx.unbounded_send(msg); } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.try_send(BlockImportWorkerMsg::RequestJustification(*hash, number)); + let _ = self + .tx + .unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number)); } } diff --git a/client/consensus/grandpa/src/communication/gossip.rs b/client/consensus/grandpa/src/communication/gossip.rs index f35b6963d555f..cf476f8e32222 100644 --- a/client/consensus/grandpa/src/communication/gossip.rs +++ b/client/consensus/grandpa/src/communication/gossip.rs @@ -1424,7 +1424,7 @@ impl GossipValidator { } fn report(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.report_sender.try_send(PeerReport { who, cost_benefit }); + let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); } pub(super) fn do_validate( diff --git a/client/consensus/grandpa/src/communication/periodic.rs b/client/consensus/grandpa/src/communication/periodic.rs index d0c441484b679..f3f7572864e5c 100644 --- a/client/consensus/grandpa/src/communication/periodic.rs +++ b/client/consensus/grandpa/src/communication/periodic.rs @@ -47,7 +47,7 @@ impl NeighborPacketSender { who: Vec, neighbor_packet: NeighborPacket>, ) { - if let Err(err) = self.0.try_send((who, neighbor_packet)) { + if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) { debug!(target: LOG_TARGET, "Failed to send neighbor packet: {:?}", err); } } diff --git a/client/consensus/grandpa/src/communication/tests.rs b/client/consensus/grandpa/src/communication/tests.rs index 604dfd95abbb9..21e2e978c8767 100644 --- a/client/consensus/grandpa/src/communication/tests.rs +++ b/client/consensus/grandpa/src/communication/tests.rs @@ -74,7 +74,7 @@ impl NetworkPeers for TestNetwork { } fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.sender.try_send(Event::Report(who, cost_benefit)); + let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); } fn disconnect_peer(&self, _who: PeerId, _protocol: ProtocolName) {} @@ -136,14 +136,14 @@ impl NetworkEventStream for TestNetwork { _name: &'static str, ) -> Pin + Send>> { let (tx, rx) = tracing_unbounded("test", 100_000); - let _ = self.sender.try_send(Event::EventStream(tx)); + let _ = self.sender.unbounded_send(Event::EventStream(tx)); Box::pin(rx) } } impl NetworkNotification for TestNetwork { fn write_notification(&self, target: PeerId, _protocol: ProtocolName, message: Vec) { - let _ = self.sender.try_send(Event::WriteNotification(target, message)); + let _ = self.sender.unbounded_send(Event::WriteNotification(target, message)); } fn notification_sender( @@ -157,7 +157,7 @@ impl NetworkNotification for TestNetwork { impl NetworkBlock> for TestNetwork { fn announce_block(&self, hash: Hash, _data: Option>) { - let _ = self.sender.try_send(Event::Announce(hash)); + let _ = self.sender.unbounded_send(Event::Announce(hash)); } fn new_best_block_imported(&self, _hash: Hash, _number: NumberFor) { @@ -365,14 +365,14 @@ fn good_commit_leads_to_relay() { let send_message = tester.filter_network_events(move |event| match event { Event::EventStream(sender) => { // Add the sending peer and send the commit - let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: sender_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, role: ObservedRole::Full, }); - let _ = sender.try_send(NetworkEvent::NotificationsReceived { + let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { remote: sender_id, messages: vec![( grandpa_protocol_name::NAME.into(), @@ -382,7 +382,7 @@ fn good_commit_leads_to_relay() { // Add a random peer which will be the recipient of this message let receiver_id = PeerId::random(); - let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: receiver_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, @@ -400,7 +400,7 @@ fn good_commit_leads_to_relay() { let msg = gossip::GossipMessage::::Neighbor(update); - sender.try_send(NetworkEvent::NotificationsReceived { + sender.unbounded_send(NetworkEvent::NotificationsReceived { remote: receiver_id, messages: vec![( grandpa_protocol_name::NAME.into(), @@ -514,13 +514,13 @@ fn bad_commit_leads_to_report() { let sender_id = id; let send_message = tester.filter_network_events(move |event| match event { Event::EventStream(sender) => { - let _ = sender.try_send(NetworkEvent::NotificationStreamOpened { + let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: sender_id, protocol: grandpa_protocol_name::NAME.into(), negotiated_fallback: None, role: ObservedRole::Full, }); - let _ = sender.try_send(NetworkEvent::NotificationsReceived { + let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { remote: sender_id, messages: vec![( grandpa_protocol_name::NAME.into(), diff --git a/client/consensus/grandpa/src/import.rs b/client/consensus/grandpa/src/import.rs index c30b4572e3355..c1f6cb566dae9 100644 --- a/client/consensus/grandpa/src/import.rs +++ b/client/consensus/grandpa/src/import.rs @@ -497,7 +497,9 @@ where .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; let new_set = NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities }; - let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new_set)); + let _ = self + .send_voter_commands + .unbounded_send(VoterCommand::ChangeAuthorities(new_set)); Ok(ImportResult::Imported(aux)) }, Ok(r) => Ok(r), @@ -606,7 +608,7 @@ where // Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message. if do_pause { - let _ = self.send_voter_commands.try_send(VoterCommand::Pause( + let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause( "Forced change scheduled after inactivity".to_string(), )); } @@ -626,7 +628,8 @@ where // they should import the block and discard the justification, and they will // then request a justification from sync if it's necessary (which they should // then be able to successfully validate). - let _ = self.send_voter_commands.try_send(VoterCommand::ChangeAuthorities(new)); + let _ = + self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new)); // we must clear all pending justifications requests, presumably they won't be // finalized hence why this forced changes was triggered @@ -805,7 +808,7 @@ where ); // send the command to the voter - let _ = self.send_voter_commands.try_send(command); + let _ = self.send_voter_commands.unbounded_send(command); }, Err(CommandOrError::Error(e)) => return Err(match e { diff --git a/client/consensus/grandpa/src/until_imported.rs b/client/consensus/grandpa/src/until_imported.rs index d047772abbf51..14f32ecc88366 100644 --- a/client/consensus/grandpa/src/until_imported.rs +++ b/client/consensus/grandpa/src/until_imported.rs @@ -596,7 +596,7 @@ mod tests { let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000); self.known_blocks.lock().insert(hash, number); self.sender - .try_send(BlockImportNotification::::new( + .unbounded_send(BlockImportNotification::::new( hash, BlockOrigin::File, header, @@ -692,7 +692,7 @@ mod tests { None, ); - global_tx.try_send(msg).unwrap(); + global_tx.unbounded_send(msg).unwrap(); let work = until_imported.into_future(); @@ -720,7 +720,7 @@ mod tests { None, ); - global_tx.try_send(msg).unwrap(); + global_tx.unbounded_send(msg).unwrap(); // NOTE: needs to be cloned otherwise it is moved to the stream and // dropped too early. @@ -930,7 +930,7 @@ mod tests { || voter::CommunicationIn::Commit(0, unknown_commit.clone(), voter::Callback::Blank); // we send the commit message and spawn the until_imported stream - global_tx.try_send(unknown_commit()).unwrap(); + global_tx.unbounded_send(unknown_commit()).unwrap(); let threads_pool = futures::executor::ThreadPool::new().unwrap(); threads_pool.spawn_ok(until_imported.into_future().map(|_| ())); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index cf1b91eccae2b..fd0b965b77372 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -713,7 +713,7 @@ impl NetworkService { let _ = self .to_worker - .try_send(ServiceToWorkerMsg::NetworkState { pending_response: tx }); + .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx }); match rx.await { Ok(v) => v.map_err(|_| ()), @@ -730,7 +730,7 @@ impl NetworkService { let _ = self .to_worker - .try_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx }); + .unbounded_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx }); // The channel can only be closed if the network worker no longer exists. rx.await.map_err(|_| ()) @@ -744,7 +744,7 @@ impl NetworkService { let _ = self .to_worker - .try_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); + .unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); // The channel can only be closed if the network worker no longer exists. rx.await.map_err(|_| ()) @@ -844,7 +844,7 @@ where /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an /// item on the [`NetworkWorker`] stream. fn get_value(&self, key: &KademliaKey) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::GetValue(key.clone())); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone())); } /// Start putting a value in the DHT. @@ -852,7 +852,7 @@ where /// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an /// item on the [`NetworkWorker`] stream. fn put_value(&self, key: KademliaKey, value: Vec) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::PutValue(key, value)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); } } @@ -883,7 +883,7 @@ where let _ = self .to_worker - .try_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); + .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); match rx.await { Ok(v) => v.map_err(|_| ()), @@ -899,15 +899,19 @@ where H: ExHashT, { fn set_authorized_peers(&self, peers: HashSet) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReserved(peers)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReserved(peers)); } fn set_authorized_only(&self, reserved_only: bool) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); } fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { @@ -915,15 +919,15 @@ where } fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol)); } fn accept_unreserved_peers(&self) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(false)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(false)); } fn deny_unreserved_peers(&self) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetReservedOnly(true)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(true)); } fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> { @@ -934,13 +938,13 @@ where let _ = self .to_worker - .try_send(ServiceToWorkerMsg::AddKnownAddress(peer.peer_id, peer.multiaddr)); - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddReserved(peer.peer_id)); + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer.peer_id, peer.multiaddr)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddReserved(peer.peer_id)); Ok(()) } fn remove_reserved_peer(&self, peer_id: PeerId) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); } fn set_reserved_peers( @@ -961,11 +965,15 @@ where peers.insert(peer_id); if !addr.is_empty() { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } } - let _ = self.to_worker.try_send(ServiceToWorkerMsg::SetPeersetReserved(protocol, peers)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::SetPeersetReserved(protocol, peers)); Ok(()) } @@ -984,11 +992,13 @@ where } if !addr.is_empty() { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } let _ = self .to_worker - .try_send(ServiceToWorkerMsg::AddSetReserved(protocol.clone(), peer_id)); + .unbounded_send(ServiceToWorkerMsg::AddSetReserved(protocol.clone(), peer_id)); } Ok(()) @@ -998,7 +1008,7 @@ where for peer_id in peers.into_iter() { let _ = self .to_worker - .try_send(ServiceToWorkerMsg::RemoveSetReserved(protocol.clone(), peer_id)); + .unbounded_send(ServiceToWorkerMsg::RemoveSetReserved(protocol.clone(), peer_id)); } } @@ -1016,11 +1026,13 @@ where } if !addr.is_empty() { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } let _ = self .to_worker - .try_send(ServiceToWorkerMsg::AddToPeersSet(protocol.clone(), peer_id)); + .unbounded_send(ServiceToWorkerMsg::AddToPeersSet(protocol.clone(), peer_id)); } Ok(()) @@ -1030,7 +1042,7 @@ where for peer_id in peers.into_iter() { let _ = self .to_worker - .try_send(ServiceToWorkerMsg::RemoveFromPeersSet(protocol.clone(), peer_id)); + .unbounded_send(ServiceToWorkerMsg::RemoveFromPeersSet(protocol.clone(), peer_id)); } } @@ -1046,7 +1058,7 @@ where { fn event_stream(&self, name: &'static str) -> Pin + Send>> { let (tx, rx) = out_events::channel(name, 100_000); - let _ = self.to_worker.try_send(ServiceToWorkerMsg::EventStream(tx)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); Box::pin(rx) } } @@ -1149,7 +1161,7 @@ where tx: oneshot::Sender, RequestFailure>>, connect: IfDisconnected, ) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::Request { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { target, protocol: protocol.into(), request, @@ -1165,11 +1177,13 @@ where H: ExHashT, { fn announce_block(&self, hash: B::Hash, data: Option>) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); } fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor) { - let _ = self.to_worker.try_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); } } diff --git a/client/network/sync/src/service/chain_sync.rs b/client/network/sync/src/service/chain_sync.rs index 84ae89b1b21e2..4d47899a31727 100644 --- a/client/network/sync/src/service/chain_sync.rs +++ b/client/network/sync/src/service/chain_sync.rs @@ -55,7 +55,7 @@ impl ChainSyncInterfaceHandle { /// Notify ChainSync about finalized block pub fn on_block_finalized(&self, hash: B::Hash, number: NumberFor) { - let _ = self.tx.try_send(ToServiceCommand::BlockFinalized(hash, number)); + let _ = self.tx.unbounded_send(ToServiceCommand::BlockFinalized(hash, number)); } /// Get sync status @@ -63,7 +63,7 @@ impl ChainSyncInterfaceHandle { /// Returns an error if `ChainSync` has terminated. pub async fn status(&self) -> Result, ()> { let (tx, rx) = oneshot::channel(); - let _ = self.tx.try_send(ToServiceCommand::Status { pending_response: tx }); + let _ = self.tx.unbounded_send(ToServiceCommand::Status { pending_response: tx }); rx.await.map_err(|_| ()) } @@ -81,7 +81,9 @@ impl NetworkSyncForkRequest> /// /// Passing empty `peers` set effectively removes the sync request. fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - let _ = self.tx.try_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); + let _ = self + .tx + .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); } } @@ -91,11 +93,11 @@ impl JustificationSyncLink for ChainSyncInterfaceHandle { /// On success, the justification will be passed to the import queue that was part at /// initialization as part of the configuration. fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.try_send(ToServiceCommand::RequestJustification(*hash, number)); + let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); } fn clear_justification_requests(&self) { - let _ = self.tx.try_send(ToServiceCommand::ClearJustificationRequests); + let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests); } } @@ -106,7 +108,9 @@ impl Link for ChainSyncInterfaceHandle { count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { - let _ = self.tx.try_send(ToServiceCommand::BlocksProcessed(imported, count, results)); + let _ = self + .tx + .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results)); } fn justification_imported( @@ -118,10 +122,10 @@ impl Link for ChainSyncInterfaceHandle { ) { let _ = self .tx - .try_send(ToServiceCommand::JustificationImported(who, *hash, number, success)); + .unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success)); } fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.tx.try_send(ToServiceCommand::RequestJustification(*hash, number)); + let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number)); } } diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs index 8495eea6ba704..de7e255f5a895 100644 --- a/client/network/sync/src/service/network.rs +++ b/client/network/sync/src/service/network.rs @@ -73,12 +73,12 @@ impl NetworkServiceHandle { /// Report peer pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.tx.try_send(ToServiceCommand::ReportPeer(who, cost_benefit)); + let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit)); } /// Disconnect peer pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { - let _ = self.tx.try_send(ToServiceCommand::DisconnectPeer(who, protocol)); + let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol)); } /// Send request to peer @@ -92,7 +92,7 @@ impl NetworkServiceHandle { ) { let _ = self .tx - .try_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect)); + .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect)); } } diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 2a5590bc8274e..d4d08d2ab7545 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -207,7 +207,7 @@ impl TransactionsHandlerController { /// All transactions will be fetched from the `TransactionPool` that was passed at /// initialization as part of the configuration and propagated to peers. pub fn propagate_transactions(&self) { - let _ = self.to_handler.try_send(ToHandler::PropagateTransactions); + let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions); } /// You must call when new a transaction is imported by the transaction pool. @@ -215,7 +215,7 @@ impl TransactionsHandlerController { /// This transaction will be fetched from the `TransactionPool` that was passed at /// initialization as part of the configuration and propagated to peers. pub fn propagate_transaction(&self, hash: H) { - let _ = self.to_handler.try_send(ToHandler::PropagateTransaction(hash)); + let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash)); } } diff --git a/client/offchain/src/api/http.rs b/client/offchain/src/api/http.rs index dd3a949f953c4..e3872614eae4d 100644 --- a/client/offchain/src/api/http.rs +++ b/client/offchain/src/api/http.rs @@ -237,8 +237,9 @@ impl HttpApi { HttpApiRequest::NotDispatched(request, sender) => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk"); // If the request is not dispatched yet, dispatch it and loop again. - let _ = - self.to_worker.try_send(ApiToWorker::Dispatch { id: request_id, request }); + let _ = self + .to_worker + .unbounded_send(ApiToWorker::Dispatch { id: request_id, request }); HttpApiRequest::Dispatched(Some(sender)) }, @@ -347,7 +348,7 @@ impl HttpApi { _ => unreachable!("we checked for NotDispatched above; qed"), }; - let _ = self.to_worker.try_send(ApiToWorker::Dispatch { id: *id, request }); + let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { id: *id, request }); // We also destroy the sender in order to forbid writing more data. self.requests.insert(*id, HttpApiRequest::Dispatched(None)); @@ -664,7 +665,7 @@ impl Future for HttpWorker { }, Poll::Ready(Ok(response)) => response, Poll::Ready(Err(error)) => { - let _ = me.to_api.try_send(WorkerToApi::Fail { id, error }); + let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error }); continue // don't insert the request back }, }; @@ -674,7 +675,7 @@ impl Future for HttpWorker { let (status_code, headers) = (head.status, head.headers); let (body_tx, body_rx) = mpsc::channel(3); - let _ = me.to_api.try_send(WorkerToApi::Response { + let _ = me.to_api.unbounded_send(WorkerToApi::Response { id, status_code, headers, diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index 9706c20df5e06..e5393acbaa32f 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -131,47 +131,47 @@ impl PeersetHandle { /// > **Note**: Keep in mind that the networking has to know an address for this node, /// > otherwise it will not be able to connect to it. pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.try_send(Action::AddReservedPeer(set_id, peer_id)); + let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id)); } /// Remove a previously-added reserved peer. /// /// Has no effect if the node was not a reserved peer. pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.try_send(Action::RemoveReservedPeer(set_id, peer_id)); + let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id)); } /// Sets whether or not the peerset only has connections with nodes marked as reserved for /// the given set. pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) { - let _ = self.tx.try_send(Action::SetReservedOnly(set_id, reserved)); + let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved)); } /// Set reserved peers to the new set. pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet) { - let _ = self.tx.try_send(Action::SetReservedPeers(set_id, peer_ids)); + let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids)); } /// Reports an adjustment to the reputation of the given peer. pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) { - let _ = self.tx.try_send(Action::ReportPeer(peer_id, score_diff)); + let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); } /// Add a peer to a set. pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.try_send(Action::AddToPeersSet(set_id, peer_id)); + let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id)); } /// Remove a peer from a set. pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.try_send(Action::RemoveFromPeersSet(set_id, peer_id)); + let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id)); } /// Returns the reputation value of the peer. pub async fn peer_reputation(self, peer_id: PeerId) -> Result { let (tx, rx) = oneshot::channel(); - let _ = self.tx.try_send(Action::PeerReputation(peer_id, tx)); + let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); // The channel can only be closed if the peerset no longer exists. rx.await.map_err(|_| ()) @@ -678,7 +678,7 @@ impl Peerset { // We don't immediately perform the adjustments in order to have state consistency. We // don't want the reporting here to take priority over messages sent using the // `PeersetHandle`. - let _ = self.tx.try_send(Action::ReportPeer(peer_id, score_diff)); + let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); } /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. diff --git a/client/rpc/src/system/mod.rs b/client/rpc/src/system/mod.rs index 6643b52e9905d..0da4f8d0e211c 100644 --- a/client/rpc/src/system/mod.rs +++ b/client/rpc/src/system/mod.rs @@ -106,19 +106,19 @@ impl SystemApiServer::Number> async fn system_health(&self) -> RpcResult { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::Health(tx)); + let _ = self.send_back.unbounded_send(Request::Health(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_local_peer_id(&self) -> RpcResult { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::LocalPeerId(tx)); + let _ = self.send_back.unbounded_send(Request::LocalPeerId(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_local_listen_addresses(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::LocalListenAddresses(tx)); + let _ = self.send_back.unbounded_send(Request::LocalListenAddresses(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } @@ -127,21 +127,21 @@ impl SystemApiServer::Number> ) -> RpcResult::Number>>> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::Peers(tx)); + let _ = self.send_back.unbounded_send(Request::Peers(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_network_state(&self) -> RpcResult { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::NetworkState(tx)); + let _ = self.send_back.unbounded_send(Request::NetworkState(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_add_reserved_peer(&self, peer: String) -> RpcResult<()> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::NetworkAddReservedPeer(peer, tx)); + let _ = self.send_back.unbounded_send(Request::NetworkAddReservedPeer(peer, tx)); match rx.await { Ok(Ok(())) => Ok(()), Ok(Err(e)) => Err(JsonRpseeError::from(e)), @@ -152,7 +152,7 @@ impl SystemApiServer::Number> async fn system_remove_reserved_peer(&self, peer: String) -> RpcResult<()> { self.deny_unsafe.check_if_safe()?; let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::NetworkRemoveReservedPeer(peer, tx)); + let _ = self.send_back.unbounded_send(Request::NetworkRemoveReservedPeer(peer, tx)); match rx.await { Ok(Ok(())) => Ok(()), Ok(Err(e)) => Err(JsonRpseeError::from(e)), @@ -162,19 +162,19 @@ impl SystemApiServer::Number> async fn system_reserved_peers(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::NetworkReservedPeers(tx)); + let _ = self.send_back.unbounded_send(Request::NetworkReservedPeers(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_node_roles(&self) -> RpcResult> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::NodeRoles(tx)); + let _ = self.send_back.unbounded_send(Request::NodeRoles(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } async fn system_sync_state(&self) -> RpcResult::Number>> { let (tx, rx) = oneshot::channel(); - let _ = self.send_back.try_send(Request::SyncState(tx)); + let _ = self.send_back.unbounded_send(Request::SyncState(tx)); rx.await.map_err(|e| JsonRpseeError::to_call_error(e)) } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index a2ce5229619e2..4b5822ae0e017 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1024,7 +1024,7 @@ where "best" => ?notification.hash, ); - sinks.retain(|sink| sink.try_send(notification.clone()).is_ok()); + sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); Ok(()) } @@ -1068,24 +1068,24 @@ where trigger_storage_changes_notification(); self.import_notification_sinks .lock() - .retain(|sink| sink.try_send(notification.clone()).is_ok()); + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); self.every_import_notification_sinks .lock() - .retain(|sink| sink.try_send(notification.clone()).is_ok()); + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, ImportNotificationAction::RecentBlock => { trigger_storage_changes_notification(); self.import_notification_sinks .lock() - .retain(|sink| sink.try_send(notification.clone()).is_ok()); + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, ImportNotificationAction::EveryBlock => { self.every_import_notification_sinks .lock() - .retain(|sink| sink.try_send(notification.clone()).is_ok()); + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index 9487adbe53435..113d8303a20f6 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -409,7 +409,7 @@ impl Telemetry { let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?; self.register_sender - .try_send(Register::Telemetry { id: self.id, endpoints, connection_message }) + .unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message }) .map_err(|_| Error::TelemetryWorkerDropped) } @@ -469,7 +469,7 @@ pub struct TelemetryConnectionNotifier { impl TelemetryConnectionNotifier { fn on_connect_stream(&self) -> ConnectionNotifierReceiver { let (message_sender, message_receiver) = connection_notifier_channel(); - if let Err(err) = self.register_sender.try_send(Register::Notifier { + if let Err(err) = self.register_sender.unbounded_send(Register::Notifier { addresses: self.addresses.clone(), connection_notifier: message_sender, }) { diff --git a/client/transaction-pool/src/graph/watcher.rs b/client/transaction-pool/src/graph/watcher.rs index 886f7aac0222f..fc440771d7bbc 100644 --- a/client/transaction-pool/src/graph/watcher.rs +++ b/client/transaction-pool/src/graph/watcher.rs @@ -129,6 +129,6 @@ impl Sender { } fn send(&mut self, status: TransactionStatus) { - self.receivers.retain(|sender| sender.try_send(status.clone()).is_ok()) + self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok()) } } diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index 2b7c9c77590ba..bd8f3dd6498f3 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -340,7 +340,7 @@ where } if let Some(ref to_worker) = self.background { - if let Err(e) = to_worker.try_send(WorkerPayload { at, transactions }) { + if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) { log::warn!(target: LOG_TARGET, "Failed to update background worker: {:?}", e); } } else { From 2e3f80b4aa6c12eacbdbf5d52915cd4be5a44ce1 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 2 Mar 2023 12:57:12 +0200 Subject: [PATCH 5/8] Use `Receiver::len` for counting of dropped messages --- Cargo.lock | 1 + client/utils/Cargo.toml | 1 + client/utils/src/mpsc.rs | 19 +++++-------------- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e75e98984a71..b7e526daa88fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9312,6 +9312,7 @@ dependencies = [ "log", "parking_lot 0.12.1", "prometheus", + "sp-arithmetic", "tokio-test", ] diff --git a/client/utils/Cargo.toml b/client/utils/Cargo.toml index 5f1fabeb59dd2..38484285d3065 100644 --- a/client/utils/Cargo.toml +++ b/client/utils/Cargo.toml @@ -17,6 +17,7 @@ lazy_static = "1.4.0" log = "0.4" parking_lot = "0.12.1" prometheus = { version = "0.13.0", default-features = false } +sp-arithmetic = { version = "6.0.0", default-features = false, path = "../../primitives/arithmetic" } [features] default = ["metered"] diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index b82789f0ad95f..86ef12c27521f 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -63,6 +63,7 @@ mod inner { task::{Context, Poll}, }; use log::error; + use sp_arithmetic::traits::SaturatedConversion; use std::{ backtrace::Backtrace, pin::Pin, @@ -165,23 +166,13 @@ mod inner { impl TracingUnboundedReceiver { fn consume(&mut self) { - // consume all items, make sure to reflect the updated count - let mut count = 0; - loop { - if self.inner.is_terminated() { - break - } - - match self.try_recv() { - Ok(_) => count += 1, - _ => break, - } - } - // and discount the messages + // the number of messages about to be dropped + let count = self.inner.len(); + // discount the messages if count > 0 { UNBOUNDED_CHANNELS_COUNTER .with_label_values(&[self.name, "dropped"]) - .inc_by(count); + .inc_by(count.saturated_into()); } } From 3739c1a143286dfcf23a7195d0898a164d4511df Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Mar 2023 12:26:43 +0200 Subject: [PATCH 6/8] Remove "metered" feature flag --- client/utils/README.md | 17 +- client/utils/src/lib.rs | 14 +- client/utils/src/metrics.rs | 4 - client/utils/src/mpsc.rs | 325 ++++++++++++++++-------------------- 4 files changed, 152 insertions(+), 208 deletions(-) diff --git a/client/utils/README.md b/client/utils/README.md index 2da70f09ccbc5..d20fe69efc5ac 100644 --- a/client/utils/README.md +++ b/client/utils/README.md @@ -1,16 +1,11 @@ -Utilities Primitives for Substrate +# Utilities Primitives for Substrate -## Features +This crate provides `mpsc::tracing_unbounded` function that returns wrapper types to +`async_channel::Sender` and `async_channel::Receiver`, which register every +`send`/`received`/`dropped` action happened on the channel. -### metered - -This feature changes the behaviour of the function `mpsc::tracing_unbounded`. -With the disabled feature this function is an alias to `futures::channel::mpsc::unbounded`. -However, when the feature is enabled it creates wrapper types to `UnboundedSender` -and `UnboundedReceiver` to register every `send`/`received`/`dropped` action happened on -the channel. - -Also this feature creates and registers a prometheus vector with name `unbounded_channel_len` and labels: +Also this wrapper creates and registers a prometheus vector with name `unbounded_channel_len` +and labels: | Label | Description | | ------------ | --------------------------------------------- | diff --git a/client/utils/src/lib.rs b/client/utils/src/lib.rs index f0cc1efe6111c..017fc76207d27 100644 --- a/client/utils/src/lib.rs +++ b/client/utils/src/lib.rs @@ -18,17 +18,11 @@ //! Utilities Primitives for Substrate //! -//! # Features +//! This crate provides `mpsc::tracing_unbounded` function that returns wrapper types to +//! `async_channel::Sender` and `async_channel::Receiver`, which register every +//! `send`/`received`/`dropped` action happened on the channel. //! -//! ## metered -//! -//! This feature changes the behaviour of the function `mpsc::tracing_unbounded`. -//! With the disabled feature this function is an alias to `futures::channel::mpsc::unbounded`. -//! However, when the feature is enabled it creates wrapper types to `UnboundedSender` -//! and `UnboundedReceiver` to register every `send`/`received`/`dropped` action happened on -//! the channel. -//! -//! Also this feature creates and registers a prometheus vector with name `unbounded_channel_len` +//! Also this wrapper creates and registers a prometheus vector with name `unbounded_channel_len` //! and labels: //! //! | Label | Description | diff --git a/client/utils/src/metrics.rs b/client/utils/src/metrics.rs index c2b10100f229f..6bbdbe2e2e599 100644 --- a/client/utils/src/metrics.rs +++ b/client/utils/src/metrics.rs @@ -24,7 +24,6 @@ use prometheus::{ Error as PrometheusError, Registry, }; -#[cfg(feature = "metered")] use prometheus::{core::GenericCounterVec, Opts}; lazy_static! { @@ -36,7 +35,6 @@ lazy_static! { .expect("Creating of statics doesn't fail. qed"); } -#[cfg(feature = "metered")] lazy_static! { pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec = GenericCounterVec::new( Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"), @@ -49,8 +47,6 @@ lazy_static! { pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> { registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?; registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?; - - #[cfg(feature = "metered")] registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?; Ok(()) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index 86ef12c27521f..722d93852c904 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -16,213 +16,172 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! Features to meter unbounded channels - -#[cfg(not(feature = "metered"))] -mod inner { - /// Simple wrapper around [`async_channel::Sender`], no performance implications. - pub struct TracingUnboundedSender(async_channel::Sender); - - /// Just alias for [`async_channel::Receiver`], no performance implications. - pub type TracingUnboundedReceiver = async_channel::Receiver; - - /// Proxy to [`async_channel::unbounded`]. - pub fn tracing_unbounded( - _key: &'static str, - ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { - let (tx, rx) = async_channel::unbounded(); - (TracingUnboundedSender(tx), rx) - } - - // The only reason this impl is needed is because we rename `try_send` -> `unbounded_send` - impl TracingUnboundedSender { - /// Proxy function to [`async_channel::Sender`]. - pub fn is_closed(&self) -> bool { - self.0.is_closed() - } - - /// Proxy function to [`async_channel::Sender`]. - pub fn close(&self) -> bool { - self.0.close() - } +//! Code to meter unbounded channels. + +use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; +use async_channel::{Receiver, Sender, TryRecvError, TrySendError}; +use futures::{ + stream::{FusedStream, Stream}, + task::{Context, Poll}, +}; +use log::error; +use sp_arithmetic::traits::SaturatedConversion; +use std::{ + backtrace::Backtrace, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +/// Wrapper Type around [`async_channel::Sender`] that increases the global +/// measure when a message is added. +#[derive(Debug)] +pub struct TracingUnboundedSender { + inner: Sender, + name: &'static str, + queue_size_warning: usize, + warning_fired: Arc, + creation_backtrace: Arc, +} - /// Proxy function to `async_channel::Sender::try_send`. - pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.0.try_send(msg) +// Strangely, deriving `Clone` requires that `T` is also `Clone`. +impl Clone for TracingUnboundedSender { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + name: self.name, + queue_size_warning: self.queue_size_warning, + warning_fired: self.warning_fired.clone(), + creation_backtrace: self.creation_backtrace.clone(), } } } -#[cfg(feature = "metered")] -mod inner { - // tracing implementation - use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; - use async_channel::{Receiver, Sender, TryRecvError, TrySendError}; - use futures::{ - stream::{FusedStream, Stream}, - task::{Context, Poll}, - }; - use log::error; - use sp_arithmetic::traits::SaturatedConversion; - use std::{ - backtrace::Backtrace, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, +/// Wrapper Type around [`async_channel::Receiver`] that decreases the global +/// measure when a message is polled. +#[derive(Debug)] +pub struct TracingUnboundedReceiver { + inner: Receiver, + name: &'static str, +} + +/// Wrapper around [`async_channel::unbounded`] that tracks the in- and outflow via +/// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows +/// above the warning threshold. +pub fn tracing_unbounded( + name: &'static str, + queue_size_warning: usize, +) -> (TracingUnboundedSender, TracingUnboundedReceiver) { + let (s, r) = async_channel::unbounded(); + let sender = TracingUnboundedSender { + inner: s, + name, + queue_size_warning, + warning_fired: Arc::new(AtomicBool::new(false)), + creation_backtrace: Arc::new(Backtrace::force_capture()), }; + let receiver = TracingUnboundedReceiver { inner: r, name }; + (sender, receiver) +} - /// Wrapper Type around [`async_channel::Sender`] that increases the global - /// measure when a message is added. - #[derive(Debug)] - pub struct TracingUnboundedSender { - inner: Sender, - name: &'static str, - queue_size_warning: usize, - warning_fired: Arc, - creation_backtrace: Arc, +impl TracingUnboundedSender { + /// Proxy function to [`async_channel::Sender`]. + pub fn is_closed(&self) -> bool { + self.inner.is_closed() } - // Strangely, deriving `Clone` requires that `T` is also `Clone`. - impl Clone for TracingUnboundedSender { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - name: self.name, - queue_size_warning: self.queue_size_warning, - warning_fired: self.warning_fired.clone(), - creation_backtrace: self.creation_backtrace.clone(), - } - } + /// Proxy function to [`async_channel::Sender`]. + pub fn close(&self) -> bool { + self.inner.close() } - /// Wrapper Type around [`async_channel::Receiver`] that decreases the global - /// measure when a message is polled. - #[derive(Debug)] - pub struct TracingUnboundedReceiver { - inner: Receiver, - name: &'static str, - } + /// Proxy function to `async_channel::Sender::try_send`. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { + self.inner.try_send(msg).map(|s| { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); + + if self.inner.len() >= self.queue_size_warning && + self.warning_fired + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + // `warning_fired` and `len()` are not synchronized, so it's possible + // that the warning is fired few times before the `warning_fired` is seen + // by all threads. This seems better than introducing a mutex guarding them. + error!( + "The number of unprocessed messages in channel `{}` exceeded {}.\n\ + The channel was created at:\n{}\n + Last message was sent from:\n{}", + self.name, + self.queue_size_warning, + self.creation_backtrace, + Backtrace::force_capture(), + ); + } - /// Wrapper around [`async_channel::unbounded`] that tracks the in- and outflow via - /// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows - /// above the warning threshold. - pub fn tracing_unbounded( - name: &'static str, - queue_size_warning: usize, - ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { - let (s, r) = async_channel::unbounded(); - let sender = TracingUnboundedSender { - inner: s, - name, - queue_size_warning, - warning_fired: Arc::new(AtomicBool::new(false)), - creation_backtrace: Arc::new(Backtrace::force_capture()), - }; - let receiver = TracingUnboundedReceiver { inner: r, name }; - (sender, receiver) + s + }) } +} - impl TracingUnboundedSender { - /// Proxy function to [`async_channel::Sender`]. - pub fn is_closed(&self) -> bool { - self.inner.is_closed() - } - - /// Proxy function to [`async_channel::Sender`]. - pub fn close(&self) -> bool { - self.inner.close() - } - - /// Proxy function to `async_channel::Sender::try_send`. - pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.inner.try_send(msg).map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); - - if self.inner.len() >= self.queue_size_warning && - self.warning_fired - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - // `warning_fired` and `len()` are not synchronized, so it's possible - // that the warning is fired few times before the `warning_fired` is seen - // by all threads. This seems better than introducing a mutex guarding them. - error!( - "The number of unprocessed messages in channel `{}` exceeded {}.\n\ - The channel was created at:\n{}\n - Last message was sent from:\n{}", - self.name, - self.queue_size_warning, - self.creation_backtrace, - Backtrace::force_capture(), - ); - } - - s - }) +impl TracingUnboundedReceiver { + fn consume(&mut self) { + // the number of messages about to be dropped + let count = self.inner.len(); + // discount the messages + if count > 0 { + UNBOUNDED_CHANNELS_COUNTER + .with_label_values(&[self.name, "dropped"]) + .inc_by(count.saturated_into()); } } - impl TracingUnboundedReceiver { - fn consume(&mut self) { - // the number of messages about to be dropped - let count = self.inner.len(); - // discount the messages - if count > 0 { - UNBOUNDED_CHANNELS_COUNTER - .with_label_values(&[self.name, "dropped"]) - .inc_by(count.saturated_into()); - } - } - - /// Proxy function to [`async_channel::Receiver`] - /// that consumes all messages first and updates the counter. - pub fn close(&mut self) -> bool { - self.consume(); - self.inner.close() - } - - /// Proxy function to [`async_channel::Receiver`] - /// that discounts the messages taken out. - pub fn try_recv(&mut self) -> Result { - self.inner.try_recv().map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); - s - }) - } + /// Proxy function to [`async_channel::Receiver`] + /// that consumes all messages first and updates the counter. + pub fn close(&mut self) -> bool { + self.consume(); + self.inner.close() } - impl Drop for TracingUnboundedReceiver { - fn drop(&mut self) { - self.consume(); - } + /// Proxy function to [`async_channel::Receiver`] + /// that discounts the messages taken out. + pub fn try_recv(&mut self) -> Result { + self.inner.try_recv().map(|s| { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); + s + }) } +} - impl Unpin for TracingUnboundedReceiver {} - - impl Stream for TracingUnboundedReceiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let s = self.get_mut(); - match Pin::new(&mut s.inner).poll_next(cx) { - Poll::Ready(msg) => { - if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); - } - Poll::Ready(msg) - }, - Poll::Pending => Poll::Pending, - } - } +impl Drop for TracingUnboundedReceiver { + fn drop(&mut self) { + self.consume(); } +} + +impl Unpin for TracingUnboundedReceiver {} - impl FusedStream for TracingUnboundedReceiver { - fn is_terminated(&self) -> bool { - self.inner.is_terminated() +impl Stream for TracingUnboundedReceiver { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = self.get_mut(); + match Pin::new(&mut s.inner).poll_next(cx) { + Poll::Ready(msg) => { + if msg.is_some() { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); + } + Poll::Ready(msg) + }, + Poll::Pending => Poll::Pending, } } } -pub use inner::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +impl FusedStream for TracingUnboundedReceiver { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} From 87b27343986848bcd5d2d08381524f2d10cbc5e9 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Mar 2023 14:35:32 +0200 Subject: [PATCH 7/8] Apply code review suggestions --- client/utils/src/mpsc.rs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index 722d93852c904..62285214a05f4 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -107,9 +107,6 @@ impl TracingUnboundedSender { .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - // `warning_fired` and `len()` are not synchronized, so it's possible - // that the warning is fired few times before the `warning_fired` is seen - // by all threads. This seems better than introducing a mutex guarding them. error!( "The number of unprocessed messages in channel `{}` exceeded {}.\n\ The channel was created at:\n{}\n @@ -127,21 +124,8 @@ impl TracingUnboundedSender { } impl TracingUnboundedReceiver { - fn consume(&mut self) { - // the number of messages about to be dropped - let count = self.inner.len(); - // discount the messages - if count > 0 { - UNBOUNDED_CHANNELS_COUNTER - .with_label_values(&[self.name, "dropped"]) - .inc_by(count.saturated_into()); - } - } - - /// Proxy function to [`async_channel::Receiver`] - /// that consumes all messages first and updates the counter. + /// Proxy function to [`async_channel::Receiver`]. pub fn close(&mut self) -> bool { - self.consume(); self.inner.close() } @@ -157,7 +141,15 @@ impl TracingUnboundedReceiver { impl Drop for TracingUnboundedReceiver { fn drop(&mut self) { - self.consume(); + self.close(); + // the number of messages about to be dropped + let count = self.inner.len(); + // discount the messages + if count > 0 { + UNBOUNDED_CHANNELS_COUNTER + .with_label_values(&[self.name, "dropped"]) + .inc_by(count.saturated_into()); + } } } From 6696919b5a66fa1bbe818e96edb1434bf972cf9d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Mar 2023 15:53:12 +0200 Subject: [PATCH 8/8] minor: formatting --- client/utils/src/mpsc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index 62285214a05f4..3f783b10060bd 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -109,8 +109,8 @@ impl TracingUnboundedSender { { error!( "The number of unprocessed messages in channel `{}` exceeded {}.\n\ - The channel was created at:\n{}\n - Last message was sent from:\n{}", + The channel was created at:\n{}\n + Last message was sent from:\n{}", self.name, self.queue_size_warning, self.creation_backtrace,