Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/beacon-node/src/sync/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@ export const EPOCHS_PER_BATCH = 1;
* TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized?
*/
export const BATCH_BUFFER_SIZE = Math.ceil(10 / EPOCHS_PER_BATCH);

/**
* Maximum number of concurrent requests to perform with a SyncChain.
* This is according to the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
*/
export const MAX_CONCURRENT_REQUESTS = 2;

/**
* Maximum number of epochs to download ahead when syncing.
* In fulu, to fully process a batch we may need to download columns from multiple peers
* so having this constant too big is a waste of resources and peers may rate limit us.
*/
export const MAX_LOOK_AHEAD_EPOCHS = 2;
16 changes: 15 additions & 1 deletion packages/beacon-node/src/sync/range/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {CustodyConfig} from "../../util/dataColumns.js";
import {ItTrigger} from "../../util/itTrigger.js";
import {PeerIdStr} from "../../util/peerId.js";
import {wrapError} from "../../util/wrapError.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH} from "../constants.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js";
import {RangeSyncType} from "../utils/remoteSyncType.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js";
import {
Expand Down Expand Up @@ -409,6 +409,16 @@ export class SyncChain {
return null;
}

// if last processed epoch is n, we don't want to request batches with epoch > n + MAX_LOOK_AHEAD_EPOCHS
// we should have enough batches to process in the buffer: n + 1, ..., n + MAX_LOOK_AHEAD_EPOCHS
// let's focus on redownloading these batches first because it may have to reach different peers to get enough sampled columns
if (
batches.length > 0 &&
Math.max(...batches.map((b) => b.startEpoch)) >= this.lastEpochWithProcessBlocks + MAX_LOOK_AHEAD_EPOCHS
) {
return null;
}

// This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch
const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks);

Expand Down Expand Up @@ -584,6 +594,10 @@ export class SyncChain {
}

this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks;
this.logger.verbose("Advanced chain", {
id: this.logId,
lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks,
});
}

private scrapeMetrics(metrics: Metrics): void {
Expand Down
37 changes: 32 additions & 5 deletions packages/beacon-node/src/sync/range/utils/peerBalancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {CustodyConfig} from "../../../util/dataColumns.js";
import {PeerIdStr} from "../../../util/peerId.js";
import {shuffle} from "../../../util/shuffle.js";
import {sortBy} from "../../../util/sortBy.js";
import {MAX_CONCURRENT_REQUESTS} from "../../constants.js";
import {Batch, BatchStatus} from "../batch.js";
import {ChainTarget} from "./chainTarget.js";

Expand All @@ -20,11 +21,21 @@ export class ChainPeersBalancer {
private peers: PeerSyncInfo[];
private activeRequestsByPeer = new Map<PeerIdStr, number>();
private readonly custodyConfig: CustodyConfig;
private readonly maxConcurrentRequests: number;

// TODO: @matthewkeil check if this needs to be updated for custody groups
constructor(peers: PeerSyncInfo[], batches: Batch[], custodyConfig: CustodyConfig) {
/**
* No need to specify `maxConcurrentRequests` for production code
* It is used for testing purposes to limit the number of concurrent requests
*/
constructor(
peers: PeerSyncInfo[],
batches: Batch[],
custodyConfig: CustodyConfig,
maxConcurrentRequests = MAX_CONCURRENT_REQUESTS
) {
this.peers = shuffle(peers);
this.custodyConfig = custodyConfig;
this.maxConcurrentRequests = maxConcurrentRequests;

// Compute activeRequestsByPeer from all batches internal states
for (const batch of batches) {
Expand Down Expand Up @@ -55,7 +66,17 @@ export class ChainPeersBalancer {
({columns}) => -1 * columns // prefer peers with the most columns
);

return sortedBestPeers.length > 0 ? sortedBestPeers[0].syncInfo : undefined;
if (sortedBestPeers.length > 0) {
const bestPeer = sortedBestPeers[0];
// we will use this peer for batch in SyncChain right after this call
this.activeRequestsByPeer.set(
bestPeer.syncInfo.peerId,
(this.activeRequestsByPeer.get(bestPeer.syncInfo.peerId) ?? 0) + 1
);
return bestPeer.syncInfo;
}

return undefined;
}

/**
Expand All @@ -82,14 +103,20 @@ export class ChainPeersBalancer {
return undefined;
}

private filterPeers(batch: Batch, requestColumns: number[], checkActiveRequest: boolean): PeerInfoColumn[] {
private filterPeers(batch: Batch, requestColumns: number[], noActiveRequest: boolean): PeerInfoColumn[] {
const eligiblePeers: PeerInfoColumn[] = [];

for (const peer of this.peers) {
const {earliestAvailableSlot, custodyGroups, target, peerId} = peer;

const activeRequest = this.activeRequestsByPeer.get(peerId) ?? 0;
if (checkActiveRequest && activeRequest > 0) {
if (noActiveRequest && activeRequest > 0) {
// consumer wants to find peer with no active request, but this peer has active request
continue;
}

if (activeRequest >= this.maxConcurrentRequests) {
// consumer wants to find peer with no more than MAX_CONCURRENT_REQUESTS active requests
continue;
}

Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/test/unit/sync/range/chain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ describe("sync / range / chain", () => {
targetEpoch: 16,
},
{
id: "Simulate sync with a very long range of skipped slots",
// due to BATCH_BUFFER_SIZE and MAX_LOOK_AHEAD_EPOCHS, lodestar cannot deal with unlimited skipped slots
// having a test with 2 epochs of skipped slots is enough to test the logic
// this hasn't happened in any networks as of Aug 2025
id: "Simulate sync with 2 epochs of skipped slots",
startEpoch: 0,
targetEpoch: 16,
skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 10 * SLOTS_PER_EPOCH)),
skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 4 * SLOTS_PER_EPOCH)),
},
{
id: "Simulate sync with multiple ranges of bad blocks",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe("sync / range / peerBalancer", () => {
custodyColumns: number[][];
targetEpochs: number[];
earliestAvailableSlots: (number | undefined | null)[];
expected: PeerIdStr;
maxConcurrentRequests?: number;
expected?: PeerIdStr;
}[] = [
{
isFulu: true,
Expand Down Expand Up @@ -69,7 +70,20 @@ describe("sync / range / peerBalancer", () => {
},
{
isFulu: true,
// peer3 and peer4 are free but peer4 has more clumns
// same to above but it should not return any peers
// peer3 is free but don't have any custody columns
// peer 4 has unrelated custody column
// peer 2 is busy downloading batch1 and maxConcurrentRequests is 1
// test custody columns condition and maxConcurrentRequests condition
custodyColumns: [[], [0, 1, 2, 3], [4, 5, 6, 7], [100]],
targetEpochs: [1, 2, 3, 4],
earliestAvailableSlots: [0, 0, 0, 0],
maxConcurrentRequests: 1,
expected: undefined,
},
{
isFulu: true,
// peer3 and peer4 are free but peer4 has more columns
// test custody columns condition
custodyColumns: [[], [0, 1, 2, 3], [2, 3, 4, 5], [1, 2, 3, 4]],
targetEpochs: [1, 2, 3, 4],
Expand All @@ -95,7 +109,10 @@ describe("sync / range / peerBalancer", () => {
expected: peer3.peerId,
},
];
for (const [i, {isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, expected}] of testCases.entries()) {
for (const [
i,
{isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, maxConcurrentRequests, expected},
] of testCases.entries()) {
it(`test case ${i}`, async () => {
const columnsByPeer = new Map<PeerIdStr, {custodyColumns: number[]}>();
for (const [i, custody] of custodyColumns.entries()) {
Expand Down Expand Up @@ -133,7 +150,7 @@ describe("sync / range / peerBalancer", () => {
// peer2 is busy downloading batch1
batch1.startDownloading(peer2.peerId);

const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig);
const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig, maxConcurrentRequests);
expect(peerBalancer.bestPeerToRetryBatch(batch0)?.peerId).toBe(expected);
});
}
Expand Down
Loading