Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions src/Orleans.Runtime/GrainDirectory/DirectoryMembershipSnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Microsoft.CodeAnalysis;
using Orleans.Configuration;
using Orleans.Runtime.Utilities;
Expand All @@ -21,12 +20,21 @@ internal sealed class DirectoryMembershipSnapshot
private readonly ImmutableArray<ImmutableArray<IGrainDirectoryPartition>> _partitionsByMember;
private readonly ImmutableArray<ImmutableArray<RingRange>> _rangesByMemberPartition;

public DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory) : this(snapshot, grainFactory, static (silo, count) => silo.GetUniformHashCodes(count))
public DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory)
: this(snapshot, grainFactory, PartitionsPerSilo, static (silo, count) => silo.GetUniformHashCodes(count))
{
}

/// <summary>
/// Creates a snapshot using <c>PartitionsPerSilo</c> as the partition count and a caller-supplied
/// ring boundary function, by delegating to the overload which accepts an explicit partition count.
/// </summary>
/// <param name="snapshot">The cluster membership snapshot; forwarded unchanged.</param>
/// <param name="grainFactory">The grain factory; forwarded unchanged.</param>
/// <param name="getRingBoundaries">
/// Function which the target overload invokes with a silo address and the partition count to obtain
/// that silo's ring boundary hash codes.
/// </param>
internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, Func<SiloAddress, int, uint[]> getRingBoundaries)
: this(snapshot, grainFactory, PartitionsPerSilo, getRingBoundaries)
{
}

internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IInternalGrainFactory grainFactory, int partitionCount, Func<SiloAddress, int, uint[]> getRingBoundaries)
{
ArgumentOutOfRangeException.ThrowIfLessThan(partitionCount, 1);
PartitionCount = partitionCount;

var sortedActiveMembers = ImmutableArray.CreateBuilder<SiloAddress>(snapshot.Members.Count(static m => m.Value.Status == SiloStatus.Active));
foreach (var member in snapshot.Members)
{
Expand All @@ -38,16 +46,15 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern
}

sortedActiveMembers.Sort(static (left, right) => left.CompareTo(right));
var hashIndexPairs = ImmutableArray.CreateBuilder<(uint Hash, int MemberIndex, int PartitionIndex)>(PartitionsPerSilo * sortedActiveMembers.Count);
var hashIndexPairs = ImmutableArray.CreateBuilder<(uint Hash, int MemberIndex, int PartitionIndex)>(partitionCount * sortedActiveMembers.Count);
var memberPartitions = ImmutableArray.CreateBuilder<ImmutableArray<IGrainDirectoryPartition>>();
for (var memberIndex = 0; memberIndex < sortedActiveMembers.Count; memberIndex++)
{
var activeMember = sortedActiveMembers[memberIndex];
var hashCodes = getRingBoundaries(activeMember, PartitionsPerSilo).ToList();
hashCodes.Sort();
Debug.Assert(hashCodes.Count == PartitionsPerSilo);
var partitionReferences = ImmutableArray.CreateBuilder<IGrainDirectoryPartition>(PartitionsPerSilo);
for (var partitionIndex = 0; partitionIndex < hashCodes.Count; partitionIndex++)
var hashCodes = getRingBoundaries(activeMember, partitionCount);
Debug.Assert(hashCodes.Length == partitionCount);
var partitionReferences = ImmutableArray.CreateBuilder<IGrainDirectoryPartition>(partitionCount);
for (var partitionIndex = 0; partitionIndex < hashCodes.Length; partitionIndex++)
{
var hashCode = hashCodes[partitionIndex];
hashIndexPairs.Add((hashCode, memberIndex, partitionIndex));
Expand Down Expand Up @@ -76,32 +83,7 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern
return left.MemberIndex.CompareTo(right.MemberIndex);
});

Dictionary<int, ImmutableArray<RingRange>.Builder> rangesByMemberPartitionBuilders = [];
for (var i = 0; i < hashIndexPairs.Count; i++)
{
var (_, memberIndex, _) = hashIndexPairs[i];
ref var builder = ref CollectionsMarshal.GetValueRefOrAddDefault(rangesByMemberPartitionBuilders, memberIndex, out _);
builder ??= ImmutableArray.CreateBuilder<RingRange>(PartitionsPerSilo);
var (entryStart, _, _) = hashIndexPairs[i];
var (nextStart, _, _) = hashIndexPairs[(i + 1) % hashIndexPairs.Count];
var range = (entryStart == nextStart) switch
{
true when hashIndexPairs.Count == 1 => RingRange.Full,
true => RingRange.Empty,
_ => RingRange.Create(entryStart, nextStart)
};
builder.Add(range);
}

var rangesByMemberPartition = ImmutableArray.CreateBuilder<ImmutableArray<RingRange>>(sortedActiveMembers.Count);
for (var i = 0; i < sortedActiveMembers.Count; i++)
{
rangesByMemberPartition.Add(rangesByMemberPartitionBuilders[i].ToImmutable());
}

_rangesByMemberPartition = rangesByMemberPartition.ToImmutable();

// Remove empty ranges.
// Remove empty ranges caused by hash collisions.
if (hashIndexPairs.Count > 1)
{
for (var i = 1; i < hashIndexPairs.Count;)
Expand All @@ -118,9 +100,35 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern
}

_ringBoundaries = hashIndexPairs.ToImmutable();

Members = sortedActiveMembers.ToImmutable();

var rangesByMemberPartitionBuilders = new RingRange[sortedActiveMembers.Count][];
for (var i = 0; i < sortedActiveMembers.Count; i++)
{
rangesByMemberPartitionBuilders[i] = new RingRange[partitionCount];
}

var rangesByMemberPartition = ImmutableArray.CreateBuilder<ImmutableArray<RingRange>>(sortedActiveMembers.Count);
for (var i = 0; i < hashIndexPairs.Count; i++)
{
var (entryStart, memberIndex, partitionIndex) = hashIndexPairs[i];
var (nextStart, _, _) = hashIndexPairs[(i + 1) % hashIndexPairs.Count];
var range = (entryStart == nextStart) switch
{
true when hashIndexPairs.Count == 1 => RingRange.Full,
true => RingRange.Empty,
_ => RingRange.Create(entryStart, nextStart)
};

rangesByMemberPartitionBuilders[memberIndex][partitionIndex] = range;
}

for (var i = 0; i < sortedActiveMembers.Count; i++)
{
rangesByMemberPartition.Add(ImmutableArray.CreateRange(rangesByMemberPartitionBuilders[i]));
}

_rangesByMemberPartition = rangesByMemberPartition.ToImmutable();
_rangesByMember = new RingRangeCollection[Members.Length];
ClusterMembershipSnapshot = snapshot;
}
Expand All @@ -130,12 +138,14 @@ internal DirectoryMembershipSnapshot(ClusterMembershipSnapshot snapshot, IIntern

public MembershipVersion Version => ClusterMembershipSnapshot.Version;

internal int PartitionCount { get; }

public ImmutableArray<SiloAddress> Members { get; }

public RingRange GetRange(SiloAddress address, int partitionIndex)
{
ArgumentOutOfRangeException.ThrowIfLessThan(partitionIndex, 0);
ArgumentOutOfRangeException.ThrowIfGreaterThan(partitionIndex, PartitionsPerSilo - 1);
ArgumentOutOfRangeException.ThrowIfGreaterThan(partitionIndex, PartitionCount - 1);

var memberIndex = TryGetMemberIndex(address);
if (memberIndex < 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,24 @@ async ValueTask<DirectoryResult<GrainAddress>> IGrainDirectoryPartition.Register
ArgumentNullException.ThrowIfNull(address);
LogRegisterAsync(version, address, currentRegistration);

// Ensure that the current membership version is new enough.
await WaitForRange(address.GrainId, version);
if (!IsOwner(CurrentView, address.GrainId))
var currentView = await WaitForOwnershipViewAsync(address.GrainId, version);
if (!IsOwner(currentView, address.GrainId))
{
return DirectoryResult.RefreshRequired<GrainAddress>(CurrentView.Version);
return DirectoryResult.RefreshRequired<GrainAddress>(currentView.Version);
}

DebugAssertOwnership(address.GrainId);
return DirectoryResult.FromResult(RegisterCore(address, currentRegistration), version);
DebugAssertOwnership(currentView, address.GrainId);
return DirectoryResult.FromResult(RegisterCore(address, currentRegistration, currentView.Version), version);
}

async ValueTask<DirectoryResult<GrainAddress?>> IGrainDirectoryPartition.LookupAsync(MembershipVersion version, GrainId grainId)
{
LogLookupAsync(version, grainId);

// Ensure we can serve the request.
await WaitForRange(grainId, version);
if (!IsOwner(CurrentView, grainId))
var currentView = await WaitForOwnershipViewAsync(grainId, version);
if (!IsOwner(currentView, grainId))
{
return DirectoryResult.RefreshRequired<GrainAddress?>(CurrentView.Version);
return DirectoryResult.RefreshRequired<GrainAddress?>(currentView.Version);
}

return DirectoryResult.FromResult(LookupCore(grainId), version);
Expand All @@ -43,13 +41,13 @@ async ValueTask<DirectoryResult<bool>> IGrainDirectoryPartition.DeregisterAsync(
ArgumentNullException.ThrowIfNull(address);
LogDeregisterAsync(version, address);

await WaitForRange(address.GrainId, version);
if (!IsOwner(CurrentView, address.GrainId))
var currentView = await WaitForOwnershipViewAsync(address.GrainId, version);
if (!IsOwner(currentView, address.GrainId))
{
return DirectoryResult.RefreshRequired<bool>(CurrentView.Version);
return DirectoryResult.RefreshRequired<bool>(currentView.Version);
}

DebugAssertOwnership(address.GrainId);
DebugAssertOwnership(currentView, address.GrainId);
return DirectoryResult.FromResult(DeregisterCore(address), version);
}

Expand All @@ -73,21 +71,37 @@ private bool DeregisterCore(GrainAddress address)
return null;
}

private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existingAddress)
/// <summary>
/// Waits until the range containing <paramref name="grainId"/> is available, then returns the
/// directory view which was current both before and after the wait.
/// </summary>
/// <param name="grainId">The grain whose range is waited on.</param>
/// <param name="version">The membership version supplied by the caller.</param>
/// <returns>The view captured before the wait, once it is confirmed to still be <c>CurrentView</c>.</returns>
private async ValueTask<DirectoryMembershipSnapshot> WaitForOwnershipViewAsync(GrainId grainId, MembershipVersion version)
{
    DirectoryMembershipSnapshot observedView;
    do
    {
        // A request carrying a stale membership version must still wait out any in-flight ownership
        // transition in the current view, so wait on whichever version is newer: the caller's or ours.
        observedView = CurrentView;
        var versionToAwait = version;
        if (observedView.Version > version)
        {
            versionToAwait = observedView.Version;
        }

        await WaitForRange(grainId, versionToAwait);

        // If the view was replaced while waiting, the captured view is out of date: go around again.
    }
    while (!ReferenceEquals(observedView, CurrentView));

    return observedView;
}

private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existingAddress, MembershipVersion currentVersion)
{
ref var existing = ref CollectionsMarshal.GetValueRefOrAddDefault(_directory, newAddress.GrainId, out _);

if (existing is null || existing.Matches(existingAddress) || IsSiloDead(existing))
{
if (newAddress.MembershipVersion != CurrentView.Version)
if (newAddress.MembershipVersion != currentVersion)
{
// Set the membership version to match the view number in which it was registered.
newAddress = new()
{
GrainId = newAddress.GrainId,
SiloAddress = newAddress.SiloAddress,
ActivationId = newAddress.ActivationId,
MembershipVersion = CurrentView.Version
MembershipVersion = currentVersion
};
}

Expand Down
77 changes: 49 additions & 28 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private void ProcessMembershipUpdate(DirectoryMembershipSnapshot current)
Debug.Assert(!addedRange.Intersects(removedRange));
Debug.Assert(addedRange.IsEmpty || addedRange.Intersects(_currentRange));
Debug.Assert(!addedRange.Intersects(previousRange));
Debug.Assert(previousRange.IsEmpty || _currentRange.IsEmpty || previousRange.Start == _currentRange.Start);
Debug.Assert(previousRange.IsEmpty || previousRange.IsFull || _currentRange.IsEmpty || _currentRange.IsFull || previousRange.Start == _currentRange.Start);
#endif

if (!removedRange.IsEmpty)
Expand Down Expand Up @@ -440,7 +440,7 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc
var previousOwnerRange = previousOwnerRanges[partitionIndex];
if (previousOwnerRange.Intersects(addedRange))
{
tasks.Add(TransferSnapshotAsync(current, addedRange, previousOwner, partitionIndex, previous.Version));
tasks.Add(TransferSnapshotAsync(current, previousOwnerRange, addedRange, previousOwner, partitionIndex, previous.Version));
}
}
}
Expand Down Expand Up @@ -501,22 +501,52 @@ private void UnlockRange(RingRange range, MembershipVersion version, TaskComplet
}
}

private async Task<bool> TransferSnapshotAsync(DirectoryMembershipSnapshot current, RingRange addedRange, SiloAddress previousOwner, int partitionIndex, MembershipVersion previousVersion)
private async Task<bool> TransferSnapshotAsync(
DirectoryMembershipSnapshot current,
RingRange previousOwnerRange,
RingRange addedRange,
SiloAddress previousOwner,
int partitionIndex,
MembershipVersion previousVersion)
{
var currentTransferRange = addedRange;
try
{
var stopwatch = ValueStopwatch.StartNew();
LogTraceRequestingEntries(_logger, addedRange, previousOwner, previousVersion);

var partition = GetPartitionReference(previousOwner, partitionIndex);
var waitedForRange = false;
foreach (var transferRange in GetSnapshotTransferRanges(previousOwnerRange, addedRange))
{
currentTransferRange = transferRange;
LogTraceRequestingEntries(_logger, transferRange, previousOwner, previousVersion);

// Alternatively, the previous owner could push the snapshot. The pull-based approach is used here because it is simpler.
var snapshot = await partition.GetSnapshotAsync(current.Version, previousVersion, addedRange).AsTask().WaitAsync(ShutdownToken);
// Alternatively, the previous owner could push the snapshot. The pull-based approach is used here because it is simpler.
var snapshot = await partition.GetSnapshotAsync(current.Version, previousVersion, transferRange).AsTask().WaitAsync(ShutdownToken);

if (snapshot is null)
{
LogWarningExpectedValidSnapshot(_logger, previousOwner, addedRange);
return false;
if (snapshot is null)
{
LogWarningExpectedValidSnapshot(_logger, previousOwner, transferRange);
return false;
}

if (!waitedForRange)
{
// Wait for previous versions to be unlocked before proceeding.
await WaitForRange(addedRange, previousVersion);
waitedForRange = true;
}

// Incorporate the values into the grain directory.
foreach (var entry in snapshot.GrainAddresses)
{
DebugAssertOwnership(current, entry.GrainId);

LogTraceReceivedEntry(_logger, entry, previousOwner, previousVersion);
_directory[entry.GrainId] = entry;
}

LogDebugTransferredEntries(_logger, snapshot.GrainAddresses.Count, transferRange, previousOwner);
}

// The acknowledgement step lets the previous owner know that the snapshot has been received so that it can proceed.
Expand All @@ -526,20 +556,6 @@ private async Task<bool> TransferSnapshotAsync(DirectoryMembershipSnapshot curre
false,
nameof(IGrainDirectoryPartition.AcknowledgeSnapshotTransferAsync)).Ignore();

// Wait for previous versions to be unlocked before proceeding.
await WaitForRange(addedRange, previousVersion);

// Incorporate the values into the grain directory.
foreach (var entry in snapshot.GrainAddresses)
{
DebugAssertOwnership(current, entry.GrainId);

LogTraceReceivedEntry(_logger, entry, previousOwner, previousVersion);
_directory[entry.GrainId] = entry;
}

LogDebugTransferredEntries(_logger, snapshot.GrainAddresses.Count, addedRange, previousOwner);

DirectoryInstruments.SnapshotTransferCount.Add(1);
DirectoryInstruments.SnapshotTransferDuration.Record((long)stopwatch.Elapsed.TotalMilliseconds);

Expand All @@ -549,11 +565,11 @@ private async Task<bool> TransferSnapshotAsync(DirectoryMembershipSnapshot curre
{
if (exception is SiloUnavailableException)
{
LogWarningRemoteHostUnavailable(_logger, addedRange);
LogWarningRemoteHostUnavailable(_logger, currentTransferRange);
}
else
{
LogWarningErrorTransferringOwnership(_logger, exception, addedRange);
LogWarningErrorTransferringOwnership(_logger, exception, currentTransferRange);
}

return false;
Expand Down Expand Up @@ -640,6 +656,11 @@ async Task<List<GrainAddress>> GetRegisteredActivationsFromClusterMember(Members
}
}

/// <summary>
/// Produces the ranges to request from a previous owner during a snapshot transfer.
/// </summary>
/// <param name="previousOwnerRange">The range associated with the previous owner's partition.</param>
/// <param name="addedRange">The range being acquired.</param>
/// <returns>The result of <c>previousOwnerRange.Intersections(addedRange)</c>.</returns>
internal static IEnumerable<RingRange> GetSnapshotTransferRanges(RingRange previousOwnerRange, RingRange addedRange)
    => previousOwnerRange.Intersections(addedRange);

private async Task<T> InvokeOnClusterMember<T>(SiloAddress siloAddress, Func<Task<T>> func, T defaultValue, string operationName)
{
GrainRuntime.CheckRuntimeContext(this);
Expand Down Expand Up @@ -841,13 +862,13 @@ private sealed record class PartitionSnapshotState(

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Requesting entries for ranges '{Range}' from '{PreviousOwner}' at version '{PreviousVersion}'."
Message = "Requesting entries for range '{Range}' from '{PreviousOwner}' at version '{PreviousVersion}'."
)]
private static partial void LogTraceRequestingEntries(ILogger logger, RingRange range, SiloAddress previousOwner, MembershipVersion previousVersion);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Expected a valid snapshot from previous owner '{PreviousOwner}' for part of ranges '{Range}', but found none."
Message = "Expected a valid snapshot from previous owner '{PreviousOwner}' for range '{Range}', but found none."
)]
private static partial void LogWarningExpectedValidSnapshot(ILogger logger, SiloAddress previousOwner, RingRange range);

Expand Down
Loading
Loading