From ef0ada2127a3ee49165ac90a180d55dccbc3616f Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 24 Jul 2025 22:00:48 +0800 Subject: [PATCH 01/10] feat: add `--unordered` to `forest-cli snapshot export` --- src/chain/mod.rs | 48 +++- src/cli/subcommands/snapshot_cmd.rs | 37 ++- src/db/gc/snapshot.rs | 8 +- src/ipld/util.rs | 249 ++++++++---------- src/rpc/methods/chain.rs | 77 +++++- src/rpc/mod.rs | 1 + .../api_cmd/test_snapshots_ignored.txt | 1 + src/tool/subcommands/archive_cmd.rs | 14 +- 8 files changed, 263 insertions(+), 172 deletions(-) diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 6658ec45d764..db94406de439 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -6,7 +6,7 @@ use crate::blocks::{Tipset, TipsetKey}; use crate::cid_collections::CidHashSet; use crate::db::car::forest; use crate::db::{SettingsStore, SettingsStoreExt}; -use crate::ipld::stream_chain; +use crate::ipld::{stream_chain, unordered_stream_chain}; use crate::utils::io::{AsyncWriterWithChecksum, Checksum}; use crate::utils::stream::par_buffer; use anyhow::Context as _; @@ -17,17 +17,23 @@ use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter}; pub use self::{store::*, weight::*}; +#[derive(Debug, Clone, Default)] +pub struct ExportOptions { + pub skip_checksum: bool, + pub unordered: bool, + pub seen: CidHashSet, +} + pub async fn export_from_head( db: &Arc, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - seen: CidHashSet, - skip_checksum: bool, + option: Option, ) -> anyhow::Result<(Tipset, Option>), Error> { let head_key = SettingsStoreExt::read_obj::(db, crate::db::setting_keys::HEAD_KEY)? .context("chain head key not found")?; let head_ts = Tipset::load_required(&db, &head_key)?; - let digest = export::(db, &head_ts, lookup_depth, writer, seen, skip_checksum).await?; + let digest = export::(db, &head_ts, lookup_depth, writer, option).await?; Ok((head_ts, digest)) } @@ -36,9 +42,14 @@ pub async fn export( tipset: &Tipset, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - seen: CidHashSet, - skip_checksum: bool, + option: Option, ) -> anyhow::Result>, Error> { + let ExportOptions { + skip_checksum, + unordered, + seen, + } = option.unwrap_or_default(); + let stateroot_lookup_limit = tipset.epoch() - lookup_depth; let roots = tipset.key().to_cids(); @@ -52,12 +63,25 @@ pub async fn export( // are small enough that keeping 1k in memory isn't a problem. Average // block size is between 1kb and 2kb. 1024, - stream_chain( - Arc::clone(db), - tipset.clone().chain_owned(Arc::clone(db)), - stateroot_lookup_limit, - ) - .with_seen(seen), + if unordered { + futures::future::Either::Left( + unordered_stream_chain( + Arc::clone(db), + tipset.clone().chain_owned(Arc::clone(db)), + stateroot_lookup_limit, + ) + .with_seen(seen), + ) + } else { + futures::future::Either::Right( + stream_chain( + Arc::clone(db), + tipset.clone().chain_owned(Arc::clone(db)), + stateroot_lookup_limit, + ) + .with_seen(seen), + ) + }, ); // Encode Ipld key-value pairs in zstd frames diff --git a/src/cli/subcommands/snapshot_cmd.rs b/src/cli/subcommands/snapshot_cmd.rs index d938504e10e5..285f0a955b85 100644 --- a/src/cli/subcommands/snapshot_cmd.rs +++ b/src/cli/subcommands/snapshot_cmd.rs @@ -6,14 +6,14 @@ use crate::chain_sync::SyncConfig; use crate::cli_shared::snapshot::{self, TrustedVendor}; use crate::db::car::forest::new_forest_car_temp_path_in; use crate::networks::calibnet; -use crate::rpc::types::ApiTipsetKey; -use crate::rpc::{self, chain::ChainExportParams, prelude::*}; +use crate::rpc::{self, chain::ForestChainExportParams, prelude::*, types::ApiTipsetKey}; use anyhow::Context as _; use chrono::DateTime; use clap::Subcommand; use human_repr::HumanCount; +use num::Zero; use std::path::{Path, PathBuf}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::io::AsyncWriteExt; #[derive(Debug, Subcommand)] @@ -35,6 +35,9 @@ pub enum SnapshotCommands { /// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`. #[arg(short, long)] depth: Option, + /// Traverse chain in non-deterministic order for better performance with more parallelization. + #[arg(long)] + unordered: bool, }, } @@ -47,6 +50,7 @@ impl SnapshotCommands { dry_run, tipset, depth, + unordered, } => { let chain_head = ChainHead::call(&client, ()).await?; @@ -82,21 +86,23 @@ impl SnapshotCommands { let output_dir = output_path.parent().context("invalid output path")?; let temp_path = new_forest_car_temp_path_in(output_dir)?; - let params = ChainExportParams { + let params = ForestChainExportParams { epoch, recent_roots: depth.unwrap_or(SyncConfig::default().recent_state_roots), output_path: temp_path.to_path_buf(), tipset_keys: ApiTipsetKey(Some(chain_head.key().clone())), + unordered, skip_checksum, dry_run, }; let handle = tokio::spawn({ + let start = Instant::now(); let tmp_file = temp_path.to_owned(); let output_path = output_path.clone(); async move { let mut interval = - tokio::time::interval(tokio::time::Duration::from_secs_f32(0.25)); + tokio::time::interval(tokio::time::Duration::from_secs_f32(0.5)); println!("Getting ready to export..."); loop { interval.tick().await; @@ -108,10 +114,17 @@ impl SnapshotCommands { anes::MoveCursorToPreviousLine(1), anes::ClearLine::All ); + let elapsed_secs = start.elapsed().as_secs_f64(); println!( - "{}: {}", + "{}: {} ({}/s)", &output_path.to_string_lossy(), - snapshot_size.human_count_bytes() + snapshot_size.human_count_bytes(), + if elapsed_secs.is_zero() { + 0. + } else { + (snapshot_size as f64) / elapsed_secs + } + .human_count_bytes(), ); let _ = std::io::stdout().flush(); } @@ -120,16 +133,18 @@ impl SnapshotCommands { // Manually construct RpcRequest because snapshot export could // take a few hours on mainnet let hash_result = client - .call(ChainExport::request((params,))?.with_timeout(Duration::MAX)) + .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX)) .await?; handle.abort(); let _ = handle.await; - if let Some(hash) = hash_result { - save_checksum(&output_path, hash).await?; + if !dry_run { + if let Some(hash) = hash_result { + save_checksum(&output_path, hash).await?; + } + temp_path.persist(output_path)?; } - temp_path.persist(output_path)?; println!("Export completed."); Ok(()) diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 0d7cb99368c5..d1abf8f50b6d 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -43,7 +43,7 @@ //! use crate::blocks::{Tipset, TipsetKey}; -use crate::cid_collections::CidHashSet; +use crate::chain::ExportOptions; use crate::cli_shared::chain_path; use crate::db::car::forest::new_forest_car_temp_path_in; use crate::db::{ @@ -225,8 +225,10 @@ where &db, self.recent_state_roots, file, - CidHashSet::default(), - true, + Some(ExportOptions { + skip_checksum: true, + ..Default::default() + }), ) .await?; let target_path = self.car_db_dir.join(format!( diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 4f6bc1343794..ead8c8b075da 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -10,8 +10,8 @@ use crate::utils::encoding::extract_cids; use crate::utils::multihash::prelude::*; use anyhow::Context as _; use cid::Cid; -use flume::TryRecvError; -use futures::Stream; +use futures::stream::Fuse; +use futures::{Stream, StreamExt}; use fvm_ipld_blockstore::Blockstore; use parking_lot::Mutex; use pin_project_lite::pin_project; @@ -119,8 +119,14 @@ pin_project! { } impl ChainStream { - pub fn with_seen(self, seen: CidHashSet) -> Self { - ChainStream { seen, ..self } + pub fn with_seen(mut self, seen: CidHashSet) -> Self { + self.seen = seen; + self + } + + pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self { + self.fail_on_dead_links = fail_on_dead_links; + self } #[allow(dead_code)] @@ -162,14 +168,7 @@ pub fn stream_graph, ITER: Iterator tipset_iter: ITER, stateroot_limit: ChainEpoch, ) -> ChainStream { - ChainStream { - tipset_iter, - db, - dfs: VecDeque::new(), - seen: CidHashSet::default(), - stateroot_limit, - fail_on_dead_links: false, - } + stream_chain(db, tipset_iter, stateroot_limit).fail_on_dead_links(false) } impl, ITER: Iterator + Unpin> Stream @@ -179,6 +178,8 @@ impl, ITER: Iterator + Unpin> Stream fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { use Task::*; + let fail_on_dead_links = self.fail_on_dead_links; + let stateroot_limit = self.stateroot_limit; let this = self.project(); let ipld_to_cid = |ipld| { @@ -188,7 +189,6 @@ impl, ITER: Iterator + Unpin> Stream None }; - let stateroot_limit = *this.stateroot_limit; loop { while let Some(task) = this.dfs.front_mut() { match task { @@ -197,7 +197,7 @@ impl, ITER: Iterator + Unpin> Stream this.dfs.pop_front(); if let Some(data) = this.db.get(&cid)? { return Poll::Ready(Some(Ok(CarBlock { cid, data }))); - } else if *this.fail_on_dead_links { + } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); } } @@ -219,7 +219,7 @@ impl, ITER: Iterator + Unpin> Stream } } return Poll::Ready(Some(Ok(CarBlock { cid, data }))); - } else if *this.fail_on_dead_links { + } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!( "missing key: {}", cid @@ -279,26 +279,36 @@ impl, ITER: Iterator + Unpin> Stream } pin_project! { - pub struct UnorderedChainStream { + pub struct UnorderedChainStream<'a, DB, T> { tipset_iter: T, db: Arc, seen: Arc>, worker_handle: JoinHandle>, - block_receiver: flume::Receiver>, + block_recv_stream: Fuse>>, extract_sender: flume::Sender, stateroot_limit: ChainEpoch, queue: Vec, fail_on_dead_links: bool, } - impl PinnedDrop for UnorderedChainStream { + impl<'a, DB, T> PinnedDrop for UnorderedChainStream<'a, DB, T> { fn drop(this: Pin<&mut Self>) { this.worker_handle.abort() } } } -impl UnorderedChainStream { +impl<'a, DB, T> UnorderedChainStream<'a, DB, T> { + pub fn with_seen(self, seen: CidHashSet) -> Self { + *self.seen.lock() = seen; + self + } + + pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self { + self.fail_on_dead_links = fail_on_dead_links; + self + } + pub fn into_seen(self) -> CidHashSet { let mut set = CidHashSet::new(); let mut guard = self.seen.lock(); @@ -318,8 +328,8 @@ impl UnorderedChainStream { /// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth. /// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the /// number of `[`Tipset`]` that needs inspection. -#[allow(dead_code)] pub fn unordered_stream_chain< + 'a, DB: Blockstore + Sync + Send + 'static, T: Borrow, ITER: Iterator + Unpin + Send + 'static, @@ -327,7 +337,7 @@ pub fn unordered_stream_chain< db: Arc, tipset_iter: ITER, stateroot_limit: ChainEpoch, -) -> UnorderedChainStream { +) -> UnorderedChainStream<'a, DB, ITER> { let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT); let (extract_sender, extract_receiver) = flume::unbounded(); let fail_on_dead_links = true; @@ -344,7 +354,7 @@ pub fn unordered_stream_chain< seen, db, worker_handle: handle, - block_receiver: receiver, + block_recv_stream: receiver.into_stream().fuse(), queue: Vec::new(), extract_sender, tipset_iter, @@ -356,6 +366,7 @@ pub fn unordered_stream_chain< // Stream available graph in unordered search. All reachable nodes are touched and dead-links // are ignored. pub fn unordered_stream_graph< + 'a, DB: Blockstore + Sync + Send + 'static, T: Borrow, ITER: Iterator + Unpin + Send + 'static, @@ -363,34 +374,16 @@ pub fn unordered_stream_graph< db: Arc, tipset_iter: ITER, stateroot_limit: ChainEpoch, -) -> UnorderedChainStream { - let (sender, receiver) = flume::bounded(2048); - let (extract_sender, extract_receiver) = flume::unbounded(); - let fail_on_dead_links = false; - let seen = Arc::new(Mutex::new(CidHashSet::default())); - let handle = UnorderedChainStream::::start_workers( - db.clone(), - sender.clone(), - extract_receiver, - seen.clone(), - fail_on_dead_links, - ); - - UnorderedChainStream { - seen, - db, - worker_handle: handle, - block_receiver: receiver, - queue: Vec::new(), - tipset_iter, - extract_sender, - stateroot_limit, - fail_on_dead_links, - } +) -> UnorderedChainStream<'a, DB, ITER> { + unordered_stream_chain(db, tipset_iter, stateroot_limit).fail_on_dead_links(false) } -impl, ITER: Iterator + Unpin> - UnorderedChainStream +impl< + 'a, + DB: Blockstore + Send + Sync + 'static, + T: Borrow, + ITER: Iterator + Unpin, +> UnorderedChainStream<'a, DB, ITER> { fn start_workers( db: Arc, @@ -401,8 +394,7 @@ impl, ITER: Iterator JoinHandle> { task::spawn(async move { let mut handles = JoinSet::new(); - - for _ in 0..num_cpus::get() { + for _ in 0..num_cpus::get().clamp(1, 4) { let seen = seen.clone(); let extract_receiver = extract_receiver.clone(); let db = db.clone(); @@ -418,14 +410,19 @@ impl, ITER: Iterator, ITER: Iterator + Unpin> Stream - for UnorderedChainStream +impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Unpin> Stream + for UnorderedChainStream<'a, DB, T> { type Item = anyhow::Result; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let receive_block = || { - if let Ok(item) = this.block_receiver.try_recv() { - return Some(item); - } - None - }; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let stateroot_limit = self.stateroot_limit; + let fail_on_dead_links = self.fail_on_dead_links; + loop { - while let Some(cid) = this.queue.pop() { - if let Some(data) = this.db.get(&cid)? { + while let Some(cid) = self.queue.pop() { + if let Some(data) = self.db.get(&cid)? { return Poll::Ready(Some(Ok(CarBlock { cid, data }))); - } else if *this.fail_on_dead_links { + } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); } } - if let Some(block) = receive_block() { - return Poll::Ready(Some(block)); - } - - let stateroot_limit = *this.stateroot_limit; - // This consumes a [`Tipset`] from the iterator one at a time. Workers are then processing - // the extract queue. The emit queue is processed in the loop above. Once the desired depth - // has been reached yield a block without walking the graph it represents. - if let Some(tipset) = this.tipset_iter.next() { - for block in tipset.into_block_headers().into_iter() { - if this.seen.lock().insert(*block.cid()) { - // Make sure we always yield a block, directly to the stream to avoid extra - // work. - this.queue.push(*block.cid()); - - if block.epoch == 0 { - // The genesis block has some kind of dummy parent that needs to be emitted. - for p in &block.parents { - this.queue.push(p); - } - } + match Pin::new(&mut self.block_recv_stream).poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(block)) => return Poll::Ready(Some(block)), + _ => { + let this = self.as_mut().project(); + // This consumes a [`Tipset`] from the iterator one at a time. Workers are then processing + // the extract queue. The emit queue is processed in the loop above. Once the desired depth + // has been reached yield a block without walking the graph it represents. + if let Some(tipset) = this.tipset_iter.next() { + for block in tipset.into_block_headers().into_iter() { + if this.seen.lock().insert(*block.cid()) { + // Make sure we always yield a block, directly to the stream to avoid extra + // work. + this.queue.push(*block.cid()); + + if block.epoch == 0 { + // The genesis block has some kind of dummy parent that needs to be emitted. + for p in &block.parents { + this.queue.push(p); + } + } - // Process block messages. - if block.epoch > stateroot_limit - && should_save_block_to_snapshot(block.messages) - { - if this.db.has(&block.messages)? { - this.extract_sender.send(block.messages)?; - // This will simply return an error once we reach that item in - // the queue. - } else if *this.fail_on_dead_links { - this.queue.push(block.messages); - } else { - // Make sure we update seen here as we don't send the block for - // inspection. - this.seen.lock().insert(block.messages); - } - } + // Process block messages. + if block.epoch > stateroot_limit + && should_save_block_to_snapshot(block.messages) + { + if this.db.has(&block.messages)? { + this.extract_sender.send(block.messages)?; + // This will simply return an error once we reach that item in + // the queue. + } else if fail_on_dead_links { + this.queue.push(block.messages); + } else { + // Make sure we update seen here as we don't send the block for + // inspection. + this.seen.lock().insert(block.messages); + } + } - // Visit the block if it's within required depth. And a special case for `0` - // epoch to match Lotus' implementation. - if (block.epoch == 0 || block.epoch > stateroot_limit) - && should_save_block_to_snapshot(block.state_root) - { - if this.db.has(&block.state_root)? { - this.extract_sender.send(block.state_root)?; - // This will simply return an error once we reach that item in - // the queue. - } else if *this.fail_on_dead_links { - this.queue.push(block.state_root); - } else { - // Make sure we update seen here as we don't send the block for - // inspection. - this.seen.lock().insert(block.state_root); + // Visit the block if it's within required depth. And a special case for `0` + // epoch to match Lotus' implementation. + if (block.epoch == 0 || block.epoch > stateroot_limit) + && should_save_block_to_snapshot(block.state_root) + { + if this.db.has(&block.state_root)? { + this.extract_sender.send(block.state_root)?; + // This will simply return an error once we reach that item in + // the queue. + } else if fail_on_dead_links { + this.queue.push(block.state_root); + } else { + // Make sure we update seen here as we don't send the block for + // inspection. + this.seen.lock().insert(block.state_root); + } + } } } - } - } - } else { - match this.block_receiver.try_recv() { - Ok(item) => return Poll::Ready(Some(item)), - Err(err) => { - if this.extract_sender.is_empty() { - this.worker_handle.abort(); - return Poll::Ready(None); - // This should never happen, because both `extract_sender` and - // `block_receiver` are held by worker_handle and their counterparts - - // by the main process. So those are either both functional or both - // closed. - } else if err == TryRecvError::Disconnected { - panic!( - "block_receiver can only be closed after extract_sender is empty" - ) - } + } else if this.extract_sender.is_empty() { + this.worker_handle.abort(); } } } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index b0704c041995..40addd49a4db 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -9,7 +9,7 @@ use types::*; use crate::blocks::RawBlockHeader; use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::index::ResolveNullTipset; -use crate::chain::{ChainStore, HeadChange}; +use crate::chain::{ChainStore, ExportOptions, HeadChange}; use crate::cid_collections::CidHashSet; use crate::ipld::DfsIter; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; @@ -222,25 +222,26 @@ impl RpcMethod<1> for ChainPruneSnapshot { } } -pub enum ChainExport {} -impl RpcMethod<1> for ChainExport { - const NAME: &'static str = "Filecoin.ChainExport"; +pub enum ForestChainExport {} +impl RpcMethod<1> for ForestChainExport { + const NAME: &'static str = "Forest.ChainExport"; const PARAM_NAMES: [&'static str; 1] = ["params"]; const API_PATHS: BitFlags = ApiPaths::all(); const PERMISSION: Permission = Permission::Read; - type Params = (ChainExportParams,); + type Params = (ForestChainExportParams,); type Ok = Option; async fn handle( ctx: Ctx, (params,): Self::Params, ) -> Result { - let ChainExportParams { + let ForestChainExportParams { epoch, recent_roots, output_path, tipset_keys: ApiTipsetKey(tsk), + unordered, skip_checksum, dry_run, } = params; @@ -265,14 +266,18 @@ impl RpcMethod<1> for ChainExport { ctx.chain_index() .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?; + let option = Some(ExportOptions { + skip_checksum, + unordered, + ..Default::default() + }); match if dry_run { crate::chain::export::( &ctx.store_owned(), &start_ts, recent_roots, VoidAsyncWriter, - CidHashSet::default(), - skip_checksum, + option, ) .await } else { @@ -282,8 +287,7 @@ impl RpcMethod<1> for ChainExport { &start_ts, recent_roots, file, - CidHashSet::default(), - skip_checksum, + option, ) .await } { @@ -293,6 +297,45 @@ impl RpcMethod<1> for ChainExport { } } +pub enum ChainExport {} +impl RpcMethod<1> for ChainExport { + const NAME: &'static str = "Filecoin.ChainExport"; + const PARAM_NAMES: [&'static str; 1] = ["params"]; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + + type Params = (ChainExportParams,); + type Ok = Option; + + async fn handle( + ctx: Ctx, + (params,): Self::Params, + ) -> Result { + let ChainExportParams { + epoch, + recent_roots, + output_path, + tipset_keys, + skip_checksum, + dry_run, + } = params; + + ForestChainExport::handle( + ctx, + (ForestChainExportParams { + unordered: false, + epoch, + recent_roots, + output_path, + tipset_keys, + skip_checksum, + dry_run, + },), + ) + .await + } +} + pub enum ChainReadObj {} impl RpcMethod<1> for ChainReadObj { const NAME: &'static str = "Filecoin.ChainReadObj"; @@ -840,6 +883,20 @@ pub struct ApiMessage { lotus_json_with_self!(ApiMessage); +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct ForestChainExportParams { + pub epoch: ChainEpoch, + pub recent_roots: i64, + pub output_path: PathBuf, + #[schemars(with = "LotusJson")] + #[serde(with = "crate::lotus_json")] + pub tipset_keys: ApiTipsetKey, + pub unordered: bool, + pub skip_checksum: bool, + pub dry_run: bool, +} +lotus_json_with_self!(ForestChainExportParams); + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct ChainExportParams { pub epoch: ChainEpoch, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d37221c23a5c..00d836b41850 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -78,6 +78,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::chain::ChainSetHead); $callback!($crate::rpc::chain::ChainStatObj); $callback!($crate::rpc::chain::ChainTipSetWeight); + $callback!($crate::rpc::chain::ForestChainExport); // common vertical $callback!($crate::rpc::common::Session); diff --git a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt index d665bb42e907..83b21d6206a7 100644 --- a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt +++ b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt @@ -78,6 +78,7 @@ Filecoin.WalletSignMessage Filecoin.WalletValidateAddress Filecoin.WalletVerify Filecoin.Web3ClientVersion +Forest.ChainExport Forest.ChainGetMinBaseFee Forest.NetInfo Forest.SnapshotGC diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index a94ab544786a..354cee1096e4 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -27,6 +27,7 @@ //! Additional reading: [`crate::db::car::plain`] use crate::blocks::Tipset; +use crate::chain::ExportOptions; use crate::chain::{ ChainEpochDelta, index::{ChainIndex, ResolveNullTipset}, @@ -510,7 +511,18 @@ async fn do_export( pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1)); let writer = pb.wrap_async_write(writer); - crate::chain::export::(store, &ts, depth, writer, seen, true).await?; + crate::chain::export::( + store, + &ts, + depth, + writer, + Some(ExportOptions { + skip_checksum: true, + seen, + ..Default::default() + }), + ) + .await?; Ok(()) } From a45e66e132cf0e982418c7f493bc8718768c237e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 24 Jul 2025 23:06:55 +0800 Subject: [PATCH 02/10] less db read ops --- src/blocks/header.rs | 14 +++++- src/ipld/util.rs | 116 ++++++++++++++++++++++++------------------- 2 files changed, 78 insertions(+), 52 deletions(-) diff --git a/src/blocks/header.rs b/src/blocks/header.rs index dbbb19f3af6f..12dad6e5986c 100644 --- a/src/blocks/header.rs +++ b/src/blocks/header.rs @@ -14,10 +14,12 @@ use crate::shim::{ address::Address, crypto::Signature, econ::TokenAmount, sector::PoStProof, version::NetworkVersion, }; -use crate::utils::{cid::CidCborExt as _, encoding::blake2b_256}; +use crate::utils::encoding::blake2b_256; +use crate::utils::multihash::MultihashCode; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore as _; +use multihash_derive::MultihashDigest as _; use num::BigInt; use serde::{Deserialize, Serialize}; use serde_tuple::{Deserialize_tuple, Serialize_tuple}; @@ -96,7 +98,15 @@ impl Default for RawBlockHeader { impl RawBlockHeader { pub fn cid(&self) -> Cid { - Cid::from_cbor_blake2b256(self).unwrap() + self.car_block().expect("Infallible").0 + } + pub fn car_block(&self) -> anyhow::Result<(Cid, Vec)> { + let data = fvm_ipld_encoding::to_vec(self)?; + let cid = Cid::new_v1( + fvm_ipld_encoding::DAG_CBOR, + MultihashCode::Blake2b256.digest(&data), + ); + Ok((cid, data)) } pub(super) fn tipset_sort_key(&self) -> Option<([u8; 32], Vec)> { let ticket_hash = blake2b_256(self.ticket.as_ref()?.vrfproof.as_bytes()); diff --git a/src/ipld/util.rs b/src/ipld/util.rs index ead8c8b075da..08166c56d22b 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -102,7 +102,7 @@ impl Iterator for DfsIter { enum Task { // Yield the block, don't visit it. - Emit(Cid), + Emit(Cid, Option>), // Visit all the elements, recursively. Iterate(VecDeque), } @@ -192,13 +192,19 @@ impl, ITER: Iterator + Unpin> Stream loop { while let Some(task) = this.dfs.front_mut() { match task { - Emit(cid) => { - let cid = *cid; - this.dfs.pop_front(); - if let Some(data) = this.db.get(&cid)? { + Emit(_, _) => { + if let Some(Emit(cid, data)) = this.dfs.pop_front() { + let data = if let Some(data) = data { + data + } else if let Some(data) = this.db.get(&cid)? { + data + } else { + return Poll::Ready(Some(Err(anyhow::anyhow!( + "missing key: {}", + cid + )))); + }; return Poll::Ready(Some(Ok(CarBlock { cid, data }))); - } else if fail_on_dead_links { - return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); } } Iterate(cid_vec) => { @@ -212,10 +218,11 @@ impl, ITER: Iterator + Unpin> Stream if let Some(data) = this.db.get(&cid)? { if cid.codec() == fvm_ipld_encoding::DAG_CBOR { let new_values = extract_cids(&data)?; - cid_vec.reserve(new_values.len()); - - for v in new_values.into_iter().rev() { - cid_vec.push_front(v) + if !new_values.is_empty() { + cid_vec.reserve(new_values.len()); + for v in new_values.into_iter().rev() { + cid_vec.push_front(v) + } } } return Poll::Ready(Some(Ok(CarBlock { cid, data }))); @@ -237,14 +244,15 @@ impl, ITER: Iterator + Unpin> Stream // yield the block without walking the graph it represents. if let Some(tipset) = this.tipset_iter.next() { for block in tipset.borrow().block_headers() { - if this.seen.insert(*block.cid()) { + let (cid, data) = block.car_block()?; + if this.seen.insert(cid) { // Make sure we always yield a block otherwise. - this.dfs.push_back(Emit(*block.cid())); + this.dfs.push_back(Emit(*block.cid(), Some(data))); if block.epoch == 0 { // The genesis block has some kind of dummy parent that needs to be emitted. for p in &block.parents { - this.dfs.push_back(Emit(p)); + this.dfs.push_back(Emit(p, None)); } } @@ -287,7 +295,7 @@ pin_project! { block_recv_stream: Fuse>>, extract_sender: flume::Sender, stateroot_limit: ChainEpoch, - queue: Vec, + queue: Vec<(Cid, Option>)>, fail_on_dead_links: bool, } @@ -304,11 +312,6 @@ impl<'a, DB, T> UnorderedChainStream<'a, DB, T> { self } - pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self { - self.fail_on_dead_links = fail_on_dead_links; - self - } - pub fn into_seen(self) -> CidHashSet { let mut set = CidHashSet::new(); let mut guard = self.seen.lock(); @@ -318,17 +321,7 @@ impl<'a, DB, T> UnorderedChainStream<'a, DB, T> { } } -/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion. -/// After this limit, only block headers are streamed. Any dead links are reported as errors. -/// -/// # Arguments -/// -/// * `db` - A database that implements [`Blockstore`] interface. -/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`. -/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth. -/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the -/// number of `[`Tipset`]` that needs inspection. -pub fn unordered_stream_chain< +fn unordered_stream_chain_inner< 'a, DB: Blockstore + Sync + Send + 'static, T: Borrow, @@ -337,14 +330,14 @@ pub fn unordered_stream_chain< db: Arc, tipset_iter: ITER, stateroot_limit: ChainEpoch, + fail_on_dead_links: bool, ) -> UnorderedChainStream<'a, DB, ITER> { let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT); let (extract_sender, extract_receiver) = flume::unbounded(); - let fail_on_dead_links = true; let seen = Arc::new(Mutex::new(CidHashSet::default())); let handle = UnorderedChainStream::::start_workers( db.clone(), - sender.clone(), + sender, extract_receiver, seen.clone(), fail_on_dead_links, @@ -363,6 +356,29 @@ pub fn unordered_stream_chain< } } +/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion. +/// After this limit, only block headers are streamed. Any dead links are reported as errors. +/// +/// # Arguments +/// +/// * `db` - A database that implements [`Blockstore`] interface. +/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`. +/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth. +/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the +/// number of `[`Tipset`]` that needs inspection. +pub fn unordered_stream_chain< + 'a, + DB: Blockstore + Sync + Send + 'static, + T: Borrow, + ITER: Iterator + Unpin + Send + 'static, +>( + db: Arc, + tipset_iter: ITER, + stateroot_limit: ChainEpoch, +) -> UnorderedChainStream<'a, DB, ITER> { + unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, true) +} + // Stream available graph in unordered search. All reachable nodes are touched and dead-links // are ignored. pub fn unordered_stream_graph< @@ -375,7 +391,7 @@ pub fn unordered_stream_graph< tipset_iter: ITER, stateroot_limit: ChainEpoch, ) -> UnorderedChainStream<'a, DB, ITER> { - unordered_stream_chain(db, tipset_iter, stateroot_limit).fail_on_dead_links(false) + unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, false) } impl< @@ -394,7 +410,7 @@ impl< ) -> JoinHandle> { task::spawn(async move { let mut handles = JoinSet::new(); - for _ in 0..num_cpus::get().clamp(1, 4) { + for _ in 0..num_cpus::get().clamp(1, 8) { let seen = seen.clone(); let extract_receiver = extract_receiver.clone(); let db = db.clone(); @@ -407,22 +423,19 @@ impl< if let Some(data) = db.get(&cid)? { if cid.codec() == fvm_ipld_encoding::DAG_CBOR { let mut new_values = extract_cids(&data)?; - cid_vec.append(&mut new_values); + if !new_values.is_empty() { + cid_vec.append(&mut new_values); + } } // Break out of the loop if the receiving end quit. - if block_sender - .send_async(Ok(CarBlock { cid, data })) - .await - .is_err() - { + if block_sender.send(Ok(CarBlock { cid, data })).is_err() { break 'main; } } else if fail_on_dead_links { // If the receiving end has already quit - just ignore it and // break out of the loop. let _ = block_sender - .send_async(Err(anyhow::anyhow!("missing key: {}", cid))) - .await; + .send(Err(anyhow::anyhow!("missing key: {}", cid))); break 'main; } } @@ -455,8 +468,10 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un let fail_on_dead_links = self.fail_on_dead_links; loop { - while let Some(cid) = self.queue.pop() { - if let Some(data) = self.db.get(&cid)? { + while let Some((cid, data)) = self.queue.pop() { + if let Some(data) = data { + return Poll::Ready(Some(Ok(CarBlock { cid, data }))); + } else if let Some(data) = self.db.get(&cid)? { return Poll::Ready(Some(Ok(CarBlock { cid, data }))); } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); @@ -473,15 +488,16 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un // has been reached yield a block without walking the graph it represents. if let Some(tipset) = this.tipset_iter.next() { for block in tipset.into_block_headers().into_iter() { - if this.seen.lock().insert(*block.cid()) { + let (cid, data) = block.car_block()?; + if this.seen.lock().insert(cid) { // Make sure we always yield a block, directly to the stream to avoid extra // work. - this.queue.push(*block.cid()); + this.queue.push((cid, Some(data))); if block.epoch == 0 { // The genesis block has some kind of dummy parent that needs to be emitted. for p in &block.parents { - this.queue.push(p); + this.queue.push((p, None)); } } @@ -494,7 +510,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un // This will simply return an error once we reach that item in // the queue. } else if fail_on_dead_links { - this.queue.push(block.messages); + this.queue.push((block.messages, None)); } else { // Make sure we update seen here as we don't send the block for // inspection. @@ -512,7 +528,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un // This will simply return an error once we reach that item in // the queue. } else if fail_on_dead_links { - this.queue.push(block.state_root); + this.queue.push((block.state_root, None)); } else { // Make sure we update seen here as we don't send the block for // inspection. From bc0e6ff8e5e98a10dd625eebc5640d6cd7580337 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 28 Jul 2025 19:45:49 +0800 Subject: [PATCH 03/10] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e769382ac109..51bbc2ba08a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ ### Added +- [#5867](https://github.com/ChainSafe/forest/pull/5867) Added `--unordered` to `forest-cli snapshot export` for exporting `CAR` blocks in non-deterministic order for better performance with more parallelization. + ### Changed ### Removed From 5023a7e9ba23bf8400faf44c73fdf59278dfc8d6 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 28 Jul 2025 20:35:57 +0800 Subject: [PATCH 04/10] fix --- src/ipld/util.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 08166c56d22b..835aa4116391 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -194,17 +194,15 @@ impl, ITER: Iterator + Unpin> Stream match task { Emit(_, _) => { if let Some(Emit(cid, data)) = this.dfs.pop_front() { - let data = if let Some(data) = data { - data + if let Some(data) = data { + return Poll::Ready(Some(Ok(CarBlock { cid, data }))); } else if let Some(data) = this.db.get(&cid)? { - data - } else { + return Poll::Ready(Some(Ok(CarBlock { cid, data }))); + } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!( - "missing key: {}", - cid + "missing key: {cid}" )))); }; - return Poll::Ready(Some(Ok(CarBlock { cid, data }))); } } Iterate(cid_vec) => { @@ -228,8 +226,7 @@ impl, ITER: Iterator + Unpin> Stream return Poll::Ready(Some(Ok(CarBlock { cid, data }))); } else if fail_on_dead_links { return Poll::Ready(Some(Err(anyhow::anyhow!( - "missing key: {}", - cid + "missing key: {cid}" )))); } } @@ -435,7 +432,7 @@ impl< // If the receiving end has already quit - just ignore it and // break out of the loop. let _ = block_sender - .send(Err(anyhow::anyhow!("missing key: {}", cid))); + .send(Err(anyhow::anyhow!("missing key: {cid}"))); break 'main; } } @@ -474,7 +471,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un } else if let Some(data) = self.db.get(&cid)? { return Poll::Ready(Some(Ok(CarBlock { cid, data }))); } else if fail_on_dead_links { - return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); + return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}")))); } } From 5d7ee883debb99a110abcdc0bdade52b2be6ed4d Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 7 Aug 2025 18:42:11 +0800 Subject: [PATCH 05/10] abort worker_handle gracefully & CI check --- .github/workflows/forest.yml | 22 +++++++++++ scripts/tests/calibnet_eth_mapping_check.sh | 3 +- .../tests/calibnet_export_unordered_check.sh | 37 +++++++++++++++++++ scripts/tests/harness.sh | 5 +++ src/ipld/util.rs | 34 +++++++++++------ 5 files changed, 87 insertions(+), 14 deletions(-) create mode 100755 scripts/tests/calibnet_export_unordered_check.sh diff --git a/.github/workflows/forest.yml b/.github/workflows/forest.yml index 5c5ed1013e88..1248ed032cf9 100644 --- a/.github/workflows/forest.yml +++ b/.github/workflows/forest.yml @@ -306,6 +306,28 @@ jobs: - name: Snapshot export check v2 run: ./scripts/tests/calibnet_export_check.sh v2 timeout-minutes: ${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }} + calibnet-unordered-export-check: + needs: + - build-ubuntu + name: Snapshot unordered export checks + runs-on: ubuntu-24.04 + steps: + - run: lscpu + - uses: actions/cache@v4 + with: + path: "${{ env.FIL_PROOFS_PARAMETER_CACHE }}" + key: proof-params-keys + - uses: actions/checkout@v4 + - uses: actions/download-artifact@v4 + with: + name: "forest-${{ runner.os }}" + path: ~/.cargo/bin + - name: Set permissions + run: | + chmod +x ~/.cargo/bin/forest* + - name: Snapshot unordered export check + run: ./scripts/tests/calibnet_export_unordered_check.sh + timeout-minutes: ${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }} calibnet-no-discovery-checks: needs: - build-ubuntu diff --git a/scripts/tests/calibnet_eth_mapping_check.sh b/scripts/tests/calibnet_eth_mapping_check.sh index 7a3e7168bd7b..8803edd8c35f 100755 --- a/scripts/tests/calibnet_eth_mapping_check.sh +++ b/scripts/tests/calibnet_eth_mapping_check.sh @@ -46,8 +46,7 @@ done echo "Done" -echo "Waiting to be ready for serving" -$FOREST_CLI_PATH healthcheck ready --wait +forest_wait_for_healthcheck_ready ERROR=0 echo "Testing Ethereum mappings" diff --git a/scripts/tests/calibnet_export_unordered_check.sh b/scripts/tests/calibnet_export_unordered_check.sh new file mode 100755 index 000000000000..180ba8c838d4 --- /dev/null +++ b/scripts/tests/calibnet_export_unordered_check.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# This script is checking the correctness of +# the snapshot export feature. +# It requires both the `forest` and `forest-cli` binaries to be in the PATH. + +set -eu + +source "$(dirname "$0")/harness.sh" + +forest_init "$@" + +echo "Cleaning up the initial snapshot" +rm --force --verbose ./*.{car,car.zst,sha256sum} + +echo "Exporting zstd compressed snapshot with unordred graph traversal" +$FOREST_CLI_PATH snapshot export --unordered + +$FOREST_CLI_PATH shutdown --force + +for f in *.car.zst; do + echo "Inspecting archive info $f" + $FOREST_TOOL_PATH archive info "$f" + echo "Inspecting archive metadata $f" + $FOREST_TOOL_PATH archive metadata "$f" +done + +echo "Cleanup calibnet db" +$FOREST_TOOL_PATH db destroy --chain calibnet --force + +echo "Import the unordered snapshot" +$FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-200 --import-snapshot *.car.zst + +echo "Check if Forest is able to sync" +forest_run_node_detached +forest_wait_api +forest_wait_for_sync +forest_wait_for_healthcheck_ready diff --git a/scripts/tests/harness.sh b/scripts/tests/harness.sh index dc153f27a54f..1ab312882c0d 100644 --- a/scripts/tests/harness.sh +++ b/scripts/tests/harness.sh @@ -97,6 +97,11 @@ function forest_wait_for_sync { timeout 30m $FOREST_CLI_PATH sync wait } +function forest_wait_for_healthcheck_ready { + echo "Waiting for healthcheck ready" + timeout 30m $FOREST_CLI_PATH healthcheck ready --wait +} + function forest_init { forest_download_and_import_snapshot diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 4f191a489c17..25766d869556 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -284,7 +284,7 @@ pin_project! { seen: Arc>, worker_handle: JoinHandle>, block_recv_stream: Fuse>>, - extract_sender: flume::Sender, + extract_sender: Option>, stateroot_limit: ChainEpoch, queue: Vec<(Cid, Option>)>, fail_on_dead_links: bool, @@ -326,7 +326,7 @@ fn unordered_stream_chain_inner< let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT); let (extract_sender, extract_receiver) = flume::unbounded(); let seen = Arc::new(Mutex::new(CidHashSet::default())); - let handle = UnorderedChainStream::::start_workers( + let worker_handle = UnorderedChainStream::::start_workers( db.clone(), sender, extract_receiver, @@ -337,10 +337,10 @@ fn unordered_stream_chain_inner< UnorderedChainStream { seen, db, - worker_handle: handle, + worker_handle, block_recv_stream: receiver.into_stream().fuse(), queue: Vec::new(), - extract_sender, + extract_sender: Some(extract_sender), tipset_iter, stateroot_limit, fail_on_dead_links, @@ -414,9 +414,7 @@ impl< if let Some(data) = db.get(&cid)? { if cid.codec() == fvm_ipld_encoding::DAG_CBOR { let mut new_values = extract_cids(&data)?; - if !new_values.is_empty() { - cid_vec.append(&mut new_values); - } + cid_vec.append(&mut new_values); } // Break out of the loop if the receiving end quit. if block_sender.send(Ok(CarBlock { cid, data })).is_err() { @@ -455,6 +453,14 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un type Item = anyhow::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn send(sender: &Option>, v: T) -> Result<(), flume::SendError> { + if let Some(sender) = sender { + sender.send(v) + } else { + Err(flume::SendError(v)) + } + } + let stateroot_limit = self.stateroot_limit; let fail_on_dead_links = self.fail_on_dead_links; @@ -473,7 +479,8 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(block)) => return Poll::Ready(Some(block)), _ => { - let this = self.as_mut().project(); + let self_mut = self.as_mut(); + let this = self_mut.project(); // This consumes a [`Tipset`] from the iterator one at a time. Workers are then processing // the extract queue. The emit queue is processed in the loop above. Once the desired depth // has been reached yield a block without walking the graph it represents. @@ -497,7 +504,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un && should_save_block_to_snapshot(block.messages) { if this.db.has(&block.messages)? { - this.extract_sender.send(block.messages)?; + send(this.extract_sender, block.messages)?; // This will simply return an error once we reach that item in // the queue. } else if fail_on_dead_links { @@ -515,7 +522,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un && should_save_block_to_snapshot(block.state_root) { if this.db.has(&block.state_root)? { - this.extract_sender.send(block.state_root)?; + send(this.extract_sender, block.state_root)?; // This will simply return an error once we reach that item in // the queue. } else if fail_on_dead_links { @@ -528,8 +535,11 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un } } } - } else if this.extract_sender.is_empty() { - this.worker_handle.abort(); + } else if let Some(extract_sender) = this.extract_sender + && extract_sender.is_empty() + { + // drop the sender to abort the woker task + *this.extract_sender = None; } } } From ac15a2229445ec7d6626f83e56f6fd04de6e5831 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 7 Aug 2025 19:15:58 +0800 Subject: [PATCH 06/10] fix script lint --- scripts/tests/calibnet_export_unordered_check.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tests/calibnet_export_unordered_check.sh b/scripts/tests/calibnet_export_unordered_check.sh index 180ba8c838d4..d779410d8f1a 100755 --- a/scripts/tests/calibnet_export_unordered_check.sh +++ b/scripts/tests/calibnet_export_unordered_check.sh @@ -28,7 +28,7 @@ echo "Cleanup calibnet db" $FOREST_TOOL_PATH db destroy --chain calibnet --force echo "Import the unordered snapshot" -$FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-200 --import-snapshot *.car.zst +$FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-100 --import-snapshot "*.car.zst" echo "Check if Forest is able to sync" forest_run_node_detached From f3897d3c8d61416fd4558d4ddae3d256b30b26b0 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 7 Aug 2025 22:11:25 +0800 Subject: [PATCH 07/10] fix --- scripts/tests/calibnet_export_unordered_check.sh | 4 ++-- src/ipld/util.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/scripts/tests/calibnet_export_unordered_check.sh b/scripts/tests/calibnet_export_unordered_check.sh index d779410d8f1a..fd2e8e42ce6f 100755 --- a/scripts/tests/calibnet_export_unordered_check.sh +++ b/scripts/tests/calibnet_export_unordered_check.sh @@ -13,7 +13,7 @@ echo "Cleaning up the initial snapshot" rm --force --verbose ./*.{car,car.zst,sha256sum} echo "Exporting zstd compressed snapshot with unordred graph traversal" -$FOREST_CLI_PATH snapshot export --unordered +$FOREST_CLI_PATH snapshot export --unordered -o unordered.forest.car.zst $FOREST_CLI_PATH shutdown --force @@ -28,7 +28,7 @@ echo "Cleanup calibnet db" $FOREST_TOOL_PATH db destroy --chain calibnet --force echo "Import the unordered snapshot" -$FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-100 --import-snapshot "*.car.zst" +$FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-100 --import-snapshot unordered.forest.car.zst echo "Check if Forest is able to sync" forest_run_node_detached diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 25766d869556..398033bd1aa2 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -417,14 +417,19 @@ impl< cid_vec.append(&mut new_values); } // Break out of the loop if the receiving end quit. - if block_sender.send(Ok(CarBlock { cid, data })).is_err() { + if block_sender + .send_async(Ok(CarBlock { cid, data })) + .await + .is_err() + { break 'main; } } else if fail_on_dead_links { // If the receiving end has already quit - just ignore it and // break out of the loop. let _ = block_sender - .send(Err(anyhow::anyhow!("missing key: {cid}"))); + .send_async(Err(anyhow::anyhow!("missing key: {cid}"))) + .await; break 'main; } } From 0ccb41d20247dc9643ac6468f3bf5f09715ced60 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 8 Aug 2025 00:00:54 +0800 Subject: [PATCH 08/10] resolve AI comment --- .github/workflows/forest.yml | 2 ++ src/ipld/util.rs | 66 +++++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/.github/workflows/forest.yml b/.github/workflows/forest.yml index 1248ed032cf9..134b6d743941 100644 --- a/.github/workflows/forest.yml +++ b/.github/workflows/forest.yml @@ -598,6 +598,8 @@ jobs: - state-migrations-check - calibnet-wallet-check - calibnet-export-check + - calibnet-export-check-v2 + - calibnet-unordered-export-check - calibnet-no-discovery-checks - calibnet-kademlia-checks - calibnet-eth-mapping-check diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 398033bd1aa2..d64379fda66c 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -466,6 +466,26 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un } } + fn process_cid( + cid: Cid, + db: &DB, + extract_sender: &Option>, + queue: &mut Vec<(Cid, Option>)>, + seen: &Arc>, + fail_on_dead_links: bool, + ) -> anyhow::Result<()> { + if should_save_block_to_snapshot(cid) { + if db.has(&cid)? { + send(extract_sender, cid)?; + } else if fail_on_dead_links { + queue.push((cid, None)); + } else { + seen.lock().insert(cid); + } + } + Ok(()) + } + let stateroot_limit = self.stateroot_limit; let fail_on_dead_links = self.fail_on_dead_links; @@ -505,38 +525,28 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un } // Process block messages. - if block.epoch > stateroot_limit - && should_save_block_to_snapshot(block.messages) - { - if this.db.has(&block.messages)? { - send(this.extract_sender, block.messages)?; - // This will simply return an error once we reach that item in - // the queue. - } else if fail_on_dead_links { - this.queue.push((block.messages, None)); - } else { - // Make sure we update seen here as we don't send the block for - // inspection. - this.seen.lock().insert(block.messages); - } + if block.epoch > stateroot_limit { + process_cid( + block.messages, + this.db, + &this.extract_sender, + this.queue, + this.seen, + fail_on_dead_links, + )?; } // Visit the block if it's within required depth. And a special case for `0` // epoch to match Lotus' implementation. - if (block.epoch == 0 || block.epoch > stateroot_limit) - && should_save_block_to_snapshot(block.state_root) - { - if this.db.has(&block.state_root)? { - send(this.extract_sender, block.state_root)?; - // This will simply return an error once we reach that item in - // the queue. - } else if fail_on_dead_links { - this.queue.push((block.state_root, None)); - } else { - // Make sure we update seen here as we don't send the block for - // inspection. - this.seen.lock().insert(block.state_root); - } + if block.epoch == 0 || block.epoch > stateroot_limit { + process_cid( + block.state_root, + this.db, + &this.extract_sender, + this.queue, + this.seen, + fail_on_dead_links, + )?; } } } From 10241f8dfc62c31a963ef0b453679fce2a5d9a2e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 8 Aug 2025 19:57:25 +0800 Subject: [PATCH 09/10] fix lint errors --- src/ipld/util.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index d64379fda66c..f5a428aa8b2a 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -529,7 +529,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un process_cid( block.messages, this.db, - &this.extract_sender, + this.extract_sender, this.queue, this.seen, fail_on_dead_links, @@ -542,7 +542,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un process_cid( block.state_root, this.db, - &this.extract_sender, + this.extract_sender, this.queue, this.seen, fail_on_dead_links, From 2158980b1ed1ae749f2d8061bdf3a64df6318a9c Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 8 Aug 2025 20:53:17 +0800 Subject: [PATCH 10/10] resolve AI comments --- src/ipld/util.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ipld/util.rs b/src/ipld/util.rs index f5a428aa8b2a..f530b3b25a8b 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -16,6 +16,7 @@ use fvm_ipld_blockstore::Blockstore; use parking_lot::Mutex; use pin_project_lite::pin_project; use std::borrow::Borrow; +use std::fmt::Display; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -458,11 +459,13 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un type Item = anyhow::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - fn send(sender: &Option>, v: T) -> Result<(), flume::SendError> { + fn send(sender: &Option>, v: T) -> anyhow::Result<()> { if let Some(sender) = sender { - sender.send(v) + sender + .send(v) + .map_err(|e| anyhow::anyhow!("failed to send {}", e.into_inner())) } else { - Err(flume::SendError(v)) + anyhow::bail!("attempted to enqueue after shutdown (extract_sender dropped): {v}"); } } @@ -553,7 +556,7 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator + Un } else if let Some(extract_sender) = this.extract_sender && extract_sender.is_empty() { - // drop the sender to abort the woker task + // drop the sender to abort the worker task *this.extract_sender = None; } }