From 264a36a5aec60aabe50907e6f1f9be301ec1b7bf Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 7 Jun 2026 20:31:08 -0500
Subject: [PATCH 1/4] fix(get): retry against next candidate when stream
 assembly fails (#4345)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The GET driver classified the ResponseStreaming *header* as terminal
success and exited its retry loop; if the stream assembly then failed
(fragments lost on a lossy path, the sender aborting on cwnd-wait
timeout, a relay's pipe dying after the header was forwarded), the
driver only logged a WARN and synthesized a generic client error — one
transport hiccup burned the whole GET with the MAX_RETRIES budget
untouched. The relay driver already treats header-without-stream as a
retryable routing failure; this gives the originator (and the sub-op
driver) the same semantics.

- New drive_get_with_assembly_retry wrapper: on assembly failure,
  penalize the failed candidate (RouteOutcome::Failure), advance to the
  next candidate (shared retry budget), and re-enter the loop with a
  fresh attempt tx (reusing the old tx would collide with the relay
  dedup gates). Exhaustion keeps the Done(Streaming) outcome so the
  client sees a diagnostic OperationError, never a false NotFound.
- The synthesized client error now carries the assembly failure cause
  instead of the generic store-lookup message.
- Deterministic per-contract-key fault injection
  (get_assembly_fault_injection) + e2e regression test that fails
  without the fix and passes with it, plus source-scrape pin tests for
  the new invariants.

Part of #4345 (op-layer resilience; the underlying transport
flightsize/loss-pause interaction is tracked separately in the issue).

[AI-assisted - Claude] Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/core/src/lib.rs | 4 + crates/core/src/operations/get/op_ctx_task.rs | 537 ++++++++++++++---- crates/core/tests/streaming_e2e.rs | 219 +++++++ 3 files changed, 644 insertions(+), 116 deletions(-) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 0ac1e88b58..33a7b664b0 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -109,6 +109,10 @@ pub mod dev_tool { pub use crate::operations::get::op_ctx_task::RELAY_DRIVER_CALL_COUNT as GET_RELAY_DRIVER_CALL_COUNT; #[cfg(any(test, feature = "testing"))] pub use crate::operations::get::op_ctx_task::RELAY_GET_STREAMING_FORWARD_COUNT; + // Deterministic stream-assembly fault injection + retry counter for + // the GET driver's assembly-retry path (#4345). + #[cfg(any(test, feature = "testing"))] + pub use crate::operations::get::op_ctx_task::assembly_fault_injection as get_assembly_fault_injection; #[cfg(any(test, feature = "testing"))] pub use crate::operations::put::op_ctx_task::RELAY_PUT_DRIVER_CALL_COUNT; #[cfg(any(test, feature = "testing"))] diff --git a/crates/core/src/operations/get/op_ctx_task.rs b/crates/core/src/operations/get/op_ctx_task.rs index cb12c6432e..648269882b 100644 --- a/crates/core/src/operations/get/op_ctx_task.rs +++ b/crates/core/src/operations/get/op_ctx_task.rs @@ -316,7 +316,14 @@ async fn drive_client_get_inner( response_received_at: None, }; - let loop_result = drive_retry_loop(op_manager, client_tx, "get", &mut driver).await; + let (loop_result, streaming_assembly) = drive_get_with_assembly_retry( + op_manager, + client_tx, + "get", + &mut driver, + /* emit_route_failure_on_retry */ true, + ) + .await; // `op_manager.completed(client_tx)` runs from the // `ClientGetCompletionGuard` in `run_client_get` AFTER this fn @@ -388,10 +395,7 @@ async fn drive_client_get_inner( *key } Terminal::Streaming { - key, - stream_id, - includes_contract, - total_size, + key, total_size, .. 
} => { // `total_size` is the wire-authoritative payload byte // count from the `ResponseStreaming` header. Using it @@ -403,53 +407,15 @@ async fn drive_client_get_inner( // a separate "store evicted before re-query" // failure mode (the LRU has finite capacity). payload_size = *total_size as usize; - // Assemble the stream and cache locally. Mirrors - // the legacy `process_message` streaming branch - // at `get.rs:2721-3196`. Uses `current_target` - // as the sender address — accurate for the - // single-hop response case where the responder - // equals the selected target. - if let Some(peer_addr) = driver.current_target.socket_addr() { - // `transfer_duration` covers stream claim + - // assemble + deserialize + local cache write — - // the same composite measurement the legacy - // `GetStats::record_transfer_end` captured at - // `get.rs:1542`. For small payloads near the - // streaming threshold the local-work component - // is non-negligible, but keeping the metric - // composite matches the existing router prior - // built on the legacy code path. - let stream_start = tokio::time::Instant::now(); - if let Err(e) = assemble_and_cache_stream( - op_manager, - peer_addr, - *stream_id, - *key, - *includes_contract, - ) - .await - { - tracing::warn!( - %key, - error = %e, - "get: stream assembly failed — \ - state will not be cached locally" - ); - // Assembly failed → don't emit a transfer-rate - // observation (transfer_duration stays ZERO, - // which makes the router skip the rate sample - // at router.rs:443). The response-time - // observation still goes through because the - // header DID arrive. - } else { - transfer_duration = stream_start.elapsed(); - } - } else { - tracing::warn!( - %key, - "get: current_target has no socket_addr; \ - cannot claim orphan stream" - ); + // Stream assembly (with per-candidate retry, #4345) + // already ran inside `drive_get_with_assembly_retry` + // — consume its timing here. 
`None` means every + // assembly attempt failed: leave `transfer_duration` + // at ZERO so the router skips the rate sample at + // router.rs:443 (the response-time observation still + // goes through because a header DID arrive). + if let Some(duration) = streaming_assembly.transfer_duration { + transfer_duration = duration; } *key } @@ -466,8 +432,13 @@ async fn drive_client_get_inner( } }; - let host_result = - build_host_response(op_manager, &instance_id, return_contract_code).await; + let host_result = build_host_response( + op_manager, + &instance_id, + return_contract_code, + streaming_assembly.error.as_deref(), + ) + .await; // Auto-subscribe on successful GET at the originator — // mirrors the legacy branches at get.rs:2313/2408/3136/3185. @@ -833,6 +804,246 @@ impl RetryDriver for GetRetryDriver<'_> { } } +// --- Assembly-retry wrapper (#4345) --- + +/// Outcome metadata for the streaming-assembly step of a GET. +struct AssemblyOutcome { + /// Wall-clock duration of the successful claim + assemble + + /// deserialize + local cache write, fed to the router's + /// transfer-rate estimator. `None` when the terminal was not + /// `Streaming` or when every assembly attempt failed. The + /// composite measurement matches what the legacy + /// `GetStats::record_transfer_end` captured at `get.rs:1542`. + transfer_duration: Option, + /// Last assembly failure cause when assembly never succeeded. + /// Threaded into the synthesized client error so the failure is + /// diagnosable instead of the generic store-lookup message. + error: Option, +} + +/// Drive the shared GET retry loop, treating stream-assembly failure +/// as a retryable attempt failure (#4345). +/// +/// The shared loop classifies the `ResponseStreaming` *header* as +/// terminal (`op_ctx.rs` pins the Terminal arm to return +/// synchronously), so stream assembly necessarily runs after the loop +/// returns. 
Before this wrapper existed, an assembly failure — +/// fragments lost on a lossy path, the sender aborting on cwnd-wait +/// timeout, a relay's pipe dying after the header was forwarded — +/// burned the whole GET: the driver logged a WARN, fell through to +/// the store re-query, and synthesized a client error with the +/// `MAX_RETRIES` budget untouched. One transport hiccup failed the op +/// even though other candidates could serve it. The relay driver +/// already treats header-without-stream as a retryable routing +/// failure (`drive_relay_get_inner`'s claim-failure `continue`); this +/// gives the originator the same semantics. +/// +/// On assembly failure the wrapper advances the driver to the next +/// candidate (consuming the shared retry budget) and re-enters the +/// loop with a fresh attempt transaction — reusing the previous tx +/// would collide with the relay dedup gates (`active_relay_get_txs`) +/// while the failed attempt's relay chain is still draining. +/// +/// When the budget is exhausted, the original `Done(Streaming)` +/// outcome is returned (NOT `Exhausted`): a streaming header proves +/// the contract exists, so the client must see an operation error +/// rather than a false `NotFound`. +async fn drive_get_with_assembly_retry( + op_manager: &OpManager, + first_tx: Transaction, + op_label: &str, + driver: &mut GetRetryDriver<'_>, + emit_route_failure_on_retry: bool, +) -> (RetryLoopOutcome, AssemblyOutcome) { + let mut attempt_tx = first_tx; + let mut assembly = AssemblyOutcome { + transfer_duration: None, + error: None, + }; + + let outcome = loop { + let result = drive_retry_loop(op_manager, attempt_tx, op_label, driver).await; + + // Only a streaming terminal has a post-loop assembly step; + // everything else passes through unchanged. 
+ let (key, stream_id, includes_contract, total_size) = match result { + RetryLoopOutcome::Done(Terminal::Streaming { + key, + stream_id, + includes_contract, + total_size, + }) => (key, stream_id, includes_contract, total_size), + other @ (RetryLoopOutcome::Done(_) + | RetryLoopOutcome::Exhausted(_) + | RetryLoopOutcome::Unexpected + | RetryLoopOutcome::InfraError(_)) => break other, + }; + + // Uses `current_target` as the sender address — accurate for + // the single-hop response case where the responder equals the + // selected target; relays pipe the stream hop-by-hop so the + // fragments arrive from the adjacent hop either way. + let Some(peer_addr) = driver.current_target.socket_addr() else { + tracing::warn!( + %key, + "get: current_target has no socket_addr; \ + cannot claim orphan stream" + ); + assembly.error = Some( + "selected target has no socket address; \ + cannot claim the response stream" + .to_string(), + ); + break RetryLoopOutcome::Done(Terminal::Streaming { + key, + stream_id, + includes_contract, + total_size, + }); + }; + + let stream_start = tokio::time::Instant::now(); + match assemble_and_cache_stream(op_manager, peer_addr, stream_id, key, includes_contract) + .await + { + Ok(()) => { + assembly.transfer_duration = Some(stream_start.elapsed()); + assembly.error = None; + break RetryLoopOutcome::Done(Terminal::Streaming { + key, + stream_id, + includes_contract, + total_size, + }); + } + Err(e) => { + // Capture the failing peer BEFORE advance() replaces + // `current_target` — the routing penalty must land on + // the candidate whose header never became a stream. + let failed_target = driver.current_target.clone(); + match driver.advance() { + AdvanceOutcome::Next => { + tracing::warn!( + %key, + error = %e, + retries = driver.retries, + "get: stream assembly failed; \ + retrying against next candidate (#4345)" + ); + // Penalize the failed candidate so the router + // learns (mirrors the relay driver's + // claim-failure handling). 
Only on the + // advancing path: when the budget is + // exhausted, the caller's final route event + // (driven by the failed host_result) already + // records the Failure for the last target — + // emitting here too would double-count it. + if emit_route_failure_on_retry { + emit_get_route_failure(op_manager, attempt_tx, &failed_target, &key) + .await; + } + #[cfg(any(test, feature = "testing"))] + assembly_fault_injection::ASSEMBLY_RETRY_COUNT + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + assembly.error = Some(e); + attempt_tx = driver.new_attempt_tx(); + continue; + } + AdvanceOutcome::Exhausted => { + tracing::warn!( + %key, + error = %e, + "get: stream assembly failed and retry budget \ + exhausted — state will not be cached locally" + ); + assembly.error = Some(e); + break RetryLoopOutcome::Done(Terminal::Streaming { + key, + stream_id, + includes_contract, + total_size, + }); + } + } + } + } + }; + + (outcome, assembly) +} + +/// Emit a routing-failure observation for a candidate whose streaming +/// header never became a completed stream. Mirrors the route-event +/// emission in `drive_client_get_inner`'s final outcome block. +async fn emit_get_route_failure( + op_manager: &OpManager, + tx: Transaction, + peer: &PeerKeyLocation, + key: &ContractKey, +) { + let route_event = RouteEvent { + peer: peer.clone(), + contract_location: Location::from(key), + outcome: RouteOutcome::Failure, + op_type: Some(crate::node::network_status::OpType::Get), + }; + if let Some(log_event) = + crate::tracing::NetEventLog::route_event(&tx, &op_manager.ring, &route_event) + { + op_manager + .ring + .register_events(either::Either::Left(log_event)) + .await; + } + op_manager.ring.routing_finished(route_event); +} + +/// Test-only fault injection for `assemble_and_cache_stream` (#4345). +/// +/// Keyed by `ContractKey` so concurrently-running tests in the same +/// process cannot consume each other's injection budget. 
A test arms +/// `n` failures for its own (unique) contract key; the first `n` +/// assembly attempts for that key fail with a synthetic error, +/// exercising the driver's assembly-retry path deterministically. +#[cfg(any(test, feature = "testing"))] +pub mod assembly_fault_injection { + use freenet_stdlib::prelude::ContractKey; + use std::collections::HashMap; + use std::sync::atomic::AtomicUsize; + use std::sync::{Mutex, OnceLock}; + + /// Lifetime count of assembly-failure retries actually taken by + /// GET drivers (incremented when the driver advances to the next + /// candidate after an assembly failure). Tests snapshot + /// before/after to prove the retry path genuinely fired. + pub static ASSEMBLY_RETRY_COUNT: AtomicUsize = AtomicUsize::new(0); + + fn budgets() -> &'static Mutex> { + static BUDGETS: OnceLock>> = OnceLock::new(); + BUDGETS.get_or_init(|| Mutex::new(HashMap::new())) + } + + /// Arm `n` injected assembly failures for `key`. + pub fn inject_failures(key: ContractKey, n: usize) { + budgets().lock().expect("poisoned").insert(key, n); + } + + /// Consume one injected failure for `key`, if armed. + pub(crate) fn consume(key: &ContractKey) -> bool { + let mut map = budgets().lock().expect("poisoned"); + match map.get_mut(key) { + Some(n) if *n > 0 => { + *n -= 1; + if *n == 0 { + map.remove(key); + } + true + } + _ => false, + } + } +} + // --- Host-response construction --- /// Query the local contract store for `(state, contract)` and package @@ -842,10 +1053,17 @@ impl RetryDriver for GetRetryDriver<'_> { /// on the happy path — `process_message` stores before the terminal /// reply fires), synthesize an operation error matching the shape /// `to_host_result` produces on NotFound. +/// +/// `assembly_error` carries the last stream-assembly failure cause when +/// the streaming path exhausted its retry budget without ever caching +/// the state (#4345). 
It makes the synthesized client error diagnostic +/// — "stream assembly failed: …" — instead of the generic store-lookup +/// message that conflates assembly failures with eviction races. async fn build_host_response( op_manager: &OpManager, instance_id: &ContractInstanceId, return_contract_code: bool, + assembly_error: Option<&str>, ) -> HostResult { let lookup = op_manager .notify_contract_handler(ContractHandlerEvent::GetQuery { @@ -876,16 +1094,23 @@ async fn build_host_response( )) } _ => { + let cause = match assembly_error { + Some(e) => format!( + "GET response stream assembly failed for {instance_id} \ + after exhausting retries: {e}" + ), + None => { + format!("GET succeeded on wire but local store lookup failed for {instance_id}") + } + }; tracing::warn!( contract = %instance_id, + %cause, "get: terminal reply classified success but local \ store lookup returned no state; synthesizing client error" ); Err(ErrorKind::OperationError { - cause: format!( - "GET succeeded on wire but local store lookup failed for {instance_id}" - ) - .into(), + cause: cause.into(), } .into()) } @@ -1091,6 +1316,14 @@ async fn assemble_and_cache_stream( expected_key: ContractKey, includes_contract: bool, ) -> Result<(), String> { + // Test-only deterministic fault injection (#4345). Returning before + // the claim mirrors the production failure (the inbound stream is + // left orphaned for GC), so the retry path is exercised end-to-end. 
+ #[cfg(any(test, feature = "testing"))] + if assembly_fault_injection::consume(&expected_key) { + return Err("injected stream assembly failure (assembly_fault_injection test hook)".into()); + } + let handle = match op_manager .orphan_stream_registry() .claim_or_wait(peer_addr, stream_id, STREAM_CLAIM_TIMEOUT) @@ -1608,7 +1841,17 @@ async fn drive_sub_op_get( response_received_at: None, }; - let loop_result = drive_retry_loop(op_manager, tx, "get-subop", &mut driver).await; + let (loop_result, streaming_assembly) = drive_get_with_assembly_retry( + op_manager, + tx, + "get-subop", + &mut driver, + // Sub-op GETs don't feed the router (no RouteEvent is emitted + // on the sub-op path), so skip the per-retry failure events. + /* emit_route_failure_on_retry */ + false, + ) + .await; // `op_manager.completed(tx)` runs from the // `SubOpGetCompletionGuard` in `run_sub_op_get` AFTER this fn @@ -1632,43 +1875,11 @@ async fn drive_sub_op_get( ) .await; } - Terminal::Streaming { - key, - stream_id, - includes_contract, - // Sub-op GETs don't feed the router (no RouteEvent - // is emitted on the sub-op path), so payload size - // attribution is irrelevant here. - total_size: _, - } => { - if let Some(peer_addr) = driver.current_target.socket_addr() { - if let Err(e) = assemble_and_cache_stream( - op_manager, - peer_addr, - *stream_id, - *key, - *includes_contract, - ) - .await - { - tracing::warn!( - %key, - error = %e, - "get (driver sub-op): stream assembly failed" - ); - } - } else { - // Mirrors the client driver's branch at op_ctx_task.rs:341-347: - // without a socket_addr we cannot claim the orphan stream, so - // the local-store re-query below will return NotFound. Surface - // the breadcrumb so operators can correlate to the failure. 
- tracing::warn!( - %key, - "get (driver sub-op): current_target has no socket_addr; \ - cannot claim orphan stream" - ); - } - } + // Stream assembly (with per-candidate retry, #4345) + // already ran inside `drive_get_with_assembly_retry`; + // on persistent failure the store re-query below + // returns NotFound with the assembly cause attached. + Terminal::Streaming { .. } => {} Terminal::LocalCompletion => {} } @@ -1696,7 +1907,10 @@ async fn drive_sub_op_get( client_contract, )) } - _ => SubOpGetOutcome::NotFound(missing_state_cause(&instance_id)), + _ => SubOpGetOutcome::NotFound(match &streaming_assembly.error { + Some(e) => format!("{}: {e}", missing_state_cause(&instance_id)), + None => missing_state_cause(&instance_id), + }), } } RetryLoopOutcome::Exhausted(cause) => SubOpGetOutcome::NotFound(cause), @@ -3843,29 +4057,120 @@ mod tests { #[test] fn streaming_terminal_calls_assemble_and_cache_stream() { let src = production_source(); - let body = extract_fn_body(src, "async fn drive_client_get_inner("); - // Find the `Terminal::Streaming` arm of the Done match. - let arm = body - .find("Terminal::Streaming {") - .expect("Done arm must handle Terminal::Streaming"); - // The matching arm must call `assemble_and_cache_stream`. - let tail = &body[arm..]; - // Bound the search to this arm by clipping at the next - // `Terminal::` match arm. - let arm_end = tail[1..] - .find("Terminal::") - .map(|p| p + 1) - .unwrap_or(tail.len()); - let arm_body = &tail[..arm_end]; + // Since #4345, assembly runs inside `drive_get_with_assembly_retry` + // (so a failed assembly can advance to the next candidate and + // re-enter the retry loop). Both drivers must route through the + // wrapper, and the wrapper must perform the assemble+cache. + let client_body = extract_fn_body(src, "async fn drive_client_get_inner("); assert!( - arm_body.contains("assemble_and_cache_stream"), - "Terminal::Streaming arm of drive_client_get_inner must call \ - `assemble_and_cache_stream`. 
Without this, cold-cache streaming \ - GETs return OperationError because nothing writes the local \ - store. See bug #1 in PR #3884 review." + client_body.contains("drive_get_with_assembly_retry("), + "drive_client_get_inner must drive the loop via \ + `drive_get_with_assembly_retry`. Without it, cold-cache \ + streaming GETs return OperationError because nothing writes \ + the local store (bug #1 in PR #3884 review), and assembly \ + failures burn the whole GET without consuming the retry \ + budget (#4345)." + ); + let sub_op_body = extract_fn_body(src, "async fn drive_sub_op_get("); + assert!( + sub_op_body.contains("drive_get_with_assembly_retry("), + "drive_sub_op_get must drive the loop via \ + `drive_get_with_assembly_retry` — the sub-op path has the \ + same assembly-failure-burns-the-op gap as the client path \ + (#4345)." + ); + let wrapper_body = extract_fn_body(src, "async fn drive_get_with_assembly_retry("); + assert!( + wrapper_body.contains("assemble_and_cache_stream("), + "drive_get_with_assembly_retry must call \ + `assemble_and_cache_stream` on the Streaming terminal." ); } + /// #4345: a failed stream assembly must be a *retryable* attempt + /// failure — advance to the next candidate, fresh attempt tx — and + /// exhaustion must still surface as `Done` (OperationError), never + /// as `Exhausted` (which the caller maps to a false `NotFound`: + /// a streaming header proves the contract exists). + #[test] + fn assembly_failure_advances_with_fresh_tx_and_never_exhausts_to_notfound() { + let src = production_source(); + let body = extract_fn_body(src, "async fn drive_get_with_assembly_retry("); + + let err_arm = body + .find("Err(e) =>") + .expect("wrapper must handle assembly Err"); + let err_body = &body[err_arm..]; + + // The routing penalty must land on the candidate whose header + // never became a stream — capture it BEFORE advance() mutates + // `current_target`. 
+ let capture_pos = err_body + .find("let failed_target = driver.current_target.clone()") + .expect("Err arm must capture the failing target"); + let advance_pos = err_body + .find("driver.advance()") + .expect("Err arm must call driver.advance()"); + assert!( + capture_pos < advance_pos, + "failed_target must be captured BEFORE driver.advance() — \ + advance() replaces current_target, so capturing after \ + penalizes the wrong (fresh) candidate." + ); + + // Re-entry must use a fresh attempt tx: reusing the previous tx + // collides with the relay dedup gates (`active_relay_get_txs`) + // while the failed attempt's relay chain is still draining. + assert!( + err_body.contains("attempt_tx = driver.new_attempt_tx()"), + "assembly-failure retry must re-enter the loop with a fresh \ + attempt tx via driver.new_attempt_tx()" + ); + + // Exhaustion keeps the Done(Streaming) outcome. + let exhausted_arm = err_body + .find("AdvanceOutcome::Exhausted =>") + .expect("Err arm must handle Exhausted"); + let exhausted_body = &err_body[exhausted_arm..]; + assert!( + exhausted_body.contains("break RetryLoopOutcome::Done(Terminal::Streaming {"), + "exhausted assembly retries must break with the original \ + Done(Streaming) outcome so the client sees OperationError, \ + NOT RetryLoopOutcome::Exhausted (which maps to a false \ + NotFound for a contract that provably exists)." + ); + assert!( + !exhausted_body.contains("RetryLoopOutcome::Exhausted("), + "exhausted assembly retries must not be converted into \ + RetryLoopOutcome::Exhausted" + ); + } + + /// #4345: the injection budget is keyed by contract so parallel + /// tests in one process can't consume each other's failures. 
+ #[test] + fn assembly_fault_injection_is_per_key_and_bounded() { + let key_a = ContractKey::from_id_and_code( + ContractInstanceId::new([0xA1; 32]), + CodeHash::new([0xA2; 32]), + ); + let key_b = ContractKey::from_id_and_code( + ContractInstanceId::new([0xB1; 32]), + CodeHash::new([0xB2; 32]), + ); + + // Unarmed keys never fail. + assert!(!assembly_fault_injection::consume(&key_b)); + + assembly_fault_injection::inject_failures(key_a, 2); + assert!(assembly_fault_injection::consume(&key_a)); + // Other keys are unaffected while a budget is armed. + assert!(!assembly_fault_injection::consume(&key_b)); + assert!(assembly_fault_injection::consume(&key_a)); + // Budget exhausted → assembly succeeds again. + assert!(!assembly_fault_injection::consume(&key_a)); + } + /// Pure-data regression test for the streaming payload shape the /// driver deserializes. Locks down the invariant that /// `GetStreamingPayload` round-trips via bincode, so a regression diff --git a/crates/core/tests/streaming_e2e.rs b/crates/core/tests/streaming_e2e.rs index 530a06c8f8..be55aedaa0 100644 --- a/crates/core/tests/streaming_e2e.rs +++ b/crates/core/tests/streaming_e2e.rs @@ -1289,3 +1289,222 @@ fn test_driver_inline_get_triggers_auto_subscribe() { .collect::>() ); } + +// ============================================================================= +// Test: GET retries after stream-assembly failure (#4345) +// ============================================================================= + +/// Build the sparse-but-routable topology used by +/// `test_streaming_get_retries_after_assembly_failure`. max_connections +/// below the node count keeps the PUT fan-out from reaching every node +/// (so a cold getter exists), while min_connections = 3 gives nodes +/// enough ring candidates for the assembly-failure retry to advance to. 
+async fn setup_assembly_retry_network(name: &str, seed: u64, threshold: usize) -> SimNetwork { + GlobalRng::set_seed(seed); + const BASE_EPOCH_MS: u64 = 1577836800000; + const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + (seed % RANGE_MS)); + let mut sim = SimNetwork::new( + name, 1, // gateways + 12, // nodes + 7, // ring_max_htl + 3, // rnd_if_htl_above + 4, // max_connections (sparse: < node count → cold getter exists) + 3, // min_connections (≥ 2 retry candidates per node) + seed, + ) + .await; + sim.with_streaming_threshold(threshold); + sim +} + +/// Regression test for #4345: a failed stream assembly must consume the +/// GET retry budget and advance to the next candidate instead of +/// synthesizing a client error with the budget untouched. +/// +/// Production failure chain: packet loss → FixedRate loss-pause caps the +/// cwnd → the sender aborts on the 3s cwnd-wait timeout → the receiver's +/// stream assembly hits its inactivity timeout → the originator's GET +/// driver (pre-fix) logged a WARN and synthesized "GET succeeded on wire +/// but local store lookup failed" without retrying — one transport +/// hiccup burned the whole GET even though `MAX_RETRIES` was untouched +/// and other candidates could serve the contract. +/// +/// Uses the two-phase cold-node isolation strategy from +/// `test_driver_streaming_get_cold_cache`, but on a routable htl=7 +/// topology (the HTL=1 variant is not routable — see that test's +/// #[ignore]): phase 1 runs the PUT alone to find the nodes the fan-out +/// missed; phase 2 re-runs the same seed (same ring) and GETs from a +/// cold node with exactly one injected assembly failure armed for this +/// contract key (via `get_assembly_fault_injection`, which mirrors the +/// production failure by leaving the inbound stream orphaned). The +/// driver must advance to another candidate and still deliver the full +/// state. 
+/// +/// Not every fan-out-missed node can route a GET to a holder in a +/// sparse topology (a routing property, not the #4345 assembly-retry +/// property under test), so phase 2 iterates the cold candidates +/// deterministically until one demonstrates the retry path end-to-end. +/// On a regression every candidate fails and the test fails with the +/// per-candidate diagnostics. +#[test] +fn test_streaming_get_retries_after_assembly_failure() { + use freenet::dev_tool::GET_DRIVER_CALL_COUNT; + use freenet::dev_tool::get_assembly_fault_injection; + use std::sync::atomic::Ordering; + + const SEED: u64 = 0xDE1A_4345_0000_0001; + const NETWORK_NAME_PHASE1: &str = "get-assembly-retry-p1"; + const THRESHOLD: usize = 1024; + const LARGE_STATE_SIZE: usize = 100 * 1024; // 100KB, above THRESHOLD + const NODES: usize = 12; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let contract = SimOperation::create_test_contract(0x45); + let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0x45); + let contract_key = contract.key(); + let contract_id = *contract_key.id(); + + // Phase 1: run the PUT in isolation to discover the nodes that did + // NOT receive the state during fan-out. 
+ let sim1 = rt.block_on(setup_assembly_retry_network( + NETWORK_NAME_PHASE1, + SEED, + THRESHOLD, + )); + let phase1_ops = vec![ScheduledOperation::new( + NodeLabel::gateway(NETWORK_NAME_PHASE1, 0), + SimOperation::Put { + contract: contract.clone(), + state: large_state.clone(), + subscribe: false, + }, + )]; + let phase1 = sim1.run_controlled_simulation( + SEED, + phase1_ops, + Duration::from_secs(120), + Duration::from_secs(30), + ); + assert!( + phase1.turmoil_result.is_ok(), + "Phase 1 (PUT only) should complete: {:?}", + phase1.turmoil_result.err() + ); + let cold_nodes: Vec = (1..=NODES) + .filter(|i| { + let label = NodeLabel::node(NETWORK_NAME_PHASE1, *i); + phase1 + .node_storages + .get(&label) + .is_some_and(|s| s.get_stored_state(&contract_key).is_none()) + }) + .collect(); + assert!( + !cold_nodes.is_empty(), + "Test precondition: no cold-cache node exists after the PUT in \ + this {NODES}-node sparse topology. All nodes received the state \ + during fan-out. Pick a different SEED or sparser max_connections." + ); + + let mut failures: Vec = Vec::new(); + let mut demonstrated = false; + for (attempt, cold_idx) in cold_nodes.iter().copied().enumerate() { + let network_name = format!("get-assembly-retry-p2-{attempt}"); + + // (Re-)arm exactly one injected assembly failure for THIS + // contract key — keying by contract makes this safe under + // parallel test execution in the same process. `inject_failures` + // overwrites, so a previous candidate that never streamed leaves + // no stale budget behind. 
+ get_assembly_fault_injection::inject_failures(contract_key, 1); + let retries_before = + get_assembly_fault_injection::ASSEMBLY_RETRY_COUNT.load(Ordering::SeqCst); + let baseline_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst); + + let sim2 = rt.block_on(setup_assembly_retry_network(&network_name, SEED, THRESHOLD)); + let cold_node_label = NodeLabel::node(&network_name, cold_idx); + let phase2_ops = vec![ + ScheduledOperation::new( + NodeLabel::gateway(&network_name, 0), + SimOperation::Put { + contract: contract.clone(), + state: large_state.clone(), + subscribe: false, + }, + ), + ScheduledOperation::new( + cold_node_label.clone(), + SimOperation::Get { + contract_id, + return_contract_code: false, + subscribe: false, + }, + ), + ]; + let phase2 = sim2.run_controlled_simulation( + SEED, + phase2_ops, + Duration::from_secs(300), + Duration::from_secs(120), + ); + if let Err(e) = &phase2.turmoil_result { + failures.push(format!("node {cold_idx}: simulation error: {e:?}")); + continue; + } + + // Driver-isolation probe: the GET must have gone through the + // task-per-tx driver (not the local-cache shortcut), otherwise + // the assembly-retry path was never reachable. + let driver_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst); + if driver_calls == baseline_calls { + failures.push(format!( + "node {cold_idx}: GET never reached the driver (locally cached?)" + )); + continue; + } + + let retries_after = + get_assembly_fault_injection::ASSEMBLY_RETRY_COUNT.load(Ordering::SeqCst); + let stored = phase2 + .node_storages + .get(&cold_node_label) + .and_then(|s| s.get_stored_state(&contract_key)); + + match (retries_after > retries_before, stored) { + (true, Some(state)) => { + // The driver took the assembly-failure retry path AND + // the retry delivered the full, correct state. 
+ let stored_bytes: Vec<u8> = state.as_ref().to_vec(); + assert_eq!( + stored_bytes, large_state, + "Stored state bytes should match the original 100KB \ + state after the assembly-failure retry" + ); + demonstrated = true; + break; + } + (took_retry, stored) => { + failures.push(format!( + "node {cold_idx}: retry_path_taken={took_retry} \ + state_present={} (streaming GET likely never reached \ + a holder from this candidate, or the retry burned \ + the GET — #4345 regression if this repeats for every \ + candidate)", + stored.is_some() + )); + } + } + } + + assert!( + demonstrated, + "No cold candidate demonstrated the #4345 assembly-failure retry \ + path (injected failure → driver.advance() → fresh attempt → \ + state delivered). Per-candidate outcomes: {failures:#?}" + ); +} From 3dac07d55379fa3a0e692d2e0b34929e05b441fb Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 7 Jun 2026 20:31:08 -0500 Subject: [PATCH 2/4] feat(telemetry): attribute transport-level events to the local peer id and version (#4345) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit transfer_failed / transfer_started / transfer_completed, transport_snapshot, and timeout telemetry events carried an empty peer_id, making sender attribution in the collector impossible — the v0.2.70 post-release analysis of #4345 could not tell which versions the failing senders ran. The transport layer genuinely has no peer identity, but the emitting node's own id is what these events need and it is available at reporter construction. - Thread the local peer id (public key + best-effort address, same construction and caveats as the shadow-RTT events in p2p_impl.rs) into TelemetryReporter/TelemetryWorker and stamp it on the three event families that previously sent an empty peer_id. - Add the standard service.version OTLP resource attribute so every event is attributable to a crate version without joining against peer_startup events.
Additive fields only; the dashboard parses unknown attributes permissively and events.peer_id was already nullable. [AI-assisted - Claude] Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/core/src/node.rs | 17 +++- crates/core/src/tracing/telemetry.rs | 144 +++++++++++++++++++++++---- 2 files changed, 139 insertions(+), 22 deletions(-) diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 3d03bc106d..1ec6f01dc1 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -566,8 +566,21 @@ impl NodeConfig { registers.push(Box::new(OTEventRegister::new())); } - // Add telemetry reporter if enabled in config - if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) { + // Add telemetry reporter if enabled in config. The local + // peer id (public key + best-effort address, same + // construction as the shadow-RTT events in `p2p_impl.rs`) + // attributes transport-level events — transfer_failed, + // transport_snapshot, timeout — which otherwise carry an + // empty peer_id and cannot be correlated to a sender in + // the collector (#4345 observability gap). + let local_peer_id = PeerId::new( + self.key_pair.public().clone(), + self.own_addr.unwrap_or_else(|| { + std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port) + }), + ) + .to_string(); + if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry, local_peer_id) { registers.push(Box::new(telemetry)); } diff --git a/crates/core/src/tracing/telemetry.rs b/crates/core/src/tracing/telemetry.rs index 43a9734d75..b2a2e3139f 100644 --- a/crates/core/src/tracing/telemetry.rs +++ b/crates/core/src/tracing/telemetry.rs @@ -127,6 +127,15 @@ pub fn current_timestamp_ms() -> u64 { #[derive(Clone)] pub struct TelemetryReporter { sender: mpsc::Sender, + /// This node's own peer id, stamped on events that are emitted + /// without a `NetLogMessage` carrying one (timeouts, transfer + /// events, transport snapshots). 
Without it those events carry an + /// empty `peer_id` and cannot be attributed to a sender in the + /// collector (#4345 observability gap). The address portion is + /// best-effort for non-gateway nodes (listener fallback until + /// external-address discovery — same caveat as the shadow-RTT + /// events, see `p2p_impl.rs` and #4294). + local_peer_id: String, } #[allow(dead_code)] @@ -147,8 +156,12 @@ struct TelemetryEvent { impl TelemetryReporter { /// Create a new telemetry reporter. /// + /// `local_peer_id` is this node's own peer id (public key + + /// best-effort address), used to attribute transport-level events + /// that have no `NetLogMessage` context. + /// /// Returns None if telemetry is disabled or in a test environment. - pub fn new(config: &TelemetryConfig) -> Option<Self> { + pub fn new(config: &TelemetryConfig, local_peer_id: String) -> Option<Self> { if !config.enabled { tracing::info!("Telemetry reporting is disabled"); return None; @@ -188,10 +201,14 @@ impl TelemetryReporter { receiver, transport_snapshot_interval_secs, transfer_event_receiver, + local_peer_id.clone(), ); GlobalExecutor::spawn(worker.run()); - Some(Self { sender }) + Some(Self { + sender, + local_peer_id, + }) } async fn send_event(&self, event: TelemetryEvent) { @@ -233,10 +250,11 @@ impl NetEventRegister for TelemetryReporter { ) -> BoxFuture<'_, ()> { let sender = self.sender.clone(); let op_type = op_type.to_string(); + let local_peer_id = self.local_peer_id.clone(); async move { let event = TelemetryEvent { timestamp: current_timestamp_ms(), - peer_id: String::new(), + peer_id: local_peer_id, transaction_id: tx.to_string(), event_type: "timeout".to_string(), event_data: serde_json::json!({ @@ -272,6 +290,8 @@ struct TelemetryWorker { transport_snapshot_interval_secs: u64, /// Receiver for per-transfer telemetry events from transport layer transfer_event_receiver: mpsc::Receiver<super::TransferEvent>, + /// This node's own peer id — see `TelemetryReporter::local_peer_id`.
+ local_peer_id: String, } impl TelemetryWorker { @@ -280,6 +300,7 @@ impl TelemetryWorker { receiver: mpsc::Receiver, transport_snapshot_interval_secs: u64, transfer_event_receiver: mpsc::Receiver, + local_peer_id: String, ) -> Self { Self { endpoint, @@ -295,6 +316,27 @@ impl TelemetryWorker { rate_limit_window_start: Instant::now(), transport_snapshot_interval_secs, transfer_event_receiver, + local_peer_id, + } + } + + /// Wrap a transport-layer transfer event into a `TelemetryEvent`, + /// stamped with this node's own peer id. The transport layer has + /// no peer-identity context of its own, so before #4345 these + /// events carried an empty `peer_id` and sender attribution in the + /// collector was impossible. + fn transfer_event_to_telemetry(&self, transfer_event: super::TransferEvent) -> TelemetryEvent { + let event_type = match &transfer_event { + super::TransferEvent::Started { .. } => "transfer_started", + super::TransferEvent::Completed { .. } => "transfer_completed", + super::TransferEvent::Failed { .. } => "transfer_failed", + }; + TelemetryEvent { + timestamp: current_timestamp_ms(), + peer_id: self.local_peer_id.clone(), + transaction_id: String::new(), // Not available at transport layer + event_type: event_type.to_string(), + event_data: event_kind_to_json(&super::EventKind::Transfer(transfer_event)), } } @@ -336,18 +378,7 @@ impl TelemetryWorker { transfer_event = self.transfer_event_receiver.recv() => { // Handle per-transfer telemetry events from transport layer if let Some(transfer_event) = transfer_event { - let event_type = match &transfer_event { - super::TransferEvent::Started { .. } => "transfer_started", - super::TransferEvent::Completed { .. } => "transfer_completed", - super::TransferEvent::Failed { .. 
} => "transfer_failed", - }; - let event = TelemetryEvent { - timestamp: current_timestamp_ms(), - peer_id: String::new(), // Transport layer doesn't have peer identity - transaction_id: String::new(), // Not available at transport layer - event_type: event_type.to_string(), - event_data: event_kind_to_json(&super::EventKind::Transfer(transfer_event)), - }; + let event = self.transfer_event_to_telemetry(transfer_event); self.handle_event(event).await; } }, @@ -360,7 +391,9 @@ impl TelemetryWorker { if let Some(snapshot) = TRANSPORT_METRICS.take_snapshot() { let event = TelemetryEvent { timestamp: current_timestamp_ms(), - peer_id: String::new(), // Transport metrics are node-wide, not peer-specific + // Node-wide metrics, attributed to this + // node's own peer id (#4345). + peer_id: self.local_peer_id.clone(), transaction_id: String::new(), // Not tied to a transaction event_type: "transport_snapshot".to_string(), event_data: serde_json::to_value(&snapshot).unwrap_or_default(), @@ -506,10 +539,21 @@ fn to_otlp_logs(events: &[TelemetryEvent]) -> serde_json::Value { serde_json::json!({ "resourceLogs": [{ "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "freenet-peer"} - }] + "attributes": [ + { + "key": "service.name", + "value": {"stringValue": "freenet-peer"} + }, + { + // Standard OTLP placement for the sender's + // crate version — lets collector queries + // attribute every event type to a release + // without joining against peer_startup events + // (#4345 observability gap). 
+ "key": "service.version", + "value": {"stringValue": env!("CARGO_PKG_VERSION")} + } + ] }, "scopeLogs": [{ "scope": { @@ -1841,6 +1885,66 @@ mod tests { assert_eq!(peer_attr["value"]["stringValue"], "test-peer-id-xyz"); } + #[test] + fn test_otlp_resource_includes_service_version() { + // #4345 observability gap: every OTLP export must carry the + // sender's crate version as a standard resource attribute, so + // collector queries can attribute events (transfer_failed in + // particular) to a release without joining against + // peer_startup events. + let event = TelemetryEvent { + timestamp: 1_700_000_000_000, + peer_id: "some-peer".to_string(), + transaction_id: String::new(), + event_type: "transfer_failed".to_string(), + event_data: serde_json::json!({}), + }; + let otlp = to_otlp_logs(std::slice::from_ref(&event)); + let resource_attrs = &otlp["resourceLogs"][0]["resource"]["attributes"]; + let version_attr = resource_attrs + .as_array() + .expect("resource attributes must be an array") + .iter() + .find(|a| a["key"] == "service.version") + .expect("service.version resource attribute must be present"); + assert_eq!( + version_attr["value"]["stringValue"], + env!("CARGO_PKG_VERSION") + ); + } + + #[test] + fn test_transfer_events_carry_local_peer_id() { + // #4345 observability gap: transfer_failed events carried an + // empty peer_id, making sender attribution in the collector + // impossible. The worker must stamp its own peer id on every + // transport-level transfer event. 
+ let (_cmd_tx, cmd_rx) = mpsc::channel(1); + let (_transfer_tx, transfer_rx) = mpsc::channel(1); + let worker = TelemetryWorker::new( + "http://localhost:4318".to_string(), + cmd_rx, + 0, + transfer_rx, + "test-local-peer-id".to_string(), + ); + let event = worker.transfer_event_to_telemetry(crate::tracing::TransferEvent::Failed { + stream_id: 42, + peer_addr: "127.0.0.1:31337".parse().unwrap(), + bytes_transferred: 1024, + reason: "cwnd wait timeout".to_string(), + elapsed_ms: 3000, + direction: crate::tracing::TransferDirection::Send, + timestamp: 1_700_000_000_000, + }); + assert_eq!(event.event_type, "transfer_failed"); + assert_eq!( + event.peer_id, "test-local-peer-id", + "transfer events must be attributed to the emitting node's \ + own peer id (#4345)" + ); + } + #[test] fn test_otlp_serialization_empty_peer_id_when_unset() { // Pin the back-compat path: events sent via the original From 8e44bf4516736ecc95a27fca9e40d60ca99eeacb Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 7 Jun 2026 20:42:13 -0500 Subject: [PATCH 3/4] refactor(get): deduplicate Done(Streaming) rebuilds in assembly-retry wrapper Match the retry-loop result by reference (the Streaming fields are all Copy) so the original outcome value stays whole; the three give-up exits in drive_get_with_assembly_retry now `break result` instead of each rebuilding RetryLoopOutcome::Done(Terminal::Streaming { ... }). This removes the thrice-repeated rewrap, drops the now-unused total_size binding, and lists the non-streaming pass-through variants explicitly (future Terminal variants force a decision here). Also derive Default for AssemblyOutcome and update the source-scrape pin test anchor from the rebuilt-terminal text to `break result` in the same edit; the negative Exhausted-conversion assertion is unchanged. No behavior change. 
[AI-assisted - Claude] Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/core/src/operations/get/op_ctx_task.rs | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/crates/core/src/operations/get/op_ctx_task.rs b/crates/core/src/operations/get/op_ctx_task.rs index 648269882b..6077f09393 100644 --- a/crates/core/src/operations/get/op_ctx_task.rs +++ b/crates/core/src/operations/get/op_ctx_task.rs @@ -807,6 +807,7 @@ impl RetryDriver for GetRetryDriver<'_> { // --- Assembly-retry wrapper (#4345) --- /// Outcome metadata for the streaming-assembly step of a GET. +#[derive(Default)] struct AssemblyOutcome { /// Wall-clock duration of the successful claim + assemble + /// deserialize + local cache write, fed to the router's @@ -856,27 +857,27 @@ async fn drive_get_with_assembly_retry( emit_route_failure_on_retry: bool, ) -> (RetryLoopOutcome, AssemblyOutcome) { let mut attempt_tx = first_tx; - let mut assembly = AssemblyOutcome { - transfer_duration: None, - error: None, - }; + let mut assembly = AssemblyOutcome::default(); let outcome = loop { let result = drive_retry_loop(op_manager, attempt_tx, op_label, driver).await; // Only a streaming terminal has a post-loop assembly step; - // everything else passes through unchanged. - let (key, stream_id, includes_contract, total_size) = match result { + // everything else passes through unchanged. Match by reference + // (the fields are all Copy) so `result` stays whole — every + // give-up exit below is then `break result`, returning the + // original `Done(Streaming)` outcome without rebuilding it. + let (key, stream_id, includes_contract) = match &result { RetryLoopOutcome::Done(Terminal::Streaming { key, stream_id, includes_contract, - total_size, - }) => (key, stream_id, includes_contract, total_size), - other @ (RetryLoopOutcome::Done(_) + .. + }) => (*key, *stream_id, *includes_contract), + RetryLoopOutcome::Done(Terminal::InlineFound { .. 
} | Terminal::LocalCompletion) | RetryLoopOutcome::Exhausted(_) | RetryLoopOutcome::Unexpected - | RetryLoopOutcome::InfraError(_)) => break other, + | RetryLoopOutcome::InfraError(_) => break result, }; // Uses `current_target` as the sender address — accurate for @@ -894,12 +895,7 @@ async fn drive_get_with_assembly_retry( cannot claim the response stream" .to_string(), ); - break RetryLoopOutcome::Done(Terminal::Streaming { - key, - stream_id, - includes_contract, - total_size, - }); + break result; }; let stream_start = tokio::time::Instant::now(); @@ -909,12 +905,7 @@ async fn drive_get_with_assembly_retry( Ok(()) => { assembly.transfer_duration = Some(stream_start.elapsed()); assembly.error = None; - break RetryLoopOutcome::Done(Terminal::Streaming { - key, - stream_id, - includes_contract, - total_size, - }); + break result; } Err(e) => { // Capture the failing peer BEFORE advance() replaces @@ -957,12 +948,7 @@ async fn drive_get_with_assembly_retry( exhausted — state will not be cached locally" ); assembly.error = Some(e); - break RetryLoopOutcome::Done(Terminal::Streaming { - key, - stream_id, - includes_contract, - total_size, - }); + break result; } } } @@ -4127,17 +4113,20 @@ mod tests { attempt tx via driver.new_attempt_tx()" ); - // Exhaustion keeps the Done(Streaming) outcome. + // Exhaustion keeps the Done(Streaming) outcome. The wrapper + // only reaches the assembly Err arm after `result` matched + // `Done(Terminal::Streaming { .. })`, so `break result` returns + // that original outcome unchanged. 
let exhausted_arm = err_body .find("AdvanceOutcome::Exhausted =>") .expect("Err arm must handle Exhausted"); let exhausted_body = &err_body[exhausted_arm..]; assert!( - exhausted_body.contains("break RetryLoopOutcome::Done(Terminal::Streaming {"), + exhausted_body.contains("break result"), "exhausted assembly retries must break with the original \ - Done(Streaming) outcome so the client sees OperationError, \ - NOT RetryLoopOutcome::Exhausted (which maps to a false \ - NotFound for a contract that provably exists)." + Done(Streaming) outcome (`break result`) so the client sees \ + OperationError, NOT RetryLoopOutcome::Exhausted (which maps \ + to a false NotFound for a contract that provably exists)." ); assert!( !exhausted_body.contains("RetryLoopOutcome::Exhausted("), From b8140fca2adfa9c5b96913bf3809b235de6323b7 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 7 Jun 2026 21:11:17 -0500 Subject: [PATCH 4/4] =?UTF-8?q?fix(get):=20close=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20false-NotFound=20after=20assembly-retry=20exhaustio?= =?UTF-8?q?n,=20stale=20diagnostics,=20test=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review findings from the four-perspective + external (Gemini) pass on PR #4367, all addressed: - False NotFound (code-first + testing reviewers, blocking class): an assembly failure that advanced and re-entered the loop could then exhaust on the wire, and the pass-through Exhausted outcome mapped to ContractResponse::NotFound — a false NotFound for a contract whose streaming header proved existence. The wrapper now remembers the failed header and converts a subsequent wire exhaustion back to Done(Streaming), so the client always gets the diagnostic OperationError. Pinned in the assembly-retry pin test. - Stale assembly.error (both reviewers): cleared when a retry resolves via a non-streaming terminal, so an unrelated store-lookup miss is not mislabeled as an assembly failure. 
- Stale cross-attempt timing on the conversion path (adversarial reviewer): request_sent_at/response_received_at are cleared in the conversion arm so the caller's received --- .claude/rules/operations.md | 8 +- crates/core/src/node.rs | 24 +- crates/core/src/node/p2p_impl.rs | 8 +- crates/core/src/operations/get/op_ctx_task.rs | 223 ++++++++++++++++-- crates/core/src/tracing/telemetry.rs | 79 ++++++- crates/core/tests/streaming_e2e.rs | 14 +- 6 files changed, 305 insertions(+), 51 deletions(-) diff --git a/.claude/rules/operations.md b/.claude/rules/operations.md index 4e209cbb4d..14e0ddcd1e 100644 --- a/.claude/rules/operations.md +++ b/.claude/rules/operations.md @@ -26,7 +26,13 @@ Driver entry points: The shared retry-loop driver lives in `op_ctx.rs` (`RetryDriver` trait + `drive_retry_loop`). UPDATE is fire-and-forget (no retry loop, no -upstream reply); GET/PUT/SUBSCRIBE share the retry driver. +upstream reply); GET/PUT/SUBSCRIBE share the retry driver. GET's +client and sub-op drivers enter the loop via +`get/op_ctx_task.rs::drive_get_with_assembly_retry`, which treats a +post-terminal stream-assembly failure as a retryable attempt failure +(#4345) — post-terminal side effects that can fail retryably belong in +such a wrapper, never inside the loop's Terminal arm (pinned by +`drive_retry_loop_terminal_arm_does_not_call_advance`). ## Wire-variant dispatch diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 1ec6f01dc1..0e94edce7c 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -302,6 +302,19 @@ pub struct NodeConfig { } impl NodeConfig { + /// This node's own peer id as a telemetry attribution string + /// (public key + best-effort address). The address portion falls + /// back to the listener address for non-gateway nodes until + /// external-address discovery — a refresh path is tracked in + /// #4294. 
Shared by the telemetry reporter and the shadow-RTT / + /// reference-ping emitters so the two constructions can't drift. + pub(crate) fn local_peer_id_string(&self) -> String { + let addr = self.own_addr.unwrap_or_else(|| { + std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port) + }); + PeerId::new(self.key_pair.public().clone(), addr).to_string() + } + pub async fn new(config: Config) -> anyhow::Result { tracing::info!("Loading node configuration for mode {}", config.mode); @@ -573,14 +586,9 @@ impl NodeConfig { // transport_snapshot, timeout — which otherwise carry an // empty peer_id and cannot be correlated to a sender in // the collector (#4345 observability gap). - let local_peer_id = PeerId::new( - self.key_pair.public().clone(), - self.own_addr.unwrap_or_else(|| { - std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port) - }), - ) - .to_string(); - if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry, local_peer_id) { + if let Some(telemetry) = + TelemetryReporter::new(&self.config.telemetry, self.local_peer_id_string()) + { registers.push(Box::new(telemetry)); } diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index c3ac843ec0..1858c6c7ec 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration}; +use std::{convert::Infallible, sync::Arc, time::Duration}; use futures::{FutureExt, future::BoxFuture}; use tokio::task::JoinHandle; @@ -418,11 +418,7 @@ impl NodeP2P { let reference_ping_enabled = config.config.telemetry.enabled && !config.config.telemetry.is_test_environment && config.config.telemetry.reference_ping_enabled; - let listen_addr = config.own_addr.unwrap_or_else(|| { - SocketAddr::new(config.network_listener_ip, config.network_listener_port) - }); - let local_peer_id = - crate::node::PeerId::new(config.key_pair.public().clone(), 
listen_addr).to_string(); + let local_peer_id = config.local_peer_id_string(); crate::transport::rolling_rtt_stats::spawn_aggregator( local_peer_id.clone(), &background_task_monitor, diff --git a/crates/core/src/operations/get/op_ctx_task.rs b/crates/core/src/operations/get/op_ctx_task.rs index 6077f09393..61f49eacd2 100644 --- a/crates/core/src/operations/get/op_ctx_task.rs +++ b/crates/core/src/operations/get/op_ctx_task.rs @@ -845,10 +845,14 @@ struct AssemblyOutcome { /// would collide with the relay dedup gates (`active_relay_get_txs`) /// while the failed attempt's relay chain is still draining. /// -/// When the budget is exhausted, the original `Done(Streaming)` -/// outcome is returned (NOT `Exhausted`): a streaming header proves -/// the contract exists, so the client must see an operation error -/// rather than a false `NotFound`. +/// Once a streaming header has been seen, exhaustion can never surface +/// as `Exhausted` (which callers map to a false `NotFound` for a +/// contract that provably exists). Both exhaustion shapes are covered: +/// the assembly-time `advance()` exhaustion keeps the original +/// `Done(Streaming)` outcome, and a wire exhaustion on a re-entered +/// loop (every remaining candidate NotFound / timeout) is converted +/// back to the remembered `Done(Streaming)` header. Either way the +/// client sees an operation error carrying the assembly cause. async fn drive_get_with_assembly_retry( op_manager: &OpManager, first_tx: Transaction, @@ -858,6 +862,15 @@ async fn drive_get_with_assembly_retry( ) -> (RetryLoopOutcome, AssemblyOutcome) { let mut attempt_tx = first_tx; let mut assembly = AssemblyOutcome::default(); + // Streaming-header fields remembered across an assembly-failure + // retry. 
A header proves the contract exists, so if the re-entered + // loop then exhausts on the wire (every remaining candidate + // NotFound / timeout), the exhaustion must NOT surface as a false + // `NotFound` to the client — convert it back to the remembered + // `Done(Streaming)` so the caller synthesizes an operation error + // carrying the assembly cause. Pre-#4345 this state was + // unreachable (assembly failure never re-entered the loop). + let mut failed_header: Option<(ContractKey, StreamId, bool, u64)> = None; let outcome = loop { let result = drive_retry_loop(op_manager, attempt_tx, op_label, driver).await; @@ -867,17 +880,48 @@ async fn drive_get_with_assembly_retry( // (the fields are all Copy) so `result` stays whole — every // give-up exit below is then `break result`, returning the // original `Done(Streaming)` outcome without rebuilding it. - let (key, stream_id, includes_contract) = match &result { + let (key, stream_id, includes_contract, total_size) = match &result { RetryLoopOutcome::Done(Terminal::Streaming { key, stream_id, includes_contract, - .. - }) => (*key, *stream_id, *includes_contract), - RetryLoopOutcome::Done(Terminal::InlineFound { .. } | Terminal::LocalCompletion) - | RetryLoopOutcome::Exhausted(_) - | RetryLoopOutcome::Unexpected - | RetryLoopOutcome::InfraError(_) => break result, + total_size, + }) => (*key, *stream_id, *includes_contract, *total_size), + RetryLoopOutcome::Done(Terminal::InlineFound { .. } | Terminal::LocalCompletion) => { + // A non-streaming terminal delivered the state inline, + // so any earlier assembly failure is moot — clear the + // remembered error or an unrelated store-lookup miss + // (eviction race) would be mislabeled as an assembly + // failure by the caller's error synthesis. 
+ assembly.error = None; + break result; + } + RetryLoopOutcome::Exhausted(cause) => { + if let Some((key, stream_id, includes_contract, total_size)) = failed_header { + let prior = assembly.error.take().unwrap_or_default(); + assembly.error = Some(format!( + "{prior}; retries after assembly failure exhausted: {cause}" + )); + // The remembered header's `response_received_at` + // belongs to the FIRST attempt, while + // `request_sent_at` was overwritten by the last + // (exhausted) attempt — the pair is incoherent, and + // leaving it set makes the caller's received break result, }; // Uses `current_target` as the sender address — accurate for @@ -937,6 +981,10 @@ async fn drive_get_with_assembly_retry( assembly_fault_injection::ASSEMBLY_RETRY_COUNT .fetch_add(1, std::sync::atomic::Ordering::SeqCst); assembly.error = Some(e); + // Remember the header so a later wire + // exhaustion converts back to Done(Streaming) + // instead of surfacing a false NotFound. + failed_header = Some((key, stream_id, includes_contract, total_size)); attempt_tx = driver.new_attempt_tx(); continue; } @@ -1080,15 +1128,7 @@ async fn build_host_response( )) } _ => { - let cause = match assembly_error { - Some(e) => format!( - "GET response stream assembly failed for {instance_id} \ - after exhausting retries: {e}" - ), - None => { - format!("GET succeeded on wire but local store lookup failed for {instance_id}") - } - }; + let cause = synthesized_get_error_cause(instance_id, assembly_error); tracing::warn!( contract = %instance_id, %cause, @@ -1893,10 +1933,10 @@ async fn drive_sub_op_get( client_contract, )) } - _ => SubOpGetOutcome::NotFound(match &streaming_assembly.error { - Some(e) => format!("{}: {e}", missing_state_cause(&instance_id)), - None => missing_state_cause(&instance_id), - }), + _ => SubOpGetOutcome::NotFound(sub_op_not_found_cause( + &instance_id, + streaming_assembly.error.as_deref(), + )), } } RetryLoopOutcome::Exhausted(cause) => SubOpGetOutcome::NotFound(cause), @@ 
-1905,6 +1945,37 @@ async fn drive_sub_op_get( } } +/// Cause string for a client GET whose terminal reply was classified +/// success but whose state never reached the local store. Carries the +/// assembly failure cause when the streaming path exhausted its retry +/// budget (#4345); otherwise the generic store-lookup message (covers +/// eviction races and executor rejections). Pure helper so both arms +/// are unit-testable without a real `OpManager`. +fn synthesized_get_error_cause( + instance_id: &ContractInstanceId, + assembly_error: Option<&str>, +) -> String { + match assembly_error { + Some(e) => format!( + "GET response stream assembly failed for {instance_id} \ + after exhausting retries: {e}" + ), + None => format!("GET succeeded on wire but local store lookup failed for {instance_id}"), + } +} + +/// Sub-op variant of [`synthesized_get_error_cause`]: the store-miss +/// cause with the assembly failure appended when one occurred (#4345). +fn sub_op_not_found_cause( + instance_id: &ContractInstanceId, + assembly_error: Option<&str>, +) -> String { + match assembly_error { + Some(e) => format!("{}: {e}", missing_state_cause(instance_id)), + None => missing_state_cause(instance_id), + } +} + /// Cause string for a sub-op GET that succeeded on the wire but /// produced no state in the local store on re-query. Extracted as a /// pure helper so the message format is unit-testable without a @@ -4083,8 +4154,15 @@ mod tests { let src = production_source(); let body = extract_fn_body(src, "async fn drive_get_with_assembly_retry("); - let err_arm = body + // Anchor on the assembly call first so a future earlier + // `Err(e)` arm (e.g. around the claim) can't silently shift + // the slice this test inspects. + let assemble_call = body + .find("match assemble_and_cache_stream") + .expect("wrapper must match on assemble_and_cache_stream"); + let err_arm = body[assemble_call..] 
.find("Err(e) =>") + .map(|p| p + assemble_call) .expect("wrapper must handle assembly Err"); let err_body = &body[err_arm..]; @@ -4113,6 +4191,29 @@ mod tests { attempt tx via driver.new_attempt_tx()" ); + // The advancing path must penalize the failed candidate (the + // router only learns about header-without-stream peers from + // this call) and must remember the header for the + // wire-exhaustion conversion below. + let next_arm = err_body + .find("AdvanceOutcome::Next =>") + .expect("Err arm must handle Next"); + let next_body = &err_body[next_arm + ..err_body + .find("AdvanceOutcome::Exhausted =>") + .expect("Err arm must handle Exhausted")]; + assert!( + next_body.contains("emit_get_route_failure("), + "the advancing path must emit a route failure for the \ + failed candidate (gated on emit_route_failure_on_retry)" + ); + assert!( + next_body + .contains("failed_header = Some((key, stream_id, includes_contract, total_size))"), + "the advancing path must remember the streaming header so a \ + later wire exhaustion can't surface as a false NotFound" + ); + // Exhaustion keeps the Done(Streaming) outcome. The wrapper // only reaches the assembly Err arm after `result` matched // `Done(Terminal::Streaming { .. })`, so `break result` returns @@ -4133,6 +4234,78 @@ mod tests { "exhausted assembly retries must not be converted into \ RetryLoopOutcome::Exhausted" ); + // ...and the budget-exhausted path must NOT emit a route + // failure: the caller's final route event (driven by the + // failed host_result) already records the Failure for the + // last target — emitting here too would double-count it. 
+ assert!( + !exhausted_body.contains("emit_get_route_failure("), + "the budget-exhausted path must not emit a per-retry route \ + failure — the caller's final Failure event covers the last \ + target (double-count otherwise)" + ); + + // Wire exhaustion AFTER a failed assembly must convert back to + // the remembered Done(Streaming): a header proved the contract + // exists, so RetryLoopOutcome::Exhausted (mapped to NotFound by + // both callers) would be a false NotFound. This state is only + // reachable post-#4345 (assembly failure re-enters the loop). + let exhausted_passthrough = body + .find("RetryLoopOutcome::Exhausted(cause) =>") + .expect("wrapper must have an Exhausted pass-through arm"); + let passthrough_body = &body[exhausted_passthrough..]; + assert!( + passthrough_body.contains( + "if let Some((key, stream_id, includes_contract, total_size)) = failed_header" + ), + "the Exhausted pass-through must check failed_header" + ); + assert!( + passthrough_body.contains("break RetryLoopOutcome::Done(Terminal::Streaming {"), + "wire exhaustion after a seen streaming header must convert \ + back to Done(Streaming) so the client sees OperationError, \ + not a false NotFound" + ); + } + + /// #4345: the synthesized client error must carry the assembly + /// failure cause when one occurred, and fall back to the generic + /// store-lookup message otherwise. Behavioral coverage for the + /// cause construction both callers use on the exhaustion path. 
+ #[test] + fn synthesized_error_causes_carry_assembly_failure() { + let instance_id = ContractInstanceId::new([0xC1; 32]); + + let with_assembly = synthesized_get_error_cause( + &instance_id, + Some("stream assembly: no fragments received within inactivity timeout"), + ); + assert!( + with_assembly.contains("stream assembly failed") + && with_assembly.contains("no fragments received"), + "assembly-exhaustion cause must be diagnostic: {with_assembly}" + ); + + let without = synthesized_get_error_cause(&instance_id, None); + assert!( + without.contains("local store lookup failed"), + "fallback cause must keep the store-lookup message: {without}" + ); + assert!( + !without.contains("assembly"), + "fallback cause must not mention assembly: {without}" + ); + + let sub_with = sub_op_not_found_cause(&instance_id, Some("injected failure")); + assert!( + sub_with.contains(&missing_state_cause(&instance_id)) + && sub_with.contains("injected failure"), + "sub-op cause must append the assembly failure: {sub_with}" + ); + assert_eq!( + sub_op_not_found_cause(&instance_id, None), + missing_state_cause(&instance_id) + ); } /// #4345: the injection budget is keyed by contract so parallel diff --git a/crates/core/src/tracing/telemetry.rs b/crates/core/src/tracing/telemetry.rs index b2a2e3139f..49d7f7f9d5 100644 --- a/crates/core/src/tracing/telemetry.rs +++ b/crates/core/src/tracing/telemetry.rs @@ -320,6 +320,22 @@ impl TelemetryWorker { } } + /// Wrap a transport metrics snapshot into a `TelemetryEvent`. + /// Node-wide metrics, attributed to this node's own peer id — + /// same #4345 rationale as `transfer_event_to_telemetry`. 
+ fn snapshot_to_telemetry( + &self, + snapshot: &crate::transport::metrics::TransportSnapshot, + ) -> TelemetryEvent { + TelemetryEvent { + timestamp: current_timestamp_ms(), + peer_id: self.local_peer_id.clone(), + transaction_id: String::new(), // Not tied to a transaction + event_type: "transport_snapshot".to_string(), + event_data: serde_json::to_value(snapshot).unwrap_or_default(), + } + } + /// Wrap a transport-layer transfer event into a `TelemetryEvent`, /// stamped with this node's own peer id. The transport layer has /// no peer-identity context of its own, so before #4345 these @@ -389,15 +405,7 @@ impl TelemetryWorker { // Emit transport layer metrics snapshot (only if enabled) if self.transport_snapshot_interval_secs > 0 { if let Some(snapshot) = TRANSPORT_METRICS.take_snapshot() { - let event = TelemetryEvent { - timestamp: current_timestamp_ms(), - // Node-wide metrics, attributed to this - // node's own peer id (#4345). - peer_id: self.local_peer_id.clone(), - transaction_id: String::new(), // Not tied to a transaction - event_type: "transport_snapshot".to_string(), - event_data: serde_json::to_value(&snapshot).unwrap_or_default(), - }; + let event = self.snapshot_to_telemetry(&snapshot); self.handle_event(event).await; } } @@ -1885,6 +1893,59 @@ mod tests { assert_eq!(peer_attr["value"]["stringValue"], "test-peer-id-xyz"); } + #[test] + fn test_transport_snapshot_carries_local_peer_id() { + // #4345 observability gap, same class as transfer_failed: + // transport_snapshot events carried an empty peer_id. The + // worker must stamp its own peer id on snapshot events. 
+ let (_cmd_tx, cmd_rx) = mpsc::channel(1); + let (_transfer_tx, transfer_rx) = mpsc::channel(1); + let worker = TelemetryWorker::new( + "http://localhost:4318".to_string(), + cmd_rx, + 0, + transfer_rx, + "snapshot-peer-id".to_string(), + ); + let snapshot = crate::transport::metrics::TransportSnapshot::default(); + let event = worker.snapshot_to_telemetry(&snapshot); + assert_eq!(event.event_type, "transport_snapshot"); + assert_eq!( + event.peer_id, "snapshot-peer-id", + "transport snapshots must be attributed to the emitting \ + node's own peer id (#4345)" + ); + } + + #[tokio::test] + async fn test_timeout_events_carry_local_peer_id() { + // #4345 observability gap, same class as transfer_failed: + // timeout events carried an empty peer_id. + use crate::message::Transaction; + use crate::operations::get::GetMsg; + + let (sender, mut receiver) = mpsc::channel(1); + let mut reporter = TelemetryReporter { + sender, + local_peer_id: "timeout-peer-id".to_string(), + }; + let tx = Transaction::new::<GetMsg>(); + reporter + .notify_of_time_out(tx, "get", Some("target-peer".to_string())) + .await; + let TelemetryCommand::Event(event) = + receiver.try_recv().expect("timeout event must be sent") + else { + panic!("expected an Event command"); + }; + assert_eq!(event.event_type, "timeout"); + assert_eq!( + event.peer_id, "timeout-peer-id", + "timeout events must be attributed to the emitting node's \ + own peer id (#4345)" + ); + } + #[test] fn test_otlp_resource_includes_service_version() { // #4345 observability gap: every OTLP export must carry the diff --git a/crates/core/tests/streaming_e2e.rs b/crates/core/tests/streaming_e2e.rs index be55aedaa0..1048136392 100644 --- a/crates/core/tests/streaming_e2e.rs +++ b/crates/core/tests/streaming_e2e.rs @@ -1413,14 +1413,24 @@ fn test_streaming_get_retries_after_assembly_failure() { let mut failures: Vec = Vec::new(); let mut demonstrated = false; - for (attempt, cold_idx) in cold_nodes.iter().copied().enumerate() { + // Cap the
candidates tried: each iteration is a full phase-2 sim + // run (~15s), and under a regression EVERY candidate fails — an + // uncapped loop over up to 11 cold nodes would hit the nextest ci + // profile's termination timeout and lose the per-candidate + // diagnostics below. The sim is deterministic per seed; with this + // seed the first four cold candidates cannot route a GET to a + // holder at all (their retry exhausts on the wire) and the fifth + // demonstrates the property, so the cap leaves one spare. + for (attempt, cold_idx) in cold_nodes.iter().copied().take(6).enumerate() { let network_name = format!("get-assembly-retry-p2-{attempt}"); // (Re-)arm exactly one injected assembly failure for THIS // contract key — keying by contract makes this safe under // parallel test execution in the same process. `inject_failures` // overwrites, so a previous candidate that never streamed leaves - // no stale budget behind. + // no stale budget behind. (ASSEMBLY_RETRY_COUNT itself is + // process-global; the before/after delta below relies on + // nextest's process-per-test isolation in CI.) get_assembly_fault_injection::inject_failures(contract_key, 1); let retries_before = get_assembly_fault_injection::ASSEMBLY_RETRY_COUNT.load(Ordering::SeqCst);