From b2899fdf0ff3517817e609d0067026a02e6dca08 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 17 Jun 2025 11:43:10 +0300 Subject: [PATCH 1/7] Add communication channel --- .../beacon_chain/src/validator_custody.rs | 1 + beacon_node/network/src/router.rs | 4 ++- beacon_node/network/src/service.rs | 27 +++++++++++++++++++ beacon_node/network/src/sync/manager.rs | 21 ++++++++++++++- beacon_node/network/src/sync/tests/lookups.rs | 1 + 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 1169b64537d..c622c459e19 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -255,6 +255,7 @@ impl CustodyContext { /// The custody count changed because of a change in the /// number of validators being managed. +#[derive(Debug, Clone)] pub struct CustodyCountChanged { pub new_custody_group_count: u64, pub sampling_count: u64, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 2a7bc597c26..d025d816ac8 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -6,7 +6,7 @@ #![allow(clippy::unit_arg)] use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::status_message; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -84,6 +84,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, + sync_service_recv: mpsc::UnboundedReceiver, executor: task_executor::TaskExecutor, invalid_block_storage: InvalidBlockStorage, beacon_processor_send: BeaconProcessorSend, @@ -117,6 +118,7 @@ impl Router { network_send.clone(), network_beacon_processor.clone(), sync_recv, + sync_service_recv, fork_context, ); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index c9f89ad6686..71e316200c5 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -5,6 +5,7 @@ use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription}; use crate::NetworkConfig; +use beacon_chain::validator_custody::CustodyCountChanged; use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; use futures::channel::mpsc::Sender; @@ -131,6 +132,15 @@ pub enum ValidatorSubscriptionMessage { }, } +/// Messages that Lighthouse services send to communicate with the Sync service. +#[derive(Debug, Clone, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum SyncServiceMessage { + /// Custody group count changed due to a change in validators' weight. + /// Trigger data column backfill. + CustodyCountChanged(CustodyCountChanged), +} + #[derive(Clone)] pub struct NetworkSenders { network_send: mpsc::UnboundedSender>, @@ -182,6 +192,8 @@ pub struct NetworkService { /// The sending channel for the network service to send messages to be routed throughout /// lighthouse. router_send: mpsc::UnboundedSender>, + /// The sending channel for the network service to send messages to the sync service. + sync_service_send: mpsc::UnboundedSender, /// A reference to lighthouse's database to persist the DHT. store: Arc>, /// A collection of global variables, accessible outside of the network service. @@ -205,10 +217,13 @@ pub struct NetworkService { } impl NetworkService { + #[allow(clippy::too_many_arguments)] async fn build( beacon_chain: Arc>, config: Arc, executor: task_executor::TaskExecutor, + sync_service_send: mpsc::UnboundedSender, + sync_service_recv: mpsc::UnboundedReceiver, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, beacon_processor_reprocess_tx: mpsc::Sender, @@ -312,6 +327,7 @@ impl NetworkService { beacon_chain.clone(), network_globals.clone(), network_senders.network_send(), + sync_service_recv, executor.clone(), invalid_block_storage, beacon_processor_send, @@ -344,6 +360,7 @@ impl NetworkService { subnet_service, network_recv, validator_subscription_recv, + sync_service_send, router_send, store, network_globals: network_globals.clone(), @@ -369,10 +386,13 @@ impl NetworkService { beacon_processor_send: BeaconProcessorSend, beacon_processor_reprocess_tx: mpsc::Sender, ) -> Result<(Arc>, NetworkSenders), String> { + let (sync_service_send, sync_service_recv) = mpsc::unbounded_channel(); let (network_service, network_globals, network_senders) = Self::build( beacon_chain, config, executor.clone(), + sync_service_send, + sync_service_recv, libp2p_registry, beacon_processor_send, beacon_processor_reprocess_tx, @@ -768,6 +788,13 @@ impl NetworkService { self.libp2p .subscribe_new_data_column_subnets(sampling_count); self.libp2p.update_enr_cgc(new_custody_group_count); + let message = SyncServiceMessage::CustodyCountChanged(CustodyCountChanged { + new_custody_group_count, + sampling_count, + }); + if let Err(e) = self.sync_service_send.send(message.clone()) { + tracing::error!(error = %e, ?message, "Could not send message to the syn service."); + } } } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d11a18ed0ae..0c8ca68c746 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,7 +42,7 @@ use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, @@ -234,6 +234,9 @@ pub struct SyncManager { /// A receiving channel sent by the message processor thread. input_channel: mpsc::UnboundedReceiver>, + /// A receiving channel sent by external lighthouse services. + service_channel: mpsc::UnboundedReceiver, + /// A network context to contact the network service. network: SyncNetworkContext, @@ -261,6 +264,7 @@ pub fn spawn( network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, fork_context: Arc, ) { assert!( @@ -274,6 +278,7 @@ pub fn spawn( network_send, beacon_processor, sync_recv, + sync_service_recv, SamplingConfig::Default, fork_context, ); @@ -296,6 +301,7 @@ impl SyncManager { network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, sampling_config: SamplingConfig, fork_context: Arc, ) -> Self { @@ -303,6 +309,7 @@ impl SyncManager { Self { chain: beacon_chain.clone(), input_channel: sync_recv, + service_channel: sync_service_recv, network: SyncNetworkContext::new( network_send, beacon_processor.clone(), @@ -736,6 +743,9 @@ impl SyncManager { Some(sync_message) = self.input_channel.recv() => { self.handle_message(sync_message); }, + Some(service_message) = self.service_channel.recv() => { + self.handle_service_message(service_message) + } Some(engine_state) = check_ee_stream.next(), if check_ee => { self.handle_new_execution_engine_state(engine_state); } @@ -752,6 +762,15 @@ impl SyncManager { } } + pub(crate) fn handle_service_message(&mut self, sync_service_message: SyncServiceMessage) { + match sync_service_message { + SyncServiceMessage::CustodyCountChanged(_custody_count_changed) => { + self.backfill_sync; + todo!() + }, + } + } + pub(crate) fn handle_message(&mut self, sync_message: SyncMessage) { match sync_message { SyncMessage::AddPeer(peer_id, info) => { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a2c359c87e7..b6ee1e0c669 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -124,6 +124,7 @@ impl TestRig { beacon_processor.into(), // Pass empty recv not tied to any tx mpsc::unbounded_channel().1, + mpsc::unbounded_channel().1, SamplingConfig::Custom { required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], }, From 4f738b682faddee3e9146297bc322cd86f09e52a Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 21 Jun 2025 21:02:52 +0300 Subject: [PATCH 2/7] Rename enum --- .../beacon_chain/src/validator_custody.rs | 7 +++++++ .../lighthouse_network/src/service/mod.rs | 12 +++++++++-- beacon_node/network/src/service.rs | 21 +++++++++++-------- beacon_node/network/src/sync/manager.rs | 2 +- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index c622c459e19..9724f75f9fa 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -261,6 +261,13 @@ pub struct CustodyCountChanged { pub sampling_count: u64, } +/// The data columns to backfill due to a custody count change. +#[derive(Debug, Clone)] +pub struct CustodyColumnBackfill { + pub columns_to_backfill: Vec, + pub start_epoch: Epoch, +} + /// The custody information that gets persisted across runs. #[derive(Debug, Encode, Decode, Clone)] pub struct CustodyContextSsz { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e2c6f244058..62de312f03c 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -885,14 +885,16 @@ impl Network { } } - /// Subscribe to all data columns determined by the cgc. + /// Subscribe to all data columns determined by the cgc and return the newly subscribed columns #[instrument(parent = None, level = "trace", fields(service = "libp2p"), name = "libp2p", skip_all )] - pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) -> Vec { + let old_columns = self.network_globals.sampling_columns(); + self.network_globals .update_data_column_subnets(custody_column_count); @@ -900,6 +902,12 @@ impl Network { let kind = GossipKind::DataColumnSidecar(column); self.subscribe_kind(kind); } + + self.network_globals + .sampling_columns() + .difference(&old_columns) + .cloned() + .collect() } /// Returns the scoring parameters for a topic if set. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 88a7070b089..e74437ea5d4 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -5,7 +5,7 @@ use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription}; use crate::NetworkConfig; -use beacon_chain::validator_custody::CustodyCountChanged; +use beacon_chain::validator_custody::CustodyColumnBackfill; use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; use futures::channel::mpsc::Sender; @@ -37,8 +37,8 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, - Unsigned, ValidatorSubscription, + ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; @@ -138,7 +138,7 @@ pub enum ValidatorSubscriptionMessage { pub enum SyncServiceMessage { /// Custody group count changed due to a change in validators' weight. /// Trigger data column backfill. - CustodyCountChanged(CustodyCountChanged), + CustodyColumnBackfill(CustodyColumnBackfill), } #[derive(Clone)] @@ -769,7 +769,8 @@ impl NetworkService { sampling_count, } => { // subscribe to `sampling_count` subnets - self.libp2p + let columns_to_backfill = self + .libp2p .subscribe_new_data_column_subnets(sampling_count); if self .network_globals @@ -778,10 +779,12 @@ impl NetworkService { .is_none() { self.libp2p.update_enr_cgc(new_custody_group_count); - let message = SyncServiceMessage::CustodyCountChanged(CustodyCountChanged { - new_custody_group_count, - sampling_count, - }); + let message = + SyncServiceMessage::CustodyColumnBackfill(CustodyColumnBackfill { + columns_to_backfill, + // TODO(cgc-backfill) use correct start epoch (we might not need this) + start_epoch: Epoch::new(0), + }); if let Err(e) = self.sync_service_send.send(message.clone()) { tracing::error!(error = %e, ?message, "Could not send message to the syn service."); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0d06ef0a939..44a8031dcd8 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -764,7 +764,7 @@ impl SyncManager { pub(crate) fn handle_service_message(&mut self, sync_service_message: SyncServiceMessage) { match sync_service_message { - SyncServiceMessage::CustodyCountChanged(_custody_count_changed) => { + SyncServiceMessage::CustodyColumnBackfill(_custody_count_backfill) => { let _ = &self.backfill_sync; todo!() } From a9d5d402217a3296fccc7ff2211e867b99451d02 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 22 Jun 2025 13:42:31 +0300 Subject: [PATCH 3/7] Add tests to historical data column import --- .../beacon_chain/src/historical_blocks.rs | 136 +++++++- .../beacon_chain/src/validator_custody.rs | 1 - beacon_node/beacon_chain/tests/store_tests.rs | 323 +++++++++++++++++- beacon_node/network/src/service.rs | 4 +- beacon_node/network/src/sync/manager.rs | 5 +- beacon_node/store/src/hot_cold_store.rs | 13 + 6 files changed, 471 insertions(+), 11 deletions(-) diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 348e6d52a64..950386a4a18 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -12,7 +12,7 @@ use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::debug; -use types::{FixedBytesExtended, Hash256, Slot}; +use types::{ColumnIndex, DataColumnSidecarList, FixedBytesExtended, Hash256, Slot}; /// Use a longer timeout on the pubkey cache. /// @@ -38,12 +38,45 @@ pub enum HistoricalBlockError { StoreError(StoreError), } +#[derive(Debug)] +pub enum HistoricalDataColumnError { + // The provided data column sidecar contains at least one column with a mismatched block root. + MismatchedBlockRoot { + data_column_block_root: Hash256, + data_column_index: ColumnIndex, + expected_block_root: Hash256, + }, + + /// The provided data column sidecar contains a block signature that doesn't match + /// the block stored in the database. + InvalidSignature { + data_column_block_root: Hash256, + }, + + // The provided data column sidecar pertains to a block that doesn't exist in the database. + NoBlockFound { + data_column_block_root: Hash256, + }, + + /// Logic error: should never occur. + IndexOutOfBounds, + + /// Internal store error + StoreError(StoreError), +} + impl From for HistoricalBlockError { fn from(e: StoreError) -> Self { Self::StoreError(e) } } +impl From for HistoricalDataColumnError { + fn from(e: StoreError) -> Self { + Self::StoreError(e) + } +} + impl BeaconChain { /// Store a batch of historical blocks in the database. /// @@ -286,4 +319,105 @@ impl BeaconChain { Ok(num_relevant) } + + /// Store a batch of historical data columns in the database. + /// + /// The data columns block roots and proposer signatures are verified with the existing + /// block stored in the DB. This function assumes that KZG proofs have already been verified. + /// + /// Return the number of `data_columns` successfully imported. + pub fn import_historical_data_column_batch( + &self, + historical_data_column_sidecars: Vec>, + ) -> Result { + let mut total_imported = 0; + let mut ops = vec![]; + + if historical_data_column_sidecars.is_empty() { + return Ok(total_imported); + } + + for data_column_sidecar_list in historical_data_column_sidecars { + let block_root = { + let Some(data_column) = data_column_sidecar_list.first() else { + return Err(HistoricalDataColumnError::IndexOutOfBounds); + }; + + let first_block_root = data_column.block_root(); + + if let Some(mismatched_sidecar) = + data_column_sidecar_list.iter().find(|data_column_sidecar| { + data_column_sidecar.block_root() != first_block_root + }) + { + let error = HistoricalDataColumnError::MismatchedBlockRoot { + data_column_block_root: mismatched_sidecar.block_root(), + data_column_index: mismatched_sidecar.index, + expected_block_root: first_block_root, + }; + tracing::warn!( + block_root=%mismatched_sidecar.block_root(), + data_column_index=%mismatched_sidecar.index, + num_blob_sidecars=%data_column_sidecar_list.len(), + ?error, + "Aborting data column sidecar import" + ); + + return Err(error); + } + first_block_root + }; + + let Some(block) = self.store.get_blinded_block(&block_root)? else { + let error = HistoricalDataColumnError::NoBlockFound { + data_column_block_root: block_root, + }; + tracing::warn!( + %block_root, + num_blob_sidecars = data_column_sidecar_list.len(), + ?error, + "Aborting data column sidecar import" + ); + return Err(error); + }; + + for data_column in data_column_sidecar_list { + if &data_column.signed_block_header.signature != block.signature() { + let error = HistoricalDataColumnError::InvalidSignature { + data_column_block_root: data_column.block_root(), + }; + tracing::warn!( + block_root = ?block_root, + column_index = data_column.index, + ?error, + "Aborting data column sidecar import" + ); + return Err(error); + } + + if self + .store + .get_data_column(&block_root, &data_column.index)? + .is_none() + { + tracing::debug!( + block_root = ?block_root, + column_index = data_column.index, + "Skipping data column import as identical data column exists" + ); + continue; + } + + self.store + .data_column_as_kv_store_ops(&block_root, data_column, &mut ops); + total_imported += 1; + } + } + + self.store.blobs_db.do_atomically(ops)?; + + tracing::debug!(total_imported, "Imported historical data columns"); + + Ok(total_imported) + } } diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 9724f75f9fa..0dd95342498 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -265,7 +265,6 @@ pub struct CustodyCountChanged { #[derive(Debug, Clone)] pub struct CustodyColumnBackfill { pub columns_to_backfill: Vec, - pub start_epoch: Epoch, } /// The custody information that gets persisted across runs. diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index e399339545a..5f044963878 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -12,8 +12,9 @@ use beacon_chain::test_utils::{ }; use beacon_chain::{ data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, - migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, - BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, + historical_blocks::HistoricalDataColumnError, migrate::MigratorConfig, BeaconChain, + BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, + NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, }; use logging::create_test_tracing_subscriber; use maplit::hashset; @@ -2578,6 +2579,324 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { assert_eq!(store.get_anchor_info().anchor_slot, 0); } +#[tokio::test] +async fn test_import_historical_data_columns_batch() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + data_columns_list.push(data_columns.unwrap()); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()) + } +} + +// This should verify that a data column sidecar containing mismatched block roots should fail to be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_mismatched_block_root() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + if data_column.index % 2 == 0 { + data_column.signed_block_header.message.body_root = Hash256::ZERO; + } + + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::MismatchedBlockRoot { .. } + )); +} + +// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot +// be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_no_block_found() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.message.body_root = Hash256::ZERO; + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + +// This should verify that a data column sidecar with an invalid signature cannot be imported into the store. +#[tokio::test] +async fn test_import_historical_data_columns_batch_invalid_signature() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.signature = Signature::infinity().unwrap().into(); + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::InvalidSignature { .. } + )); +} + /// Test that blocks and attestations that refer to states around an unaligned split state are /// processed correctly. #[tokio::test] diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e74437ea5d4..5bc33fc8fdc 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -37,7 +37,7 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, }; @@ -782,8 +782,6 @@ impl NetworkService { let message = SyncServiceMessage::CustodyColumnBackfill(CustodyColumnBackfill { columns_to_backfill, - // TODO(cgc-backfill) use correct start epoch (we might not need this) - start_epoch: Epoch::new(0), }); if let Err(e) = self.sync_service_send.send(message.clone()) { tracing::error!(error = %e, ?message, "Could not send message to the syn service."); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 44a8031dcd8..44c99cde4bc 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -764,10 +764,7 @@ impl SyncManager { pub(crate) fn handle_service_message(&mut self, sync_service_message: SyncServiceMessage) { match sync_service_message { - SyncServiceMessage::CustodyColumnBackfill(_custody_count_backfill) => { - let _ = &self.backfill_sync; - todo!() - } + SyncServiceMessage::CustodyColumnBackfill(_custody_count_backfill) => {} } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 1663ec7b4d4..755a93a400e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -854,6 +854,19 @@ impl, Cold: ItemStore> HotColdDB )); } + pub fn data_column_as_kv_store_ops( + &self, + block_root: &Hash256, + data_column: Arc>, + ops: &mut Vec, + ) { + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconDataColumn, + get_data_column_key(block_root, &data_column.index), + data_column.as_ssz_bytes(), + )); + } + pub fn put_data_columns( &self, block_root: &Hash256, From 8e3a6db0833efb144f1b6c004ce7a275d24c061c Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 22 Jun 2025 13:58:05 +0300 Subject: [PATCH 4/7] FMT --- beacon_node/network/src/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 5bc33fc8fdc..baefe8df6e2 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -37,8 +37,8 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + Unsigned, ValidatorSubscription, }; mod tests; From 02a2ba2b5bc532e4e51f60311b49a33d2b4c7f9c Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 22 Jun 2025 16:16:16 +0300 Subject: [PATCH 5/7] linting --- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- beacon_node/network/src/service/tests.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 5f044963878..5b5b6ba1587 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2853,7 +2853,7 @@ async fn test_import_historical_data_columns_batch_invalid_signature() { for data_column in data_columns.unwrap() { let mut data_column = (*data_column).clone(); - data_column.signed_block_header.signature = Signature::infinity().unwrap().into(); + data_column.signed_block_header.signature = Signature::infinity().unwrap(); data_column_sidecar.push(Arc::new(data_column)); } data_columns_list.push(data_column_sidecar); diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 15c3321e94d..6d517c28f66 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -11,6 +11,7 @@ use lighthouse_network::{Enr, GossipTopic}; use std::str::FromStr; use std::sync::Arc; use tokio::runtime::Runtime; +use tokio::sync::mpsc; use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId}; impl NetworkService { @@ -128,6 +129,7 @@ fn test_removing_topic_weight_on_old_topics() { config.discv5_config.table_filter = |_| true; // Do not ignore local IPs config.upnp_enabled = false; let config = Arc::new(config); + let (sync_service_send, sync_service_recv) = mpsc::unbounded_channel(); let beacon_processor_channels = BeaconProcessorChannels::new(&BeaconProcessorConfig::default()); @@ -135,6 +137,8 @@ fn test_removing_topic_weight_on_old_topics() { beacon_chain.clone(), config, executor.clone(), + sync_service_send, + sync_service_recv, None, beacon_processor_channels.beacon_processor_tx, beacon_processor_channels.work_reprocessing_tx, From e943f1490f890c4e5471df43059939a7a3a102ea Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 23 Jun 2025 22:48:07 +0300 Subject: [PATCH 6/7] initial building blocks of the custody backfill sync service --- .../src/service/api_types.rs | 4 + .../src/network_beacon_processor/mod.rs | 28 +- .../network_beacon_processor/sync_methods.rs | 121 +++- .../network/src/sync/backfill_sync/mod.rs | 2 +- .../src/sync/custody_sync/custody_batch.rs | 240 ++++++++ .../network/src/sync/custody_sync/mod.rs | 575 ++++++++++++++++++ beacon_node/network/src/sync/manager.rs | 5 + beacon_node/network/src/sync/mod.rs | 2 + .../network/src/sync/network_context.rs | 93 +++ .../network/src/sync/range_sync/batch.rs | 12 +- .../network/src/sync/range_sync/mod.rs | 4 +- beacon_node/store/src/hot_cold_store.rs | 2 +- 12 files changed, 1080 insertions(+), 8 deletions(-) create mode 100644 beacon_node/network/src/sync/custody_sync/custody_batch.rs create mode 100644 beacon_node/network/src/sync/custody_sync/mod.rs diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b36f8cc2154..f0aacf97d6b 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -29,6 +29,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Data columns by range request for custody sync + CustodySyncDataColumnsByRange(Id), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -79,6 +81,7 @@ pub struct ComponentsByRangeRequestId { pub enum RangeRequestId { RangeSync { chain_id: Id, batch_id: Epoch }, BackfillSync { batch_id: Epoch }, + CustodySync { batch_id: Epoch }, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -247,6 +250,7 @@ impl Display for RangeRequestId { match self { Self::RangeSync { chain_id, batch_id } => write!(f, "RangeSync/{batch_id}/{chain_id}"), Self::BackfillSync { batch_id } => write!(f, "BackfillSync/{batch_id}"), + Self::CustodySync { batch_id } => write!(f, "CustodySync/{batch_id}"), } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f7c3a1bf8db..76ff43ca749 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -535,13 +535,39 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to import `data_columns` as part of custody backfill sync. + pub fn send_data_column_sidecar_list( + self: &Arc, + process_id: ChainSegmentProcessId, + data_column_sidecar_list: DataColumnSidecarList, + ) -> Result<(), Error> { + let is_custody_backfill: bool = matches!( + &process_id, + ChainSegmentProcessId::CustodyBackSyncBatchId { .. } + ); + if !is_custody_backfill { + // TODO(cgc-backfill) this should error if its not custody backfill + todo!() + } + debug!(data_columns_sidecars = data_column_sidecar_list.len(), id = ?process_id, "Batch data column sidecar list sending for process"); + + let processor = self.clone(); + let process_fn = async move { + processor + .process_chain_segment(process_id, blocks, notify_execution_layer) + .await; + }; + todo!() + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, blocks: Vec>, ) -> Result<(), Error> { - let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); + let is_backfill: bool = + matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process"); let processor = self.clone(); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index cff6e26165b..afcfd615515 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,7 +8,9 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; -use beacon_chain::data_column_verification::verify_kzg_for_data_column_list; +use beacon_chain::data_column_verification::{ + verify_kzg_for_data_column_list, verify_kzg_for_data_column_list_with_scoring, +}; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -25,7 +27,10 @@ use store::KzgCommitment; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256, + RuntimeVariableList, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -34,6 +39,8 @@ pub enum ChainSegmentProcessId { RangeBatchId(ChainId, Epoch), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), + /// Processing ID for a custody backfill syncing batch. + CustodyBackSyncBatchId(Epoch), } /// Returned when a chain segment import fails. @@ -445,6 +452,59 @@ impl NetworkBeaconProcessor { self.chain.process_sampling_completed(block_root).await; } + /// Attempt to import the `data_column_sidecar_list` to the beacon chain. + pub async fn process_data_column_sidecar_list( + &self, + sync_type: ChainSegmentProcessId, + downloaded_data_column_sidecar_list: Vec>, + ) { + let result = match sync_type { + // this a request from the Backfill sync + ChainSegmentProcessId::CustodyBackSyncBatchId(epoch) => { + // TODO(cgc-backfill) this number isnt actually correct, we'll need to flatten + let sent_data_columns = downloaded_data_column_sidecar_list.len(); + match self + .process_custody_backfill_data_columns(downloaded_data_column_sidecar_list) + { + (imported_data_columns, Ok(_)) => { + // TODO(cgc-backfill) maybe log more granular details for debugging purposes + debug!( + batch_epoch = %epoch, + keep_execution_payload = !self.chain.store.get_config().prune_payloads, + service= "custody_backfill_sync", + "Custody backfill batch processed"); + BatchProcessResult::CustodyBackfillSuccess { + sent_data_columns, + imported_data_columns, + } + } + (_, Err(e)) => { + debug!( + batch_epoch = %epoch, + error = %e.message, + service = "custody_backfill_sync", + "Custody backfill batch processing failed" + ); + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, + } + } + } + } + // TODO(cgc-backfill) + // we should only accept requests from CustodyBackfillSync + _ => { + todo!() + } + }; + + self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( @@ -549,6 +609,8 @@ impl NetworkBeaconProcessor { } } } + // TODO(cgc-backfill) + ChainSegmentProcessId::CustodyBackSyncBatchId(_) => todo!(), }; self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); @@ -596,6 +658,61 @@ impl NetworkBeaconProcessor { } } + /// Helper function to process custody backfill data columns + fn process_custody_backfill_data_columns( + &self, + downloaded_data_column_sidecar_list: Vec>, + ) -> (usize, Result<(), ChainSegmentFailed>) { + let all_data_columns = downloaded_data_column_sidecar_list + .clone() + .into_iter() + .flatten() + .collect::>(); + + let total_data_column_sidecars = all_data_columns.len(); + + let all_data_columns = RuntimeVariableList::from_vec( + all_data_columns, + self.chain.spec.number_of_columns as usize, + ); + + // TODO(cgc-backfill) clean up, we probably dont need a match + // Attributes fault to the specific peer that sent an invalid column + match verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.chain.kzg) + { + Ok(_) => {} + Err(_) => { + // TODO(cgc-backfill) how should we handle errors here + return ( + 0, + Err(ChainSegmentFailed { + peer_action: Some(PeerAction::LowToleranceError), + message: format!("KZG verification failed"), + }), + ); + } + } + + match self + .chain + .import_historical_data_column_batch(downloaded_data_column_sidecar_list) + { + Ok(imported_data_columns) => (imported_data_columns, Ok(())), + Err(e) => { + // TODO(cgc-backfill) which errors should be low tolerance vs high tolerance + let peer_action = match &e { + beacon_chain::historical_blocks::HistoricalDataColumnError::MismatchedBlockRoot { data_column_block_root, data_column_index, expected_block_root } => todo!(), + beacon_chain::historical_blocks::HistoricalDataColumnError::InvalidSignature { data_column_block_root } => todo!(), + beacon_chain::historical_blocks::HistoricalDataColumnError::NoBlockFound { data_column_block_root } => todo!(), + beacon_chain::historical_blocks::HistoricalDataColumnError::IndexOutOfBounds => todo!(), + beacon_chain::historical_blocks::HistoricalDataColumnError::StoreError(error) => todo!(), + }; + + todo!() + } + } + } + /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 035349c4757..49c20779af0 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1186,7 +1186,7 @@ impl BackFillSync { } /// Error kind for attempting to restart the sync from beacon chain parameters. -enum ResetEpochError { +pub enum ResetEpochError { /// The chain has already completed. SyncCompleted, } diff --git a/beacon_node/network/src/sync/custody_sync/custody_batch.rs b/beacon_node/network/src/sync/custody_sync/custody_batch.rs new file mode 100644 index 00000000000..5f4d12ee188 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/custody_batch.rs @@ -0,0 +1,240 @@ +use crate::sync::{ + range_sync::{Attempt, WrongState}, + BatchOperationOutcome, +}; +use lighthouse_network::service::api_types::Id; +use lighthouse_network::PeerId; +use std::{ + collections::HashSet, + hash::{Hash, Hasher}, + time::{Duration, Instant}, +}; +use strum::Display; +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. +const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +#[derive(Debug)] +pub struct CustodySyncBatchConfig {} + +impl CustodySyncBatchConfig { + fn max_batch_download_attempts() -> u8 { + MAX_BATCH_DOWNLOAD_ATTEMPTS + } + fn max_batch_processing_attempts() -> u8 { + MAX_BATCH_PROCESSING_ATTEMPTS + } + + pub fn batch_attempt_hash( + data_column_sidecar_list: &DataColumnSidecarList, + ) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data_column_sidecar_list.hash(&mut hasher); + hasher.finish() + } +} + +#[derive(Debug)] +/// A segment of a chain. +pub struct CustodyBatchInfo { + /// Start slot of the batch. + start_slot: Slot, + /// End slot of the batch. + end_slot: Slot, + /// The `Attempts` that have been made and failed to send us this batch. + failed_processing_attempts: Vec, + /// Number of processing attempts that have failed but we do not count. + non_faulty_processing_attempts: u8, + /// The number of download retries this batch has undergone due to a failed request. + failed_download_attempts: Vec>, + /// State of the batch. + state: CustodyBatchState, +} + +impl CustodyBatchInfo { + pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { + let start_slot = start_epoch.start_slot(E::slots_per_epoch()); + let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); + Self { + start_slot, + end_slot, + failed_processing_attempts: Vec::new(), + failed_download_attempts: Vec::new(), + non_faulty_processing_attempts: 0, + state: CustodyBatchState::AwaitingDownload, + } + } + + pub fn state(&self) -> &CustodyBatchState { + &self.state + } + + /// Gives a list of peers from which this batch has had a failed download or processing + /// attempt. + pub fn failed_peers(&self) -> HashSet { + let mut peers = HashSet::with_capacity( + self.failed_processing_attempts.len() + self.failed_download_attempts.len(), + ); + + for attempt in &self.failed_processing_attempts { + peers.insert(attempt.peer_id); + } + + for peer in self.failed_download_attempts.iter().flatten() { + peers.insert(*peer); + } + + peers + } + + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingDownload => { + self.state = CustodyBatchState::Downloading(request_id); + Ok(()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting download for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Mark the batch as failed and return whether we can attempt a re-download. + /// + /// This can happen if a peer disconnects or some error occurred that was not the peers fault. + /// The `peer` parameter, when set to None, does not increment the failed attempts of + /// this batch and register the peer, rather attempts a re-download. + #[must_use = "Batch may have failed"] + pub fn download_failed( + &mut self, + peer: Option, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + // register the attempt and check if the batch can be tried again + self.failed_download_attempts.push(peer); + + self.state = if self.failed_download_attempts.len() + >= CustodySyncBatchConfig::max_batch_download_attempts() as usize + { + CustodyBatchState::Failed + } else { + // drop the blocks + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// After different operations over a batch, this could be in a state that allows it to + /// continue, or in failed state. When the batch has failed, we check if it did mainly due to + /// processing failures. In this case the batch is considered failed and faulty. + pub fn outcome(&self) -> BatchOperationOutcome { + match self.state { + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed => BatchOperationOutcome::Failed { + blacklist: self.failed_processing_attempts.len() + > self.failed_download_attempts.len(), + }, + _ => BatchOperationOutcome::Continue, + } + } + + pub fn start_processing(&mut self) -> Result<(DataColumnSidecarList, Duration), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingProcessing( + peer, + data_column_sidecar_list, + start_instant, + ) => { + self.state = CustodyBatchState::Processing(Attempt::new_for_custody_backfill_sync( + peer, + &data_column_sidecar_list, + )); + Ok((data_column_sidecar_list, start_instant.elapsed())) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting processing batch in wrong state {:?}", + self.state + ))) + } + } + } +} + +impl std::fmt::Display for CustodyBatchInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Start Slot: {}, End Slot: {}, State: {}", + self.start_slot, self.end_slot, self.state + ) + } +} + +#[derive(Debug, Display)] +/// Current state of a batch +pub enum CustodyBatchState { + /// The batch has failed either downloading or processing, but can be requested again. + AwaitingDownload, + /// The batch is being downloaded. + Downloading(Id), + /// The batch has been completely downloaded and is ready for processing. + AwaitingProcessing(PeerId, DataColumnSidecarList, Instant), + /// The batch is being processed. + Processing(Attempt), + /// The batch was successfully processed and is waiting to be validated. + /// + /// It is not sufficient to process a batch successfully to consider it correct. This is + /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered + /// valid, only if the next sequential batch imports at least a block. + AwaitingValidation(Attempt), + /// Intermediate state for inner state handling. + Poisoned, + /// The batch has maxed out the allowed attempts for either downloading or processing. It + /// cannot be recovered. + Failed, +} + +impl CustodyBatchState { + /// Helper function for poisoning a state. + pub fn poison(&mut self) -> CustodyBatchState { + std::mem::replace(self, CustodyBatchState::Poisoned) + } + + /// Creates a character representation/visualization for the batch state to display in logs for quicker and + /// easier recognition + fn visualize(&self) -> char { + match self { + CustodyBatchState::Downloading(..) => 'D', + CustodyBatchState::Processing(_) => 'P', + CustodyBatchState::AwaitingValidation(_) => 'v', + CustodyBatchState::AwaitingDownload => 'd', + CustodyBatchState::Failed => 'F', + CustodyBatchState::AwaitingProcessing(..) => 'p', + CustodyBatchState::Poisoned => 'X', + } + } +} diff --git a/beacon_node/network/src/sync/custody_sync/mod.rs b/beacon_node/network/src/sync/custody_sync/mod.rs new file mode 100644 index 00000000000..aae0609ac89 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/mod.rs @@ -0,0 +1,575 @@ +mod custody_batch; + +use std::{ + collections::{btree_map::Entry, BTreeMap, HashSet}, + sync::Arc, +}; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{ + rpc::methods::DataColumnsByRangeRequest, service::api_types::RangeRequestId, + types::BackFillState, NetworkGlobals, PeerId, +}; +use logging::crit; +use tracing::{debug, error, info, instrument, warn}; +use types::{DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::{ + network_beacon_processor::ChainSegmentProcessId, + sync::{ + backfill_sync::{ + BackFillError, ProcessResult, ResetEpochError, SyncStart, BACKFILL_EPOCHS_PER_BATCH, + }, + custody_sync::custody_batch::{CustodyBatchInfo, CustodyBatchState}, + network_context::{RpcRequestSendError, SyncNetworkContext}, + range_sync::BatchId, + BatchOperationOutcome, + }, +}; + +pub use custody_batch::CustodySyncBatchConfig; + +/// The maximum number of batches to queue before requesting more. +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; + +pub struct CustodyBackfillSync { + /// Keeps track of the current progress of the custody backfill. + /// This only gets refreshed from the beacon chain if we enter a failed state. + current_start: BatchId, + + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the custody backfill advances. + processing_target: BatchId, + + /// Starting epoch of the next batch that needs to be downloaded. + to_be_downloaded: BatchId, + + /// Keeps track if we have requested the final batch. + last_batch_downloaded: bool, + + /// Sorted map of batches undergoing some kind of processing. + batches: BTreeMap>, + + /// The current processing batch, if any. + current_processing_batch: Option, + + /// Batches validated. + validated_batches: u64, + + /// We keep track of peers that are participating in the backfill sync. CustodySync + /// only use peers that custody the columns we'd like to backfill. If CustodySync fails, we don't + /// want to penalize all our synced peers, so we use this variable to keep track of peers that + /// have participated and only penalize these peers if custody sync fails. + participating_peers: HashSet, + + /// When a custody backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. + restart_failed_sync: bool, + + /// Reference to the beacon chain to obtain initial starting points for the backfill sync. + beacon_chain: Arc>, + + /// Reference to the network globals in order to obtain valid peers to backfill blocks from + /// (i.e synced peers). + network_globals: Arc>, + + /// The data column indices we're looking to backfill. + data_column_indices: Vec, +} + +impl CustodyBackfillSync { + #[instrument(parent = None, + name = "custody_backfill_sync", + skip_all + )] + pub fn new( + data_column_indices: Vec, + current_start: Epoch, + beacon_chain: Arc>, + network_globals: Arc>, + ) -> Self { + let cbs = CustodyBackfillSync { + batches: BTreeMap::new(), + processing_target: current_start, + current_start, + last_batch_downloaded: false, + to_be_downloaded: current_start, + network_globals, + current_processing_batch: None, + validated_batches: 0, + participating_peers: HashSet::new(), + restart_failed_sync: false, + beacon_chain, + data_column_indices, + }; + + // Update the global network state with the current backfill state. + cbs.set_state(BackFillState::Paused); + cbs + } + + /// Starts or resumes syncing. + /// + /// If resuming is successful, reports back the current syncing metrics. + #[must_use = "A failure here indicates the custody backfill sync has failed and the global sync state should be updated"] + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + pub fn start( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + BackFillState::Syncing => {} // already syncing ignore. + BackFillState::Paused => { + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // TODO(cgc-backfill) get peers that we can sync with + // If there are peers to resume with, begin the resume. + debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming custody backfill sync"); + self.set_state(BackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + BackFillState::Failed => { + // Attempt to recover from a failed sync. All local variables should be reset and + // cleared already for a fresh start. + // We only attempt to restart a failed custody backfill sync if a new synced peer has been + // added. + if !self.restart_failed_sync { + return Ok(SyncStart::NotSyncing); + } + + self.set_state(BackFillState::Syncing); + + // TODO(cgc-backfill) we could try figuring out what epoch to start at again, or we can just + // live with potentially requesting redundant data columns + + debug!(start_epoch = %self.current_start, "Resuming a failed custody backfill sync"); + + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + } + BackFillState::Completed => return Ok(SyncStart::NotSyncing), + } + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .start_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(self.beacon_chain.genesis_backfill_slot) + .as_usize(), + }) + } + + /// When resuming a chain, this function searches for batches that need to be re-downloaded and + /// transitions their state to redownload the batch. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { + let batch_ids_to_retry = self + .batches + .iter() + .filter_map(|(batch_id, batch)| { + // In principle there should only ever be on of these, and we could terminate the + // loop early, however the processing is negligible and we continue the search + // for robustness to handle potential future modification + if matches!(batch.state(), CustodyBatchState::AwaitingDownload) { + Some(*batch_id) + } else { + None + } + }) + .collect::>(); + + for batch_id in batch_ids_to_retry { + self.send_batch(network, batch_id)?; + } + Ok(()) + } + + /// Requests the batch assigned to the given id from a given peer. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), BackFillError> { + if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + + let failed_peers = batch.failed_peers(); + + // TODO(cgc-backfill) calculate start slot + // match network.dat + match network.custody_sync_data_columns_by_range_request( + todo!(), + T::EthSpec::slots_per_epoch(), + self.data_column_indices, + RangeRequestId::CustodySync { batch_id }, + &synced_peers, + &failed_peers, + ) { + Ok(request_ids) => { + for request_id in request_ids { + if let Err(e) = batch.start_downloading(request_id) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } + debug!(epoch = %batch_id, %batch, "Requesting batch"); + return Ok(()); + } + } + Err(e) => match e { + RpcRequestSendError::NoPeer(no_peer_error) => { + // If we are here the chain has no more synced peers + info!( + "reason" = format!("insufficient_synced_peers({no_peer_error:?})"), + "Custody backfill sync paused" + ); + self.set_state(BackFillState::Paused); + return Err(BackFillError::Paused); + } + RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch, "Could not send batch request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } + + match batch.download_failed(None) { + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id) + } + } + } + }, + } + }; + + Ok(()) + } + + /// The syncing process has failed. + /// + /// This resets past variables, to allow for a fresh start when resuming. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn fail_sync(&mut self, error: BackFillError) -> Result<(), BackFillError> { + // Some errors shouldn't fail the chain. + if matches!(error, BackFillError::Paused) { + return Ok(()); + } + + // Set the state + self.set_state(BackFillState::Failed); + // Remove all batches and active requests and participating peers. + self.batches.clear(); + self.participating_peers.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + + // NOTE: Lets keep validated_batches for posterity + + // Emit the log here + error!(?error, "Custody backfill sync failed"); + + // Return the error, kinda weird pattern, but I want to use + // `self.fail_chain(_)?` in other parts of the code. + Err(error) + } + + /// Attempts to request the next required batches from the peer pool if custody backfill is in syncing state. It will exhaust the peer + /// pool and left over batches until the batch buffer is reached or all peers are exhausted. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), BackFillError> { + if !matches!(self.state(), BackFillState::Syncing) { + return Ok(()); + } + + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; + } + + // No more batches, simply stop + Ok(()) + } + + /// Processes the next ready batch. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Only process batches if backfill is syncing and only process one batch at a time + if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { + return Ok(ProcessResult::Successful); + } + + // Find the id of the batch we are going to process. + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + CustodyBatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + CustodyBatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed + | CustodyBatchState::AwaitingDownload + | CustodyBatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. Chain should have been removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + self.fail_sync(BackFillError::InvalidSyncState(String::from( + "Invalid expected batch state", + )))?; + return Ok(ProcessResult::Successful); + } + CustodyBatchState::AwaitingValidation(_) => { + // TODO: I don't think this state is possible, log a CRIT just in case. + // If this is not observed, add it to the failed state branch above. + crit!( + batch = ?self.processing_target, + "Chain encountered a robust batch awaiting validation" + ); + + self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; + if self.to_be_downloaded >= self.processing_target { + self.to_be_downloaded = self.processing_target - BACKFILL_EPOCHS_PER_BATCH; + } + self.request_batches(network)?; + } + } + } else { + self.fail_sync(BackFillError::InvalidSyncState(format!( + "Batch not found for current processing target {}", + self.processing_target + )))?; + return Ok(ProcessResult::Successful); + } + Ok(ProcessResult::Successful) + } + + /// Processes the batch with the given id. + /// The batch must exist and be ready for processing + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result { + // Only process batches if this chain is Syncing, and only one at a time + if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { + return Ok(ProcessResult::Successful); + } + + let Some(batch) = self.batches.get_mut(&batch_id) else { + return self + .fail_sync(BackFillError::InvalidSyncState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + }; + + // NOTE: We send empty batches to the processor in order to trigger the block processor + // result callback. This is done, because an empty batch could end a chain and the logic + // for removing chains and checking completion is in the callback. + + let (data_column_sidecar_list, _) = match batch.start_processing() { + Err(e) => { + return self + .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful) + } + Ok(v) => v, + }; + + let process_id = ChainSegmentProcessId::CustodyBackSyncBatchId(batch_id); + self.current_processing_batch = Some(batch_id); + + if let Err(e) = network + .beacon_processor() + .send_chain_segment(process_id, blocks) + { + crit!( + msg = "process_batch", + error = %e, + batch = ?self.processing_target, + "Failed to send backfill segment to processor." + ); + // This is unlikely to happen but it would stall syncing since the batch now has no + // blocks to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. + self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) + } else { + Ok(ProcessResult::Successful) + } + } + + /// Creates the next required batch from the chain. If there are no more batches required, + /// `false` is returned. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { + // TODO(cgc-backfill) make sure were stopping at the DA window as per + // the comment below + // don't request batches beyond the data availability window; + if self.last_batch_downloaded { + return None; + } + + // only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &CustodyBatchInfo| { + matches!( + batch.state(), + CustodyBatchState::Downloading(..) | CustodyBatchState::AwaitingProcessing(..) + ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() + > BACKFILL_BATCH_BUFFER_SIZE as usize + { + return None; + } + + let batch_id = self.to_be_downloaded; + // this batch could have been included already being an optimistic batch + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downloading, let this same function decide the next batch + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); + self.include_next_batch(network) + } + Entry::Vacant(entry) => { + let batch_type = network.batch_type(batch_id); + entry.insert(CustodyBatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH)); + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); + Some(batch_id) + } + } + } + + /// Checks if custody backfill would complete by syncing to `start_epoch`. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn would_complete(&self, _start_epoch: Epoch) -> bool { + // TODO(cgc-backfill) this should return true if start + // start epoch == DA window + false + } + + /// Updates the global network state indicating the current state of a backfill sync. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn set_state(&self, state: BackFillState) { + *self.network_globals.backfill_state.write() = state; + } + + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn state(&self) -> BackFillState { + self.network_globals.backfill_state.read().clone() + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 44c99cde4bc..a6ceb14e22d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -215,6 +215,11 @@ pub enum BatchProcessResult { sent_blocks: usize, imported_blocks: usize, }, + /// The custody backfill batch was completed successfully. + CustodyBackfillSuccess { + sent_data_columns: usize, + imported_data_columns: usize, + }, /// The batch processing failed. It carries whether the processing imported any block. FaultyFailure { imported_blocks: usize, diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 0f5fd6fb9f1..ca9ccf17a4a 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,6 +4,7 @@ mod backfill_sync; mod block_lookups; mod block_sidecar_coupling; +mod custody_sync; pub mod manager; mod network_context; mod peer_sampling; @@ -12,6 +13,7 @@ mod range_sync; #[cfg(test)] mod tests; +pub use custody_sync::CustodySyncBatchConfig; pub use lighthouse_network::service::api_types::SamplingId; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d0e62e4ada7..be5f12a1b6d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -194,6 +194,9 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active DataColumnsByRange requests for custody sync + custody_sync_data_columns_by_range_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -282,6 +285,9 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + custody_sync_data_columns_by_range_requests: ActiveRequests::new( + "custody_sync_data_columns_by_range", + ), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), network_beacon_processor, @@ -309,6 +315,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + custody_sync_data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -344,12 +351,18 @@ impl SyncNetworkContext { .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let custody_sync_data_column_by_range_ids = custody_sync_data_columns_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::CustodySyncDataColumnsByRange(*req_id)); + blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(custody_sync_data_column_by_range_ids) .collect() } @@ -413,6 +426,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + custody_sync_data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -434,6 +448,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(custody_sync_data_columns_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -441,6 +456,49 @@ impl SyncNetworkContext { active_request_count_by_peer } + pub fn custody_sync_data_columns_by_range_request( + &mut self, + start_slot: Slot, + count: u64, + column_indices: Vec, + requester: RangeRequestId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + ) -> Result, RpcRequestSendError> { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let column_indices = column_indices.clone().into_iter().collect(); + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = { + Some(self.select_columns_by_range_peers_to_request( + &column_indices, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + }; + + let data_column_requests = columns_by_range_peers_to_request + .map(|columns_by_range_peers_to_request| { + columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_request_for_custody_sync( + peer_id, + DataColumnsByRangeRequest { + start_slot: start_slot.into(), + count, + columns, + }, + ) + }) + .collect::, _>>() + }) + .transpose()?; + + // TODO(cgc-backfill) unwrap + Ok(data_column_requests.unwrap()) + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, @@ -1070,6 +1128,41 @@ impl SyncNetworkContext { Ok(id) } + fn send_data_columns_by_range_request_for_custody_sync( + &mut self, + peer_id: PeerId, + request: DataColumnsByRangeRequest, + ) -> Result { + let id = self.next_id(); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::CustodySyncDataColumnsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "DataColumnsByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + columns = ?request.columns, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.custody_sync_data_columns_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + DataColumnsByRangeRequestItems::new(request), + ); + Ok(id) + } + fn send_data_columns_by_range_request( &mut self, peer_id: PeerId, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 264f83ee820..3217905484a 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -8,7 +8,9 @@ use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::sync::CustodySyncBatchConfig; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -454,6 +456,14 @@ impl Attempt { let hash = B::batch_attempt_hash(blocks); Attempt { peer_id, hash } } + + pub fn new_for_custody_backfill_sync( + peer_id: PeerId, + data_column_sidecar_list: &DataColumnSidecarList, + ) -> Self { + let hash = CustodySyncBatchConfig::batch_attempt_hash(data_column_sidecar_list); + Attempt { peer_id, hash } + } } impl std::fmt::Debug for BatchState { diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90f..9b09cd0b790 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,8 +8,8 @@ mod range; mod sync_type; pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, + Attempt, BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + ByRangeRequestType, WrongState, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; #[cfg(test)] diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0911c5170ae..44830bb64c6 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -3551,7 +3551,7 @@ pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStor .get_cold_state_root(target_slot) .map_err(Box::new) .map_err(StateSummaryIteratorError::LoadStateRootError)? - .ok_or_else(|| StateSummaryIteratorError::MissingStateRoot { + .ok_or(StateSummaryIteratorError::MissingStateRoot { target_slot, state_upper_limit, }); From e10da7ab250a6710d88b948867e9f69455940190 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 28 Jun 2025 11:22:22 +0300 Subject: [PATCH 7/7] more progress --- .../beacon_chain/src/historical_blocks.rs | 95 ++-- beacon_node/beacon_processor/src/lib.rs | 15 +- .../src/service/api_types.rs | 13 +- .../src/network_beacon_processor/mod.rs | 24 +- .../network_beacon_processor/sync_methods.rs | 91 ++-- .../src/sync/block_sidecar_coupling.rs | 2 +- .../src/sync/custody_sync/custody_batch.rs | 135 +++++- .../sync/custody_sync/custody_sync_manager.rs | 28 ++ .../network/src/sync/custody_sync/mod.rs | 422 ++++++++++++++++-- beacon_node/network/src/sync/manager.rs | 7 +- beacon_node/network/src/sync/mod.rs | 7 +- .../network/src/sync/network_context.rs | 69 +-- .../src/sync/network_context/custody.rs | 2 +- beacon_node/network/src/sync/tests/range.rs | 3 + common/eth2/src/lighthouse/sync_state.rs | 12 +- 15 files changed, 717 insertions(+), 208 deletions(-) create mode 100644 beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 1bdbf271adf..c82945ea8c5 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -329,45 +329,18 @@ impl BeaconChain { /// Return the number of `data_columns` successfully imported. pub fn import_historical_data_column_batch( &self, - historical_data_column_sidecars: Vec>, + historical_data_column_sidecar_list: DataColumnSidecarList, ) -> Result { let mut total_imported = 0; + let expected_imported = historical_data_column_sidecar_list.len(); let mut ops = vec![]; - if historical_data_column_sidecars.is_empty() { + if historical_data_column_sidecar_list.is_empty() { return Ok(total_imported); } - for data_column_sidecar_list in historical_data_column_sidecars { - let block_root = { - let Some(data_column) = data_column_sidecar_list.first() else { - return Err(HistoricalDataColumnError::IndexOutOfBounds); - }; - - let first_block_root = data_column.block_root(); - - if let Some(mismatched_sidecar) = - data_column_sidecar_list.iter().find(|data_column_sidecar| { - data_column_sidecar.block_root() != first_block_root - }) - { - let error = HistoricalDataColumnError::MismatchedBlockRoot { - data_column_block_root: mismatched_sidecar.block_root(), - data_column_index: mismatched_sidecar.index, - expected_block_root: first_block_root, - }; - tracing::warn!( - block_root=%mismatched_sidecar.block_root(), - data_column_index=%mismatched_sidecar.index, - num_blob_sidecars=%data_column_sidecar_list.len(), - ?error, - "Aborting data column sidecar import" - ); - - return Err(error); - } - first_block_root - }; + for data_column_sidecar in historical_data_column_sidecar_list { + let block_root = data_column_sidecar.block_root(); let Some(block) = self.store.get_blinded_block(&block_root)? else { let error = HistoricalDataColumnError::NoBlockFound { @@ -375,44 +348,42 @@ impl BeaconChain { }; tracing::warn!( %block_root, - num_blob_sidecars = data_column_sidecar_list.len(), + num_blob_sidecars = expected_imported, ?error, "Aborting data column sidecar import" ); return Err(error); }; - for data_column in data_column_sidecar_list { - if &data_column.signed_block_header.signature != block.signature() { - let error = HistoricalDataColumnError::InvalidSignature { - data_column_block_root: data_column.block_root(), - }; - tracing::warn!( - block_root = ?block_root, - column_index = data_column.index, - ?error, - "Aborting data column sidecar import" - ); - return Err(error); - } - - if self - .store - .get_data_column(&block_root, &data_column.index)? - .is_none() - { - tracing::debug!( - block_root = ?block_root, - column_index = data_column.index, - "Skipping data column import as identical data column exists" - ); - continue; - } + if &data_column_sidecar.signed_block_header.signature != block.signature() { + let error = HistoricalDataColumnError::InvalidSignature { + data_column_block_root: block_root, + }; + tracing::warn!( + block_root = ?block_root, + column_index = data_column_sidecar.index, + ?error, + "Aborting data column sidecar import" + ); + return Err(error); + } - self.store - .data_column_as_kv_store_ops(&block_root, data_column, &mut ops); - total_imported += 1; + if self + .store + .get_data_column(&block_root, &data_column_sidecar.index)? + .is_none() + { + tracing::debug!( + block_root = ?block_root, + column_index = data_column_sidecar.index, + "Skipping data column import as identical data column exists" + ); + continue; } + + self.store + .data_column_as_kv_store_ops(&block_root, data_column_sidecar, &mut ops); + total_imported += 1; } self.store.blobs_db.do_atomically(ops)?; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0f324071a1e..b33694fce19 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -121,6 +121,7 @@ pub struct BeaconProcessorQueueLengths { column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, + custody_backfill_queue: usize, gossip_block_queue: usize, gossip_blob_queue: usize, gossip_data_column_queue: usize, @@ -189,6 +190,7 @@ impl BeaconProcessorQueueLengths { column_reconstruction_queue: 64, chain_segment_queue: 64, backfill_chain_segment: 64, + custody_backfill_queue: 64, gossip_block_queue: 1024, gossip_blob_queue: 1024, gossip_data_column_queue: 1024, @@ -619,6 +621,7 @@ pub enum Work { }, ChainSegment(AsyncFn), ChainSegmentBackfill(AsyncFn), + CustodyBackfill(AsyncFn), Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), @@ -674,6 +677,7 @@ pub enum WorkType { IgnoredRpcBlock, ChainSegment, ChainSegmentBackfill, + CustodyBackfill, Status, BlocksByRangeRequest, BlocksByRootsRequest, @@ -726,6 +730,7 @@ impl Work { Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill, + Work::CustodyBackfill(_) => WorkType::CustodyBackfill, Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, @@ -894,6 +899,7 @@ impl BeaconProcessor { FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); + let mut custody_backfill_queue = FifoQueue::new(queue_lengths.custody_backfill_queue); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); @@ -1251,7 +1257,9 @@ impl BeaconProcessor { // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { Some(item) - // Handle light client requests. + } else if let Some(item) = custody_backfill_queue.pop() { + Some(item) + // Handle light client requests. } else if let Some(item) = lc_gossip_finality_update_queue.pop() { Some(item) } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { @@ -1390,6 +1398,9 @@ impl BeaconProcessor { Work::ChainSegmentBackfill { .. } => { backfill_chain_segment.push(work, work_id) } + Work::CustodyBackfill { .. } => { + custody_backfill_queue.push(work, work_id) + } Work::Status { .. } => status_queue.push(work, work_id), Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id), Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id), @@ -1478,6 +1489,7 @@ impl BeaconProcessor { WorkType::ColumnReconstruction => column_reconstruction_queue.len(), WorkType::ChainSegment => chain_segment_queue.len(), WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::CustodyBackfill => custody_backfill_queue.len(), WorkType::Status => status_queue.len(), WorkType::BlocksByRangeRequest => blbrange_queue.len(), WorkType::BlocksByRootsRequest => blbroots_queue.len(), @@ -1631,6 +1643,7 @@ impl BeaconProcessor { task_spawner.spawn_async(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), + Work::CustodyBackfill(process_fn) => task_spawner.spawn_async(process_fn), Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn { BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index f0aacf97d6b..ccb19363c4b 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -29,8 +29,6 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), - /// Data columns by range request for custody sync - CustodySyncDataColumnsByRange(Id), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -65,6 +63,15 @@ pub struct DataColumnsByRangeRequestId { pub parent_request_id: ComponentsByRangeRequestId, } +// TODO(cgc-backfill) rename this +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyByRangeRequestId { + /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for data columns. + pub parent_request_id: CustodyByRangeParentRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -81,7 +88,6 @@ pub struct ComponentsByRangeRequestId { pub enum RangeRequestId { RangeSync { chain_id: Id, batch_id: Epoch }, BackfillSync { batch_id: Epoch }, - CustodySync { batch_id: Epoch }, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -250,7 +256,6 @@ impl Display for RangeRequestId { match self { Self::RangeSync { chain_id, batch_id } => write!(f, "RangeSync/{batch_id}/{chain_id}"), Self::BackfillSync { batch_id } => write!(f, "BackfillSync/{batch_id}"), - Self::CustodySync { batch_id } => write!(f, "CustodySync/{batch_id}"), } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 76ff43ca749..b94c0fc150d 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,4 +1,5 @@ use crate::sync::manager::BlockProcessType; +use crate::sync::CustodyBackSyncBatchId; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; @@ -538,26 +539,27 @@ impl NetworkBeaconProcessor { /// Create a new work event to import `data_columns` as part of custody backfill sync. pub fn send_data_column_sidecar_list( self: &Arc, - process_id: ChainSegmentProcessId, + process_id: CustodyBackSyncBatchId, data_column_sidecar_list: DataColumnSidecarList, ) -> Result<(), Error> { - let is_custody_backfill: bool = matches!( - &process_id, - ChainSegmentProcessId::CustodyBackSyncBatchId { .. } - ); - if !is_custody_backfill { - // TODO(cgc-backfill) this should error if its not custody backfill - todo!() - } + // TODO(cgc-backill) len is not correct debug!(data_columns_sidecars = data_column_sidecar_list.len(), id = ?process_id, "Batch data column sidecar list sending for process"); let processor = self.clone(); let process_fn = async move { processor - .process_chain_segment(process_id, blocks, notify_execution_layer) + .process_data_column_sidecar_list(process_id, data_column_sidecar_list) .await; }; - todo!() + + let process_fn = Box::pin(process_fn); + + let work = Work::CustodyBackfill(process_fn); + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work, + }) } /// Create a new work event to import `blocks` as a beacon chain segment. diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index afcfd615515..b43786a459b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,9 +1,10 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; +use crate::sync::custody_sync::CustodyBatchProcessResult; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, - ChainId, + ChainId, CustodyBackSyncBatchId, }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; @@ -39,8 +40,6 @@ pub enum ChainSegmentProcessId { RangeBatchId(ChainId, Epoch), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), - /// Processing ID for a custody backfill syncing batch. - CustodyBackSyncBatchId(Epoch), } /// Returned when a chain segment import fails. @@ -455,54 +454,42 @@ impl NetworkBeaconProcessor { /// Attempt to import the `data_column_sidecar_list` to the beacon chain. pub async fn process_data_column_sidecar_list( &self, - sync_type: ChainSegmentProcessId, - downloaded_data_column_sidecar_list: Vec>, + sync_id: CustodyBackSyncBatchId, + downloaded_data_column_sidecar_list: DataColumnSidecarList, ) { - let result = match sync_type { - // this a request from the Backfill sync - ChainSegmentProcessId::CustodyBackSyncBatchId(epoch) => { - // TODO(cgc-backfill) this number isnt actually correct, we'll need to flatten - let sent_data_columns = downloaded_data_column_sidecar_list.len(); - match self - .process_custody_backfill_data_columns(downloaded_data_column_sidecar_list) - { - (imported_data_columns, Ok(_)) => { - // TODO(cgc-backfill) maybe log more granular details for debugging purposes - debug!( - batch_epoch = %epoch, - keep_execution_payload = !self.chain.store.get_config().prune_payloads, - service= "custody_backfill_sync", - "Custody backfill batch processed"); - BatchProcessResult::CustodyBackfillSuccess { - sent_data_columns, - imported_data_columns, - } - } - (_, Err(e)) => { - debug!( - batch_epoch = %epoch, - error = %e.message, - service = "custody_backfill_sync", - "Custody backfill batch processing failed" - ); - match e.peer_action { - Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: 0, - penalty, - }, - None => BatchProcessResult::NonFaultyFailure, - } - } + // TODO(cgc-backfill) this number isnt actually correct, we'll need to flatten + let sent_data_columns = downloaded_data_column_sidecar_list.len(); + match self.process_custody_backfill_data_columns(downloaded_data_column_sidecar_list) { + (imported_data_columns, Ok(_)) => { + // TODO(cgc-backfill) maybe log more granular details for debugging purposes + debug!( + batch_epoch = %sync_id, + keep_execution_payload = !self.chain.store.get_config().prune_payloads, + service= "custody_backfill_sync", + "Custody backfill batch processed"); + CustodyBatchProcessResult::Success { + sent_data_columns, + imported_data_columns, } } - // TODO(cgc-backfill) - // we should only accept requests from CustodyBackfillSync - _ => { - todo!() + (_, Err(e)) => { + debug!( + batch_epoch = %sync_id, + error = %e.message, + service = "custody_backfill_sync", + "Custody backfill batch processing failed" + ); + match e.peer_action { + Some(penalty) => CustodyBatchProcessResult::FaultyFailure { + imported_data_columns: 0, + penalty, + }, + None => CustodyBatchProcessResult::NonFaultyFailure, + } } }; - self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + // self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); } /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync @@ -609,8 +596,6 @@ impl NetworkBeaconProcessor { } } } - // TODO(cgc-backfill) - ChainSegmentProcessId::CustodyBackSyncBatchId(_) => todo!(), }; self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); @@ -661,18 +646,12 @@ impl NetworkBeaconProcessor { /// Helper function to process custody backfill data columns fn process_custody_backfill_data_columns( &self, - downloaded_data_column_sidecar_list: Vec>, + downloaded_data_column_sidecar_list: DataColumnSidecarList, ) -> (usize, Result<(), ChainSegmentFailed>) { - let all_data_columns = downloaded_data_column_sidecar_list - .clone() - .into_iter() - .flatten() - .collect::>(); - - let total_data_column_sidecars = all_data_columns.len(); + let total_data_column_sidecars = downloaded_data_column_sidecar_list.len(); let all_data_columns = RuntimeVariableList::from_vec( - all_data_columns, + downloaded_data_column_sidecar_list.clone(), self.chain.spec.number_of_columns as usize, ); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 0418ab45534..ac94275701f 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -22,7 +22,7 @@ enum ByRangeRequest { Complete(T), } -enum RangeBlockDataRequest { +pub enum RangeBlockDataRequest { NoData, Blobs(ByRangeRequest>>>), DataColumns { diff --git a/beacon_node/network/src/sync/custody_sync/custody_batch.rs b/beacon_node/network/src/sync/custody_sync/custody_batch.rs index 5f4d12ee188..da648d33ae7 100644 --- a/beacon_node/network/src/sync/custody_sync/custody_batch.rs +++ b/beacon_node/network/src/sync/custody_sync/custody_batch.rs @@ -1,9 +1,14 @@ +use crate::sync::range_sync::BatchProcessingResult; +use crate::sync::CustodyBackSyncBatchId; use crate::sync::{ range_sync::{Attempt, WrongState}, BatchOperationOutcome, }; -use lighthouse_network::service::api_types::Id; +use beacon_chain::BeaconChainTypes; +use fnv::FnvHashSet; +use lighthouse_network::service::api_types::{DataColumnsByRangeRequestId, Id}; use lighthouse_network::PeerId; +use std::marker::PhantomData; use std::{ collections::HashSet, hash::{Hash, Hasher}, @@ -22,6 +27,13 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; #[derive(Debug)] pub struct CustodySyncBatchConfig {} +// TODO(cgc-backfill) rename this +#[derive(Hash, PartialEq, Eq, Clone)] +pub struct CustodyByRangeParentRequestId { + pub id: u32, + pub requester: CustodyBackSyncBatchId, +} + impl CustodySyncBatchConfig { fn max_batch_download_attempts() -> u8 { MAX_BATCH_DOWNLOAD_ATTEMPTS @@ -109,6 +121,40 @@ impl CustodyBatchInfo { } } + /// Marks the batch as ready to be processed if the data columns are in the range. The number of + /// received data columns is returned, or the wrong batch end on failure + #[must_use = "Batch may have failed"] + pub fn download_completed( + &mut self, + data_columns: DataColumnSidecarList, + peer: PeerId, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + let received = data_columns.len(); + self.state = + CustodyBatchState::AwaitingProcessing(peer, data_columns, Instant::now()); + Ok(received) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download completed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Verifies if incoming data columns belong to this batch. + pub fn is_expecting_data_columns(&self, request_id: &Id) -> bool { + if let CustodyBatchState::Downloading(expected_id) = &self.state { + return expected_id == request_id; + } + false + } + /// Mark the batch as failed and return whether we can attempt a re-download. /// /// This can happen if a peer disconnects or some error occurred that was not the peers fault. @@ -159,6 +205,66 @@ impl CustodyBatchInfo { } } + /// Returns the peer that is currently responsible for progressing the state of the batch. + pub fn processing_peer(&self) -> Option<&PeerId> { + match &self.state { + CustodyBatchState::AwaitingDownload + | CustodyBatchState::Failed + | CustodyBatchState::Downloading(..) => None, + CustodyBatchState::AwaitingProcessing(peer_id, _, _) + | CustodyBatchState::Processing(Attempt { peer_id, .. }) + | CustodyBatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + } + } + + pub fn attempts(&self) -> &[Attempt] { + &self.failed_processing_attempts + } + + #[must_use = "Custody batch may have failed"] + pub fn processing_completed( + &mut self, + procesing_result: BatchProcessingResult, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Processing(attempt) => { + self.state = match procesing_result { + BatchProcessingResult::Success => { + CustodyBatchState::AwaitingValidation(attempt) + } + BatchProcessingResult::FaultyFailure => { + // register the failed attempt + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= CustodySyncBatchConfig::max_batch_processing_attempts() as usize + { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + } + } + BatchProcessingResult::NonFaultyFailure => { + self.non_faulty_processing_attempts = + self.non_faulty_processing_attempts.saturating_add(1); + CustodyBatchState::AwaitingDownload + } + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Procesing completed for batch in wrong state: {:?}", + self.state + ))) + } + } + } + pub fn start_processing(&mut self) -> Result<(DataColumnSidecarList, Duration), WrongState> { match self.state.poison() { CustodyBatchState::AwaitingProcessing( @@ -182,6 +288,33 @@ impl CustodyBatchInfo { } } } + + #[must_use = "Batch may have failed"] + pub fn validation_failed(&mut self) -> Result { + match self.state.poison() { + CustodyBatchState::AwaitingValidation(attempt) => { + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + self.state = if self.failed_processing_attempts.len() + >= CustodySyncBatchConfig::max_batch_processing_attempts() as usize + { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Validation failed for batch in wrong state: {:?}", + self.state + ))) + } + } + } } impl std::fmt::Display for CustodyBatchInfo { diff --git a/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs b/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs new file mode 100644 index 00000000000..dca02b70a9f --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs @@ -0,0 +1,28 @@ +use crate::sync::custody_sync::CustodyBackfillSync; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::PeerAction; +use std::sync::Arc; + +struct CustodySyncManager { + /// A reference to the underlying beacon chain. + chain: Arc>, + + /// Custody Backfill syncing. + custody_backfill_sync: CustodyBackfillSync, +} + +/// The result of processing multiple data columns for custody backfill sync. +#[derive(Debug)] +pub enum CustodyBatchProcessResult { + /// The batch was completed successfully. It carries whether the sent batch contained data columns. + Success { + sent_data_columns: usize, + imported_data_columns: usize, + }, + /// The batch processing failed. It carries whether the processing imported any data columns. + FaultyFailure { + imported_data_columns: usize, + penalty: PeerAction, + }, + NonFaultyFailure, +} diff --git a/beacon_node/network/src/sync/custody_sync/mod.rs b/beacon_node/network/src/sync/custody_sync/mod.rs index aae0609ac89..48647942cf0 100644 --- a/beacon_node/network/src/sync/custody_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_sync/mod.rs @@ -1,37 +1,40 @@ mod custody_batch; +mod custody_sync_manager; use std::{ collections::{btree_map::Entry, BTreeMap, HashSet}, sync::Arc, + time::Instant, }; +use crate::sync::range_sync::WrongState; + use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::{ - rpc::methods::DataColumnsByRangeRequest, service::api_types::RangeRequestId, - types::BackFillState, NetworkGlobals, PeerId, + service::api_types::{Id, RangeRequestId}, + types::BackFillState, + NetworkGlobals, PeerAction, PeerId, }; use logging::crit; use tracing::{debug, error, info, instrument, warn}; -use types::{DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Slot}; - -use crate::{ - network_beacon_processor::ChainSegmentProcessId, - sync::{ - backfill_sync::{ - BackFillError, ProcessResult, ResetEpochError, SyncStart, BACKFILL_EPOCHS_PER_BATCH, - }, - custody_sync::custody_batch::{CustodyBatchInfo, CustodyBatchState}, - network_context::{RpcRequestSendError, SyncNetworkContext}, - range_sync::BatchId, - BatchOperationOutcome, - }, +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::sync::{ + backfill_sync::{BackFillError, ProcessResult, SyncStart, BACKFILL_EPOCHS_PER_BATCH}, + custody_sync::custody_batch::{CustodyBatchInfo, CustodyBatchState}, + network_context::{RpcRequestSendError, SyncNetworkContext}, + range_sync::{BatchId, BatchProcessingResult}, + BatchOperationOutcome, BatchProcessResult, }; -pub use custody_batch::CustodySyncBatchConfig; +pub use custody_batch::{CustodyByRangeParentRequestId, CustodySyncBatchConfig}; +pub use custody_sync_manager::CustodyBatchProcessResult; /// The maximum number of batches to queue before requesting more. const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; +pub type CustodyBackSyncBatchId = Epoch; + pub struct CustodyBackfillSync { /// Keeps track of the current progress of the custody backfill. /// This only gets refreshed from the beacon chain if we enter a failed state. @@ -83,9 +86,9 @@ impl CustodyBackfillSync { skip_all )] pub fn new( - data_column_indices: Vec, - current_start: Epoch, beacon_chain: Arc>, + current_start: Epoch, + data_column_indices: Vec, network_globals: Arc>, ) -> Self { let cbs = CustodyBackfillSync { @@ -235,20 +238,17 @@ impl CustodyBackfillSync { // TODO(cgc-backfill) calculate start slot // match network.dat match network.custody_sync_data_columns_by_range_request( - todo!(), + Slot::new(0), T::EthSpec::slots_per_epoch(), - self.data_column_indices, - RangeRequestId::CustodySync { batch_id }, + &self.data_column_indices, + batch_id, &synced_peers, &failed_peers, ) { - Ok(request_ids) => { - for request_id in request_ids { - if let Err(e) = batch.start_downloading(request_id) { - return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); - } - debug!(epoch = %batch_id, %batch, "Requesting batch"); - return Ok(()); + Ok(request_id) => { + debug!(epoch = %batch_id, %batch, "Requesting batch"); + if let Err(e) = batch.start_downloading(request_id) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); } } Err(e) => match e { @@ -457,29 +457,329 @@ impl CustodyBackfillSync { Ok(v) => v, }; - let process_id = ChainSegmentProcessId::CustodyBackSyncBatchId(batch_id); + let process_id = batch_id as CustodyBackSyncBatchId; self.current_processing_batch = Some(batch_id); if let Err(e) = network .beacon_processor() - .send_chain_segment(process_id, blocks) + .send_data_column_sidecar_list(process_id, data_column_sidecar_list) { crit!( msg = "process_batch", error = %e, batch = ?self.processing_target, - "Failed to send backfill segment to processor." + "Failed to send custody backfill segment to processor." ); // This is unlikely to happen but it would stall syncing since the batch now has no // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) + self.on_batch_process_result( + network, + batch_id, + &CustodyBatchProcessResult::NonFaultyFailure, + ) } else { Ok(ProcessResult::Successful) } } + /// The block processor has completed processing a batch. This function handles the result + /// of the batch processor. + /// If an error is returned the Custody BackFill sync has failed. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + #[must_use = "A failure here indicates the custody backfill sync has failed and the global sync state should be updated"] + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + result: &CustodyBatchProcessResult, + ) -> Result { + // The first two cases are possible in regular sync, should not occur in backfill, but we + // keep this logic for handling potential processing race conditions. + // result + let batch = match &self.current_processing_batch { + Some(processing_id) if *processing_id != batch_id => { + debug!( + batch_epoch = %batch_id.as_u64(), + expected_batch_epoch = processing_id.as_u64(), + "Unexpected batch result" + ); + return Ok(ProcessResult::Successful); + } + None => { + debug!(%batch_id, "Chain was not expecting a batch result"); + return Ok(ProcessResult::Successful); + } + _ => { + // batch_id matches, continue + self.current_processing_batch = None; + + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + // This is an error. Fail the sync algorithm. + return self + .fail_sync(BackFillError::InvalidSyncState(format!( + "Current processing batch not found: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + } + } + } + }; + + let Some(peer) = batch.processing_peer() else { + self.fail_sync(BackFillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + ))?; + return Ok(ProcessResult::Successful); + }; + + debug!( + ?result, + %batch, + batch_epoch = %batch_id, + %peer, + client = %network.client_type(peer), + "Custody backfill batch processed" + ); + + match result { + CustodyBatchProcessResult::Success { + imported_data_columns, + .. + } => { + // TODO(cgc-backfill) + todo!() + } + CustodyBatchProcessResult::FaultyFailure { + imported_data_columns, + penalty, + } => { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + Err(e) => { + // Batch was in the wrong state + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + // check that we have not exceeded the re-process retry counter + // If a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers are sending invalid batches + // repeatedly and are either malicious or faulty. We stop custody backfill sync and + // report all synced peers that have participated. + warn!( + score_adjustment = %penalty, + batch_epoch = %batch_id, + "Custody backfill batch failed to download. Penalizing peers" + ); + + for peer in self.participating_peers.drain() { + // TODO(das): `participating_peers` only includes block peers. Should we + // penalize the custody column peers too? + network.report_peer(peer, *penalty, "backfill_batch_failed"); + } + self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) + .map(|_| ProcessResult::Successful) + } + + Ok(BatchOperationOutcome::Continue) => { + // chain can continue. Check if it can be progressed + if *imported_data_columns > 0 { + // At least one block was successfully verified and imported, then we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance(network, batch_id); + } + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) + .map(|_| ProcessResult::Successful) + } + } + } + CustodyBatchProcessResult::NonFaultyFailure => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + } + self.send_batch(network, batch_id)?; + Ok(ProcessResult::Successful) + } + } + } + + /// An invalid batch has been received that could not be processed, but that can be retried. + /// + /// These events occur when a peer has successfully responded with data columns, but the data columns we + /// have received are incorrect or invalid. This indicates the peer has not performed as + /// intended and can result in down scoring a peer. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn handle_invalid_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), BackFillError> { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid. + + // TODO(cgc-backfill) update comments + // The previous batch could be incomplete due to the block sizes being too large to fit in + // a single RPC request or there could be consecutive empty batches which are not supposed + // to be there + + // The current (sub-optimal) strategy is to simply re-request all batches that could + // potentially be faulty. If a batch returns a different result than the original and + // results in successful processing, we downvote the original peer that sent us the batch. + + // this is our robust `processing_target`. All previous batches must be awaiting + // validation + let mut redownload_queue = Vec::new(); + + for (id, batch) in self + .batches + .iter_mut() + .filter(|(&id, _batch)| id > batch_id) + { + match batch + .validation_failed() + .map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))? + { + BatchOperationOutcome::Failed { blacklist: _ } => { + // Batch has failed and cannot be redownloaded. + return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)); + } + BatchOperationOutcome::Continue => { + redownload_queue.push(*id); + } + } + } + + // no batch maxed out it process attempts, so now the chain's volatile progress must be + // reset + self.processing_target = self.current_start; + + for id in redownload_queue { + self.send_batch(network, id)?; + } + // finally, re-request the failed batch. + self.send_batch(network, batch_id) + } + + // TODO(cgc-backfill) update comments + /// Removes any batches previous to the given `validating_epoch` and updates the current + /// boundaries of the chain. + /// + /// The `validating_epoch` must align with batch boundaries. + /// + /// If a previous batch has been validated and it had been re-processed, penalize the original + /// peer. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn advance(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { + // make sure this epoch produces an advancement + if validating_epoch >= self.current_start { + return; + } + + // We can now validate higher batches that the current batch. Here we remove all + // batches that are higher than the current batch. We add on an extra + // `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive. + let removed_batches = self + .batches + .split_off(&(validating_epoch + BACKFILL_EPOCHS_PER_BATCH)); + + for (id, batch) in removed_batches.into_iter() { + self.validated_batches = self.validated_batches.saturating_add(1); + // only for batches awaiting validation can we be sure the last attempt is + // right, and thus, that any different attempt is wrong + match batch.state() { + CustodyBatchState::AwaitingValidation(ref processed_attempt) => { + for attempt in batch.attempts() { + // The validated batch has been re-processed + if attempt.hash != processed_attempt.hash { + // The re-downloaded version was different. + if processed_attempt.peer_id != attempt.peer_id { + // A different peer sent the correct batch, the previous peer did not + // We negatively score the original peer. + let action = PeerAction::LowToleranceError; + debug!( + batch_epoch = ?id, + score_adjustment = %action, + original_peer = %attempt.peer_id, + new_peer = %processed_attempt.peer_id, + "Re-processed batch validated. Scoring original peer" + ); + network.report_peer( + attempt.peer_id, + action, + "custody_backfill_reprocessed_original_peer", + ); + } else { + // The same peer corrected it's previous mistake. There was an error, so we + // negative score the original peer. + let action = PeerAction::MidToleranceError; + debug!( + batch_epoch = ?id, + score_adjustment = %action, + original_peer = %attempt.peer_id, + new_peer = %processed_attempt.peer_id, + "Re-processed batch validated by the same peer" + ); + network.report_peer( + attempt.peer_id, + action, + "custody_backfill_reprocessed_same_peer", + ); + } + } + } + } + CustodyBatchState::Downloading(..) => {} + CustodyBatchState::Failed + | CustodyBatchState::Poisoned + | CustodyBatchState::AwaitingDownload => { + crit!("batch indicates inconsistent chain state while advancing custody backfill sync") + } + CustodyBatchState::AwaitingProcessing(..) => {} + CustodyBatchState::Processing(_) => { + debug!(batch = %id, %batch, "Advancing custody backfill sync while processing a batch"); + if let Some(processing_id) = self.current_processing_batch { + if id >= processing_id { + self.current_processing_batch = None; + } + } + } + } + } + + self.processing_target = self.processing_target.min(validating_epoch); + self.current_start = validating_epoch; + self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch); + if self.batches.contains_key(&self.to_be_downloaded) { + // if custody backfill is advanced by Range beyond the previous `self.to_be_downloaded`, we + // won't have this batch, so we need to request it. + self.to_be_downloaded -= BACKFILL_EPOCHS_PER_BATCH; + } + debug!(?validating_epoch, processing_target = ?self.processing_target, "Custody backfill advanced"); + } + /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. #[instrument(parent = None, @@ -542,6 +842,64 @@ impl CustodyBackfillSync { } } + /// Data columns have been received for this batch. + /// If the data column(s) correctly complete the batch it will be processed if possible. + /// If this returns an error, the custody backfill sync has failed and will be restarted once new peers + /// join the system. + /// The sync manager should update the global sync state on failure. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + #[must_use = "A failure here indicates the custdy backfill sync has failed and the global sync state should be updated"] + pub fn on_block_response( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + peer_id: &PeerId, + request_id: Id, + data_columns: DataColumnSidecarList, + ) -> Result { + // check if we have this batch + let Some(batch) = self.batches.get_mut(&batch_id) else { + if !matches!(self.state(), BackFillState::Failed) { + // A batch might get removed when custody backfill sync advances, so this is non fatal. + debug!(epoch = %batch_id, "Received a data column for unknown batch"); + } + return Ok(ProcessResult::Successful); + }; + + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed for other + // reasons. Check that the data columns belong to the expected peer, and that the + // request_id matches + if !batch.is_expecting_data_columns(&request_id) { + return Ok(ProcessResult::Successful); + } + + match batch.download_completed(data_columns, *peer_id) { + Ok(received) => { + let awaiting_batches = + self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; + debug!( + epoch = %batch_id, + blocks = received, + %awaiting_batches, + "Completed batch received" + ); + + // pre-emptively request more data columns from peers whilst we process current data columns, + self.request_batches(network)?; + self.process_completed_batches(network) + } + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + Ok(ProcessResult::Successful) + } + } + } + /// Checks if custody backfill would complete by syncing to `start_epoch`. #[instrument(parent = None, fields(service = "custody_backfill_sync"), diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a6ceb14e22d..0f3d0ba5a98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -215,11 +215,6 @@ pub enum BatchProcessResult { sent_blocks: usize, imported_blocks: usize, }, - /// The custody backfill batch was completed successfully. - CustodyBackfillSuccess { - sent_data_columns: usize, - imported_data_columns: usize, - }, /// The batch processing failed. It carries whether the processing imported any block. FaultyFailure { imported_blocks: usize, @@ -322,7 +317,7 @@ impl SyncManager { fork_context.clone(), ), range_sync: RangeSync::new(beacon_chain.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index ca9ccf17a4a..124d8a32a0f 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,7 +4,7 @@ mod backfill_sync; mod block_lookups; mod block_sidecar_coupling; -mod custody_sync; +pub mod custody_sync; pub mod manager; mod network_context; mod peer_sampling; @@ -13,7 +13,10 @@ mod range_sync; #[cfg(test)] mod tests; -pub use custody_sync::CustodySyncBatchConfig; +pub use custody_sync::{ + CustodyBackSyncBatchId, CustodyByRangeParentRequestId, CustodySyncBatchConfig, +}; + pub use lighthouse_network::service::api_types::SamplingId; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index be5f12a1b6d..537f726c3b7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -15,6 +15,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; +use crate::sync::{CustodyBackSyncBatchId, CustodyByRangeParentRequestId}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; @@ -24,8 +25,8 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, Req pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + CustodyByRangeRequest, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, + DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -194,13 +195,14 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, - /// A mapping of active DataColumnsByRange requests for custody sync - custody_sync_data_columns_by_range_requests: - ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, + /// Mapping of active custody column requests for a block root + custody_by_range_requests: + FnvHashMap>, + /// BlocksByRange requests paired with other ByRange requests for data components components_by_range_requests: FnvHashMap>, @@ -285,10 +287,8 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), - custody_sync_data_columns_by_range_requests: ActiveRequests::new( - "custody_sync_data_columns_by_range", - ), custody_by_root_requests: <_>::default(), + custody_by_range_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -315,9 +315,10 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, - custody_sync_data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, + // custody_by_range_requests is a meta request for the custody sync service + custody_by_range_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, execution_engine_state: _, @@ -351,18 +352,12 @@ impl SyncNetworkContext { .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); - let custody_sync_data_column_by_range_ids = custody_sync_data_columns_by_range_requests - .active_requests_of_peer(peer_id) - .into_iter() - .map(|req_id| SyncRequestId::CustodySyncDataColumnsByRange(*req_id)); - blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) - .chain(custody_sync_data_column_by_range_ids) .collect() } @@ -426,9 +421,10 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, - custody_sync_data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, + // custody_by_range_requests is a meta request for the custody sync service + custody_by_range_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, execution_engine_state: _, @@ -448,7 +444,6 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) - .chain(custody_sync_data_columns_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -460,13 +455,19 @@ impl SyncNetworkContext { &mut self, start_slot: Slot, count: u64, - column_indices: Vec, - requester: RangeRequestId, + column_indices: &[ColumnIndex], + requester: CustodyBackSyncBatchId, peers: &HashSet, peers_to_deprioritize: &HashSet, - ) -> Result, RpcRequestSendError> { + ) -> Result { + // Create the overall components_by_range request ID before its individual components + let id = CustodyByRangeParentRequestId { + id: self.next_id(), + requester, + }; + let active_request_count_by_peer = self.active_request_count_by_peer(); - let column_indices = column_indices.clone().into_iter().collect(); + let column_indices = column_indices.into_iter().cloned().collect(); // Attempt to find all required custody peers before sending any request or creating an ID let columns_by_range_peers_to_request = { Some(self.select_columns_by_range_peers_to_request( @@ -482,21 +483,27 @@ impl SyncNetworkContext { columns_by_range_peers_to_request .into_iter() .map(|(peer_id, columns)| { - self.send_data_columns_by_range_request_for_custody_sync( + self.send_data_columns_by_range_custody_request( peer_id, DataColumnsByRangeRequest { start_slot: start_slot.into(), count, columns, }, + id.clone(), ) }) .collect::, _>>() }) .transpose()?; - // TODO(cgc-backfill) unwrap - Ok(data_column_requests.unwrap()) + if let Some(data_column_requests) = data_column_requests { + let request = DataColumnsByRangeRequestItems {}; + self.custody_by_range_requests + .insert(id.clone(), data_column_requests); + } + + Ok(id.id) } /// A blocks by range request sent by the range sync algorithm @@ -1128,17 +1135,21 @@ impl SyncNetworkContext { Ok(id) } - fn send_data_columns_by_range_request_for_custody_sync( + fn send_data_columns_by_range_custody_request( &mut self, peer_id: PeerId, request: DataColumnsByRangeRequest, - ) -> Result { - let id = self.next_id(); + parent_request_id: CustodyByRangeParentRequestId, + ) -> Result { + let id = CustodyByRangeRequest { + id: self.next_id(), + parent_request_id, + }; self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: RequestType::DataColumnsByRange(request.clone()), - app_request_id: AppRequestId::Sync(SyncRequestId::CustodySyncDataColumnsByRange(id)), + app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; @@ -1152,7 +1163,7 @@ impl SyncNetworkContext { "Sync RPC request sent" ); - self.custody_sync_data_columns_by_range_requests.insert( + self.data_columns_by_range_requests.insert( id, peer_id, // false = do not enforce max_requests are returned for *_by_range methods. We don't diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index f4d010b881e..4a757060cee 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -55,7 +55,7 @@ pub enum Error { }, } -struct ActiveBatchColumnsRequest { +pub struct ActiveBatchColumnsRequest { indices: Vec, } diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index fa1e057765e..a58a4658fef 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -366,6 +366,9 @@ impl TestRig { RangeRequestId::BackfillSync { batch_id } => { ChainSegmentProcessId::BackSyncBatchId(batch_id) } + RangeRequestId::CustodySync { batch_id } => { + ChainSegmentProcessId::CustodyBackSyncBatchId(batch_id) + } }; self.find_and_complete_processing_chain_segment(id); diff --git a/common/eth2/src/lighthouse/sync_state.rs b/common/eth2/src/lighthouse/sync_state.rs index 0327f7073fa..8a679f26261 100644 --- a/common/eth2/src/lighthouse/sync_state.rs +++ b/common/eth2/src/lighthouse/sync_state.rs @@ -15,6 +15,10 @@ pub enum SyncState { /// specified by its peers. Once completed, the node enters this sync state and attempts to /// download all required historical blocks. BackFillSyncing { completed: usize, remaining: usize }, + /// The node is undertaking a custody backfill sync. This occurs when the custody group count + /// for a validator connected to this node changes. The node attempts to download its newly required + /// custody columns from the current head state up to the data availability window. + CustodyBackFillSyncing { completed: usize, remaining: usize }, /// The node has completed syncing a finalized chain and is in the process of re-evaluating /// which sync state to progress to. SyncTransition, @@ -65,8 +69,8 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => true, SyncState::SyncTransition => true, - // Backfill doesn't effect any logic, we consider this state, not syncing. - SyncState::BackFillSyncing { .. } => false, + // Both types of backfill sync do not effect any logic, we consider these states as not syncing. + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } @@ -78,6 +82,7 @@ impl SyncState { SyncState::SyncingHead { .. } => false, SyncState::SyncTransition => false, SyncState::BackFillSyncing { .. } => false, + SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } @@ -108,6 +113,9 @@ impl std::fmt::Display for SyncState { SyncState::Stalled => write!(f, "Stalled"), SyncState::SyncTransition => write!(f, "Evaluating known peers"), SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), + SyncState::CustodyBackFillSyncing { .. } => { + write!(f, "Syncing newly required custody data columns") + } } } }