diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 00000000..46ffe077 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,6 @@ +# Per-test timeout. Any test that runs longer than `period * terminate-after` +# (= 5 minutes) is killed, instead of being allowed to consume the full +# GitHub Actions job timeout (6 hours). See node-components#134 for the +# incident that motivated this guard. +[profile.default] +slow-timeout = { period = "60s", terminate-after = 5 } diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 1ce6604b..4498a684 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1057,54 +1057,59 @@ where { let task = async move { let fm = ctx.filter_manager(); - let mut entry = fm - .get_mut(id) - .ok_or_else(|| EthError::InvalidParams(format!("filter not found: {id}")))?; - - // Scan the global reorg ring buffer for notifications received - // since this filter's last poll, then lazily compute removed logs - // and rewind `next_start_block`. - let reorgs = fm.reorgs_since(entry.last_poll_time()); - let removed = entry.compute_removed_logs(&reorgs); - if !removed.is_empty() { - trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); - } + + // Snapshot filter state under the shard lock, then drop it before + // any cold-storage `.await`. Holding a `DashMap` `RefMut` across + // an await can deadlock the current_thread runtime when another + // task tries to acquire the same shard (parking_lot's RwLock + // parks the OS thread on contention). + let (is_block, stored_filter, start, removed, empty_out) = { + let mut entry = fm + .get_mut(id) + .ok_or_else(|| EthError::InvalidParams(format!("filter not found: {id}")))?; + + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed + // logs and rewind `next_start_block`. 
+ let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); + if !removed.is_empty() { + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); + } + + ( + entry.is_block(), + entry.as_filter().cloned(), + entry.next_start_block(), + removed, + entry.empty_output(), + ) + }; let latest = ctx.tags().latest(); - let start = entry.next_start_block(); - - // Implicit reorg detection: if latest has moved backward past our - // window, a reorg occurred that we missed (e.g. broadcast lagged). - // Return any removed logs we do have, then reset. - if latest + 1 < start { - trace!(latest, start, "implicit reorg detected, resetting filter"); - entry.touch_poll_time(); - return Ok(if removed.is_empty() { - entry.empty_output() - } else { - FilterOutput::from(removed) - }); - } - if start > latest { - entry.touch_poll_time(); - return Ok(if removed.is_empty() { - entry.empty_output() - } else { - FilterOutput::from(removed) - }); + // Early returns: implicit reorg (latest moved backward past our + // window) or no new blocks since last poll. Either way, just + // update the poll timestamp and return. 
+ if latest + 1 < start || start > latest { + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + } + if let Some(mut entry) = fm.get_mut(id) { + entry.touch_poll_time(); + } + return Ok(if removed.is_empty() { empty_out } else { FilterOutput::from(removed) }); } let cold = ctx.cold(); - if entry.is_block() { + let result = if is_block { let specs: Vec<_> = (start..=latest).map(HeaderSpecifier::Number).collect(); let headers = cold.get_headers(specs).await?; let hashes: Vec = headers.into_iter().flatten().map(|h| h.hash()).collect(); - entry.mark_polled(latest); - Ok(FilterOutput::from(hashes)) + FilterOutput::from(hashes) } else { - let stored = entry.as_filter().cloned().unwrap(); + let stored = stored_filter.expect("log filter must carry a stored filter spec"); let resolved = Filter { block_option: alloy::rpc::types::FilterBlockOption::Range { from_block: Some(BlockNumberOrTag::Number(start)), @@ -1128,9 +1133,17 @@ where logs = combined; } + FilterOutput::from(logs) + }; + + // Commit the poll cursor. Tolerate a concurrent uninstall: + // `result` is already computed and is still returned to the caller; + // future polls on this id will fail with "filter not found". + if let Some(mut entry) = fm.get_mut(id) { entry.mark_polled(latest); - Ok(FilterOutput::from(logs)) } + + Ok(result) }; await_handler!(hctx.spawn(task), EthError::task_panic()) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 3f97ffd5..cea6fd48 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -160,6 +160,12 @@ impl FilterManagerInner { } /// Get a filter by ID. + /// + /// The returned [`RefMut`] holds a `parking_lot` write lock on a + /// [`DashMap`] shard. Do not hold it across `.await`: on a + /// current_thread runtime a colliding `get_mut` from another task + /// will park the OS thread and deadlock the runtime.
(Same family + /// as the [`DashMap::retain`] hazard documented on [`FilterManager`].) pub(crate) fn get_mut(&self, id: FilterId) -> Option> { self.filters.get_mut(&id) }