Skip to content

Commit 7cfa945

Browse files
authored
[QUIC] Cosmetic changes to Read pipeline (#55591)
Follow-up for NITs and cosmetic changes from #55505
1 parent a487e4c commit 7cfa945

1 file changed

Lines changed: 46 additions & 28 deletions

File tree

  • src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic

src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ private sealed class State
3232
{
3333
public SafeMsQuicStreamHandle Handle = null!; // set in ctor.
3434
public GCHandle StateGCHandle;
35+
36+
public MsQuicStream? Stream; // roots the stream in the pinned state to prevent GC during an async read I/O.
3537
public MsQuicConnection.State ConnectionState = null!; // set in ctor.
3638
public string TraceId = null!; // set in ctor.
3739

@@ -48,7 +50,7 @@ private sealed class State
4850
// set when ReadState.PendingRead:
4951
public Memory<byte> ReceiveUserBuffer;
5052
public CancellationTokenRegistration ReceiveCancellationRegistration;
51-
public MsQuicStream? RootedReceiveStream; // roots the stream in the pinned state to prevent GC during an async read I/O.
53+
// Resettable completions to be used for multiple calls to receive.
5254
public readonly ResettableCompletionSource<int> ReceiveResettableCompletionSource = new ResettableCompletionSource<int>();
5355

5456
public SendState SendState;
@@ -363,27 +365,38 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
363365
NetEventSource.Info(_state, $"{TraceId()} Stream reading into Memory of '{destination.Length}' bytes.");
364366
}
365367

366-
ReadState readState;
367-
long abortError = -1;
368-
bool canceledSynchronously = false;
368+
ReadState initialReadState; // value before transitions
369+
long abortError;
370+
bool preCanceled = false;
369371

370372
lock (_state)
371373
{
372-
readState = _state.ReadState;
374+
initialReadState = _state.ReadState;
373375
abortError = _state.ReadErrorCode;
374376

375-
if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
377+
// Failure scenario: pre-canceled token. Transition: any -> Aborted
378+
// PendingRead state indicates there is another concurrent read operation in flight
379+
// which is forbidden, so it is handled separately
380+
if (initialReadState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
376381
{
377-
readState = ReadState.Aborted;
382+
initialReadState = ReadState.Aborted;
378383
_state.ReadState = ReadState.Aborted;
379-
canceledSynchronously = true;
384+
preCanceled = true;
385+
}
386+
387+
// Success scenario: EOS already reached, completing synchronously. No transition (final state)
388+
if (initialReadState == ReadState.ReadsCompleted)
389+
{
390+
return new ValueTask<int>(0);
380391
}
381-
else if (readState == ReadState.None)
392+
393+
// Success scenario: no data available yet, will return a task to wait on. Transition None->PendingRead
394+
if (initialReadState == ReadState.None)
382395
{
383-
Debug.Assert(_state.RootedReceiveStream is null);
396+
Debug.Assert(_state.Stream is null);
384397

385398
_state.ReceiveUserBuffer = destination;
386-
_state.RootedReceiveStream = this;
399+
_state.Stream = this;
387400
_state.ReadState = ReadState.PendingRead;
388401

389402
if (cancellationToken.CanBeCanceled)
@@ -396,7 +409,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
396409
lock (state)
397410
{
398411
completePendingRead = state.ReadState == ReadState.PendingRead;
399-
state.RootedReceiveStream = null;
412+
state.Stream = null;
400413
state.ReceiveUserBuffer = null;
401414
state.ReadState = ReadState.Aborted;
402415
}
@@ -414,7 +427,9 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
414427

415428
return _state.ReceiveResettableCompletionSource.GetValueTask();
416429
}
417-
else if (readState == ReadState.IndividualReadComplete)
430+
431+
// Success scenario: data already available, completing synchronously. Transition IndividualReadComplete->None
432+
if (initialReadState == ReadState.IndividualReadComplete)
418433
{
419434
_state.ReadState = ReadState.None;
420435

@@ -431,25 +446,22 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
431446
}
432447
}
433448

449+
// All success scenarios returned at this point. Failure scenarios below:
450+
434451
Exception? ex = null;
435452

436-
switch (readState)
453+
switch (initialReadState)
437454
{
438-
case ReadState.ReadsCompleted:
439-
return new ValueTask<int>(0);
440455
case ReadState.PendingRead:
441456
ex = new InvalidOperationException("Only one read is supported at a time.");
442457
break;
443458
case ReadState.Aborted:
444-
ex =
445-
canceledSynchronously ? new OperationCanceledException(cancellationToken) : // aborted by token being canceled before the async op started.
446-
abortError == -1 ? new QuicOperationAbortedException() : // aborted by user via some other operation.
447-
new QuicStreamAbortedException(abortError); // aborted by peer.
448-
459+
ex = preCanceled ? new OperationCanceledException(cancellationToken) :
460+
ThrowHelper.GetStreamAbortedException(abortError);
449461
break;
450462
case ReadState.ConnectionClosed:
451463
default:
452-
Debug.Assert(readState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}.");
464+
Debug.Assert(initialReadState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{initialReadState}' is unaccounted for in {nameof(ReadAsync)}.");
453465
ex = GetConnectionAbortedException(_state);
454466
break;
455467
}
@@ -490,7 +502,7 @@ internal override void AbortRead(long errorCode)
490502
if (_state.ReadState == ReadState.PendingRead)
491503
{
492504
shouldComplete = true;
493-
_state.RootedReceiveStream = null;
505+
_state.Stream = null;
494506
_state.ReceiveUserBuffer = null;
495507
}
496508
if (_state.ReadState < ReadState.ReadsCompleted)
@@ -833,7 +845,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
833845
if (receiveEvent.BufferCount == 0)
834846
{
835847
// This is a 0-length receive that happens once reads are finished (via abort or otherwise).
836-
// State changes for this are handled elsewhere.
848+
// State changes for this are handled in PEER_SEND_SHUTDOWN / PEER_SEND_ABORT / SHUTDOWN_COMPLETE event handlers.
837849
return MsQuicStatusCodes.Success;
838850
}
839851

@@ -847,6 +859,12 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
847859
case ReadState.None:
848860
// ReadAsync() hasn't been called yet. Stash the buffer so the next ReadAsync call completes synchronously.
849861

862+
// We are overwriting state.ReceiveQuicBuffers here even if we only partially consumed them
863+
// and it is intended, because unconsumed data will arrive again from the point we've stopped.
864+
// New RECEIVE event wouldn't come until we call EnableReceive(), and we call it only after we've consumed
865+
// as much as we could and said so to msquic in ReceiveComplete(taken), so new event will have all the
866+
// remaining data.
867+
850868
if ((uint)state.ReceiveQuicBuffers.Length < receiveEvent.BufferCount)
851869
{
852870
QuicBuffer[] oldReceiveBuffers = state.ReceiveQuicBuffers;
@@ -872,7 +890,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
872890

873891
state.ReceiveCancellationRegistration.Unregister();
874892
shouldComplete = true;
875-
state.RootedReceiveStream = null;
893+
state.Stream = null;
876894
state.ReadState = ReadState.None;
877895

878896
readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<QuicBuffer>(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span);
@@ -975,7 +993,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
975993
if (state.ReadState == ReadState.PendingRead)
976994
{
977995
shouldReadComplete = true;
978-
state.RootedReceiveStream = null;
996+
state.Stream = null;
979997
state.ReceiveUserBuffer = null;
980998
}
981999
if (state.ReadState < ReadState.ReadsCompleted)
@@ -1029,7 +1047,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
10291047
if (state.ReadState == ReadState.PendingRead)
10301048
{
10311049
shouldComplete = true;
1032-
state.RootedReceiveStream = null;
1050+
state.Stream = null;
10331051
state.ReceiveUserBuffer = null;
10341052
}
10351053
state.ReadState = ReadState.Aborted;
@@ -1057,7 +1075,7 @@ private static uint HandleEventPeerSendShutdown(State state)
10571075
if (state.ReadState == ReadState.PendingRead)
10581076
{
10591077
shouldComplete = true;
1060-
state.RootedReceiveStream = null;
1078+
state.Stream = null;
10611079
state.ReceiveUserBuffer = null;
10621080
}
10631081
if (state.ReadState < ReadState.ReadsCompleted)

0 commit comments

Comments
 (0)