Skip to content

fix: tokio::spawn futures for FuturesOrdered#7104

Merged
hanabi1224 merged 5 commits into
mainfrom
hm/tokio-spawn-futures-ordered
May 26, 2026
Merged

fix: tokio::spawn futures for FuturesOrdered#7104
hanabi1224 merged 5 commits into
mainfrom
hm/tokio-spawn-futures-ordered

Conversation

@hanabi1224
Copy link
Copy Markdown
Contributor

@hanabi1224 hanabi1224 commented May 25, 2026

Summary of changes

Changes introduced in this pull request:

Reference issue to close (if applicable)

Closes

Other information and links

Change checklist

  • I have performed a self-review of my own code,
  • I have made corresponding changes to the documentation. All new code adheres to the team's documentation standards,
  • I have added tests that prove my fix is effective or that my feature works (if possible),
  • I have made sure the CHANGELOG is up-to-date. All user-facing changes should be reflected in this document.

Outside contributions

  • I have read and agree to the CONTRIBUTING document.
  • I have read and agree to the AI Policy document. I understand that failure to comply with the guidelines will lead to rejection of the pull request.

Summary by CodeRabbit

  • Refactor

    • Safer, more concurrent Ethereum event/filter collection across tipsets.
    • Preserved specific error responses during log parsing while improving parsing behavior.
    • Optimized per-tipset state computation scheduling for better throughput.
    • Improved proofs parameter download concurrency using a more robust task model.
  • New Features

    • Added a public task utility that automatically aborts spawned tasks on drop.

Review Change Stack

@hanabi1224 hanabi1224 added the RPC requires calibnet RPC checks to run on CI label May 25, 2026
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 25, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Wrap ParsedFilter in Arc across RPC filter paths, update filter module signatures and Matcher imports, spawn per-tipset tokio tasks with abort-handle tracking, migrate proofs parameter downloads to JoinSet, and add an AbortHandles utility.

Changes

ParsedFilter Arc wrapping and concurrent task coordination

Layer / File(s) Summary
Filter module infrastructure: imports, Matcher, signature
src/rpc/methods/eth/filter/mod.rs
Bring crate::prelude::* into scope, update anyhow imports, annotate Matcher with #[auto_impl::auto_impl(&, Arc)], and change get_events_for_parsed_filter to accept &Arc<ParsedFilter>.
Concurrent event collection with tokio::spawn
src/rpc/methods/eth/filter/mod.rs
Refactor collect_events_for_tipsets to spawn per-tipset tokio::spawn tasks, shallow-clone state_manager and optional matcher, track AbortHandles, and aggregate task outputs while enforcing the cross-tipset cap.
EthGetLogs Arc wrapping and error preservation
src/rpc/methods/eth.rs
EthGetLogs now wraps the parsed filter in Arc and preserves EthErrors::BlockRangeExceeded without adding parse-error context.
EthGetFilterLogs and EthGetFilterChanges Arc adaptation
src/rpc/methods/eth.rs
EthGetFilterLogs and EthGetFilterChanges pass parsed filters as &Arc::new(...), including EventFilter conversion and ParsedFilter::new_with_tipset(...) branches for TipSet/Mempool filters.
GetActorEventsRaw Arc wrapping
src/rpc/methods/misc.rs
GetActorEventsRaw parses ActorEventFilter into Arc<ParsedFilter> and reorganizes imports (crate::prelude::*, etc.).
ForestStateCompute per-tipset async task spawning
src/rpc/methods/state.rs
Per-tipset state computation/loading is scheduled with tokio::spawn, and the results loop unwraps the JoinHandle output before recompute/validation logic.
Parameter download concurrency migration to JoinSet
src/utils/proofs_api/paramfetch.rs
Proofs parameter download concurrency moved from FuturesUnordered to tokio::task::JoinSet, spawning fetch tasks and awaiting via join semantics; imports updated.
AbortHandles utility and export
src/utils/mod.rs, src/utils/task/mod.rs
Add pub mod task; and new AbortHandles(Vec<tokio::task::AbortHandle>) type with Drop that aborts stored handles on destruction.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant EthRPC
  participant EthFilterModule
  participant TokioTask
  participant StateManager
  Client->>EthRPC: request logs/filter
  EthRPC->>EthFilterModule: call get_events_for_parsed_filter(&Arc<ParsedFilter>)
  EthFilterModule->>TokioTask: spawn per-tipset tokio::spawn(task with cloned matcher/state_manager)
  TokioTask->>StateManager: load/compute state for tipset
  StateManager-->>TokioTask: return events/results
  TokioTask-->>EthFilterModule: task result (via JoinHandle)
  EthFilterModule->>EthRPC: aggregate results (enforce cap) and return
  EthRPC->>Client: response
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • ChainSafe/forest#7025: Related updates to the Ethereum event-filtering pipeline and matcher semantics in src/rpc/methods/eth/filter/mod.rs.
  • ChainSafe/forest#6856: Overlaps on EthGetLogs error-path handling for EthErrors::BlockRangeExceeded.

Suggested reviewers

  • sudo-shashank
  • LesnyRumcajs
  • akaladarshi
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.22% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'fix: tokio::spawn futures for FuturesOrdered' is specific and directly related to the main changes in the changeset, which involve spawning tokio tasks in multiple modules (eth/filter, state, and paramfetch).
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch hm/tokio-spawn-futures-ordered
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch hm/tokio-spawn-futures-ordered

Comment @coderabbitai help to get the list of available commands and usage tips.

@hanabi1224 hanabi1224 marked this pull request as ready for review May 26, 2026 00:00
@hanabi1224 hanabi1224 requested a review from a team as a code owner May 26, 2026 00:00
@hanabi1224 hanabi1224 requested review from LesnyRumcajs and sudo-shashank and removed request for a team May 26, 2026 00:00
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/utils/proofs_api/paramfetch.rs (1)

99-117: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Fix swallowed parameter download errors in get_params

src/utils/proofs_api/paramfetch.rs awaits JoinSet::from_iter(...).join_all().await but discards the returned results. Since fetch_verify_params(...) -> anyhow::Result<()>, any Err from a failed download/verification is ignored and get_params still returns Ok(())—so ensure_proof_params_downloaded() will mark RUN_ONCE as completed and won’t retry.

Propagate task + inner errors
-    JoinSet::from_iter(
+    let results = JoinSet::from_iter(
         params
             .into_iter()
             .filter(|(name, info)| match storage_size {
                 SectorSizeOpt::Keys => !name.ends_with("params"),
                 SectorSizeOpt::Size(size) => {
                     size as u64 == info.sector_size || !name.ends_with(".params")
                 }
                 SectorSizeOpt::All => true,
             })
             .map(|(name, info)| {
                 let data_dir = data_dir.to_owned();
                 async move { fetch_verify_params(&data_dir, &name, Arc::new(info)).await }
             }),
     )
     .join_all()
     .await;
 
+    for result in results {
+        result.context("parameter fetch task panicked")??;
+    }
+
     Ok(())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/proofs_api/paramfetch.rs` around lines 99 - 117, The current
get_params call creates a JoinSet of async tasks that call fetch_verify_params
but discards the Vec of results from .join_all().await, swallowing both task
JoinErrors and the inner anyhow::Result errors; change get_params to capture the
join_all result into a variable, iterate over each join result and propagate
failures (return Err) instead of ignoring them—handle the JoinError (task
panics/cancellation) and then unwrap or propagate the inner anyhow::Result from
fetch_verify_params (use ? or map_err to convert into the function's
anyhow::Result). Locate the block that builds
JoinSet::from_iter(...).map(|(name, info)| async move {
fetch_verify_params(&data_dir, &name, Arc::new(info)).await }) and replace the
discard with proper error checking so ensure_proof_params_downloaded() only
marks RUN_ONCE when all tasks succeed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/rpc/methods/eth/filter/mod.rs`:
- Around line 299-320: collect_events_for_tipsets currently spawns per-tipset
tasks with tokio::spawn and collects them via FuturesOrdered, but on early
returns (e.g., when tasks.try_next().await? or ensure_filter_cap(...)?) the
remaining JoinHandles are simply dropped and keep running; change this to a
cancellation-aware pattern: replace the tokio::spawn + FuturesOrdered approach
in collect_events_for_tipsets with a JoinSet (or create AbortHandle/Abortable
per task) so you can call joinset.abort_all() (or AbortHandle::abort()) on any
early error/return path, and ensure all spawned tasks (the closures that call
Self::collect_events) are awaited/aborted before returning to avoid background
scans continuing; locate references to tokio::spawn, FuturesOrdered, and the
async closures that call Self::collect_events to implement this.

---

Outside diff comments:
In `@src/utils/proofs_api/paramfetch.rs`:
- Around line 99-117: The current get_params call creates a JoinSet of async
tasks that call fetch_verify_params but discards the Vec of results from
.join_all().await, swallowing both task JoinErrors and the inner anyhow::Result
errors; change get_params to capture the join_all result into a variable,
iterate over each join result and propagate failures (return Err) instead of
ignoring them—handle the JoinError (task panics/cancellation) and then unwrap or
propagate the inner anyhow::Result from fetch_verify_params (use ? or map_err to
convert into the function's anyhow::Result). Locate the block that builds
JoinSet::from_iter(...).map(|(name, info)| async move {
fetch_verify_params(&data_dir, &name, Arc::new(info)).await }) and replace the
discard with proper error checking so ensure_proof_params_downloaded() only
marks RUN_ONCE when all tasks succeed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e7e19912-5946-4d14-b3e2-2c548b12b4ea

📥 Commits

Reviewing files that changed from the base of the PR and between 934b8ad and b179a47.

📒 Files selected for processing (5)
  • src/rpc/methods/eth.rs
  • src/rpc/methods/eth/filter/mod.rs
  • src/rpc/methods/misc.rs
  • src/rpc/methods/state.rs
  • src/utils/proofs_api/paramfetch.rs

Comment thread src/rpc/methods/eth/filter/mod.rs
@codecov
Copy link
Copy Markdown

codecov Bot commented May 26, 2026

Codecov Report

❌ Patch coverage is 54.54545% with 20 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.36%. Comparing base (df636b3) to head (8347575).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/rpc/methods/eth.rs 17.64% 13 Missing and 1 partial ⚠️
src/rpc/methods/state.rs 0.00% 3 Missing ⚠️
src/rpc/methods/eth/filter/mod.rs 88.23% 0 Missing and 2 partials ⚠️
src/rpc/methods/misc.rs 50.00% 1 Missing ⚠️
Additional details and impacted files
Files with missing lines Coverage Δ
src/utils/mod.rs 82.45% <ø> (ø)
src/utils/task/mod.rs 100.00% <100.00%> (ø)
src/rpc/methods/misc.rs 89.28% <50.00%> (ø)
src/rpc/methods/eth/filter/mod.rs 89.13% <88.23%> (+0.02%) ⬆️
src/rpc/methods/state.rs 44.40% <0.00%> (-0.03%) ⬇️
src/rpc/methods/eth.rs 65.38% <17.64%> (-0.03%) ⬇️

... and 6 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update df636b3...8347575. Read the comment docs.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/utils/task/mod.rs`:
- Line 6: Update the doc comment that currently reads "Holds a collection of
[`AbortHandle`] and abort them automatically on drop" to use correct
subject-verb agreement: change "abort them" to "aborts them" so it reads "Holds
a collection of [`AbortHandle`] and aborts them automatically on drop"; this is
the documentation comment associated with the type that stores [`AbortHandle`]
instances in src/utils/task/mod.rs—edit that comment accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 0ce24220-6753-4e02-8c3d-408e8f1a327b

📥 Commits

Reviewing files that changed from the base of the PR and between b179a47 and 9dcc86a.

📒 Files selected for processing (3)
  • src/rpc/methods/eth/filter/mod.rs
  • src/utils/mod.rs
  • src/utils/task/mod.rs

Comment thread src/utils/task/mod.rs Outdated
@hanabi1224 hanabi1224 force-pushed the hm/tokio-spawn-futures-ordered branch 2 times, most recently from c477ec1 to ae08cee Compare May 26, 2026 00:53
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/utils/proofs_api/paramfetch.rs (1)

99-118: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Restore fail-fast behavior for spawned proof parameter downloads

src/utils/proofs_api/paramfetch.rs uses JoinSet::join_all().await, which waits for all fetch_verify_params tasks to finish before returning, so the first anyhow::Result<()> error is reported late and the remaining downloads keep running. Prefer a join_next() loop so the function returns on the first task Err (dropping the JoinSet cancels the rest), and add context for easier debugging.

💡 Suggested change
-    for task in JoinSet::from_iter(
-        params
-            .into_iter()
-            .filter(|(name, info)| match storage_size {
-                SectorSizeOpt::Keys =&gt; !name.ends_with("params"),
-                SectorSizeOpt::Size(size) =&gt; {
-                    size as u64 == info.sector_size || !name.ends_with(".params")
-                }
-                SectorSizeOpt::All =&gt; true,
-            })
-            .map(|(name, info)| {
-                let data_dir = data_dir.to_owned();
-                async move { fetch_verify_params(&amp;data_dir, &amp;name, Arc::new(info)).await }
-            }),
-    )
-    .join_all()
-    .await
-    {
-        _ = task?;
-    }
+    let mut tasks: JoinSet&lt;anyhow::Result&lt;()&gt;&gt; = params
+        .into_iter()
+        .filter(|(name, info)| match storage_size {
+            SectorSizeOpt::Keys =&gt; !name.ends_with("params"),
+            SectorSizeOpt::Size(size) =&gt; size as u64 == info.sector_size || !name.ends_with(".params"),
+            SectorSizeOpt::All =&gt; true,
+        })
+        .map(|(name, info)| {
+            let data_dir = data_dir.to_owned();
+            async move { fetch_verify_params(&amp;data_dir, &amp;name, Arc::new(info)).await }
+        })
+        .collect();
+
+    while let Some(task) = tasks.join_next().await {
+        task.context("proof parameter download task failed")??;
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/proofs_api/paramfetch.rs` around lines 99 - 118, The current use of
JoinSet::from_iter(...).join_all().await in paramfetch.rs causes the function to
wait for all fetch_verify_params tasks and delays error reporting; replace the
join_all() pattern with constructing a JoinSet, spawning each async task
(calling fetch_verify_params with data_dir/name/Arc::new(info)), then loop using
join_next().await to handle each completed task and return early on the first
Err (dropping the JoinSet will cancel remaining tasks); when propagating errors
wrap them with context (e.g., include the parameter name or storage_size info)
to aid debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/utils/proofs_api/paramfetch.rs`:
- Around line 99-118: The current use of
JoinSet::from_iter(...).join_all().await in paramfetch.rs causes the function to
wait for all fetch_verify_params tasks and delays error reporting; replace the
join_all() pattern with constructing a JoinSet, spawning each async task
(calling fetch_verify_params with data_dir/name/Arc::new(info)), then loop using
join_next().await to handle each completed task and return early on the first
Err (dropping the JoinSet will cancel remaining tasks); when propagating errors
wrap them with context (e.g., include the parameter name or storage_size info)
to aid debugging.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 1dbb9a0d-d93c-4393-a0e9-c548463a3134

📥 Commits

Reviewing files that changed from the base of the PR and between c477ec1 and ae08cee.

📒 Files selected for processing (4)
  • src/rpc/methods/eth/filter/mod.rs
  • src/utils/mod.rs
  • src/utils/proofs_api/paramfetch.rs
  • src/utils/task/mod.rs

@hanabi1224 hanabi1224 force-pushed the hm/tokio-spawn-futures-ordered branch from ae08cee to 599a9c1 Compare May 26, 2026 01:05
@hanabi1224 hanabi1224 force-pushed the hm/tokio-spawn-futures-ordered branch from 599a9c1 to 34c5653 Compare May 26, 2026 03:32
Comment thread src/rpc/methods/state.rs
Comment thread src/utils/task/mod.rs Outdated
LesnyRumcajs
LesnyRumcajs previously approved these changes May 26, 2026
@hanabi1224 hanabi1224 enabled auto-merge May 26, 2026 09:18
Co-authored-by: Hubert <lesny.rumcajs+github@gmail.com>
@hanabi1224 hanabi1224 added this pull request to the merge queue May 26, 2026
Merged via the queue into main with commit 32a6885 May 26, 2026
33 checks passed
@hanabi1224 hanabi1224 deleted the hm/tokio-spawn-futures-ordered branch May 26, 2026 10:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

RPC requires calibnet RPC checks to run on CI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants