Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .claude/rules/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ Driver entry points:

The shared retry-loop driver lives in `op_ctx.rs` (`RetryDriver` trait
+ `drive_retry_loop`). UPDATE is fire-and-forget (no retry loop, no
upstream reply); GET/PUT/SUBSCRIBE share the retry driver.
upstream reply); GET/PUT/SUBSCRIBE share the retry driver. GET's
client and sub-op drivers enter the loop via
`get/op_ctx_task.rs::drive_get_with_assembly_retry`, which treats a
post-terminal stream-assembly failure as a retryable attempt failure
(#4345) — post-terminal side effects that can fail retryably belong in
such a wrapper, never inside the loop's Terminal arm (pinned by
`drive_retry_loop_terminal_arm_does_not_call_advance`).

## Wire-variant dispatch

Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ pub mod dev_tool {
pub use crate::operations::get::op_ctx_task::RELAY_DRIVER_CALL_COUNT as GET_RELAY_DRIVER_CALL_COUNT;
#[cfg(any(test, feature = "testing"))]
pub use crate::operations::get::op_ctx_task::RELAY_GET_STREAMING_FORWARD_COUNT;
// Deterministic stream-assembly fault injection + retry counter for
// the GET driver's assembly-retry path (#4345).
#[cfg(any(test, feature = "testing"))]
pub use crate::operations::get::op_ctx_task::assembly_fault_injection as get_assembly_fault_injection;
#[cfg(any(test, feature = "testing"))]
pub use crate::operations::put::op_ctx_task::RELAY_PUT_DRIVER_CALL_COUNT;
#[cfg(any(test, feature = "testing"))]
Expand Down
25 changes: 23 additions & 2 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,19 @@ pub struct NodeConfig {
}

impl NodeConfig {
/// This node's own peer id as a telemetry attribution string
/// (public key + best-effort address). The address portion falls
/// back to the listener address for non-gateway nodes until
/// external-address discovery — a refresh path is tracked in
/// #4294. Shared by the telemetry reporter and the shadow-RTT /
/// reference-ping emitters so the two constructions can't drift.
pub(crate) fn local_peer_id_string(&self) -> String {
let addr = self.own_addr.unwrap_or_else(|| {
std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port)
});
PeerId::new(self.key_pair.public().clone(), addr).to_string()
}

pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
tracing::info!("Loading node configuration for mode {}", config.mode);

Expand Down Expand Up @@ -566,8 +579,16 @@ impl NodeConfig {
registers.push(Box::new(OTEventRegister::new()));
}

// Add telemetry reporter if enabled in config
if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) {
// Add telemetry reporter if enabled in config. The local
// peer id (public key + best-effort address, same
// construction as the shadow-RTT events in `p2p_impl.rs`)
// attributes transport-level events — transfer_failed,
// transport_snapshot, timeout — which otherwise carry an
// empty peer_id and cannot be correlated to a sender in
// the collector (#4345 observability gap).
if let Some(telemetry) =
TelemetryReporter::new(&self.config.telemetry, self.local_peer_id_string())
{
registers.push(Box::new(telemetry));
}

Expand Down
8 changes: 2 additions & 6 deletions crates/core/src/node/p2p_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
use std::{convert::Infallible, sync::Arc, time::Duration};

use futures::{FutureExt, future::BoxFuture};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -418,11 +418,7 @@ impl NodeP2P {
let reference_ping_enabled = config.config.telemetry.enabled
&& !config.config.telemetry.is_test_environment
&& config.config.telemetry.reference_ping_enabled;
let listen_addr = config.own_addr.unwrap_or_else(|| {
SocketAddr::new(config.network_listener_ip, config.network_listener_port)
});
let local_peer_id =
crate::node::PeerId::new(config.key_pair.public().clone(), listen_addr).to_string();
let local_peer_id = config.local_peer_id_string();
crate::transport::rolling_rtt_stats::spawn_aggregator(
local_peer_id.clone(),
&background_task_monitor,
Expand Down
Loading
Loading