From bf03286a75b72d4053b1e95d3e2d685797f95472 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 18 Mar 2025 16:37:14 +1100 Subject: [PATCH 1/3] Add `MAX_BLOBS_PER_BLOCK_FULU` config. --- .../chiado/config.yaml | 3 +- .../gnosis/config.yaml | 3 +- .../holesky/config.yaml | 3 +- .../mainnet/config.yaml | 3 +- .../sepolia/config.yaml | 3 +- consensus/types/src/chain_spec.rs | 29 ++++++++++++++++++- 6 files changed, 38 insertions(+), 6 deletions(-) diff --git a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml index 1455ec5f637..7256af8400d 100644 --- a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml @@ -150,9 +150,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index 9ff5a161980..42d3d968da1 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -121,9 +121,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6 # `uint64(6)` MAX_BLOBS_PER_BLOCK: 6 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml index e5f38b8c9b5..d8e2d658ff5 100644 --- a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml @@ -139,9 +139,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 74fe7278674..6b2e82cbd0f 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -142,9 +142,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6 # `uint64(6)` MAX_BLOBS_PER_BLOCK: 6 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index af783322055..9323192c82b 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ -140,9 +140,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1650001db61..333b9544dfd 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -241,6 +241,11 @@ pub struct ChainSpec { blob_sidecar_subnet_count_electra: u64, max_request_blob_sidecars_electra: u64, + /* + * Networking Fulu + */ + max_blobs_per_block_fulu: u64, + /* * Networking Derived * @@ -656,7 +661,9 @@ impl ChainSpec { /// Return the value of `MAX_BLOBS_PER_BLOCK` appropriate for `fork`. pub fn max_blobs_per_block_by_fork(&self, fork_name: ForkName) -> u64 { - if fork_name.electra_enabled() { + if fork_name.fulu_enabled() { + self.max_blobs_per_block_fulu + } else if fork_name.electra_enabled() { self.max_blobs_per_block_electra } else { self.max_blobs_per_block @@ -965,6 +972,11 @@ impl ChainSpec { blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), + /* + * Networking Fulu specific + */ + max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(), + /* * Application specific */ @@ -1295,6 +1307,11 @@ impl ChainSpec { blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), + /* + * Networking Fulu specific + */ + max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(), + /* * Application specific */ @@ -1517,6 +1534,9 @@ pub struct Config { #[serde(default = "default_custody_requirement")] #[serde(with = "serde_utils::quoted_u64")] custody_requirement: u64, + #[serde(default = "default_max_blobs_per_block_fulu")] + #[serde(with = "serde_utils::quoted_u64")] + max_blobs_per_block_fulu: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1658,6 +1678,10 @@ const fn default_max_blobs_per_block_electra() -> u64 { 9 } +const fn default_max_blobs_per_block_fulu() -> u64 { + 12 +} + const fn default_attestation_propagation_slot_range() -> u64 { 32 } @@ -1886,6 +1910,7 @@ impl Config { data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, samples_per_slot: spec.samples_per_slot, custody_requirement: spec.custody_requirement, + max_blobs_per_block_fulu: spec.max_blobs_per_block_fulu, } } @@ -1965,6 +1990,7 @@ impl Config { data_column_sidecar_subnet_count, samples_per_slot, custody_requirement, + max_blobs_per_block_fulu, } = self; if preset_base != E::spec_name().to_string().as_str() { @@ -2048,6 +2074,7 @@ impl Config { data_column_sidecar_subnet_count, samples_per_slot, custody_requirement, + max_blobs_per_block_fulu, ..chain_spec.clone() }) From b9cd495a8e7e61bda90a6f0777f164d82b356202 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 21 Mar 2025 17:16:40 +1100 Subject: [PATCH 2/3] Fix failing tests. --- .../src/network_beacon_processor/tests.rs | 132 ++++++++++++++---- 1 file changed, 105 insertions(+), 27 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 69ba5c1dbd9..28641bdb37e 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -9,11 +9,14 @@ use crate::{ sync::{manager::BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; use beacon_chain::test_utils::{ - test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, + get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, + EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use itertools::Itertools; use lighthouse_network::discovery::ConnectionId; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::{RequestId, SubstreamId}; @@ -30,9 +33,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, - SubnetId, + Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, + DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -53,6 +56,7 @@ struct TestRig { chain: Arc>, next_block: Arc>, next_blobs: Option>, + next_data_columns: Option>, attestations: Vec<(Attestation, SubnetId)>, next_block_attestations: Vec<(Attestation, SubnetId)>, next_block_aggregate_attestations: Vec>, @@ -242,7 +246,7 @@ impl TestRig { let network_beacon_processor = Arc::new(network_beacon_processor); let beacon_processor = BeaconProcessor { - network_globals, + network_globals: network_globals.clone(), executor, current_workers: 0, config: beacon_processor_config, @@ -263,15 +267,36 @@ impl TestRig { assert!(beacon_processor.is_ok()); let block = next_block_tuple.0; - let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { - Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap()) + let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { + if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + let kzg = get_kzg(&chain.spec); + let custody_columns: DataColumnSidecarList = blobs_to_data_column_sidecars( + &blobs.iter().collect_vec(), + &block, + &kzg, + &chain.spec, + ) + .unwrap() + .into_iter() + .filter(|c| network_globals.sampling_columns.contains(&c.index)) + .collect::>() + .into(); + + (None, Some(custody_columns.into())) + } else { + let blob_sidecars = + BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap(); + (Some(blob_sidecars), None) + } } else { - None + (None, None) }; + Self { chain, next_block: block, next_blobs: blob_sidecars, + next_data_columns: data_columns, attestations, next_block_attestations, next_block_aggregate_attestations, @@ -324,6 +349,22 @@ impl TestRig { } } + pub fn enqueue_gossip_data_columns(&self, col_index: usize) { + if let Some(data_columns) = self.next_data_columns.as_ref() { + let data_column = data_columns.get(col_index).unwrap(); + self.network_beacon_processor + .send_gossip_data_column_sidecar( + junk_message_id(), + junk_peer_id(), + Client::default(), + DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec), + data_column.clone(), + Duration::from_secs(0), + ) + .unwrap(); + } + } + pub fn enqueue_rpc_block(&self) { let block_root = self.next_block.canonical_root(); self.network_beacon_processor @@ -362,6 +403,19 @@ impl TestRig { } } + pub fn enqueue_single_lookup_rpc_data_columns(&self) { + if let Some(data_columns) = self.next_data_columns.clone() { + self.network_beacon_processor + .send_rpc_custody_columns( + self.next_block.canonical_root(), + data_columns, + Duration::default(), + BlockProcessType::SingleCustodyColumn(1), + ) + .unwrap(); + } + } + pub fn enqueue_blobs_by_range_request(&self, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( @@ -621,6 +675,13 @@ async fn import_gossip_block_acceptably_early() { .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // and check the head in the time between the block arrived early and when its due for // processing. @@ -697,19 +758,20 @@ async fn import_gossip_block_at_current_slot() { rig.assert_event_journal_completes(&[WorkType::GossipBlock]) .await; - let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); - + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar]) .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + assert_eq!( rig.head_root(), rig.next_block.canonical_root(), @@ -762,11 +824,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod ); // Send the block and ensure that the attestation is received back and imported. - let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); let mut events = vec![]; match import_method { BlockImportMethod::Gossip => { @@ -776,6 +835,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod rig.enqueue_gossip_blob(i); events.push(WorkType::GossipBlobSidecar); } + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + events.push(WorkType::GossipDataColumnSidecar); + } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); @@ -784,6 +847,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod rig.enqueue_single_lookup_rpc_blobs(); events.push(WorkType::RpcBlobs); } + if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); + events.push(WorkType::RpcCustodyColumn); + } } }; @@ -843,11 +910,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod ); // Send the block and ensure that the attestation is received back and imported. - let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); let mut events = vec![]; match import_method { BlockImportMethod::Gossip => { @@ -857,6 +921,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod rig.enqueue_gossip_blob(i); events.push(WorkType::GossipBlobSidecar); } + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + events.push(WorkType::GossipDataColumnSidecar) + } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); @@ -865,6 +933,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod rig.enqueue_single_lookup_rpc_blobs(); events.push(WorkType::RpcBlobs); } + if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); + events.push(WorkType::RpcCustodyColumn); + } } }; @@ -1049,12 +1121,18 @@ async fn test_rpc_block_reprocessing() { rig.assert_event_journal_completes(&[WorkType::RpcBlock]) .await; - rig.enqueue_single_lookup_rpc_blobs(); - if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + if num_blobs > 0 { rig.assert_event_journal_completes(&[WorkType::RpcBlobs]) .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn]) + .await; + } + // next_block shouldn't be processed since it couldn't get the // duplicate cache handle assert_ne!(next_block_root, rig.head_root()); From b48c507c2cc935573decb913230af11e3a3c9bca Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 25 Mar 2025 23:38:59 +1100 Subject: [PATCH 3/3] Fix failing tests and lint. --- beacon_node/network/src/network_beacon_processor/tests.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 28641bdb37e..d05851629e9 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -279,10 +279,9 @@ impl TestRig { .unwrap() .into_iter() .filter(|c| network_globals.sampling_columns.contains(&c.index)) - .collect::>() - .into(); + .collect::>(); - (None, Some(custody_columns.into())) + (None, Some(custody_columns)) } else { let blob_sidecars = BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap(); @@ -1123,12 +1122,14 @@ async fn test_rpc_block_reprocessing() { let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); if num_blobs > 0 { + rig.enqueue_single_lookup_rpc_blobs(); rig.assert_event_journal_completes(&[WorkType::RpcBlobs]) .await; } let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn]) .await; }