diff --git a/Cargo.lock b/Cargo.lock index 25ec8a8d9a19..50f7ad90a05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2002,7 +2002,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 2.0.106", + "syn 1.0.109", ] [[package]] @@ -4119,9 +4119,9 @@ dependencies = [ [[package]] name = "half" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e54c115d4f30f52c67202f079c5f9d8b49db4691f460fdb0b4c2e838261b2ba5" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ "cfg-if", "crunchy", diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index e17f365470c9..a6341b8ea953 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -15,40 +15,36 @@ //! //! The state machine does not do any network requests or validation. Those are //! handled by an external actor. -use crate::libp2p::hello::HelloRequest; -use crate::message_pool::MessagePool; -use crate::message_pool::MpoolRpcProvider; -use crate::networks::calculate_expected_epoch; -use crate::shim::clock::ChainEpoch; -use crate::state_manager::StateManager; -use crate::utils::misc::env::is_env_truthy; + +use super::network_context::SyncNetworkContext; +use crate::{ + blocks::{Block, FullTipset, Tipset, TipsetKey}, + chain::ChainStore, + chain_sync::{ + ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator, + bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset, + }, + libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest}, + message_pool::{MessagePool, MpoolRpcProvider}, + networks::calculate_expected_epoch, + shim::clock::ChainEpoch, + state_manager::StateManager, + utils::misc::env::is_env_truthy, +}; use ahash::{HashMap, HashSet}; use chrono::Utc; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use libp2p::PeerId; -use parking_lot::Mutex; -use std::time::Instant; -use std::{ops::Deref as _, sync::Arc}; +use parking_lot::{Mutex, RwLock}; +use std::{ops::Deref as _, sync::Arc, time::Instant}; use tokio::{sync::Notify, task::JoinSet}; use tracing::{debug, error, info, trace, warn}; -use super::network_context::SyncNetworkContext; -use crate::chain_sync::sync_status::SyncStatusReport; -use crate::chain_sync::tipset_syncer::validate_tipset; -use crate::chain_sync::{ForkSyncInfo, ForkSyncStage}; -use crate::{ - blocks::{Block, FullTipset, Tipset, TipsetKey}, - chain::ChainStore, - chain_sync::{TipsetValidator, bad_block_cache::BadBlockCache, metrics}, - libp2p::{NetworkEvent, PubsubMessage}, -}; -use parking_lot::RwLock; - pub struct ChainFollower { /// Syncing status of the chain - pub sync_status: Arc>, + pub sync_status: SyncStatus, /// manages retrieving and updates state objects state_manager: Arc>, @@ -138,7 +134,7 @@ pub async fn chain_follower( tipset_receiver: flume::Receiver>, network: SyncNetworkContext, mem_pool: Arc>>, - sync_status: Arc>, + sync_status: SyncStatus, genesis: Arc, stateless_mode: bool, ) -> anyhow::Result<()> { diff --git a/src/chain_sync/mod.rs b/src/chain_sync/mod.rs index bd27bab6195b..d7c399a56128 100644 --- a/src/chain_sync/mod.rs +++ b/src/chain_sync/mod.rs @@ -16,6 +16,6 @@ pub use self::{ chain_follower::{ChainFollower, load_full_tipset}, chain_muxer::SyncConfig, consensus::collect_errs, - sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport}, + sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatus, SyncStatusReport}, validation::{TipsetValidationError, TipsetValidator}, }; diff --git a/src/chain_sync/sync_status.rs b/src/chain_sync/sync_status.rs index 892b452f4494..e8f423f7f5de 100644 --- a/src/chain_sync/sync_status.rs +++ b/src/chain_sync/sync_status.rs @@ -7,6 +7,7 @@ use crate::shim::clock::ChainEpoch; use crate::state_manager::StateManager; use chrono::{DateTime, Utc}; use fvm_ipld_blockstore::Blockstore; +use parking_lot::RwLock; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -101,6 +102,8 @@ pub struct ForkSyncInfo { pub(crate) last_updated: Option>, } +pub type SyncStatus = Arc>; + /// Contains information about the current status of the node's synchronization process. #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)] pub struct SyncStatusReport { diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 258b5b54e302..bafb12a755b5 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -9,8 +9,8 @@ pub mod main; use crate::blocks::Tipset; use crate::chain::HeadChange; use crate::chain::index::ResolveNullTipset; -use crate::chain_sync::ChainFollower; use crate::chain_sync::network_context::SyncNetworkContext; +use crate::chain_sync::{ChainFollower, SyncStatus}; use crate::cli_shared::snapshot; use crate::cli_shared::{ chain_path, @@ -561,8 +561,9 @@ pub(super) async fn start( _ = snap_gc_reboot_rx.recv_async() => { snap_gc.cleanup_before_reboot().await; } - result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx| { + result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| { snap_gc.set_db(ctx.db.clone()); + snap_gc.set_sync_status(sync_status); snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default()); }) => { break result @@ -576,7 +577,7 @@ pub(super) async fn start_services( opts: &CliOpts, mut config: Config, shutdown_send: mpsc::Sender<()>, - on_app_context_and_db_initialized: impl Fn(&AppContext), + on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus), ) -> anyhow::Result<()> { // Cleanup the collector prometheus metrics registry on start crate::metrics::reset_collector_registry(); @@ -608,7 +609,7 @@ pub(super) async fn start_services( services.shutdown().await; return Ok(()); } - on_app_context_and_db_initialized(&ctx); + on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone()); warmup_in_background(&ctx); ctx.state_manager.populate_cache(); maybe_start_metrics_service(&mut services, &config, &ctx).await?; diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 65338bdab35d..0f8102fb05fc 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -74,6 +74,7 @@ pub struct SnapshotGarbageCollector { running: AtomicBool, blessed_lite_snapshot: RwLock>, db: RwLock>>, + sync_status: RwLock>, // On mainnet, it takes ~50MiB-200MiB RAM, depending on the time cost of snapshot export memory_db: RwLock>>>, memory_db_head_key: RwLock>, @@ -111,6 +112,7 @@ where running: AtomicBool::new(false), blessed_lite_snapshot: RwLock::new(None), db: RwLock::new(None), + sync_status: RwLock::new(None), memory_db: RwLock::new(None), memory_db_head_key: RwLock::new(None), exported_head_key: RwLock::new(None), @@ -132,6 +134,10 @@ where *self.car_db_head_epoch.write() = Some(epoch); } + pub fn set_sync_status(&self, sync_status: crate::chain_sync::SyncStatus) { + *self.sync_status.write() = Some(sync_status) + } + pub async fn event_loop(&self) { while self.trigger_rx.recv_async().await.is_ok() { if self.running.load(Ordering::Relaxed) { @@ -139,6 +145,7 @@ where } else { self.running.store(true, Ordering::Relaxed); if let Err(e) = self.export_snapshot().await { + self.running.store(false, Ordering::Relaxed); tracing::warn!("{e}"); } } @@ -170,18 +177,22 @@ where ); loop { if !self.running.load(Ordering::Relaxed) - && let Some(db) = &*self.db.read() && let Some(car_db_head_epoch) = *self.car_db_head_epoch.read() - && let Ok(head_key) = HeaviestTipsetKeyProvider::heaviest_tipset_key(db) - && let Ok(head) = Tipset::load_required(db, &head_key) + && let Some(sync_status) = &*self.sync_status.read() { - let head_epoch = head.epoch(); - if head_epoch - car_db_head_epoch >= snap_gc_interval_epochs + let sync_status = &*sync_status.read(); + let network_head_epoch = sync_status.network_head_epoch; + let head_epoch = sync_status.current_head_epoch; + if head_epoch > 0 // sync_status has been initialized + && head_epoch <= network_head_epoch // head epoch is within a sane range + && sync_status.is_synced() // chain is in sync + && sync_status.active_forks.is_empty() // no active fork + && head_epoch - car_db_head_epoch >= snap_gc_interval_epochs // the gap between chain head and car_db head is above threshold && self.trigger_tx.try_send(()).is_ok() { - tracing::info!(%car_db_head_epoch, %head_epoch, %snap_gc_interval_epochs, "Snap GC scheduled"); + tracing::info!(%car_db_head_epoch, %head_epoch, %network_head_epoch, %snap_gc_interval_epochs, "Snap GC scheduled"); } else { - tracing::trace!(%car_db_head_epoch, %head_epoch, %snap_gc_interval_epochs, "Snap GC not scheduled"); + tracing::debug!(%car_db_head_epoch, %head_epoch, %network_head_epoch, %snap_gc_interval_epochs, "Snap GC not scheduled"); } } tokio::time::sleep(snap_gc_check_interval).await; @@ -219,6 +230,7 @@ where } map }); + let start = Instant::now(); let (head_ts, _) = crate::chain::export_from_head::( &db, self.recent_state_roots, @@ -235,7 +247,11 @@ where head_ts.epoch() )); temp_path.persist(&target_path)?; - tracing::info!("exported lite snapshot at {}", target_path.display()); + tracing::info!( + "exported lite snapshot at {}, took {}", + target_path.display(), + humantime::format_duration(start.elapsed()) + ); *self.blessed_lite_snapshot.write() = Some(target_path); *self.exported_head_key.write() = Some(head_ts.key().clone()); diff --git a/src/health/mod.rs b/src/health/mod.rs index d0dcad924508..e1decd8c3678 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -7,10 +7,8 @@ use axum::{ response::{IntoResponse, Response}, routing::get, }; -use parking_lot::RwLock; -use crate::chain_sync::SyncStatusReport; -use crate::{Config, libp2p::PeerManager, networks::ChainConfig}; +use crate::{Config, chain_sync::SyncStatus, libp2p::PeerManager, networks::ChainConfig}; mod endpoints; @@ -22,7 +20,7 @@ pub(crate) struct ForestState { pub config: Config, pub chain_config: Arc, pub genesis_timestamp: u64, - pub sync_status: Arc>, + pub sync_status: SyncStatus, pub peer_manager: Arc, } @@ -60,11 +58,11 @@ impl IntoResponse for AppError { mod test { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use super::*; use crate::Client; + use crate::chain_sync::{NodeSyncStatus, SyncStatusReport}; use crate::cli_shared::cli::ChainIndexerConfig; - - use super::*; - use crate::chain_sync::NodeSyncStatus; + use parking_lot::RwLock; use reqwest::StatusCode; #[tokio::test] diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index defd30c9ff51..460a0121b252 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -452,7 +452,7 @@ pub struct RPCState { pub mpool: Arc>>, pub bad_blocks: Option>, pub msgs_in_tipset: Arc, - pub sync_status: Arc>, + pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, pub sync_network_context: SyncNetworkContext, pub tipset_send: flume::Sender>,