Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions src/Orleans.Runtime/Diagnostics/GrainDirectoryEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Orleans.Runtime.GrainDirectory;

namespace Orleans.Runtime.Diagnostics;

internal static class GrainDirectoryEvents
{
internal const string ListenerName = "Orleans.GrainDirectory";
internal const string AcquireOperationName = "acquire";
internal const string ReleaseOperationName = "release";

private static readonly DiagnosticListener Listener = new(ListenerName);

internal static IObservable<GrainDirectoryEvent> AllEvents { get; } = new Observable();

internal abstract class GrainDirectoryEvent(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range)
{
public readonly SiloAddress SiloAddress = siloAddress;
public readonly int PartitionIndex = partitionIndex;
public readonly MembershipVersion Version = version;
public readonly RingRange Range = range;
}

internal abstract class RangeOperationEvent(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName) : GrainDirectoryEvent(siloAddress, partitionIndex, version, range)
{
public readonly string OperationName = operationName;
}

internal sealed class RangeOperationStarted(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName) : RangeOperationEvent(siloAddress, partitionIndex, version, range, operationName);

internal sealed class RangeOperationCompleted(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName,
TimeSpan heldDuration,
bool canceled) : RangeOperationEvent(siloAddress, partitionIndex, version, range, operationName)
{
public readonly TimeSpan HeldDuration = heldDuration;
public readonly bool Canceled = canceled;
}

internal static void EmitRangeOperationStarted(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName)
{
if (!Listener.IsEnabled(nameof(RangeOperationStarted)))
{
return;
}

Emit(siloAddress, partitionIndex, version, range, operationName);

[MethodImpl(MethodImplOptions.NoInlining)]
static void Emit(SiloAddress siloAddress, int partitionIndex, MembershipVersion version, RingRange range, string operationName)
{
Listener.Write(nameof(RangeOperationStarted), new RangeOperationStarted(siloAddress, partitionIndex, version, range, operationName));
}
}

internal static void EmitRangeOperationCompleted(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName,
TimeSpan heldDuration,
bool canceled)
{
if (!Listener.IsEnabled(nameof(RangeOperationCompleted)))
{
return;
}

Emit(siloAddress, partitionIndex, version, range, operationName, heldDuration, canceled);

[MethodImpl(MethodImplOptions.NoInlining)]
static void Emit(
SiloAddress siloAddress,
int partitionIndex,
MembershipVersion version,
RingRange range,
string operationName,
TimeSpan heldDuration,
bool canceled)
{
Listener.Write(nameof(RangeOperationCompleted), new RangeOperationCompleted(
siloAddress,
partitionIndex,
version,
range,
operationName,
heldDuration,
canceled));
}
}

private sealed class Observable : IObservable<GrainDirectoryEvent>
{
public IDisposable Subscribe(IObserver<GrainDirectoryEvent> observer) => Listener.Subscribe(new Observer(observer));

private sealed class Observer(IObserver<GrainDirectoryEvent> observer) : IObserver<KeyValuePair<string, object?>>
{
public void OnCompleted() => observer.OnCompleted();

public void OnError(Exception error) => observer.OnError(error);

public void OnNext(KeyValuePair<string, object?> value)
{
if (value.Value is GrainDirectoryEvent evt)
{
observer.OnNext(evt);
}
}
}
}
}
17 changes: 11 additions & 6 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Internal;
using Orleans.Runtime.Diagnostics;
using Orleans.Runtime.Scheduler;
using Orleans.Runtime.Utilities;

Expand Down Expand Up @@ -345,7 +346,7 @@ private void ProcessMembershipUpdate(DirectoryMembershipSnapshot current)
private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, DirectoryMembershipSnapshot current, RingRange removedRange)
{
GrainRuntime.CheckRuntimeContext(this);
var (tcs, sw) = LockRange(removedRange, current.Version);
var (tcs, sw) = LockRange(removedRange, current.Version, GrainDirectoryEvents.ReleaseOperationName);
LogDebugRelinquishingOwnership(_logger, removedRange, current.Version);

try
Expand Down Expand Up @@ -407,7 +408,7 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc
}
finally
{
UnlockRange(removedRange, current.Version, tcs, sw.Elapsed, "release");
UnlockRange(removedRange, current.Version, tcs, sw.Elapsed, GrainDirectoryEvents.ReleaseOperationName);
}
}

Expand All @@ -416,7 +417,7 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc
GrainRuntime.CheckRuntimeContext(this);
// Suspend the range and transfer state from the previous owners.
// If the predecessor becomes unavailable or membership advances quickly, we will declare data loss and unlock the range.
var (tcs, sw) = LockRange(addedRange, current.Version);
var (tcs, sw) = LockRange(addedRange, current.Version, GrainDirectoryEvents.AcquireOperationName);

try
{
Expand Down Expand Up @@ -475,21 +476,23 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc
}
finally
{
UnlockRange(addedRange, current.Version, tcs, sw.Elapsed, "acquire");
UnlockRange(addedRange, current.Version, tcs, sw.Elapsed, GrainDirectoryEvents.AcquireOperationName);
}
}

private (TaskCompletionSource Lock, ValueStopwatch Stopwatch) LockRange(RingRange range, MembershipVersion version)
private (TaskCompletionSource Lock, ValueStopwatch Stopwatch) LockRange(RingRange range, MembershipVersion version, string operationName)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_rangeLocks.Add((range, version, tcs));
GrainDirectoryEvents.EmitRangeOperationStarted(_id, _partitionIndex, version, range, operationName);
return (tcs, ValueStopwatch.StartNew());
}

private void UnlockRange(RingRange range, MembershipVersion version, TaskCompletionSource tcs, TimeSpan heldDuration, string operationName)
{
DirectoryInstruments.RangeLockHeldDuration.Record((long)heldDuration.TotalMilliseconds);
if (ShutdownToken.IsCancellationRequested)
var canceled = ShutdownToken.IsCancellationRequested;
if (canceled)
{
// If the partition is stopped, the range is never unlocked and the task is cancelled instead.
tcs.SetCanceled(ShutdownToken);
Expand All @@ -499,6 +502,8 @@ private void UnlockRange(RingRange range, MembershipVersion version, TaskComplet
tcs.SetResult();
_rangeLocks.Remove((range, version, tcs));
}

GrainDirectoryEvents.EmitRangeOperationCompleted(_id, _partitionIndex, version, range, operationName, heldDuration, canceled);
}

private async Task<bool> TransferSnapshotAsync(
Expand Down
Loading
Loading