transports/tcp: use futures::stream::SelectAll for driving listener streams #2781

Description

@elenaf9

With #2652 polling the listeners of a transport became the transport's responsibility.
We currently do this in TcpTransport::poll the same way the swarm's ListenerStream did it before: by iterating through the list of TcpListenStreams, polling each listener in turn, and returning the first event that a listener produces.
Instead of doing this manual iteration, we should use futures::stream::SelectAll for polling all TcpListenStreams. This is already used in e.g. the relay ClientTransport and the wasm_ext::ExtTransport.

The following changes would be required:

  • Remove TcpListenerEvent; instead, <TcpListenStream as Stream>::Item should directly be a TransportEvent.
  • When a listener is closed, <TcpListenStream as Stream>::poll_next should return Poll::Ready(None) for this listener so that the SelectAll combinator drops the stream.
  • Manage the TcpListenStreams in a SelectAll, poll the SelectAll stream in TcpTransport::poll.

Motivation

SelectAll is much more efficient when driving multiple streams since it only polls its inner streams when they generate notifications.
Apart from that, it reduces complexity, as we only have to poll the SelectAll stream instead of iterating over the listeners and polling each one manually.

Current Implementation

The TcpListenStreams are pinned in a VecDeque. When the task is woken and TcpTransport::poll is called, we iterate through this list by popping a listener off the back, polling it for new events, and pushing it back onto the front. If the listener returned a TcpListenerEvent, we return a TransportEvent for it; otherwise the next listener is polled:

/// Poll all listeners.
fn poll(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
    // Return pending events from closed listeners.
    if let Some(event) = self.pending_events.pop_front() {
        return Poll::Ready(event);
    }
    // We remove each element from `listeners` one by one and add them back.
    let mut remaining = self.listeners.len();
    while let Some(mut listener) = self.listeners.pop_back() {
        match TryStream::try_poll_next(listener.as_mut(), cx) {
            Poll::Pending => {
                self.listeners.push_front(listener);
                remaining -= 1;
                if remaining == 0 {
                    break;
                }
            }
            Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade {
                upgrade,
                local_addr,
                remote_addr,
            }))) => {
                let id = listener.listener_id;
                self.listeners.push_front(listener);
                return Poll::Ready(TransportEvent::Incoming {
                    listener_id: id,
                    upgrade,
                    local_addr,
                    send_back_addr: remote_addr,
                });
            }
            Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(a)))) => {
                let id = listener.listener_id;
                self.listeners.push_front(listener);
                return Poll::Ready(TransportEvent::NewAddress {
                    listener_id: id,
                    listen_addr: a,
                });
            }
            Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(a)))) => {
                let id = listener.listener_id;
                self.listeners.push_front(listener);
                return Poll::Ready(TransportEvent::AddressExpired {
                    listener_id: id,
                    listen_addr: a,
                });
            }
            Poll::Ready(Some(Ok(TcpListenerEvent::Error(error)))) => {
                let id = listener.listener_id;
                self.listeners.push_front(listener);
                return Poll::Ready(TransportEvent::ListenerError {
                    listener_id: id,
                    error,
                });
            }
            Poll::Ready(None) => {
                return Poll::Ready(TransportEvent::ListenerClosed {
                    listener_id: listener.listener_id,
                    reason: Ok(()),
                });
            }
            Poll::Ready(Some(Err(err))) => {
                return Poll::Ready(TransportEvent::ListenerClosed {
                    listener_id: listener.listener_id,
                    reason: Err(err),
                });
            }
        }
    }
    Poll::Pending
}

Are you planning to do it yourself in a pull request?

No (or at least not in the near future).
