Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 196 additions & 1 deletion beacon_node/network/src/sync/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ enum ByRangeDataRequestIds {
PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>),
}

type BlocksByRootRequestData = (BlocksByRootRequestId, PeerId, BlocksByRootRequest);

type BlobsByRootRequestData = (BlobsByRootRequestId, PeerId, BlobsByRootRequest);

type DataColumnsByRootRequestData = (DataColumnsByRootRequestId, PeerId, DataColumnsByRootRequest);

/// Sync tests are usually written in the form:
/// - Do some action
/// - Expect a request to be sent
Expand All @@ -47,9 +53,12 @@ enum ByRangeDataRequestIds {
/// _which_ request to complete. Picking the right request is critical for tests to pass, so this
/// filter allows better expressivity on the criteria to identify the right request.
#[derive(Default, Debug, Clone)]
struct RequestFilter {
pub struct RequestFilter {
peer: Option<PeerId>,
epoch: Option<u64>,
block_root: Option<Hash256>,
column_index: Option<u64>,
header_requests_only: bool,
}

impl RequestFilter {
Expand All @@ -62,12 +71,198 @@ impl RequestFilter {
self.epoch = Some(epoch);
self
}

pub fn block_root(mut self, block_root: Hash256) -> Self {
self.block_root = Some(block_root);
self
}

pub fn column_index(mut self, index: u64) -> Self {
self.column_index = Some(index);
self
}

pub fn header_requests_only(mut self) -> Self {
self.header_requests_only = true;
self
}

fn blocks_by_root_requests<E: EthSpec>(
&self,
ev: &NetworkMessage<E>,
) -> Option<BlocksByRootRequestData> {
match ev {
NetworkMessage::SendRequest {
peer_id,
request: RequestType::BlocksByRoot(req),
app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRoot(id)),
} if self.matches_blocks_by_root(peer_id, req, id) => {
Some((*id, *peer_id, req.clone()))
}
_ => None,
}
}

fn blobs_by_root_requests<E: EthSpec>(
&self,
ev: &NetworkMessage<E>,
) -> Option<BlobsByRootRequestData> {
match ev {
NetworkMessage::SendRequest {
peer_id,
request: RequestType::BlobsByRoot(req),
app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRoot(id)),
} if self.matches_blobs_by_root(peer_id, req) => Some((*id, *peer_id, req.clone())),
_ => None,
}
}

fn data_columns_by_root_requests<E: EthSpec>(
&self,
ev: &NetworkMessage<E>,
) -> Option<DataColumnsByRootRequestData> {
match ev {
NetworkMessage::SendRequest {
peer_id,
request: RequestType::DataColumnsByRoot(req),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
} if self.matches_data_columns_by_root(peer_id, req) => {
Some((*id, *peer_id, req.clone()))
}
_ => None,
}
}

fn matches_blocks_by_root(
&self,
peer: &PeerId,
req: &BlocksByRootRequest,
id: &BlocksByRootRequestId,
) -> bool {
if self.header_requests_only
&& !matches!(id.parent_request_id, BlocksByRootRequester::Header(_))
{
return false;
}

if let Some(block_root) = self.block_root {
if !req.block_roots().iter().any(|b| *b == block_root) {
return false;
}
}

self.matches_peer(peer)
}

fn matches_blobs_by_root(&self, peer: &PeerId, req: &BlobsByRootRequest) -> bool {
if self.header_requests_only {
return false;
}

if let Some(block_root) = self.block_root {
if !req.blob_ids.iter().any(|id| id.block_root == block_root) {
return false;
}
}

self.matches_peer(peer)
}

fn matches_data_columns_by_root(&self, peer: &PeerId, req: &DataColumnsByRootRequest) -> bool {
if self.header_requests_only {
return false;
}

if let Some(index) = self.column_index {
if !req
.data_column_ids
.iter()
.any(|id| id.columns.iter().any(|i| *i == index))
{
return false;
}
}
self.matches_peer(peer)
}

fn matches_common(&self, peer: &PeerId, start_slot: u64) -> bool {
if let Some(expected_epoch) = self.epoch {
let epoch = Slot::new(start_slot).epoch(E::slots_per_epoch()).as_u64();
if epoch != expected_epoch {
return false;
}
}
self.matches_peer(peer)
}

fn matches_peer(&self, peer: &PeerId) -> bool {
if let Some(expected_peer) = self.peer {
if *peer != expected_peer {
return false;
}
}
true
}
}

fn filter() -> RequestFilter {
RequestFilter::default()
}

/// Instruct the testing rig how to complete requests for _by_range requests
#[derive(Default, Debug, Clone)]
pub struct CompleteConfig {
block_count: usize,
with_data: bool,
custody_failure_at_index: Option<u64>,
rpc_error: Option<RPCError>,
empty_sampling_response_once: bool,
stop_at_block: Option<Hash256>,
return_wrong_blocks: bool,
return_no_blocks_n_times: usize,
process_error: bool,
}

impl CompleteConfig {
pub fn custody_failure_at_index(mut self, index: u64) -> Self {
self.custody_failure_at_index = Some(index);
self
}

pub fn rpc_error(mut self, error: RPCError) -> Self {
self.rpc_error = Some(error);
self
}

pub fn rpc_error_response(self, error: RpcErrorResponse) -> Self {
self.rpc_error(RPCError::ErrorResponse(error, "".to_owned()))
}

pub fn empty_sampling_response_once(mut self) -> Self {
self.empty_sampling_response_once = true;
self
}

pub fn stop_at_block(mut self, block: Hash256) -> Self {
self.stop_at_block = Some(block);
self
}

pub fn return_wrong_blocks(mut self) -> Self {
self.return_wrong_blocks = true;
self
}

pub fn return_no_blocks(self) -> Self {
self.return_no_blocks_n_times(usize::MAX)
}

pub fn return_no_blocks_n_times(mut self, n_times: usize) -> Self {
self.return_no_blocks_n_times = n_times;
self
}
}

impl TestRig {
/// Produce a head peer with an advanced head
fn add_head_peer(&mut self) -> PeerId {
Expand Down
Loading