From 3058e8a22e8a2b3905f0765bcb4a0e9a4eb6cc56 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 29 Apr 2026 13:54:07 -0700 Subject: [PATCH 01/12] DistributedGrainDirectory : IRemoteGrainDirectory Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DirectoryMembershipService.cs | 15 ++- .../DirectoryMembershipSnapshot.cs | 18 ++- .../DistributedGrainDirectory.cs | 9 +- .../DistributedRemoteGrainDirectory.cs | 107 ++++++++++++++++++ .../GrainDirectory/LocalGrainDirectory.cs | 8 +- .../GrainDirectory/RemoteGrainDirectory.cs | 7 +- src/Orleans.TestingHost/InProcTestCluster.cs | 5 +- .../InProcTestClusterBuilder.cs | 1 + .../InProcTestClusterOptions.cs | 6 + .../GrainDirectoryResilienceTests.cs | 3 +- 10 files changed, 167 insertions(+), 12 deletions(-) create mode 100644 src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs index 2627470d82a..fba01f5744a 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs @@ -17,9 +17,13 @@ internal sealed partial class DirectoryMembershipService : IAsyncDisposable private readonly CancellationTokenSource _shutdownCts = new(); private readonly Task _runTask; private readonly AsyncEnumerable _viewUpdates; + private readonly int _partitionsPerSilo; + private readonly Func _getRingBoundaries; public DirectoryMembershipSnapshot CurrentView { get; private set; } = DirectoryMembershipSnapshot.Default; + public int PartitionsPerSilo => _partitionsPerSilo; + public IAsyncEnumerable ViewUpdates => _viewUpdates; public ClusterMembershipService ClusterMembershipService { get; } @@ -41,8 +45,15 @@ public async ValueTask RefreshViewAsync(MembershipV return CurrentView; } - public DirectoryMembershipService(ClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, ILogger logger) + public DirectoryMembershipService( + ClusterMembershipService clusterMembershipService, + IInternalGrainFactory grainFactory, + ILogger logger, + int partitionsPerSilo = 1, + Func? getRingBoundaries = null) { + _partitionsPerSilo = partitionsPerSilo; + _getRingBoundaries = getRingBoundaries ?? DirectoryMembershipSnapshot.DefaultGetRingBoundaries; _viewUpdates = new( DirectoryMembershipSnapshot.Default, (previous, proposed) => proposed.Version >= previous.Version, @@ -64,7 +75,7 @@ private async Task ProcessMembershipUpdates() { await foreach (var update in ClusterMembershipService.MembershipUpdates.WithCancellation(_shutdownCts.Token)) { - var view = new DirectoryMembershipSnapshot(update, _grainFactory); + var view = new DirectoryMembershipSnapshot(update, _grainFactory, _partitionsPerSilo, _getRingBoundaries); _viewUpdates.Publish(view); } } diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs index db7cf6e060b..9a865bfe702 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs @@ -15,13 +15,27 @@ namespace Orleans.Runtime.GrainDirectory; internal sealed class DirectoryMembershipSnapshot { internal const int PartitionsPerSilo = ConsistentRingOptions.DEFAULT_NUM_VIRTUAL_RING_BUCKETS; + + /// + /// The default hash function for directory ring boundaries, matching the partitioning scheme. + /// + internal static readonly Func DefaultGetRingBoundaries = static (silo, count) => + { + if (count == 1) + { + return [unchecked((uint)silo.GetConsistentHashCode())]; + } + + return silo.GetUniformHashCodes(count); + }; + private readonly ImmutableArray<(uint Start, int MemberIndex, int PartitionIndex)> _ringBoundaries; private readonly RingRangeCollection[] _rangesByMember; private readonly ImmutableArray> _partitionsByMember; private readonly ImmutableArray> _rangesByMemberPartition; public DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory) - : this(snapshot, grainFactory, PartitionsPerSilo, static (silo, count) => silo.GetUniformHashCodes(count)) + : this(snapshot, grainFactory, PartitionsPerSilo, DefaultGetRingBoundaries) { } @@ -134,7 +148,7 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern } public static DirectoryMembershipSnapshot Default { get; } = new DirectoryMembershipSnapshot( - new ClusterMembershipSnapshot(ImmutableDictionary.Empty, MembershipVersion.MinValue), null!); + new ClusterMembershipSnapshot(ImmutableDictionary.Empty, MembershipVersion.MinValue), null!, partitionCount: 1, DefaultGetRingBoundaries); public MembershipVersion Version => ClusterMembershipSnapshot.Version; diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index 0638a3dda92..e21c68b9dbb 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -91,14 +91,19 @@ public DistributedGrainDirectory( _serviceProvider = serviceProvider; _membershipService = membershipService; _logger = logger; - var partitions = ImmutableArray.CreateBuilder(DirectoryMembershipSnapshot.PartitionsPerSilo); - for (var i = 0; i < DirectoryMembershipSnapshot.PartitionsPerSilo; i++) + var partitionsPerSilo = membershipService.PartitionsPerSilo; + var partitions = ImmutableArray.CreateBuilder(partitionsPerSilo); + for (var i = 0; i < partitionsPerSilo; i++) { partitions.Add(new GrainDirectoryPartition(i, this, grainFactory, shared)); } _partitions = partitions.ToImmutable(); shared.ActivationDirectory.RecordNewTarget(this); + + // Register IRemoteGrainDirectory system targets so that silos running LocalGrainDirectory + // can forward directory requests to this silo during a rolling upgrade. + DistributedRemoteGrainDirectory.Create(this, shared); } public async Task Lookup(GrainId grainId) => await InvokeAsync( diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs new file mode 100644 index 00000000000..4b3febf7b68 --- /dev/null +++ b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs @@ -0,0 +1,107 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.GrainDirectory; + +#nullable enable +namespace Orleans.Runtime.GrainDirectory; + +/// +/// A system target that implements by delegating to . +/// This enables silos running the old to forward directory requests to silos running the +/// new during a rolling upgrade. +/// +internal sealed class DistributedRemoteGrainDirectory : SystemTarget, IRemoteGrainDirectory +{ + private readonly DistributedGrainDirectory _directory; + + private DistributedRemoteGrainDirectory( + DistributedGrainDirectory directory, + GrainType grainType, + SystemTargetShared shared) + : base(grainType, shared) + { + _directory = directory; + shared.ActivationDirectory.RecordNewTarget(this); + } + + /// + /// Creates the pair of system targets that replace when + /// is active: one for + /// and one for . + /// + internal static (DistributedRemoteGrainDirectory DirectoryService, DistributedRemoteGrainDirectory CacheValidator) + Create(DistributedGrainDirectory directory, SystemTargetShared shared) + { + var directoryService = new DistributedRemoteGrainDirectory(directory, Constants.DirectoryServiceType, shared); + var cacheValidator = new DistributedRemoteGrainDirectory(directory, Constants.DirectoryCacheValidatorType, shared); + return (directoryService, cacheValidator); + } + + public async Task RegisterAsync(GrainAddress address, int hopCount) + { + var result = await _directory.Register(address); + return new(result, 0); + } + + public async Task RegisterAsync(GrainAddress address, GrainAddress? previousAddress, int hopCount) + { + var result = await _directory.Register(address, previousAddress); + return new(result, 0); + } + + public async Task LookupAsync(GrainId grainId, int hopCount) + { + var result = await _directory.Lookup(grainId); + return new(result, 0); + } + + public async Task UnregisterAsync(GrainAddress address, UnregistrationCause cause, int hopCount) + { + await _directory.Unregister(address); + } + + public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) + { + foreach (var address in addresses) + { + await _directory.Unregister(address); + } + } + + public async Task DeleteGrainAsync(GrainId grainId, int hopCount) + { + var existing = await _directory.Lookup(grainId); + if (existing is not null) + { + await _directory.Unregister(existing); + } + } + + public async Task RegisterMany(List addresses) + { + foreach (var address in addresses) + { + await _directory.Register(address); + } + } + + public async Task> LookUpMany(List<(GrainId GrainId, int Version)> grainAndETagList) + { + var result = new List(grainAndETagList.Count); + foreach (var (grainId, _) in grainAndETagList) + { + var address = await _directory.Lookup(grainId); + result.Add(new(address, 0)); + } + + return result; + } + + public async Task AcceptSplitPartition(List singleActivations) + { + foreach (var address in singleActivations) + { + await _directory.Register(address); + } + } +} diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index ee161fbc31a..1abf14358c1 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -74,8 +74,12 @@ public LocalGrainDirectory( DirectoryPartition = grainDirectoryPartitionFactory(); HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory); - RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared); - CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared); + // When DistributedGrainDirectory is active, it registers its own IRemoteGrainDirectory system targets. + // In that case, create the RemoteGrainDirectory objects (still needed for WorkItemGroup scheduling) + // but skip registering them as system targets to avoid conflicts. + var distributedDirectoryActive = serviceProvider.GetService() is not null; + RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); + CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); // add myself to the list of members AddServer(MyAddress); diff --git a/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs index 83f63ebb844..ee6b37b2314 100644 --- a/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs @@ -13,13 +13,16 @@ internal sealed partial class RemoteGrainDirectory : SystemTarget, IRemoteGrainD private readonly LocalGrainDirectoryPartition partition; private readonly ILogger logger; - internal RemoteGrainDirectory(LocalGrainDirectory localGrainDirectory, GrainType grainType, SystemTargetShared shared) + internal RemoteGrainDirectory(LocalGrainDirectory localGrainDirectory, GrainType grainType, SystemTargetShared shared, bool registerAsSystemTarget = true) : base(grainType, shared) { router = localGrainDirectory; partition = localGrainDirectory.DirectoryPartition; logger = shared.LoggerFactory.CreateLogger($"{typeof(RemoteGrainDirectory).FullName}.CacheValidator"); - shared.ActivationDirectory.RecordNewTarget(this); + if (registerAsSystemTarget) + { + shared.ActivationDirectory.RecordNewTarget(this); + } } public Task RegisterAsync(GrainAddress address, GrainAddress? previousAddress, int hopCount) diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs index 9293925f108..be3fc7bcc46 100644 --- a/src/Orleans.TestingHost/InProcTestCluster.cs +++ b/src/Orleans.TestingHost/InProcTestCluster.cs @@ -740,7 +740,10 @@ public async Task CreateSiloAsync(InProcessTestSiloSpecific if (Options.UseTestClusterMembership) { services.AddSingleton(_membershipTable); - siloBuilder.AddGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, (_, _) => _grainDirectory); + if (Options.UseTestClusterGrainDirectory) + { + siloBuilder.AddGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, (_, _) => _grainDirectory); + } } siloBuilder.UseInMemoryConnectionTransport(_transportHub); diff --git a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs index 28db814651a..9757d57bb43 100644 --- a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs +++ b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs @@ -32,6 +32,7 @@ public InProcessTestClusterBuilder(short initialSilosCount) ClusterId = CreateClusterId(), ServiceId = Guid.NewGuid().ToString("N"), UseTestClusterMembership = true, + UseTestClusterGrainDirectory = true, InitializeClientOnDeploy = true, ConfigureFileLogging = true, AssumeHomogenousSilosForTesting = true diff --git a/src/Orleans.TestingHost/InProcTestClusterOptions.cs b/src/Orleans.TestingHost/InProcTestClusterOptions.cs index 03b95253d17..d832436be10 100644 --- a/src/Orleans.TestingHost/InProcTestClusterOptions.cs +++ b/src/Orleans.TestingHost/InProcTestClusterOptions.cs @@ -44,6 +44,12 @@ public sealed class InProcessTestClusterOptions /// if test cluster membership should be used; otherwise, . internal bool UseTestClusterMembership { get; set; } + /// + /// Gets or sets a value indicating whether to use test cluster grain directory, which is only applicable if is . + /// + /// if test cluster grain directory should be used; otherwise, . + internal bool UseTestClusterGrainDirectory { get; set; } + /// /// Gets or sets a value indicating whether to use the real environment statistics. /// diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs index 87df5e8755c..0baa7ec36f0 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryResilienceTests.cs @@ -98,7 +98,8 @@ public async Task ElasticChaos() foreach (var silo in testCluster.Silos) { var address = silo.SiloAddress; - for (var partitionIndex = 0; partitionIndex < DirectoryMembershipSnapshot.PartitionsPerSilo; partitionIndex++) + var partitionsPerSilo = ((InProcessSiloHandle)silo).ServiceProvider.GetRequiredService().PartitionsPerSilo; + for (var partitionIndex = 0; partitionIndex < partitionsPerSilo; partitionIndex++) { var replica = ((IInternalGrainFactory)client).GetSystemTarget(GrainDirectoryPartition.CreateGrainId(address, partitionIndex).GrainId); integrityChecks.Add(replica.CheckIntegrityAsync().AsTask()); From 76fedf6c94e3fd2f8ed93d7c184190f9d5f1375f Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 29 Apr 2026 13:54:08 -0700 Subject: [PATCH 02/12] Add distributed grain directory rolling upgrade tests Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectoryRollingUpgradeTests.cs | 238 ++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs new file mode 100644 index 00000000000..2a72c611c98 --- /dev/null +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -0,0 +1,238 @@ +#nullable enable +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.GrainDirectory; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Orleans.TestingHost; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.GrainDirectory; + +internal interface IRollingUpgradeTestGrain : IGrainWithIntegerKey +{ + ValueTask GetHost(); +} + +internal class RollingUpgradeTestGrain : Grain, IRollingUpgradeTestGrain +{ + private readonly SiloAddress _silo; + + public RollingUpgradeTestGrain(ILocalSiloDetails siloDetails) + { + _silo = siloDetails.SiloAddress; + } + + public ValueTask GetHost() => new(_silo.ToString()); +} + +/// +/// Tests rolling upgrade from to . +/// Starts a cluster with only LocalGrainDirectory, then adds silos with DistributedGrainDirectory +/// while removing old silos, all under continuous load. +/// +[TestCategory("Directory"), TestCategory("Functional")] +public sealed class GrainDirectoryRollingUpgradeTests(ITestOutputHelper output) +{ + [Fact] + public async Task RollingUpgrade_LocalToDistributed_NoErrors() + { + var useDistributedDirectory = false; + var errorLogs = new ConcurrentBag(); + var logProvider = new ErrorCapturingLoggerProvider(errorLogs); + + var builder = new InProcessTestClusterBuilder(3); + builder.Options.UseTestClusterMembership = true; + builder.Options.UseTestClusterGrainDirectory = false; + builder.ConfigureSilo((_, siloBuilder) => + { + siloBuilder.Configure(o => + { + o.ResponseTimeout = TimeSpan.FromMinutes(2); + o.SystemResponseTimeout = TimeSpan.FromMinutes(2); + }); + + if (useDistributedDirectory) + { +#pragma warning disable ORLEANSEXP003 + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 + } + }); + + builder.ConfigureSiloHost((_, hostBuilder) => + { + hostBuilder.Services.AddSingleton(logProvider); + }); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + output.WriteLine($"Cluster deployed with {cluster.Silos.Count} silos (LocalGrainDirectory only)."); + + var client = cluster.Client; + + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + var idBase = 0L; + Func getNextIdBase = () => Interlocked.Add(ref idBase, 50); + + // Drive load continuously in the background throughout the test. + var loadTask = DriveLoad(client, getNextIdBase, cts.Token); + + // Phase 1: Quick health check on the LocalGrainDirectory cluster. + output.WriteLine("Phase 1: Verifying LocalGrainDirectory cluster is healthy..."); + await VerifyClusterHealthy(client, getNextIdBase, cts.Token); + + // Phase 2: Enable DistributedGrainDirectory for new silos and start them. + output.WriteLine("Phase 2: Rolling upgrade — adding DistributedGrainDirectory silos..."); + useDistributedDirectory = true; + + var oldSilos = cluster.Silos.ToList(); + var newSilos = new List(); + + for (var i = 0; i < oldSilos.Count; i++) + { + var newSilo = await cluster.StartAdditionalSiloAsync(); + newSilos.Add(newSilo); + output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); + await cluster.WaitForLivenessToStabilizeAsync(); + } + + // Phase 3: Stop old silos one at a time, primary last. + output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); + foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Silos[0] ? 1 : 0)) + { + await cluster.StopSiloAsync(oldSilo); + output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); + await cluster.WaitForLivenessToStabilizeAsync(); + } + + // Phase 4: Verify the fully-upgraded cluster works. + output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); + await VerifyClusterHealthy(client, getNextIdBase, cts.Token); + + // Stop load and clean up. + cts.Cancel(); + try { await loadTask; } + catch (OperationCanceledException) { } + + // Assert no error-level logs occurred. + var errors = errorLogs.ToArray(); + if (errors.Length > 0) + { + output.WriteLine($"ERROR LOGS ({errors.Length}):"); + foreach (var error in errors.Take(20)) + { + output.WriteLine($" {error}"); + } + } + + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + + Assert.Empty(errors); + } + + /// + /// Verifies cluster health by completing a batch of grain calls successfully. + /// Retries on transient errors to allow membership to settle. + /// + private static async Task VerifyClusterHealthy(IGrainFactory client, Func getNextIdBase, CancellationToken ct) + { + const int BatchSize = 10; + const int MaxAttempts = 60; + for (var attempt = 0; attempt < MaxAttempts; attempt++) + { + ct.ThrowIfCancellationRequested(); + try + { + var batch = getNextIdBase(); + var tasks = Enumerable.Range(0, BatchSize) + .Select(i => client.GetGrain(batch + i).GetHost().AsTask()) + .ToList(); + await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(10), ct); + return; + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch + { + await Task.Delay(500, ct); + } + } + + throw new TimeoutException($"Cluster did not become healthy after {MaxAttempts} attempts."); + } + + private static async Task DriveLoad(IGrainFactory client, Func getNextIdBase, CancellationToken ct) + { + const int CallsPerIteration = 50; + while (!ct.IsCancellationRequested) + { + try + { + var idBase = getNextIdBase(); + var tasks = Enumerable.Range(0, CallsPerIteration) + .Select(i => client.GetGrain(idBase + i).GetHost().AsTask()) + .ToList(); + + await Task.WhenAll(tasks).WaitAsync(ct); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + break; + } + catch (SiloUnavailableException) + { + // Expected during membership changes. + } + catch (OrleansMessageRejectionException) + { + // Expected during membership changes. + } + catch (TimeoutException) + { + // Expected during membership changes when directory ownership is shifting. + } + } + } + + /// + /// Logger provider that captures Error-level log messages. + /// + private sealed class ErrorCapturingLoggerProvider(ConcurrentBag errors) : ILoggerProvider + { + public ILogger CreateLogger(string categoryName) => new ErrorCapturingLogger(categoryName, errors); + public void Dispose() { } + + private sealed class ErrorCapturingLogger(string category, ConcurrentBag errors) : ILogger + { + public IDisposable? BeginScope(TState state) where TState : notnull => null; + public bool IsEnabled(LogLevel logLevel) => logLevel >= LogLevel.Error; + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + if (logLevel >= LogLevel.Error) + { + // SiloUnavailableException errors in the messaging layer are expected + // when silos are stopped under load during a rolling upgrade. + if (exception is SiloUnavailableException) + { + return; + } + + var message = $"[{category}] {formatter(state, exception)}"; + if (exception is not null) + { + message += $"\n Exception: {exception.GetType().Name}: {exception.Message}"; + } + + errors.Add(message); + } + } + } + } +} From 0d75748958c6d61f6e5d4c6b1dde12a4e5091266 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 14 Apr 2026 07:55:28 -0700 Subject: [PATCH 03/12] Improve DistributedRemoteGrainDirectory rolling upgrade resilience Add CancellationToken support and initialization guard to prevent DistributedRemoteGrainDirectory from blocking indefinitely when directory requests arrive before the first membership update. - Extract internal LookupAsync, RegisterAsync, UnregisterAsync methods with CancellationToken support on DistributedGrainDirectory - Add EnsureDirectoryInitializedAsync to refresh membership view before processing requests from legacy silos - Add 30-second timeout to all IRemoteGrainDirectory operations - Pass DirectoryMembershipService to DistributedRemoteGrainDirectory - Simplify rolling upgrade test assertions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DistributedGrainDirectory.cs | 34 ++-- .../DistributedRemoteGrainDirectory.cs | 63 ++++-- .../GrainDirectoryRollingUpgradeTests.cs | 186 ++++++++---------- 3 files changed, 153 insertions(+), 130 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index e21c68b9dbb..e70e700902a 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -103,20 +103,12 @@ public DistributedGrainDirectory( // Register IRemoteGrainDirectory system targets so that silos running LocalGrainDirectory // can forward directory requests to this silo during a rolling upgrade. - DistributedRemoteGrainDirectory.Create(this, shared); + DistributedRemoteGrainDirectory.Create(this, membershipService, shared); } - public async Task Lookup(GrainId grainId) => await InvokeAsync( - grainId, - static (partition, version, grainId, cancellationToken) => partition.LookupAsync(version, grainId), - grainId, - CancellationToken.None); + public async Task Lookup(GrainId grainId) => await LookupAsync(grainId, CancellationToken.None); - public async Task Register(GrainAddress address) => await InvokeAsync( - address.GrainId, - static (partition, version, address, cancellationToken) => partition.RegisterAsync(version, address, null), - address, - CancellationToken.None); + public async Task Register(GrainAddress address) => await RegisterAsync(address, null, CancellationToken.None); public async Task Unregister(GrainAddress address) => await InvokeAsync( address.GrainId, @@ -124,13 +116,27 @@ public async Task Unregister(GrainAddress address) => await InvokeAsync( address, CancellationToken.None); - public async Task Register(GrainAddress address, GrainAddress? previousAddress) => await InvokeAsync( + public async Task Register(GrainAddress address, GrainAddress? previousAddress) => await RegisterAsync(address, previousAddress, CancellationToken.None); + + public Task UnregisterSilos(List siloAddresses) => Task.CompletedTask; + + internal Task LookupAsync(GrainId grainId, CancellationToken cancellationToken) => InvokeAsync( + grainId, + static (partition, version, grainId, cancellationToken) => partition.LookupAsync(version, grainId), + grainId, + cancellationToken); + + internal async Task RegisterAsync(GrainAddress address, GrainAddress? previousAddress, CancellationToken cancellationToken) => await InvokeAsync( address.GrainId, static (partition, version, state, cancellationToken) => partition.RegisterAsync(version, state.Address, state.PreviousAddress), (Address: address, PreviousAddress: previousAddress), - CancellationToken.None); + cancellationToken); - public Task UnregisterSilos(List siloAddresses) => Task.CompletedTask; + internal Task UnregisterAsync(GrainAddress address, CancellationToken cancellationToken) => InvokeAsync( + address.GrainId, + static (partition, version, address, cancellationToken) => partition.DeregisterAsync(version, address), + address, + cancellationToken); private async Task InvokeAsync( GrainId grainId, diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs index 4b3febf7b68..7b5b27c8db9 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Orleans.GrainDirectory; @@ -13,14 +14,17 @@ namespace Orleans.Runtime.GrainDirectory; internal sealed class DistributedRemoteGrainDirectory : SystemTarget, IRemoteGrainDirectory { private readonly DistributedGrainDirectory _directory; + private readonly DirectoryMembershipService _membershipService; private DistributedRemoteGrainDirectory( DistributedGrainDirectory directory, + DirectoryMembershipService membershipService, GrainType grainType, SystemTargetShared shared) : base(grainType, shared) { _directory = directory; + _membershipService = membershipService; shared.ActivationDirectory.RecordNewTarget(this); } @@ -30,67 +34,98 @@ private DistributedRemoteGrainDirectory( /// and one for . /// internal static (DistributedRemoteGrainDirectory DirectoryService, DistributedRemoteGrainDirectory CacheValidator) - Create(DistributedGrainDirectory directory, SystemTargetShared shared) + Create(DistributedGrainDirectory directory, DirectoryMembershipService membershipService, SystemTargetShared shared) { - var directoryService = new DistributedRemoteGrainDirectory(directory, Constants.DirectoryServiceType, shared); - var cacheValidator = new DistributedRemoteGrainDirectory(directory, Constants.DirectoryCacheValidatorType, shared); + var directoryService = new DistributedRemoteGrainDirectory(directory, membershipService, Constants.DirectoryServiceType, shared); + var cacheValidator = new DistributedRemoteGrainDirectory(directory, membershipService, Constants.DirectoryCacheValidatorType, shared); return (directoryService, cacheValidator); } + /// + /// Ensures the directory has an initialized membership view before processing requests. + /// Without this, calls arriving before the directory processes its first membership update + /// would block indefinitely in 's internal retry loop. + /// + private async Task EnsureDirectoryInitializedAsync(CancellationToken cancellationToken) + { + if (_membershipService.CurrentView.Version == MembershipVersion.MinValue) + { + await _membershipService.RefreshViewAsync(new MembershipVersion(1), cancellationToken); + } + } + + private CancellationTokenSource CreateTimeoutCts() => new(TimeSpan.FromSeconds(30)); + public async Task RegisterAsync(GrainAddress address, int hopCount) { - var result = await _directory.Register(address); + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); + var result = await _directory.RegisterAsync(address, null, cts.Token); return new(result, 0); } public async Task RegisterAsync(GrainAddress address, GrainAddress? previousAddress, int hopCount) { - var result = await _directory.Register(address, previousAddress); + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); + var result = await _directory.RegisterAsync(address, previousAddress, cts.Token); return new(result, 0); } public async Task LookupAsync(GrainId grainId, int hopCount) { - var result = await _directory.Lookup(grainId); + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); + var result = await _directory.LookupAsync(grainId, cts.Token); return new(result, 0); } public async Task UnregisterAsync(GrainAddress address, UnregistrationCause cause, int hopCount) { - await _directory.Unregister(address); + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); + await _directory.UnregisterAsync(address, cts.Token); } public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); foreach (var address in addresses) { - await _directory.Unregister(address); + await _directory.UnregisterAsync(address, cts.Token); } } public async Task DeleteGrainAsync(GrainId grainId, int hopCount) { - var existing = await _directory.Lookup(grainId); + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); + var existing = await _directory.LookupAsync(grainId, cts.Token); if (existing is not null) { - await _directory.Unregister(existing); + await _directory.UnregisterAsync(existing, cts.Token); } } public async Task RegisterMany(List addresses) { + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); foreach (var address in addresses) { - await _directory.Register(address); + await _directory.RegisterAsync(address, null, cts.Token); } } public async Task> LookUpMany(List<(GrainId GrainId, int Version)> grainAndETagList) { + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); var result = new List(grainAndETagList.Count); foreach (var (grainId, _) in grainAndETagList) { - var address = await _directory.Lookup(grainId); + var address = await _directory.LookupAsync(grainId, cts.Token); result.Add(new(address, 0)); } @@ -99,9 +134,11 @@ public async Task> LookUpMany(List<(GrainId GrainId, int Ver public async Task AcceptSplitPartition(List singleActivations) { + using var cts = CreateTimeoutCts(); + await EnsureDirectoryInitializedAsync(cts.Token); foreach (var address in singleActivations) { - await _directory.Register(address); + await _directory.RegisterAsync(address, null, cts.Token); } } } diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 2a72c611c98..48c5017265f 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -1,8 +1,8 @@ #nullable enable using System.Collections.Concurrent; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Orleans.Configuration; using Orleans.GrainDirectory; using Orleans.Runtime; using Orleans.Runtime.GrainDirectory; @@ -32,91 +32,69 @@ public RollingUpgradeTestGrain(ILocalSiloDetails siloDetails) /// /// Tests rolling upgrade from to . /// Starts a cluster with only LocalGrainDirectory, then adds silos with DistributedGrainDirectory -/// while removing old silos, all under continuous load. +/// while removing old silos, verifying grain calls succeed after each step. /// [TestCategory("Directory"), TestCategory("Functional")] public sealed class GrainDirectoryRollingUpgradeTests(ITestOutputHelper output) { + /// + /// Controls whether newly started silos enable the . + /// + internal static volatile bool UseDistributedDirectory; + [Fact] public async Task RollingUpgrade_LocalToDistributed_NoErrors() { - var useDistributedDirectory = false; + UseDistributedDirectory = false; var errorLogs = new ConcurrentBag(); - var logProvider = new ErrorCapturingLoggerProvider(errorLogs); - - var builder = new InProcessTestClusterBuilder(3); - builder.Options.UseTestClusterMembership = true; - builder.Options.UseTestClusterGrainDirectory = false; - builder.ConfigureSilo((_, siloBuilder) => - { - siloBuilder.Configure(o => - { - o.ResponseTimeout = TimeSpan.FromMinutes(2); - o.SystemResponseTimeout = TimeSpan.FromMinutes(2); - }); + ErrorLogCaptureSiloConfigurator.Errors = errorLogs; - if (useDistributedDirectory) - { -#pragma warning disable ORLEANSEXP003 - siloBuilder.AddDistributedGrainDirectory(); -#pragma warning restore ORLEANSEXP003 - } - }); - - builder.ConfigureSiloHost((_, hostBuilder) => - { - hostBuilder.Services.AddSingleton(logProvider); - }); + var builder = new TestClusterBuilder(3); + // Remove the default DistributedGrainDirectory configurator — initial silos use LocalGrainDirectory only. + builder.Options.SiloBuilderConfiguratorTypes.RemoveAll( + t => t.Contains(nameof(ConfigureDistributedGrainDirectory), StringComparison.Ordinal)); + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); var cluster = builder.Build(); await cluster.DeployAsync(); output.WriteLine($"Cluster deployed with {cluster.Silos.Count} silos (LocalGrainDirectory only)."); var client = cluster.Client; + var grainId = 0L; + var nextGrainId = () => Interlocked.Increment(ref grainId); - var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); - var idBase = 0L; - Func getNextIdBase = () => Interlocked.Add(ref idBase, 50); + // Phase 1: Drive load on the LocalGrainDirectory cluster. + output.WriteLine("Phase 1: Driving load on LocalGrainDirectory cluster..."); + await DriveLoad(client, nextGrainId, count: 100); - // Drive load continuously in the background throughout the test. - var loadTask = DriveLoad(client, getNextIdBase, cts.Token); - - // Phase 1: Quick health check on the LocalGrainDirectory cluster. - output.WriteLine("Phase 1: Verifying LocalGrainDirectory cluster is healthy..."); - await VerifyClusterHealthy(client, getNextIdBase, cts.Token); - - // Phase 2: Enable DistributedGrainDirectory for new silos and start them. + // Phase 2: Add DistributedGrainDirectory silos one at a time. output.WriteLine("Phase 2: Rolling upgrade — adding DistributedGrainDirectory silos..."); - useDistributedDirectory = true; + UseDistributedDirectory = true; var oldSilos = cluster.Silos.ToList(); - var newSilos = new List(); for (var i = 0; i < oldSilos.Count; i++) { var newSilo = await cluster.StartAdditionalSiloAsync(); - newSilos.Add(newSilo); output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); + await DriveLoad(client, nextGrainId, count: 100); } - // Phase 3: Stop old silos one at a time, primary last. + // Phase 3: Stop old silos one at a time, non-primary first. output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); - foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Silos[0] ? 1 : 0)) + foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Primary ? 1 : 0)) { await cluster.StopSiloAsync(oldSilo); output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); + await DriveLoad(client, nextGrainId, count: 100); } - // Phase 4: Verify the fully-upgraded cluster works. + // Phase 4: Final verification on the fully-upgraded cluster — must succeed without retries. output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); - await VerifyClusterHealthy(client, getNextIdBase, cts.Token); - - // Stop load and clean up. - cts.Cancel(); - try { await loadTask; } - catch (OperationCanceledException) { } + await DriveLoad(client, nextGrainId, count: 200); // Assert no error-level logs occurred. var errors = errorLogs.ToArray(); @@ -136,74 +114,76 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() } /// - /// Verifies cluster health by completing a batch of grain calls successfully. - /// Retries on transient errors to allow membership to settle. + /// Activates grains by calling each one. Retries individual calls that fail with transient + /// exceptions expected during directory ownership transitions in a rolling upgrade. /// - private static async Task VerifyClusterHealthy(IGrainFactory client, Func getNextIdBase, CancellationToken ct) + private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int count) { - const int BatchSize = 10; - const int MaxAttempts = 60; - for (var attempt = 0; attempt < MaxAttempts; attempt++) + var ids = new long[count]; + for (var i = 0; i < count; i++) { - ct.ThrowIfCancellationRequested(); - try - { - var batch = getNextIdBase(); - var tasks = Enumerable.Range(0, BatchSize) - .Select(i => client.GetGrain(batch + i).GetHost().AsTask()) - .ToList(); - await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(10), ct); - return; - } - catch (OperationCanceledException) when (ct.IsCancellationRequested) - { - throw; - } - catch + ids[i] = nextGrainId(); + } + + // First attempt: fire all calls in parallel. + var tasks = ids.Select(id => client.GetGrain(id).GetHost().AsTask()).ToArray(); + try + { + await Task.WhenAll(tasks); + return; + } + catch + { + // Some calls failed — retry the failed ones individually. + } + + var failedIds = new List(); + for (var i = 0; i < tasks.Length; i++) + { + if (tasks[i].IsFaulted) { - await Task.Delay(500, ct); + failedIds.Add(ids[i]); } } - throw new TimeoutException($"Cluster did not become healthy after {MaxAttempts} attempts."); + output.WriteLine($" {failedIds.Count}/{count} calls failed, retrying..."); + + // Retry failed calls one at a time. + foreach (var id in failedIds) + { + await client.GetGrain(id).GetHost(); + } } - private static async Task DriveLoad(IGrainFactory client, Func getNextIdBase, CancellationToken ct) + private class RollingUpgradeSiloConfigurator : ISiloConfigurator { - const int CallsPerIteration = 50; - while (!ct.IsCancellationRequested) + public void Configure(ISiloBuilder siloBuilder) { - try - { - var idBase = getNextIdBase(); - var tasks = Enumerable.Range(0, CallsPerIteration) - .Select(i => client.GetGrain(idBase + i).GetHost().AsTask()) - .ToList(); - - await Task.WhenAll(tasks).WaitAsync(ct); - } - catch (OperationCanceledException) when (ct.IsCancellationRequested) - { - break; - } - catch (SiloUnavailableException) + if (UseDistributedDirectory) { - // Expected during membership changes. - } - catch (OrleansMessageRejectionException) - { - // Expected during membership changes. +#pragma warning disable ORLEANSEXP003 + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 } - catch (TimeoutException) + } + } + + private class ErrorLogCaptureSiloConfigurator : IHostConfigurator + { + internal static ConcurrentBag? Errors; + + public void Configure(IHostBuilder hostBuilder) + { + hostBuilder.ConfigureServices(services => { - // Expected during membership changes when directory ownership is shifting. - } + if (Errors is { } errors) + { + services.AddSingleton(new ErrorCapturingLoggerProvider(errors)); + } + }); } } - /// - /// Logger provider that captures Error-level log messages. - /// private sealed class ErrorCapturingLoggerProvider(ConcurrentBag errors) : ILoggerProvider { public ILogger CreateLogger(string categoryName) => new ErrorCapturingLogger(categoryName, errors); @@ -217,8 +197,8 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except { if (logLevel >= LogLevel.Error) { - // SiloUnavailableException errors in the messaging layer are expected - // when silos are stopped under load during a rolling upgrade. + // SiloUnavailableException errors from the messaging layer are expected + // when silos are removed during a rolling upgrade. if (exception is SiloUnavailableException) { return; From d73b5de5a7ae8758663ef9728b9fd64ce63acd6b Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 22 Apr 2026 17:17:36 -0700 Subject: [PATCH 04/12] Fix mixed local-distributed rolling upgrade Restore queued split-partition handoff semantics for local-to-distributed upgrades, add compatibility targets on local silos for distributed recovery APIs, refresh test-cluster gateways from active silos, and harden the rolling-upgrade regression test diagnostics and client refresh path. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DistributedRemoteGrainDirectory.cs | 276 +++++++++++++-- .../GrainDirectoryHandoffManager.cs | 41 ++- .../GrainDirectory/LocalGrainDirectory.cs | 4 + .../LocalGrainDirectoryCompatibility.cs | 63 ++++ src/Orleans.TestingHost/TestCluster.cs | 22 +- .../GrainDirectoryRollingUpgradeTests.cs | 324 ++++++++++++++---- 6 files changed, 636 insertions(+), 94 deletions(-) create mode 100644 src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs index 7b5b27c8db9..5d5b6c5f28b 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs @@ -1,7 +1,12 @@ using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Orleans.GrainDirectory; +using Orleans.Internal; +using Orleans.Runtime.Scheduler; #nullable enable namespace Orleans.Runtime.GrainDirectory; @@ -11,10 +16,17 @@ namespace Orleans.Runtime.GrainDirectory; /// This enables silos running the old to forward directory requests to silos running the /// new during a rolling upgrade. /// -internal sealed class DistributedRemoteGrainDirectory : SystemTarget, IRemoteGrainDirectory +internal sealed partial class DistributedRemoteGrainDirectory : SystemTarget, IRemoteGrainDirectory { + private const int MaxBatchDegreeOfParallelism = 32; + private static readonly TimeSpan OperationTimeout = TimeSpan.FromSeconds(30); + private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); + private readonly DistributedGrainDirectory _directory; + private readonly ILogger _logger; private readonly DirectoryMembershipService _membershipService; + private readonly Queue<(string Name, object State, Func Action)> _pendingOperations = new(); + private readonly AsyncLock _executorLock = new(); private DistributedRemoteGrainDirectory( DistributedGrainDirectory directory, @@ -24,6 +36,7 @@ private DistributedRemoteGrainDirectory( : base(grainType, shared) { _directory = directory; + _logger = shared.LoggerFactory.CreateLogger(); _membershipService = membershipService; shared.ActivationDirectory.RecordNewTarget(this); } @@ -54,7 +67,181 @@ private async Task EnsureDirectoryInitializedAsync(CancellationToken cancellatio } } - private CancellationTokenSource CreateTimeoutCts() => new(TimeSpan.FromSeconds(30)); + private static ParallelOptions CreateParallelOptions(CancellationToken cancellationToken) => new() + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = MaxBatchDegreeOfParallelism, + TaskScheduler = TaskScheduler.Current, + }; + + private CancellationTokenSource CreateTimeoutCts() => new(OperationTimeout); + + private CancellationTokenSource CreateTimeoutCts(CancellationToken cancellationToken) + { + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _directory.OnStoppedToken); + cts.CancelAfter(OperationTimeout); + return cts; + } + + private async Task RunBatchOperationAsync(List values, Func operation) + { + using (var cts = CreateTimeoutCts(_directory.OnStoppedToken)) + { + await EnsureDirectoryInitializedAsync(cts.Token); + } + + var options = CreateParallelOptions(_directory.OnStoppedToken); + await Parallel.ForEachAsync(values, options, async (value, cancellationToken) => + { + using var cts = CreateTimeoutCts(cancellationToken); + await operation(value, cts.Token); + }); + } + + private IInternalGrainFactory GrainFactory => ActivationServices.GetRequiredService(); + + private ISiloStatusOracle SiloStatusOracle => ActivationServices.GetRequiredService(); + + private void EnqueueOperation(string name, object state, Func action) + { + lock (_pendingOperations) + { + _pendingOperations.Enqueue((name, state, action)); + if (_pendingOperations.Count <= 2) + { + WorkItemGroup.QueueTask(ExecutePendingOperations, this); + } + } + } + + private async Task ExecutePendingOperations() + { + using (await _executorLock.LockAsync()) + { + while (!_directory.OnStoppedToken.IsCancellationRequested) + { + (string Name, object State, Func Action) op; + lock (_pendingOperations) + { + if (_pendingOperations.Count == 0) + { + break; + } + + op = _pendingOperations.Peek(); + } + + try + { + await op.Action(this, op.State); + lock (_pendingOperations) + { + _pendingOperations.Dequeue(); + } + } + catch (Exception exception) when (!_directory.OnStoppedToken.IsCancellationRequested) + { + LogWarningOperationFailedRetry(_logger, exception, op.Name); + await Task.Delay(RetryDelay, _directory.OnStoppedToken).SuppressThrowing(); + } + } + } + } + + private void DestroyDuplicateActivations(Dictionary>? duplicates) + { + if (duplicates is null || duplicates.Count == 0) + { + return; + } + + EnqueueOperation( + nameof(DestroyDuplicateActivations), + duplicates, + static (self, state) => self.DestroyDuplicateActivationsAsync((Dictionary>)state)); + } + + private async Task DestroyDuplicateActivationsAsync(Dictionary> duplicates) + { + while (duplicates.Count > 0) + { + var pair = duplicates.First(); + if (SiloStatusOracle.GetApproximateSiloStatus(pair.Key) == SiloStatus.Active) + { + var remoteCatalog = GrainFactory.GetSystemTarget(Constants.CatalogType, pair.Key); + await remoteCatalog.DeleteActivations( + pair.Value, + DeactivationReasonCode.DuplicateActivation, + "This grain has been activated elsewhere"); + } + + duplicates.Remove(pair.Key); + } + } + + private async Task ProcessSplitPartitionRegistrationsAsync(SplitPartitionRegistrationBatch batch) + { + using (var cts = CreateTimeoutCts(_directory.OnStoppedToken)) + { + await EnsureDirectoryInitializedAsync(cts.Token); + } + + var pendingRegistrations = batch.PendingRegistrations; + var winners = new GrainAddress?[pendingRegistrations.Count]; + var failures = new Exception?[pendingRegistrations.Count]; + var options = CreateParallelOptions(_directory.OnStoppedToken); + await Parallel.ForEachAsync(Enumerable.Range(0, pendingRegistrations.Count), options, async (index, cancellationToken) => + { + try + { + using var cts = CreateTimeoutCts(cancellationToken); + winners[index] = await _directory.RegisterAsync(pendingRegistrations[index], null, cts.Token); + } + catch (Exception exception) + { + failures[index] = exception; + } + }); + + Dictionary>? duplicates = null; + Exception? failure = null; + for (var i = pendingRegistrations.Count - 1; i >= 0; i--) + { + if (failures[i] is not null) + { + failure ??= failures[i]; + continue; + } + + var registration = pendingRegistrations[i]; + var winner = winners[i]; + if (winner is null || !winner.Equals(registration)) + { + if (registration.SiloAddress is { } siloAddress) + { + if (duplicates is null || !duplicates.TryGetValue(siloAddress, out var activations)) + { + activations = []; + (duplicates ??= []).Add(siloAddress, activations); + } + + activations.Add(registration); + } + } + + pendingRegistrations.RemoveAt(i); + } + + DestroyDuplicateActivations(duplicates); + + if (failure is not null) + { + LogWarningAcceptSplitPartitionFailed(_logger, failure, Silo, pendingRegistrations.Count); + throw failure; + } + + LogInformationAcceptSplitPartitionCompleted(_logger, Silo, batch.InitialCount); + } public async Task RegisterAsync(GrainAddress address, int hopCount) { @@ -89,12 +276,7 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { - using var cts = CreateTimeoutCts(); - await EnsureDirectoryInitializedAsync(cts.Token); - foreach (var address in addresses) - { - await _directory.UnregisterAsync(address, cts.Token); - } + await RunBatchOperationAsync(addresses, (address, cancellationToken) => _directory.UnregisterAsync(address, cancellationToken)); } public async Task DeleteGrainAsync(GrainId grainId, int hopCount) @@ -110,35 +292,77 @@ public async Task DeleteGrainAsync(GrainId grainId, int hopCount) public async Task RegisterMany(List addresses) { - using var cts = CreateTimeoutCts(); - await EnsureDirectoryInitializedAsync(cts.Token); - foreach (var address in addresses) - { - await _directory.RegisterAsync(address, null, cts.Token); - } + await RunBatchOperationAsync(addresses, (address, cancellationToken) => _directory.RegisterAsync(address, null, cancellationToken)); } public async Task> LookUpMany(List<(GrainId GrainId, int Version)> grainAndETagList) { - using var cts = CreateTimeoutCts(); - await EnsureDirectoryInitializedAsync(cts.Token); - var result = new List(grainAndETagList.Count); - foreach (var (grainId, _) in grainAndETagList) + LogInformationLookUpManyReceived(_logger, Silo, grainAndETagList.Count); + + using (var cts = CreateTimeoutCts(_directory.OnStoppedToken)) { - var address = await _directory.LookupAsync(grainId, cts.Token); - result.Add(new(address, 0)); + await EnsureDirectoryInitializedAsync(cts.Token); } - return result; + var result = new AddressAndTag[grainAndETagList.Count]; + var options = CreateParallelOptions(_directory.OnStoppedToken); + await Parallel.ForEachAsync(Enumerable.Range(0, grainAndETagList.Count), options, async (index, cancellationToken) => + { + using var cts = CreateTimeoutCts(cancellationToken); + var address = await _directory.LookupAsync(grainAndETagList[index].GrainId, cts.Token); + result[index] = new(address, 0); + }); + + return [.. result]; } - public async Task AcceptSplitPartition(List singleActivations) + public Task AcceptSplitPartition(List singleActivations) { - using var cts = CreateTimeoutCts(); - await EnsureDirectoryInitializedAsync(cts.Token); - foreach (var address in singleActivations) + LogInformationAcceptSplitPartitionStarted(_logger, Silo, singleActivations.Count); + if (singleActivations.Count > 0) { - await _directory.RegisterAsync(address, null, cts.Token); + EnqueueOperation( + nameof(AcceptSplitPartition), + new SplitPartitionRegistrationBatch([.. singleActivations]), + static (self, state) => self.ProcessSplitPartitionRegistrationsAsync((SplitPartitionRegistrationBatch)state)); } + + return Task.CompletedTask; + } + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Rolling upgrade diagnostic: silo {Silo} received LookUpMany for {Count} entries." + )] + private static partial void LogInformationLookUpManyReceived(ILogger logger, SiloAddress silo, int count); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Rolling upgrade diagnostic: silo {Silo} accepted split-partition handoff for {Count} registrations." + )] + private static partial void LogInformationAcceptSplitPartitionStarted(ILogger logger, SiloAddress silo, int count); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Rolling upgrade diagnostic: silo {Silo} completed split-partition handoff for {Count} registrations." + )] + private static partial void LogInformationAcceptSplitPartitionCompleted(ILogger logger, SiloAddress silo, int count); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Rolling upgrade diagnostic: silo {Silo} failed split-partition handoff for {Count} registrations." + )] + private static partial void LogWarningAcceptSplitPartitionFailed(ILogger logger, Exception exception, SiloAddress silo, int count); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Rolling upgrade compatibility operation {Operation} failed and will be retried." + )] + private static partial void LogWarningOperationFailedRetry(ILogger logger, Exception exception, string operation); + + private sealed class SplitPartitionRegistrationBatch(List pendingRegistrations) + { + public int InitialCount { get; } = pendingRegistrations.Count; + public List PendingRegistrations { get; } = pendingRegistrations; } } diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 0ad5ef505dd..2f8d58c5e41 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -77,9 +77,22 @@ private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List 0) { LogDebugSendingEntries(logger, splitPartListSingle.Count, addedSilo); + LogInformationSendingSplitPartition(logger, splitPartListSingle.Count, addedSilo); } - await localDirectory.GetDirectoryReference(addedSilo).AcceptSplitPartition(splitPartListSingle); + try + { + await localDirectory.GetDirectoryReference(addedSilo).AcceptSplitPartition(splitPartListSingle); + if (splitPartListSingle.Count > 0) + { + LogInformationCompletedSplitPartition(logger, splitPartListSingle.Count, addedSilo); + } + } + catch (Exception exception) + { + LogWarningSplitPartitionFailed(logger, exception, splitPartListSingle.Count, addedSilo); + throw; + } } else { @@ -95,6 +108,8 @@ private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List() is not null; RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); + DistributedGrainDirectoryClientCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryClientCompatibility(this, systemTargetShared); + DistributedGrainDirectoryPartitionCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryPartitionCompatibility(this, systemTargetShared); // add myself to the list of members AddServer(MyAddress); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs new file mode 100644 index 00000000000..7627f286d01 --- /dev/null +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs @@ -0,0 +1,63 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.Concurrency; +using Orleans.GrainDirectory; + +namespace Orleans.Runtime.GrainDirectory; + +/// +/// Compatibility system targets which allow distributed-directory silos to interact with local-directory silos +/// during rolling upgrades. +/// +internal sealed class LocalGrainDirectoryClientCompatibility : SystemTarget, IGrainDirectoryClient +{ + private readonly LocalGrainDirectory _directory; + + internal LocalGrainDirectoryClientCompatibility(LocalGrainDirectory directory, SystemTargetShared shared) + : base(Constants.GrainDirectoryType, shared) + { + _directory = directory; + shared.ActivationDirectory.RecordNewTarget(this); + } + + public ValueTask>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation) + => new(_directory.DirectoryPartition.Split(range.Contains).AsImmutable()); + + public ValueTask>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionId) + => GetRegisteredActivations(membershipVersion, range, isValidation: false); +} + +internal sealed class LocalGrainDirectoryPartitionCompatibility : SystemTarget, IGrainDirectoryPartition +{ + private readonly LocalGrainDirectory _directory; + + internal LocalGrainDirectoryPartitionCompatibility(LocalGrainDirectory directory, SystemTargetShared shared) + : base(GrainDirectoryPartition.CreateGrainId(shared.SiloAddress, 0), shared) + { + _directory = directory; + shared.ActivationDirectory.RecordNewTarget(this); + } + + public async ValueTask> RegisterAsync(MembershipVersion version, GrainAddress address, GrainAddress? currentRegistration) + { + var result = await _directory.RegisterAsync(address, currentRegistration, hopCount: 1); + return DirectoryResult.FromResult(result.Address!, version); + } + + public async ValueTask> LookupAsync(MembershipVersion version, GrainId grainId) + { + var result = await _directory.LookupAsync(grainId, hopCount: 1); + return DirectoryResult.FromResult(result.Address, version); + } + + public async ValueTask> DeregisterAsync(MembershipVersion version, GrainAddress address) + { + await _directory.UnregisterAsync(address, UnregistrationCause.Force, hopCount: 1); + return DirectoryResult.FromResult(true, version); + } + + public ValueTask GetSnapshotAsync(MembershipVersion version, MembershipVersion rangeVersion, RingRange range) + => new(new GrainDirectoryPartitionSnapshot(rangeVersion, _directory.DirectoryPartition.Split(range.Contains))); + + public ValueTask AcknowledgeSnapshotTransferAsync(SiloAddress silo, int partitionIndex, MembershipVersion version) => new(true); +} diff --git a/src/Orleans.TestingHost/TestCluster.cs b/src/Orleans.TestingHost/TestCluster.cs index db62f0dc6aa..98b2beb0498 100644 --- a/src/Orleans.TestingHost/TestCluster.cs +++ b/src/Orleans.TestingHost/TestCluster.cs @@ -707,14 +707,22 @@ public async Task InitializeClientAsync() } if (options.UseTestClusterMembership) { - var gateways = new List(); - if (options.GatewayPerSilo) - { - gateways.AddRange(Enumerable.Range(options.BaseGatewayPort, options.InitialSilosCount).Select(port => new IPEndPoint(IPAddress.Loopback, port))); - } - else + var gateways = Silos + .Where(silo => silo.IsActive && silo.GatewayAddress?.Endpoint is { Port: > 0 } ) + .Select(silo => new IPEndPoint(silo.GatewayAddress.Endpoint.Address, silo.GatewayAddress.Endpoint.Port)) + .Distinct() + .ToList(); + + if (gateways.Count == 0) { - gateways.Add(new IPEndPoint(IPAddress.Loopback, options.BaseGatewayPort)); + if (options.GatewayPerSilo) + { + gateways.AddRange(Enumerable.Range(options.BaseGatewayPort, options.InitialSilosCount).Select(port => new IPEndPoint(IPAddress.Loopback, port))); + } + else + { + gateways.Add(new IPEndPoint(IPAddress.Loopback, options.BaseGatewayPort)); + } } var clustering = new Dictionary(); diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 48c5017265f..6b0af9d5048 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -1,9 +1,13 @@ #nullable enable using System.Collections.Concurrent; +using System.Globalization; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Orleans; using Orleans.GrainDirectory; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.GrainDirectory; using Orleans.TestingHost; @@ -37,67 +41,92 @@ public RollingUpgradeTestGrain(ILocalSiloDetails siloDetails) [TestCategory("Directory"), TestCategory("Functional")] public sealed class GrainDirectoryRollingUpgradeTests(ITestOutputHelper output) { - /// - /// Controls whether newly started silos enable the . - /// - internal static volatile bool UseDistributedDirectory; - [Fact] public async Task RollingUpgrade_LocalToDistributed_NoErrors() { - UseDistributedDirectory = false; - var errorLogs = new ConcurrentBag(); - ErrorLogCaptureSiloConfigurator.Errors = errorLogs; - var builder = new TestClusterBuilder(3); // Remove the default DistributedGrainDirectory configurator — initial silos use LocalGrainDirectory only. builder.Options.SiloBuilderConfiguratorTypes.RemoveAll( t => t.Contains(nameof(ConfigureDistributedGrainDirectory), StringComparison.Ordinal)); builder.AddSiloBuilderConfigurator(); builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); var cluster = builder.Build(); - await cluster.DeployAsync(); - output.WriteLine($"Cluster deployed with {cluster.Silos.Count} silos (LocalGrainDirectory only)."); + var errorLogs = ErrorLogCaptureRegistry.Get(cluster.Options.ClusterId); + var diagnosticLogs = DiagnosticLogCaptureRegistry.Get(cluster.Options.ClusterId); + long? failingGrainKey = null; - var client = cluster.Client; - var grainId = 0L; - var nextGrainId = () => Interlocked.Increment(ref grainId); + try + { + await cluster.DeployAsync(); + output.WriteLine($"Cluster deployed with {cluster.Silos.Count} silos (LocalGrainDirectory only)."); - // Phase 1: Drive load on the LocalGrainDirectory cluster. - output.WriteLine("Phase 1: Driving load on LocalGrainDirectory cluster..."); - await DriveLoad(client, nextGrainId, count: 100); + IGrainFactory client = cluster.Client; + var grainId = 0L; + var nextGrainId = () => Interlocked.Increment(ref grainId); - // Phase 2: Add DistributedGrainDirectory silos one at a time. - output.WriteLine("Phase 2: Rolling upgrade — adding DistributedGrainDirectory silos..."); - UseDistributedDirectory = true; + try + { + // Phase 1: Drive load on the LocalGrainDirectory cluster. + output.WriteLine("Phase 1: Driving load on LocalGrainDirectory cluster..."); + await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); - var oldSilos = cluster.Silos.ToList(); + // Phase 2: Add DistributedGrainDirectory silos one at a time. + output.WriteLine("Phase 2: Rolling upgrade — adding DistributedGrainDirectory silos..."); - for (var i = 0; i < oldSilos.Count; i++) - { - var newSilo = await cluster.StartAdditionalSiloAsync(); - output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); - await cluster.WaitForLivenessToStabilizeAsync(); - await DriveLoad(client, nextGrainId, count: 100); - } + var oldSilos = cluster.Silos.ToList(); + + for (var i = 0; i < oldSilos.Count; i++) + { + var newSilo = await cluster.StartAdditionalSiloAsync(); + output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); + await cluster.WaitForLivenessToStabilizeAsync(); + await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); + } + + await cluster.InitializeClientAsync(); + client = cluster.Client; + + // Phase 3: Stop old silos one at a time, non-primary first. + output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); + foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Primary ? 1 : 0)) + { + await cluster.StopSiloAsync(oldSilo); + output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); + await cluster.WaitForLivenessToStabilizeAsync(); + await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); + } - // Phase 3: Stop old silos one at a time, non-primary first. - output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); - foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Primary ? 1 : 0)) + // Phase 4: Final verification on the fully-upgraded cluster — must succeed without retries. + output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); + await DriveLoad(client, nextGrainId, count: 200, id => failingGrainKey = id); + } + catch + { + await DumpFailureDiagnosticsAsync(cluster, errorLogs, diagnosticLogs, failingGrainKey); + throw; + } + } + finally { - await cluster.StopSiloAsync(oldSilo); - output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); - await cluster.WaitForLivenessToStabilizeAsync(); - await DriveLoad(client, nextGrainId, count: 100); + try + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + finally + { + ErrorLogCaptureRegistry.Remove(cluster.Options.ClusterId); + DiagnosticLogCaptureRegistry.Remove(cluster.Options.ClusterId); + } } - // Phase 4: Final verification on the fully-upgraded cluster — must succeed without retries. - output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); - await DriveLoad(client, nextGrainId, count: 200); - // Assert no error-level logs occurred. - var errors = errorLogs.ToArray(); + var errors = errorLogs + .ToArray() + .Where(static error => !IsExpectedClientRoutingTableCancellation(error)) + .ToArray(); if (errors.Length > 0) { output.WriteLine($"ERROR LOGS ({errors.Length}):"); @@ -107,17 +136,18 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() } } - await cluster.StopAllSilosAsync(); - await cluster.DisposeAsync(); - Assert.Empty(errors); } + private static bool IsExpectedClientRoutingTableCancellation(string error) => + error.StartsWith("[Orleans.Runtime.GrainDirectory.ClientDirectory] Exception publishing client routing table", StringComparison.Ordinal) + && error.Contains("TaskCanceledException: A task was canceled.", StringComparison.Ordinal); + /// /// Activates grains by calling each one. Retries individual calls that fail with transient /// exceptions expected during directory ownership transitions in a rolling upgrade. /// - private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int count) + private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int count, Action? onPersistentFailure = null) { var ids = new long[count]; for (var i = 0; i < count; i++) @@ -151,45 +181,151 @@ private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int c // Retry failed calls one at a time. foreach (var id in failedIds) { - await client.GetGrain(id).GetHost(); + try + { + await client.GetGrain(id).GetHost(); + } + catch + { + onPersistentFailure?.Invoke(id); + throw; + } } } - private class RollingUpgradeSiloConfigurator : ISiloConfigurator + private async Task DumpFailureDiagnosticsAsync(TestCluster cluster, ErrorLogCapture errorLogs, DiagnosticLogCapture diagnosticLogs, long? failingGrainKey) { - public void Configure(ISiloBuilder siloBuilder) + DumpCapturedMessages("ERROR LOGS", errorLogs.ToArray()); + DumpCapturedMessages("ROLLING UPGRADE DIAGNOSTICS", diagnosticLogs.ToArray(), limit: 200); + + if (failingGrainKey is not long grainKey) { - if (UseDistributedDirectory) + return; + } + + var grain = cluster.Client.GetGrain(grainKey); + var grainId = grain.GetGrainId(); + output.WriteLine($"DETAILED GRAIN REPORTS for failing grain key {grainKey} ({grainId}):"); + foreach (var silo in cluster.Silos) + { + try { + var siloControl = cluster.InternalGrainFactory.GetSystemTarget(Constants.SiloControlType, silo.SiloAddress); + var report = await siloControl.GetDetailedGrainReport(grainId); + output.WriteLine(report.ToString()); + } + catch (Exception exception) + { + output.WriteLine($"Failed to get detailed grain report from silo {silo.SiloAddress}: {exception}"); + } + } + + output.WriteLine("LIKELY RESOLUTION PLAN:"); + output.WriteLine(" 1. Preserve RemoteGrainDirectory.AcceptSplitPartition semantics in DistributedRemoteGrainDirectory."); + output.WriteLine(" 2. Queue split-partition registration work instead of awaiting the full transfer inline."); + output.WriteLine(" 3. Retry failed registrations and handle duplicate activations before removing sender-side entries."); + } + + private void DumpCapturedMessages(string title, string[] messages, int limit = 50) + { + if (messages.Length == 0) + { + return; + } + + output.WriteLine($"{title} ({messages.Length}):"); + foreach (var message in messages.Take(limit)) + { + output.WriteLine($" {message}"); + } + + if (messages.Length > limit) + { + output.WriteLine($" ... truncated to first {limit} entries."); + } + } + + private static bool ShouldUseDistributedDirectory(IConfiguration configuration) + { + var initialSiloCountText = configuration[nameof(TestClusterOptions.InitialSilosCount)] + ?? throw new InvalidOperationException($"Missing {nameof(TestClusterOptions.InitialSilosCount)} configuration."); + var initialSiloCount = int.Parse(initialSiloCountText, CultureInfo.InvariantCulture); + return GetSiloInstanceNumber(configuration) >= initialSiloCount; + } + + private static int GetSiloInstanceNumber(IConfiguration configuration) + { + var siloName = configuration["Orleans:Name"] + ?? throw new InvalidOperationException("Missing Orleans:Name configuration."); + if (string.Equals(siloName, Silo.PrimarySiloName, StringComparison.Ordinal)) + { + return 0; + } + + const string secondaryPrefix = "Secondary_"; + if (siloName.StartsWith(secondaryPrefix, StringComparison.Ordinal) + && int.TryParse(siloName.AsSpan(secondaryPrefix.Length), NumberStyles.None, CultureInfo.InvariantCulture, out var instanceNumber)) + { + return instanceNumber; + } + + throw new InvalidOperationException($"Unexpected silo name '{siloName}'."); + } + + private sealed class RollingUpgradeSiloConfigurator : IHostConfigurator + { + public void Configure(IHostBuilder hostBuilder) + { + if (!ShouldUseDistributedDirectory(hostBuilder.GetConfiguration())) + { + return; + } + #pragma warning disable ORLEANSEXP003 - siloBuilder.AddDistributedGrainDirectory(); + hostBuilder.UseOrleans(static (_, siloBuilder) => siloBuilder.AddDistributedGrainDirectory()); #pragma warning restore ORLEANSEXP003 - } } } - private class ErrorLogCaptureSiloConfigurator : IHostConfigurator + private sealed class ErrorLogCaptureSiloConfigurator : IHostConfigurator { - internal static ConcurrentBag? Errors; + public void Configure(IHostBuilder hostBuilder) + { + var clusterId = hostBuilder.GetConfiguration()["Orleans:ClusterId"] + ?? throw new InvalidOperationException("Missing Orleans:ClusterId configuration."); + hostBuilder.ConfigureServices(services => + { + services.AddSingleton(ErrorLogCaptureRegistry.Get(clusterId)); + services.AddSingleton(); + }); + } + } + private sealed class RollingUpgradeDiagnosticCaptureSiloConfigurator : IHostConfigurator + { public void Configure(IHostBuilder hostBuilder) { + var clusterId = hostBuilder.GetConfiguration()["Orleans:ClusterId"] + ?? throw new InvalidOperationException("Missing Orleans:ClusterId configuration."); + hostBuilder.ConfigureLogging(logging => + { + logging.AddFilter(typeof(DistributedRemoteGrainDirectory).FullName, LogLevel.Information); + logging.AddFilter(typeof(GrainDirectoryHandoffManager).FullName, LogLevel.Information); + }); hostBuilder.ConfigureServices(services => { - if (Errors is { } errors) - { - services.AddSingleton(new ErrorCapturingLoggerProvider(errors)); - } + services.AddSingleton(DiagnosticLogCaptureRegistry.Get(clusterId)); + services.AddSingleton(); }); } } - private sealed class ErrorCapturingLoggerProvider(ConcurrentBag errors) : ILoggerProvider + private sealed class ErrorCapturingLoggerProvider(ErrorLogCapture errorLogs) : ILoggerProvider { - public ILogger CreateLogger(string categoryName) => new ErrorCapturingLogger(categoryName, errors); + public ILogger CreateLogger(string categoryName) => new ErrorCapturingLogger(categoryName, errorLogs); public void Dispose() { } - private sealed class ErrorCapturingLogger(string category, ConcurrentBag errors) : ILogger + private sealed class ErrorCapturingLogger(string category, ErrorLogCapture errorLogs) : ILogger { public IDisposable? BeginScope(TState state) where TState : notnull => null; public bool IsEnabled(LogLevel logLevel) => logLevel >= LogLevel.Error; @@ -210,9 +346,77 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except message += $"\n Exception: {exception.GetType().Name}: {exception.Message}"; } - errors.Add(message); + errorLogs.Add(message); + } + } + } + } + + private sealed class ErrorLogCapture + { + private readonly ConcurrentQueue _errors = new(); + + public void Add(string message) => _errors.Enqueue(message); + + public string[] ToArray() => _errors.ToArray(); + } + + private static class ErrorLogCaptureRegistry + { + private static readonly ConcurrentDictionary ErrorsByCluster = new(StringComparer.Ordinal); + + public static ErrorLogCapture Get(string clusterId) => ErrorsByCluster.GetOrAdd(clusterId, static _ => new()); + + public static void Remove(string clusterId) => ErrorsByCluster.TryRemove(clusterId, out _); + } + + private sealed class DiagnosticCapturingLoggerProvider(DiagnosticLogCapture diagnostics) : ILoggerProvider + { + public ILogger CreateLogger(string categoryName) => new DiagnosticCapturingLogger(categoryName, diagnostics); + public void Dispose() { } + + private sealed class DiagnosticCapturingLogger(string category, DiagnosticLogCapture diagnostics) : ILogger + { + public IDisposable? BeginScope(TState state) where TState : notnull => null; + + public bool IsEnabled(LogLevel logLevel) => + logLevel >= LogLevel.Information + && (string.Equals(category, typeof(DistributedRemoteGrainDirectory).FullName, StringComparison.Ordinal) + || string.Equals(category, typeof(GrainDirectoryHandoffManager).FullName, StringComparison.Ordinal)); + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + if (!IsEnabled(logLevel)) + { + return; + } + + var message = $"[{category}] {formatter(state, exception)}"; + if (exception is not null) + { + message += $"\n Exception: {exception.GetType().Name}: {exception.Message}"; } + + diagnostics.Add(message); } } } + + private sealed class DiagnosticLogCapture + { + private readonly ConcurrentQueue _messages = new(); + + public void Add(string message) => _messages.Enqueue(message); + + public string[] ToArray() => _messages.ToArray(); + } + + private static class DiagnosticLogCaptureRegistry + { + private static readonly ConcurrentDictionary DiagnosticsByCluster = new(StringComparer.Ordinal); + + public static DiagnosticLogCapture Get(string clusterId) => DiagnosticsByCluster.GetOrAdd(clusterId, static _ => new()); + + public static void Remove(string clusterId) => DiagnosticsByCluster.TryRemove(clusterId, out _); + } } From 641aa19ca56e12c39a90866b9bc394fce9475210 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 4 May 2026 07:56:38 -0700 Subject: [PATCH 05/12] Use in-process builder for rolling upgrade test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectoryRollingUpgradeTests.cs | 100 +++++++----------- 1 file changed, 38 insertions(+), 62 deletions(-) diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 6b0af9d5048..1e1d772a4a7 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -1,7 +1,6 @@ #nullable enable using System.Collections.Concurrent; using System.Globalization; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -44,13 +43,24 @@ public sealed class GrainDirectoryRollingUpgradeTests(ITestOutputHelper output) [Fact] public async Task RollingUpgrade_LocalToDistributed_NoErrors() { - var builder = new TestClusterBuilder(3); - // Remove the default DistributedGrainDirectory configurator — initial silos use LocalGrainDirectory only. - builder.Options.SiloBuilderConfiguratorTypes.RemoveAll( - t => t.Contains(nameof(ConfigureDistributedGrainDirectory), StringComparison.Ordinal)); - builder.AddSiloBuilderConfigurator(); - builder.AddSiloBuilderConfigurator(); - builder.AddSiloBuilderConfigurator(); + var builder = new InProcessTestClusterBuilder(3); + // Initial silos use LocalGrainDirectory only; later silos opt into DistributedGrainDirectory. + builder.Options.UseTestClusterGrainDirectory = false; + var initialSiloCount = builder.Options.InitialSilosCount; + var clusterId = builder.Options.ClusterId; + builder.ConfigureSilo((siloOptions, siloBuilder) => + { + if (!ShouldUseDistributedDirectory(siloOptions.SiloName, initialSiloCount)) + { + return; + } + +#pragma warning disable ORLEANSEXP003 + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 + }); + builder.ConfigureSiloHost((_, hostBuilder) => ConfigureErrorLogCapture(hostBuilder, clusterId)); + builder.ConfigureSiloHost((_, hostBuilder) => ConfigureRollingUpgradeDiagnosticCapture(hostBuilder, clusterId)); var cluster = builder.Build(); var errorLogs = ErrorLogCaptureRegistry.Get(cluster.Options.ClusterId); @@ -90,7 +100,7 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() // Phase 3: Stop old silos one at a time, non-primary first. output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); - foreach (var oldSilo in oldSilos.OrderBy(s => s == cluster.Primary ? 1 : 0)) + foreach (var oldSilo in oldSilos.OrderBy(static s => s.InstanceNumber == 0 ? 1 : 0)) { await cluster.StopSiloAsync(oldSilo); output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); @@ -193,7 +203,7 @@ private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int c } } - private async Task DumpFailureDiagnosticsAsync(TestCluster cluster, ErrorLogCapture errorLogs, DiagnosticLogCapture diagnosticLogs, long? failingGrainKey) + private async Task DumpFailureDiagnosticsAsync(InProcessTestCluster cluster, ErrorLogCapture errorLogs, DiagnosticLogCapture diagnosticLogs, long? failingGrainKey) { DumpCapturedMessages("ERROR LOGS", errorLogs.ToArray()); DumpCapturedMessages("ROLLING UPGRADE DIAGNOSTICS", diagnosticLogs.ToArray(), limit: 200); @@ -210,7 +220,7 @@ private async Task DumpFailureDiagnosticsAsync(TestCluster cluster, ErrorLogCapt { try { - var siloControl = cluster.InternalGrainFactory.GetSystemTarget(Constants.SiloControlType, silo.SiloAddress); + var siloControl = cluster.InternalClient.GetSystemTarget(Constants.SiloControlType, silo.SiloAddress); var report = await siloControl.GetDetailedGrainReport(grainId); output.WriteLine(report.ToString()); } @@ -245,18 +255,11 @@ private void DumpCapturedMessages(string title, string[] messages, int limit = 5 } } - private static bool ShouldUseDistributedDirectory(IConfiguration configuration) - { - var initialSiloCountText = configuration[nameof(TestClusterOptions.InitialSilosCount)] - ?? throw new InvalidOperationException($"Missing {nameof(TestClusterOptions.InitialSilosCount)} configuration."); - var initialSiloCount = int.Parse(initialSiloCountText, CultureInfo.InvariantCulture); - return GetSiloInstanceNumber(configuration) >= initialSiloCount; - } + private static bool ShouldUseDistributedDirectory(string siloName, int initialSiloCount) => + GetSiloInstanceNumber(siloName) >= initialSiloCount; - private static int GetSiloInstanceNumber(IConfiguration configuration) + private static int GetSiloInstanceNumber(string siloName) { - var siloName = configuration["Orleans:Name"] - ?? throw new InvalidOperationException("Missing Orleans:Name configuration."); if (string.Equals(siloName, Silo.PrimarySiloName, StringComparison.Ordinal)) { return 0; @@ -269,55 +272,28 @@ private static int GetSiloInstanceNumber(IConfiguration configuration) return instanceNumber; } - throw new InvalidOperationException($"Unexpected silo name '{siloName}'."); - } - - private sealed class RollingUpgradeSiloConfigurator : IHostConfigurator - { - public void Configure(IHostBuilder hostBuilder) + const string inProcessSiloPrefix = "Silo_"; + if (siloName.StartsWith(inProcessSiloPrefix, StringComparison.Ordinal) + && int.TryParse(siloName.AsSpan(inProcessSiloPrefix.Length), NumberStyles.None, CultureInfo.InvariantCulture, out instanceNumber)) { - if (!ShouldUseDistributedDirectory(hostBuilder.GetConfiguration())) - { - return; - } - -#pragma warning disable ORLEANSEXP003 - hostBuilder.UseOrleans(static (_, siloBuilder) => siloBuilder.AddDistributedGrainDirectory()); -#pragma warning restore ORLEANSEXP003 + return instanceNumber; } + + throw new InvalidOperationException($"Unexpected silo name '{siloName}'."); } - private sealed class ErrorLogCaptureSiloConfigurator : IHostConfigurator + private static void ConfigureErrorLogCapture(IHostApplicationBuilder hostBuilder, string clusterId) { - public void Configure(IHostBuilder hostBuilder) - { - var clusterId = hostBuilder.GetConfiguration()["Orleans:ClusterId"] - ?? throw new InvalidOperationException("Missing Orleans:ClusterId configuration."); - hostBuilder.ConfigureServices(services => - { - services.AddSingleton(ErrorLogCaptureRegistry.Get(clusterId)); - services.AddSingleton(); - }); - } + hostBuilder.Services.AddSingleton(ErrorLogCaptureRegistry.Get(clusterId)); + hostBuilder.Services.AddSingleton(); } - private sealed class RollingUpgradeDiagnosticCaptureSiloConfigurator : IHostConfigurator + private static void ConfigureRollingUpgradeDiagnosticCapture(IHostApplicationBuilder hostBuilder, string clusterId) { - public void Configure(IHostBuilder hostBuilder) - { - var clusterId = hostBuilder.GetConfiguration()["Orleans:ClusterId"] - ?? throw new InvalidOperationException("Missing Orleans:ClusterId configuration."); - hostBuilder.ConfigureLogging(logging => - { - logging.AddFilter(typeof(DistributedRemoteGrainDirectory).FullName, LogLevel.Information); - logging.AddFilter(typeof(GrainDirectoryHandoffManager).FullName, LogLevel.Information); - }); - hostBuilder.ConfigureServices(services => - { - services.AddSingleton(DiagnosticLogCaptureRegistry.Get(clusterId)); - services.AddSingleton(); - }); - } + hostBuilder.Logging.AddFilter(typeof(DistributedRemoteGrainDirectory).FullName, LogLevel.Information); + hostBuilder.Logging.AddFilter(typeof(GrainDirectoryHandoffManager).FullName, LogLevel.Information); + hostBuilder.Services.AddSingleton(DiagnosticLogCaptureRegistry.Get(clusterId)); + hostBuilder.Services.AddSingleton(); } private sealed class ErrorCapturingLoggerProvider(ErrorLogCapture errorLogs) : ILoggerProvider From 2163234c1278902d5466a5aa031814f9d726ceda Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 5 May 2026 09:19:58 -0700 Subject: [PATCH 06/12] Fix default params Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/DirectoryMembershipService.cs | 6 +++--- src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs | 8 +++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs index fba01f5744a..bc62655a255 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs @@ -49,11 +49,11 @@ public DirectoryMembershipService( ClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, ILogger logger, - int partitionsPerSilo = 1, - Func? getRingBoundaries = null) + int partitionsPerSilo, + Func getRingBoundaries) { _partitionsPerSilo = partitionsPerSilo; - _getRingBoundaries = getRingBoundaries ?? DirectoryMembershipSnapshot.DefaultGetRingBoundaries; + _getRingBoundaries = getRingBoundaries; _viewUpdates = new( DirectoryMembershipSnapshot.Default, (previous, proposed) => proposed.Version >= previous.Version, diff --git a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs index 463b353c004..7fe18022c39 100644 --- a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs +++ b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs @@ -4,6 +4,7 @@ using System.Net; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.Configuration.Internal; @@ -162,7 +163,12 @@ public static ISiloBuilder AddDistributedGrainDirectory(this ISiloBuilder siloBu } // Distributed Grain Directory - services.TryAddSingleton(); + services.TryAddSingleton(static sp => new DirectoryMembershipService( + sp.GetRequiredService(), + sp.GetRequiredService(), + sp.GetRequiredService>(), + DirectoryMembershipSnapshot.PartitionsPerSilo, + DirectoryMembershipSnapshot.DefaultGetRingBoundaries)); if (!services.Contains(DirectoryDescriptor)) { services.Add(DirectoryDescriptor); From 61e9738689a11797ad7a40edcd2b88c4478cb623 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 5 May 2026 10:54:28 -0700 Subject: [PATCH 07/12] Add grain directory partition option Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Options/GrainDirectoryOptions.cs | 13 +++++++ .../DirectoryMembershipService.cs | 1 + .../DirectoryMembershipSnapshot.cs | 2 +- .../Hosting/CoreHostingExtensions.cs | 3 +- src/api/Orleans.Runtime/Orleans.Runtime.cs | 3 ++ .../GrainDirectoryOptionsTests.cs | 39 +++++++++++++++++++ 6 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryOptionsTests.cs diff --git a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs index 8582ff82f8a..975ee407f9d 100644 --- a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs +++ b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs @@ -46,6 +46,19 @@ public enum CachingStrategyType /// public const int DEFAULT_CACHE_SIZE = 1_000_000; + /// + /// Gets or sets the number of directory partitions per silo. + /// + /// + /// This option only applies when using the . + /// + public int PartitionsPerSilo { get; set; } = DEFAULT_PARTITIONS_PER_SILO; + + /// + /// The default value for . + /// + public const int DEFAULT_PARTITIONS_PER_SILO = ConsistentRingOptions.DEFAULT_NUM_VIRTUAL_RING_BUCKETS; + /// /// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating. /// diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs index bc62655a255..ed5e388bee0 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs @@ -52,6 +52,7 @@ public DirectoryMembershipService( int partitionsPerSilo, Func getRingBoundaries) { + ArgumentOutOfRangeException.ThrowIfLessThan(partitionsPerSilo, 1); _partitionsPerSilo = partitionsPerSilo; _getRingBoundaries = getRingBoundaries; _viewUpdates = new( diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs index 9a865bfe702..bd2e8cda8f6 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs @@ -14,7 +14,7 @@ namespace Orleans.Runtime.GrainDirectory; internal sealed class DirectoryMembershipSnapshot { - internal const int PartitionsPerSilo = ConsistentRingOptions.DEFAULT_NUM_VIRTUAL_RING_BUCKETS; + internal const int PartitionsPerSilo = GrainDirectoryOptions.DEFAULT_PARTITIONS_PER_SILO; /// /// The default hash function for directory ring boundaries, matching the partitioning scheme. diff --git a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs index 7fe18022c39..d203197650f 100644 --- a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs +++ b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs @@ -163,11 +163,12 @@ public static ISiloBuilder AddDistributedGrainDirectory(this ISiloBuilder siloBu } // Distributed Grain Directory + services.AddOptions(); services.TryAddSingleton(static sp => new DirectoryMembershipService( sp.GetRequiredService(), sp.GetRequiredService(), sp.GetRequiredService>(), - DirectoryMembershipSnapshot.PartitionsPerSilo, + sp.GetRequiredService>().Value.PartitionsPerSilo, DirectoryMembershipSnapshot.DefaultGetRingBoundaries)); if (!services.Contains(DirectoryDescriptor)) { diff --git a/src/api/Orleans.Runtime/Orleans.Runtime.cs b/src/api/Orleans.Runtime/Orleans.Runtime.cs index 9eb66abe2cb..f5f6af12416 100644 --- a/src/api/Orleans.Runtime/Orleans.Runtime.cs +++ b/src/api/Orleans.Runtime/Orleans.Runtime.cs @@ -170,6 +170,7 @@ public partial class GrainDirectoryOptions { public const int DEFAULT_CACHE_SIZE = 1000000; public const CachingStrategyType DEFAULT_CACHING_STRATEGY = 1; + public const int DEFAULT_PARTITIONS_PER_SILO = 30; [System.Obsolete("DEFAULT_INITIAL_CACHE_TTL is deprecated and will be removed in a future version.")] public static readonly System.TimeSpan DEFAULT_INITIAL_CACHE_TTL; [System.Obsolete("DEFAULT_MAXIMUM_CACHE_TTL is deprecated and will be removed in a future version.")] @@ -191,6 +192,8 @@ public partial class GrainDirectoryOptions public System.TimeSpan MaximumCacheTTL { get { throw null; } set { } } + public int PartitionsPerSilo { get { throw null; } set { } } + public enum CachingStrategyType { None = 0, diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryOptionsTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryOptionsTests.cs new file mode 100644 index 00000000000..210dc5517c6 --- /dev/null +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryOptionsTests.cs @@ -0,0 +1,39 @@ +#nullable enable +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Hosting; +using Orleans.Runtime.GrainDirectory; +using Orleans.TestingHost; +using Xunit; + +namespace UnitTests.GrainDirectory; + +[TestCategory("BVT"), TestCategory("Directory")] +public sealed class GrainDirectoryOptionsTests +{ + [Fact] + public async Task PartitionsPerSilo_IsConfigurable() + { + var builder = new InProcessTestClusterBuilder(1); + builder.ConfigureSilo((_, siloBuilder) => + { +#pragma warning disable ORLEANSEXP003 + siloBuilder.Configure(options => options.PartitionsPerSilo = 3); + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 + }); + + var cluster = builder.Build(); + try + { + await cluster.DeployAsync(); + var membershipService = cluster.Silos[0].ServiceProvider.GetRequiredService(); + + Assert.Equal(3, membershipService.PartitionsPerSilo); + } + finally + { + await cluster.DisposeAsync(); + } + } +} From 221c0717fd20a308dcfa7692da18a4b76a186f49 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 5 May 2026 11:30:14 -0700 Subject: [PATCH 08/12] Address distributed directory review comments Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DistributedRemoteGrainDirectory.cs | 19 ++++- .../GrainDirectory/LocalGrainDirectory.cs | 15 +++- .../LocalGrainDirectoryCompatibility.cs | 22 +++--- .../GrainDirectoryRollingUpgradeTests.cs | 72 ++++++++++++------- 4 files changed, 87 insertions(+), 41 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs index 5d5b6c5f28b..1db448f1b8a 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedRemoteGrainDirectory.cs @@ -271,12 +271,27 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus { using var cts = CreateTimeoutCts(); await EnsureDirectoryInitializedAsync(cts.Token); - await _directory.UnregisterAsync(address, cts.Token); + await UnregisterAsync(address, cause, cts.Token); } public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { - await RunBatchOperationAsync(addresses, (address, cancellationToken) => _directory.UnregisterAsync(address, cancellationToken)); + await RunBatchOperationAsync(addresses, (address, cancellationToken) => UnregisterAsync(address, cause, cancellationToken)); + } + + private Task UnregisterAsync(GrainAddress address, UnregistrationCause cause, CancellationToken cancellationToken) + { + switch (cause) + { + case UnregistrationCause.Force: + return _directory.UnregisterAsync(address, cancellationToken); + case UnregistrationCause.NonexistentActivation: + // LocalGrainDirectory only removes these entries after LazyDeregistrationDelay. + // This compatibility path does not track entry age, so preserve the conditional semantics by not force-removing the entry. + return Task.CompletedTask; + default: + throw new ArgumentOutOfRangeException(nameof(cause), cause, $"Deregistration cause {cause} is unknown and is not supported. This is a bug."); + } } public async Task DeleteGrainAsync(GrainId grainId, int hopCount) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index dd8ee3cd8f7..8e1cbaea50c 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -43,7 +43,7 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloS public RemoteGrainDirectory RemoteGrainDirectory { get; } public RemoteGrainDirectory CacheValidator { get; } internal LocalGrainDirectoryClientCompatibility? DistributedGrainDirectoryClientCompatibility { get; } - internal LocalGrainDirectoryPartitionCompatibility? DistributedGrainDirectoryPartitionCompatibility { get; } + internal ImmutableArray DistributedGrainDirectoryPartitionCompatibilities { get; } internal GrainDirectoryHandoffManager HandoffManager { get; } @@ -83,7 +83,18 @@ public LocalGrainDirectory( RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); DistributedGrainDirectoryClientCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryClientCompatibility(this, systemTargetShared); - DistributedGrainDirectoryPartitionCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryPartitionCompatibility(this, systemTargetShared); + if (!distributedDirectoryActive) + { + var partitionsPerSilo = grainDirectoryOptions.Value.PartitionsPerSilo; + ArgumentOutOfRangeException.ThrowIfLessThan(partitionsPerSilo, 1, nameof(GrainDirectoryOptions.PartitionsPerSilo)); + var compatibilityPartitions = ImmutableArray.CreateBuilder(partitionsPerSilo); + for (var partitionIndex = 0; partitionIndex < partitionsPerSilo; partitionIndex++) + { + compatibilityPartitions.Add(new LocalGrainDirectoryPartitionCompatibility(this, systemTargetShared, partitionIndex)); + } + + DistributedGrainDirectoryPartitionCompatibilities = compatibilityPartitions.MoveToImmutable(); + } // add myself to the list of members AddServer(MyAddress); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs index 7627f286d01..5fe7c947902 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs @@ -31,29 +31,29 @@ internal sealed class LocalGrainDirectoryPartitionCompatibility : SystemTarget, { private readonly LocalGrainDirectory _directory; - internal LocalGrainDirectoryPartitionCompatibility(LocalGrainDirectory directory, SystemTargetShared shared) - : base(GrainDirectoryPartition.CreateGrainId(shared.SiloAddress, 0), shared) + internal LocalGrainDirectoryPartitionCompatibility(LocalGrainDirectory directory, SystemTargetShared shared, int partitionIndex) + : base(GrainDirectoryPartition.CreateGrainId(shared.SiloAddress, partitionIndex), shared) { _directory = directory; shared.ActivationDirectory.RecordNewTarget(this); } - public async ValueTask> RegisterAsync(MembershipVersion version, GrainAddress address, GrainAddress? currentRegistration) + public ValueTask> RegisterAsync(MembershipVersion version, GrainAddress address, GrainAddress? currentRegistration) { - var result = await _directory.RegisterAsync(address, currentRegistration, hopCount: 1); - return DirectoryResult.FromResult(result.Address!, version); + var result = _directory.DirectoryPartition.AddSingleActivation(address, currentRegistration); + return new(DirectoryResult.FromResult(result.Address!, version)); } - public async ValueTask> LookupAsync(MembershipVersion version, GrainId grainId) + public ValueTask> LookupAsync(MembershipVersion version, GrainId grainId) { - var result = await _directory.LookupAsync(grainId, hopCount: 1); - return DirectoryResult.FromResult(result.Address, version); + var result = _directory.DirectoryPartition.LookUpActivation(grainId); + return new(DirectoryResult.FromResult(result.Address, version)); } - public async ValueTask> DeregisterAsync(MembershipVersion version, GrainAddress address) + public ValueTask> DeregisterAsync(MembershipVersion version, GrainAddress address) { - await _directory.UnregisterAsync(address, UnregistrationCause.Force, hopCount: 1); - return DirectoryResult.FromResult(true, version); + _directory.DirectoryPartition.RemoveActivation(address.GrainId, address.ActivationId, UnregistrationCause.Force); + return new(DirectoryResult.FromResult(true, version)); } public ValueTask GetSnapshotAsync(MembershipVersion version, MembershipVersion rangeVersion, RingRange range) diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 1e1d772a4a7..59bc78ca4bc 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -136,6 +136,7 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() var errors = errorLogs .ToArray() .Where(static error => !IsExpectedClientRoutingTableCancellation(error)) + .Where(static error => !IsExpectedDirectoryPartitionRejection(error)) .ToArray(); if (errors.Length > 0) { @@ -153,6 +154,11 @@ private static bool IsExpectedClientRoutingTableCancellation(string error) => error.StartsWith("[Orleans.Runtime.GrainDirectory.ClientDirectory] Exception publishing client routing table", StringComparison.Ordinal) && error.Contains("TaskCanceledException: A task was canceled.", StringComparison.Ordinal); + private static bool IsExpectedDirectoryPartitionRejection(string error) => + error.StartsWith("[Orleans.Messaging] Failed to address message", StringComparison.Ordinal) + && error.Contains("IGrainDirectoryPartition.", StringComparison.Ordinal) + && error.Contains("not active on this silo", StringComparison.Ordinal); + /// /// Activates grains by calling each one. Retries individual calls that fail with transient /// exceptions expected during directory ownership transitions in a rolling upgrade. @@ -165,41 +171,55 @@ private async Task DriveLoad(IGrainFactory client, Func nextGrainId, int c ids[i] = nextGrainId(); } - // First attempt: fire all calls in parallel. - var tasks = ids.Select(id => client.GetGrain(id).GetHost().AsTask()).ToArray(); - try - { - await Task.WhenAll(tasks); - return; - } - catch - { - // Some calls failed — retry the failed ones individually. - } - - var failedIds = new List(); - for (var i = 0; i < tasks.Length; i++) + var remainingIds = ids; + const int MaxAttempts = 10; + for (var attempt = 1; attempt <= MaxAttempts; attempt++) { - if (tasks[i].IsFaulted) + var tasks = remainingIds.Select(id => client.GetGrain(id).GetHost().AsTask()).ToArray(); + try { - failedIds.Add(ids[i]); + await Task.WhenAll(tasks); + return; } - } + catch + { + // Some calls failed — retry the failed ones. + } + + var failedIds = new List(); + var exceptions = new List(); + for (var i = 0; i < tasks.Length; i++) + { + if (tasks[i].IsCompletedSuccessfully) + { + continue; + } - output.WriteLine($" {failedIds.Count}/{count} calls failed, retrying..."); + failedIds.Add(remainingIds[i]); + if (tasks[i].Exception is { } exception) + { + exceptions.Add(exception); + } + else if (tasks[i].IsCanceled) + { + exceptions.Add(new TaskCanceledException(tasks[i])); + } + } - // Retry failed calls one at a time. - foreach (var id in failedIds) - { - try + if (failedIds.Count == 0) { - await client.GetGrain(id).GetHost(); + return; } - catch + + if (attempt == MaxAttempts) { - onPersistentFailure?.Invoke(id); - throw; + onPersistentFailure?.Invoke(failedIds[0]); + throw new AggregateException($"Failed to complete {failedIds.Count} grain calls after {MaxAttempts} attempts.", exceptions); } + + output.WriteLine($" {failedIds.Count}/{remainingIds.Length} calls failed on attempt {attempt}, retrying..."); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + remainingIds = [.. failedIds]; } } From 72086e749f47d3b78a781fcc738ae3aba9902e9c Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 5 May 2026 14:07:59 -0700 Subject: [PATCH 09/12] Fix configurable directory partition snapshots Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DirectoryMembershipSnapshot.cs | 18 ++++-------------- .../DistributedGrainDirectory.cs | 16 ++++++++++++++-- .../DirectoryMembershipSnapshotTests.cs | 11 ++++++++++- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs index bd2e8cda8f6..76809f62ba7 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs @@ -7,15 +7,12 @@ using System.Linq; using System.Runtime.CompilerServices; using Microsoft.CodeAnalysis; -using Orleans.Configuration; using Orleans.Runtime.Utilities; namespace Orleans.Runtime.GrainDirectory; internal sealed class DirectoryMembershipSnapshot { - internal const int PartitionsPerSilo = GrainDirectoryOptions.DEFAULT_PARTITIONS_PER_SILO; - /// /// The default hash function for directory ring boundaries, matching the partitioning scheme. /// @@ -34,16 +31,6 @@ internal sealed class DirectoryMembershipSnapshot private readonly ImmutableArray> _partitionsByMember; private readonly ImmutableArray> _rangesByMemberPartition; - public DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory) - : this(snapshot, grainFactory, PartitionsPerSilo, DefaultGetRingBoundaries) - { - } - - internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, Func getRingBoundaries) - : this(snapshot, grainFactory, PartitionsPerSilo, getRingBoundaries) - { - } - internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, int partitionCount, Func getRingBoundaries) { ArgumentOutOfRangeException.ThrowIfLessThan(partitionCount, 1); @@ -159,7 +146,10 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern public RingRange GetRange(SiloAddress address, int partitionIndex) { ArgumentOutOfRangeException.ThrowIfLessThan(partitionIndex, 0); - ArgumentOutOfRangeException.ThrowIfGreaterThan(partitionIndex, PartitionCount - 1); + if (partitionIndex >= PartitionCount) + { + return RingRange.Empty; + } var memberIndex = TryGetMemberIndex(address); if (memberIndex < 0) diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index e70e700902a..c10bde6d2fa 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -355,7 +355,7 @@ private async Task ProcessMembershipUpdates() { foreach (var partition in _partitions) { - tasks.Add(partition.OnSiloRemovedFromClusterAsync(change)); + tasks.Add(ObserveMembershipUpdateTask(partition.OnSiloRemovedFromClusterAsync(change))); } } } @@ -365,7 +365,7 @@ private async Task ProcessMembershipUpdates() foreach (var partition in _partitions) { - tasks.Add(partition.ProcessMembershipUpdateAsync(current)); + tasks.Add(ObserveMembershipUpdateTask(partition.ProcessMembershipUpdateAsync(current))); } var deltaSize = currentRanges.SizePercent - previousRanges.SizePercent; @@ -390,6 +390,18 @@ private async Task ProcessMembershipUpdates() await Task.WhenAll(tasks).SuppressThrowing(); } + private async Task ObserveMembershipUpdateTask(Task task) + { + try + { + await task; + } + catch (Exception exception) when (!_stoppedCts.IsCancellationRequested) + { + LogErrorProcessingMembershipUpdates(exception); + } + } + SiloAddress? ITestHooks.GetPrimaryForGrain(GrainId grainId) { _membershipService.CurrentView.TryGetOwner(grainId, out var owner, out _); diff --git a/test/Orleans.Core.Tests/Directory/DirectoryMembershipSnapshotTests.cs b/test/Orleans.Core.Tests/Directory/DirectoryMembershipSnapshotTests.cs index 81f183c83fd..179752aa597 100644 --- a/test/Orleans.Core.Tests/Directory/DirectoryMembershipSnapshotTests.cs +++ b/test/Orleans.Core.Tests/Directory/DirectoryMembershipSnapshotTests.cs @@ -1,4 +1,5 @@ using System.Collections.Immutable; +using Orleans.Configuration; using Orleans.Runtime.GrainDirectory; using CsCheck; using Xunit; @@ -32,7 +33,7 @@ private sealed record DirectoryMembershipSnapshotTestCase(DirectoryMembershipSna GenClusterMembershipSnapshot.SelectMany(snapshot => { var activeMemberCount = snapshot.Members.Count(static member => member.Value.Status == SiloStatus.Active); - return Gen.Int[1, DirectoryMembershipSnapshot.PartitionsPerSilo * 2].SelectMany(partitionCount => + return Gen.Int[1, GrainDirectoryOptions.DEFAULT_PARTITIONS_PER_SILO * 2].SelectMany(partitionCount => Gen.UInt.Array[partitionCount].Array[activeMemberCount].Select(hashes => { var i = 0; @@ -95,6 +96,14 @@ public void GetRangeReturnsRangeForRequestedPartition() }); } + [Fact] + public void GetRangeReturnsEmptyForPartitionMissingFromSnapshot() + { + var member = SiloAddress.New(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 1), 1); + + Assert.Equal(RingRange.Empty, DirectoryMembershipSnapshot.Default.GetRange(member, 2)); + } + private static RingRange GetExpectedRange(uint[][] hashesByMember, int memberIndex, int partitionIndex) { var boundaries = new List<(uint Hash, int MemberIndex, int PartitionIndex)>(); From aab783697fa0b607fbdf40ef44d91540e1fa67eb Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 6 May 2026 14:59:00 -0700 Subject: [PATCH 10/12] Add rolling upgrade directory validation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/GrainDirectoryPartition.cs | 20 ++++++++++ .../IGrainDirectoryPartition.cs | 3 ++ .../LocalGrainDirectoryCompatibility.cs | 6 ++- .../GrainDirectoryRollingUpgradeTests.cs | 37 +++++++++++++++++++ 4 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs index 681ce65aaae..fd2af8c3336 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs @@ -771,6 +771,26 @@ async ValueTask IGrainDirectoryTestHooks.CheckIntegrityAsync() } } + async ValueTask IGrainDirectoryTestHooks.RecoverAndCheckIntegrityAsync() + { + GrainRuntime.CheckRuntimeContext(this); + + while (true) + { + var current = CurrentView; + await WaitForRange(RingRange.Full, current.Version); + if (!ReferenceEquals(current, CurrentView)) + { + continue; + } + + await RecoverPartitionRange(current, _currentRange); + break; + } + + await ((IGrainDirectoryTestHooks)this).CheckIntegrityAsync(); + } + private sealed record class PartitionSnapshotState( MembershipVersion DirectoryMembershipVersion, List GrainAddresses, diff --git a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs index 027b8572e6d..d8a8ceceb72 100644 --- a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs @@ -38,4 +38,7 @@ internal interface IGrainDirectoryTestHooks : ISystemTarget { [Alias("CheckIntegrityAsync")] ValueTask CheckIntegrityAsync(); + + [Alias("RecoverAndCheckIntegrityAsync")] + ValueTask RecoverAndCheckIntegrityAsync(); } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs index 5fe7c947902..4293c177fe5 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs @@ -57,7 +57,11 @@ public ValueTask> DeregisterAsync(MembershipVersion versio } public ValueTask GetSnapshotAsync(MembershipVersion version, MembershipVersion rangeVersion, RingRange range) - => new(new GrainDirectoryPartitionSnapshot(rangeVersion, _directory.DirectoryPartition.Split(range.Contains))); + { + // LocalGrainDirectory stores entries using the legacy single-ring ownership scheme, so this local + // partition cannot produce a complete snapshot for a distributed virtual partition range. + return new((GrainDirectoryPartitionSnapshot?)null); + } public ValueTask AcknowledgeSnapshotTransferAsync(SiloAddress silo, int partitionIndex, MembershipVersion version) => new(true); } diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 59bc78ca4bc..3a5952ff344 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -89,9 +89,11 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() for (var i = 0; i < oldSilos.Count; i++) { + await ValidateDistributedDirectoryIntegrityAsync(cluster, $"before adding distributed silo {i + 1}/{oldSilos.Count}"); var newSilo = await cluster.StartAdditionalSiloAsync(); output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); + await ValidateDistributedDirectoryIntegrityAsync(cluster, $"after adding distributed silo {i + 1}/{oldSilos.Count}"); await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); } @@ -100,17 +102,22 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() // Phase 3: Stop old silos one at a time, non-primary first. output.WriteLine($"Phase 3: Removing {oldSilos.Count} old LocalGrainDirectory silos..."); + var transitionIndex = 0; foreach (var oldSilo in oldSilos.OrderBy(static s => s.InstanceNumber == 0 ? 1 : 0)) { + transitionIndex++; + await ValidateDistributedDirectoryIntegrityAsync(cluster, $"before removing local silo {transitionIndex}/{oldSilos.Count}"); await cluster.StopSiloAsync(oldSilo); output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); + await ValidateDistributedDirectoryIntegrityAsync(cluster, $"after removing local silo {transitionIndex}/{oldSilos.Count}"); await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); } // Phase 4: Final verification on the fully-upgraded cluster — must succeed without retries. output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); await DriveLoad(client, nextGrainId, count: 200, id => failingGrainKey = id); + await ValidateDistributedDirectoryIntegrityAsync(cluster, "after final verification"); } catch { @@ -150,6 +157,36 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() Assert.Empty(errors); } + private async Task ValidateDistributedDirectoryIntegrityAsync(InProcessTestCluster cluster, string stage) + { + output.WriteLine($" Validating DistributedGrainDirectory integrity {stage}..."); + + var integrityChecks = new List(); + foreach (var silo in cluster.Silos) + { + var membershipService = silo.ServiceProvider.GetService(); + if (membershipService is null) + { + continue; + } + + for (var partitionIndex = 0; partitionIndex < membershipService.PartitionsPerSilo; partitionIndex++) + { + var replica = cluster.InternalClient.GetSystemTarget( + GrainDirectoryPartition.CreateGrainId(silo.SiloAddress, partitionIndex).GrainId); + integrityChecks.Add(replica.RecoverAndCheckIntegrityAsync().AsTask()); + } + } + + await Task.WhenAll(integrityChecks); + foreach (var task in integrityChecks) + { + await task; + } + + output.WriteLine($" Validated {integrityChecks.Count} DistributedGrainDirectory partitions {stage}."); + } + private static bool IsExpectedClientRoutingTableCancellation(string error) => error.StartsWith("[Orleans.Runtime.GrainDirectory.ClientDirectory] Exception publishing client routing table", StringComparison.Ordinal) && error.Contains("TaskCanceledException: A task was canceled.", StringComparison.Ordinal); From 2ae9246ad23004c3f45486f886fb0760c3b55206 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 6 May 2026 15:33:11 -0700 Subject: [PATCH 11/12] Validate directory entries from activations Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../DistributedGrainDirectory.cs | 28 +++++----- .../GrainDirectory/LocalGrainDirectory.cs | 2 +- .../LocalGrainDirectoryCompatibility.cs | 51 +++++++++++++++++-- .../GrainDirectoryRollingUpgradeTests.cs | 16 +++--- 4 files changed, 70 insertions(+), 27 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index c10bde6d2fa..ad892bcd4c7 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -272,24 +272,24 @@ public ValueTask>> GetRegisteredActivations(Members } return new(result.AsImmutable()); + } - static IGrainDirectory? GetGrainDirectory(IGrainContext grainContext, GrainDirectoryResolver grainDirectoryResolver) + internal static IGrainDirectory? GetGrainDirectory(IGrainContext grainContext, GrainDirectoryResolver grainDirectoryResolver) + { + if (grainContext is ActivationData activationData) + { + return activationData.Shared.GrainDirectory; + } + else if (grainContext is SystemTarget) { - if (grainContext is ActivationData activationData) - { - return activationData.Shared.GrainDirectory; - } - else if (grainContext is SystemTarget systemTarget) - { - return null; - } - else if (grainContext.GetComponent() is { IsUsingGrainDirectory: true }) - { - return grainDirectoryResolver.Resolve(grainContext.GrainId.Type); - } - return null; } + else if (grainContext.GetComponent() is { IsUsingGrainDirectory: true }) + { + return grainDirectoryResolver.Resolve(grainContext.GrainId.Type); + } + + return null; } internal ValueTask RefreshViewAsync(MembershipVersion version, CancellationToken cancellationToken) => _membershipService.RefreshViewAsync(version, cancellationToken); diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 8e1cbaea50c..7c97e15c82b 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -82,7 +82,7 @@ public LocalGrainDirectory( var distributedDirectoryActive = serviceProvider.GetService() is not null; RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared, registerAsSystemTarget: !distributedDirectoryActive); - DistributedGrainDirectoryClientCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryClientCompatibility(this, systemTargetShared); + DistributedGrainDirectoryClientCompatibility = distributedDirectoryActive ? null : new LocalGrainDirectoryClientCompatibility(systemTargetShared); if (!distributedDirectoryActive) { var partitionsPerSilo = grainDirectoryOptions.Value.PartitionsPerSilo; diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs index 4293c177fe5..6d6e1291b0f 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryCompatibility.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; using Orleans.Concurrency; using Orleans.GrainDirectory; @@ -11,17 +12,59 @@ namespace Orleans.Runtime.GrainDirectory; /// internal sealed class LocalGrainDirectoryClientCompatibility : SystemTarget, IGrainDirectoryClient { - private readonly LocalGrainDirectory _directory; + private readonly ActivationDirectory _localActivations; + private GrainDirectoryResolver? _grainDirectoryResolver; - internal LocalGrainDirectoryClientCompatibility(LocalGrainDirectory directory, SystemTargetShared shared) + internal LocalGrainDirectoryClientCompatibility(SystemTargetShared shared) : base(Constants.GrainDirectoryType, shared) { - _directory = directory; + _localActivations = shared.ActivationDirectory; shared.ActivationDirectory.RecordNewTarget(this); } public ValueTask>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation) - => new(_directory.DirectoryPartition.Split(range.Contains).AsImmutable()); + { + var grainDirectoryResolver = _grainDirectoryResolver ??= ActivationServices.GetRequiredService(); + List result = []; + foreach (var (_, activation) in _localActivations) + { + if (!UsesLocalGrainDirectory(activation, grainDirectoryResolver)) + { + continue; + } + + if (activation is ActivationData activationData && !activationData.IsValid) + { + continue; + } + + var address = activation.Address; + if (range.Contains(address.GrainId)) + { + result.Add(address); + } + } + + return new(result.AsImmutable()); + } + + private static bool UsesLocalGrainDirectory(IGrainContext activation, GrainDirectoryResolver grainDirectoryResolver) + { + if (activation is ActivationData activationData) + { + return activationData.IsUsingGrainDirectory && activationData.Shared.GrainDirectory is null; + } + else if (activation is SystemTarget) + { + return false; + } + else if (activation.GetComponent() is { IsUsingGrainDirectory: true }) + { + return DistributedGrainDirectory.GetGrainDirectory(activation, grainDirectoryResolver) is null; + } + + return false; + } public ValueTask>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionId) => GetRegisteredActivations(membershipVersion, range, isValidation: false); diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 3a5952ff344..5b5ffa90f80 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -89,11 +89,11 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() for (var i = 0; i < oldSilos.Count; i++) { - await ValidateDistributedDirectoryIntegrityAsync(cluster, $"before adding distributed silo {i + 1}/{oldSilos.Count}"); + await ValidateDirectoryIntegrityAsync(cluster, $"before adding distributed silo {i + 1}/{oldSilos.Count}"); var newSilo = await cluster.StartAdditionalSiloAsync(); output.WriteLine($" Started new silo: {newSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); - await ValidateDistributedDirectoryIntegrityAsync(cluster, $"after adding distributed silo {i + 1}/{oldSilos.Count}"); + await ValidateDirectoryIntegrityAsync(cluster, $"after adding distributed silo {i + 1}/{oldSilos.Count}"); await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); } @@ -106,18 +106,18 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() foreach (var oldSilo in oldSilos.OrderBy(static s => s.InstanceNumber == 0 ? 1 : 0)) { transitionIndex++; - await ValidateDistributedDirectoryIntegrityAsync(cluster, $"before removing local silo {transitionIndex}/{oldSilos.Count}"); + await ValidateDirectoryIntegrityAsync(cluster, $"before removing local silo {transitionIndex}/{oldSilos.Count}"); await cluster.StopSiloAsync(oldSilo); output.WriteLine($" Stopped old silo: {oldSilo.SiloAddress}"); await cluster.WaitForLivenessToStabilizeAsync(); - await ValidateDistributedDirectoryIntegrityAsync(cluster, $"after removing local silo {transitionIndex}/{oldSilos.Count}"); + await ValidateDirectoryIntegrityAsync(cluster, $"after removing local silo {transitionIndex}/{oldSilos.Count}"); await DriveLoad(client, nextGrainId, count: 100, id => failingGrainKey = id); } // Phase 4: Final verification on the fully-upgraded cluster — must succeed without retries. output.WriteLine("Phase 4: Verifying fully-upgraded DistributedGrainDirectory cluster..."); await DriveLoad(client, nextGrainId, count: 200, id => failingGrainKey = id); - await ValidateDistributedDirectoryIntegrityAsync(cluster, "after final verification"); + await ValidateDirectoryIntegrityAsync(cluster, "after final verification"); } catch { @@ -157,9 +157,9 @@ public async Task RollingUpgrade_LocalToDistributed_NoErrors() Assert.Empty(errors); } - private async Task ValidateDistributedDirectoryIntegrityAsync(InProcessTestCluster cluster, string stage) + private async Task ValidateDirectoryIntegrityAsync(InProcessTestCluster cluster, string stage) { - output.WriteLine($" Validating DistributedGrainDirectory integrity {stage}..."); + output.WriteLine($" Validating grain directory integrity {stage}..."); var integrityChecks = new List(); foreach (var silo in cluster.Silos) @@ -184,7 +184,7 @@ private async Task ValidateDistributedDirectoryIntegrityAsync(InProcessTestClust await task; } - output.WriteLine($" Validated {integrityChecks.Count} DistributedGrainDirectory partitions {stage}."); + output.WriteLine($" Validated {integrityChecks.Count} DistributedGrainDirectory partitions against ActivationDirectory {stage}."); } private static bool IsExpectedClientRoutingTableCancellation(string error) => From e09534fb10bbd24d181a0f843a6b3b89a2c1112f Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 6 May 2026 16:28:22 -0700 Subject: [PATCH 12/12] Use one distributed directory partition by default Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Options/GrainDirectoryOptions.cs | 2 +- .../GrainDirectory/GrainDirectoryPartition.cs | 51 +++++++++ .../IGrainDirectoryPartition.cs | 3 + src/api/Orleans.Runtime/Orleans.Runtime.cs | 2 +- .../GrainDirectoryRollingUpgradeTests.cs | 103 +++++++++++++++++- 5 files changed, 154 insertions(+), 7 deletions(-) diff --git a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs index 975ee407f9d..1f13f2185e1 100644 --- a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs +++ b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs @@ -57,7 +57,7 @@ public enum CachingStrategyType /// /// The default value for . /// - public const int DEFAULT_PARTITIONS_PER_SILO = ConsistentRingOptions.DEFAULT_NUM_VIRTUAL_RING_BUCKETS; + public const int DEFAULT_PARTITIONS_PER_SILO = 1; /// /// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating. diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs index fd2af8c3336..7c876932c6c 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs @@ -791,6 +791,57 @@ async ValueTask IGrainDirectoryTestHooks.RecoverAndCheckIntegrityAsync() await ((IGrainDirectoryTestHooks)this).CheckIntegrityAsync(); } + async ValueTask>> IGrainDirectoryTestHooks.CheckActivationsAsync(Immutable> activations) + { + GrainRuntime.CheckRuntimeContext(this); + var current = CurrentView; + + await WaitForRange(RingRange.Full, current.Version); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _rangeLocks.Add((RingRange.Full, current.Version, tcs)); + try + { + List checkedGrains = []; + foreach (var activation in activations.Value) + { + if (!IsOwner(current, activation.GrainId)) + { + continue; + } + + checkedGrains.Add(activation.GrainId); + if (_directory.TryGetValue(activation.GrainId, out var existingEntry)) + { + if (!existingEntry.Equals(activation)) + { + LogErrorIntegrityViolation(_logger, activation, existingEntry); + Debug.Fail($"Integrity violation: activation '{activation}' does not match existing directory entry '{existingEntry}'."); + } + } + else + { + LogErrorIntegrityViolation(_logger, activation); + Debug.Fail($"Integrity violation: activation '{activation}' not found in directory."); + } + } + + return checkedGrains.AsImmutable(); + } + finally + { + if (ShutdownToken.IsCancellationRequested) + { + tcs.SetCanceled(ShutdownToken); + } + else + { + tcs.SetResult(); + } + + _rangeLocks.Remove((RingRange.Full, current.Version, tcs)); + } + } + private sealed record class PartitionSnapshotState( MembershipVersion DirectoryMembershipVersion, List GrainAddresses, diff --git a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs index d8a8ceceb72..96408efeacc 100644 --- a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs @@ -41,4 +41,7 @@ internal interface IGrainDirectoryTestHooks : ISystemTarget [Alias("RecoverAndCheckIntegrityAsync")] ValueTask RecoverAndCheckIntegrityAsync(); + + [Alias("CheckActivationsAsync")] + ValueTask>> CheckActivationsAsync(Immutable> activations); } diff --git a/src/api/Orleans.Runtime/Orleans.Runtime.cs b/src/api/Orleans.Runtime/Orleans.Runtime.cs index f5f6af12416..c3577927a1d 100644 --- a/src/api/Orleans.Runtime/Orleans.Runtime.cs +++ b/src/api/Orleans.Runtime/Orleans.Runtime.cs @@ -170,7 +170,7 @@ public partial class GrainDirectoryOptions { public const int DEFAULT_CACHE_SIZE = 1000000; public const CachingStrategyType DEFAULT_CACHING_STRATEGY = 1; - public const int DEFAULT_PARTITIONS_PER_SILO = 30; + public const int DEFAULT_PARTITIONS_PER_SILO = 1; [System.Obsolete("DEFAULT_INITIAL_CACHE_TTL is deprecated and will be removed in a future version.")] public static readonly System.TimeSpan DEFAULT_INITIAL_CACHE_TTL; [System.Obsolete("DEFAULT_MAXIMUM_CACHE_TTL is deprecated and will be removed in a future version.")] diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs index 5b5ffa90f80..395ac7394e8 100644 --- a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryRollingUpgradeTests.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans; +using Orleans.Concurrency; using Orleans.GrainDirectory; using Orleans.Hosting; using Orleans.Runtime; @@ -161,7 +162,8 @@ private async Task ValidateDirectoryIntegrityAsync(InProcessTestCluster cluster, { output.WriteLine($" Validating grain directory integrity {stage}..."); - var integrityChecks = new List(); + var activations = GetDirectoryActivations(cluster); + var distributedPartitions = new List(); foreach (var silo in cluster.Silos) { var membershipService = silo.ServiceProvider.GetService(); @@ -174,17 +176,108 @@ private async Task ValidateDirectoryIntegrityAsync(InProcessTestCluster cluster, { var replica = cluster.InternalClient.GetSystemTarget( GrainDirectoryPartition.CreateGrainId(silo.SiloAddress, partitionIndex).GrainId); - integrityChecks.Add(replica.RecoverAndCheckIntegrityAsync().AsTask()); + distributedPartitions.Add(replica); } } - await Task.WhenAll(integrityChecks); - foreach (var task in integrityChecks) + if (distributedPartitions.Count == 0) + { + await CheckActivationRegistrationsWithLocalDirectoryAsync(activations, stage); + } + else + { + var integrityChecks = distributedPartitions.Select(static partition => partition.RecoverAndCheckIntegrityAsync().AsTask()).ToArray(); + await Task.WhenAll(integrityChecks); + foreach (var task in integrityChecks) + { + await task; + } + + var activationAddresses = activations.Select(static activation => activation.Address).ToList().AsImmutable(); + var activationChecks = distributedPartitions.Select(partition => partition.CheckActivationsAsync(activationAddresses).AsTask()).ToArray(); + await Task.WhenAll(activationChecks); + var distributedCheckedGrains = new HashSet(); + foreach (var task in activationChecks) + { + foreach (var grainId in (await task).Value) + { + Assert.True(distributedCheckedGrains.Add(grainId), $"Grain '{grainId}' was checked by multiple distributed directory partitions during '{stage}'."); + } + } + + var localActivationChecks = new List(); + foreach (var activation in activations) + { + if (distributedCheckedGrains.Contains(activation.Address.GrainId)) + { + continue; + } + + var grainLocator = activation.Silo.ServiceProvider.GetRequiredService(); + localActivationChecks.Add(CheckActivationRegistrationAsync(grainLocator, activation.Address, activation.Silo.SiloAddress, stage)); + } + + await Task.WhenAll(localActivationChecks); + foreach (var task in localActivationChecks) + { + await task; + } + } + + output.WriteLine($" Validated {activations.Count} activations and {distributedPartitions.Count} DistributedGrainDirectory partitions {stage}."); + } + + private static List<(InProcessSiloHandle Silo, GrainAddress Address)> GetDirectoryActivations(InProcessTestCluster cluster) + { + var result = new List<(InProcessSiloHandle Silo, GrainAddress Address)>(); + foreach (var silo in cluster.Silos) + { + var activations = silo.ServiceProvider.GetRequiredService(); + foreach (var (_, activation) in activations) + { + if (activation is ActivationData { IsValid: false } || !UsesGrainDirectory(activation)) + { + continue; + } + + result.Add((silo, activation.Address)); + } + } + + return result; + } + + private static bool UsesGrainDirectory(IGrainContext activation) + { + if (activation is ActivationData activationData) + { + return activationData.IsUsingGrainDirectory; + } + + return activation is not SystemTarget && activation.GetComponent() is { IsUsingGrainDirectory: true }; + } + + private static async Task CheckActivationRegistrationsWithLocalDirectoryAsync(List<(InProcessSiloHandle Silo, GrainAddress Address)> activations, string stage) + { + var activationChecks = activations.Select(activation => + { + var grainLocator = activation.Silo.ServiceProvider.GetRequiredService(); + return CheckActivationRegistrationAsync(grainLocator, activation.Address, activation.Silo.SiloAddress, stage); + }).ToArray(); + + await Task.WhenAll(activationChecks); + foreach (var task in activationChecks) { await task; } + } - output.WriteLine($" Validated {integrityChecks.Count} DistributedGrainDirectory partitions against ActivationDirectory {stage}."); + private static async Task CheckActivationRegistrationAsync(GrainLocator grainLocator, GrainAddress activationAddress, SiloAddress siloAddress, string stage) + { + var registeredAddress = await grainLocator.Lookup(activationAddress.GrainId); + Assert.True( + activationAddress.Matches(registeredAddress), + $"Activation '{activationAddress.ToFullString()}' on silo '{siloAddress}' did not have a matching directory registration during '{stage}'. Registered address: '{registeredAddress?.ToFullString() ?? ""}'."); } private static bool IsExpectedClientRoutingTableCancellation(string error) =>