Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ pub use peer_manager::{
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
};
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub use service::api_types::{PeerRequestId, Response};
pub use service::api_types::Response;
pub use service::utils::*;
pub use service::{Gossipsub, NetworkEvent};
74 changes: 38 additions & 36 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
use super::RequestId;
use super::{RPCReceived, RPCSend, ReqId, Request};
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::OutboundFramed;
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
Expand Down Expand Up @@ -91,6 +90,11 @@ pub struct RPCHandler<Id, E>
where
E: EthSpec,
{
/// The PeerId matching this `ConnectionHandler`.
peer_id: PeerId,

/// The ConnectionId matching this `ConnectionHandler`.
connection_id: ConnectionId,
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,

Expand Down Expand Up @@ -139,9 +143,6 @@ where

/// Timeout that will me used for inbound and outbound responses.
resp_timeout: Duration,

/// Information about this handler for logging purposes.
log_info: (PeerId, ConnectionId),
}

enum HandlerState {
Expand Down Expand Up @@ -228,6 +229,8 @@ where
connection_id: ConnectionId,
) -> Self {
RPCHandler {
connection_id,
peer_id,
listen_protocol,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
Expand All @@ -244,7 +247,6 @@ where
fork_context,
waker: None,
resp_timeout,
log_info: (peer_id, connection_id),
}
}

Expand All @@ -255,8 +257,8 @@ where
if !self.dial_queue.is_empty() {
debug!(
unsent_queued_requests = self.dial_queue.len(),
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Starting handler shutdown"
);
}
Expand Down Expand Up @@ -306,8 +308,8 @@ where
if !matches!(response, RpcResponse::StreamTermination(..)) {
// the stream is closed after sending the expected number of responses
trace!(%response, id = ?inbound_id,
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Inbound stream has expired. Response not sent");
}
return;
Expand All @@ -324,8 +326,8 @@ where
if matches!(self.state, HandlerState::Deactivated) {
// we no longer send responses after the handler is deactivated
debug!(%response, id = ?inbound_id,
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Response not sent. Deactivated handler");
return;
}
Expand Down Expand Up @@ -394,8 +396,8 @@ where
Poll::Ready(_) => {
self.state = HandlerState::Deactivated;
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Shutdown timeout elapsed, Handler deactivated"
);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Expand Down Expand Up @@ -445,8 +447,8 @@ where
)));
} else {
crit!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
stream_id = ?outbound_id.get_ref(), "timed out substream not in the books");
}
}
Expand Down Expand Up @@ -577,8 +579,8 @@ where
// Its useful to log when the request was completed.
if matches!(info.protocol, Protocol::BlocksByRange) {
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
duration = Instant::now()
.duration_since(info.request_start_time)
.as_secs(),
Expand All @@ -587,8 +589,8 @@ where
}
if matches!(info.protocol, Protocol::BlobsByRange) {
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
duration = Instant::now()
.duration_since(info.request_start_time)
.as_secs(),
Expand Down Expand Up @@ -617,16 +619,16 @@ where

if matches!(info.protocol, Protocol::BlocksByRange) {
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
duration = info.request_start_time.elapsed().as_secs(),
"BlocksByRange Response failed"
);
}
if matches!(info.protocol, Protocol::BlobsByRange) {
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
duration = info.request_start_time.elapsed().as_secs(),
"BlobsByRange Response failed"
);
Expand Down Expand Up @@ -816,8 +818,8 @@ where
}
OutboundSubstreamState::Poisoned => {
crit!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Poisoned outbound substream"
);
unreachable!("Coding Error: Outbound substream is poisoned")
Expand Down Expand Up @@ -852,8 +854,8 @@ where
&& self.dial_negotiated == 0
{
debug!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,
peer_id = %self.peer_id,
connection_id = %self.connection_id,
"Goodbye sent, Handler deactivated"
);
self.state = HandlerState::Deactivated;
Expand Down Expand Up @@ -986,12 +988,13 @@ where
self.shutdown(None);
}

self.events_out
.push(HandlerEvent::Ok(RPCReceived::Request(Request {
id: RequestId::next(),
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
super::InboundRequestId {
connection_id: self.connection_id,
substream_id: self.current_inbound_substream_id,
r#type: req,
})));
},
req,
)));
self.current_inbound_substream_id.0 += 1;
}

Expand Down Expand Up @@ -1049,9 +1052,8 @@ where
.is_some()
{
crit!(
peer_id = %self.log_info.0,
connection_id = %self.log_info.1,

peer_id = %self.peer_id,
connection_id = %self.connection_id,
id = ?self.current_outbound_substream_id, "Duplicate outbound substream id");
}
self.current_outbound_substream_id.0 += 1;
Expand Down
Loading