diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 7c3e08284c9..d81a57b6c6d 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -8,7 +8,10 @@ use super::{ }; use crate::networks::{ChainConfig, Height}; use crate::prelude::*; -use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash}; +use crate::rpc::{ + chain::ChainGetTipSetFinalityStatus, + eth::{eth_tx_from_signed_eth_message, types::EthHash}, +}; use crate::shim::clock::ChainEpoch; use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion}; use crate::state_manager::ExecutedTipset; @@ -30,7 +33,10 @@ use fvm_ipld_encoding::CborStore; use nonzero_ext::nonzero; use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; -use std::num::NonZeroUsize; +use std::{ + num::NonZeroUsize, + sync::atomic::{self, AtomicI64}, +}; use tokio::sync::broadcast; use tracing::{debug, error, trace, warn}; @@ -60,6 +66,9 @@ pub struct ChainStore { /// F3 finalized tipset cache f3_finalized_tipset: Arc>>, + /// EC calculator finalized epoch cache + ec_calculator_finalized_epoch: Arc, + /// Used as a cache for tipset `lookbacks`. chain_index: ChainIndex, @@ -84,6 +93,7 @@ impl ShallowClone for ChainStore { head_changes_tx: self.head_changes_tx.clone(), heaviest_tipset: self.heaviest_tipset.shallow_clone(), f3_finalized_tipset: self.f3_finalized_tipset.shallow_clone(), + ec_calculator_finalized_epoch: self.ec_calculator_finalized_epoch.shallow_clone(), chain_index: self.chain_index.shallow_clone(), tipset_tracker: self.tipset_tracker.shallow_clone(), genesis_block_header: self.genesis_block_header.shallow_clone(), @@ -111,35 +121,37 @@ impl ChainStore { } else { Tipset::from(&genesis_block_header) }; - let heaviest_tipset = Arc::new(RwLock::new(head)); + let heaviest_tipset = Arc::new(RwLock::new(head.shallow_clone())); let f3_finalized_tipset: Arc>> = Default::default(); - let chain_index = ChainIndex::new(db.shallow_clone()).with_is_tipset_finalized(Arc::new({ - let chain_finality = chain_config.policy.chain_finality; - let heaviest_tipset = heaviest_tipset.clone(); - let f3_finalized_tipset = f3_finalized_tipset.clone(); + let chain_index = ChainIndex::new(db.shallow_clone()); + let ec_calculator_finalized_epoch = Arc::new(AtomicI64::new( + ChainGetTipSetFinalityStatus::get_ec_finality_epoch(&chain_index, &chain_config, &head), + )); + let chain_index = chain_index.with_is_tipset_finalized(Arc::new({ + let f3_finalized_tipset = f3_finalized_tipset.shallow_clone(); + let ec_calculator_finalized_epoch = ec_calculator_finalized_epoch.shallow_clone(); move |ts| { let finalized = f3_finalized_tipset .read() .as_ref() .map(|ts| ts.epoch()) .unwrap_or_default() - .max(heaviest_tipset.read().epoch() - chain_finality); + .max(ec_calculator_finalized_epoch.load(atomic::Ordering::Acquire)); ts.epoch() <= finalized } })); - let cs = Self { + Ok(Self { head_changes_tx: publisher, chain_index, tipset_tracker: TipsetTracker::new(db, chain_config.clone()), heaviest_tipset, f3_finalized_tipset, + ec_calculator_finalized_epoch, genesis_block_header: genesis_block_header.into(), validated_blocks: Default::default(), chain_config, messages_in_tipset_cache: Default::default(), - }; - - Ok(cs) + }) } /// Sets F3 finalized tipset @@ -152,6 +164,12 @@ impl ChainStore { self.f3_finalized_tipset.read().clone() } + /// Gets the EC calculator finalized epoch + pub fn ec_calculator_finalized_epoch(&self) -> ChainEpoch { + self.ec_calculator_finalized_epoch + .load(atomic::Ordering::Acquire) + } + /// Cache for messages in tipsets, keyed by tipset key. pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache { &self.messages_in_tipset_cache @@ -161,8 +179,15 @@ impl ChainStore { pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { head.key().save(self.db())?; self.db().set_heaviest_tipset_key(head.key())?; - let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.clone()); - + let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.shallow_clone()); + self.ec_calculator_finalized_epoch.store( + ChainGetTipSetFinalityStatus::get_ec_finality_epoch( + self.chain_index(), + self.chain_config(), + &head, + ), + atomic::Ordering::Release, + ); if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) { let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { diff --git a/src/chain/store/index.rs b/src/chain/store/index.rs index 4072d5a3814..170a8a76479 100644 --- a/src/chain/store/index.rs +++ b/src/chain/store/index.rs @@ -166,6 +166,10 @@ impl ChainIndex { epoch.mod_floor(&CHECKPOINT_INTERVAL) == 0 } + if to == 0 { + return Ok(Some(Tipset::from(from.genesis(&self.db)?))); + } + let from_epoch = from.epoch(); let mut checkpoint_from_epoch = to; @@ -180,14 +184,13 @@ impl ChainIndex { checkpoint_from_epoch = next_checkpoint(checkpoint_from_epoch); } - if to == 0 { - return Ok(Some(Tipset::from(from.genesis(&self.db)?))); - } if to > from.epoch() { return Err(Error::Other(format!( "looking for tipset with height greater than start point, req: {to}, head: {from}", from = from.epoch() ))); + } else if to == from.epoch() { + return Ok(Some(from)); } let from_epoch = from.epoch(); @@ -225,6 +228,18 @@ impl ChainIndex { Ok(None) } + pub async fn tipset_by_height_async( + &self, + to: ChainEpoch, + from: Tipset, + resolve: ResolveNullTipset, + ) -> Result, Error> { + let this = self.shallow_clone(); + tokio::task::spawn_blocking(move || this.tipset_by_height(to, from, resolve)) + .await + .map_err(|e| Error::Other(e.to_string()))? + } + /// Same as [`Self::tipset_by_height`], but errors if that would return `None`. pub fn load_required_tipset_by_height( &self, diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index f9206c1d7d9..6b3d4580ecb 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -405,6 +405,31 @@ async fn prefill_rpc_caches_for_tipset(state_manager: StateManager, tsk: TipsetK warn!("failed to call `StateManager::execution_trace` for cache warmup: {e:#}"); } } + { + let finalized_epoch = state_manager + .chain_store() + .ec_calculator_finalized_epoch() + .max( + state_manager + .chain_store() + .f3_finalized_tipset() + .map(|ts| ts.epoch()) + .unwrap_or(0), + ); + if let Err(e) = state_manager + .chain_index() + .tipset_by_height_async( + finalized_epoch, + ts.shallow_clone(), + ResolveNullTipset::TakeOlder, + ) + .await + { + warn!( + "failed to call `ChainIndex::tipset_by_height` at finalized epoch {finalized_epoch} for cache warmup: {e:#}" + ); + } + } { use crate::rpc::eth::filter::{Matcher, SkipEvent}; struct CollectEventsCachePrefillingMatcher; diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f9f8f6d23bb..9a03161f3c4 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -7,7 +7,7 @@ use types::*; #[cfg(test)] use crate::blocks::RawBlockHeader; use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey}; -use crate::chain::index::ResolveNullTipset; +use crate::chain::index::{ChainIndex, ResolveNullTipset}; use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange}; use crate::chain_sync::{get_full_tipset, load_full_tipset}; use crate::cid_collections::{CidHashSet, FileBackedCidHashSet}; @@ -17,6 +17,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; +use crate::networks::ChainConfig; use crate::prelude::*; use crate::rpc::eth::{ Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, @@ -43,6 +44,7 @@ use num::BigInt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use sha2::Sha256; +use std::convert::Infallible; use std::fs::File; use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; use tokio::sync::{ @@ -1121,6 +1123,7 @@ impl RpcMethod<1> for ChainGetTipSetV2 { pub enum ChainGetTipSetFinalityStatus {} +const EC_CALCULATOR_FINALITY_CACHE_SIZE: usize = 4; impl ChainGetTipSetFinalityStatus { pub fn get_finality_status(ctx: &Ctx) -> anyhow::Result { let head = ctx.chain_store().heaviest_tipset(); @@ -1152,25 +1155,58 @@ impl ChainGetTipSetFinalityStatus { ctx: &Ctx, head: Tipset, ) -> anyhow::Result<(i64, Option)> { - static CACHE: parking_lot::Mutex)>> = - parking_lot::Mutex::new(None); - let mut cache = CACHE.lock(); - if let Some((cached_head, cached_threshold, cached_tipset)) = &*cache - && cached_head == &head - { - Ok((*cached_threshold, cached_tipset.shallow_clone())) + static CACHE: LazyLock)>> = + LazyLock::new(|| quick_cache::sync::Cache::new(EC_CALCULATOR_FINALITY_CACHE_SIZE)); + CACHE.get_or_insert_with(head.shallow_clone().key(), move || { + Self::get_ec_finality_threshold_depth_and_tipset(ctx, head) + }) + } + + pub fn get_ec_finality_epoch( + chain_index: &ChainIndex, + chain_config: &ChainConfig, + head: &Tipset, + ) -> i64 { + let depth = + Self::get_ec_finality_threshold_depth_with_cache(chain_index, chain_config, head); + Self::get_ec_finality_epoch_by_depth(chain_config, head, depth) + } + + fn get_ec_finality_epoch_by_depth( + chain_config: &ChainConfig, + head: &Tipset, + depth: i64, + ) -> i64 { + if depth >= 0 { + (head.epoch() - depth).max(0) } else { - let (threshold, tipset) = - Self::get_ec_finality_threshold_depth_and_tipset(ctx, head.shallow_clone())?; - *cache = Some((head, threshold, tipset.shallow_clone())); - Ok((threshold, tipset)) + (head.epoch() - chain_config.policy.chain_finality).max(0) } } - fn get_ec_finality_threshold_depth_and_tipset( - ctx: &Ctx, - head: Tipset, - ) -> anyhow::Result<(i64, Option)> { + fn get_ec_finality_threshold_depth_with_cache( + chain_index: &ChainIndex, + chain_config: &ChainConfig, + head: &Tipset, + ) -> i64 { + static CACHE: LazyLock> = + LazyLock::new(|| quick_cache::sync::Cache::new(EC_CALCULATOR_FINALITY_CACHE_SIZE)); + CACHE + .get_or_insert_with(head.key(), move || -> Result { + Ok(Self::get_ec_finality_threshold_depth( + chain_index, + chain_config, + head, + )) + }) + .expect("infallible") + } + + fn get_ec_finality_threshold_depth( + chain_index: &ChainIndex, + chain_config: &ChainConfig, + head: &Tipset, + ) -> i64 { use crate::chain::ec_finality::calculator::{ DEFAULT_BLOCKS_PER_EPOCH, DEFAULT_BYZANTINE_FRACTION, DEFAULT_GUARANTEE, find_threshold_depth, @@ -1184,13 +1220,13 @@ impl ChainGetTipSetFinalityStatus { /// they consume array slots without advancing the meaningful epoch count. const FINALITY_CHAIN_EXTRA_EPOCHS: usize = 5; - let finality = ctx.chain_config().policy.chain_finality; + let finality = chain_config.policy.chain_finality; let chain_len = finality as usize + FINALITY_CHAIN_EXTRA_EPOCHS; let mut chain = Vec::with_capacity(chain_len); let mut ts = head.shallow_clone(); while chain.len() < chain_len { chain.push(ts.len() as i64); - if let Ok(parent) = ctx.chain_index().load_required_tipset(ts.parents()) { + if let Ok(parent) = chain_index.load_required_tipset(ts.parents()) { // insert 0 for null rounds if let Ok(n_null_tipsets_to_pad) = usize::try_from(ts.epoch() - parent.epoch() - 1) && n_null_tipsets_to_pad > 0 @@ -1206,7 +1242,7 @@ impl ChainGetTipSetFinalityStatus { } // Reverse to chronological order (oldest first). chain.reverse(); - let depth = match find_threshold_depth( + match find_threshold_depth( &chain, finality, DEFAULT_BLOCKS_PER_EPOCH, @@ -1220,23 +1256,25 @@ impl ChainGetTipSetFinalityStatus { ); -1 } - }; - let finalized = if depth >= 0 - && let Ok(Some(ts)) = ctx.chain_index().tipset_by_height( - (head.epoch() - depth).max(0), - head.shallow_clone(), - ResolveNullTipset::TakeOlder, - ) { - Some(ts) - } else { - let ec_finality_epoch = - (head.epoch() - ctx.chain_config().policy.chain_finality).max(0); - ctx.chain_index().tipset_by_height( - ec_finality_epoch, - head, - ResolveNullTipset::TakeOlder, - )? - }; + } + } + + fn get_ec_finality_threshold_depth_and_tipset( + ctx: &Ctx, + head: Tipset, + ) -> anyhow::Result<(i64, Option)> { + let depth = Self::get_ec_finality_threshold_depth_with_cache( + ctx.chain_index(), + ctx.chain_config(), + &head, + ); + let ec_finality_epoch = + Self::get_ec_finality_epoch_by_depth(ctx.chain_config(), &head, depth); + let finalized = ctx.chain_index().tipset_by_height( + ec_finality_epoch, + head, + ResolveNullTipset::TakeOlder, + )?; Ok((depth, finalized)) } }