From 4ec695d5eacf845694eda446180dcfca3b4d42c9 Mon Sep 17 00:00:00 2001 From: David Craven Date: Fri, 13 Nov 2020 19:26:02 +0100 Subject: [PATCH 1/4] Update tomls. --- Cargo.toml | 9 ++------- core/Cargo.toml | 2 +- muxers/mplex/Cargo.toml | 2 +- protocols/deflate/Cargo.toml | 2 +- protocols/identify/Cargo.toml | 3 +-- protocols/noise/Cargo.toml | 3 +-- protocols/ping/Cargo.toml | 2 +- protocols/request-response/Cargo.toml | 2 +- protocols/secio/Cargo.toml | 2 +- transports/websocket/Cargo.toml | 2 +- 10 files changed, 11 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 04f5dafdb88..42eeb01c343 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default = [ "pnet", "request-response", "secp256k1", - "tcp-async-std", + "tcp", "uds", "wasm-ext", "websocket", @@ -44,8 +44,7 @@ ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] request-response = ["libp2p-request-response"] -tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] -tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] +tcp = ["libp2p-tcp"] uds = ["libp2p-uds"] wasm-ext = ["libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"] @@ -121,7 +120,3 @@ members = [ "transports/websocket", "transports/wasm-ext" ] - -[[example]] -name = "chat-tokio" -required-features = ["tcp-tokio", "mdns"] diff --git a/core/Cargo.toml b/core/Cargo.toml index ce2aba164be..e73c71ea742 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,7 +43,7 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal async-std = "1.6.2" libp2p-mplex = { path = "../muxers/mplex" } libp2p-noise = { path = "../protocols/noise" } -libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../transports/tcp" } multihash = { version = "0.13", default-features = false, features = ["arb"] } quickcheck = "0.9.0" wasm-timer = "0.2" diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 40b3d2ae963..eff1726e3d9 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -26,7 +26,7 @@ async-std = "1.7.0" criterion = "0.3" env_logger = "0.8" futures = "0.3" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-plaintext = { path = "../../protocols/plaintext" } quickcheck = "0.9" rand = "0.7" diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index b960433c411..864d14975fe 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -16,6 +16,6 @@ flate2 = "1.0" [dev-dependencies] async-std = "1.6.2" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } quickcheck = "0.9" rand = "0.7" diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 3606a3cb87b..7af1d91f7a3 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -22,8 +22,7 @@ wasm-timer = "0.2" async-std = "1.6.2" libp2p-mplex = { path = "../../muxers/mplex" } libp2p-noise = { path = "../../protocols/noise" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } [build-dependencies] prost-build = "0.6" - diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 59b6f9eac7b..a9a8bc210f3 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -29,10 +29,9 @@ snow = { version = "0.7.1", features = ["default-resolver"], default-features = [dev-dependencies] env_logger = "0.8.1" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } quickcheck = "0.9.0" sodiumoxide = "0.2.5" [build-dependencies] prost-build = "0.6" - diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 3aa96f74857..29a40558fe6 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -20,7 +20,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-noise = { path = "../../protocols/noise" } libp2p-yamux = { path = "../../muxers/yamux" } libp2p-mplex = { path = "../../muxers/mplex" } diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 89d97a6f0ec..2b5a86fe10e 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -26,6 +26,6 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" libp2p-noise = { path = "../noise" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-yamux = { path = "../../muxers/yamux" } rand = "0.7" diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index fef7ae16f55..66b64d8b53f 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -52,4 +52,4 @@ aes-all = ["aesni"] async-std = "1.6.2" criterion = "0.3" libp2p-mplex = { path = "../../muxers/mplex" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index dfd1a471270..08a9da3f66e 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -24,4 +24,4 @@ webpki = "0.21" webpki-roots = "0.21" [dev-dependencies] -libp2p-tcp = { path = "../tcp", features = ["async-std"] } +libp2p-tcp = { path = "../tcp" } From b571413109221f332b33194bc3e6374abcdce9d2 Mon Sep 17 00:00:00 2001 From: David Craven Date: Sat, 14 Nov 2020 00:02:02 +0100 Subject: [PATCH 2/4] Let transports decide when to translate. --- core/src/either.rs | 7 +++++++ core/src/network.rs | 4 ++-- core/src/transport.rs | 3 +++ core/src/transport/and_then.rs | 4 ++++ core/src/transport/boxed.rs | 9 +++++++++ core/src/transport/choice.rs | 8 ++++++++ core/src/transport/dummy.rs | 4 ++++ core/src/transport/map.rs | 4 ++++ core/src/transport/map_err.rs | 4 ++++ core/src/transport/memory.rs | 4 ++++ core/src/transport/optional.rs | 8 ++++++++ core/src/transport/timeout.rs | 4 ++++ core/src/transport/upgrade.rs | 8 ++++++++ transports/dns/src/lib.rs | 4 ++++ transports/uds/src/lib.rs | 4 ++++ transports/wasm-ext/src/lib.rs | 4 ++++ transports/websocket/src/framed.rs | 5 ++++- transports/websocket/src/lib.rs | 4 ++++ 18 files changed, 89 insertions(+), 3 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index 48257fc6914..4d991936121 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -477,4 +477,11 @@ where }, } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + match self { + EitherTransport::Left(a) => a.address_translation(server, observed), + EitherTransport::Right(b) => b.address_translation(server, observed), + } + } } diff --git a/core/src/network.rs b/core/src/network.rs index 5819ba26be0..fbd8e4c9a79 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -30,7 +30,6 @@ use crate::{ Executor, Multiaddr, PeerId, - address_translation, connection::{ ConnectionId, ConnectionLimit, @@ -198,8 +197,9 @@ where TMuxer: 'a, THandler: 'a, { + let transport = self.listeners.transport(); let mut addrs: Vec<_> = self.listen_addrs() - .filter_map(move |server| address_translation(server, observed_addr)) + .filter_map(move |server| transport.address_translation(server, observed_addr)) .collect(); // remove duplicates diff --git a/core/src/transport.rs b/core/src/transport.rs index 50499ec1b82..802bf027a82 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -128,6 +128,9 @@ pub trait Transport { where Self: Sized; + /// Perform transport specific multiaddr translation. + fn address_translation(&self, _server: &Multiaddr, observed: &Multiaddr) -> Option; + /// Boxes the transport, including custom transport errors. fn boxed(self) -> boxed::Boxed where diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index ba7513283c6..22018729a07 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -69,6 +69,10 @@ where }; Ok(future) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Custom `Stream` to avoid boxing. diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index 7f2e721e81f..5322b517dbe 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -51,6 +51,7 @@ type ListenerUpgrade = Pin> + Send>>; trait Abstract { fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; fn dial(&self, addr: Multiaddr) -> Result, TransportError>; + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; } impl Abstract for T @@ -78,6 +79,10 @@ where .map_err(|e| e.map(box_err))?; Ok(Box::pin(fut) as Dial<_>) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + Transport::address_translation(self, server, observed) + } } impl fmt::Debug for Boxed { @@ -108,6 +113,10 @@ impl Transport for Boxed { fn dial(self, addr: Multiaddr) -> Result> { self.inner.dial(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } fn box_err(e: E) -> io::Error { diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index c6593912761..3488b06884d 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -74,4 +74,12 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if let Some(addr) = self.0.address_translation(server, observed) { + Some(addr) + } else { + self.1.address_translation(server, observed) + } + } } diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index 0f9ee6725da..5839a6a5928 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -67,6 +67,10 @@ impl Transport for DummyTransport { fn dial(self, addr: Multiaddr) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instanciated. diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index f9fb2cf7d49..0305af6626d 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -57,6 +57,10 @@ where let p = ConnectedPoint::Dialer { address: addr }; Ok(MapFuture { inner: future, args: Some((self.fun, p)) }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Custom `Stream` implementation to avoid boxing. diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 90e65eb29d8..c0be6485204 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -64,6 +64,10 @@ where Err(err) => Err(err.map(map)), } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Listening stream for `MapErr`. diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index e7d306302a8..366abd4e9c8 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -191,6 +191,10 @@ impl Transport for MemoryTransport { DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Error that can be produced from the `MemoryTransport`. diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 283b50d71f7..2b29773ee22 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -74,4 +74,12 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if let Some(inner) = &self.0 { + inner.address_translation(server, observed) + } else { + None + } + } } diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index dc29af81c50..d55d007df08 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -101,6 +101,10 @@ where timer: Delay::new(self.outgoing_timeout), }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } // TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 4304314b91f..b2cb7b46804 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -334,6 +334,10 @@ where fn listen_on(self, addr: Multiaddr) -> Result> { self.0.listen_on(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.0.address_translation(server, observed) + } } /// An inbound or outbound upgrade. @@ -383,6 +387,10 @@ where upgrade: self.upgrade }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Errors produced by a transport upgrade. diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index b9bd3763e4a..0f4d33cbd46 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -202,6 +202,10 @@ where Ok(future.boxed().right_future()) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Error that can be generated by the DNS layer. diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 05efae630bb..ce698e22f5c 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -109,6 +109,10 @@ impl Transport for $uds_config { Err(TransportError::MultiaddrNotSupported(addr)) } } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } }; diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 8c0e50129f0..4aeb906d06f 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -206,6 +206,10 @@ impl Transport for ExtTransport { inner: SendWrapper::new(promise.into()), }) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Future that dial a remote through an external transport. diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 9f6d6efd3f3..718f7f95c5a 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -262,6 +262,10 @@ where Ok(Box::pin(future)) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } impl WsConfig @@ -586,4 +590,3 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } } - diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 10026a4298f..0ee346fdfa8 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -113,6 +113,10 @@ where fn dial(self, addr: Multiaddr) -> Result> { self.transport.map(wrap_connection as WrapperFn).dial(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Type alias corresponding to `framed::WsConfig::Listener`. From 6f4603fb622c31da1a08e4f0ad97c67e77326155 Mon Sep 17 00:00:00 2001 From: David Craven Date: Sat, 14 Nov 2020 00:03:48 +0100 Subject: [PATCH 3/4] Improve tcp transport. --- transports/tcp/Cargo.toml | 18 +- transports/tcp/src/if_task.rs | 144 +++++++ transports/tcp/src/lib.rs | 770 ++++++++++++++-------------------- 3 files changed, 469 insertions(+), 463 deletions(-) create mode 100644 transports/tcp/src/if_task.rs diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 9717c394e96..8827839180c 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,16 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = { version = "1.6.5", optional = true } -futures = "0.3.1" -futures-timer = "3.0" -if-addrs = "0.6.4" -ipnet = "2.0.0" +async-io = "1.2.0" +futures = "0.3.8" +if-watch = "0.1.4" +libc = "0.2.80" libp2p-core = { version = "0.25.0", path = "../../core" } -log = "0.4.1" -socket2 = { version = "0.3.12" } -tokio = { version = "0.3", default-features = false, features = ["net"], optional = true } +log = "0.4.11" +socket2 = { version = "0.3.17", features = ["reuseport"] } [dev-dependencies] -libp2p-tcp = { path = ".", features = ["async-std"] } - +async-std = { version = "1.7.0", features = ["attributes"] } +env_logger = "0.8.2" diff --git a/transports/tcp/src/if_task.rs b/transports/tcp/src/if_task.rs new file mode 100644 index 00000000000..dcea2cc7c43 --- /dev/null +++ b/transports/tcp/src/if_task.rs @@ -0,0 +1,144 @@ +use futures::{ + channel::{mpsc, oneshot}, + prelude::*, + select, +}; +use if_watch::{IfEvent, IfWatcher, IpNet}; +use std::{io, net::SocketAddr}; + +#[derive(Clone, Copy, Debug)] +pub enum IfTaskEvent { + NewAddress(SocketAddr), + AddressExpired(SocketAddr), +} + +#[derive(Debug)] +enum Command { + RegisterListener(SocketAddr, T, mpsc::UnboundedSender), + PortReuseSocket(SocketAddr, oneshot::Sender>), +} + +#[derive(Clone, Debug)] +pub struct IfTaskHandle { + tx: mpsc::UnboundedSender>, +} + +impl IfTaskHandle { + pub async fn new() -> io::Result { + // channel is unbounded so that `register_listener` doesn't need to be async. + let (tx, rx) = mpsc::unbounded(); + let task = Task::new(rx).await?.spawn(); + std::thread::spawn(move || async_io::block_on(task)); + Ok(Self { tx }) + } + + pub fn register_listener( + &self, + socket_addr: &SocketAddr, + user_data: T, + ) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + let cmd = Command::RegisterListener(*socket_addr, user_data, tx); + self.tx.unbounded_send(cmd).expect("task paniced"); + rx + } + + pub async fn port_reuse_socket(&self, socket_addr: &SocketAddr) -> Option { + let (tx, rx) = oneshot::channel(); + let cmd = Command::PortReuseSocket(*socket_addr, tx); + self.tx.unbounded_send(cmd).expect("task paniced"); + rx.await.expect("task paniced") + } +} + +#[derive(Debug)] +struct Task { + /// Command receiver. + rx: mpsc::UnboundedReceiver>, + /// Interface watcher. + watcher: IfWatcher, + /// Listening addresses. + listening_addrs: Vec<(SocketAddr, T, mpsc::UnboundedSender)>, +} + +impl Task { + async fn new(rx: mpsc::UnboundedReceiver>) -> io::Result { + Ok(Self { + rx, + watcher: IfWatcher::new().await?, + listening_addrs: Default::default(), + }) + } + + async fn spawn(mut self) { + let mut rx = self.rx.fuse(); + loop { + select! { + if_event = self.watcher.next().fuse() => { + let event = match if_event { + Ok(event) => event, + Err(err) => { + log::error!("{:?}", err); + continue; + } + }; + self.listening_addrs.retain(|(addr, _, tx)| { + match event { + IfEvent::Up(inet) => { + if let Some(addr) = iface_match(&inet, &addr) { + tx.unbounded_send(IfTaskEvent::NewAddress(addr)).is_ok() + } else { + !tx.is_closed() + } + } + IfEvent::Down(inet) => { + if let Some(addr) = iface_match(&inet, &addr) { + tx.unbounded_send(IfTaskEvent::AddressExpired(addr)).is_ok() + } else { + !tx.is_closed() + } + } + } + }); + } + cmd = rx.next() => { + match cmd { + Some(Command::RegisterListener(socket_addr, user_data, tx)) => { + for iface in self.watcher.iter() { + if let Some(addr) = iface_match(iface, &socket_addr) { + tx.unbounded_send(IfTaskEvent::NewAddress(addr)).ok(); + } + } + self.listening_addrs.push((socket_addr, user_data, tx)); + } + Some(Command::PortReuseSocket(socket_addr, tx)) => { + let mut reuse_socket = None; + for (addr, user_data, _) in &self.listening_addrs { + if addr.ip().is_ipv4() == socket_addr.ip().is_ipv4() + && addr.ip().is_loopback() == socket_addr.ip().is_loopback() { + reuse_socket = Some(user_data.clone()); + break; + } + } + tx.send(reuse_socket).ok(); + } + None => {} + } + } + } + } + } +} + +fn iface_match(inet: &IpNet, socket_addr: &SocketAddr) -> Option { + let matches = if socket_addr.ip().is_unspecified() { + socket_addr.ip().is_ipv4() == inet.addr().is_ipv4() + } else { + inet.addr() == socket_addr.ip() + }; + if matches { + Some(SocketAddr::new(inet.addr(), socket_addr.port())) + } else { + None + } +} diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index b08062fe114..cfcdb4bcde1 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -29,54 +29,60 @@ //! `core` library. See the documentation of `core` and of libp2p in general to learn how to //! use the `Transport` trait. -use futures::{future::{self, Ready}, prelude::*}; -use futures_timer::Delay; -use if_addrs::{IfAddr, get_if_addrs}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; +mod if_task; + +use async_io::{Async, Timer}; +use futures::{ + channel::mpsc, + future::{self, Ready}, + prelude::*, +}; +use if_task::{IfTaskEvent, IfTaskHandle}; use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr}, - transport::{ListenerEvent, TransportError} + address_translation, + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerEvent, Transport, TransportError}, }; -use log::{debug, trace}; -use socket2::{Socket, Domain, Type}; +use socket2::{Domain, Socket, Type}; use std::{ - collections::VecDeque, - convert::TryFrom, io, - iter::{self, FromIterator}, - net::{IpAddr, SocketAddr}, + net::{SocketAddr, TcpListener, TcpStream}, pin::Pin, task::{Context, Poll}, - time::Duration + time::Duration, }; -macro_rules! codegen { - ($feature_name:expr, $tcp_config:ident, $tcp_trans_stream:ident, $tcp_listen_stream:ident, $apply_config:ident, $tcp_stream:ty, $tcp_listener:ty) => { - /// Represents the configuration for a TCP/IP transport capability for libp2p. /// /// The TCP sockets created by libp2p will need to be progressed by running the futures and streams /// obtained by libp2p through the tokio reactor. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug, Clone, Default)] -pub struct $tcp_config { +#[derive(Clone, Debug)] +pub struct TcpConfig { /// How long a listener should sleep after receiving an error, before trying again. sleep_on_error: Duration, /// TTL to set for opened sockets, or `None` to keep default. ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. nodelay: Option, + /// Incoming backlog. + backlog: u32, + /// Port reuse. + port_reuse: bool, + /// The task handle. + handle: IfTaskHandle, } -impl $tcp_config { +impl TcpConfig { /// Creates a new configuration object for TCP/IP. - pub fn new() -> $tcp_config { - $tcp_config { + pub async fn new() -> io::Result { + Ok(Self { sleep_on_error: Duration::from_millis(100), ttl: None, nodelay: None, - } + backlog: 1024, + port_reuse: false, + handle: IfTaskHandle::new().await?, + }) } /// Sets the TTL to set for opened sockets. @@ -90,297 +96,198 @@ impl $tcp_config { self.nodelay = Some(value); self } -} -impl Transport for $tcp_config { - type Output = $tcp_trans_stream; - type Error = io::Error; - type Listener = Pin, Self::Error>> + Send>>; - type ListenerUpgrade = Ready>; - type Dial = Pin> + Send>>; + /// Sets the backlog of incoming connections size. + pub fn backlog(mut self, backlog: u32) -> Self { + self.backlog = backlog; + self + } - fn listen_on(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(sa) = multiaddr_to_socketaddr(&addr) { - sa - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; + /// Enables or disables port reuse for outgoing connections. + pub fn port_reuse(mut self, port_reuse: bool) -> Self { + self.port_reuse = port_reuse; + self + } - async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr) - -> Result>, io::Error>, io::Error>>, io::Error> - { - let socket = if socket_addr.is_ipv4() { - Socket::new(Domain::ipv4(), Type::stream(), Some(socket2::Protocol::tcp()))? - } else { - let s = Socket::new(Domain::ipv6(), Type::stream(), Some(socket2::Protocol::tcp()))?; - s.set_only_v6(true)?; - s - }; - if cfg!(target_family = "unix") { - socket.set_reuse_address(true)?; - } - socket.bind(&socket_addr.into())?; - socket.listen(1024)?; // we may want to make this configurable - - // Note: Tokio's TcpListener::from_std, which the TcpListener's TryFrom implementation - // uses, does not set the socket into non-blocking mode. - #[cfg(feature = "tokio")] - socket.set_nonblocking(true); - - let listener = <$tcp_listener>::try_from(socket.into_tcp_listener()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let local_addr = listener.local_addr()?; - let port = local_addr.port(); - - // Determine all our listen addresses which is either a single local IP address - // or (if a wildcard IP address was used) the addresses of all our interfaces, - // as reported by `get_if_addrs`. - let addrs = - if socket_addr.ip().is_unspecified() { - let addrs = host_addresses(port)?; - debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::>()); - Addresses::Many(addrs) - } else { - let ma = ip_to_multiaddr(local_addr.ip(), port); - debug!("Listening on {:?}", ma); - Addresses::One(ma) - }; - - // Generate `NewAddress` events for each new `Multiaddr`. - let pending = match addrs { - Addresses::One(ref ma) => { - let event = ListenerEvent::NewAddress(ma.clone()); - let mut list = VecDeque::new(); - list.push_back(Ok(event)); - list - } - Addresses::Many(ref aa) => { - aa.iter() - .map(|(_, _, ma)| ma) - .cloned() - .map(ListenerEvent::NewAddress) - .map(Result::Ok) - .collect::>() - } - }; + fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result { + let domain = if socket_addr.is_ipv4() { + Domain::ipv4() + } else { + Domain::ipv6() + }; + let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?; + if socket_addr.is_ipv6() { + socket.set_only_v6(true)?; + } + if let Some(ttl) = self.ttl { + socket.set_ttl(ttl)?; + } + if let Some(nodelay) = self.nodelay { + socket.set_nodelay(nodelay)?; + } + socket.set_reuse_address(true)?; + #[cfg(unix)] + if self.port_reuse { + socket.set_reuse_port(true)?; + } + Ok(socket) + } - let listen_stream = $tcp_listen_stream { - stream: listener, - pause: None, - pause_duration: cfg.sleep_on_error, - port, - addrs, - pending, - config: cfg - }; + fn do_listen(self, socket_addr: SocketAddr) -> io::Result { + let socket = self.create_socket(&socket_addr)?; + socket.bind(&socket_addr.into())?; + socket.listen(self.backlog as _)?; + let listener = Async::new(socket.into_tcp_listener())?; + let socket_addr = listener.get_ref().local_addr()?; + + let rx = self.handle.register_listener(&socket_addr, socket_addr); + let listen_stream = TcpListenStream { + config: self, + listener, + pause: None, + rx, + }; + Ok(listen_stream) + } - Ok(stream::unfold(listen_stream, |s| s.next().map(Some))) + async fn do_dial(self, socket_addr: SocketAddr) -> Result, io::Error> { + let socket = self.create_socket(&socket_addr)?; + if self.port_reuse { + if let Some(socket_addr) = self.handle.port_reuse_socket(&socket_addr).await { + socket.bind(&socket_addr.into())?; + } } + socket.set_nonblocking(true)?; + match socket.connect(&socket_addr.into()) { + Ok(()) => {} + Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => return Err(err), + }; + let stream = Async::new(socket.into_tcp_stream())?; + stream.writable().await?; + Ok(stream) + } +} - Ok(Box::pin(do_listen(self, socket_addr).try_flatten_stream())) +impl Transport for TcpConfig { + type Output = Async; + type Error = io::Error; + type Dial = Pin> + Send>>; + type Listener = TcpListenStream; + type ListenerUpgrade = Ready>; + + fn listen_on(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) { + sa + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + log::debug!("listening on {}", socket_addr); + self.do_listen(socket_addr) + .map_err(|e| TransportError::Other(e)) } fn dial(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - debug!("Instantly refusing dialing {}, as it is invalid", addr); - return Err(TransportError::Other(io::ErrorKind::ConnectionRefused.into())) - } - socket_addr - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; - - debug!("Dialing {}", addr); + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + log::debug!("dialing {}", socket_addr); + Ok(Box::pin(self.do_dial(socket_addr))) + } - async fn do_dial(cfg: $tcp_config, socket_addr: SocketAddr) -> Result<$tcp_trans_stream, io::Error> { - let stream = <$tcp_stream>::connect(&socket_addr).await?; - $apply_config(&cfg, &stream)?; - Ok($tcp_trans_stream { inner: stream }) + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if self.port_reuse { + None + } else { + address_translation(server, observed) } - - Ok(Box::pin(do_dial(self, socket_addr))) } } +type TcpListenerEvent = ListenerEvent, io::Error>>, io::Error>; + /// Stream that listens on an TCP/IP address. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -pub struct $tcp_listen_stream { +pub struct TcpListenStream { + /// The tcp config. + config: TcpConfig, /// The incoming connections. - stream: $tcp_listener, + listener: Async, /// The current pause if any. - pause: Option, - /// How long to pause after an error. - pause_duration: Duration, - /// The port which we use as our listen port in listener event addresses. - port: u16, - /// The set of known addresses. - addrs: Addresses, - /// Temporary buffer of listener events. - pending: Buffer<$tcp_trans_stream>, - /// Original configuration. - config: $tcp_config + pause: Option, + /// Address changes. + rx: mpsc::UnboundedReceiver, } -impl $tcp_listen_stream { - /// Takes ownership of the listener, and returns the next incoming event and the listener. - async fn next(mut self) -> (Result>, io::Error>, io::Error>, Self) { - loop { - if let Some(event) = self.pending.pop_front() { - return (event, self); - } - - if let Some(pause) = self.pause.take() { - let _ = pause.await; - } +impl Stream for TcpListenStream { + type Item = Result; - // TODO: do we get the peer_addr at the same time? - let (sock, _) = match self.stream.accept().await { - Ok(s) => s, - Err(e) => { - debug!("error accepting incoming connection: {}", e); - self.pause = Some(Delay::new(self.pause_duration)); - return (Ok(ListenerEvent::Error(e)), self); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let me = Pin::into_inner(self); + loop { + match Pin::new(&mut me.rx).poll_next(cx) { + Poll::Ready(Some(IfTaskEvent::NewAddress(socket_addr))) => { + let addr = socketaddr_to_multiaddr(&socket_addr); + log::debug!("new address {}", addr); + return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(addr)))); } - }; - - let sock_addr = match sock.peer_addr() { - Ok(addr) => addr, - Err(err) => { - debug!("Failed to get peer address: {:?}", err); - continue + Poll::Ready(Some(IfTaskEvent::AddressExpired(socket_addr))) => { + let addr = socketaddr_to_multiaddr(&socket_addr); + log::debug!("address expired {}", addr); + return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(addr)))); } - }; + Poll::Pending => {} + Poll::Ready(None) => {} + } - let local_addr = match sock.local_addr() { - Ok(sock_addr) => { - if let Addresses::Many(ref mut addrs) = self.addrs { - if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) { - return (Ok(ListenerEvent::Error(err)), self); - } + if let Some(mut pause) = me.pause.take() { + match Pin::new(&mut pause).poll(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; } - ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) } - Err(err) => { - debug!("Failed to get local address of incoming socket: {:?}", err); - continue - } - }; - - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); + } - match $apply_config(&self.config, &sock) { - Ok(()) => { - trace!("Incoming connection from {} at {}", remote_addr, local_addr); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::ok($tcp_trans_stream { inner: sock }), - local_addr, - remote_addr - })) + match me.listener.poll_readable(cx) { + Poll::Ready(Ok(())) => {} + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))), + } + let (stream, sock_addr) = match me.listener.accept().now_or_never() { + Some(Ok(res)) => res, + Some(Err(e)) => { + log::error!("error accepting incoming connection: {}", e); + me.pause = Some(Timer::after(me.config.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(e)))); } + None => unreachable!(), + }; + let local_addr = match stream.get_ref().local_addr() { + Ok(sock_addr) => socketaddr_to_multiaddr(&sock_addr), Err(err) => { - debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::err(err), - local_addr, - remote_addr - })) + log::error!("Failed to get local address of incoming socket: {:?}", err); + continue; } - } - } - } -} - -/// Wraps around a `TcpStream` and adds logging for important events. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug)] -pub struct $tcp_trans_stream { - inner: $tcp_stream, -} - -impl Drop for $tcp_trans_stream { - fn drop(&mut self) { - if let Ok(addr) = self.inner.peer_addr() { - debug!("Dropped TCP connection to {:?}", addr); - } else { - debug!("Dropped TCP connection to undeterminate peer"); + }; + let remote_addr = socketaddr_to_multiaddr(&sock_addr); + + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + return Poll::Ready(Some(Ok(ListenerEvent::Upgrade { + upgrade: future::ok(stream), + local_addr, + remote_addr, + }))); } } } -/// Applies the socket configuration parameters to a socket. -fn $apply_config(config: &$tcp_config, socket: &$tcp_stream) -> Result<(), io::Error> { - if let Some(ttl) = config.ttl { - socket.set_ttl(ttl)?; - } - - if let Some(nodelay) = config.nodelay { - socket.set_nodelay(nodelay)?; - } - - Ok(()) -} - -}; -} - -#[cfg(feature = "async-std")] -codegen!("async-std", TcpConfig, TcpTransStream, TcpListenStream, apply_config_async_std, async_std::net::TcpStream, async_std::net::TcpListener); - -#[cfg(feature = "tokio")] -codegen!("tokio", TokioTcpConfig, TokioTcpTransStream, TokioTcpListenStream, apply_config_tokio, tokio::net::TcpStream, tokio::net::TcpListener); - -#[cfg(feature = "async-std")] -impl AsyncRead for TcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) - } -} - -#[cfg(feature = "async-std")] -impl AsyncWrite for TcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) - } -} - -#[cfg(feature = "tokio")] -impl AsyncRead for TokioTcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - // Adapted from - // https://github.com/tokio-rs/tokio/blob/6d99e1c7dec4c6a37c4c7bf2801bc82cc210351d/tokio-util/src/compat.rs#L126. - let mut read_buf = tokio::io::ReadBuf::new(buf); - futures::ready!(tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inner), cx, &mut read_buf))?; - Poll::Ready(Ok(read_buf.filled().len())) - } -} - -#[cfg(feature = "tokio")] -impl AsyncWrite for TokioTcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.inner), cx) - } -} - // This type of logic should probably be moved into the multiaddr package fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { let mut iter = addr.iter(); @@ -399,162 +306,19 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { } // Create a [`Multiaddr`] from the given IP address and port number. -fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { - let proto = match ip { - IpAddr::V4(ip) => Protocol::Ip4(ip), - IpAddr::V6(ip) => Protocol::Ip6(ip) - }; - let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port))); - Multiaddr::from_iter(it) -} - -// Collect all local host addresses and use the provided port number as listen port. -fn host_addresses(port: u16) -> io::Result> { - let mut addrs = Vec::new(); - for iface in get_if_addrs()? { - let ip = iface.ip(); - let ma = ip_to_multiaddr(ip, port); - let ipn = match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); - IpNet::V6(ipnet) - } - }; - addrs.push((ip, ipn, ma)) - } - Ok(addrs) -} - -/// Listen address information. -#[derive(Debug)] -enum Addresses { - /// A specific address is used to listen. - One(Multiaddr), - /// A set of addresses is used to listen. - Many(Vec<(IpAddr, IpNet, Multiaddr)>) -} - -type Buffer = VecDeque>, io::Error>, io::Error>>; - -// If we listen on all interfaces, find out to which interface the given -// socket address belongs. In case we think the address is new, check -// all host interfaces again and report new and expired listen addresses. -fn check_for_interface_changes( - socket_addr: &SocketAddr, - listen_port: u16, - listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>, - pending: &mut Buffer -) -> Result<(), io::Error> { - // Check for exact match: - if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() { - return Ok(()) - } - - // No exact match => check netmask - if listen_addrs.iter().find(|(_, net, _)| net.contains(&socket_addr.ip())).is_some() { - return Ok(()) - } - - // The local IP address of this socket is new to us. - // We check for changes in the set of host addresses and report new - // and expired addresses. - // - // TODO: We do not detect expired addresses unless there is a new address. - let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?); - - // Check for addresses no longer in use. - for (ip, _, ma) in old_listen_addrs.iter() { - if listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("Expired listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::AddressExpired(ma.clone()))); - } - } - - // Check for new addresses. - for (ip, _, ma) in listen_addrs.iter() { - if old_listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("New listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::NewAddress(ma.clone()))); - } - } - - // We should now be able to find the local address, if not something - // is seriously wrong and we report an error. - if listen_addrs.iter() - .find(|(ip, net, _)| ip == &socket_addr.ip() || net.contains(&socket_addr.ip())) - .is_none() - { - let msg = format!("{} does not match any listen address", socket_addr.ip()); - return Err(io::Error::new(io::ErrorKind::Other, msg)) - } - - Ok(()) +fn socketaddr_to_multiaddr(addr: &SocketAddr) -> Multiaddr { + Multiaddr::empty() + .with(addr.ip().into()) + .with(Protocol::Tcp(addr.port())) } #[cfg(test)] mod tests { - use futures::prelude::*; - use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use super::multiaddr_to_socketaddr; - #[cfg(feature = "async-std")] - use super::TcpConfig; - - #[test] - #[cfg(feature = "async-std")] - fn wildcard_expansion() { - fn test(addr: Multiaddr) { - let mut listener = TcpConfig::new().listen_on(addr).expect("listener"); - - // Get the first address. - let addr = futures::executor::block_on_stream(listener.by_ref()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - // Process all initial `NewAddress` events and make sure they - // do not contain wildcard address or port. - let server = listener - .take_while(|event| match event.as_ref().unwrap() { - ListenerEvent::NewAddress(a) => { - let mut iter = a.iter(); - match iter.next().expect("ip address") { - Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), - Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), - other => panic!("Unexpected protocol: {}", other) - } - if let Protocol::Tcp(port) = iter.next().expect("port") { - assert_ne!(0, port) - } else { - panic!("No TCP port in address: {}", a) - } - futures::future::ready(true) - } - _ => futures::future::ready(false) - }) - .for_each(|_| futures::future::ready(())); - - let client = TcpConfig::new().dial(addr).expect("dialer"); - async_std::task::block_on(futures::future::join(server, client)).1.unwrap(); - } - - test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); - test("/ip6/::1/tcp/0".parse().unwrap()); - } + use super::*; #[test] fn multiaddr_to_tcp_conversion() { - use std::net::Ipv6Addr; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; assert!( multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) @@ -602,36 +366,36 @@ mod tests { } #[test] - #[cfg(feature = "async-std")] fn communicating_between_dialer_and_listener() { + env_logger::try_init().ok(); + fn test(addr: Multiaddr) { - let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); - let mut ready_tx = Some(ready_tx); + let (mut ready_tx, mut ready_rx) = mpsc::channel(1); async_std::task::spawn(async move { - let tcp = TcpConfig::new(); + let tcp = TcpConfig::new().await.unwrap(); let mut listener = tcp.listen_on(addr).unwrap(); loop { match listener.next().await.unwrap().unwrap() { ListenerEvent::NewAddress(listen_addr) => { - ready_tx.take().unwrap().send(listen_addr).unwrap(); - }, + ready_tx.send(listen_addr).await.unwrap(); + } ListenerEvent::Upgrade { upgrade, .. } => { let mut upgrade = upgrade.await.unwrap(); let mut buf = [0u8; 3]; upgrade.read_exact(&mut buf).await.unwrap(); assert_eq!(buf, [1, 2, 3]); upgrade.write_all(&[4, 5, 6]).await.unwrap(); - }, - _ => unreachable!() + } + _ => unreachable!(), } } }); async_std::task::block_on(async move { - let addr = ready_rx.await.unwrap(); - let tcp = TcpConfig::new(); + let addr = ready_rx.next().await.unwrap(); + let tcp = TcpConfig::new().await.unwrap(); // Obtain a future socket through dialing let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); @@ -648,15 +412,110 @@ mod tests { } #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv4() { - let tcp = TcpConfig::new(); + fn wildcard_expansion() { + env_logger::try_init().ok(); + + fn test(addr: Multiaddr) { + let (mut ready_tx, mut ready_rx) = mpsc::channel(1); + + async_std::task::spawn(async move { + let tcp = TcpConfig::new().await.unwrap(); + let mut listener = tcp.listen_on(addr).unwrap(); + + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other), + } + if let Protocol::Tcp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No TCP port in address: {}", a) + } + ready_tx.send(a).await.ok(); + } + _ => {} + } + } + }); + + async_std::task::block_on(async move { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = TcpConfig::new().await.unwrap(); + tcp.dial(dest_addr).unwrap().await.unwrap(); + }); + } + + test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); + } + + #[test] + fn port_reuse() { + env_logger::try_init().ok(); + + fn test(addr: Multiaddr) { + let (mut ready_tx, mut ready_rx) = mpsc::channel(1); + let addr2 = addr.clone(); + + async_std::task::spawn(async move { + let tcp = TcpConfig::new().await.unwrap(); + let mut listener = tcp.listen_on(addr).unwrap(); + + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.send(listen_addr).await.ok(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + } + _ => unreachable!(), + } + } + }); + + async_std::task::block_on(async move { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = TcpConfig::new().await.unwrap().port_reuse(true); + let _listener = tcp.clone().listen_on(addr2).unwrap(); + + // Obtain a future socket through dialing + let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + }); + } + + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); + } + + #[async_std::test] + async fn replace_port_0_in_returned_multiaddr_ipv4() { + env_logger::try_init().ok(); + + let tcp = TcpConfig::new().await.unwrap(); let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); assert!(addr.to_string().contains("tcp/0")); - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) + let new_addr = tcp + .listen_on(addr) + .unwrap() .next() + .await .expect("some event") .expect("no error") .into_new_address() @@ -665,16 +524,20 @@ mod tests { assert!(!new_addr.to_string().contains("tcp/0")); } - #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv6() { - let tcp = TcpConfig::new(); + #[async_std::test] + async fn replace_port_0_in_returned_multiaddr_ipv6() { + env_logger::try_init().ok(); + + let tcp = TcpConfig::new().await.unwrap(); let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); assert!(addr.to_string().contains("tcp/0")); - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) + let new_addr = tcp + .listen_on(addr) + .unwrap() .next() + .await .expect("some event") .expect("no error") .into_new_address() @@ -683,10 +546,11 @@ mod tests { assert!(!new_addr.to_string().contains("tcp/0")); } - #[test] - #[cfg(feature = "async-std")] - fn larger_addr_denied() { - let tcp = TcpConfig::new(); + #[async_std::test] + async fn larger_addr_denied() { + env_logger::try_init().ok(); + + let tcp = TcpConfig::new().await.unwrap(); let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" .parse::() From 65331c283f544b97f391bba42ee90d99d75cd625 Mon Sep 17 00:00:00 2001 From: David Craven Date: Tue, 1 Dec 2020 21:01:25 +0100 Subject: [PATCH 4/4] Update stuff. --- Cargo.toml | 2 +- core/Cargo.toml | 2 +- core/src/connection/listeners.rs | 9 +- core/tests/network_dial_error.rs | 26 ++-- core/tests/util.rs | 3 +- examples/chat-tokio.rs | 174 ----------------------- examples/chat.rs | 5 +- examples/distributed-key-value-store.rs | 5 +- examples/gossipsub-chat.rs | 5 +- examples/ipfs-kad.rs | 5 +- examples/ipfs-private.rs | 9 +- examples/ping.rs | 5 +- muxers/mplex/tests/async_write.rs | 4 +- muxers/mplex/tests/two_peers.rs | 8 +- protocols/deflate/tests/test.rs | 2 +- protocols/identify/src/identify.rs | 3 +- protocols/identify/src/protocol.rs | 4 +- protocols/noise/Cargo.toml | 1 + protocols/noise/src/lib.rs | 7 +- protocols/noise/tests/smoke.rs | 26 ++-- protocols/ping/tests/ping.rs | 22 +-- protocols/request-response/tests/ping.rs | 35 ++--- protocols/secio/src/lib.rs | 6 +- src/bandwidth.rs | 4 + src/lib.rs | 35 +++-- transports/dns/src/lib.rs | 4 + transports/websocket/src/lib.rs | 2 +- 27 files changed, 125 insertions(+), 288 deletions(-) delete mode 100644 examples/chat-tokio.rs diff --git a/Cargo.toml b/Cargo.toml index 42eeb01c343..37737b918fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ libp2p-tcp = { version = "0.25.1", path = "transports/tcp", optional = true } libp2p-websocket = { version = "0.26.0", path = "transports/websocket", optional = true } [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.8.1" tokio = { version = "0.3", features = ["io-util", "io-std", "stream", "macros", "rt", "rt-multi-thread"] } diff --git a/core/Cargo.toml b/core/Cargo.toml index e73c71ea742..dc86108d45a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -40,7 +40,7 @@ zeroize = "1" ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false } [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } libp2p-mplex = { path = "../muxers/mplex" } libp2p-noise = { path = "../protocols/noise" } libp2p-tcp = { path = "../transports/tcp" } diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index 962c14a7b0d..30e33925a72 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -41,10 +41,12 @@ use std::{collections::VecDeque, fmt, pin::Pin}; /// # Example /// /// ```no_run +/// # #[async_std::main] +/// # async fn main() { /// use futures::prelude::*; /// use libp2p_core::connection::{ListenersEvent, ListenersStream}; /// -/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new()); +/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new().await.unwrap()); /// /// // Ask the `listeners` to start listening on the given multiaddress. /// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); @@ -74,6 +76,7 @@ use std::{collections::VecDeque, fmt, pin::Pin}; /// } /// } /// }) +/// # } /// ``` pub struct ListenersStream where @@ -428,6 +431,8 @@ mod tests { fn dial(self, _: Multiaddr) -> Result> { panic!() } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } } async_std::task::block_on(async move { @@ -466,6 +471,8 @@ mod tests { fn dial(self, _: Multiaddr) -> Result> { panic!() } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } } async_std::task::block_on(async move { diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 13e532d7016..6597240ccd6 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -41,10 +41,12 @@ fn deny_incoming_connec() { swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); let address = async_std::task::block_on(future::poll_fn(|cx| { - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { - Poll::Ready(listen_addr) - } else { - panic!("Was expecting the listen address to be reported") + match swarm1.poll(cx) { + Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => { + Poll::Ready(listen_addr) + } + Poll::Pending => Poll::Pending, + _ => panic!("Was expecting the listen address to be reported"), } })); @@ -95,15 +97,15 @@ fn dial_self() { let mut swarm = test_network(NetworkConfig::default()); swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let (local_address, mut swarm) = async_std::task::block_on( - future::lazy(move |cx| { - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) { - Ok::<_, void::Void>((listen_addr, swarm)) - } else { - panic!("Was expecting the listen address to be reported") + let local_address = async_std::task::block_on(future::poll_fn(|cx| { + match swarm.poll(cx) { + Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => { + Poll::Ready(listen_addr) } - })) - .unwrap(); + Poll::Pending => Poll::Pending, + _ => panic!("Was expecting the listen address to be reported"), + } + })); swarm.dial(&local_address, TestHandler()).unwrap(); diff --git a/core/tests/util.rs b/core/tests/util.rs index c20a2c59305..2b6947726d7 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -31,7 +31,8 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let noise_keys = noise::Keypair::::new().into_authentic(&local_key).unwrap(); - let transport: TestTransport = tcp::TcpConfig::new() + let transport: TestTransport = async_std::task::block_on(tcp::TcpConfig::new()) + .unwrap() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(mplex::MplexConfig::new()) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs deleted file mode 100644 index 15577ac6dfc..00000000000 --- a/examples/chat-tokio.rs +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A basic chat application demonstrating libp2p with the mDNS and floodsub protocols -//! using tokio for all asynchronous tasks and I/O. In order for all used libp2p -//! crates to use tokio, it enables tokio-specific features for some crates. -//! -//! The example is run per node as follows: -//! -//! ```sh -//! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio" -//! ``` -//! -//! Alternatively, to run with the minimal set of features and crates: -//! -//! ```sh -//!cargo run --example chat-tokio \\ -//! --no-default-features \\ -//! --features="floodsub mplex noise tcp-tokio mdns-tokio" -//! ``` - -use futures::prelude::*; -use libp2p::{ - Multiaddr, - NetworkBehaviour, - PeerId, - Swarm, - Transport, - core::upgrade, - identity, - floodsub::{self, Floodsub, FloodsubEvent}, - mdns::{Mdns, MdnsEvent}, - mplex, - noise, - swarm::{NetworkBehaviourEventProcess, SwarmBuilder}, - // `TokioTcpConfig` is available through the `tcp-tokio` feature. - tcp::TokioTcpConfig, -}; -use std::error::Error; -use tokio::io::{self, AsyncBufReadExt}; - -/// The `tokio::main` attribute sets up a tokio runtime. -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - // Create a random PeerId - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = PeerId::from(id_keys.public()); - println!("Local peer id: {:?}", peer_id); - - // Create a keypair for authenticated encryption of the transport. - let noise_keys = noise::Keypair::::new() - .into_authentic(&id_keys) - .expect("Signing libp2p-noise static DH keypair failed."); - - // Create a tokio-based TCP transport use noise for authenticated - // encryption and Mplex for multiplexing of substreams on a TCP stream. - let transport = TokioTcpConfig::new().nodelay(true) - .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(mplex::MplexConfig::new()) - .boxed(); - - // Create a Floodsub topic - let floodsub_topic = floodsub::Topic::new("chat"); - - // We create a custom network behaviour that combines floodsub and mDNS. - // The derive generates a delegating `NetworkBehaviour` impl which in turn - // requires the implementations of `NetworkBehaviourEventProcess` for - // the events of each behaviour. - #[derive(NetworkBehaviour)] - struct MyBehaviour { - floodsub: Floodsub, - mdns: Mdns, - } - - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `floodsub` produces an event. - fn inject_event(&mut self, message: FloodsubEvent) { - if let FloodsubEvent::Message(message) = message { - println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); - } - } - } - - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `mdns` produces an event. - fn inject_event(&mut self, event: MdnsEvent) { - match event { - MdnsEvent::Discovered(list) => - for (peer, _) in list { - self.floodsub.add_node_to_partial_view(peer); - } - MdnsEvent::Expired(list) => - for (peer, _) in list { - if !self.mdns.has_node(&peer) { - self.floodsub.remove_node_from_partial_view(&peer); - } - } - } - } - } - - // Create a Swarm to manage peers and events. - let mut swarm = { - let mdns = Mdns::new().await?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(peer_id.clone()), - mdns, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - - SwarmBuilder::new(transport, behaviour, peer_id) - // We want the connection background tasks to be spawned - // onto the tokio runtime. - .executor(Box::new(|fut| { tokio::spawn(fut); })) - .build() - }; - - // Reach out to another node if specified - if let Some(to_dial) = std::env::args().nth(1) { - let addr: Multiaddr = to_dial.parse()?; - Swarm::dial_addr(&mut swarm, addr)?; - println!("Dialed {:?}", to_dial) - } - - // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); - - // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; - - // Kick it off - let mut listening = false; - loop { - let to_publish = { - tokio::select! { - line = stdin.try_next() => Some((floodsub_topic.clone(), line?.expect("Stdin closed"))), - event = swarm.next() => { - println!("New Event: {:?}", event); - None - } - } - }; - if let Some((topic, line)) = to_publish { - swarm.floodsub.publish(topic, line.as_bytes()); - } - if !listening { - for addr in Swarm::listeners(&swarm) { - println!("Listening on {:?}", addr); - listening = true; - } - } - } -} diff --git a/examples/chat.rs b/examples/chat.rs index 67966e07fab..287cdecf6fc 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -63,7 +63,8 @@ use libp2p::{ }; use std::{error::Error, task::{Context, Poll}}; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); // Create a random PeerId @@ -72,7 +73,7 @@ fn main() -> Result<(), Box> { println!("Local peer id: {:?}", local_peer_id); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::build_development_transport(local_key)?; + let transport = libp2p::build_development_transport(local_key).await?; // Create a Floodsub topic let floodsub_topic = floodsub::Topic::new("chat"); diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 6d5df32f998..f8d05cff6a2 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -65,7 +65,8 @@ use libp2p::{ }; use std::{error::Error, task::{Context, Poll}}; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); // Create a random key for ourselves. @@ -73,7 +74,7 @@ fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol. - let transport = build_development_transport(local_key)?; + let transport = build_development_transport(local_key).await?; // We create a custom network behaviour that combines Kademlia and mDNS. #[derive(NetworkBehaviour)] diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index f5e33741544..a6d4cb6738b 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -60,7 +60,8 @@ use std::{ task::{Context, Poll}, }; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { Builder::from_env(Env::default().default_filter_or("info")).init(); // Create a random PeerId @@ -69,7 +70,7 @@ fn main() -> Result<(), Box> { println!("Local peer id: {:?}", local_peer_id); // Set up an encrypted TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::build_development_transport(local_key.clone())?; + let transport = libp2p::build_development_transport(local_key.clone()).await?; // Create a Gossipsub topic let topic = Topic::new("test-net".into()); diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index ec48435db00..2870b49c191 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -40,7 +40,8 @@ use libp2p::kad::{ use libp2p::kad::record::store::MemoryStore; use std::{env, error::Error, time::Duration}; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); // Create a random key for ourselves. @@ -48,7 +49,7 @@ fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol - let transport = build_development_transport(local_key)?; + let transport = build_development_transport(local_key).await?; // Create a swarm to manage peers and events. let mut swarm = { diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index b781efe8b27..eabea4e6db3 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -58,7 +58,7 @@ use std::{ }; /// Builds the transport that serves as a common ground for all connections. -pub fn build_transport( +pub async fn build_transport( key_pair: identity::Keypair, psk: Option, ) -> transport::Boxed<(PeerId, StreamMuxerBox)> @@ -67,7 +67,7 @@ pub fn build_transport( let noise_config = noise::NoiseConfig::xx(noise_keys).into_authenticated(); let yamux_config = YamuxConfig::default(); - let base_transport = TcpConfig::new().nodelay(true); + let base_transport = TcpConfig::new().await.unwrap().nodelay(true); let maybe_encrypted = match psk { Some(psk) => EitherTransport::Left( base_transport.and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)), @@ -136,7 +136,8 @@ fn parse_legacy_multiaddr(text: &str) -> Result> { Ok(res) } -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); let ipfs_path: Box = get_ipfs_path(); @@ -154,7 +155,7 @@ fn main() -> Result<(), Box> { } // Set up a an encrypted DNS-enabled TCP Transport over and Yamux protocol - let transport = build_transport(local_key.clone(), psk); + let transport = build_transport(local_key.clone(), psk).await; // Create a Gosspipsub topic let gossipsub_topic = gossipsub::Topic::new("chat".into()); diff --git a/examples/ping.rs b/examples/ping.rs index eb5fa762fc7..d484c41b70a 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -43,7 +43,8 @@ use futures::{future, prelude::*}; use libp2p::{identity, PeerId, ping::{Ping, PingConfig}, Swarm}; use std::{error::Error, task::{Context, Poll}}; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); // Create a random PeerId. @@ -52,7 +53,7 @@ fn main() -> Result<(), Box> { println!("Local peer id: {:?}", peer_id); // Create a transport. - let transport = libp2p::build_development_transport(id_keys)?; + let transport = libp2p::build_development_transport(id_keys).await?; // Create a ping network behaviour. // diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 1414db14847..75208351914 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -32,7 +32,7 @@ fn async_write() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -62,7 +62,7 @@ fn async_write() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 54b939a548a..4851658ae7d 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -32,7 +32,7 @@ fn client_to_server_outbound() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -62,7 +62,7 @@ fn client_to_server_outbound() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); @@ -88,7 +88,7 @@ fn client_to_server_inbound() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -123,7 +123,7 @@ fn client_to_server_inbound() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| + let transport = TcpConfig::new().await.unwrap().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); diff --git a/protocols/deflate/tests/test.rs b/protocols/deflate/tests/test.rs index 896fb491349..0202479f1e0 100644 --- a/protocols/deflate/tests/test.rs +++ b/protocols/deflate/tests/test.rs @@ -44,7 +44,7 @@ fn lot_of_data() { } async fn run(message1: Vec) { - let transport = TcpConfig::new() + let transport = TcpConfig::new().await.unwrap() .and_then(|conn, endpoint| { upgrade::apply(conn, DeflateConfig::default(), endpoint, upgrade::Version::V1) }); diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 667a3f224be..8c98c6fafb5 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -293,7 +293,8 @@ mod tests { let id_keys = identity::Keypair::generate_ed25519(); let noise_keys = noise::Keypair::::new().into_authentic(&id_keys).unwrap(); let pubkey = id_keys.public(); - let transport = TcpConfig::new() + let transport = async_std::task::block_on(TcpConfig::new()) + .unwrap() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 79f3bcdd087..affb22d59ea 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -225,7 +225,7 @@ mod tests { let (tx, rx) = oneshot::channel(); let bg_task = async_std::task::spawn(async move { - let transport = TcpConfig::new(); + let transport = TcpConfig::new().await.unwrap(); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -256,7 +256,7 @@ mod tests { }); async_std::task::block_on(async move { - let transport = TcpConfig::new(); + let transport = TcpConfig::new().await.unwrap(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let RemoteInfo { info, observed_addr, .. } = diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index a9a8bc210f3..8356dfc00ac 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -28,6 +28,7 @@ snow = { version = "0.7.1", features = ["ring-resolver"], default-features = fal snow = { version = "0.7.1", features = ["default-resolver"], default-features = false } [dev-dependencies] +async-io = "1.2.0" env_logger = "0.8.1" libp2p-tcp = { path = "../../transports/tcp" } quickcheck = "0.9.0" diff --git a/protocols/noise/src/lib.rs b/protocols/noise/src/lib.rs index 6b09f636cd1..f7e921f00e8 100644 --- a/protocols/noise/src/lib.rs +++ b/protocols/noise/src/lib.rs @@ -43,13 +43,14 @@ //! use libp2p_tcp::TcpConfig; //! use libp2p_noise::{Keypair, X25519Spec, NoiseConfig}; //! -//! # fn main() { +//! # fn main() { futures::executor::block_on(async { //! let id_keys = identity::Keypair::generate_ed25519(); //! let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); //! let noise = NoiseConfig::xx(dh_keys).into_authenticated(); -//! let builder = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise); +//! let builder = TcpConfig::new().await.unwrap() +//! .upgrade(upgrade::Version::V1).authenticate(noise); //! // let transport = builder.multiplex(...); -//! # } +//! # }); } //! ``` //! //! [noise]: http://noiseprotocol.org/ diff --git a/protocols/noise/tests/smoke.rs b/protocols/noise/tests/smoke.rs index 744d447a247..8395ad150d3 100644 --- a/protocols/noise/tests/smoke.rs +++ b/protocols/noise/tests/smoke.rs @@ -18,15 +18,16 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use async_io::Async; use futures::{future::{self, Either}, prelude::*}; use libp2p_core::identity; use libp2p_core::upgrade::{self, Negotiated, apply_inbound, apply_outbound}; use libp2p_core::transport::{Transport, ListenerEvent}; use libp2p_noise::{Keypair, X25519, X25519Spec, NoiseConfig, RemoteIdentity, NoiseError, NoiseOutput}; -use libp2p_tcp::{TcpConfig, TcpTransStream}; +use libp2p_tcp::TcpConfig; use log::info; use quickcheck::QuickCheck; -use std::{convert::TryInto, io}; +use std::{convert::TryInto, io, net::TcpStream}; #[allow(dead_code)] fn core_upgrade_compat() { @@ -35,7 +36,8 @@ fn core_upgrade_compat() { let id_keys = identity::Keypair::generate_ed25519(); let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); let noise = NoiseConfig::xx(dh_keys).into_authenticated(); - let _ = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise); + let _ = futures::executor::block_on(TcpConfig::new()).unwrap() + .upgrade(upgrade::Version::V1).authenticate(noise); } #[test] @@ -50,14 +52,14 @@ fn xx_spec() { let client_id_public = client_id.public(); let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); - let server_transport = TcpConfig::new() + let server_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); - let client_transport = TcpConfig::new() + let client_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint, upgrade::Version::V1) }) @@ -81,14 +83,14 @@ fn xx() { let client_id_public = client_id.public(); let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); - let server_transport = TcpConfig::new() + let server_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); - let client_transport = TcpConfig::new() + let client_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint, upgrade::Version::V1) }) @@ -112,14 +114,14 @@ fn ix() { let client_id_public = client_id.public(); let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); - let server_transport = TcpConfig::new() + let server_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint, upgrade::Version::V1) }) .and_then(move |out, _| expect_identity(out, &client_id_public)); let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); - let client_transport = TcpConfig::new() + let client_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint, upgrade::Version::V1) }) @@ -144,7 +146,7 @@ fn ik_xx() { let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); let server_dh_public = server_dh.public().clone(); - let server_transport = TcpConfig::new() + let server_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { if endpoint.is_listener() { Either::Left(apply_inbound(output, NoiseConfig::ik_listener(server_dh))) @@ -157,7 +159,7 @@ fn ik_xx() { let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); let server_id_public2 = server_id_public.clone(); - let client_transport = TcpConfig::new() + let client_transport = futures::executor::block_on(TcpConfig::new()).unwrap() .and_then(move |output, endpoint| { if endpoint.is_dialer() { Either::Left(apply_outbound(output, @@ -175,7 +177,7 @@ fn ik_xx() { QuickCheck::new().max_tests(30).quickcheck(prop as fn(Vec) -> bool) } -type Output = (RemoteIdentity, NoiseOutput>); +type Output = (RemoteIdentity, NoiseOutput>>); fn run(server_transport: T, client_transport: U, messages: I) where diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 52056be4801..15bba26e1d2 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -62,21 +62,16 @@ fn ping_pong() { let mut count2 = count.get(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } - loop { - match swarm1.next().await { - PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), + SwarmEvent::Behaviour(PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) }) => { count1 -= 1; if count1 == 0 { return (pid1.clone(), peer, rtt) } }, - PingEvent { result: Err(e), .. } => panic!("Ping failure: {:?}", e), + SwarmEvent::Behaviour(PingEvent { result: Err(e), .. }) => panic!("Ping failure: {:?}", e), _ => {} } } @@ -132,16 +127,11 @@ fn max_failures() { Swarm::listen_on(&mut swarm1, addr).unwrap(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } - let mut count1: u8 = 0; loop { match swarm1.next_event().await { + SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), SwarmEvent::Behaviour(PingEvent { result: Ok(PingSuccess::Ping { .. }), .. }) => { @@ -201,7 +191,7 @@ fn mk_transport(muxer: MuxerChoice) -> ( let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); let noise_keys = noise::Keypair::::new().into_authentic(&id_keys).unwrap(); - (peer_id, TcpConfig::new() + (peer_id, async_std::task::block_on(TcpConfig::new()).unwrap() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 9aa7f093300..7b3996cc678 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -31,7 +31,7 @@ use libp2p_core::{ }; use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; use libp2p_request_response::*; -use libp2p_swarm::Swarm; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc}; use rand::{self, Rng}; @@ -64,22 +64,19 @@ fn ping_protocol() { let expected_pong = pong.clone(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); - loop { - match swarm1.next().await { - RequestResponseEvent::Message { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), + SwarmEvent::Behaviour(RequestResponseEvent::Message { peer, message: RequestResponseMessage::Request { request, channel, .. } - } => { + }) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); swarm1.send_response(channel, pong.clone()); }, - e => panic!("Peer1: Unexpected event: {:?}", e) + SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), + _ => {} } } }; @@ -147,21 +144,19 @@ fn ping_protocol_throttled() { swarm2.set_receive_limit(NonZeroU16::new(limit2).unwrap()); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); for i in 1 .. { - match swarm1.next().await { - throttled::Event::Event(RequestResponseEvent::Message { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), + SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message { peer, message: RequestResponseMessage::Request { request, channel, .. }, - }) => { + })) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); swarm1.send_response(channel, pong.clone()); }, - e => panic!("Peer1: Unexpected event: {:?}", e) + SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), + _ => {} } if i % 31 == 0 { let lim = rand::thread_rng().gen_range(1, 17); @@ -217,7 +212,8 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); let noise_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); - (peer_id, TcpConfig::new() + (peer_id, async_std::task::block_on(TcpConfig::new()) + .unwrap() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) @@ -292,4 +288,3 @@ impl RequestResponseCodec for PingCodec { write_one(io, data).await } } - diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 7332c725044..7356aad986e 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -28,7 +28,7 @@ //! See [`authenticate`](libp2p_core::transport::upgrade::Builder::authenticate). //! //! ```no_run -//! # fn main() { +//! # fn main() { futures::executor::block_on(async { //! use futures::prelude::*; //! use libp2p_secio::{SecioConfig, SecioOutput}; //! use libp2p_core::{PeerId, Multiaddr, identity, upgrade}; @@ -40,7 +40,7 @@ //! let local_keys = identity::Keypair::generate_ed25519(); //! //! // Create a `Transport`. -//! let transport = TcpConfig::new() +//! let transport = TcpConfig::new().await.unwrap() //! .upgrade(upgrade::Version::V1) //! .authenticate(SecioConfig::new(local_keys.clone())) //! .multiplex(MplexConfig::default()); @@ -51,7 +51,7 @@ //! //! // let network = Network::new(transport, local_keys.public().into_peer_id()); //! // let swarm = Swarm::new(transport, behaviour, local_keys.public().into_peer_id()); -//! # } +//! # }); } //! ``` //! diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 705164b52b4..87b66653cfc 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -74,6 +74,10 @@ where .dial(addr) .map(move |fut| BandwidthFuture { inner: fut, sinks }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth diff --git a/src/lib.rs b/src/lib.rs index 514c352d887..8084055355f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,10 +47,13 @@ //! Example (Dialing a TCP/IP multi-address): //! //! ```rust +//! # #[async_std::main] +//! # async fn main() { //! use libp2p::{Multiaddr, Transport, tcp::TcpConfig}; -//! let tcp = TcpConfig::new(); +//! let tcp = TcpConfig::new().await.unwrap(); //! let addr: Multiaddr = "/ip4/98.97.96.95/tcp/20500".parse().expect("invalid multiaddr"); //! let _conn = tcp.dial(addr); +//! # } //! ``` //! In the above example, `_conn` is a [`Future`] that needs to be polled in order for //! the dialing to take place and eventually resolve to a connection. Polling @@ -87,7 +90,7 @@ //! ```rust //! # #[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-async-std", feature = "noise", feature = "yamux"))] { //! use libp2p::{Transport, core::upgrade, tcp::TcpConfig, noise, identity::Keypair, yamux}; -//! let tcp = TcpConfig::new(); +//! let tcp = TcpConfig::new().await.unwrap(); //! let id_keys = Keypair::generate_ed25519(); //! let noise_keys = noise::Keypair::::new().into_authentic(&id_keys).unwrap(); //! let noise = noise::NoiseConfig::xx(noise_keys).into_authenticated(); @@ -215,8 +218,8 @@ pub use libp2p_ping as ping; pub use libp2p_plaintext as plaintext; #[doc(inline)] pub use libp2p_swarm as swarm; -#[cfg(any(feature = "tcp-async-std", feature = "tcp-tokio"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "tcp-async-std", feature = "tcp-tokio"))))] +#[cfg(feature = "tcp")] +#[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_tcp as tcp; @@ -268,28 +271,25 @@ pub use self::transport_ext::TransportExt; /// /// > **Note**: This `Transport` is not suitable for production usage, as its implementation /// > reserves the right to support additional protocols or remove deprecated protocols. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] #[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] -pub fn build_development_transport(keypair: identity::Keypair) +pub async fn build_development_transport(keypair: identity::Keypair) -> std::io::Result> { - build_tcp_ws_noise_mplex_yamux(keypair) + build_tcp_ws_noise_mplex_yamux(keypair).await } /// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`. /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] #[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] -pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) +pub async fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) -> std::io::Result> { let transport = { - #[cfg(feature = "tcp-async-std")] - let tcp = tcp::TcpConfig::new().nodelay(true); - #[cfg(feature = "tcp-tokio")] - let tcp = tcp::TokioTcpConfig::new().nodelay(true); + let tcp = tcp::TcpConfig::new().await?.nodelay(true); let transport = dns::DnsConfig::new(tcp)?; let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) @@ -311,16 +311,13 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp", feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] #[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))] -pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey) +pub async fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey) -> std::io::Result> { let transport = { - #[cfg(feature = "tcp-async-std")] - let tcp = tcp::TcpConfig::new().nodelay(true); - #[cfg(feature = "tcp-tokio")] - let tcp = tcp::TokioTcpConfig::new().nodelay(true); + let tcp = tcp::TcpConfig::new().await?.nodelay(true); let transport = dns::DnsConfig::new(tcp)?; let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 0f4d33cbd46..beba6778689 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -293,6 +293,10 @@ mod tests { }; Ok(Box::pin(future::ready(Ok(())))) } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { + None + } } futures::executor::block_on(async move { diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 0ee346fdfa8..a827c20b657 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -206,7 +206,7 @@ mod tests { } async fn connect(listen_addr: Multiaddr) { - let ws_config = WsConfig::new(tcp::TcpConfig::new()); + let ws_config = WsConfig::new(tcp::TcpConfig::new().await.unwrap()); let mut listener = ws_config.clone() .listen_on(listen_addr)