Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,26 @@ where
"Storing split from weak subjectivity state"
);

// Set the store's split point *before* storing genesis so that genesis is stored
// immediately in the freezer DB.
// Set the store's split point *before* storing genesis so that if the genesis state
// is prior to the split slot, it will immediately be stored in the freezer DB.
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);

// It is also possible for the checkpoint state to be equal to the genesis state, in which
// case it will be stored in the hot DB. In this case, we need to ensure the store's anchor
// is initialised prior to storing the state, as the anchor is required for working out
// hdiff storage strategies.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(
weak_subj_block.parent_root(),
weak_subj_block.slot(),
weak_subj_slot,
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);

let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
self = updated_builder;

Expand All @@ -541,20 +558,6 @@ where
"Stored frozen block roots at skipped slots"
);

// Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
// states that do not align with the `start_slot` grid.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(
weak_subj_block.parent_root(),
weak_subj_block.slot(),
weak_subj_slot,
retain_historic_states,
)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);

// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
Expand Down
212 changes: 129 additions & 83 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2281,6 +2281,19 @@ async fn weak_subjectivity_sync_skips_at_genesis() {
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}

// Checkpoint sync from the genesis state.
//
// This is a regression test for a bug we had involving the storage of the genesis state in the hot
// DB.
#[tokio::test]
async fn weak_subjectivity_sync_from_genesis() {
let start_slot = 1;
let end_slot = E::slots_per_epoch() * 2;
let slots = (start_slot..end_slot).map(Slot::new).collect();
let checkpoint_slot = Slot::new(0);
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}

async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// Build an initial chain on one harness, representing a synced node with full history.
let num_final_blocks = E::slots_per_epoch() * 2;
Expand Down Expand Up @@ -2367,7 +2380,15 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
);
slot_clock.set_slot(harness.get_current_slot().as_u64());

let chain_config = ChainConfig {
// Set reconstruct_historic_states to true from the start in the genesis case. This makes
// some of the later checks more uniform across the genesis/non-genesis cases.
reconstruct_historic_states: checkpoint_slot == 0,
..ChainConfig::default()
};

let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec, kzg)
.chain_config(chain_config)
.store(store.clone())
.custom_spec(test_spec::<E>().into())
.task_executor(harness.chain.task_executor.clone())
Expand All @@ -2381,7 +2402,6 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.store_migrator_config(MigratorConfig::default().blocking())
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(1)))
.execution_layer(Some(mock.el))
.rng(Box::new(StdRng::seed_from_u64(42)))
Expand Down Expand Up @@ -2449,96 +2469,118 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
}

// Forwards iterator from 0 should fail as we lack blocks.
assert!(matches!(
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
));

// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
// return `None` rather than erroring.
assert_eq!(
beacon_chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap(),
None
);
if checkpoint_slot != 0 {
// Forwards iterator from 0 should fail as we lack blocks (unless checkpoint slot is 0).
assert!(matches!(
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
));
} else {
assert_eq!(
beacon_chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.next()
.unwrap()
.unwrap(),
(wss_block_root, Slot::new(0))
);
}

// Simulate querying the API for a historic state that is unknown. It should also return
// `None` rather than erroring.
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);
// The checks in this block only make sense if some data is missing as a result of the
// checkpoint sync, i.e. if we are not just checkpoint syncing from genesis.
if checkpoint_slot != 0 {
// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
// return `None` rather than erroring.
assert_eq!(
beacon_chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap(),
None
);

// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
.iter()
.filter(|s| s.beacon_block.slot() != 0)
.map(|s| s.beacon_block.clone())
.collect::<Vec<_>>();
// Simulate querying the API for a historic state that is unknown. It should also return
// `None` rather than erroring.
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);

// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
.iter()
.filter(|s| s.beacon_block.slot() != 0)
.map(|s| s.beacon_block.clone())
.collect::<Vec<_>>();

let mut available_blocks = vec![];
for blinded in historical_blocks {
let block_root = blinded.canonical_root();
let full_block = harness
.chain
.get_block(&block_root)
.await
.expect("should get block")
.expect("should get block");

let mut available_blocks = vec![];
for blinded in historical_blocks {
let block_root = blinded.canonical_root();
let full_block = harness
.chain
.get_block(&block_root)
.await
.expect("should get block")
.expect("should get block");
if let MaybeAvailableBlock::Available(block) = harness
.chain
.data_availability_checker
.verify_kzg_for_rpc_block(
harness
.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
)
.expect("should verify kzg")
{
available_blocks.push(block);
}
}

if let MaybeAvailableBlock::Available(block) = harness
.chain
.data_availability_checker
.verify_kzg_for_rpc_block(
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block =
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
batch_with_invalid_first_block[0] = {
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty();
AvailableBlock::__new_for_testing(
block_root,
Arc::new(corrupt_block),
data,
Arc::new(spec),
)
.expect("should verify kzg")
{
available_blocks.push(block);
}
}
};

// Importing the invalid batch should error.
assert!(matches!(
beacon_chain
.import_historical_block_batch(batch_with_invalid_first_block)
.unwrap_err(),
HistoricalBlockError::InvalidSignature
));

// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block =
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
batch_with_invalid_first_block[0] = {
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty();
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
};
let available_blocks_slots = available_blocks
.iter()
.map(|block| (block.block().slot(), block.block().canonical_root()))
.collect::<Vec<_>>();
info!(
?available_blocks_slots,
"wss_block_slot" = wss_block.slot().as_usize(),
"Importing historical block batch"
);

// Importing the invalid batch should error.
assert!(matches!(
// Importing the batch with valid signatures should succeed.
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
beacon_chain
.import_historical_block_batch(batch_with_invalid_first_block)
.unwrap_err(),
HistoricalBlockError::InvalidSignature
));

let available_blocks_slots = available_blocks
.iter()
.map(|block| (block.block().slot(), block.block().canonical_root()))
.collect::<Vec<_>>();
info!(
?available_blocks_slots,
"wss_block_slot" = wss_block.slot().as_usize(),
"Importing historical block batch"
);

// Importing the batch with valid signatures should succeed.
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
beacon_chain
.import_historical_block_batch(available_blocks_dup)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
.import_historical_block_batch(available_blocks_dup)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);

// Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain
.import_historical_block_batch(available_blocks)
.unwrap();
// Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain
.import_historical_block_batch(available_blocks)
.unwrap();
}

// Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
if wss_block_slot != wss_state_slot {
Expand Down Expand Up @@ -2615,7 +2657,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
assert_eq!(
store.get_anchor_info().state_upper_limit,
Slot::new(u64::MAX)
if checkpoint_slot == 0 {
Slot::new(0)
} else {
Slot::new(u64::MAX)
}
);
info!(anchor = ?store.get_anchor_info(), "anchor pre");

Expand Down
6 changes: 6 additions & 0 deletions beacon_node/store/src/reconstruct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ where
let lower_limit_slot = anchor.state_lower_limit;
let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);

// If the split is at 0 we can't reconstruct historic states.
if split.slot == 0 {
debug!("No state reconstruction possible");
return Ok(());
}

// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
// boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive*
// of the state at slot `lower_limit_slot + num_blocks`.
Expand Down