Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use super::{
};
use crate::networks::{ChainConfig, Height};
use crate::prelude::*;
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::rpc::{
chain::ChainGetTipSetFinalityStatus,
eth::{eth_tx_from_signed_eth_message, types::EthHash},
};
use crate::shim::clock::ChainEpoch;
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::state_manager::ExecutedTipset;
Expand All @@ -30,7 +33,10 @@ use fvm_ipld_encoding::CborStore;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use std::num::NonZeroUsize;
use std::{
num::NonZeroUsize,
sync::atomic::{self, AtomicI64},
};
use tokio::sync::broadcast;
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -60,6 +66,9 @@ pub struct ChainStore {
/// F3 finalized tipset cache
f3_finalized_tipset: Arc<RwLock<Option<Tipset>>>,

/// EC calculator finalized epoch cache
ec_calculator_finalized_epoch: Arc<AtomicI64>,

/// Used as a cache for tipset `lookbacks`.
chain_index: ChainIndex,

Expand All @@ -84,6 +93,7 @@ impl ShallowClone for ChainStore {
head_changes_tx: self.head_changes_tx.clone(),
heaviest_tipset: self.heaviest_tipset.shallow_clone(),
f3_finalized_tipset: self.f3_finalized_tipset.shallow_clone(),
ec_calculator_finalized_epoch: self.ec_calculator_finalized_epoch.shallow_clone(),
chain_index: self.chain_index.shallow_clone(),
tipset_tracker: self.tipset_tracker.shallow_clone(),
genesis_block_header: self.genesis_block_header.shallow_clone(),
Expand Down Expand Up @@ -111,35 +121,37 @@ impl ChainStore {
} else {
Tipset::from(&genesis_block_header)
};
let heaviest_tipset = Arc::new(RwLock::new(head));
let heaviest_tipset = Arc::new(RwLock::new(head.shallow_clone()));
let f3_finalized_tipset: Arc<RwLock<Option<Tipset>>> = Default::default();
let chain_index = ChainIndex::new(db.shallow_clone()).with_is_tipset_finalized(Arc::new({
let chain_finality = chain_config.policy.chain_finality;
let heaviest_tipset = heaviest_tipset.clone();
let f3_finalized_tipset = f3_finalized_tipset.clone();
let chain_index = ChainIndex::new(db.shallow_clone());
let ec_calculator_finalized_epoch = Arc::new(AtomicI64::new(
ChainGetTipSetFinalityStatus::get_ec_finality_epoch(&chain_index, &chain_config, &head),
));
let chain_index = chain_index.with_is_tipset_finalized(Arc::new({
let f3_finalized_tipset = f3_finalized_tipset.shallow_clone();
let ec_calculator_finalized_epoch = ec_calculator_finalized_epoch.shallow_clone();
move |ts| {
let finalized = f3_finalized_tipset
.read()
.as_ref()
.map(|ts| ts.epoch())
.unwrap_or_default()
.max(heaviest_tipset.read().epoch() - chain_finality);
.max(ec_calculator_finalized_epoch.load(atomic::Ordering::Acquire));
ts.epoch() <= finalized
}
}));
let cs = Self {
Ok(Self {
head_changes_tx: publisher,
chain_index,
tipset_tracker: TipsetTracker::new(db, chain_config.clone()),
heaviest_tipset,
f3_finalized_tipset,
ec_calculator_finalized_epoch,
genesis_block_header: genesis_block_header.into(),
validated_blocks: Default::default(),
chain_config,
messages_in_tipset_cache: Default::default(),
};

Ok(cs)
})
}

/// Sets F3 finalized tipset
Expand All @@ -152,6 +164,12 @@ impl ChainStore {
self.f3_finalized_tipset.read().clone()
}

/// Gets the EC calculator finalized epoch
pub fn ec_calculator_finalized_epoch(&self) -> ChainEpoch {
self.ec_calculator_finalized_epoch
.load(atomic::Ordering::Acquire)
}

/// Cache for messages in tipsets, keyed by tipset key.
pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
&self.messages_in_tipset_cache
Expand All @@ -161,8 +179,15 @@ impl ChainStore {
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
head.key().save(self.db())?;
self.db().set_heaviest_tipset_key(head.key())?;
let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.clone());

let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.shallow_clone());
self.ec_calculator_finalized_epoch.store(
ChainGetTipSetFinalityStatus::get_ec_finality_epoch(
self.chain_index(),
self.chain_config(),
&head,
),
atomic::Ordering::Release,
);
if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) {
let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key())
{
Expand Down
21 changes: 18 additions & 3 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ impl ChainIndex {
epoch.mod_floor(&CHECKPOINT_INTERVAL) == 0
}

if to == 0 {
return Ok(Some(Tipset::from(from.genesis(&self.db)?)));
}

let from_epoch = from.epoch();

let mut checkpoint_from_epoch = to;
Expand All @@ -180,14 +184,13 @@ impl ChainIndex {
checkpoint_from_epoch = next_checkpoint(checkpoint_from_epoch);
}

if to == 0 {
return Ok(Some(Tipset::from(from.genesis(&self.db)?)));
}
if to > from.epoch() {
return Err(Error::Other(format!(
"looking for tipset with height greater than start point, req: {to}, head: {from}",
from = from.epoch()
)));
} else if to == from.epoch() {
return Ok(Some(from));
}

let from_epoch = from.epoch();
Expand Down Expand Up @@ -225,6 +228,18 @@ impl ChainIndex {
Ok(None)
}

pub async fn tipset_by_height_async(
&self,
to: ChainEpoch,
from: Tipset,
resolve: ResolveNullTipset,
) -> Result<Option<Tipset>, Error> {
let this = self.shallow_clone();
tokio::task::spawn_blocking(move || this.tipset_by_height(to, from, resolve))
.await
.map_err(|e| Error::Other(e.to_string()))?
}

/// Same as [`Self::tipset_by_height`], but errors if that would return `None`.
pub fn load_required_tipset_by_height(
&self,
Expand Down
25 changes: 25 additions & 0 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,31 @@ async fn prefill_rpc_caches_for_tipset(state_manager: StateManager, tsk: TipsetK
warn!("failed to call `StateManager::execution_trace` for cache warmup: {e:#}");
}
}
{
let finalized_epoch = state_manager
.chain_store()
.ec_calculator_finalized_epoch()
.max(
state_manager
.chain_store()
.f3_finalized_tipset()
.map(|ts| ts.epoch())
.unwrap_or(0),
);
if let Err(e) = state_manager
.chain_index()
.tipset_by_height_async(
finalized_epoch,
ts.shallow_clone(),
ResolveNullTipset::TakeOlder,
)
.await
{
warn!(
"failed to call `ChainIndex::tipset_by_height` at finalized epoch {finalized_epoch} for cache warmup: {e:#}"
);
}
}
{
use crate::rpc::eth::filter::{Matcher, SkipEvent};
struct CollectEventsCachePrefillingMatcher;
Expand Down
110 changes: 74 additions & 36 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use types::*;
#[cfg(test)]
use crate::blocks::RawBlockHeader;
use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
use crate::chain::index::ResolveNullTipset;
use crate::chain::index::{ChainIndex, ResolveNullTipset};
use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
use crate::chain_sync::{get_full_tipset, load_full_tipset};
use crate::cid_collections::{CidHashSet, FileBackedCidHashSet};
Expand All @@ -17,6 +17,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
#[cfg(test)]
use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
use crate::message::{ChainMessage, SignedMessage};
use crate::networks::ChainConfig;
use crate::prelude::*;
use crate::rpc::eth::{
Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders,
Expand All @@ -43,6 +44,7 @@ use num::BigInt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::convert::Infallible;
use std::fs::File;
use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
use tokio::sync::{
Expand Down Expand Up @@ -1121,6 +1123,7 @@ impl RpcMethod<1> for ChainGetTipSetV2 {

pub enum ChainGetTipSetFinalityStatus {}

const EC_CALCULATOR_FINALITY_CACHE_SIZE: usize = 4;
impl ChainGetTipSetFinalityStatus {
pub fn get_finality_status(ctx: &Ctx) -> anyhow::Result<ChainFinalityStatus> {
let head = ctx.chain_store().heaviest_tipset();
Expand Down Expand Up @@ -1152,25 +1155,58 @@ impl ChainGetTipSetFinalityStatus {
ctx: &Ctx,
head: Tipset,
) -> anyhow::Result<(i64, Option<Tipset>)> {
static CACHE: parking_lot::Mutex<Option<(Tipset, i64, Option<Tipset>)>> =
parking_lot::Mutex::new(None);
let mut cache = CACHE.lock();
if let Some((cached_head, cached_threshold, cached_tipset)) = &*cache
&& cached_head == &head
{
Ok((*cached_threshold, cached_tipset.shallow_clone()))
static CACHE: LazyLock<quick_cache::sync::Cache<TipsetKey, (i64, Option<Tipset>)>> =
LazyLock::new(|| quick_cache::sync::Cache::new(EC_CALCULATOR_FINALITY_CACHE_SIZE));
CACHE.get_or_insert_with(head.shallow_clone().key(), move || {
Self::get_ec_finality_threshold_depth_and_tipset(ctx, head)
})
}

pub fn get_ec_finality_epoch(
chain_index: &ChainIndex,
chain_config: &ChainConfig,
head: &Tipset,
) -> i64 {
let depth =
Self::get_ec_finality_threshold_depth_with_cache(chain_index, chain_config, head);
Self::get_ec_finality_epoch_by_depth(chain_config, head, depth)
}

fn get_ec_finality_epoch_by_depth(
chain_config: &ChainConfig,
head: &Tipset,
depth: i64,
) -> i64 {
if depth >= 0 {
(head.epoch() - depth).max(0)
} else {
let (threshold, tipset) =
Self::get_ec_finality_threshold_depth_and_tipset(ctx, head.shallow_clone())?;
*cache = Some((head, threshold, tipset.shallow_clone()));
Ok((threshold, tipset))
(head.epoch() - chain_config.policy.chain_finality).max(0)
}
}

fn get_ec_finality_threshold_depth_and_tipset(
ctx: &Ctx,
head: Tipset,
) -> anyhow::Result<(i64, Option<Tipset>)> {
fn get_ec_finality_threshold_depth_with_cache(
chain_index: &ChainIndex,
chain_config: &ChainConfig,
head: &Tipset,
) -> i64 {
static CACHE: LazyLock<quick_cache::sync::Cache<TipsetKey, i64>> =
LazyLock::new(|| quick_cache::sync::Cache::new(EC_CALCULATOR_FINALITY_CACHE_SIZE));
CACHE
.get_or_insert_with(head.key(), move || -> Result<i64, Infallible> {
Ok(Self::get_ec_finality_threshold_depth(
chain_index,
chain_config,
head,
))
})
.expect("infallible")
}

fn get_ec_finality_threshold_depth(
chain_index: &ChainIndex,
chain_config: &ChainConfig,
head: &Tipset,
) -> i64 {
use crate::chain::ec_finality::calculator::{
DEFAULT_BLOCKS_PER_EPOCH, DEFAULT_BYZANTINE_FRACTION, DEFAULT_GUARANTEE,
find_threshold_depth,
Expand All @@ -1184,13 +1220,13 @@ impl ChainGetTipSetFinalityStatus {
/// they consume array slots without advancing the meaningful epoch count.
const FINALITY_CHAIN_EXTRA_EPOCHS: usize = 5;

let finality = ctx.chain_config().policy.chain_finality;
let finality = chain_config.policy.chain_finality;
let chain_len = finality as usize + FINALITY_CHAIN_EXTRA_EPOCHS;
let mut chain = Vec::with_capacity(chain_len);
let mut ts = head.shallow_clone();
while chain.len() < chain_len {
chain.push(ts.len() as i64);
if let Ok(parent) = ctx.chain_index().load_required_tipset(ts.parents()) {
if let Ok(parent) = chain_index.load_required_tipset(ts.parents()) {
// insert 0 for null rounds
if let Ok(n_null_tipsets_to_pad) = usize::try_from(ts.epoch() - parent.epoch() - 1)
&& n_null_tipsets_to_pad > 0
Expand All @@ -1206,7 +1242,7 @@ impl ChainGetTipSetFinalityStatus {
}
// Reverse to chronological order (oldest first).
chain.reverse();
let depth = match find_threshold_depth(
match find_threshold_depth(
&chain,
finality,
DEFAULT_BLOCKS_PER_EPOCH,
Expand All @@ -1220,23 +1256,25 @@ impl ChainGetTipSetFinalityStatus {
);
-1
}
};
let finalized = if depth >= 0
&& let Ok(Some(ts)) = ctx.chain_index().tipset_by_height(
(head.epoch() - depth).max(0),
head.shallow_clone(),
ResolveNullTipset::TakeOlder,
) {
Some(ts)
} else {
let ec_finality_epoch =
(head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
ctx.chain_index().tipset_by_height(
ec_finality_epoch,
head,
ResolveNullTipset::TakeOlder,
)?
};
}
}

fn get_ec_finality_threshold_depth_and_tipset(
ctx: &Ctx,
head: Tipset,
) -> anyhow::Result<(i64, Option<Tipset>)> {
let depth = Self::get_ec_finality_threshold_depth_with_cache(
ctx.chain_index(),
ctx.chain_config(),
&head,
);
let ec_finality_epoch =
Self::get_ec_finality_epoch_by_depth(ctx.chain_config(), &head, depth);
let finalized = ctx.chain_index().tipset_by_height(
ec_finality_epoch,
head,
ResolveNullTipset::TakeOlder,
)?;
Ok((depth, finalized))
}
}
Expand Down
Loading