diff --git a/.claude/rules/operations.md b/.claude/rules/operations.md index 4e209cbb4d..14e0ddcd1e 100644 --- a/.claude/rules/operations.md +++ b/.claude/rules/operations.md @@ -26,7 +26,13 @@ Driver entry points: The shared retry-loop driver lives in `op_ctx.rs` (`RetryDriver` trait + `drive_retry_loop`). UPDATE is fire-and-forget (no retry loop, no -upstream reply); GET/PUT/SUBSCRIBE share the retry driver. +upstream reply); GET/PUT/SUBSCRIBE share the retry driver. GET's +client and sub-op drivers enter the loop via +`get/op_ctx_task.rs::drive_get_with_assembly_retry`, which treats a +post-terminal stream-assembly failure as a retryable attempt failure +(#4345) — post-terminal side effects that can fail retryably belong in +such a wrapper, never inside the loop's Terminal arm (pinned by +`drive_retry_loop_terminal_arm_does_not_call_advance`). ## Wire-variant dispatch diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 0ac1e88b58..33a7b664b0 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -109,6 +109,10 @@ pub mod dev_tool { pub use crate::operations::get::op_ctx_task::RELAY_DRIVER_CALL_COUNT as GET_RELAY_DRIVER_CALL_COUNT; #[cfg(any(test, feature = "testing"))] pub use crate::operations::get::op_ctx_task::RELAY_GET_STREAMING_FORWARD_COUNT; + // Deterministic stream-assembly fault injection + retry counter for + // the GET driver's assembly-retry path (#4345). + #[cfg(any(test, feature = "testing"))] + pub use crate::operations::get::op_ctx_task::assembly_fault_injection as get_assembly_fault_injection; #[cfg(any(test, feature = "testing"))] pub use crate::operations::put::op_ctx_task::RELAY_PUT_DRIVER_CALL_COUNT; #[cfg(any(test, feature = "testing"))] diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 3d03bc106d..0e94edce7c 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -302,6 +302,19 @@ pub struct NodeConfig { } impl NodeConfig { + /// This node's own peer id as a telemetry attribution string + /// (public key + best-effort address). The address portion falls + /// back to the listener address for non-gateway nodes until + /// external-address discovery — a refresh path is tracked in + /// #4294. Shared by the telemetry reporter and the shadow-RTT / + /// reference-ping emitters so the two constructions can't drift. + pub(crate) fn local_peer_id_string(&self) -> String { + let addr = self.own_addr.unwrap_or_else(|| { + std::net::SocketAddr::new(self.network_listener_ip, self.network_listener_port) + }); + PeerId::new(self.key_pair.public().clone(), addr).to_string() + } + pub async fn new(config: Config) -> anyhow::Result { tracing::info!("Loading node configuration for mode {}", config.mode); @@ -566,8 +579,16 @@ impl NodeConfig { registers.push(Box::new(OTEventRegister::new())); } - // Add telemetry reporter if enabled in config - if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) { + // Add telemetry reporter if enabled in config. The local + // peer id (public key + best-effort address, same + // construction as the shadow-RTT events in `p2p_impl.rs`) + // attributes transport-level events — transfer_failed, + // transport_snapshot, timeout — which otherwise carry an + // empty peer_id and cannot be correlated to a sender in + // the collector (#4345 observability gap). + if let Some(telemetry) = + TelemetryReporter::new(&self.config.telemetry, self.local_peer_id_string()) + { registers.push(Box::new(telemetry)); } diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index c3ac843ec0..1858c6c7ec 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration}; +use std::{convert::Infallible, sync::Arc, time::Duration}; use futures::{FutureExt, future::BoxFuture}; use tokio::task::JoinHandle; @@ -418,11 +418,7 @@ impl NodeP2P { let reference_ping_enabled = config.config.telemetry.enabled && !config.config.telemetry.is_test_environment && config.config.telemetry.reference_ping_enabled; - let listen_addr = config.own_addr.unwrap_or_else(|| { - SocketAddr::new(config.network_listener_ip, config.network_listener_port) - }); - let local_peer_id = - crate::node::PeerId::new(config.key_pair.public().clone(), listen_addr).to_string(); + let local_peer_id = config.local_peer_id_string(); crate::transport::rolling_rtt_stats::spawn_aggregator( local_peer_id.clone(), &background_task_monitor, diff --git a/crates/core/src/operations/get/op_ctx_task.rs b/crates/core/src/operations/get/op_ctx_task.rs index cb12c6432e..61f49eacd2 100644 --- a/crates/core/src/operations/get/op_ctx_task.rs +++ b/crates/core/src/operations/get/op_ctx_task.rs @@ -316,7 +316,14 @@ async fn drive_client_get_inner( response_received_at: None, }; - let loop_result = drive_retry_loop(op_manager, client_tx, "get", &mut driver).await; + let (loop_result, streaming_assembly) = drive_get_with_assembly_retry( + op_manager, + client_tx, + "get", + &mut driver, + /* emit_route_failure_on_retry */ true, + ) + .await; // `op_manager.completed(client_tx)` runs from the // `ClientGetCompletionGuard` in `run_client_get` AFTER this fn @@ -388,10 +395,7 @@ async fn drive_client_get_inner( *key } Terminal::Streaming { - key, - stream_id, - includes_contract, - total_size, + key, total_size, .. } => { // `total_size` is the wire-authoritative payload byte // count from the `ResponseStreaming` header. Using it @@ -403,53 +407,15 @@ async fn drive_client_get_inner( // a separate "store evicted before re-query" // failure mode (the LRU has finite capacity). payload_size = *total_size as usize; - // Assemble the stream and cache locally. Mirrors - // the legacy `process_message` streaming branch - // at `get.rs:2721-3196`. Uses `current_target` - // as the sender address — accurate for the - // single-hop response case where the responder - // equals the selected target. - if let Some(peer_addr) = driver.current_target.socket_addr() { - // `transfer_duration` covers stream claim + - // assemble + deserialize + local cache write — - // the same composite measurement the legacy - // `GetStats::record_transfer_end` captured at - // `get.rs:1542`. For small payloads near the - // streaming threshold the local-work component - // is non-negligible, but keeping the metric - // composite matches the existing router prior - // built on the legacy code path. - let stream_start = tokio::time::Instant::now(); - if let Err(e) = assemble_and_cache_stream( - op_manager, - peer_addr, - *stream_id, - *key, - *includes_contract, - ) - .await - { - tracing::warn!( - %key, - error = %e, - "get: stream assembly failed — \ - state will not be cached locally" - ); - // Assembly failed → don't emit a transfer-rate - // observation (transfer_duration stays ZERO, - // which makes the router skip the rate sample - // at router.rs:443). The response-time - // observation still goes through because the - // header DID arrive. - } else { - transfer_duration = stream_start.elapsed(); - } - } else { - tracing::warn!( - %key, - "get: current_target has no socket_addr; \ - cannot claim orphan stream" - ); + // Stream assembly (with per-candidate retry, #4345) + // already ran inside `drive_get_with_assembly_retry` + // — consume its timing here. `None` means every + // assembly attempt failed: leave `transfer_duration` + // at ZERO so the router skips the rate sample at + // router.rs:443 (the response-time observation still + // goes through because a header DID arrive). + if let Some(duration) = streaming_assembly.transfer_duration { + transfer_duration = duration; } *key } @@ -466,8 +432,13 @@ async fn drive_client_get_inner( } }; - let host_result = - build_host_response(op_manager, &instance_id, return_contract_code).await; + let host_result = build_host_response( + op_manager, + &instance_id, + return_contract_code, + streaming_assembly.error.as_deref(), + ) + .await; // Auto-subscribe on successful GET at the originator — // mirrors the legacy branches at get.rs:2313/2408/3136/3185. @@ -833,6 +804,280 @@ impl RetryDriver for GetRetryDriver<'_> { } } +// --- Assembly-retry wrapper (#4345) --- + +/// Outcome metadata for the streaming-assembly step of a GET. +#[derive(Default)] +struct AssemblyOutcome { + /// Wall-clock duration of the successful claim + assemble + + /// deserialize + local cache write, fed to the router's + /// transfer-rate estimator. `None` when the terminal was not + /// `Streaming` or when every assembly attempt failed. The + /// composite measurement matches what the legacy + /// `GetStats::record_transfer_end` captured at `get.rs:1542`. + transfer_duration: Option, + /// Last assembly failure cause when assembly never succeeded. + /// Threaded into the synthesized client error so the failure is + /// diagnosable instead of the generic store-lookup message. + error: Option, +} + +/// Drive the shared GET retry loop, treating stream-assembly failure +/// as a retryable attempt failure (#4345). +/// +/// The shared loop classifies the `ResponseStreaming` *header* as +/// terminal (`op_ctx.rs` pins the Terminal arm to return +/// synchronously), so stream assembly necessarily runs after the loop +/// returns. Before this wrapper existed, an assembly failure — +/// fragments lost on a lossy path, the sender aborting on cwnd-wait +/// timeout, a relay's pipe dying after the header was forwarded — +/// burned the whole GET: the driver logged a WARN, fell through to +/// the store re-query, and synthesized a client error with the +/// `MAX_RETRIES` budget untouched. One transport hiccup failed the op +/// even though other candidates could serve it. The relay driver +/// already treats header-without-stream as a retryable routing +/// failure (`drive_relay_get_inner`'s claim-failure `continue`); this +/// gives the originator the same semantics. +/// +/// On assembly failure the wrapper advances the driver to the next +/// candidate (consuming the shared retry budget) and re-enters the +/// loop with a fresh attempt transaction — reusing the previous tx +/// would collide with the relay dedup gates (`active_relay_get_txs`) +/// while the failed attempt's relay chain is still draining. +/// +/// Once a streaming header has been seen, exhaustion can never surface +/// as `Exhausted` (which callers map to a false `NotFound` for a +/// contract that provably exists). Both exhaustion shapes are covered: +/// the assembly-time `advance()` exhaustion keeps the original +/// `Done(Streaming)` outcome, and a wire exhaustion on a re-entered +/// loop (every remaining candidate NotFound / timeout) is converted +/// back to the remembered `Done(Streaming)` header. Either way the +/// client sees an operation error carrying the assembly cause. +async fn drive_get_with_assembly_retry( + op_manager: &OpManager, + first_tx: Transaction, + op_label: &str, + driver: &mut GetRetryDriver<'_>, + emit_route_failure_on_retry: bool, +) -> (RetryLoopOutcome, AssemblyOutcome) { + let mut attempt_tx = first_tx; + let mut assembly = AssemblyOutcome::default(); + // Streaming-header fields remembered across an assembly-failure + // retry. A header proves the contract exists, so if the re-entered + // loop then exhausts on the wire (every remaining candidate + // NotFound / timeout), the exhaustion must NOT surface as a false + // `NotFound` to the client — convert it back to the remembered + // `Done(Streaming)` so the caller synthesizes an operation error + // carrying the assembly cause. Pre-#4345 this state was + // unreachable (assembly failure never re-entered the loop). + let mut failed_header: Option<(ContractKey, StreamId, bool, u64)> = None; + + let outcome = loop { + let result = drive_retry_loop(op_manager, attempt_tx, op_label, driver).await; + + // Only a streaming terminal has a post-loop assembly step; + // everything else passes through unchanged. Match by reference + // (the fields are all Copy) so `result` stays whole — every + // give-up exit below is then `break result`, returning the + // original `Done(Streaming)` outcome without rebuilding it. + let (key, stream_id, includes_contract, total_size) = match &result { + RetryLoopOutcome::Done(Terminal::Streaming { + key, + stream_id, + includes_contract, + total_size, + }) => (*key, *stream_id, *includes_contract, *total_size), + RetryLoopOutcome::Done(Terminal::InlineFound { .. } | Terminal::LocalCompletion) => { + // A non-streaming terminal delivered the state inline, + // so any earlier assembly failure is moot — clear the + // remembered error or an unrelated store-lookup miss + // (eviction race) would be mislabeled as an assembly + // failure by the caller's error synthesis. + assembly.error = None; + break result; + } + RetryLoopOutcome::Exhausted(cause) => { + if let Some((key, stream_id, includes_contract, total_size)) = failed_header { + let prior = assembly.error.take().unwrap_or_default(); + assembly.error = Some(format!( + "{prior}; retries after assembly failure exhausted: {cause}" + )); + // The remembered header's `response_received_at` + // belongs to the FIRST attempt, while + // `request_sent_at` was overwritten by the last + // (exhausted) attempt — the pair is incoherent, and + // leaving it set makes the caller's received break result, + }; + + // Uses `current_target` as the sender address — accurate for + // the single-hop response case where the responder equals the + // selected target; relays pipe the stream hop-by-hop so the + // fragments arrive from the adjacent hop either way. + let Some(peer_addr) = driver.current_target.socket_addr() else { + tracing::warn!( + %key, + "get: current_target has no socket_addr; \ + cannot claim orphan stream" + ); + assembly.error = Some( + "selected target has no socket address; \ + cannot claim the response stream" + .to_string(), + ); + break result; + }; + + let stream_start = tokio::time::Instant::now(); + match assemble_and_cache_stream(op_manager, peer_addr, stream_id, key, includes_contract) + .await + { + Ok(()) => { + assembly.transfer_duration = Some(stream_start.elapsed()); + assembly.error = None; + break result; + } + Err(e) => { + // Capture the failing peer BEFORE advance() replaces + // `current_target` — the routing penalty must land on + // the candidate whose header never became a stream. + let failed_target = driver.current_target.clone(); + match driver.advance() { + AdvanceOutcome::Next => { + tracing::warn!( + %key, + error = %e, + retries = driver.retries, + "get: stream assembly failed; \ + retrying against next candidate (#4345)" + ); + // Penalize the failed candidate so the router + // learns (mirrors the relay driver's + // claim-failure handling). Only on the + // advancing path: when the budget is + // exhausted, the caller's final route event + // (driven by the failed host_result) already + // records the Failure for the last target — + // emitting here too would double-count it. + if emit_route_failure_on_retry { + emit_get_route_failure(op_manager, attempt_tx, &failed_target, &key) + .await; + } + #[cfg(any(test, feature = "testing"))] + assembly_fault_injection::ASSEMBLY_RETRY_COUNT + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + assembly.error = Some(e); + // Remember the header so a later wire + // exhaustion converts back to Done(Streaming) + // instead of surfacing a false NotFound. + failed_header = Some((key, stream_id, includes_contract, total_size)); + attempt_tx = driver.new_attempt_tx(); + continue; + } + AdvanceOutcome::Exhausted => { + tracing::warn!( + %key, + error = %e, + "get: stream assembly failed and retry budget \ + exhausted — state will not be cached locally" + ); + assembly.error = Some(e); + break result; + } + } + } + } + }; + + (outcome, assembly) +} + +/// Emit a routing-failure observation for a candidate whose streaming +/// header never became a completed stream. Mirrors the route-event +/// emission in `drive_client_get_inner`'s final outcome block. +async fn emit_get_route_failure( + op_manager: &OpManager, + tx: Transaction, + peer: &PeerKeyLocation, + key: &ContractKey, +) { + let route_event = RouteEvent { + peer: peer.clone(), + contract_location: Location::from(key), + outcome: RouteOutcome::Failure, + op_type: Some(crate::node::network_status::OpType::Get), + }; + if let Some(log_event) = + crate::tracing::NetEventLog::route_event(&tx, &op_manager.ring, &route_event) + { + op_manager + .ring + .register_events(either::Either::Left(log_event)) + .await; + } + op_manager.ring.routing_finished(route_event); +} + +/// Test-only fault injection for `assemble_and_cache_stream` (#4345). +/// +/// Keyed by `ContractKey` so concurrently-running tests in the same +/// process cannot consume each other's injection budget. A test arms +/// `n` failures for its own (unique) contract key; the first `n` +/// assembly attempts for that key fail with a synthetic error, +/// exercising the driver's assembly-retry path deterministically. +#[cfg(any(test, feature = "testing"))] +pub mod assembly_fault_injection { + use freenet_stdlib::prelude::ContractKey; + use std::collections::HashMap; + use std::sync::atomic::AtomicUsize; + use std::sync::{Mutex, OnceLock}; + + /// Lifetime count of assembly-failure retries actually taken by + /// GET drivers (incremented when the driver advances to the next + /// candidate after an assembly failure). Tests snapshot + /// before/after to prove the retry path genuinely fired. + pub static ASSEMBLY_RETRY_COUNT: AtomicUsize = AtomicUsize::new(0); + + fn budgets() -> &'static Mutex> { + static BUDGETS: OnceLock>> = OnceLock::new(); + BUDGETS.get_or_init(|| Mutex::new(HashMap::new())) + } + + /// Arm `n` injected assembly failures for `key`. + pub fn inject_failures(key: ContractKey, n: usize) { + budgets().lock().expect("poisoned").insert(key, n); + } + + /// Consume one injected failure for `key`, if armed. + pub(crate) fn consume(key: &ContractKey) -> bool { + let mut map = budgets().lock().expect("poisoned"); + match map.get_mut(key) { + Some(n) if *n > 0 => { + *n -= 1; + if *n == 0 { + map.remove(key); + } + true + } + _ => false, + } + } +} + // --- Host-response construction --- /// Query the local contract store for `(state, contract)` and package @@ -842,10 +1087,17 @@ impl RetryDriver for GetRetryDriver<'_> { /// on the happy path — `process_message` stores before the terminal /// reply fires), synthesize an operation error matching the shape /// `to_host_result` produces on NotFound. +/// +/// `assembly_error` carries the last stream-assembly failure cause when +/// the streaming path exhausted its retry budget without ever caching +/// the state (#4345). It makes the synthesized client error diagnostic +/// — "stream assembly failed: …" — instead of the generic store-lookup +/// message that conflates assembly failures with eviction races. async fn build_host_response( op_manager: &OpManager, instance_id: &ContractInstanceId, return_contract_code: bool, + assembly_error: Option<&str>, ) -> HostResult { let lookup = op_manager .notify_contract_handler(ContractHandlerEvent::GetQuery { @@ -876,16 +1128,15 @@ async fn build_host_response( )) } _ => { + let cause = synthesized_get_error_cause(instance_id, assembly_error); tracing::warn!( contract = %instance_id, + %cause, "get: terminal reply classified success but local \ store lookup returned no state; synthesizing client error" ); Err(ErrorKind::OperationError { - cause: format!( - "GET succeeded on wire but local store lookup failed for {instance_id}" - ) - .into(), + cause: cause.into(), } .into()) } @@ -1091,6 +1342,14 @@ async fn assemble_and_cache_stream( expected_key: ContractKey, includes_contract: bool, ) -> Result<(), String> { + // Test-only deterministic fault injection (#4345). Returning before + // the claim mirrors the production failure (the inbound stream is + // left orphaned for GC), so the retry path is exercised end-to-end. + #[cfg(any(test, feature = "testing"))] + if assembly_fault_injection::consume(&expected_key) { + return Err("injected stream assembly failure (assembly_fault_injection test hook)".into()); + } + let handle = match op_manager .orphan_stream_registry() .claim_or_wait(peer_addr, stream_id, STREAM_CLAIM_TIMEOUT) @@ -1608,7 +1867,17 @@ async fn drive_sub_op_get( response_received_at: None, }; - let loop_result = drive_retry_loop(op_manager, tx, "get-subop", &mut driver).await; + let (loop_result, streaming_assembly) = drive_get_with_assembly_retry( + op_manager, + tx, + "get-subop", + &mut driver, + // Sub-op GETs don't feed the router (no RouteEvent is emitted + // on the sub-op path), so skip the per-retry failure events. + /* emit_route_failure_on_retry */ + false, + ) + .await; // `op_manager.completed(tx)` runs from the // `SubOpGetCompletionGuard` in `run_sub_op_get` AFTER this fn @@ -1632,43 +1901,11 @@ async fn drive_sub_op_get( ) .await; } - Terminal::Streaming { - key, - stream_id, - includes_contract, - // Sub-op GETs don't feed the router (no RouteEvent - // is emitted on the sub-op path), so payload size - // attribution is irrelevant here. - total_size: _, - } => { - if let Some(peer_addr) = driver.current_target.socket_addr() { - if let Err(e) = assemble_and_cache_stream( - op_manager, - peer_addr, - *stream_id, - *key, - *includes_contract, - ) - .await - { - tracing::warn!( - %key, - error = %e, - "get (driver sub-op): stream assembly failed" - ); - } - } else { - // Mirrors the client driver's branch at op_ctx_task.rs:341-347: - // without a socket_addr we cannot claim the orphan stream, so - // the local-store re-query below will return NotFound. Surface - // the breadcrumb so operators can correlate to the failure. - tracing::warn!( - %key, - "get (driver sub-op): current_target has no socket_addr; \ - cannot claim orphan stream" - ); - } - } + // Stream assembly (with per-candidate retry, #4345) + // already ran inside `drive_get_with_assembly_retry`; + // on persistent failure the store re-query below + // returns NotFound with the assembly cause attached. + Terminal::Streaming { .. } => {} Terminal::LocalCompletion => {} } @@ -1696,7 +1933,10 @@ async fn drive_sub_op_get( client_contract, )) } - _ => SubOpGetOutcome::NotFound(missing_state_cause(&instance_id)), + _ => SubOpGetOutcome::NotFound(sub_op_not_found_cause( + &instance_id, + streaming_assembly.error.as_deref(), + )), } } RetryLoopOutcome::Exhausted(cause) => SubOpGetOutcome::NotFound(cause), @@ -1705,6 +1945,37 @@ async fn drive_sub_op_get( } } +/// Cause string for a client GET whose terminal reply was classified +/// success but whose state never reached the local store. Carries the +/// assembly failure cause when the streaming path exhausted its retry +/// budget (#4345); otherwise the generic store-lookup message (covers +/// eviction races and executor rejections). Pure helper so both arms +/// are unit-testable without a real `OpManager`. +fn synthesized_get_error_cause( + instance_id: &ContractInstanceId, + assembly_error: Option<&str>, +) -> String { + match assembly_error { + Some(e) => format!( + "GET response stream assembly failed for {instance_id} \ + after exhausting retries: {e}" + ), + None => format!("GET succeeded on wire but local store lookup failed for {instance_id}"), + } +} + +/// Sub-op variant of [`synthesized_get_error_cause`]: the store-miss +/// cause with the assembly failure appended when one occurred (#4345). +fn sub_op_not_found_cause( + instance_id: &ContractInstanceId, + assembly_error: Option<&str>, +) -> String { + match assembly_error { + Some(e) => format!("{}: {e}", missing_state_cause(instance_id)), + None => missing_state_cause(instance_id), + } +} + /// Cause string for a sub-op GET that succeeded on the wire but /// produced no state in the local store on re-query. Extracted as a /// pure helper so the message format is unit-testable without a @@ -3843,29 +4114,225 @@ mod tests { #[test] fn streaming_terminal_calls_assemble_and_cache_stream() { let src = production_source(); - let body = extract_fn_body(src, "async fn drive_client_get_inner("); - // Find the `Terminal::Streaming` arm of the Done match. - let arm = body - .find("Terminal::Streaming {") - .expect("Done arm must handle Terminal::Streaming"); - // The matching arm must call `assemble_and_cache_stream`. - let tail = &body[arm..]; - // Bound the search to this arm by clipping at the next - // `Terminal::` match arm. - let arm_end = tail[1..] - .find("Terminal::") - .map(|p| p + 1) - .unwrap_or(tail.len()); - let arm_body = &tail[..arm_end]; + // Since #4345, assembly runs inside `drive_get_with_assembly_retry` + // (so a failed assembly can advance to the next candidate and + // re-enter the retry loop). Both drivers must route through the + // wrapper, and the wrapper must perform the assemble+cache. + let client_body = extract_fn_body(src, "async fn drive_client_get_inner("); + assert!( + client_body.contains("drive_get_with_assembly_retry("), + "drive_client_get_inner must drive the loop via \ + `drive_get_with_assembly_retry`. Without it, cold-cache \ + streaming GETs return OperationError because nothing writes \ + the local store (bug #1 in PR #3884 review), and assembly \ + failures burn the whole GET without consuming the retry \ + budget (#4345)." + ); + let sub_op_body = extract_fn_body(src, "async fn drive_sub_op_get("); + assert!( + sub_op_body.contains("drive_get_with_assembly_retry("), + "drive_sub_op_get must drive the loop via \ + `drive_get_with_assembly_retry` — the sub-op path has the \ + same assembly-failure-burns-the-op gap as the client path \ + (#4345)." + ); + let wrapper_body = extract_fn_body(src, "async fn drive_get_with_assembly_retry("); assert!( - arm_body.contains("assemble_and_cache_stream"), - "Terminal::Streaming arm of drive_client_get_inner must call \ - `assemble_and_cache_stream`. Without this, cold-cache streaming \ - GETs return OperationError because nothing writes the local \ - store. See bug #1 in PR #3884 review." + wrapper_body.contains("assemble_and_cache_stream("), + "drive_get_with_assembly_retry must call \ + `assemble_and_cache_stream` on the Streaming terminal." ); } + /// #4345: a failed stream assembly must be a *retryable* attempt + /// failure — advance to the next candidate, fresh attempt tx — and + /// exhaustion must still surface as `Done` (OperationError), never + /// as `Exhausted` (which the caller maps to a false `NotFound`: + /// a streaming header proves the contract exists). + #[test] + fn assembly_failure_advances_with_fresh_tx_and_never_exhausts_to_notfound() { + let src = production_source(); + let body = extract_fn_body(src, "async fn drive_get_with_assembly_retry("); + + // Anchor on the assembly call first so a future earlier + // `Err(e)` arm (e.g. around the claim) can't silently shift + // the slice this test inspects. + let assemble_call = body + .find("match assemble_and_cache_stream") + .expect("wrapper must match on assemble_and_cache_stream"); + let err_arm = body[assemble_call..] + .find("Err(e) =>") + .map(|p| p + assemble_call) + .expect("wrapper must handle assembly Err"); + let err_body = &body[err_arm..]; + + // The routing penalty must land on the candidate whose header + // never became a stream — capture it BEFORE advance() mutates + // `current_target`. + let capture_pos = err_body + .find("let failed_target = driver.current_target.clone()") + .expect("Err arm must capture the failing target"); + let advance_pos = err_body + .find("driver.advance()") + .expect("Err arm must call driver.advance()"); + assert!( + capture_pos < advance_pos, + "failed_target must be captured BEFORE driver.advance() — \ + advance() replaces current_target, so capturing after \ + penalizes the wrong (fresh) candidate." + ); + + // Re-entry must use a fresh attempt tx: reusing the previous tx + // collides with the relay dedup gates (`active_relay_get_txs`) + // while the failed attempt's relay chain is still draining. + assert!( + err_body.contains("attempt_tx = driver.new_attempt_tx()"), + "assembly-failure retry must re-enter the loop with a fresh \ + attempt tx via driver.new_attempt_tx()" + ); + + // The advancing path must penalize the failed candidate (the + // router only learns about header-without-stream peers from + // this call) and must remember the header for the + // wire-exhaustion conversion below. + let next_arm = err_body + .find("AdvanceOutcome::Next =>") + .expect("Err arm must handle Next"); + let next_body = &err_body[next_arm + ..err_body + .find("AdvanceOutcome::Exhausted =>") + .expect("Err arm must handle Exhausted")]; + assert!( + next_body.contains("emit_get_route_failure("), + "the advancing path must emit a route failure for the \ + failed candidate (gated on emit_route_failure_on_retry)" + ); + assert!( + next_body + .contains("failed_header = Some((key, stream_id, includes_contract, total_size))"), + "the advancing path must remember the streaming header so a \ + later wire exhaustion can't surface as a false NotFound" + ); + + // Exhaustion keeps the Done(Streaming) outcome. The wrapper + // only reaches the assembly Err arm after `result` matched + // `Done(Terminal::Streaming { .. })`, so `break result` returns + // that original outcome unchanged. + let exhausted_arm = err_body + .find("AdvanceOutcome::Exhausted =>") + .expect("Err arm must handle Exhausted"); + let exhausted_body = &err_body[exhausted_arm..]; + assert!( + exhausted_body.contains("break result"), + "exhausted assembly retries must break with the original \ + Done(Streaming) outcome (`break result`) so the client sees \ + OperationError, NOT RetryLoopOutcome::Exhausted (which maps \ + to a false NotFound for a contract that provably exists)." + ); + assert!( + !exhausted_body.contains("RetryLoopOutcome::Exhausted("), + "exhausted assembly retries must not be converted into \ + RetryLoopOutcome::Exhausted" + ); + // ...and the budget-exhausted path must NOT emit a route + // failure: the caller's final route event (driven by the + // failed host_result) already records the Failure for the + // last target — emitting here too would double-count it. + assert!( + !exhausted_body.contains("emit_get_route_failure("), + "the budget-exhausted path must not emit a per-retry route \ + failure — the caller's final Failure event covers the last \ + target (double-count otherwise)" + ); + + // Wire exhaustion AFTER a failed assembly must convert back to + // the remembered Done(Streaming): a header proved the contract + // exists, so RetryLoopOutcome::Exhausted (mapped to NotFound by + // both callers) would be a false NotFound. This state is only + // reachable post-#4345 (assembly failure re-enters the loop). + let exhausted_passthrough = body + .find("RetryLoopOutcome::Exhausted(cause) =>") + .expect("wrapper must have an Exhausted pass-through arm"); + let passthrough_body = &body[exhausted_passthrough..]; + assert!( + passthrough_body.contains( + "if let Some((key, stream_id, includes_contract, total_size)) = failed_header" + ), + "the Exhausted pass-through must check failed_header" + ); + assert!( + passthrough_body.contains("break RetryLoopOutcome::Done(Terminal::Streaming {"), + "wire exhaustion after a seen streaming header must convert \ + back to Done(Streaming) so the client sees OperationError, \ + not a false NotFound" + ); + } + + /// #4345: the synthesized client error must carry the assembly + /// failure cause when one occurred, and fall back to the generic + /// store-lookup message otherwise. Behavioral coverage for the + /// cause construction both callers use on the exhaustion path. + #[test] + fn synthesized_error_causes_carry_assembly_failure() { + let instance_id = ContractInstanceId::new([0xC1; 32]); + + let with_assembly = synthesized_get_error_cause( + &instance_id, + Some("stream assembly: no fragments received within inactivity timeout"), + ); + assert!( + with_assembly.contains("stream assembly failed") + && with_assembly.contains("no fragments received"), + "assembly-exhaustion cause must be diagnostic: {with_assembly}" + ); + + let without = synthesized_get_error_cause(&instance_id, None); + assert!( + without.contains("local store lookup failed"), + "fallback cause must keep the store-lookup message: {without}" + ); + assert!( + !without.contains("assembly"), + "fallback cause must not mention assembly: {without}" + ); + + let sub_with = sub_op_not_found_cause(&instance_id, Some("injected failure")); + assert!( + sub_with.contains(&missing_state_cause(&instance_id)) + && sub_with.contains("injected failure"), + "sub-op cause must append the assembly failure: {sub_with}" + ); + assert_eq!( + sub_op_not_found_cause(&instance_id, None), + missing_state_cause(&instance_id) + ); + } + + /// #4345: the injection budget is keyed by contract so parallel + /// tests in one process can't consume each other's failures. + #[test] + fn assembly_fault_injection_is_per_key_and_bounded() { + let key_a = ContractKey::from_id_and_code( + ContractInstanceId::new([0xA1; 32]), + CodeHash::new([0xA2; 32]), + ); + let key_b = ContractKey::from_id_and_code( + ContractInstanceId::new([0xB1; 32]), + CodeHash::new([0xB2; 32]), + ); + + // Unarmed keys never fail. + assert!(!assembly_fault_injection::consume(&key_b)); + + assembly_fault_injection::inject_failures(key_a, 2); + assert!(assembly_fault_injection::consume(&key_a)); + // Other keys are unaffected while a budget is armed. + assert!(!assembly_fault_injection::consume(&key_b)); + assert!(assembly_fault_injection::consume(&key_a)); + // Budget exhausted → assembly succeeds again. + assert!(!assembly_fault_injection::consume(&key_a)); + } + /// Pure-data regression test for the streaming payload shape the /// driver deserializes. Locks down the invariant that /// `GetStreamingPayload` round-trips via bincode, so a regression diff --git a/crates/core/src/tracing/telemetry.rs b/crates/core/src/tracing/telemetry.rs index 43a9734d75..49d7f7f9d5 100644 --- a/crates/core/src/tracing/telemetry.rs +++ b/crates/core/src/tracing/telemetry.rs @@ -127,6 +127,15 @@ pub fn current_timestamp_ms() -> u64 { #[derive(Clone)] pub struct TelemetryReporter { sender: mpsc::Sender, + /// This node's own peer id, stamped on events that are emitted + /// without a `NetLogMessage` carrying one (timeouts, transfer + /// events, transport snapshots). Without it those events carry an + /// empty `peer_id` and cannot be attributed to a sender in the + /// collector (#4345 observability gap). The address portion is + /// best-effort for non-gateway nodes (listener fallback until + /// external-address discovery — same caveat as the shadow-RTT + /// events, see `p2p_impl.rs` and #4294). + local_peer_id: String, } #[allow(dead_code)] @@ -147,8 +156,12 @@ struct TelemetryEvent { impl TelemetryReporter { /// Create a new telemetry reporter. /// + /// `local_peer_id` is this node's own peer id (public key + + /// best-effort address), used to attribute transport-level events + /// that have no `NetLogMessage` context. + /// /// Returns None if telemetry is disabled or in a test environment. - pub fn new(config: &TelemetryConfig) -> Option { + pub fn new(config: &TelemetryConfig, local_peer_id: String) -> Option { if !config.enabled { tracing::info!("Telemetry reporting is disabled"); return None; @@ -188,10 +201,14 @@ impl TelemetryReporter { receiver, transport_snapshot_interval_secs, transfer_event_receiver, + local_peer_id.clone(), ); GlobalExecutor::spawn(worker.run()); - Some(Self { sender }) + Some(Self { + sender, + local_peer_id, + }) } async fn send_event(&self, event: TelemetryEvent) { @@ -233,10 +250,11 @@ impl NetEventRegister for TelemetryReporter { ) -> BoxFuture<'_, ()> { let sender = self.sender.clone(); let op_type = op_type.to_string(); + let local_peer_id = self.local_peer_id.clone(); async move { let event = TelemetryEvent { timestamp: current_timestamp_ms(), - peer_id: String::new(), + peer_id: local_peer_id, transaction_id: tx.to_string(), event_type: "timeout".to_string(), event_data: serde_json::json!({ @@ -272,6 +290,8 @@ struct TelemetryWorker { transport_snapshot_interval_secs: u64, /// Receiver for per-transfer telemetry events from transport layer transfer_event_receiver: mpsc::Receiver, + /// This node's own peer id — see `TelemetryReporter::local_peer_id`. + local_peer_id: String, } impl TelemetryWorker { @@ -280,6 +300,7 @@ impl TelemetryWorker { receiver: mpsc::Receiver, transport_snapshot_interval_secs: u64, transfer_event_receiver: mpsc::Receiver, + local_peer_id: String, ) -> Self { Self { endpoint, @@ -295,6 +316,43 @@ impl TelemetryWorker { rate_limit_window_start: Instant::now(), transport_snapshot_interval_secs, transfer_event_receiver, + local_peer_id, + } + } + + /// Wrap a transport metrics snapshot into a `TelemetryEvent`. + /// Node-wide metrics, attributed to this node's own peer id — + /// same #4345 rationale as `transfer_event_to_telemetry`. + fn snapshot_to_telemetry( + &self, + snapshot: &crate::transport::metrics::TransportSnapshot, + ) -> TelemetryEvent { + TelemetryEvent { + timestamp: current_timestamp_ms(), + peer_id: self.local_peer_id.clone(), + transaction_id: String::new(), // Not tied to a transaction + event_type: "transport_snapshot".to_string(), + event_data: serde_json::to_value(snapshot).unwrap_or_default(), + } + } + + /// Wrap a transport-layer transfer event into a `TelemetryEvent`, + /// stamped with this node's own peer id. The transport layer has + /// no peer-identity context of its own, so before #4345 these + /// events carried an empty `peer_id` and sender attribution in the + /// collector was impossible. + fn transfer_event_to_telemetry(&self, transfer_event: super::TransferEvent) -> TelemetryEvent { + let event_type = match &transfer_event { + super::TransferEvent::Started { .. } => "transfer_started", + super::TransferEvent::Completed { .. } => "transfer_completed", + super::TransferEvent::Failed { .. } => "transfer_failed", + }; + TelemetryEvent { + timestamp: current_timestamp_ms(), + peer_id: self.local_peer_id.clone(), + transaction_id: String::new(), // Not available at transport layer + event_type: event_type.to_string(), + event_data: event_kind_to_json(&super::EventKind::Transfer(transfer_event)), } } @@ -336,18 +394,7 @@ impl TelemetryWorker { transfer_event = self.transfer_event_receiver.recv() => { // Handle per-transfer telemetry events from transport layer if let Some(transfer_event) = transfer_event { - let event_type = match &transfer_event { - super::TransferEvent::Started { .. } => "transfer_started", - super::TransferEvent::Completed { .. } => "transfer_completed", - super::TransferEvent::Failed { .. } => "transfer_failed", - }; - let event = TelemetryEvent { - timestamp: current_timestamp_ms(), - peer_id: String::new(), // Transport layer doesn't have peer identity - transaction_id: String::new(), // Not available at transport layer - event_type: event_type.to_string(), - event_data: event_kind_to_json(&super::EventKind::Transfer(transfer_event)), - }; + let event = self.transfer_event_to_telemetry(transfer_event); self.handle_event(event).await; } }, @@ -358,13 +405,7 @@ impl TelemetryWorker { // Emit transport layer metrics snapshot (only if enabled) if self.transport_snapshot_interval_secs > 0 { if let Some(snapshot) = TRANSPORT_METRICS.take_snapshot() { - let event = TelemetryEvent { - timestamp: current_timestamp_ms(), - peer_id: String::new(), // Transport metrics are node-wide, not peer-specific - transaction_id: String::new(), // Not tied to a transaction - event_type: "transport_snapshot".to_string(), - event_data: serde_json::to_value(&snapshot).unwrap_or_default(), - }; + let event = self.snapshot_to_telemetry(&snapshot); self.handle_event(event).await; } } @@ -506,10 +547,21 @@ fn to_otlp_logs(events: &[TelemetryEvent]) -> serde_json::Value { serde_json::json!({ "resourceLogs": [{ "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "freenet-peer"} - }] + "attributes": [ + { + "key": "service.name", + "value": {"stringValue": "freenet-peer"} + }, + { + // Standard OTLP placement for the sender's + // crate version — lets collector queries + // attribute every event type to a release + // without joining against peer_startup events + // (#4345 observability gap). + "key": "service.version", + "value": {"stringValue": env!("CARGO_PKG_VERSION")} + } + ] }, "scopeLogs": [{ "scope": { @@ -1841,6 +1893,119 @@ mod tests { assert_eq!(peer_attr["value"]["stringValue"], "test-peer-id-xyz"); } + #[test] + fn test_transport_snapshot_carries_local_peer_id() { + // #4345 observability gap, same class as transfer_failed: + // transport_snapshot events carried an empty peer_id. The + // worker must stamp its own peer id on snapshot events. + let (_cmd_tx, cmd_rx) = mpsc::channel(1); + let (_transfer_tx, transfer_rx) = mpsc::channel(1); + let worker = TelemetryWorker::new( + "http://localhost:4318".to_string(), + cmd_rx, + 0, + transfer_rx, + "snapshot-peer-id".to_string(), + ); + let snapshot = crate::transport::metrics::TransportSnapshot::default(); + let event = worker.snapshot_to_telemetry(&snapshot); + assert_eq!(event.event_type, "transport_snapshot"); + assert_eq!( + event.peer_id, "snapshot-peer-id", + "transport snapshots must be attributed to the emitting \ + node's own peer id (#4345)" + ); + } + + #[tokio::test] + async fn test_timeout_events_carry_local_peer_id() { + // #4345 observability gap, same class as transfer_failed: + // timeout events carried an empty peer_id. + use crate::message::Transaction; + use crate::operations::get::GetMsg; + + let (sender, mut receiver) = mpsc::channel(1); + let mut reporter = TelemetryReporter { + sender, + local_peer_id: "timeout-peer-id".to_string(), + }; + let tx = Transaction::new::(); + reporter + .notify_of_time_out(tx, "get", Some("target-peer".to_string())) + .await; + let TelemetryCommand::Event(event) = + receiver.try_recv().expect("timeout event must be sent") + else { + panic!("expected an Event command"); + }; + assert_eq!(event.event_type, "timeout"); + assert_eq!( + event.peer_id, "timeout-peer-id", + "timeout events must be attributed to the emitting node's \ + own peer id (#4345)" + ); + } + + #[test] + fn test_otlp_resource_includes_service_version() { + // #4345 observability gap: every OTLP export must carry the + // sender's crate version as a standard resource attribute, so + // collector queries can attribute events (transfer_failed in + // particular) to a release without joining against + // peer_startup events. + let event = TelemetryEvent { + timestamp: 1_700_000_000_000, + peer_id: "some-peer".to_string(), + transaction_id: String::new(), + event_type: "transfer_failed".to_string(), + event_data: serde_json::json!({}), + }; + let otlp = to_otlp_logs(std::slice::from_ref(&event)); + let resource_attrs = &otlp["resourceLogs"][0]["resource"]["attributes"]; + let version_attr = resource_attrs + .as_array() + .expect("resource attributes must be an array") + .iter() + .find(|a| a["key"] == "service.version") + .expect("service.version resource attribute must be present"); + assert_eq!( + version_attr["value"]["stringValue"], + env!("CARGO_PKG_VERSION") + ); + } + + #[test] + fn test_transfer_events_carry_local_peer_id() { + // #4345 observability gap: transfer_failed events carried an + // empty peer_id, making sender attribution in the collector + // impossible. The worker must stamp its own peer id on every + // transport-level transfer event. + let (_cmd_tx, cmd_rx) = mpsc::channel(1); + let (_transfer_tx, transfer_rx) = mpsc::channel(1); + let worker = TelemetryWorker::new( + "http://localhost:4318".to_string(), + cmd_rx, + 0, + transfer_rx, + "test-local-peer-id".to_string(), + ); + let event = worker.transfer_event_to_telemetry(crate::tracing::TransferEvent::Failed { + stream_id: 42, + peer_addr: "127.0.0.1:31337".parse().unwrap(), + bytes_transferred: 1024, + reason: "cwnd wait timeout".to_string(), + elapsed_ms: 3000, + direction: crate::tracing::TransferDirection::Send, + timestamp: 1_700_000_000_000, + }); + assert_eq!(event.event_type, "transfer_failed"); + assert_eq!( + event.peer_id, "test-local-peer-id", + "transfer events must be attributed to the emitting node's \ + own peer id (#4345)" + ); + } + #[test] fn test_otlp_serialization_empty_peer_id_when_unset() { // Pin the back-compat path: events sent via the original diff --git a/crates/core/tests/streaming_e2e.rs b/crates/core/tests/streaming_e2e.rs index 530a06c8f8..1048136392 100644 --- a/crates/core/tests/streaming_e2e.rs +++ b/crates/core/tests/streaming_e2e.rs @@ -1289,3 +1289,232 @@ fn test_driver_inline_get_triggers_auto_subscribe() { .collect::>() ); } + +// ============================================================================= +// Test: GET retries after stream-assembly failure (#4345) +// ============================================================================= + +/// Build the sparse-but-routable topology used by +/// `test_streaming_get_retries_after_assembly_failure`. max_connections +/// below the node count keeps the PUT fan-out from reaching every node +/// (so a cold getter exists), while min_connections = 3 gives nodes +/// enough ring candidates for the assembly-failure retry to advance to. +async fn setup_assembly_retry_network(name: &str, seed: u64, threshold: usize) -> SimNetwork { + GlobalRng::set_seed(seed); + const BASE_EPOCH_MS: u64 = 1577836800000; + const RANGE_MS: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + GlobalSimulationTime::set_time_ms(BASE_EPOCH_MS + (seed % RANGE_MS)); + let mut sim = SimNetwork::new( + name, 1, // gateways + 12, // nodes + 7, // ring_max_htl + 3, // rnd_if_htl_above + 4, // max_connections (sparse: < node count → cold getter exists) + 3, // min_connections (≥ 2 retry candidates per node) + seed, + ) + .await; + sim.with_streaming_threshold(threshold); + sim +} + +/// Regression test for #4345: a failed stream assembly must consume the +/// GET retry budget and advance to the next candidate instead of +/// synthesizing a client error with the budget untouched. +/// +/// Production failure chain: packet loss → FixedRate loss-pause caps the +/// cwnd → the sender aborts on the 3s cwnd-wait timeout → the receiver's +/// stream assembly hits its inactivity timeout → the originator's GET +/// driver (pre-fix) logged a WARN and synthesized "GET succeeded on wire +/// but local store lookup failed" without retrying — one transport +/// hiccup burned the whole GET even though `MAX_RETRIES` was untouched +/// and other candidates could serve the contract. +/// +/// Uses the two-phase cold-node isolation strategy from +/// `test_driver_streaming_get_cold_cache`, but on a routable htl=7 +/// topology (the HTL=1 variant is not routable — see that test's +/// #[ignore]): phase 1 runs the PUT alone to find the nodes the fan-out +/// missed; phase 2 re-runs the same seed (same ring) and GETs from a +/// cold node with exactly one injected assembly failure armed for this +/// contract key (via `get_assembly_fault_injection`, which mirrors the +/// production failure by leaving the inbound stream orphaned). The +/// driver must advance to another candidate and still deliver the full +/// state. +/// +/// Not every fan-out-missed node can route a GET to a holder in a +/// sparse topology (a routing property, not the #4345 assembly-retry +/// property under test), so phase 2 iterates the cold candidates +/// deterministically until one demonstrates the retry path end-to-end. +/// On a regression every candidate fails and the test fails with the +/// per-candidate diagnostics. +#[test] +fn test_streaming_get_retries_after_assembly_failure() { + use freenet::dev_tool::GET_DRIVER_CALL_COUNT; + use freenet::dev_tool::get_assembly_fault_injection; + use std::sync::atomic::Ordering; + + const SEED: u64 = 0xDE1A_4345_0000_0001; + const NETWORK_NAME_PHASE1: &str = "get-assembly-retry-p1"; + const THRESHOLD: usize = 1024; + const LARGE_STATE_SIZE: usize = 100 * 1024; // 100KB, above THRESHOLD + const NODES: usize = 12; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let contract = SimOperation::create_test_contract(0x45); + let large_state = SimOperation::create_large_state(LARGE_STATE_SIZE, 0x45); + let contract_key = contract.key(); + let contract_id = *contract_key.id(); + + // Phase 1: run the PUT in isolation to discover the nodes that did + // NOT receive the state during fan-out. + let sim1 = rt.block_on(setup_assembly_retry_network( + NETWORK_NAME_PHASE1, + SEED, + THRESHOLD, + )); + let phase1_ops = vec![ScheduledOperation::new( + NodeLabel::gateway(NETWORK_NAME_PHASE1, 0), + SimOperation::Put { + contract: contract.clone(), + state: large_state.clone(), + subscribe: false, + }, + )]; + let phase1 = sim1.run_controlled_simulation( + SEED, + phase1_ops, + Duration::from_secs(120), + Duration::from_secs(30), + ); + assert!( + phase1.turmoil_result.is_ok(), + "Phase 1 (PUT only) should complete: {:?}", + phase1.turmoil_result.err() + ); + let cold_nodes: Vec = (1..=NODES) + .filter(|i| { + let label = NodeLabel::node(NETWORK_NAME_PHASE1, *i); + phase1 + .node_storages + .get(&label) + .is_some_and(|s| s.get_stored_state(&contract_key).is_none()) + }) + .collect(); + assert!( + !cold_nodes.is_empty(), + "Test precondition: no cold-cache node exists after the PUT in \ + this {NODES}-node sparse topology. All nodes received the state \ + during fan-out. Pick a different SEED or sparser max_connections." + ); + + let mut failures: Vec = Vec::new(); + let mut demonstrated = false; + // Cap the candidates tried: each iteration is a full phase-2 sim + // run (~15s), and under a regression EVERY candidate fails — an + // uncapped loop over up to 11 cold nodes would hit the nextest ci + // profile's termination timeout and lose the per-candidate + // diagnostics below. The sim is deterministic per seed; with this + // seed the first four cold candidates cannot route a GET to a + // holder at all (their retry exhausts on the wire) and the fifth + // demonstrates the property, so the cap leaves one spare. + for (attempt, cold_idx) in cold_nodes.iter().copied().take(6).enumerate() { + let network_name = format!("get-assembly-retry-p2-{attempt}"); + + // (Re-)arm exactly one injected assembly failure for THIS + // contract key — keying by contract makes this safe under + // parallel test execution in the same process. `inject_failures` + // overwrites, so a previous candidate that never streamed leaves + // no stale budget behind. (ASSEMBLY_RETRY_COUNT itself is + // process-global; the before/after delta below relies on + // nextest's process-per-test isolation in CI.) + get_assembly_fault_injection::inject_failures(contract_key, 1); + let retries_before = + get_assembly_fault_injection::ASSEMBLY_RETRY_COUNT.load(Ordering::SeqCst); + let baseline_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst); + + let sim2 = rt.block_on(setup_assembly_retry_network(&network_name, SEED, THRESHOLD)); + let cold_node_label = NodeLabel::node(&network_name, cold_idx); + let phase2_ops = vec![ + ScheduledOperation::new( + NodeLabel::gateway(&network_name, 0), + SimOperation::Put { + contract: contract.clone(), + state: large_state.clone(), + subscribe: false, + }, + ), + ScheduledOperation::new( + cold_node_label.clone(), + SimOperation::Get { + contract_id, + return_contract_code: false, + subscribe: false, + }, + ), + ]; + let phase2 = sim2.run_controlled_simulation( + SEED, + phase2_ops, + Duration::from_secs(300), + Duration::from_secs(120), + ); + if let Err(e) = &phase2.turmoil_result { + failures.push(format!("node {cold_idx}: simulation error: {e:?}")); + continue; + } + + // Driver-isolation probe: the GET must have gone through the + // task-per-tx driver (not the local-cache shortcut), otherwise + // the assembly-retry path was never reachable. + let driver_calls = GET_DRIVER_CALL_COUNT.load(Ordering::SeqCst); + if driver_calls == baseline_calls { + failures.push(format!( + "node {cold_idx}: GET never reached the driver (locally cached?)" + )); + continue; + } + + let retries_after = + get_assembly_fault_injection::ASSEMBLY_RETRY_COUNT.load(Ordering::SeqCst); + let stored = phase2 + .node_storages + .get(&cold_node_label) + .and_then(|s| s.get_stored_state(&contract_key)); + + match (retries_after > retries_before, stored) { + (true, Some(state)) => { + // The driver took the assembly-failure retry path AND + // the retry delivered the full, correct state. + let stored_bytes: Vec = state.as_ref().to_vec(); + assert_eq!( + stored_bytes, large_state, + "Stored state bytes should match the original 100KB \ + state after the assembly-failure retry" + ); + demonstrated = true; + break; + } + (took_retry, stored) => { + failures.push(format!( + "node {cold_idx}: retry_path_taken={took_retry} \ + state_present={} (streaming GET likely never reached \ + a holder from this candidate, or the retry burned \ + the GET — #4345 regression if this repeats for every \ + candidate)", + stored.is_some() + )); + } + } + } + + assert!( + demonstrated, + "No cold candidate demonstrated the #4345 assembly-failure retry \ + path (injected failure → driver.advance() → fresh attempt → \ + state delivered). Per-candidate outcomes: {failures:#?}" + ); +}