From 2035411eb104ecc0bf6ad79948e351cc2fdee18e Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 7 Aug 2025 15:14:52 +0700 Subject: [PATCH 1/5] feat: enforce no more than MAX_CONCURRENT_REQUESTS per peer in sync --- .../src/sync/range/utils/peerBalancer.ts | 30 ++++++++++++++++--- .../sync/range/utils/peerBalancer.test.ts | 25 +++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts index 6d48f375d013..4ced2d9400a1 100644 --- a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts +++ b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts @@ -12,6 +12,12 @@ export type PeerSyncInfo = PeerSyncMeta & { type PeerInfoColumn = {syncInfo: PeerSyncInfo; columns: number; hasEarliestAvailableSlots: boolean}; +/** + * Maximum number of concurrent requests to perform with a SyncChain. + * This is according to the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md + */ +const MAX_CONCURRENT_REQUESTS = 2; + /** * Balance and organize peers to perform requests with a SyncChain * Shuffles peers only once on instantiation @@ -20,11 +26,21 @@ export class ChainPeersBalancer { private peers: PeerSyncInfo[]; private activeRequestsByPeer = new Map(); private readonly custodyConfig: CustodyConfig; + private readonly maxConcurrentRequests: number; - // TODO: @matthewkeil check if this needs to be updated for custody groups - constructor(peers: PeerSyncInfo[], batches: Batch[], custodyConfig: CustodyConfig) { + /** + * No need to specify `maxConcurrentRequests` for production code + * It is used for testing purposes to limit the number of concurrent requests + */ + constructor( + peers: PeerSyncInfo[], + batches: Batch[], + custodyConfig: CustodyConfig, + maxConcurrentRequests = MAX_CONCURRENT_REQUESTS + ) { this.peers = shuffle(peers); this.custodyConfig = custodyConfig; + this.maxConcurrentRequests = maxConcurrentRequests; // Compute activeRequestsByPeer from all batches internal states for (const batch of batches) { @@ -82,14 +98,20 @@ export class ChainPeersBalancer { return undefined; } - private filterPeers(batch: Batch, requestColumns: number[], checkActiveRequest: boolean): PeerInfoColumn[] { + private filterPeers(batch: Batch, requestColumns: number[], noActiveRequest: boolean): PeerInfoColumn[] { const eligiblePeers: PeerInfoColumn[] = []; for (const peer of this.peers) { const {earliestAvailableSlot, custodyGroups, target, peerId} = peer; const activeRequest = this.activeRequestsByPeer.get(peerId) ?? 0; - if (checkActiveRequest && activeRequest > 0) { + if (noActiveRequest && activeRequest > 0) { + // consumer wants to find peer with no active request, but this peer has active request + continue; + } + + if (!noActiveRequest && activeRequest >= this.maxConcurrentRequests) { + // consumer wants to find peer with no more than MAX_CONCURRENT_REQUESTS active requests continue; } diff --git a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts index 5a0c5779d32a..a202dc6e433e 100644 --- a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts +++ b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts @@ -25,7 +25,8 @@ describe("sync / range / peerBalancer", () => { custodyColumns: number[][]; targetEpochs: number[]; earliestAvailableSlots: (number | undefined | null)[]; - expected: PeerIdStr; + maxConcurrentRequests?: number; + expected?: PeerIdStr; }[] = [ { isFulu: true, @@ -69,7 +70,20 @@ describe("sync / range / peerBalancer", () => { }, { isFulu: true, - // peer3 and peer4 are free but peer4 has more clumns + // same to above but it should not return any peers + // peer3 is free but don't have any custody columns + // peer 4 has unrelated custody column + // peer 2 is busy downloading batch1 and maxConcurrentRequests is 1 + // test custody columns condition and maxConcurrentRequests condition + custodyColumns: [[], [0, 1, 2, 3], [4, 5, 6, 7], [100]], + targetEpochs: [1, 2, 3, 4], + earliestAvailableSlots: [0, 0, 0, 0], + maxConcurrentRequests: 1, + expected: undefined, + }, + { + isFulu: true, + // peer3 and peer4 are free but peer4 has more columns // test custody columns condition custodyColumns: [[], [0, 1, 2, 3], [2, 3, 4, 5], [1, 2, 3, 4]], targetEpochs: [1, 2, 3, 4], @@ -95,7 +109,10 @@ describe("sync / range / peerBalancer", () => { expected: peer3.peerId, }, ]; - for (const [i, {isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, expected}] of testCases.entries()) { + for (const [ + i, + {isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, maxConcurrentRequests, expected}, + ] of testCases.entries()) { it(`test case ${i}`, async () => { const columnsByPeer = new Map(); for (const [i, custody] of custodyColumns.entries()) { @@ -133,7 +150,7 @@ describe("sync / range / peerBalancer", () => { // peer2 is busy downloading batch1 batch1.startDownloading(peer2.peerId); - const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig); + const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig, maxConcurrentRequests); expect(peerBalancer.bestPeerToRetryBatch(batch0)?.peerId).toBe(expected); }); } From c8a4a298e0daa80b478d6fa7ee701dff2efa0e10 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 7 Aug 2025 15:52:33 +0700 Subject: [PATCH 2/5] feat: limit load ahead epoch in sync --- packages/beacon-node/src/sync/constants.ts | 13 +++++++++++++ packages/beacon-node/src/sync/range/chain.ts | 12 +++++++++++- .../src/sync/range/utils/peerBalancer.ts | 7 +------ .../beacon-node/test/unit/sync/range/chain.test.ts | 7 +++++-- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/packages/beacon-node/src/sync/constants.ts b/packages/beacon-node/src/sync/constants.ts index 872b93be8749..83c94331cd27 100644 --- a/packages/beacon-node/src/sync/constants.ts +++ b/packages/beacon-node/src/sync/constants.ts @@ -51,3 +51,16 @@ export const EPOCHS_PER_BATCH = 1; * TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized? */ export const BATCH_BUFFER_SIZE = Math.ceil(10 / EPOCHS_PER_BATCH); + +/** + * Maximum number of concurrent requests to perform with a SyncChain. + * This is according to the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md + */ +export const MAX_CONCURRENT_REQUESTS = 2; + +/** + * Maximum number of epochs to download ahead when syncing. + * In fulu, to fully process a batch we may need to download columns from multiple peers + * so having this constant too big is a waste of resources and peers may rate limit us. + */ +export const MAX_LOOK_AHEAD_EPOCHS = 2; diff --git a/packages/beacon-node/src/sync/range/chain.ts b/packages/beacon-node/src/sync/range/chain.ts index 2e45e8cdce66..60884652971b 100644 --- a/packages/beacon-node/src/sync/range/chain.ts +++ b/packages/beacon-node/src/sync/range/chain.ts @@ -11,7 +11,7 @@ import {CustodyConfig} from "../../util/dataColumns.js"; import {ItTrigger} from "../../util/itTrigger.js"; import {PeerIdStr} from "../../util/peerId.js"; import {wrapError} from "../../util/wrapError.js"; -import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH} from "../constants.js"; +import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js"; import {RangeSyncType} from "../utils/remoteSyncType.js"; import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js"; import { @@ -409,6 +409,16 @@ export class SyncChain { return null; } + // if last processed epoch is n, we don't want to request batches with epoch > n + MAX_LOOK_AHEAD_EPOCHS + // we should have enough batches to process in the buffer: n + 1, ..., n + MAX_LOOK_AHEAD_EPOCHS + // let's focus on redownloading these batches first because it may have to reach different peers to get enough sampled columns + if ( + batches.length > 0 && + Math.max(...batches.map((b) => b.startEpoch)) >= this.lastEpochWithProcessBlocks + MAX_LOOK_AHEAD_EPOCHS + ) { + return null; + } + // This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks); diff --git a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts index 4ced2d9400a1..be186018aa9b 100644 --- a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts +++ b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts @@ -3,6 +3,7 @@ import {CustodyConfig} from "../../../util/dataColumns.js"; import {PeerIdStr} from "../../../util/peerId.js"; import {shuffle} from "../../../util/shuffle.js"; import {sortBy} from "../../../util/sortBy.js"; +import {MAX_CONCURRENT_REQUESTS} from "../../constants.js"; import {Batch, BatchStatus} from "../batch.js"; import {ChainTarget} from "./chainTarget.js"; @@ -12,12 +13,6 @@ export type PeerSyncInfo = PeerSyncMeta & { type PeerInfoColumn = {syncInfo: PeerSyncInfo; columns: number; hasEarliestAvailableSlots: boolean}; -/** - * Maximum number of concurrent requests to perform with a SyncChain. - * This is according to the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md - */ -const MAX_CONCURRENT_REQUESTS = 2; - /** * Balance and organize peers to perform requests with a SyncChain * Shuffles peers only once on instantiation diff --git a/packages/beacon-node/test/unit/sync/range/chain.test.ts b/packages/beacon-node/test/unit/sync/range/chain.test.ts index e72bd71c5113..fcee8d16de6a 100644 --- a/packages/beacon-node/test/unit/sync/range/chain.test.ts +++ b/packages/beacon-node/test/unit/sync/range/chain.test.ts @@ -28,10 +28,13 @@ describe("sync / range / chain", () => { targetEpoch: 16, }, { - id: "Simulate sync with a very long range of skipped slots", + // due to BATCH_BUFFER_SIZE and MAX_LOOK_AHEAD_EPOCHS, lodestar cannot deal with unlimited skipped slots + // having a test with 2 epochs of skipped slots is enough to test the logic + // this hasn't happened in any networks as of Aug 2025 + id: "Simulate sync with 2 epochs of skipped slots", startEpoch: 0, targetEpoch: 16, - skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 10 * SLOTS_PER_EPOCH)), + skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 4 * SLOTS_PER_EPOCH)), }, { id: "Simulate sync with multiple ranges of bad blocks", From 3a5155322eee3113e52857f674c24334a11c4e5d Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 8 Aug 2025 10:01:11 +0700 Subject: [PATCH 3/5] fix: update activeRequestsByPeer after using a peer --- .../beacon-node/src/sync/range/utils/peerBalancer.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts index be186018aa9b..2dbea73d03b1 100644 --- a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts +++ b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts @@ -66,7 +66,17 @@ export class ChainPeersBalancer { ({columns}) => -1 * columns // prefer peers with the most columns ); - return sortedBestPeers.length > 0 ? sortedBestPeers[0].syncInfo : undefined; + if (sortedBestPeers.length > 0) { + const bestPeer = sortedBestPeers[0]; + // we will use this peer for batch in SyncChain right after this call + this.activeRequestsByPeer.set( + bestPeer.syncInfo.peerId, + (this.activeRequestsByPeer.get(bestPeer.syncInfo.peerId) ?? 0) + 1 + ); + return bestPeer.syncInfo; + } + + return undefined; } /** From eb31a1f2962280170fb0f06e9bbea81d23786fca Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 8 Aug 2025 10:53:38 +0700 Subject: [PATCH 4/5] chore: log advanceChain --- packages/beacon-node/src/sync/range/chain.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/beacon-node/src/sync/range/chain.ts b/packages/beacon-node/src/sync/range/chain.ts index 60884652971b..4251b6586b45 100644 --- a/packages/beacon-node/src/sync/range/chain.ts +++ b/packages/beacon-node/src/sync/range/chain.ts @@ -594,6 +594,10 @@ export class SyncChain { } this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks; + this.logger.verbose("Advanced chain", { + id: this.logId, + lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks, + }); } private scrapeMetrics(metrics: Metrics): void { From 8345f317b33998384b49495724f50f50db156ea4 Mon Sep 17 00:00:00 2001 From: Cayman Date: Mon, 11 Aug 2025 10:02:17 -0400 Subject: [PATCH 5/5] Update packages/beacon-node/src/sync/range/utils/peerBalancer.ts --- packages/beacon-node/src/sync/range/utils/peerBalancer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts index 2dbea73d03b1..ea9b8a32b54f 100644 --- a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts +++ b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts @@ -115,7 +115,7 @@ export class ChainPeersBalancer { continue; } - if (!noActiveRequest && activeRequest >= this.maxConcurrentRequests) { + if (activeRequest >= this.maxConcurrentRequests) { // consumer wants to find peer with no more than MAX_CONCURRENT_REQUESTS active requests continue; }