diff --git a/src/Orleans.Runtime/Diagnostics/GrainDirectoryEvents.cs b/src/Orleans.Runtime/Diagnostics/GrainDirectoryEvents.cs new file mode 100644 index 00000000000..dc2536a8142 --- /dev/null +++ b/src/Orleans.Runtime/Diagnostics/GrainDirectoryEvents.cs @@ -0,0 +1,136 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using Orleans.Runtime.GrainDirectory; + +namespace Orleans.Runtime.Diagnostics; + +internal static class GrainDirectoryEvents +{ + internal const string ListenerName = "Orleans.GrainDirectory"; + internal const string AcquireOperationName = "acquire"; + internal const string ReleaseOperationName = "release"; + + private static readonly DiagnosticListener Listener = new(ListenerName); + + internal static IObservable AllEvents { get; } = new Observable(); + + internal abstract class GrainDirectoryEvent( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range) + { + public readonly SiloAddress SiloAddress = siloAddress; + public readonly int PartitionIndex = partitionIndex; + public readonly MembershipVersion Version = version; + public readonly RingRange Range = range; + } + + internal abstract class RangeOperationEvent( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName) : GrainDirectoryEvent(siloAddress, partitionIndex, version, range) + { + public readonly string OperationName = operationName; + } + + internal sealed class RangeOperationStarted( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName) : RangeOperationEvent(siloAddress, partitionIndex, version, range, operationName); + + internal sealed class RangeOperationCompleted( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName, + TimeSpan heldDuration, + bool canceled) : RangeOperationEvent(siloAddress, partitionIndex, version, range, operationName) + { + public readonly TimeSpan HeldDuration = heldDuration; + public readonly bool Canceled = canceled; + } + + internal static void EmitRangeOperationStarted( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName) + { + if (!Listener.IsEnabled(nameof(RangeOperationStarted))) + { + return; + } + + Emit(siloAddress, partitionIndex, version, range, operationName); + + [MethodImpl(MethodImplOptions.NoInlining)] + static void Emit(SiloAddress siloAddress, int partitionIndex, MembershipVersion version, RingRange range, string operationName) + { + Listener.Write(nameof(RangeOperationStarted), new RangeOperationStarted(siloAddress, partitionIndex, version, range, operationName)); + } + } + + internal static void EmitRangeOperationCompleted( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName, + TimeSpan heldDuration, + bool canceled) + { + if (!Listener.IsEnabled(nameof(RangeOperationCompleted))) + { + return; + } + + Emit(siloAddress, partitionIndex, version, range, operationName, heldDuration, canceled); + + [MethodImpl(MethodImplOptions.NoInlining)] + static void Emit( + SiloAddress siloAddress, + int partitionIndex, + MembershipVersion version, + RingRange range, + string operationName, + TimeSpan heldDuration, + bool canceled) + { + Listener.Write(nameof(RangeOperationCompleted), new RangeOperationCompleted( + siloAddress, + partitionIndex, + version, + range, + operationName, + heldDuration, + canceled)); + } + } + + private sealed class Observable : IObservable + { + public IDisposable Subscribe(IObserver observer) => Listener.Subscribe(new Observer(observer)); + + private sealed class Observer(IObserver observer) : IObserver> + { + public void OnCompleted() => observer.OnCompleted(); + + public void OnError(Exception error) => observer.OnError(error); + + public void OnNext(KeyValuePair value) + { + if (value.Value is GrainDirectoryEvent evt) + { + observer.OnNext(evt); + } + } + } + } +} diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs index 7c876932c6c..9f1ebcc87aa 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs @@ -10,6 +10,7 @@ using Microsoft.Extensions.Logging; using Orleans.Concurrency; using Orleans.Internal; +using Orleans.Runtime.Diagnostics; using Orleans.Runtime.Scheduler; using Orleans.Runtime.Utilities; @@ -345,7 +346,7 @@ private void ProcessMembershipUpdate(DirectoryMembershipSnapshot current) private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, DirectoryMembershipSnapshot current, RingRange removedRange) { GrainRuntime.CheckRuntimeContext(this); - var (tcs, sw) = LockRange(removedRange, current.Version); + var (tcs, sw) = LockRange(removedRange, current.Version, GrainDirectoryEvents.ReleaseOperationName); LogDebugRelinquishingOwnership(_logger, removedRange, current.Version); try @@ -407,7 +408,7 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc } finally { - UnlockRange(removedRange, current.Version, tcs, sw.Elapsed, "release"); + UnlockRange(removedRange, current.Version, tcs, sw.Elapsed, GrainDirectoryEvents.ReleaseOperationName); } } @@ -416,7 +417,7 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc GrainRuntime.CheckRuntimeContext(this); // Suspend the range and transfer state from the previous owners. // If the predecessor becomes unavailable or membership advances quickly, we will declare data loss and unlock the range. - var (tcs, sw) = LockRange(addedRange, current.Version); + var (tcs, sw) = LockRange(addedRange, current.Version, GrainDirectoryEvents.AcquireOperationName); try { @@ -475,21 +476,23 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc } finally { - UnlockRange(addedRange, current.Version, tcs, sw.Elapsed, "acquire"); + UnlockRange(addedRange, current.Version, tcs, sw.Elapsed, GrainDirectoryEvents.AcquireOperationName); } } - private (TaskCompletionSource Lock, ValueStopwatch Stopwatch) LockRange(RingRange range, MembershipVersion version) + private (TaskCompletionSource Lock, ValueStopwatch Stopwatch) LockRange(RingRange range, MembershipVersion version, string operationName) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _rangeLocks.Add((range, version, tcs)); + GrainDirectoryEvents.EmitRangeOperationStarted(_id, _partitionIndex, version, range, operationName); return (tcs, ValueStopwatch.StartNew()); } private void UnlockRange(RingRange range, MembershipVersion version, TaskCompletionSource tcs, TimeSpan heldDuration, string operationName) { DirectoryInstruments.RangeLockHeldDuration.Record((long)heldDuration.TotalMilliseconds); - if (ShutdownToken.IsCancellationRequested) + var canceled = ShutdownToken.IsCancellationRequested; + if (canceled) { // If the partition is stopped, the range is never unlocked and the task is cancelled instead. tcs.SetCanceled(ShutdownToken); @@ -499,6 +502,8 @@ private void UnlockRange(RingRange range, MembershipVersion version, TaskComplet tcs.SetResult(); _rangeLocks.Remove((range, version, tcs)); } + + GrainDirectoryEvents.EmitRangeOperationCompleted(_id, _partitionIndex, version, range, operationName, heldDuration, canceled); } private async Task TransferSnapshotAsync( diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs index 0baa7ec36f0..c561a85bfb2 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs @@ -4,10 +4,12 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Runtime.Diagnostics; using Orleans.Runtime.GrainDirectory; using Orleans.Serialization; using Orleans.Storage; using Orleans.TestingHost; +using Orleans.TestingHost.Diagnostics; using Xunit; using Xunit.Abstractions; @@ -27,6 +29,8 @@ internal class MyDirectoryTestGrain : Grain, IMyDirectoryTestGrain [TestCategory("Stress"), TestCategory("Directory")] public sealed class GrainDirectoryResilienceTests { + private static readonly TimeSpan DirectoryMigrationTimeout = TimeSpan.FromMinutes(1); + /// /// Cluster chaos test: tests directory functionality & integrity while starting/stopping/killing silos frequently. /// @@ -57,7 +61,7 @@ public async Task ElasticChaos() var time = Stopwatch.StartNew(); var tasks = Enumerable.Range(0, CallsPerIteration).Select(i => client.GetGrain(idBase + i).Ping().AsTask()).ToList(); var workTask = Task.WhenAll(tasks); - + try { await workTask; @@ -68,7 +72,7 @@ public async Task ElasticChaos() } catch (OrleansMessageRejectionException omre) { - log.LogInformation(omre, "Swallowed rejection."); + log.LogInformation(omre, "Swallowed rejection."); } catch (Exception exception) { @@ -93,24 +97,7 @@ public async Task ElasticChaos() reconfigurationTimer.Restart(); await clusterOperation; - // Check integrity - var integrityChecks = new List(); - foreach (var silo in testCluster.Silos) - { - var address = silo.SiloAddress; - var partitionsPerSilo = ((InProcessSiloHandle)silo).ServiceProvider.GetRequiredService().PartitionsPerSilo; - for (var partitionIndex = 0; partitionIndex < partitionsPerSilo; partitionIndex++) - { - var replica = ((IInternalGrainFactory)client).GetSystemTarget(GrainDirectoryPartition.CreateGrainId(address, partitionIndex).GrainId); - integrityChecks.Add(replica.CheckIntegrityAsync().AsTask()); - } - } - - await Task.WhenAll(integrityChecks); - foreach (var task in integrityChecks) - { - await task; - } + await CheckIntegrityAsync(testCluster, client); clusterOperation = Task.Run(async () => { @@ -169,6 +156,196 @@ public async Task ElasticChaos() await testCluster.DisposeAsync(); } + [Fact] + public async Task JoiningSilo_DoesNotLeaveStaleEntriesOnPreviousOwner() + { + using var directoryEvents = new DiagnosticEventCollector(GrainDirectoryEvents.ListenerName); + var testClusterBuilder = new TestClusterBuilder(1); + testClusterBuilder.AddSiloBuilderConfigurator(); + var testCluster = testClusterBuilder.Build(); + await testCluster.DeployAsync(); + var log = testCluster.ServiceProvider.GetRequiredService>(); + var client = ((InProcessSiloHandle)testCluster.Primary).SiloHost.Services.GetRequiredService(); + var previousDirectoryView = await WaitForDirectoryViewAsync( + ((InProcessSiloHandle)testCluster.Primary).ServiceProvider.GetRequiredService(), + view => view.Members.Contains(testCluster.Primary.SiloAddress), + "initial directory membership view"); + const int CallsPerIteration = 100; + var nextGrainId = 0L; + + try + { + for (var i = 0; i < 10; i++) + { + await RunPingBatchAsync(client, log, nextGrainId, CallsPerIteration); + nextGrainId += CallsPerIteration; + } + + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + var loadGrainId = nextGrainId; + var loadTask = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + await RunPingBatchAsync(client, log, loadGrainId, CallsPerIteration); + loadGrainId += CallsPerIteration; + } + }); + + try + { + log.LogInformation("Starting new silo."); + var newSilo = await testCluster.StartAdditionalSiloAsync(); + log.LogInformation("Started '{Silo}'.", newSilo.SiloAddress); + + var currentDirectoryView = await WaitForDirectoryViewAsync( + ((InProcessSiloHandle)newSilo).ServiceProvider.GetRequiredService(), + view => view.Members.Contains(newSilo.SiloAddress), + $"directory membership view containing '{newSilo.SiloAddress}'"); + await WaitForDirectoryMigrationAsync(directoryEvents, previousDirectoryView, currentDirectoryView); + await CheckIntegrityAsync(testCluster, client); + } + finally + { + cts.Cancel(); + await loadTask; + } + } + finally + { + await testCluster.StopAllSilosAsync(); + await testCluster.DisposeAsync(); + } + } + + private static async Task CheckIntegrityAsync(TestCluster testCluster, IGrainFactory client) + { + var integrityChecks = new List(); + var internalGrainFactory = (IInternalGrainFactory)client; + foreach (var silo in testCluster.Silos) + { + var address = silo.SiloAddress; + var partitionsPerSilo = ((InProcessSiloHandle)silo).ServiceProvider.GetRequiredService().PartitionsPerSilo; + for (var partitionIndex = 0; partitionIndex < partitionsPerSilo; partitionIndex++) + { + var replica = internalGrainFactory.GetSystemTarget(GrainDirectoryPartition.CreateGrainId(address, partitionIndex).GrainId); + integrityChecks.Add(replica.CheckIntegrityAsync().AsTask()); + } + } + + await Task.WhenAll(integrityChecks); + } + + private static async Task WaitForDirectoryViewAsync( + DirectoryMembershipService directoryMembershipService, + Func predicate, + string description) + { + using var cts = new CancellationTokenSource(DirectoryMigrationTimeout); + try + { + await foreach (var view in directoryMembershipService.ViewUpdates.WithCancellation(cts.Token)) + { + if (predicate(view)) + { + return view; + } + } + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + throw new TimeoutException($"Timed out waiting for {description} after {DirectoryMigrationTimeout}."); + } + + throw new TimeoutException($"Timed out waiting for {description} after {DirectoryMigrationTimeout}."); + } + + private static async Task WaitForDirectoryMigrationAsync( + DiagnosticEventCollector directoryEvents, + DirectoryMembershipSnapshot previousView, + DirectoryMembershipSnapshot currentView) + { + var expectedOperations = GetExpectedRangeOperations(previousView, currentView).ToArray(); + Assert.NotEmpty(expectedOperations); + + await Task.WhenAll(expectedOperations.Select(operation => WaitForRangeOperationCompletedAsync(directoryEvents, operation))); + } + + private static IEnumerable GetExpectedRangeOperations( + DirectoryMembershipSnapshot previousView, + DirectoryMembershipSnapshot currentView) + { + var partitionCount = Math.Max(previousView.PartitionCount, currentView.PartitionCount); + foreach (var member in previousView.Members.Concat(currentView.Members).Distinct()) + { + for (var partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + var previousRange = previousView.GetRange(member, partitionIndex); + var currentRange = currentView.GetRange(member, partitionIndex); + foreach (var removedRange in previousRange.Difference(currentRange)) + { + if (!removedRange.IsEmpty) + { + yield return new( + member, + partitionIndex, + currentView.Version, + removedRange, + GrainDirectoryEvents.ReleaseOperationName); + } + } + + foreach (var addedRange in currentRange.Difference(previousRange)) + { + if (!addedRange.IsEmpty) + { + yield return new( + member, + partitionIndex, + currentView.Version, + addedRange, + GrainDirectoryEvents.AcquireOperationName); + } + } + } + } + } + + private static async Task WaitForRangeOperationCompletedAsync( + DiagnosticEventCollector directoryEvents, + ExpectedRangeOperation expectedOperation) + { + await directoryEvents.WaitForEventAsync( + nameof(GrainDirectoryEvents.RangeOperationCompleted), + evt => evt.Payload is GrainDirectoryEvents.RangeOperationCompleted completed + && !completed.Canceled + && completed.SiloAddress.Equals(expectedOperation.SiloAddress) + && completed.PartitionIndex == expectedOperation.PartitionIndex + && completed.Version == expectedOperation.Version + && completed.Range.Equals(expectedOperation.Range) + && string.Equals(completed.OperationName, expectedOperation.OperationName, StringComparison.Ordinal), + DirectoryMigrationTimeout); + } + + private static async Task RunPingBatchAsync(IGrainFactory client, ILogger log, long idBase, int callsPerIteration) + { + var tasks = Enumerable.Range(0, callsPerIteration).Select(i => client.GetGrain(idBase + i).Ping().AsTask()).ToList(); + var workTask = Task.WhenAll(tasks); + + try + { + await workTask; + } + catch (SiloUnavailableException sue) + { + log.LogInformation(sue, "Swallowed transient exception."); + } + catch (OrleansMessageRejectionException omre) + { + log.LogInformation(omre, "Swallowed rejection."); + } + } + private class SiloBuilderConfigurator : ISiloConfigurator { public void Configure(ISiloBuilder siloBuilder) @@ -179,5 +356,12 @@ public void Configure(ISiloBuilder siloBuilder) #pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. } } + + private readonly record struct ExpectedRangeOperation( + SiloAddress SiloAddress, + int PartitionIndex, + MembershipVersion Version, + RingRange Range, + string OperationName); }