From fb90ca8fcfd75c252cc87289b13a81a42ce7b5ff Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Wed, 25 Feb 2026 11:39:17 +0000 Subject: [PATCH] Fix flaky Fusion operation-plan single-flight tests --- .../OperationPlanSingleFlightTests.cs | 166 +++++++++++++----- 1 file changed, 121 insertions(+), 45 deletions(-) diff --git a/src/HotChocolate/Fusion-vnext/test/Fusion.Execution.Tests/Execution/OperationPlanSingleFlightTests.cs b/src/HotChocolate/Fusion-vnext/test/Fusion.Execution.Tests/Execution/OperationPlanSingleFlightTests.cs index 4ce97c65cec..7bd1d835b1a 100644 --- a/src/HotChocolate/Fusion-vnext/test/Fusion.Execution.Tests/Execution/OperationPlanSingleFlightTests.cs +++ b/src/HotChocolate/Fusion-vnext/test/Fusion.Execution.Tests/Execution/OperationPlanSingleFlightTests.cs @@ -1,7 +1,7 @@ using System.Collections.Concurrent; -using System.Diagnostics.Tracing; using HotChocolate.Collections.Immutable; using HotChocolate.Execution; +using HotChocolate.Fusion.Diagnostics; using HotChocolate.Fusion.Execution.Nodes; using HotChocolate.Fusion.Planning; using Microsoft.Extensions.DependencyInjection; @@ -16,13 +16,14 @@ public async Task Concurrent_Same_Operation_Should_Be_Coalesced_To_One_Planning_ // arrange const int requestCount = 8; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - using var listener = new PlannerEventListener(); + var listener = new PlanningCountDiagnosticListener(); var operationIds = new ConcurrentBag(); var gate = new RequestGate(requestCount); var executor = await new ServiceCollection() .AddGraphQLGateway() .UseDefaultPipeline() + .AddDiagnosticEventListener(_ => listener) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware, (_, next) => CreateGateMiddleware(next, gate)) @@ -61,7 +62,7 @@ query SameOpCoalesce { Assert.All(results, t => Assert.Empty(t.ExpectOperationResult().Errors)); var operationId = Assert.Single(operationIds.Distinct()); - Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId)); + Assert.Equal(1, listener.PlanStartCount(operationId)); } [Fact] @@ -69,13 +70,14 @@ public async Task Concurrent_Distinct_Operations_Should_Not_Be_Coalesced() { // arrange using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - using var listener = new PlannerEventListener(); + var listener = new PlanningCountDiagnosticListener(); var operationIds = new ConcurrentBag(); var gate = new RequestGate(expectedRequests: 2); var executor = await new ServiceCollection() .AddGraphQLGateway() .UseDefaultPipeline() + .AddDiagnosticEventListener(_ => listener) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware, (_, next) => CreateGateMiddleware(next, gate)) @@ -119,7 +121,7 @@ query DistinctOpTwo { var ids = operationIds.Distinct().ToArray(); Assert.Equal(2, ids.Length); - Assert.All(ids, id => Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, id))); + Assert.All(ids, id => Assert.Equal(1, listener.PlanStartCount(id))); } [Fact] @@ -127,20 +129,22 @@ public async Task Leader_Planning_Failure_Should_Be_Observed_By_Followers() { // arrange using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - using var listener = new PlannerEventListener(); + var listener = new PlanningCountDiagnosticListener(); var operationIds = new ConcurrentBag(); - var gate = new RequestGate(expectedRequests: 2); + var leaderGate = new SingleFlightLeaderGate(); + var secondRequestObserver = new SecondRequestObserver(); var executor = await new ServiceCollection() .AddGraphQLGateway() .UseDefaultPipeline() + .AddDiagnosticEventListener(_ => listener) .ModifyPlannerOptions(o => o.MaxPlanningTime = TimeSpan.FromTicks(1)) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware, - (_, next) => CreateGateMiddleware(next, gate)) + (_, next) => CreateSecondRequestEnteredDownstreamMiddleware(next, secondRequestObserver)) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanMiddleware, - (_, next) => CreateSingleFlightLeaderDelayMiddleware(next, TimeSpan.FromMilliseconds(100))) + (_, next) => CreateSingleFlightLeaderBlockMiddleware(next, leaderGate)) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanMiddleware, (_, next) => CreateOperationIdCaptureMiddleware(next, operationIds)) @@ -163,16 +167,23 @@ query FailureCoalesce { """; // act + var leaderTask = executor.ExecuteAsync(operationText, cts.Token); + await leaderGate.WaitForEntryAsync(cts.Token); + + var followerTask = executor.ExecuteAsync(operationText, cts.Token); + await secondRequestObserver.WaitForSecondRequestEnteredDownstreamAsync(cts.Token); + leaderGate.Release(); + var results = await Task.WhenAll( - executor.ExecuteAsync(operationText, cts.Token), - executor.ExecuteAsync(operationText, cts.Token)); + leaderTask, + followerTask); // assert Assert.All(results, t => Assert.NotEmpty(t.ExpectOperationResult().Errors)); var operationId = Assert.Single(operationIds.Distinct()); - Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId)); - Assert.Equal(1, listener.Count(PlannerEventSource.PlanErrorEventId, operationId)); + Assert.Equal(1, listener.PlanStartCount(operationId)); + Assert.Equal(1, listener.PlanErrorCount(operationId)); } [Fact] @@ -181,13 +192,14 @@ public async Task Follower_Cancellation_Should_Not_Cancel_Leader_Planning() // arrange using var leaderCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); using var followerCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(150)); - using var listener = new PlannerEventListener(); + var listener = new PlanningCountDiagnosticListener(); var operationIds = new ConcurrentBag(); var blockingInterceptor = new BlockingPlannerInterceptor(); var executor = await new ServiceCollection() .AddGraphQLGateway() .UseDefaultPipeline() + .AddDiagnosticEventListener(_ => listener) .AddOperationPlannerInterceptor(_ => blockingInterceptor) .InsertUseRequest( before: WellKnownRequestMiddleware.OperationPlanMiddleware, @@ -241,7 +253,7 @@ query CancelFollowerOnly { Assert.Empty(leaderResult.ExpectOperationResult().Errors); var operationId = Assert.Single(operationIds.Distinct()); - Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId)); + Assert.Equal(1, listener.PlanStartCount(operationId)); } private static RequestDelegate CreateGateMiddleware( @@ -275,6 +287,45 @@ private static RequestDelegate CreateSingleFlightLeaderDelayMiddleware( await next(context); }; + private static RequestDelegate CreateSingleFlightLeaderBlockMiddleware( + RequestDelegate next, + SingleFlightLeaderGate gate) + => async context => + { + if (context.Features.Get>() is not null) + { + gate.SignalEntry(); + await gate.WaitForReleaseAsync(context.RequestAborted); + } + + await next(context); + }; + + private static RequestDelegate CreateSecondRequestEnteredDownstreamMiddleware( + RequestDelegate next, + SecondRequestObserver observer) + => async context => + { + if (!observer.IsSecondRequest()) + { + await next(context); + return; + } + + ValueTask execution; + + try + { + execution = next(context); + } + finally + { + observer.SignalSecondRequestEnteredDownstream(); + } + + await execution; + }; + private static RequestDelegate CreatePlanCaptureMiddleware() => context => { @@ -322,44 +373,69 @@ public void OnAfterPlanCompleted( } } - private sealed class PlannerEventListener : EventListener + private sealed class SingleFlightLeaderGate + { + private readonly TaskCompletionSource _entered = + new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _release = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + public void SignalEntry() + => _entered.TrySetResult(); + + public void Release() + => _release.TrySetResult(); + + public ValueTask WaitForEntryAsync(CancellationToken cancellationToken) + => new(_entered.Task.WaitAsync(cancellationToken)); + + public ValueTask WaitForReleaseAsync(CancellationToken cancellationToken) + => new(_release.Task.WaitAsync(cancellationToken)); + } + + private sealed class SecondRequestObserver + { + private readonly TaskCompletionSource _secondRequestEnteredDownstream = + new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _requestCount; + + public bool IsSecondRequest() + => Interlocked.Increment(ref _requestCount) == 2; + + public void SignalSecondRequestEnteredDownstream() + => _secondRequestEnteredDownstream.TrySetResult(); + + public ValueTask WaitForSecondRequestEnteredDownstreamAsync(CancellationToken cancellationToken) + => new(_secondRequestEnteredDownstream.Task.WaitAsync(cancellationToken)); + } + + private sealed class PlanningCountDiagnosticListener : FusionExecutionDiagnosticEventListener { - private readonly ConcurrentQueue _events = []; + private readonly ConcurrentDictionary _planStarts = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _planErrors = new(StringComparer.Ordinal); - protected override void OnEventSourceCreated(EventSource eventSource) + public override IDisposable PlanOperation(RequestContext context, string operationPlanId) { - if (eventSource.Name.Equals(PlannerEventSource.EventSourceName, StringComparison.Ordinal)) - { - EnableEvents(eventSource, EventLevel.Informational, EventKeywords.All); - } + _planStarts.AddOrUpdate(operationPlanId, 1, static (_, count) => count + 1); + return EmptyScope; } - protected override void OnEventWritten(EventWrittenEventArgs eventData) + public override void PlanOperationError( + RequestContext context, + string operationId, + Exception error) { - if (!eventData.EventSource.Name.Equals(PlannerEventSource.EventSourceName, StringComparison.Ordinal)) - { - return; - } - - _events.Enqueue( - new CapturedEvent( - eventData.EventId, - eventData.Payload is null - ? [] - : [.. eventData.Payload])); + _planErrors.AddOrUpdate(operationId, 1, static (_, count) => count + 1); } - public int Count(int eventId, string operationId) - => _events.Count(t => t.EventId == eventId && t.HasOperationId(operationId)); - } + public int PlanStartCount(string operationId) + => _planStarts.TryGetValue(operationId, out var count) + ? count + : 0; - private sealed record CapturedEvent( - int EventId, - IReadOnlyList Payload) - { - public bool HasOperationId(string operationId) - => Payload.Count > 0 - && Payload[0] is string payloadOperationId - && payloadOperationId.Equals(operationId, StringComparison.Ordinal); + public int PlanErrorCount(string operationId) + => _planErrors.TryGetValue(operationId, out var count) + ? count + : 0; } }