diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index 57e19393165..c82945ea8c5 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -12,7 +12,7 @@ use store::metadata::DataColumnInfo;
 use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp};
 use strum::IntoStaticStr;
 use tracing::debug;
-use types::{FixedBytesExtended, Hash256, Slot};
+use types::{ColumnIndex, DataColumnSidecarList, FixedBytesExtended, Hash256, Slot};
 
 /// Use a longer timeout on the pubkey cache.
 ///
@@ -38,12 +38,45 @@ pub enum HistoricalBlockError {
     StoreError(StoreError),
 }
 
+#[derive(Debug)]
+pub enum HistoricalDataColumnError {
+    /// The provided data column sidecar contains at least one column with a mismatched block root.
+    MismatchedBlockRoot {
+        data_column_block_root: Hash256,
+        data_column_index: ColumnIndex,
+        expected_block_root: Hash256,
+    },
+
+    /// The provided data column sidecar contains a block signature that doesn't match
+    /// the block stored in the database.
+    InvalidSignature {
+        data_column_block_root: Hash256,
+    },
+
+    /// The provided data column sidecar pertains to a block that doesn't exist in the database.
+    NoBlockFound {
+        data_column_block_root: Hash256,
+    },
+
+    /// Logic error: should never occur.
+    IndexOutOfBounds,
+
+    /// Internal store error.
+    StoreError(StoreError),
+}
+
 impl From for HistoricalBlockError {
     fn from(e: StoreError) -> Self {
         Self::StoreError(e)
     }
 }
 
+impl From for HistoricalDataColumnError {
+    fn from(e: StoreError) -> Self {
+        Self::StoreError(e)
+    }
+}
+
 impl BeaconChain {
     /// Store a batch of historical blocks in the database.
     ///
@@ -287,4 +320,76 @@ impl BeaconChain {
 
         Ok(num_relevant)
     }
+
+    /// Store a batch of historical data columns in the database.
+    ///
+    /// The data columns block roots and proposer signatures are verified with the existing
+    /// block stored in the DB. This function assumes that KZG proofs have already been verified.
+    ///
+    /// Return the number of `data_columns` successfully imported.
+    pub fn import_historical_data_column_batch(
+        &self,
+        historical_data_column_sidecar_list: DataColumnSidecarList,
+    ) -> Result {
+        let mut total_imported = 0;
+        let expected_imported = historical_data_column_sidecar_list.len();
+        let mut ops = vec![];
+
+        if historical_data_column_sidecar_list.is_empty() {
+            return Ok(total_imported);
+        }
+
+        for data_column_sidecar in historical_data_column_sidecar_list {
+            let block_root = data_column_sidecar.block_root();
+
+            let Some(block) = self.store.get_blinded_block(&block_root)? else {
+                let error = HistoricalDataColumnError::NoBlockFound {
+                    data_column_block_root: block_root,
+                };
+                tracing::warn!(
+                    %block_root,
+                    num_data_column_sidecars = expected_imported,
+                    ?error,
+                    "Aborting data column sidecar import"
+                );
+                return Err(error);
+            };
+
+            if &data_column_sidecar.signed_block_header.signature != block.signature() {
+                let error = HistoricalDataColumnError::InvalidSignature {
+                    data_column_block_root: block_root,
+                };
+                tracing::warn!(
+                    block_root = ?block_root,
+                    column_index = data_column_sidecar.index,
+                    ?error,
+                    "Aborting data column sidecar import"
+                );
+                return Err(error);
+            }
+
+            if self
+                .store
+                .get_data_column(&block_root, &data_column_sidecar.index)?
+                .is_some()
+            {
+                tracing::debug!(
+                    block_root = ?block_root,
+                    column_index = data_column_sidecar.index,
+                    "Skipping data column import as identical data column exists"
+                );
+                continue;
+            }
+
+            self.store
+                .data_column_as_kv_store_ops(&block_root, data_column_sidecar, &mut ops);
+            total_imported += 1;
+        }
+
+        self.store.blobs_db.do_atomically(ops)?;
+
+        tracing::debug!(total_imported, "Imported historical data columns");
+
+        Ok(total_imported)
+    }
 }
diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs
index 1169b64537d..0dd95342498 100644
--- a/beacon_node/beacon_chain/src/validator_custody.rs
+++ b/beacon_node/beacon_chain/src/validator_custody.rs
@@ -255,11 +255,18 @@ impl CustodyContext {
 
 /// The custody count changed because of a change in the
 /// number of validators being managed.
+#[derive(Debug, Clone)]
 pub struct CustodyCountChanged {
     pub new_custody_group_count: u64,
     pub sampling_count: u64,
 }
 
+/// The data columns to backfill due to a custody count change.
+#[derive(Debug, Clone)]
+pub struct CustodyColumnBackfill {
+    pub columns_to_backfill: Vec,
+}
+
 /// The custody information that gets persisted across runs.
#[derive(Debug, Encode, Decode, Clone)] pub struct CustodyContextSsz { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 1be2879e1ab..8e7804948e8 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -12,8 +12,9 @@ use beacon_chain::test_utils::{ }; use beacon_chain::{ data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, - migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, - BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, + historical_blocks::HistoricalDataColumnError, migrate::MigratorConfig, BeaconChain, + BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, + NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, }; use logging::create_test_tracing_subscriber; use maplit::hashset; @@ -2625,6 +2626,324 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0)); } +#[tokio::test] +async fn test_import_historical_data_columns_batch() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = 
harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + data_columns_list.push(data_columns.unwrap()); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()) + } +} + +// This should verify that a data column sidecar containing mismatched block roots should fail to be imported. 
+#[tokio::test] +async fn test_import_historical_data_columns_batch_mismatched_block_root() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + if data_column.index % 2 == 0 { + data_column.signed_block_header.message.body_root = Hash256::ZERO; + } + + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + 
.unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::MismatchedBlockRoot { .. } + )); +} + +// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot +// be imported. +#[tokio::test] +async fn test_import_historical_data_columns_batch_no_block_found() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.message.body_root = Hash256::ZERO; + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = 
harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::NoBlockFound { .. } + )); +} + +// This should verify that a data column sidecar with an invalid signature cannot be imported into the store. +#[tokio::test] +async fn test_import_historical_data_columns_batch_invalid_signature() { + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + let start_slot = Slot::new(1); + let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1); + + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + (E::slots_per_epoch() * 2) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + harness.advance_slot(); + + let block_root_iter = harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + let mut data_columns_list = vec![]; + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_some()); + let mut data_column_sidecar = vec![]; + + for data_column in data_columns.unwrap() { + let mut data_column = (*data_column).clone(); + data_column.signed_block_header.signature = Signature::infinity().unwrap(); + data_column_sidecar.push(Arc::new(data_column)); + } + data_columns_list.push(data_column_sidecar); + } + + harness + .extend_chain( + (E::slots_per_epoch() * 4) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + + harness + .chain + .store + .try_prune_blobs(true, Epoch::new(2)) + .unwrap(); + + let block_root_iter = harness + 
.chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap(); + + for block in block_root_iter { + let (block_root, _) = block.unwrap(); + let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap(); + assert!(data_columns.is_none()) + } + + let error = harness + .chain + .import_historical_data_column_batch(data_columns_list) + .unwrap_err(); + + assert!(matches!( + error, + HistoricalDataColumnError::InvalidSignature { .. } + )); +} + /// Test that blocks and attestations that refer to states around an unaligned split state are /// processed correctly. #[tokio::test] diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0f324071a1e..b33694fce19 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -121,6 +121,7 @@ pub struct BeaconProcessorQueueLengths { column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, + custody_backfill_queue: usize, gossip_block_queue: usize, gossip_blob_queue: usize, gossip_data_column_queue: usize, @@ -189,6 +190,7 @@ impl BeaconProcessorQueueLengths { column_reconstruction_queue: 64, chain_segment_queue: 64, backfill_chain_segment: 64, + custody_backfill_queue: 64, gossip_block_queue: 1024, gossip_blob_queue: 1024, gossip_data_column_queue: 1024, @@ -619,6 +621,7 @@ pub enum Work { }, ChainSegment(AsyncFn), ChainSegmentBackfill(AsyncFn), + CustodyBackfill(AsyncFn), Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), @@ -674,6 +677,7 @@ pub enum WorkType { IgnoredRpcBlock, ChainSegment, ChainSegmentBackfill, + CustodyBackfill, Status, BlocksByRangeRequest, BlocksByRootsRequest, @@ -726,6 +730,7 @@ impl Work { Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. 
} => WorkType::ChainSegment, Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill, + Work::CustodyBackfill(_) => WorkType::CustodyBackfill, Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, @@ -894,6 +899,7 @@ impl BeaconProcessor { FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); + let mut custody_backfill_queue = FifoQueue::new(queue_lengths.custody_backfill_queue); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); @@ -1251,7 +1257,9 @@ impl BeaconProcessor { // Handle backfill sync chain segments. } else if let Some(item) = backfill_chain_segment.pop() { Some(item) - // Handle light client requests. + } else if let Some(item) = custody_backfill_queue.pop() { + Some(item) + // Handle light client requests. } else if let Some(item) = lc_gossip_finality_update_queue.pop() { Some(item) } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { @@ -1390,6 +1398,9 @@ impl BeaconProcessor { Work::ChainSegmentBackfill { .. } => { backfill_chain_segment.push(work, work_id) } + Work::CustodyBackfill { .. } => { + custody_backfill_queue.push(work, work_id) + } Work::Status { .. } => status_queue.push(work, work_id), Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id), Work::BlocksByRootsRequest { .. 
} => bbroots_queue.push(work, work_id), @@ -1478,6 +1489,7 @@ impl BeaconProcessor { WorkType::ColumnReconstruction => column_reconstruction_queue.len(), WorkType::ChainSegment => chain_segment_queue.len(), WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::CustodyBackfill => custody_backfill_queue.len(), WorkType::Status => status_queue.len(), WorkType::BlocksByRangeRequest => blbrange_queue.len(), WorkType::BlocksByRootsRequest => blbroots_queue.len(), @@ -1631,6 +1643,7 @@ impl BeaconProcessor { task_spawner.spawn_async(work) } Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn), + Work::CustodyBackfill(process_fn) => task_spawner.spawn_async(process_fn), Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn { BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn), BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index b36f8cc2154..ccb19363c4b 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -63,6 +63,15 @@ pub struct DataColumnsByRangeRequestId { pub parent_request_id: ComponentsByRangeRequestId, } +// TODO(cgc-backfill) rename this +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyByRangeRequestId { + /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for data columns. + pub parent_request_id: CustodyByRangeParentRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. 
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index e2c6f244058..62de312f03c 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -885,14 +885,16 @@ impl Network { } } - /// Subscribe to all data columns determined by the cgc. + /// Subscribe to all data columns determined by the cgc and return the newly subscribed columns #[instrument(parent = None, level = "trace", fields(service = "libp2p"), name = "libp2p", skip_all )] - pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) -> Vec { + let old_columns = self.network_globals.sampling_columns(); + self.network_globals .update_data_column_subnets(custody_column_count); @@ -900,6 +902,12 @@ impl Network { let kind = GossipKind::DataColumnSidecar(column); self.subscribe_kind(kind); } + + self.network_globals + .sampling_columns() + .difference(&old_columns) + .cloned() + .collect() } /// Returns the scoring parameters for a topic if set. diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f7c3a1bf8db..b94c0fc150d 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,4 +1,5 @@ use crate::sync::manager::BlockProcessType; +use crate::sync::CustodyBackSyncBatchId; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; @@ -535,13 +536,40 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to import `data_columns` as part of custody backfill sync. 
+ pub fn send_data_column_sidecar_list( + self: &Arc, + process_id: CustodyBackSyncBatchId, + data_column_sidecar_list: DataColumnSidecarList, + ) -> Result<(), Error> { + // TODO(cgc-backill) len is not correct + debug!(data_columns_sidecars = data_column_sidecar_list.len(), id = ?process_id, "Batch data column sidecar list sending for process"); + + let processor = self.clone(); + let process_fn = async move { + processor + .process_data_column_sidecar_list(process_id, data_column_sidecar_list) + .await; + }; + + let process_fn = Box::pin(process_fn); + + let work = Work::CustodyBackfill(process_fn); + + self.try_send(BeaconWorkEvent { + drop_during_sync: true, + work, + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, process_id: ChainSegmentProcessId, blocks: Vec>, ) -> Result<(), Error> { - let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); + let is_backfill: bool = + matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. 
}); debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process"); let processor = self.clone(); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index cff6e26165b..b43786a459b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,14 +1,17 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE}; +use crate::sync::custody_sync::CustodyBatchProcessResult; use crate::sync::BatchProcessResult; use crate::sync::{ manager::{BlockProcessType, SyncMessage}, - ChainId, + ChainId, CustodyBackSyncBatchId, }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; -use beacon_chain::data_column_verification::verify_kzg_for_data_column_list; +use beacon_chain::data_column_verification::{ + verify_kzg_for_data_column_list, verify_kzg_for_data_column_list_with_scoring, +}; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -25,7 +28,10 @@ use store::KzgCommitment; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256, + RuntimeVariableList, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. 
#[derive(Clone, Debug, PartialEq)]
@@ -445,6 +451,47 @@ impl NetworkBeaconProcessor {
         self.chain.process_sampling_completed(block_root).await;
     }
 
+    /// Import a downloaded custody backfill batch of data column sidecars into the beacon chain.
+    pub async fn process_data_column_sidecar_list(
+        &self,
+        sync_id: CustodyBackSyncBatchId,
+        downloaded_data_column_sidecar_list: DataColumnSidecarList,
+    ) {
+        // TODO(cgc-backfill) this number isn't actually correct, we'll need to flatten
+        let sent_data_columns = downloaded_data_column_sidecar_list.len();
+        match self.process_custody_backfill_data_columns(downloaded_data_column_sidecar_list) {
+            (imported_data_columns, Ok(_)) => {
+                // TODO(cgc-backfill) maybe log more granular details for debugging purposes
+                debug!(
+                    batch_epoch = %sync_id,
+                    keep_execution_payload = !self.chain.store.get_config().prune_payloads,
+                    service= "custody_backfill_sync",
+                    "Custody backfill batch processed");
+                CustodyBatchProcessResult::Success {
+                    sent_data_columns,
+                    imported_data_columns,
+                }
+            }
+            (_, Err(e)) => {
+                debug!(
+                    batch_epoch = %sync_id,
+                    error = %e.message,
+                    service = "custody_backfill_sync",
+                    "Custody backfill batch processing failed"
+                );
+                match e.peer_action {
+                    Some(penalty) => CustodyBatchProcessResult::FaultyFailure {
+                        imported_data_columns: 0,
+                        penalty,
+                    },
+                    None => CustodyBatchProcessResult::NonFaultyFailure,
+                }
+            }
+        };
+
+        // TODO(cgc-backfill) the result above is dropped; notify sync via `send_sync_message`.
+    }
+
     /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
     /// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
@@ -596,6 +643,67 @@ impl NetworkBeaconProcessor {
         }
     }
 
+    /// Verify and import a batch of custody backfill data columns.
+    ///
+    /// KZG proofs are checked first, then the columns are handed to
+    /// `import_historical_data_column_batch`. Returns the number of imported columns with
+    /// `Ok(())`, or `0` with a `ChainSegmentFailed` carrying the message and optional peer action.
+    fn process_custody_backfill_data_columns(
+        &self,
+        downloaded_data_column_sidecar_list: DataColumnSidecarList,
+    ) -> (usize, Result<(), ChainSegmentFailed>) {
+        let all_data_columns = RuntimeVariableList::from_vec(
+            downloaded_data_column_sidecar_list.clone(),
+            self.chain.spec.number_of_columns as usize,
+        );
+
+        // Attributes fault to the specific peer that sent an invalid column
+        // TODO(cgc-backfill) how should we handle errors here
+        if verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.chain.kzg)
+            .is_err()
+        {
+            return (
+                0,
+                Err(ChainSegmentFailed {
+                    peer_action: Some(PeerAction::LowToleranceError),
+                    message: "KZG verification failed".to_string(),
+                }),
+            );
+        }
+
+        match self
+            .chain
+            .import_historical_data_column_batch(downloaded_data_column_sidecar_list)
+        {
+            Ok(imported_data_columns) => (imported_data_columns, Ok(())),
+            Err(e) => {
+                use beacon_chain::historical_blocks::HistoricalDataColumnError;
+
+                // TODO(cgc-backfill) which errors should be low tolerance vs high tolerance
+                // NOTE(review): invalid data is blamed on the peer, internal failures are not;
+                // confirm these severities.
+                let peer_action = match &e {
+                    HistoricalDataColumnError::MismatchedBlockRoot { .. }
+                    | HistoricalDataColumnError::InvalidSignature { .. }
+                    | HistoricalDataColumnError::NoBlockFound { .. } => {
+                        Some(PeerAction::LowToleranceError)
+                    }
+                    HistoricalDataColumnError::IndexOutOfBounds
+                    | HistoricalDataColumnError::StoreError(_) => None,
+                };
+
+                // Report the failure instead of panicking on bad peer data.
+                (
+                    0,
+                    Err(ChainSegmentFailed {
+                        peer_action,
+                        message: format!("{:?}", e),
+                    }),
+                )
+            }
+        }
+    }
+
     /// Helper function to process backfill block batches which only
consumes the chain and blocks to process. fn process_backfill_blocks( &self, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 5d5daae4aee..c8697b0dc60 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -6,7 +6,7 @@ #![allow(clippy::unit_arg)] use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::status_message; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -82,6 +82,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, + sync_service_recv: mpsc::UnboundedReceiver, executor: task_executor::TaskExecutor, invalid_block_storage: InvalidBlockStorage, beacon_processor_send: BeaconProcessorSend, @@ -113,6 +114,7 @@ impl Router { network_send.clone(), network_beacon_processor.clone(), sync_recv, + sync_service_recv, fork_context, ); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a6d5152322..aca5d430e48 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -5,6 +5,7 @@ use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription}; use crate::NetworkConfig; +use beacon_chain::validator_custody::CustodyColumnBackfill; use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_processor::BeaconProcessorSend; use futures::channel::mpsc::Sender; @@ -131,6 +132,15 @@ pub enum ValidatorSubscriptionMessage { }, } +/// Messages that Lighthouse services send to communicate with the Sync service. +#[derive(Debug, Clone, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum SyncServiceMessage { + /// Custody group count changed due to a change in validators' weight. 
+ /// Trigger data column backfill. + CustodyColumnBackfill(CustodyColumnBackfill), +} + #[derive(Clone)] pub struct NetworkSenders { network_send: mpsc::UnboundedSender>, @@ -182,6 +192,8 @@ pub struct NetworkService { /// The sending channel for the network service to send messages to be routed throughout /// lighthouse. router_send: mpsc::UnboundedSender>, + /// The sending channel for the network service to send messages to the sync service. + sync_service_send: mpsc::UnboundedSender, /// A reference to lighthouse's database to persist the DHT. store: Arc>, /// A collection of global variables, accessible outside of the network service. @@ -205,10 +217,13 @@ pub struct NetworkService { } impl NetworkService { + #[allow(clippy::too_many_arguments)] async fn build( beacon_chain: Arc>, config: Arc, executor: task_executor::TaskExecutor, + sync_service_send: mpsc::UnboundedSender, + sync_service_recv: mpsc::UnboundedReceiver, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, ) -> Result< @@ -311,6 +326,7 @@ impl NetworkService { beacon_chain.clone(), network_globals.clone(), network_senders.network_send(), + sync_service_recv, executor.clone(), invalid_block_storage, beacon_processor_send, @@ -342,6 +358,7 @@ impl NetworkService { subnet_service, network_recv, validator_subscription_recv, + sync_service_send, router_send, store, network_globals: network_globals.clone(), @@ -366,10 +383,13 @@ impl NetworkService { libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, ) -> Result<(Arc>, NetworkSenders), String> { + let (sync_service_send, sync_service_recv) = mpsc::unbounded_channel(); let (network_service, network_globals, network_senders) = Self::build( beacon_chain, config, executor.clone(), + sync_service_send, + sync_service_recv, libp2p_registry, beacon_processor_send, ) @@ -745,7 +765,8 @@ impl NetworkService { sampling_count, } => { // subscribe to `sampling_count` subnets - self.libp2p + 
let columns_to_backfill = self + .libp2p .subscribe_new_data_column_subnets(sampling_count); if self .network_globals @@ -754,6 +775,13 @@ impl NetworkService { .is_none() { self.libp2p.update_enr_cgc(new_custody_group_count); + let message = + SyncServiceMessage::CustodyColumnBackfill(CustodyColumnBackfill { + columns_to_backfill, + }); + if let Err(e) = self.sync_service_send.send(message.clone()) { + tracing::error!(error = %e, ?message, "Could not send message to the syn service."); + } } } } diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index db342117473..3580e775253 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -11,6 +11,7 @@ use lighthouse_network::{Enr, GossipTopic}; use std::str::FromStr; use std::sync::Arc; use tokio::runtime::Runtime; +use tokio::sync::mpsc; use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId}; impl NetworkService { @@ -125,6 +126,7 @@ fn test_removing_topic_weight_on_old_topics() { config.discv5_config.table_filter = |_| true; // Do not ignore local IPs config.upnp_enabled = false; let config = Arc::new(config); + let (sync_service_send, sync_service_recv) = mpsc::unbounded_channel(); let beacon_processor_channels = BeaconProcessorChannels::new(&BeaconProcessorConfig::default()); @@ -132,6 +134,8 @@ fn test_removing_topic_weight_on_old_topics() { beacon_chain.clone(), config, executor.clone(), + sync_service_send, + sync_service_recv, None, beacon_processor_channels.beacon_processor_tx, ) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 035349c4757..49c20779af0 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1186,7 +1186,7 @@ impl BackFillSync { } /// Error kind for attempting to restart the sync from beacon chain parameters. 
-enum ResetEpochError { +pub enum ResetEpochError { /// The chain has already completed. SyncCompleted, } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 0418ab45534..ac94275701f 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -22,7 +22,7 @@ enum ByRangeRequest { Complete(T), } -enum RangeBlockDataRequest { +pub enum RangeBlockDataRequest { NoData, Blobs(ByRangeRequest>>>), DataColumns { diff --git a/beacon_node/network/src/sync/custody_sync/custody_batch.rs b/beacon_node/network/src/sync/custody_sync/custody_batch.rs new file mode 100644 index 00000000000..da648d33ae7 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/custody_batch.rs @@ -0,0 +1,373 @@ +use crate::sync::range_sync::BatchProcessingResult; +use crate::sync::CustodyBackSyncBatchId; +use crate::sync::{ + range_sync::{Attempt, WrongState}, + BatchOperationOutcome, +}; +use beacon_chain::BeaconChainTypes; +use fnv::FnvHashSet; +use lighthouse_network::service::api_types::{DataColumnsByRangeRequestId, Id}; +use lighthouse_network::PeerId; +use std::marker::PhantomData; +use std::{ + collections::HashSet, + hash::{Hash, Hasher}, + time::{Duration, Instant}, +}; +use strum::Display; +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +/// The number of times to retry a batch before it is considered failed. +const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; + +/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed +/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. 
+const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; + +#[derive(Debug)] +pub struct CustodySyncBatchConfig {} + +// TODO(cgc-backfill) rename this +#[derive(Hash, PartialEq, Eq, Clone)] +pub struct CustodyByRangeParentRequestId { + pub id: u32, + pub requester: CustodyBackSyncBatchId, +} + +impl CustodySyncBatchConfig { + fn max_batch_download_attempts() -> u8 { + MAX_BATCH_DOWNLOAD_ATTEMPTS + } + fn max_batch_processing_attempts() -> u8 { + MAX_BATCH_PROCESSING_ATTEMPTS + } + + pub fn batch_attempt_hash( + data_column_sidecar_list: &DataColumnSidecarList, + ) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data_column_sidecar_list.hash(&mut hasher); + hasher.finish() + } +} + +#[derive(Debug)] +/// A segment of a chain. +pub struct CustodyBatchInfo { + /// Start slot of the batch. + start_slot: Slot, + /// End slot of the batch. + end_slot: Slot, + /// The `Attempts` that have been made and failed to send us this batch. + failed_processing_attempts: Vec, + /// Number of processing attempts that have failed but we do not count. + non_faulty_processing_attempts: u8, + /// The number of download retries this batch has undergone due to a failed request. + failed_download_attempts: Vec>, + /// State of the batch. + state: CustodyBatchState, +} + +impl CustodyBatchInfo { + pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { + let start_slot = start_epoch.start_slot(E::slots_per_epoch()); + let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); + Self { + start_slot, + end_slot, + failed_processing_attempts: Vec::new(), + failed_download_attempts: Vec::new(), + non_faulty_processing_attempts: 0, + state: CustodyBatchState::AwaitingDownload, + } + } + + pub fn state(&self) -> &CustodyBatchState { + &self.state + } + + /// Gives a list of peers from which this batch has had a failed download or processing + /// attempt. 
+ pub fn failed_peers(&self) -> HashSet { + let mut peers = HashSet::with_capacity( + self.failed_processing_attempts.len() + self.failed_download_attempts.len(), + ); + + for attempt in &self.failed_processing_attempts { + peers.insert(attempt.peer_id); + } + + for peer in self.failed_download_attempts.iter().flatten() { + peers.insert(*peer); + } + + peers + } + + pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingDownload => { + self.state = CustodyBatchState::Downloading(request_id); + Ok(()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting download for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Marks the batch as ready to be processed if the data columns are in the range. The number of + /// received data columns is returned, or the wrong batch end on failure + #[must_use = "Batch may have failed"] + pub fn download_completed( + &mut self, + data_columns: DataColumnSidecarList, + peer: PeerId, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + let received = data_columns.len(); + self.state = + CustodyBatchState::AwaitingProcessing(peer, data_columns, Instant::now()); + Ok(received) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download completed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// Verifies if incoming data columns belong to this batch. + pub fn is_expecting_data_columns(&self, request_id: &Id) -> bool { + if let CustodyBatchState::Downloading(expected_id) = &self.state { + return expected_id == request_id; + } + false + } + + /// Mark the batch as failed and return whether we can attempt a re-download. 
+ /// + /// This can happen if a peer disconnects or some error occurred that was not the peers fault. + /// The `peer` parameter, when set to None, does not increment the failed attempts of + /// this batch and register the peer, rather attempts a re-download. + #[must_use = "Batch may have failed"] + pub fn download_failed( + &mut self, + peer: Option, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Downloading(_) => { + // register the attempt and check if the batch can be tried again + self.failed_download_attempts.push(peer); + + self.state = if self.failed_download_attempts.len() + >= CustodySyncBatchConfig::max_batch_download_attempts() as usize + { + CustodyBatchState::Failed + } else { + // drop the blocks + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Download failed for batch in wrong state {:?}", + self.state + ))) + } + } + } + + /// After different operations over a batch, this could be in a state that allows it to + /// continue, or in failed state. When the batch has failed, we check if it did mainly due to + /// processing failures. In this case the batch is considered failed and faulty. + pub fn outcome(&self) -> BatchOperationOutcome { + match self.state { + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed => BatchOperationOutcome::Failed { + blacklist: self.failed_processing_attempts.len() + > self.failed_download_attempts.len(), + }, + _ => BatchOperationOutcome::Continue, + } + } + + /// Returns the peer that is currently responsible for progressing the state of the batch. + pub fn processing_peer(&self) -> Option<&PeerId> { + match &self.state { + CustodyBatchState::AwaitingDownload + | CustodyBatchState::Failed + | CustodyBatchState::Downloading(..) 
=> None, + CustodyBatchState::AwaitingProcessing(peer_id, _, _) + | CustodyBatchState::Processing(Attempt { peer_id, .. }) + | CustodyBatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + } + } + + pub fn attempts(&self) -> &[Attempt] { + &self.failed_processing_attempts + } + + #[must_use = "Custody batch may have failed"] + pub fn processing_completed( + &mut self, + procesing_result: BatchProcessingResult, + ) -> Result { + match self.state.poison() { + CustodyBatchState::Processing(attempt) => { + self.state = match procesing_result { + BatchProcessingResult::Success => { + CustodyBatchState::AwaitingValidation(attempt) + } + BatchProcessingResult::FaultyFailure => { + // register the failed attempt + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= CustodySyncBatchConfig::max_batch_processing_attempts() as usize + { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + } + } + BatchProcessingResult::NonFaultyFailure => { + self.non_faulty_processing_attempts = + self.non_faulty_processing_attempts.saturating_add(1); + CustodyBatchState::AwaitingDownload + } + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Procesing completed for batch in wrong state: {:?}", + self.state + ))) + } + } + } + + pub fn start_processing(&mut self) -> Result<(DataColumnSidecarList, Duration), WrongState> { + match self.state.poison() { + CustodyBatchState::AwaitingProcessing( + peer, + data_column_sidecar_list, + start_instant, + ) => { + self.state = CustodyBatchState::Processing(Attempt::new_for_custody_backfill_sync( + peer, + &data_column_sidecar_list, + )); + Ok((data_column_sidecar_list, start_instant.elapsed())) + } + CustodyBatchState::Poisoned => 
unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Starting processing batch in wrong state {:?}", + self.state + ))) + } + } + } + + #[must_use = "Batch may have failed"] + pub fn validation_failed(&mut self) -> Result { + match self.state.poison() { + CustodyBatchState::AwaitingValidation(attempt) => { + self.failed_processing_attempts.push(attempt); + + // check if the batch can be downloaded again + self.state = if self.failed_processing_attempts.len() + >= CustodySyncBatchConfig::max_batch_processing_attempts() as usize + { + CustodyBatchState::Failed + } else { + CustodyBatchState::AwaitingDownload + }; + Ok(self.outcome()) + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + other => { + self.state = other; + Err(WrongState(format!( + "Validation failed for batch in wrong state: {:?}", + self.state + ))) + } + } + } +} + +impl std::fmt::Display for CustodyBatchInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Start Slot: {}, End Slot: {}, State: {}", + self.start_slot, self.end_slot, self.state + ) + } +} + +#[derive(Debug, Display)] +/// Current state of a batch +pub enum CustodyBatchState { + /// The batch has failed either downloading or processing, but can be requested again. + AwaitingDownload, + /// The batch is being downloaded. + Downloading(Id), + /// The batch has been completely downloaded and is ready for processing. + AwaitingProcessing(PeerId, DataColumnSidecarList, Instant), + /// The batch is being processed. + Processing(Attempt), + /// The batch was successfully processed and is waiting to be validated. + /// + /// It is not sufficient to process a batch successfully to consider it correct. This is + /// because batches could be erroneously empty, or incomplete. Therefore, a batch is considered + /// valid, only if the next sequential batch imports at least a block. 
+ AwaitingValidation(Attempt), + /// Intermediate state for inner state handling. + Poisoned, + /// The batch has maxed out the allowed attempts for either downloading or processing. It + /// cannot be recovered. + Failed, +} + +impl CustodyBatchState { + /// Helper function for poisoning a state. + pub fn poison(&mut self) -> CustodyBatchState { + std::mem::replace(self, CustodyBatchState::Poisoned) + } + + /// Creates a character representation/visualization for the batch state to display in logs for quicker and + /// easier recognition + fn visualize(&self) -> char { + match self { + CustodyBatchState::Downloading(..) => 'D', + CustodyBatchState::Processing(_) => 'P', + CustodyBatchState::AwaitingValidation(_) => 'v', + CustodyBatchState::AwaitingDownload => 'd', + CustodyBatchState::Failed => 'F', + CustodyBatchState::AwaitingProcessing(..) => 'p', + CustodyBatchState::Poisoned => 'X', + } + } +} diff --git a/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs b/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs new file mode 100644 index 00000000000..dca02b70a9f --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/custody_sync_manager.rs @@ -0,0 +1,28 @@ +use crate::sync::custody_sync::CustodyBackfillSync; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::PeerAction; +use std::sync::Arc; + +struct CustodySyncManager { + /// A reference to the underlying beacon chain. + chain: Arc>, + + /// Custody Backfill syncing. + custody_backfill_sync: CustodyBackfillSync, +} + +/// The result of processing multiple data columns for custody backfill sync. +#[derive(Debug)] +pub enum CustodyBatchProcessResult { + /// The batch was completed successfully. It carries whether the sent batch contained data columns. + Success { + sent_data_columns: usize, + imported_data_columns: usize, + }, + /// The batch processing failed. It carries whether the processing imported any data columns. 
+ FaultyFailure { + imported_data_columns: usize, + penalty: PeerAction, + }, + NonFaultyFailure, +} diff --git a/beacon_node/network/src/sync/custody_sync/mod.rs b/beacon_node/network/src/sync/custody_sync/mod.rs new file mode 100644 index 00000000000..48647942cf0 --- /dev/null +++ b/beacon_node/network/src/sync/custody_sync/mod.rs @@ -0,0 +1,933 @@ +mod custody_batch; +mod custody_sync_manager; + +use std::{ + collections::{btree_map::Entry, BTreeMap, HashSet}, + sync::Arc, + time::Instant, +}; + +use crate::sync::range_sync::WrongState; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{ + service::api_types::{Id, RangeRequestId}, + types::BackFillState, + NetworkGlobals, PeerAction, PeerId, +}; +use logging::crit; +use tracing::{debug, error, info, instrument, warn}; +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::sync::{ + backfill_sync::{BackFillError, ProcessResult, SyncStart, BACKFILL_EPOCHS_PER_BATCH}, + custody_sync::custody_batch::{CustodyBatchInfo, CustodyBatchState}, + network_context::{RpcRequestSendError, SyncNetworkContext}, + range_sync::{BatchId, BatchProcessingResult}, + BatchOperationOutcome, BatchProcessResult, +}; + +pub use custody_batch::{CustodyByRangeParentRequestId, CustodySyncBatchConfig}; +pub use custody_sync_manager::CustodyBatchProcessResult; + +/// The maximum number of batches to queue before requesting more. +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; + +pub type CustodyBackSyncBatchId = Epoch; + +pub struct CustodyBackfillSync { + /// Keeps track of the current progress of the custody backfill. + /// This only gets refreshed from the beacon chain if we enter a failed state. + current_start: BatchId, + + /// Starting epoch of the batch that needs to be processed next. + /// This is incremented as the custody backfill advances. + processing_target: BatchId, + + /// Starting epoch of the next batch that needs to be downloaded. 
+ to_be_downloaded: BatchId, + + /// Keeps track if we have requested the final batch. + last_batch_downloaded: bool, + + /// Sorted map of batches undergoing some kind of processing. + batches: BTreeMap>, + + /// The current processing batch, if any. + current_processing_batch: Option, + + /// Batches validated. + validated_batches: u64, + + /// We keep track of peers that are participating in the backfill sync. CustodySync + /// only use peers that custody the columns we'd like to backfill. If CustodySync fails, we don't + /// want to penalize all our synced peers, so we use this variable to keep track of peers that + /// have participated and only penalize these peers if custody sync fails. + participating_peers: HashSet, + + /// When a custody backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. + restart_failed_sync: bool, + + /// Reference to the beacon chain to obtain initial starting points for the backfill sync. + beacon_chain: Arc>, + + /// Reference to the network globals in order to obtain valid peers to backfill blocks from + /// (i.e synced peers). + network_globals: Arc>, + + /// The data column indices we're looking to backfill. + data_column_indices: Vec, +} + +impl CustodyBackfillSync { + #[instrument(parent = None, + name = "custody_backfill_sync", + skip_all + )] + pub fn new( + beacon_chain: Arc>, + current_start: Epoch, + data_column_indices: Vec, + network_globals: Arc>, + ) -> Self { + let cbs = CustodyBackfillSync { + batches: BTreeMap::new(), + processing_target: current_start, + current_start, + last_batch_downloaded: false, + to_be_downloaded: current_start, + network_globals, + current_processing_batch: None, + validated_batches: 0, + participating_peers: HashSet::new(), + restart_failed_sync: false, + beacon_chain, + data_column_indices, + }; + + // Update the global network state with the current backfill state. 
+ cbs.set_state(BackFillState::Paused); + cbs + } + + /// Starts or resumes syncing. + /// + /// If resuming is successful, reports back the current syncing metrics. + #[must_use = "A failure here indicates the custody backfill sync has failed and the global sync state should be updated"] + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + pub fn start( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + BackFillState::Syncing => {} // already syncing ignore. + BackFillState::Paused => { + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // TODO(cgc-backfill) get peers that we can sync with + // If there are peers to resume with, begin the resume. + debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming custody backfill sync"); + self.set_state(BackFillState::Syncing); + // Resume any previously failed batches. + self.resume_batches(network)?; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } + } + BackFillState::Failed => { + // Attempt to recover from a failed sync. All local variables should be reset and + // cleared already for a fresh start. + // We only attempt to restart a failed custody backfill sync if a new synced peer has been + // added. 
+ if !self.restart_failed_sync { + return Ok(SyncStart::NotSyncing); + } + + self.set_state(BackFillState::Syncing); + + // TODO(cgc-backfill) we could try figuring out what epoch to start at again, or we can just + // live with potentially requesting redundant data columns + + debug!(start_epoch = %self.current_start, "Resuming a failed custody backfill sync"); + + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; + } + BackFillState::Completed => return Ok(SyncStart::NotSyncing), + } + + Ok(SyncStart::Syncing { + completed: (self.validated_batches + * BACKFILL_EPOCHS_PER_BATCH + * T::EthSpec::slots_per_epoch()) as usize, + remaining: self + .current_start + .start_slot(T::EthSpec::slots_per_epoch()) + .saturating_sub(self.beacon_chain.genesis_backfill_slot) + .as_usize(), + }) + } + + /// When resuming a chain, this function searches for batches that need to be re-downloaded and + /// transitions their state to redownload the batch. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { + let batch_ids_to_retry = self + .batches + .iter() + .filter_map(|(batch_id, batch)| { + // In principle there should only ever be on of these, and we could terminate the + // loop early, however the processing is negligible and we continue the search + // for robustness to handle potential future modification + if matches!(batch.state(), CustodyBatchState::AwaitingDownload) { + Some(*batch_id) + } else { + None + } + }) + .collect::>(); + + for batch_id in batch_ids_to_retry { + self.send_batch(network, batch_id)?; + } + Ok(()) + } + + /// Requests the batch assigned to the given id from a given peer. 
+ #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn send_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), BackFillError> { + if let Some(batch) = self.batches.get_mut(&batch_id) { + let synced_peers = self + .network_globals + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); + + let failed_peers = batch.failed_peers(); + + // TODO(cgc-backfill) calculate start slot + // match network.dat + match network.custody_sync_data_columns_by_range_request( + Slot::new(0), + T::EthSpec::slots_per_epoch(), + &self.data_column_indices, + batch_id, + &synced_peers, + &failed_peers, + ) { + Ok(request_id) => { + debug!(epoch = %batch_id, %batch, "Requesting batch"); + if let Err(e) = batch.start_downloading(request_id) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } + } + Err(e) => match e { + RpcRequestSendError::NoPeer(no_peer_error) => { + // If we are here the chain has no more synced peers + info!( + "reason" = format!("insufficient_synced_peers({no_peer_error:?})"), + "Custody backfill sync paused" + ); + self.set_state(BackFillState::Paused); + return Err(BackFillError::Paused); + } + RpcRequestSendError::InternalError(e) => { + // NOTE: under normal conditions this shouldn't happen but we handle it anyway + warn!(%batch_id, error = ?e, %batch, "Could not send batch request"); + // register the failed download and check if the batch can be retried + if let Err(e) = batch.start_downloading(1) { + return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); + } + + match batch.download_failed(None) { + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? 
+ } + Ok(BatchOperationOutcome::Continue) => { + return self.send_batch(network, batch_id) + } + } + } + }, + } + }; + + Ok(()) + } + + /// The syncing process has failed. + /// + /// This resets past variables, to allow for a fresh start when resuming. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn fail_sync(&mut self, error: BackFillError) -> Result<(), BackFillError> { + // Some errors shouldn't fail the chain. + if matches!(error, BackFillError::Paused) { + return Ok(()); + } + + // Set the state + self.set_state(BackFillState::Failed); + // Remove all batches and active requests and participating peers. + self.batches.clear(); + self.participating_peers.clear(); + self.restart_failed_sync = false; + + // Reset all downloading and processing targets + self.processing_target = self.current_start; + self.to_be_downloaded = self.current_start; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + + // NOTE: Lets keep validated_batches for posterity + + // Emit the log here + error!(?error, "Custody backfill sync failed"); + + // Return the error, kinda weird pattern, but I want to use + // `self.fail_chain(_)?` in other parts of the code. + Err(error) + } + + /// Attempts to request the next required batches from the peer pool if custody backfill is in syncing state. It will exhaust the peer + /// pool and left over batches until the batch buffer is reached or all peers are exhausted. 
+ #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result<(), BackFillError> { + if !matches!(self.state(), BackFillState::Syncing) { + return Ok(()); + } + + // find the next pending batch and request it from the peer + // Note: for this function to not infinite loop we must: + // - If `include_next_batch` returns Some we MUST increase the count of batches that are + // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of + // that function. + while let Some(batch_id) = self.include_next_batch(network) { + // send the batch + self.send_batch(network, batch_id)?; + } + + // No more batches, simply stop + Ok(()) + } + + /// Processes the next ready batch. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn process_completed_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Only process batches if backfill is syncing and only process one batch at a time + if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { + return Ok(ProcessResult::Successful); + } + + // Find the id of the batch we are going to process. + if let Some(batch) = self.batches.get(&self.processing_target) { + let state = batch.state(); + match state { + CustodyBatchState::AwaitingProcessing(..) => { + return self.process_batch(network, self.processing_target); + } + CustodyBatchState::Downloading(..) => { + // Batch is not ready, nothing to process + } + CustodyBatchState::Poisoned => unreachable!("Poisoned batch"), + CustodyBatchState::Failed + | CustodyBatchState::AwaitingDownload + | CustodyBatchState::Processing(_) => { + // these are all inconsistent states: + // - Failed -> non recoverable batch. 
Chain should have been removed + // - AwaitingDownload -> A recoverable failed batch should have been + // re-requested. + // - Processing -> `self.current_processing_batch` is None + self.fail_sync(BackFillError::InvalidSyncState(String::from( + "Invalid expected batch state", + )))?; + return Ok(ProcessResult::Successful); + } + CustodyBatchState::AwaitingValidation(_) => { + // TODO: I don't think this state is possible, log a CRIT just in case. + // If this is not observed, add it to the failed state branch above. + crit!( + batch = ?self.processing_target, + "Chain encountered a robust batch awaiting validation" + ); + + self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; + if self.to_be_downloaded >= self.processing_target { + self.to_be_downloaded = self.processing_target - BACKFILL_EPOCHS_PER_BATCH; + } + self.request_batches(network)?; + } + } + } else { + self.fail_sync(BackFillError::InvalidSyncState(format!( + "Batch not found for current processing target {}", + self.processing_target + )))?; + return Ok(ProcessResult::Successful); + } + Ok(ProcessResult::Successful) + } + + /// Processes the batch with the given id. 
+ /// The batch must exist and be ready for processing + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn process_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result { + // Only process batches if this chain is Syncing, and only one at a time + if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { + return Ok(ProcessResult::Successful); + } + + let Some(batch) = self.batches.get_mut(&batch_id) else { + return self + .fail_sync(BackFillError::InvalidSyncState(format!( + "Trying to process a batch that does not exist: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + }; + + // NOTE: We send empty batches to the processor in order to trigger the block processor + // result callback. This is done, because an empty batch could end a chain and the logic + // for removing chains and checking completion is in the callback. + + let (data_column_sidecar_list, _) = match batch.start_processing() { + Err(e) => { + return self + .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful) + } + Ok(v) => v, + }; + + let process_id = batch_id as CustodyBackSyncBatchId; + self.current_processing_batch = Some(batch_id); + + if let Err(e) = network + .beacon_processor() + .send_data_column_sidecar_list(process_id, data_column_sidecar_list) + { + crit!( + msg = "process_batch", + error = %e, + batch = ?self.processing_target, + "Failed to send custody backfill segment to processor." + ); + // This is unlikely to happen but it would stall syncing since the batch now has no + // blocks to continue, and the chain is expecting a processing result that won't + // arrive. To mitigate this, (fake) fail this processing so that the batch is + // re-downloaded. 
+ self.on_batch_process_result( + network, + batch_id, + &CustodyBatchProcessResult::NonFaultyFailure, + ) + } else { + Ok(ProcessResult::Successful) + } + } + + /// The block processor has completed processing a batch. This function handles the result + /// of the batch processor. + /// If an error is returned the Custody BackFill sync has failed. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + #[must_use = "A failure here indicates the custody backfill sync has failed and the global sync state should be updated"] + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + result: &CustodyBatchProcessResult, + ) -> Result { + // The first two cases are possible in regular sync, should not occur in backfill, but we + // keep this logic for handling potential processing race conditions. + // result + let batch = match &self.current_processing_batch { + Some(processing_id) if *processing_id != batch_id => { + debug!( + batch_epoch = %batch_id.as_u64(), + expected_batch_epoch = processing_id.as_u64(), + "Unexpected batch result" + ); + return Ok(ProcessResult::Successful); + } + None => { + debug!(%batch_id, "Chain was not expecting a batch result"); + return Ok(ProcessResult::Successful); + } + _ => { + // batch_id matches, continue + self.current_processing_batch = None; + + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + // This is an error. Fail the sync algorithm. 
+ return self + .fail_sync(BackFillError::InvalidSyncState(format!( + "Current processing batch not found: {}", + batch_id + ))) + .map(|_| ProcessResult::Successful); + } + } + } + }; + + let Some(peer) = batch.processing_peer() else { + self.fail_sync(BackFillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + ))?; + return Ok(ProcessResult::Successful); + }; + + debug!( + ?result, + %batch, + batch_epoch = %batch_id, + %peer, + client = %network.client_type(peer), + "Custody backfill batch processed" + ); + + match result { + CustodyBatchProcessResult::Success { + imported_data_columns, + .. + } => { + // TODO(cgc-backfill) + todo!() + } + CustodyBatchProcessResult::FaultyFailure { + imported_data_columns, + penalty, + } => { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + Err(e) => { + // Batch was in the wrong state + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) + .map(|_| ProcessResult::Successful) + } + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + // check that we have not exceeded the re-process retry counter + // If a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers are sending invalid batches + // repeatedly and are either malicious or faulty. We stop custody backfill sync and + // report all synced peers that have participated. + warn!( + score_adjustment = %penalty, + batch_epoch = %batch_id, + "Custody backfill batch failed to download. Penalizing peers" + ); + + for peer in self.participating_peers.drain() { + // TODO(das): `participating_peers` only includes block peers. Should we + // penalize the custody column peers too? + network.report_peer(peer, *penalty, "backfill_batch_failed"); + } + self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) + .map(|_| ProcessResult::Successful) + } + + Ok(BatchOperationOutcome::Continue) => { + // chain can continue. 
Check if it can be progressed + if *imported_data_columns > 0 { + // At least one block was successfully verified and imported, then we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance(network, batch_id); + } + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) + .map(|_| ProcessResult::Successful) + } + } + } + CustodyBatchProcessResult::NonFaultyFailure => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + } + self.send_batch(network, batch_id)?; + Ok(ProcessResult::Successful) + } + } + } + + /// An invalid batch has been received that could not be processed, but that can be retried. + /// + /// These events occur when a peer has successfully responded with data columns, but the data columns we + /// have received are incorrect or invalid. This indicates the peer has not performed as + /// intended and can result in down scoring a peer. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn handle_invalid_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + ) -> Result<(), BackFillError> { + // The current batch could not be processed, indicating either the current or previous + // batches are invalid. + + // TODO(cgc-backfill) update comments + // The previous batch could be incomplete due to the block sizes being too large to fit in + // a single RPC request or there could be consecutive empty batches which are not supposed + // to be there + + // The current (sub-optimal) strategy is to simply re-request all batches that could + // potentially be faulty. 
If a batch returns a different result than the original and + // results in successful processing, we downvote the original peer that sent us the batch. + + // this is our robust `processing_target`. All previous batches must be awaiting + // validation + let mut redownload_queue = Vec::new(); + + for (id, batch) in self + .batches + .iter_mut() + .filter(|(&id, _batch)| id > batch_id) + { + match batch + .validation_failed() + .map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))? + { + BatchOperationOutcome::Failed { blacklist: _ } => { + // Batch has failed and cannot be redownloaded. + return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)); + } + BatchOperationOutcome::Continue => { + redownload_queue.push(*id); + } + } + } + + // no batch maxed out it process attempts, so now the chain's volatile progress must be + // reset + self.processing_target = self.current_start; + + for id in redownload_queue { + self.send_batch(network, id)?; + } + // finally, re-request the failed batch. + self.send_batch(network, batch_id) + } + + // TODO(cgc-backfill) update comments + /// Removes any batches previous to the given `validating_epoch` and updates the current + /// boundaries of the chain. + /// + /// The `validating_epoch` must align with batch boundaries. + /// + /// If a previous batch has been validated and it had been re-processed, penalize the original + /// peer. + #[instrument(parent = None, + level = "info", + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn advance(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { + // make sure this epoch produces an advancement + if validating_epoch >= self.current_start { + return; + } + + // We can now validate higher batches that the current batch. Here we remove all + // batches that are higher than the current batch. We add on an extra + // `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive. 
+ let removed_batches = self + .batches + .split_off(&(validating_epoch + BACKFILL_EPOCHS_PER_BATCH)); + + for (id, batch) in removed_batches.into_iter() { + self.validated_batches = self.validated_batches.saturating_add(1); + // only for batches awaiting validation can we be sure the last attempt is + // right, and thus, that any different attempt is wrong + match batch.state() { + CustodyBatchState::AwaitingValidation(ref processed_attempt) => { + for attempt in batch.attempts() { + // The validated batch has been re-processed + if attempt.hash != processed_attempt.hash { + // The re-downloaded version was different. + if processed_attempt.peer_id != attempt.peer_id { + // A different peer sent the correct batch, the previous peer did not + // We negatively score the original peer. + let action = PeerAction::LowToleranceError; + debug!( + batch_epoch = ?id, + score_adjustment = %action, + original_peer = %attempt.peer_id, + new_peer = %processed_attempt.peer_id, + "Re-processed batch validated. Scoring original peer" + ); + network.report_peer( + attempt.peer_id, + action, + "custody_backfill_reprocessed_original_peer", + ); + } else { + // The same peer corrected it's previous mistake. There was an error, so we + // negative score the original peer. + let action = PeerAction::MidToleranceError; + debug!( + batch_epoch = ?id, + score_adjustment = %action, + original_peer = %attempt.peer_id, + new_peer = %processed_attempt.peer_id, + "Re-processed batch validated by the same peer" + ); + network.report_peer( + attempt.peer_id, + action, + "custody_backfill_reprocessed_same_peer", + ); + } + } + } + } + CustodyBatchState::Downloading(..) => {} + CustodyBatchState::Failed + | CustodyBatchState::Poisoned + | CustodyBatchState::AwaitingDownload => { + crit!("batch indicates inconsistent chain state while advancing custody backfill sync") + } + CustodyBatchState::AwaitingProcessing(..) 
=> {} + CustodyBatchState::Processing(_) => { + debug!(batch = %id, %batch, "Advancing custody backfill sync while processing a batch"); + if let Some(processing_id) = self.current_processing_batch { + if id >= processing_id { + self.current_processing_batch = None; + } + } + } + } + } + + self.processing_target = self.processing_target.min(validating_epoch); + self.current_start = validating_epoch; + self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch); + if self.batches.contains_key(&self.to_be_downloaded) { + // if custody backfill is advanced by Range beyond the previous `self.to_be_downloaded`, we + // won't have this batch, so we need to request it. + self.to_be_downloaded -= BACKFILL_EPOCHS_PER_BATCH; + } + debug!(?validating_epoch, processing_target = ?self.processing_target, "Custody backfill advanced"); + } + + /// Creates the next required batch from the chain. If there are no more batches required, + /// `false` is returned. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { + // TODO(cgc-backfill) make sure were stopping at the DA window as per + // the comment below + // don't request batches beyond the data availability window; + if self.last_batch_downloaded { + return None; + } + + // only request batches up to the buffer size limit + // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync + // if the current processing window is contained in a long range of skip slots. + let in_buffer = |batch: &CustodyBatchInfo| { + matches!( + batch.state(), + CustodyBatchState::Downloading(..) | CustodyBatchState::AwaitingProcessing(..) 
+ ) + }; + if self + .batches + .iter() + .filter(|&(_epoch, batch)| in_buffer(batch)) + .count() + > BACKFILL_BATCH_BUFFER_SIZE as usize + { + return None; + } + + let batch_id = self.to_be_downloaded; + // this batch could have been included already being an optimistic batch + match self.batches.entry(batch_id) { + Entry::Occupied(_) => { + // this batch doesn't need downloading, let this same function decide the next batch + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); + self.include_next_batch(network) + } + Entry::Vacant(entry) => { + let batch_type = network.batch_type(batch_id); + entry.insert(CustodyBatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH)); + if self.would_complete(batch_id) { + self.last_batch_downloaded = true; + } + self.to_be_downloaded = self + .to_be_downloaded + .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); + Some(batch_id) + } + } + } + + /// Data columns have been received for this batch. + /// If the data column(s) correctly complete the batch it will be processed if possible. + /// If this returns an error, the custody backfill sync has failed and will be restarted once new peers + /// join the system. + /// The sync manager should update the global sync state on failure. 
+ #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + #[must_use = "A failure here indicates the custdy backfill sync has failed and the global sync state should be updated"] + pub fn on_block_response( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + peer_id: &PeerId, + request_id: Id, + data_columns: DataColumnSidecarList, + ) -> Result { + // check if we have this batch + let Some(batch) = self.batches.get_mut(&batch_id) else { + if !matches!(self.state(), BackFillState::Failed) { + // A batch might get removed when custody backfill sync advances, so this is non fatal. + debug!(epoch = %batch_id, "Received a data column for unknown batch"); + } + return Ok(ProcessResult::Successful); + }; + + // A batch could be retried without the peer failing the request (disconnecting/ + // sending an error /timeout) if the peer is removed for other + // reasons. Check that the data columns belong to the expected peer, and that the + // request_id matches + if !batch.is_expecting_data_columns(&request_id) { + return Ok(ProcessResult::Successful); + } + + match batch.download_completed(data_columns, *peer_id) { + Ok(received) => { + let awaiting_batches = + self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; + debug!( + epoch = %batch_id, + blocks = received, + %awaiting_batches, + "Completed batch received" + ); + + // pre-emptively request more data columns from peers whilst we process current data columns, + self.request_batches(network)?; + self.process_completed_batches(network) + } + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + Ok(ProcessResult::Successful) + } + } + } + + /// Checks if custody backfill would complete by syncing to `start_epoch`. 
+ #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn would_complete(&self, _start_epoch: Epoch) -> bool { + // Placeholder: always returns `false` until the DA-window check below is implemented. + // TODO(cgc-backfill) this should return true if + // start epoch == DA window + false + } + + /// Updates the global network state indicating the current state of a backfill sync. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn set_state(&self, state: BackFillState) { + *self.network_globals.backfill_state.write() = state; + } + + /// Returns a clone of the backfill state currently stored in the global network state. + #[instrument(parent = None, + fields(service = "custody_backfill_sync"), + name = "custody_backfill_sync", + skip_all + )] + fn state(&self) -> BackFillState { + self.network_globals.backfill_state.read().clone() + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d11a18ed0ae..0f3d0ba5a98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,7 +42,7 @@ use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, SyncServiceMessage}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, @@ -234,6 +234,9 @@ pub struct SyncManager { /// A receiving channel sent by the message processor thread. input_channel: mpsc::UnboundedReceiver>, + /// A receiving channel sent by external lighthouse services. + service_channel: mpsc::UnboundedReceiver, + /// A network context to contact the network service.
network: SyncNetworkContext, @@ -261,6 +264,7 @@ pub fn spawn( network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, fork_context: Arc, ) { assert!( @@ -274,6 +278,7 @@ pub fn spawn( network_send, beacon_processor, sync_recv, + sync_service_recv, SamplingConfig::Default, fork_context, ); @@ -296,6 +301,7 @@ impl SyncManager { network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sync_service_recv: mpsc::UnboundedReceiver, sampling_config: SamplingConfig, fork_context: Arc, ) -> Self { @@ -303,6 +309,7 @@ impl SyncManager { Self { chain: beacon_chain.clone(), input_channel: sync_recv, + service_channel: sync_service_recv, network: SyncNetworkContext::new( network_send, beacon_processor.clone(), @@ -310,7 +317,7 @@ impl SyncManager { fork_context.clone(), ), range_sync: RangeSync::new(beacon_chain.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, @@ -736,6 +743,9 @@ impl SyncManager { Some(sync_message) = self.input_channel.recv() => { self.handle_message(sync_message); }, + Some(service_message) = self.service_channel.recv() => { + self.handle_service_message(service_message) + } Some(engine_state) = check_ee_stream.next(), if check_ee => { self.handle_new_execution_engine_state(engine_state); } @@ -752,6 +762,12 @@ impl SyncManager { } } + pub(crate) fn handle_service_message(&mut self, sync_service_message: SyncServiceMessage) { + match sync_service_message { + SyncServiceMessage::CustodyColumnBackfill(_custody_count_backfill) => {} + } + } + pub(crate) fn handle_message(&mut self, sync_message: SyncMessage) { match sync_message { SyncMessage::AddPeer(peer_id, info) => { diff 
--git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 0f5fd6fb9f1..124d8a32a0f 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,6 +4,7 @@ mod backfill_sync; mod block_lookups; mod block_sidecar_coupling; +pub mod custody_sync; pub mod manager; mod network_context; mod peer_sampling; @@ -12,6 +13,10 @@ mod range_sync; #[cfg(test)] mod tests; +pub use custody_sync::{ + CustodyBackSyncBatchId, CustodyByRangeParentRequestId, CustodySyncBatchConfig, +}; + pub use lighthouse_network::service::api_types::SamplingId; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d0e62e4ada7..537f726c3b7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -15,6 +15,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; +use crate::sync::{CustodyBackSyncBatchId, CustodyByRangeParentRequestId}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; @@ -24,8 +25,8 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, Req pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + CustodyByRangeRequest, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, + DataColumnsByRootRequestId, 
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -198,6 +199,10 @@ pub struct SyncNetworkContext { /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, + /// Mapping of active custody column requests for a block root + custody_by_range_requests: + FnvHashMap>, + /// BlocksByRange requests paired with other ByRange requests for data components components_by_range_requests: FnvHashMap>, @@ -283,6 +288,7 @@ impl SyncNetworkContext { blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), + custody_by_range_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -311,6 +317,8 @@ impl SyncNetworkContext { data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, + // custody_by_range_requests is a meta request for the custody sync service + custody_by_range_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, execution_engine_state: _, @@ -415,6 +423,8 @@ impl SyncNetworkContext { data_columns_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, + // custody_by_range_requests is a meta request for the custody sync service + custody_by_range_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, execution_engine_state: _, @@ -441,6 +451,61 @@ impl SyncNetworkContext { active_request_count_by_peer } + pub fn custody_sync_data_columns_by_range_request( + &mut self, + start_slot: Slot, + count: u64, + column_indices: 
&[ColumnIndex], + requester: CustodyBackSyncBatchId, + peers: &HashSet, + peers_to_deprioritize: &HashSet, + ) -> Result { + // Create the overall components_by_range request ID before its individual components + let id = CustodyByRangeParentRequestId { + id: self.next_id(), + requester, + }; + + let active_request_count_by_peer = self.active_request_count_by_peer(); + let column_indices = column_indices.into_iter().cloned().collect(); + // Attempt to find all required custody peers before sending any request or creating an ID + let columns_by_range_peers_to_request = { + Some(self.select_columns_by_range_peers_to_request( + &column_indices, + peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + }; + + let data_column_requests = columns_by_range_peers_to_request + .map(|columns_by_range_peers_to_request| { + columns_by_range_peers_to_request + .into_iter() + .map(|(peer_id, columns)| { + self.send_data_columns_by_range_custody_request( + peer_id, + DataColumnsByRangeRequest { + start_slot: start_slot.into(), + count, + columns, + }, + id.clone(), + ) + }) + .collect::, _>>() + }) + .transpose()?; + + if let Some(data_column_requests) = data_column_requests { + let request = DataColumnsByRangeRequestItems {}; + self.custody_by_range_requests + .insert(id.clone(), data_column_requests); + } + + Ok(id.id) + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, @@ -1070,6 +1135,45 @@ impl SyncNetworkContext { Ok(id) } + fn send_data_columns_by_range_custody_request( + &mut self, + peer_id: PeerId, + request: DataColumnsByRangeRequest, + parent_request_id: CustodyByRangeParentRequestId, + ) -> Result { + let id = CustodyByRangeRequest { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), + 
}) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "DataColumnsByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + columns = ?request.columns, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.data_columns_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + DataColumnsByRangeRequestItems::new(request), + ); + Ok(id) + } + fn send_data_columns_by_range_request( &mut self, peer_id: PeerId, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index f4d010b881e..4a757060cee 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -55,7 +55,7 @@ pub enum Error { }, } -struct ActiveBatchColumnsRequest { +pub struct ActiveBatchColumnsRequest { indices: Vec, } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 264f83ee820..3217905484a 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -8,7 +8,9 @@ use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::{DataColumnSidecarList, Epoch, EthSpec, Slot}; + +use crate::sync::CustodySyncBatchConfig; /// The number of times to retry a batch before it is considered failed. 
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -454,6 +456,14 @@ impl Attempt { let hash = B::batch_attempt_hash(blocks); Attempt { peer_id, hash } } + + pub fn new_for_custody_backfill_sync( + peer_id: PeerId, + data_column_sidecar_list: &DataColumnSidecarList, + ) -> Self { + let hash = CustodySyncBatchConfig::batch_attempt_hash(data_column_sidecar_list); + Attempt { peer_id, hash } + } } impl std::fmt::Debug for BatchState { diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90f..9b09cd0b790 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,8 +8,8 @@ mod range; mod sync_type; pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, + Attempt, BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + ByRangeRequestType, WrongState, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; #[cfg(test)] diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a2c359c87e7..b6ee1e0c669 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -124,6 +124,7 @@ impl TestRig { beacon_processor.into(), // Pass empty recv not tied to any tx mpsc::unbounded_channel().1, + mpsc::unbounded_channel().1, SamplingConfig::Custom { required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], }, diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index fa1e057765e..a58a4658fef 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -366,6 +366,9 @@ impl TestRig { RangeRequestId::BackfillSync { batch_id } => { ChainSegmentProcessId::BackSyncBatchId(batch_id) } + RangeRequestId::CustodySync { batch_id } => { + 
ChainSegmentProcessId::CustodyBackSyncBatchId(batch_id) + } }; self.find_and_complete_processing_chain_segment(id); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4d94042b5b0..44830bb64c6 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -922,6 +922,19 @@ impl, Cold: ItemStore> HotColdDB )); } + pub fn data_column_as_kv_store_ops( + &self, + block_root: &Hash256, + data_column: Arc>, + ops: &mut Vec, + ) { + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconDataColumn, + get_data_column_key(block_root, &data_column.index), + data_column.as_ssz_bytes(), + )); + } + pub fn put_data_columns( &self, block_root: &Hash256, @@ -3538,7 +3551,7 @@ pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStor .get_cold_state_root(target_slot) .map_err(Box::new) .map_err(StateSummaryIteratorError::LoadStateRootError)? - .ok_or_else(|| StateSummaryIteratorError::MissingStateRoot { + .ok_or(StateSummaryIteratorError::MissingStateRoot { target_slot, state_upper_limit, }); diff --git a/common/eth2/src/lighthouse/sync_state.rs b/common/eth2/src/lighthouse/sync_state.rs index 0327f7073fa..8a679f26261 100644 --- a/common/eth2/src/lighthouse/sync_state.rs +++ b/common/eth2/src/lighthouse/sync_state.rs @@ -15,6 +15,10 @@ pub enum SyncState { /// specified by its peers. Once completed, the node enters this sync state and attempts to /// download all required historical blocks. BackFillSyncing { completed: usize, remaining: usize }, + /// The node is undertaking a custody backfill sync. This occurs when the custody group count + /// for a validator connected to this node changes. The node attempts to download its newly required + /// custody columns from the current head state up to the data availability window. 
+ CustodyBackFillSyncing { completed: usize, remaining: usize }, /// The node has completed syncing a finalized chain and is in the process of re-evaluating /// which sync state to progress to. SyncTransition, @@ -65,8 +69,8 @@ impl SyncState { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => true, SyncState::SyncTransition => true, - // Backfill doesn't effect any logic, we consider this state, not syncing. - SyncState::BackFillSyncing { .. } => false, + // Both types of backfill sync do not affect any logic; we consider these states as not syncing. + SyncState::BackFillSyncing { .. } | SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } @@ -78,6 +82,7 @@ impl SyncState { SyncState::SyncingHead { .. } => false, SyncState::SyncTransition => false, SyncState::BackFillSyncing { .. } => false, + SyncState::CustodyBackFillSyncing { .. } => false, SyncState::Synced => false, SyncState::Stalled => false, } @@ -108,6 +113,9 @@ impl std::fmt::Display for SyncState { SyncState::Stalled => write!(f, "Stalled"), SyncState::SyncTransition => write!(f, "Evaluating known peers"), SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), + SyncState::CustodyBackFillSyncing { .. } => { + write!(f, "Syncing newly required custody data columns") + } } } }