Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Collections.Concurrent;
using System.Diagnostics.Tracing;
using HotChocolate.Collections.Immutable;
using HotChocolate.Execution;
using HotChocolate.Fusion.Diagnostics;
using HotChocolate.Fusion.Execution.Nodes;
using HotChocolate.Fusion.Planning;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -16,13 +16,14 @@ public async Task Concurrent_Same_Operation_Should_Be_Coalesced_To_One_Planning_
// arrange
const int requestCount = 8;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var listener = new PlannerEventListener();
var listener = new PlanningCountDiagnosticListener();
var operationIds = new ConcurrentBag<string>();
var gate = new RequestGate(requestCount);

var executor = await new ServiceCollection()
.AddGraphQLGateway()
.UseDefaultPipeline()
.AddDiagnosticEventListener(_ => listener)
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware,
(_, next) => CreateGateMiddleware(next, gate))
Expand Down Expand Up @@ -61,21 +62,22 @@ query SameOpCoalesce {
Assert.All(results, t => Assert.Empty(t.ExpectOperationResult().Errors));

var operationId = Assert.Single(operationIds.Distinct());
Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId));
Assert.Equal(1, listener.PlanStartCount(operationId));
}

[Fact]
public async Task Concurrent_Distinct_Operations_Should_Not_Be_Coalesced()
{
// arrange
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var listener = new PlannerEventListener();
var listener = new PlanningCountDiagnosticListener();
var operationIds = new ConcurrentBag<string>();
var gate = new RequestGate(expectedRequests: 2);

var executor = await new ServiceCollection()
.AddGraphQLGateway()
.UseDefaultPipeline()
.AddDiagnosticEventListener(_ => listener)
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware,
(_, next) => CreateGateMiddleware(next, gate))
Expand Down Expand Up @@ -119,28 +121,30 @@ query DistinctOpTwo {

var ids = operationIds.Distinct().ToArray();
Assert.Equal(2, ids.Length);
Assert.All(ids, id => Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, id)));
Assert.All(ids, id => Assert.Equal(1, listener.PlanStartCount(id)));
}

[Fact]
public async Task Leader_Planning_Failure_Should_Be_Observed_By_Followers()
{
// arrange
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var listener = new PlannerEventListener();
var listener = new PlanningCountDiagnosticListener();
var operationIds = new ConcurrentBag<string>();
var gate = new RequestGate(expectedRequests: 2);
var leaderGate = new SingleFlightLeaderGate();
var secondRequestObserver = new SecondRequestObserver();

var executor = await new ServiceCollection()
.AddGraphQLGateway()
.UseDefaultPipeline()
.AddDiagnosticEventListener(_ => listener)
.ModifyPlannerOptions(o => o.MaxPlanningTime = TimeSpan.FromTicks(1))
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanCacheMiddleware,
(_, next) => CreateGateMiddleware(next, gate))
(_, next) => CreateSecondRequestEnteredDownstreamMiddleware(next, secondRequestObserver))
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanMiddleware,
(_, next) => CreateSingleFlightLeaderDelayMiddleware(next, TimeSpan.FromMilliseconds(100)))
(_, next) => CreateSingleFlightLeaderBlockMiddleware(next, leaderGate))
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanMiddleware,
(_, next) => CreateOperationIdCaptureMiddleware(next, operationIds))
Expand All @@ -163,16 +167,23 @@ query FailureCoalesce {
""";

// act
var leaderTask = executor.ExecuteAsync(operationText, cts.Token);
await leaderGate.WaitForEntryAsync(cts.Token);

var followerTask = executor.ExecuteAsync(operationText, cts.Token);
await secondRequestObserver.WaitForSecondRequestEnteredDownstreamAsync(cts.Token);
leaderGate.Release();

var results = await Task.WhenAll(
executor.ExecuteAsync(operationText, cts.Token),
executor.ExecuteAsync(operationText, cts.Token));
leaderTask,
followerTask);

// assert
Assert.All(results, t => Assert.NotEmpty(t.ExpectOperationResult().Errors));

var operationId = Assert.Single(operationIds.Distinct());
Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId));
Assert.Equal(1, listener.Count(PlannerEventSource.PlanErrorEventId, operationId));
Assert.Equal(1, listener.PlanStartCount(operationId));
Assert.Equal(1, listener.PlanErrorCount(operationId));
}

[Fact]
Expand All @@ -181,13 +192,14 @@ public async Task Follower_Cancellation_Should_Not_Cancel_Leader_Planning()
// arrange
using var leaderCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var followerCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(150));
using var listener = new PlannerEventListener();
var listener = new PlanningCountDiagnosticListener();
var operationIds = new ConcurrentBag<string>();
var blockingInterceptor = new BlockingPlannerInterceptor();

var executor = await new ServiceCollection()
.AddGraphQLGateway()
.UseDefaultPipeline()
.AddDiagnosticEventListener(_ => listener)
.AddOperationPlannerInterceptor(_ => blockingInterceptor)
.InsertUseRequest(
before: WellKnownRequestMiddleware.OperationPlanMiddleware,
Expand Down Expand Up @@ -241,7 +253,7 @@ query CancelFollowerOnly {
Assert.Empty(leaderResult.ExpectOperationResult().Errors);

var operationId = Assert.Single(operationIds.Distinct());
Assert.Equal(1, listener.Count(PlannerEventSource.PlanStartEventId, operationId));
Assert.Equal(1, listener.PlanStartCount(operationId));
}

private static RequestDelegate CreateGateMiddleware(
Expand Down Expand Up @@ -275,6 +287,45 @@ private static RequestDelegate CreateSingleFlightLeaderDelayMiddleware(
await next(context);
};

private static RequestDelegate CreateSingleFlightLeaderBlockMiddleware(
RequestDelegate next,
SingleFlightLeaderGate gate)
=> async context =>
{
if (context.Features.Get<TaskCompletionSource<OperationPlan>>() is not null)
{
gate.SignalEntry();
await gate.WaitForReleaseAsync(context.RequestAborted);
}

await next(context);
};

private static RequestDelegate CreateSecondRequestEnteredDownstreamMiddleware(
RequestDelegate next,
SecondRequestObserver observer)
=> async context =>
{
if (!observer.IsSecondRequest())
{
await next(context);
return;
}

ValueTask execution;

try
{
execution = next(context);
}
finally
{
observer.SignalSecondRequestEnteredDownstream();
}

await execution;
};

private static RequestDelegate CreatePlanCaptureMiddleware()
=> context =>
{
Expand Down Expand Up @@ -322,44 +373,69 @@ public void OnAfterPlanCompleted(
}
}

private sealed class PlannerEventListener : EventListener
private sealed class SingleFlightLeaderGate
{
private readonly TaskCompletionSource _entered =
new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _release =
new(TaskCreationOptions.RunContinuationsAsynchronously);

public void SignalEntry()
=> _entered.TrySetResult();

public void Release()
=> _release.TrySetResult();

public ValueTask WaitForEntryAsync(CancellationToken cancellationToken)
=> new(_entered.Task.WaitAsync(cancellationToken));

public ValueTask WaitForReleaseAsync(CancellationToken cancellationToken)
=> new(_release.Task.WaitAsync(cancellationToken));
}

private sealed class SecondRequestObserver
{
private readonly TaskCompletionSource _secondRequestEnteredDownstream =
new(TaskCreationOptions.RunContinuationsAsynchronously);
private int _requestCount;

public bool IsSecondRequest()
=> Interlocked.Increment(ref _requestCount) == 2;

public void SignalSecondRequestEnteredDownstream()
=> _secondRequestEnteredDownstream.TrySetResult();

public ValueTask WaitForSecondRequestEnteredDownstreamAsync(CancellationToken cancellationToken)
=> new(_secondRequestEnteredDownstream.Task.WaitAsync(cancellationToken));
}

private sealed class PlanningCountDiagnosticListener : FusionExecutionDiagnosticEventListener
{
private readonly ConcurrentQueue<CapturedEvent> _events = [];
private readonly ConcurrentDictionary<string, int> _planStarts = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, int> _planErrors = new(StringComparer.Ordinal);

protected override void OnEventSourceCreated(EventSource eventSource)
public override IDisposable PlanOperation(RequestContext context, string operationPlanId)
{
if (eventSource.Name.Equals(PlannerEventSource.EventSourceName, StringComparison.Ordinal))
{
EnableEvents(eventSource, EventLevel.Informational, EventKeywords.All);
}
_planStarts.AddOrUpdate(operationPlanId, 1, static (_, count) => count + 1);
return EmptyScope;
}

protected override void OnEventWritten(EventWrittenEventArgs eventData)
public override void PlanOperationError(
RequestContext context,
string operationId,
Exception error)
{
if (!eventData.EventSource.Name.Equals(PlannerEventSource.EventSourceName, StringComparison.Ordinal))
{
return;
}

_events.Enqueue(
new CapturedEvent(
eventData.EventId,
eventData.Payload is null
? []
: [.. eventData.Payload]));
_planErrors.AddOrUpdate(operationId, 1, static (_, count) => count + 1);
}

public int Count(int eventId, string operationId)
=> _events.Count(t => t.EventId == eventId && t.HasOperationId(operationId));
}
public int PlanStartCount(string operationId)
=> _planStarts.TryGetValue(operationId, out var count)
? count
: 0;

private sealed record CapturedEvent(
int EventId,
IReadOnlyList<object?> Payload)
{
public bool HasOperationId(string operationId)
=> Payload.Count > 0
&& Payload[0] is string payloadOperationId
&& payloadOperationId.Equals(operationId, StringComparison.Ordinal);
public int PlanErrorCount(string operationId)
=> _planErrors.TryGetValue(operationId, out var count)
? count
: 0;
}
}
Loading