Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Per-test timeout. Any test that runs longer than `period * terminate-after`
# (= 5 minutes) is killed, instead of being allowed to consume the full
# GitHub Actions job timeout (6 hours). See node-components#134 for the
# incident that motivated this guard.
[profile.default]
slow-timeout = { period = "60s", terminate-after = 5 }
89 changes: 51 additions & 38 deletions crates/rpc/src/eth/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,54 +1057,59 @@ where
{
let task = async move {
let fm = ctx.filter_manager();
let mut entry = fm
.get_mut(id)
.ok_or_else(|| EthError::InvalidParams(format!("filter not found: {id}")))?;

// Scan the global reorg ring buffer for notifications received
// since this filter's last poll, then lazily compute removed logs
// and rewind `next_start_block`.
let reorgs = fm.reorgs_since(entry.last_poll_time());
let removed = entry.compute_removed_logs(&reorgs);
if !removed.is_empty() {
trace!(count = removed.len(), "computed removed logs from reorg ring buffer");
}

// Snapshot filter state under the shard lock, then drop it before
// any cold-storage `.await`. Holding a `DashMap` `RefMut` across
// an await can deadlock the current_thread runtime when another
// task tries to acquire the same shard (parking_lot's RwLock
// parks the OS thread on contention).
let (is_block, stored_filter, start, removed, empty_out) = {
let mut entry = fm
.get_mut(id)
.ok_or_else(|| EthError::InvalidParams(format!("filter not found: {id}")))?;

// Scan the global reorg ring buffer for notifications received
// since this filter's last poll, then lazily compute removed
// logs and rewind `next_start_block`.
let reorgs = fm.reorgs_since(entry.last_poll_time());
let removed = entry.compute_removed_logs(&reorgs);
if !removed.is_empty() {
trace!(count = removed.len(), "computed removed logs from reorg ring buffer");
}

(
entry.is_block(),
entry.as_filter().cloned(),
entry.next_start_block(),
removed,
entry.empty_output(),
)
};

let latest = ctx.tags().latest();
let start = entry.next_start_block();

// Implicit reorg detection: if latest has moved backward past our
// window, a reorg occurred that we missed (e.g. broadcast lagged).
// Return any removed logs we do have, then reset.
if latest + 1 < start {
trace!(latest, start, "implicit reorg detected, resetting filter");
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
}

if start > latest {
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
// Early returns: implicit reorg (latest moved backward past our
// window) or no new blocks since last poll. Either way, just
// update the poll timestamp and return.
if latest + 1 < start || start > latest {
if latest + 1 < start {
trace!(latest, start, "implicit reorg detected, resetting filter");
}
if let Some(mut entry) = fm.get_mut(id) {
entry.touch_poll_time();
}
return Ok(if removed.is_empty() { empty_out } else { FilterOutput::from(removed) });
}

let cold = ctx.cold();

if entry.is_block() {
let result = if is_block {
let specs: Vec<_> = (start..=latest).map(HeaderSpecifier::Number).collect();
let headers = cold.get_headers(specs).await?;
let hashes: Vec<B256> = headers.into_iter().flatten().map(|h| h.hash()).collect();
entry.mark_polled(latest);
Ok(FilterOutput::from(hashes))
FilterOutput::from(hashes)
} else {
let stored = entry.as_filter().cloned().unwrap();
let stored = stored_filter.expect("log filter must carry a stored filter spec");
let resolved = Filter {
block_option: alloy::rpc::types::FilterBlockOption::Range {
from_block: Some(BlockNumberOrTag::Number(start)),
Expand All @@ -1128,9 +1133,17 @@ where
logs = combined;
}

FilterOutput::from(logs)
};

// Commit the poll cursor. Tolerate a concurrent uninstall: the
// caller already received `result`; future polls on this id will
// fail with "filter not found".
if let Some(mut entry) = fm.get_mut(id) {
entry.mark_polled(latest);
Ok(FilterOutput::from(logs))
}

Ok(result)
};

await_handler!(hctx.spawn(task), EthError::task_panic())
Expand Down
6 changes: 6 additions & 0 deletions crates/rpc/src/interest/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ impl FilterManagerInner {
}

/// Get a filter by ID.
///
/// The returned [`RefMut`] holds a `parking_lot` write lock on a
/// [`DashMap`] shard. Do not hold it across `.await`: on a
/// current_thread runtime a colliding `get_mut` from another task
/// will park the OS thread and deadlock the runtime. (Same family
/// as the [`DashMap::retain`] hazard documented on [`FilterManager`].)
pub(crate) fn get_mut(&self, id: FilterId) -> Option<RefMut<'_, U64, ActiveFilter>> {
self.filters.get_mut(&id)
}
Expand Down