Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using CoreTests.Runtime;
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.ErrorHandling;
using Wolverine.Runtime;
using Wolverine.Transports;
using Xunit;

namespace CoreTests.ErrorHandling;

public class PauseListenerContinuationTests
{
[Fact]
public async Task execute_with_null_envelope_does_not_throw()
{
var continuation = new PauseListenerContinuation(TimeSpan.FromSeconds(10));
var lifecycle = Substitute.For<IEnvelopeLifecycle>();
lifecycle.Envelope.Returns((Envelope?)null);

var runtime = new MockWolverineRuntime();

await continuation.ExecuteAsync(lifecycle, runtime, DateTimeOffset.UtcNow, null);
}

[Fact]
public async Task execute_with_missing_listener_uses_destination_lookup_without_throwing()
{
var continuation = new PauseListenerContinuation(TimeSpan.FromSeconds(10));
var lifecycle = Substitute.For<IEnvelopeLifecycle>();
var envelope = ObjectMother.Envelope();
var destination = new Uri("rabbitmq://queue/paused");
envelope.Destination = destination;
envelope.Listener = null;
lifecycle.Envelope.Returns(envelope);

var runtime = new MockWolverineRuntime();
runtime.Endpoints.FindListeningAgent(destination).Returns((IListenerCircuit?)null);

await continuation.ExecuteAsync(lifecycle, runtime, DateTimeOffset.UtcNow, null);

runtime.Endpoints.Received(1).FindListeningAgent(destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine;
using Wolverine.Postgresql;
using Wolverine.RateLimiting;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.RabbitMQ.Tests;

public class rate_limiting_end_to_end
{
private readonly ITestOutputHelper _output;

public rate_limiting_end_to_end(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task rate_limited_messages_are_delayed_over_rabbitmq()
{
Expand Down Expand Up @@ -77,6 +86,91 @@ public async Task rate_limited_messages_are_delayed_over_rabbitmq()
}
}

[Fact]
public async Task rate_limited_messages_do_not_throw_when_rescheduled()
{
var queueName = $"rate-limit-pause-{Guid.NewGuid():N}";
var schemaName = $"rate_limit_pause_{Guid.NewGuid():N}";
var window = 5.Seconds();
var exceptions = new List<Exception>();
var logs = new List<string>();

IHost? publisher = null;
IHost? receiver = null;

try
{
receiver = await Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddProvider(new ListLoggerProvider(logs, exceptions, _output));
logging.SetMinimumLevel(LogLevel.Debug);
})
.UseWolverine(opts =>
{
opts.ApplicationAssembly = typeof(rate_limiting_end_to_end).Assembly;

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schemaName);
opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup();
opts.ListenToRabbitQueue(queueName).UseDurableInbox();

opts.Policies.ForMessagesOfType<RateLimitedPauseMessage>()
.RateLimit("pause-test", new RateLimit(1, window));

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

publisher = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision();
opts.PublishAllMessages().ToRabbitQueue(queueName);
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

await publisher.ResetResourceState();
await receiver.ResetResourceState();
await Task.Delay(500.Milliseconds());

var bus = publisher.MessageBus();
for (var i = 0; i < 10; i++)
{
await bus.PublishAsync(new RateLimitedPauseMessage());
}

// Wait long enough for rescheduling to occur
await Task.Delay(8.Seconds());

// The critical assertion: no NullReferenceException during pause/resume
exceptions.Any(ContainsNullRef).ShouldBeFalse(
"Expected no NullReferenceException during rate-limited pause/resume cycle");
}
finally
{
if (receiver != null)
{
await safeStopAsync(receiver);
}

if (publisher != null)
{
await safeStopAsync(publisher);
}
}
}

private static bool ContainsNullRef(Exception ex)
{
var current = ex;
while (current != null)
{
if (current is NullReferenceException) return true;
current = current.InnerException;
}
return false;
}

private static async Task alignToWindowStart(TimeSpan window)
{
var windowTicks = window.Ticks;
Expand Down Expand Up @@ -116,6 +210,7 @@ private static async Task safeStopAsync(IHost host)
}

public record RateLimitedMessage;
public record RateLimitedPauseMessage;

public class RateLimitTracker
{
Expand Down Expand Up @@ -150,3 +245,66 @@ public static void Handle(RateLimitedMessage message, RateLimitTracker tracker)
tracker.RecordHandled();
}
}

public static class RateLimitedPauseMessageHandler
{
public static void Handle(RateLimitedPauseMessage message)
{
}
}

internal sealed class ListLoggerProvider : ILoggerProvider
{
private readonly List<string> _logs;
private readonly List<Exception> _exceptions;
private readonly ITestOutputHelper _output;

public ListLoggerProvider(List<string> logs, List<Exception> exceptions, ITestOutputHelper output)
{
_logs = logs;
_exceptions = exceptions;
_output = output;
}

public ILogger CreateLogger(string categoryName) => new ListLogger(categoryName, _logs, _exceptions, _output);
public void Dispose() { }
}

internal sealed class ListLogger : ILogger
{
private readonly string _categoryName;
private readonly List<string> _logs;
private readonly List<Exception> _exceptions;
private readonly ITestOutputHelper _output;

public ListLogger(string categoryName, List<string> logs, List<Exception> exceptions, ITestOutputHelper output)
{
_categoryName = categoryName;
_logs = logs;
_exceptions = exceptions;
_output = output;
}

public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None;
public IDisposable BeginScope<TState>(TState state) where TState : notnull => NullScope.Instance;

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception,
Func<TState, Exception?, string> formatter)
{
var message = formatter(state, exception);
var line = $"{_categoryName}/{logLevel}: {message}";
_logs.Add(line);
try { _output.WriteLine(line); } catch { /* disposed output */ }
if (exception != null)
{
_exceptions.Add(exception);
try { _output.WriteLine(exception.ToString()); } catch { }
}
}

private sealed class NullScope : IDisposable
{
public static readonly NullScope Instance = new();
public void Dispose() { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,22 @@ public RabbitMqListener(IWolverineRuntime runtime,

public async ValueTask StopAsync()
{
if (_consumer == null)
var consumer = _consumer;
if (consumer == null)
{
return;
}

foreach (var consumerTag in _consumer.ConsumerTags) await Channel!.BasicCancelAsync(consumerTag, true, default);
var channel = Channel;
if (channel != null)
{
foreach (var consumerTag in consumer.ConsumerTags)
{
await channel.BasicCancelAsync(consumerTag, true, default);
}
}

_consumer.Dispose();
consumer.Dispose();
_consumer = null;
}

Expand Down
14 changes: 12 additions & 2 deletions src/Wolverine/ErrorHandling/PauseListenerContinuation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ public PauseListenerContinuation(TimeSpan pauseTime)
public ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now,
Activity? activity)
{
var envelope = lifecycle.Envelope;
if (envelope == null)
{
runtime.Logger.LogInformation("Unable to pause listening endpoint because no envelope is active.");
return ValueTask.CompletedTask;
}

IListenerCircuit? agent;
var destination = lifecycle.Envelope!.Destination;
var destination = envelope.Destination;
if (destination?.Scheme == "local")
{
// This will only work for durable, local queues
agent = runtime.Endpoints.AgentForLocalQueue(destination) as IListenerCircuit;
}
else
{
agent = runtime.Endpoints.FindListeningAgent(lifecycle.Envelope!.Listener!.Address);
var listenerAddress = envelope.Listener?.Address ?? destination;
agent = listenerAddress != null
? runtime.Endpoints.FindListeningAgent(listenerAddress)
: null;
}

if (agent != null)
Expand Down
31 changes: 21 additions & 10 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,32 +145,43 @@ public async ValueTask StopAndDrainAsync()
return;
}

if (Listener == null)
var listener = Listener;
var receiver = _receiver;
if (listener == null)
{
return;
}

Listener = null;
_receiver = null;

try
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);

if (Listener == null) return;

await Listener.StopAsync();
await _receiver!.DrainAsync();
await listener.StopAsync();
if (receiver != null)
{
await receiver.DrainAsync();
}

await Listener.DisposeAsync();
_receiver?.Dispose();
try
{
await listener.DisposeAsync();
}
catch (ObjectDisposedException)
{
// Listener may already be disposed during rapid pause/stop cycles.
}

receiver?.Dispose();
}
catch (Exception e)
{
_logger.LogError(e, "Unable to stop and drain the listener for {Uri}", Uri);
}

Listener = null;
_receiver = null;

Status = ListeningStatus.Stopped;
_runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, Status));

Expand Down
Loading