Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,6 @@ public static void MatchSnapshot(
postFix,
formatter: CoreFormatters.PlainText);

/// <summary>
/// Matches a snapshot of an execution result, merging an incrementally delivered
/// (<c>@defer</c>/<c>@stream</c>) response into its final aggregated form. The snapshot
/// is identical whether the transport delivered the response across several payloads or
/// as a single bundled payload, so it does not depend on delivery timing.
/// </summary>
public static void MatchAggregatedSnapshot(
this IExecutionResult? value,
string? postFix = null)
=> Snapshot.Match(
value,
postFix,
formatter: SnapshotValueFormatters.ExecutionResultAggregated);

/// <summary>
/// Markdown counterpart of <see cref="MatchAggregatedSnapshot"/>: matches a markdown
/// snapshot of an execution result, merging an incrementally delivered
/// (<c>@defer</c>/<c>@stream</c>) response into its final aggregated form so the snapshot
/// does not depend on whether the transport delivered it incrementally or bundled.
/// </summary>
public static void MatchAggregatedMarkdownSnapshot(
this IExecutionResult? value,
object? postFix = null,
string? extension = null)
=> Snapshot.Create(postFix?.ToString(), extension)
.Add(value, formatter: SnapshotValueFormatters.ExecutionResultAggregated)
.MatchMarkdown();

public static Snapshot AddResult(
Comment thread
glen-84 marked this conversation as resolved.
this Snapshot snapshot,
IExecutionResult result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
using CookieCrumble.Formatters;
using HotChocolate;
using HotChocolate.Execution;
using static CookieCrumble.HotChocolate.Formatters.StableSnapshotHelpers;

namespace CookieCrumble.HotChocolate.Formatters;

internal sealed class ExecutionResultSnapshotValueFormatter(bool alwaysAggregate = false)
internal sealed class ExecutionResultSnapshotValueFormatter
: SnapshotValueFormatter<IExecutionResult>
{
protected override void Format(IBufferWriter<byte> snapshot, IExecutionResult value)
Expand All @@ -18,7 +19,7 @@ protected override void Format(IBufferWriter<byte> snapshot, IExecutionResult va
}
else
{
FormatStreamAsync(snapshot, (IResponseStream)value, alwaysAggregate).Wait();
FormatStreamAsync(snapshot, (IResponseStream)value).Wait();
}
Comment thread
glen-84 marked this conversation as resolved.
}

Expand All @@ -34,73 +35,247 @@ protected override void FormatMarkdown(IBufferWriter<byte> snapshot, IExecutionR
{
snapshot.Append("```text");
snapshot.AppendLine();
FormatStreamAsync(snapshot, (IResponseStream)value, alwaysAggregate).Wait();
FormatStreamAsync(snapshot, (IResponseStream)value).Wait();
}
Comment thread
glen-84 marked this conversation as resolved.

snapshot.AppendLine();
snapshot.Append("```");
snapshot.AppendLine();
}

private static async Task FormatStreamAsync(
private static Task FormatStreamAsync(IBufferWriter<byte> snapshot, IResponseStream stream)
// Only @defer/@stream responses carry the incremental-delivery envelope. Other
// streams (subscriptions, batches) are sequences of independent results and are
// written out verbatim, one payload after another.
=> stream.Kind is ExecutionResultKind.DeferredResult
? FormatIncrementalAsync(snapshot, stream)
: FormatEventStreamAsync(snapshot, stream);

private static async Task FormatEventStreamAsync(IBufferWriter<byte> snapshot, IResponseStream stream)
{
await foreach (var result in stream.ReadResultsAsync().ConfigureAwait(false))
{
snapshot.Append(result.ToJson());
snapshot.AppendLine();
}
}

// Reconstructs a single, delivery-order-independent view of an incrementally
// delivered (@defer/@stream) response: the initial payload's non-protocol
// fields plus the `pending`, `incremental`, and `completed` entries collected
// across every payload and ordered by `id`. The snapshot is identical whether
// the transport bundled the response into one payload or split it across several.
private static async Task FormatIncrementalAsync(
IBufferWriter<byte> snapshot,
IResponseStream stream,
bool alwaysAggregate)
IResponseStream stream)
{
var docs = new List<JsonDocument>();
JsonResultPatcher? patcher = null;
var first = true;
var accumulator = new StreamAccumulator();

try
// StreamAccumulator deep-clones every element it retains, so each parsed
// document is only needed for the duration of its AddPayload call and can be
// disposed immediately afterwards.
await foreach (var result in stream.ReadResultsAsync().ConfigureAwait(false))
{
await foreach (var queryResult in stream.ReadResultsAsync().ConfigureAwait(false))
{
if (first)
{
first = false;
using var document = JsonDocument.Parse(result.ToJson());
accumulator.AddPayload(document.RootElement);
}

// When aggregating, the initial payload seeds the patcher regardless of
// whether more payloads follow. This normalizes a response delivered as a
// single bundled payload to the same merged form as an incrementally
// delivered one, so the snapshot is independent of delivery batching.
if (alwaysAggregate || (queryResult.HasNext ?? false))
{
var doc = JsonDocument.Parse(queryResult.ToJson());
docs.Add(doc);
await using var writer = new Utf8JsonWriter(snapshot, IndentedWriterOptions);
WriteEnvelope(writer, accumulator);
writer.Flush();
snapshot.AppendLine();
}

patcher = new JsonResultPatcher();
patcher.SetResponse(doc);
continue;
}
}
private static void WriteEnvelope(Utf8JsonWriter writer, StreamAccumulator accumulator)
{
writer.WriteStartObject();

if (patcher is null)
// The initial payload's non-protocol fields (`data`, `errors`, `extensions`) in
// their original order; the incremental-delivery fields are rebuilt below.
if (accumulator.InitialPayload is { ValueKind: JsonValueKind.Object } initial)
{
foreach (var property in initial.EnumerateObject())
{
if (IsStreamField(property.Name))
{
snapshot.Append(queryResult.ToJson());
snapshot.AppendLine();
continue;
}
else
{
var doc = JsonDocument.Parse(queryResult.ToJson());
docs.Add(doc);

patcher.ApplyPatch(doc);
writer.WritePropertyName(property.Name);
property.Value.WriteTo(writer);
}
}

WritePending(writer, accumulator.PendingById);
WriteIncremental(writer, accumulator.IncrementalEntries);
WriteCompleted(writer, accumulator.CompletedEntries);

writer.WriteBoolean("hasNext", false);

writer.WriteEndObject();
}

private static void WritePending(
Utf8JsonWriter writer,
Dictionary<string, PendingEntry> pendingById)
{
if (pendingById.Count == 0)
{
return;
}

var pending = pendingById.Values.ToList();
pending.Sort(static (x, y) => CompareIds(x.Id, y.Id));

writer.WritePropertyName("pending");
writer.WriteStartArray();

foreach (var entry in pending)
{
writer.WriteStartObject();
writer.WriteString("id", entry.Id);
writer.WritePropertyName("path");
entry.Path.WriteTo(writer);

if (!string.IsNullOrEmpty(entry.Label))
{
writer.WriteString("label", entry.Label);
}

writer.WriteEndObject();
}

writer.WriteEndArray();
}

private static void WriteIncremental(
Utf8JsonWriter writer,
List<IncrementalEntry> entries)
{
if (entries.Count == 0)
{
return;
}

// Group by `id` so a stream delivered across several payloads collapses into a
// single entry, independent of how many frames the transport used.
var order = new List<string>();
var byId = new Dictionary<string, List<IncrementalEntry>>();

foreach (var entry in entries)
{
if (!byId.TryGetValue(entry.Id, out var group))
{
group = [];
byId.Add(entry.Id, group);
order.Add(entry.Id);
}

group.Add(entry);
}

order.Sort(CompareIds);

writer.WritePropertyName("incremental");
writer.WriteStartArray();

foreach (var id in order)
{
var group = byId[id];

writer.WriteStartObject();
writer.WriteString("id", id);

if (group.FirstOrDefault(e => e.SubPath is not null)?.SubPath is { } subPath)
{
writer.WritePropertyName("subPath");
subPath.WriteTo(writer);
}

if (group.Any(e => e.Items is not null))
{
writer.WritePropertyName("items");
writer.WriteStartArray();
foreach (var entry in group)
{
if (entry.Items is { } items)
{
foreach (var item in items.EnumerateArray())
{
item.WriteTo(writer);
}
}
}
writer.WriteEndArray();
}
else if (group.FirstOrDefault(e => e.Data is not null)?.Data is { } data)
{
writer.WritePropertyName("data");
data.WriteTo(writer);
}

if (patcher is not null)
WriteMergedErrors(writer, group);

writer.WriteEndObject();
}

writer.WriteEndArray();
}

private static void WriteCompleted(
Utf8JsonWriter writer,
List<CompletedEntry> entries)
{
if (entries.Count == 0)
{
return;
}

var completed = entries.ToList();
completed.Sort(static (x, y) => CompareIds(x.Id, y.Id));

writer.WritePropertyName("completed");
writer.WriteStartArray();

foreach (var entry in completed)
{
writer.WriteStartObject();
writer.WriteString("id", entry.Id);

if (entry.Errors is { } errors)
{
patcher.WriteResponse(snapshot);
snapshot.AppendLine();
writer.WritePropertyName("errors");
errors.WriteTo(writer);
}

writer.WriteEndObject();
}
finally

writer.WriteEndArray();
}

private static void WriteMergedErrors(Utf8JsonWriter writer, List<IncrementalEntry> group)
{
if (!group.Any(e => e.Errors is not null))
{
foreach (var doc in docs)
return;
}

writer.WritePropertyName("errors");
writer.WriteStartArray();

foreach (var entry in group)
{
if (entry.Errors is { } errors)
{
doc.Dispose();
foreach (var error in errors.EnumerateArray())
{
error.WriteTo(writer);
}
}
}

writer.WriteEndArray();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ public static class SnapshotValueFormatters
public static ISnapshotValueFormatter ExecutionResult { get; } =
new ExecutionResultSnapshotValueFormatter();

public static ISnapshotValueFormatter ExecutionResultAggregated { get; } =
new ExecutionResultSnapshotValueFormatter(alwaysAggregate: true);

public static ISnapshotValueFormatter ExecutionResultStable { get; } =
new StableExecutionResultSnapshotValueFormatter();
Comment thread
glen-84 marked this conversation as resolved.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ... @defer {
""",
TestContext.Current.CancellationToken);

Assert.IsType<ResponseStream>(result).MatchAggregatedMarkdownSnapshot();
Assert.IsType<ResponseStream>(result).MatchMarkdownSnapshot();
}

/// <summary>
Expand Down Expand Up @@ -232,7 +232,7 @@ ... @defer {
""",
TestContext.Current.CancellationToken);

Assert.IsType<ResponseStream>(result).MatchAggregatedMarkdownSnapshot();
Assert.IsType<ResponseStream>(result).MatchMarkdownSnapshot();
}

// ========================================================================
Expand Down Expand Up @@ -430,7 +430,7 @@ ... @defer {
""",
TestContext.Current.CancellationToken);

Assert.IsType<ResponseStream>(result).MatchAggregatedMarkdownSnapshot();
Assert.IsType<ResponseStream>(result).MatchMarkdownSnapshot();
}

/// <summary>
Expand Down Expand Up @@ -513,7 +513,7 @@ ... @defer {
""",
TestContext.Current.CancellationToken);

Assert.IsType<ResponseStream>(result).MatchAggregatedMarkdownSnapshot();
Assert.IsType<ResponseStream>(result).MatchMarkdownSnapshot();
}

// ========================================================================
Expand Down Expand Up @@ -670,7 +670,7 @@ ... on Hero @defer {
""",
TestContext.Current.CancellationToken);

Assert.IsType<ResponseStream>(result).MatchAggregatedMarkdownSnapshot();
Assert.IsType<ResponseStream>(result).MatchMarkdownSnapshot();
}

/// <summary>
Expand Down
Loading