diff --git a/packages/beacon-node/src/sync/constants.ts b/packages/beacon-node/src/sync/constants.ts index 872b93be8749..83c94331cd27 100644 --- a/packages/beacon-node/src/sync/constants.ts +++ b/packages/beacon-node/src/sync/constants.ts @@ -51,3 +51,16 @@ export const EPOCHS_PER_BATCH = 1; * TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized? */ export const BATCH_BUFFER_SIZE = Math.ceil(10 / EPOCHS_PER_BATCH); + +/** + * Maximum number of concurrent requests to perform with a SyncChain. + * This is according to the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md + */ +export const MAX_CONCURRENT_REQUESTS = 2; + +/** + * Maximum number of epochs to download ahead when syncing. + * In fulu, to fully process a batch we may need to download columns from multiple peers + * so having this constant too big is a waste of resources and peers may rate limit us. + */ +export const MAX_LOOK_AHEAD_EPOCHS = 2; diff --git a/packages/beacon-node/src/sync/range/chain.ts b/packages/beacon-node/src/sync/range/chain.ts index 2e45e8cdce66..4251b6586b45 100644 --- a/packages/beacon-node/src/sync/range/chain.ts +++ b/packages/beacon-node/src/sync/range/chain.ts @@ -11,7 +11,7 @@ import {CustodyConfig} from "../../util/dataColumns.js"; import {ItTrigger} from "../../util/itTrigger.js"; import {PeerIdStr} from "../../util/peerId.js"; import {wrapError} from "../../util/wrapError.js"; -import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH} from "../constants.js"; +import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, MAX_LOOK_AHEAD_EPOCHS} from "../constants.js"; import {RangeSyncType} from "../utils/remoteSyncType.js"; import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js"; import { @@ -409,6 +409,16 @@ export class SyncChain { return null; } + // if last processed epoch is n, we don't want to request batches with epoch > n + MAX_LOOK_AHEAD_EPOCHS + // we should have enough 
batches to process in the buffer: n + 1, ..., n + MAX_LOOK_AHEAD_EPOCHS + // let's focus on redownloading these batches first because it may have to reach different peers to get enough sampled columns + if ( + batches.length > 0 && + Math.max(...batches.map((b) => b.startEpoch)) >= this.lastEpochWithProcessBlocks + MAX_LOOK_AHEAD_EPOCHS + ) { + return null; + } + // This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks); @@ -584,6 +594,10 @@ export class SyncChain { } this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks; + this.logger.verbose("Advanced chain", { + id: this.logId, + lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks, + }); } private scrapeMetrics(metrics: Metrics): void { diff --git a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts index 6d48f375d013..ea9b8a32b54f 100644 --- a/packages/beacon-node/src/sync/range/utils/peerBalancer.ts +++ b/packages/beacon-node/src/sync/range/utils/peerBalancer.ts @@ -3,6 +3,7 @@ import {CustodyConfig} from "../../../util/dataColumns.js"; import {PeerIdStr} from "../../../util/peerId.js"; import {shuffle} from "../../../util/shuffle.js"; import {sortBy} from "../../../util/sortBy.js"; +import {MAX_CONCURRENT_REQUESTS} from "../../constants.js"; import {Batch, BatchStatus} from "../batch.js"; import {ChainTarget} from "./chainTarget.js"; @@ -20,11 +21,21 @@ export class ChainPeersBalancer { private peers: PeerSyncInfo[]; private activeRequestsByPeer = new Map(); private readonly custodyConfig: CustodyConfig; + private readonly maxConcurrentRequests: number; - // TODO: @matthewkeil check if this needs to be updated for custody groups - constructor(peers: PeerSyncInfo[], batches: Batch[], custodyConfig: CustodyConfig) { + /** + * No need to specify `maxConcurrentRequests` for production code 
+ * It is used for testing purposes to limit the number of concurrent requests + */ + constructor( + peers: PeerSyncInfo[], + batches: Batch[], + custodyConfig: CustodyConfig, + maxConcurrentRequests = MAX_CONCURRENT_REQUESTS + ) { this.peers = shuffle(peers); this.custodyConfig = custodyConfig; + this.maxConcurrentRequests = maxConcurrentRequests; // Compute activeRequestsByPeer from all batches internal states for (const batch of batches) { @@ -55,7 +66,17 @@ export class ChainPeersBalancer { ({columns}) => -1 * columns // prefer peers with the most columns ); - return sortedBestPeers.length > 0 ? sortedBestPeers[0].syncInfo : undefined; + if (sortedBestPeers.length > 0) { + const bestPeer = sortedBestPeers[0]; + // we will use this peer for batch in SyncChain right after this call + this.activeRequestsByPeer.set( + bestPeer.syncInfo.peerId, + (this.activeRequestsByPeer.get(bestPeer.syncInfo.peerId) ?? 0) + 1 + ); + return bestPeer.syncInfo; + } + + return undefined; } /** @@ -82,14 +103,20 @@ export class ChainPeersBalancer { return undefined; } - private filterPeers(batch: Batch, requestColumns: number[], checkActiveRequest: boolean): PeerInfoColumn[] { + private filterPeers(batch: Batch, requestColumns: number[], noActiveRequest: boolean): PeerInfoColumn[] { const eligiblePeers: PeerInfoColumn[] = []; for (const peer of this.peers) { const {earliestAvailableSlot, custodyGroups, target, peerId} = peer; const activeRequest = this.activeRequestsByPeer.get(peerId) ?? 
0; - if (checkActiveRequest && activeRequest > 0) { + if (noActiveRequest && activeRequest > 0) { + // consumer wants to find peer with no active request, but this peer has active request + continue; + } + + if (activeRequest >= this.maxConcurrentRequests) { + // skip peers that already have maxConcurrentRequests (MAX_CONCURRENT_REQUESTS by default) or more active requests continue; } diff --git a/packages/beacon-node/test/unit/sync/range/chain.test.ts b/packages/beacon-node/test/unit/sync/range/chain.test.ts index e72bd71c5113..fcee8d16de6a 100644 --- a/packages/beacon-node/test/unit/sync/range/chain.test.ts +++ b/packages/beacon-node/test/unit/sync/range/chain.test.ts @@ -28,10 +28,13 @@ describe("sync / range / chain", () => { targetEpoch: 16, }, { - id: "Simulate sync with a very long range of skipped slots", + // due to BATCH_BUFFER_SIZE and MAX_LOOK_AHEAD_EPOCHS, lodestar cannot deal with unlimited skipped slots + // having a test with 2 epochs of skipped slots is enough to test the logic + // this hasn't happened in any networks as of Aug 2025 + id: "Simulate sync with 2 epochs of skipped slots", startEpoch: 0, targetEpoch: 16, - skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 10 * SLOTS_PER_EPOCH)), + skippedSlots: new Set(linspace(3 * SLOTS_PER_EPOCH, 4 * SLOTS_PER_EPOCH)), }, { id: "Simulate sync with multiple ranges of bad blocks", diff --git a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts index 5a0c5779d32a..a202dc6e433e 100644 --- a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts +++ b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts @@ -25,7 +25,8 @@ describe("sync / range / peerBalancer", () => { custodyColumns: number[][]; targetEpochs: number[]; earliestAvailableSlots: (number | undefined | null)[]; - expected: PeerIdStr; + maxConcurrentRequests?: number; + expected?: PeerIdStr; }[] = [ { isFulu: true, @@ -69,7 +70,20 @@ describe("sync / 
range / peerBalancer", () => { }, { isFulu: true, - // peer3 and peer4 are free but peer4 has more clumns + // same as above but it should not return any peers + // peer3 is free but doesn't have any custody columns + // peer 4 has unrelated custody column + // peer 2 is busy downloading batch1 and maxConcurrentRequests is 1 + // test custody columns condition and maxConcurrentRequests condition + custodyColumns: [[], [0, 1, 2, 3], [4, 5, 6, 7], [100]], + targetEpochs: [1, 2, 3, 4], + earliestAvailableSlots: [0, 0, 0, 0], + maxConcurrentRequests: 1, + expected: undefined, + }, + { + isFulu: true, + // peer3 and peer4 are free but peer4 has more columns // test custody columns condition custodyColumns: [[], [0, 1, 2, 3], [2, 3, 4, 5], [1, 2, 3, 4]], targetEpochs: [1, 2, 3, 4], @@ -95,7 +109,10 @@ describe("sync / range / peerBalancer", () => { expected: peer3.peerId, }, ]; - for (const [i, {isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, expected}] of testCases.entries()) { + for (const [ + i, + {isFulu, custodyColumns, targetEpochs, earliestAvailableSlots, maxConcurrentRequests, expected}, + ] of testCases.entries()) { it(`test case ${i}`, async () => { const columnsByPeer = new Map(); for (const [i, custody] of custodyColumns.entries()) { @@ -133,7 +150,7 @@ describe("sync / range / peerBalancer", () => { // peer2 is busy downloading batch1 batch1.startDownloading(peer2.peerId); - const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig); + const peerBalancer = new ChainPeersBalancer(peerInfos, [batch0, batch1], custodyConfig, maxConcurrentRequests); expect(peerBalancer.bestPeerToRetryBatch(batch0)?.peerId).toBe(expected); }); }