Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ public enum CachingStrategyType
/// </summary>
public const int DEFAULT_CACHE_SIZE = 1_000_000;

/// <summary>
/// Gets or sets the number of directory partitions per silo.
/// </summary>
/// <remarks>
/// This option only applies when using the <see cref="DistributedGrainDirectory"/>.
/// </remarks>
public int PartitionsPerSilo { get; set; } = DEFAULT_PARTITIONS_PER_SILO;

/// <summary>
/// The default value for <see cref="PartitionsPerSilo"/>.
/// </summary>
public const int DEFAULT_PARTITIONS_PER_SILO = 1;

/// <summary>
/// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating.
/// </summary>
Expand Down
16 changes: 14 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/DirectoryMembershipService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ internal sealed partial class DirectoryMembershipService : IAsyncDisposable
private readonly CancellationTokenSource _shutdownCts = new();
private readonly Task _runTask;
private readonly AsyncEnumerable<DirectoryMembershipSnapshot> _viewUpdates;
private readonly int _partitionsPerSilo;
private readonly Func<SiloAddress, int, uint[]> _getRingBoundaries;

public DirectoryMembershipSnapshot CurrentView { get; private set; } = DirectoryMembershipSnapshot.Default;

public int PartitionsPerSilo => _partitionsPerSilo;

public IAsyncEnumerable<DirectoryMembershipSnapshot> ViewUpdates => _viewUpdates;

public ClusterMembershipService ClusterMembershipService { get; }
Expand All @@ -41,8 +45,16 @@ public async ValueTask<DirectoryMembershipSnapshot> RefreshViewAsync(MembershipV
return CurrentView;
}

public DirectoryMembershipService(ClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, ILogger<DirectoryMembershipService> logger)
public DirectoryMembershipService(
ClusterMembershipService clusterMembershipService,
IInternalGrainFactory grainFactory,
ILogger<DirectoryMembershipService> logger,
int partitionsPerSilo,
Func<SiloAddress, int, uint[]> getRingBoundaries)
{
ArgumentOutOfRangeException.ThrowIfLessThan(partitionsPerSilo, 1);
_partitionsPerSilo = partitionsPerSilo;
_getRingBoundaries = getRingBoundaries;
_viewUpdates = new(
DirectoryMembershipSnapshot.Default,
(previous, proposed) => proposed.Version >= previous.Version,
Expand All @@ -64,7 +76,7 @@ private async Task ProcessMembershipUpdates()
{
await foreach (var update in ClusterMembershipService.MembershipUpdates.WithCancellation(_shutdownCts.Token))
{
var view = new DirectoryMembershipSnapshot(update, _grainFactory);
var view = new DirectoryMembershipSnapshot(update, _grainFactory, _partitionsPerSilo, _getRingBoundaries);
_viewUpdates.Publish(view);
}
}
Expand Down
32 changes: 18 additions & 14 deletions src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,30 @@
using System.Linq;
using System.Runtime.CompilerServices;
using Microsoft.CodeAnalysis;
using Orleans.Configuration;
using Orleans.Runtime.Utilities;

namespace Orleans.Runtime.GrainDirectory;

internal sealed class DirectoryMembershipSnapshot
{
internal const int PartitionsPerSilo = ConsistentRingOptions.DEFAULT_NUM_VIRTUAL_RING_BUCKETS;
/// <summary>
/// The default hash function for directory ring boundaries, matching the <see cref="LocalGrainDirectory"/> partitioning scheme.
/// </summary>
internal static readonly Func<SiloAddress, int, uint[]> DefaultGetRingBoundaries = static (silo, count) =>
{
if (count == 1)
{
return [unchecked((uint)silo.GetConsistentHashCode())];
}

return silo.GetUniformHashCodes(count);
};

private readonly ImmutableArray<(uint Start, int MemberIndex, int PartitionIndex)> _ringBoundaries;
private readonly RingRangeCollection[] _rangesByMember;
private readonly ImmutableArray<ImmutableArray<IGrainDirectoryPartition>> _partitionsByMember;
private readonly ImmutableArray<ImmutableArray<RingRange>> _rangesByMemberPartition;

public DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory)
: this(snapshot, grainFactory, PartitionsPerSilo, static (silo, count) => silo.GetUniformHashCodes(count))
{
}

internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, Func<SiloAddress, int, uint[]> getRingBoundaries)
: this(snapshot, grainFactory, PartitionsPerSilo, getRingBoundaries)
{
}

internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, int partitionCount, Func<SiloAddress, int, uint[]> getRingBoundaries)
{
ArgumentOutOfRangeException.ThrowIfLessThan(partitionCount, 1);
Expand Down Expand Up @@ -134,7 +135,7 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern
}

public static DirectoryMembershipSnapshot Default { get; } = new DirectoryMembershipSnapshot(
new ClusterMembershipSnapshot(ImmutableDictionary<SiloAddress, ClusterMember>.Empty, MembershipVersion.MinValue), null!);
new ClusterMembershipSnapshot(ImmutableDictionary<SiloAddress, ClusterMember>.Empty, MembershipVersion.MinValue), null!, partitionCount: 1, DefaultGetRingBoundaries);

public MembershipVersion Version => ClusterMembershipSnapshot.Version;

Expand All @@ -145,7 +146,10 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern
public RingRange GetRange(SiloAddress address, int partitionIndex)
{
ArgumentOutOfRangeException.ThrowIfLessThan(partitionIndex, 0);
ArgumentOutOfRangeException.ThrowIfGreaterThan(partitionIndex, PartitionCount - 1);
if (partitionIndex >= PartitionCount)
{
return RingRange.Empty;
}

var memberIndex = TryGetMemberIndex(address);
if (memberIndex < 0)
Expand Down
85 changes: 54 additions & 31 deletions src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,41 +91,52 @@ public DistributedGrainDirectory(
_serviceProvider = serviceProvider;
_membershipService = membershipService;
_logger = logger;
var partitions = ImmutableArray.CreateBuilder<GrainDirectoryPartition>(DirectoryMembershipSnapshot.PartitionsPerSilo);
for (var i = 0; i < DirectoryMembershipSnapshot.PartitionsPerSilo; i++)
var partitionsPerSilo = membershipService.PartitionsPerSilo;
var partitions = ImmutableArray.CreateBuilder<GrainDirectoryPartition>(partitionsPerSilo);
for (var i = 0; i < partitionsPerSilo; i++)
{
partitions.Add(new GrainDirectoryPartition(i, this, grainFactory, shared));
}

_partitions = partitions.ToImmutable();
shared.ActivationDirectory.RecordNewTarget(this);

// Register IRemoteGrainDirectory system targets so that silos running LocalGrainDirectory
// can forward directory requests to this silo during a rolling upgrade.
DistributedRemoteGrainDirectory.Create(this, membershipService, shared);
}

public async Task<GrainAddress?> Lookup(GrainId grainId) => await InvokeAsync(
grainId,
static (partition, version, grainId, cancellationToken) => partition.LookupAsync(version, grainId),
grainId,
CancellationToken.None);
public async Task<GrainAddress?> Lookup(GrainId grainId) => await LookupAsync(grainId, CancellationToken.None);

public async Task<GrainAddress?> Register(GrainAddress address) => await InvokeAsync(
address.GrainId,
static (partition, version, address, cancellationToken) => partition.RegisterAsync(version, address, null),
address,
CancellationToken.None);
public async Task<GrainAddress?> Register(GrainAddress address) => await RegisterAsync(address, null, CancellationToken.None);

public async Task Unregister(GrainAddress address) => await InvokeAsync(
address.GrainId,
static (partition, version, address, cancellationToken) => partition.DeregisterAsync(version, address),
address,
CancellationToken.None);

public async Task<GrainAddress?> Register(GrainAddress address, GrainAddress? previousAddress) => await InvokeAsync(
public async Task<GrainAddress?> Register(GrainAddress address, GrainAddress? previousAddress) => await RegisterAsync(address, previousAddress, CancellationToken.None);

public Task UnregisterSilos(List<SiloAddress> siloAddresses) => Task.CompletedTask;

internal Task<GrainAddress?> LookupAsync(GrainId grainId, CancellationToken cancellationToken) => InvokeAsync(
grainId,
static (partition, version, grainId, cancellationToken) => partition.LookupAsync(version, grainId),
grainId,
cancellationToken);

internal async Task<GrainAddress?> RegisterAsync(GrainAddress address, GrainAddress? previousAddress, CancellationToken cancellationToken) => await InvokeAsync(
address.GrainId,
static (partition, version, state, cancellationToken) => partition.RegisterAsync(version, state.Address, state.PreviousAddress),
(Address: address, PreviousAddress: previousAddress),
CancellationToken.None);
cancellationToken);

public Task UnregisterSilos(List<SiloAddress> siloAddresses) => Task.CompletedTask;
internal Task UnregisterAsync(GrainAddress address, CancellationToken cancellationToken) => InvokeAsync(
address.GrainId,
static (partition, version, address, cancellationToken) => partition.DeregisterAsync(version, address),
address,
cancellationToken);

private async Task<TResult> InvokeAsync<TState, TResult>(
GrainId grainId,
Expand Down Expand Up @@ -261,24 +272,24 @@ public ValueTask<Immutable<List<GrainAddress>>> GetRegisteredActivations(Members
}

return new(result.AsImmutable());
}

static IGrainDirectory? GetGrainDirectory(IGrainContext grainContext, GrainDirectoryResolver grainDirectoryResolver)
internal static IGrainDirectory? GetGrainDirectory(IGrainContext grainContext, GrainDirectoryResolver grainDirectoryResolver)
{
if (grainContext is ActivationData activationData)
{
return activationData.Shared.GrainDirectory;
}
else if (grainContext is SystemTarget)
{
if (grainContext is ActivationData activationData)
{
return activationData.Shared.GrainDirectory;
}
else if (grainContext is SystemTarget systemTarget)
{
return null;
}
else if (grainContext.GetComponent<PlacementStrategy>() is { IsUsingGrainDirectory: true })
{
return grainDirectoryResolver.Resolve(grainContext.GrainId.Type);
}

return null;
}
else if (grainContext.GetComponent<PlacementStrategy>() is { IsUsingGrainDirectory: true })
{
return grainDirectoryResolver.Resolve(grainContext.GrainId.Type);
}

return null;
}

internal ValueTask<DirectoryMembershipSnapshot> RefreshViewAsync(MembershipVersion version, CancellationToken cancellationToken) => _membershipService.RefreshViewAsync(version, cancellationToken);
Expand Down Expand Up @@ -344,7 +355,7 @@ private async Task ProcessMembershipUpdates()
{
foreach (var partition in _partitions)
{
tasks.Add(partition.OnSiloRemovedFromClusterAsync(change));
tasks.Add(ObserveMembershipUpdateTask(partition.OnSiloRemovedFromClusterAsync(change)));
}
}
}
Expand All @@ -354,7 +365,7 @@ private async Task ProcessMembershipUpdates()

foreach (var partition in _partitions)
{
tasks.Add(partition.ProcessMembershipUpdateAsync(current));
tasks.Add(ObserveMembershipUpdateTask(partition.ProcessMembershipUpdateAsync(current)));
}

var deltaSize = currentRanges.SizePercent - previousRanges.SizePercent;
Expand All @@ -379,6 +390,18 @@ private async Task ProcessMembershipUpdates()
await Task.WhenAll(tasks).SuppressThrowing();
}

private async Task ObserveMembershipUpdateTask(Task task)
{
try
{
await task;
}
catch (Exception exception) when (!_stoppedCts.IsCancellationRequested)
{
LogErrorProcessingMembershipUpdates(exception);
}
}

SiloAddress? ITestHooks.GetPrimaryForGrain(GrainId grainId)
{
_membershipService.CurrentView.TryGetOwner(grainId, out var owner, out _);
Expand Down
Loading
Loading