Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ jobs:
with:
shared-key: semver
- uses: obi1kenobi/cargo-semver-checks-action@v2
with:
# `libp2p-datagram` is new and unpublished, so it has no crates.io
# baseline to diff against. Remove this once it is first released.
exclude: libp2p-datagram

rustfmt:
runs-on: ubuntu-latest
Expand Down
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ members = [
"protocols/gossipsub",
"protocols/identify",
"protocols/kad",
"protocols/datagram",
"protocols/mdns",
"protocols/perf",
"protocols/ping",
Expand Down Expand Up @@ -102,6 +103,7 @@ libp2p-relay = { version = "0.22.0", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.18.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.30.0", path = "protocols/request-response" }
libp2p-server = { version = "0.13.0", path = "misc/server" }
libp2p-datagram = { version = "0.1.0", path = "protocols/datagram" }
libp2p-stream = { version = "0.5.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.48.0", path = "swarm" }
libp2p-swarm-derive = { version = "=0.36.0", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
Expand Down
6 changes: 6 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 0.44.0

- Add unreliable datagram support to `StreamMuxer`: `send_datagram`, `max_datagram_size`, and `StreamMuxerEvent::Datagram`. Unsupported muxers return `SendDatagramError::Unsupported`.
See [PR 6489](https://github.com/libp2p/rust-libp2p/pull/6489).

- Add `StreamMuxer::substream_id` and `SubstreamBox::transport_stream_id`, surfacing transport stream ids (QUIC) for datagram flows. `SubstreamBox::new` now takes the id.
See [PR 6489](https://github.com/libp2p/rust-libp2p/pull/6489).

- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = { workspace = true }
either = "1.16"
fnv = "1.0"
futures = { workspace = true, features = ["executor", "thread-pool"] }
Expand Down
7 changes: 7 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ where
future::Either::Right(inner) => inner.poll(cx).map_err(Either::Right),
}
}

fn substream_id(substream: &Self::Substream) -> Option<u64> {
match substream {
future::Either::Left(s) => A::substream_id(s),
future::Either::Right(s) => B::substream_id(s),
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
Expand Down
48 changes: 48 additions & 0 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

use std::{future::Future, pin::Pin};

use bytes::Bytes;
use futures::{
AsyncRead, AsyncWrite,
task::{Context, Poll},
Expand Down Expand Up @@ -111,13 +112,51 @@ pub trait StreamMuxer {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>>;

/// Send an unreliable datagram. Inbound arrive via [`StreamMuxerEvent::Datagram`].
fn send_datagram(self: Pin<&mut Self>, data: Bytes) -> Result<(), SendDatagramError> {
let _ = data;
Err(SendDatagramError::Unsupported)
}

/// Largest [`StreamMuxer::send_datagram`] payload, or `None` if unsupported.
fn max_datagram_size(&self) -> Option<usize> {
None
}

/// Transport-assigned stream id, where one exists (QUIC, WebTransport).
///
/// Keys a datagram flow to its control stream, see [libp2p/specs#680].
///
/// [libp2p/specs#680]: https://github.com/libp2p/specs/pull/680
fn substream_id(substream: &Self::Substream) -> Option<u64>
where
Self: Sized,
{
let _ = substream;
None
}
}

/// An event produced by a [`StreamMuxer`].
#[derive(Debug)]
pub enum StreamMuxerEvent {
/// The address of the remote has changed.
AddressChange(Multiaddr),
/// An unreliable datagram was received from the remote.
Datagram(Bytes),
}

/// Error returned by [`StreamMuxer::send_datagram`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SendDatagramError {
#[error("datagrams are not supported by this muxer")]
Unsupported,
#[error("datagram of {size} bytes exceeds the {max} byte limit")]
TooLarge { size: usize, max: usize },
#[error("connection closed")]
ConnectionClosed,
}

/// Extension trait for [`StreamMuxer`].
Expand Down Expand Up @@ -164,6 +203,15 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_close(cx)
}

/// Convenience function for calling [`StreamMuxer::send_datagram`]
/// for [`StreamMuxer`]s that are `Unpin`.
fn send_datagram_unpin(&mut self, data: Bytes) -> Result<(), SendDatagramError>
where
Self: Unpin,
{
Pin::new(self).send_datagram(data)
}

/// Returns a future for closing this [`StreamMuxer`].
fn close(self) -> Close<Self> {
Close(self)
Expand Down
38 changes: 32 additions & 6 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::{
task::{Context, Poll},
};

use bytes::Bytes;
use futures::{AsyncRead, AsyncWrite};
use pin_project::pin_project;

use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use crate::muxing::{SendDatagramError, StreamMuxer, StreamMuxerEvent};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
Expand All @@ -26,7 +27,7 @@ impl fmt::Debug for StreamMuxerBox {
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
/// and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>, Option<u64>);

#[pin_project]
struct Wrap<T>
Expand All @@ -53,7 +54,7 @@ where
self.project()
.inner
.poll_inbound(cx)
.map_ok(SubstreamBox::new)
.map_ok(|s| SubstreamBox::new(T::substream_id(&s), s))
.map_err(into_io_error)
}

Expand All @@ -64,7 +65,7 @@ where
self.project()
.inner
.poll_outbound(cx)
.map_ok(SubstreamBox::new)
.map_ok(|s| SubstreamBox::new(T::substream_id(&s), s))
.map_err(into_io_error)
}

Expand All @@ -79,6 +80,14 @@ where
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll(cx).map_err(into_io_error)
}

fn send_datagram(self: Pin<&mut Self>, data: Bytes) -> Result<(), SendDatagramError> {
self.project().inner.send_datagram(data)
}

fn max_datagram_size(&self) -> Option<usize> {
self.inner.max_datagram_size()
}
}

fn into_io_error<E>(err: E) -> io::Error
Expand Down Expand Up @@ -139,13 +148,30 @@ impl StreamMuxer for StreamMuxerBox {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().poll(cx)
}

fn send_datagram(self: Pin<&mut Self>, data: Bytes) -> Result<(), SendDatagramError> {
self.project().send_datagram(data)
}

fn max_datagram_size(&self) -> Option<usize> {
self.inner.as_ref().get_ref().max_datagram_size()
}

fn substream_id(substream: &Self::Substream) -> Option<u64> {
substream.transport_stream_id()
}
}

impl SubstreamBox {
/// Construct a new [`SubstreamBox`] from something
/// that implements [`AsyncRead`] and [`AsyncWrite`].
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
Self(Box::pin(stream))
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(id: Option<u64>, stream: S) -> Self {
Self(Box::pin(stream), id)
}

/// Transport-assigned stream id, see [`StreamMuxer::substream_id`].
pub fn transport_stream_id(&self) -> Option<u64> {
self.1
}
}

Expand Down
3 changes: 3 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.18.0

- Forward `StreamMuxer::substream_id` through the bandwidth-metering muxer.
See [PR 6489](https://github.com/libp2p/rust-libp2p/pull/6489).

- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ where
let this = self.project();
this.inner.poll_close(cx)
}

fn substream_id(substream: &Self::Substream) -> Option<u64> {
SMInner::substream_id(&substream.inner)
}
}

/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it.
Expand Down
3 changes: 3 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.14.0

- Add `Negotiated::as_inner` to access the underlying stream after negotiation.
See [PR 6489](https://github.com/libp2p/rust-libp2p/pull/6489).

- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

Expand Down
8 changes: 8 additions & 0 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ impl<TInner> Negotiated<TInner> {
}
}

/// Reference to the underlying I/O stream, once negotiation has completed.
pub fn as_inner(&self) -> Option<&TInner> {
match &self.state {
State::Completed { io } => Some(io),
_ => None,
}
}

/// Polls the `Negotiated` for completion.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), NegotiationError>>
where
Expand Down
9 changes: 9 additions & 0 deletions protocols/datagram/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## 0.1.0

- Initial release: a `Behaviour` and `Control` for sending and receiving
unreliable datagrams over libp2p connections (QUIC only), implementing the
`/dg/1` control stream and framing from [libp2p/specs#680]. `Behaviour::new`
takes the application protocol the datagrams belong to.
See [PR 6489](https://github.com/libp2p/rust-libp2p/pull/6489).

[libp2p/specs#680]: https://github.com/libp2p/specs/pull/680
29 changes: 29 additions & 0 deletions protocols/datagram/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "libp2p-datagram"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true
description = "Unreliable datagram protocol for libp2p"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = { workspace = true }
futures = { workspace = true }
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
libp2p-swarm = { workspace = true }
thiserror = { workspace = true }
unsigned-varint = { workspace = true, features = ["futures"] }

[dev-dependencies]
libp2p-identity = { workspace = true, features = ["ed25519", "rand"] }
libp2p-quic = { workspace = true, features = ["tokio"] }
libp2p-swarm = { workspace = true, features = ["tokio"] }
tokio = { workspace = true, features = ["full"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[lints]
workspace = true
27 changes: 27 additions & 0 deletions protocols/datagram/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# libp2p-datagram

Unreliable datagrams over libp2p connections, per [libp2p/specs#680].

Datagrams ride a QUIC `/dg/1` control stream that binds them to one application
protocol. They may be dropped, reordered, or duplicated and carry no flow
control; the caller owns reliability. Only QUIC carries them today; sends on
other transports fail with `SendError`.

```rust,no_run
use libp2p_datagram as datagram;
use libp2p_swarm::StreamProtocol;

let mut behaviour = datagram::Behaviour::new(StreamProtocol::new("/my/app/1.0.0"));
let mut control = behaviour.new_control();
let mut incoming = behaviour.incoming_datagrams().unwrap();

// add `behaviour` to your Swarm, then:
// control.send_datagram(peer, bytes)?;
// while let Some((from, bytes)) = incoming.next().await { ... }
```

## License

Licensed under MIT.

[libp2p/specs#680]: https://github.com/libp2p/specs/pull/680
Loading