From 814369c230a1aa530543d27e9c328ec576bdffbc Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Wed, 10 Sep 2025 16:05:02 +0100 Subject: [PATCH] fix: quartz sample --- .../GreetingsPumper/Program.cs | 171 +++++++++--------- .../GreetingsReceiverConsole.csproj | 1 - .../GreetingsReceiverConsole/Program.cs | 115 +++++------- 3 files changed, 129 insertions(+), 158 deletions(-) diff --git a/samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs b/samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs index 59efd94684..78ec5d0e16 100644 --- a/samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs +++ b/samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using Amazon; +using Amazon.Runtime; using Amazon.Runtime.CredentialManagement; using Greetings.Ports.Commands; using Microsoft.Extensions.DependencyInjection; @@ -14,110 +15,102 @@ using Quartz; using Serilog; -namespace GreetingsPumper; -static class Program -{ - private static async Task Main(string[] args) - { - Log.Logger = new LoggerConfiguration() - .MinimumLevel.Information() - .Enrich.FromLogContext() - .WriteTo.Console() - .CreateLogger(); +Log.Logger = new LoggerConfiguration() + .MinimumLevel.Information() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); - var host = new HostBuilder() - .ConfigureServices((hostContext, services) => +var host = new HostBuilder() + .ConfigureServices((hostContext, services) => + { + services + .AddSingleton() + .AddQuartz(opt => { - services - .AddSingleton() - .AddQuartz(opt => - { - opt.SchedulerId = "QuartzBrighter"; - opt.SchedulerName = "QuartzBrighter"; - opt.UseSimpleTypeLoader(); - opt.UseInMemoryStore(); - }) - .AddQuartzHostedService(opt => - { - opt.WaitForJobsToComplete = true; - }); + opt.SchedulerId = "QuartzBrighter"; + opt.SchedulerName = "QuartzBrighter"; + opt.UseSimpleTypeLoader(); + opt.UseInMemoryStore(); + }) + .AddQuartzHostedService(opt => + { + opt.WaitForJobsToComplete = true; + }); - if (new CredentialProfileStoreChain().TryGetAWSCredentials("default", out var credentials)) + var awsConnection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"), RegionEndpoint.USEast1, + cfg => + { + var serviceURL = "http://localhost:4566/"; + if (!string.IsNullOrWhiteSpace(serviceURL)) { - var awsConnection = new AWSMessagingGatewayConnection(credentials, RegionEndpoint.USEast1, - cfg => - { - var serviceURL = - "http://localhost:4566/"; // Environment.GetEnvironmentVariable("LOCALSTACK_SERVICE_URL"); - if (!string.IsNullOrWhiteSpace(serviceURL)) - { - cfg.ServiceURL = serviceURL; - } - }); - - var producerRegistry = new SnsProducerRegistryFactory( - awsConnection, - [ - new SnsPublication - { - Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), - RequestType = typeof(GreetingEvent) - }, - new SnsPublication - { - Topic = - new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)), - TopicAttributes = new SnsAttributes { Type = SqsType.Fifo } - } - ] - ).Create(); + cfg.ServiceURL = serviceURL; + } + }); - services.AddBrighter() - .AddProducers((configure) => - { - configure.ProducerRegistry = producerRegistry; - }) - .UseScheduler(provider => - { - var factory = provider.GetRequiredService(); - return new QuartzSchedulerFactory( - factory.GetScheduler().GetAwaiter().GetResult()); - }) - .AutoFromAssemblies([typeof(GreetingEvent).Assembly]); + var producerRegistry = new SnsProducerRegistryFactory( + awsConnection, + [ + new SnsPublication + { + Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), + RequestType = typeof(GreetingEvent) + }, + new SnsPublication + { + Topic = + new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)), + TopicAttributes = new SnsAttributes { Type = SqsType.Fifo } } + ] + ).Create(); + + services.AddBrighter() + .AddProducers((configure) => + { + configure.ProducerRegistry = producerRegistry; + }) + .UseScheduler(provider => + { + var factory = provider.GetRequiredService(); + return new QuartzSchedulerFactory( + factory.GetScheduler().GetAwaiter().GetResult()); + }) + .AutoFromAssemblies([typeof(GreetingEvent).Assembly]); - services.AddHostedService(); - } - ) - .UseConsoleLifetime() - .UseSerilog() - .Build(); + services.AddHostedService(); + } + ) + .UseConsoleLifetime() + .UseSerilog() + .Build(); - await host.RunAsync(); - } - internal sealed class RunCommandProcessor(IAmACommandProcessor commandProcessor, ILogger logger) +Console.CancelKeyPress += (_, _) => host.StopAsync().Wait(); + +await host.RunAsync(); + +internal sealed class RunCommandProcessor(IAmACommandProcessor commandProcessor, ILogger logger) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + long loop = 0; + while (!stoppingToken.IsCancellationRequested) { - long loop = 0; - while (!stoppingToken.IsCancellationRequested) - { - loop++; - - logger.LogInformation("Scheduling message #{Loop}", loop); - commandProcessor.Post(TimeSpan.FromMinutes(1), new GreetingEvent($"Scheduler message Ian #{loop}")); - - if (loop % 100 != 0) - { - continue; - } + loop++; - logger.LogInformation("Pausing for breath..."); - await Task.Delay(4000, stoppingToken); + logger.LogInformation("Scheduling message #{Loop}", loop); + commandProcessor.Post(TimeSpan.FromSeconds(10), new GreetingEvent($"Scheduler message Ian #{loop}")); + + if (loop % 100 != 0) + { + continue; } + + logger.LogInformation("Pausing for breath..."); + await Task.Delay(4000, stoppingToken); } } } diff --git a/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj b/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj index 2b9beb01ce..e1ec623b34 100644 --- a/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj +++ b/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj @@ -9,7 +9,6 @@ - diff --git a/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/Program.cs b/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/Program.cs index 25954ee783..45dd3c47a7 100644 --- a/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/Program.cs +++ b/samples/Scheduler/QuartzTaskQueue/GreetingsReceiverConsole/Program.cs @@ -26,6 +26,7 @@ THE SOFTWARE. */ using System; using System.Threading.Tasks; using Amazon; +using Amazon.Runtime; using Amazon.Runtime.CredentialManagement; using Greetings.Ports.Commands; using Microsoft.Extensions.DependencyInjection; @@ -36,79 +37,57 @@ THE SOFTWARE. */ using Paramore.Brighter.ServiceActivator.Extensions.Hosting; using Serilog; -namespace GreetingsReceiverConsole; +Log.Logger = new LoggerConfiguration() + .MinimumLevel.Information() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); -public class Program -{ - public static async Task Main(string[] args) +var host = new HostBuilder() + .ConfigureServices((_, services) => { - Log.Logger = new LoggerConfiguration() - .MinimumLevel.Information() - .Enrich.FromLogContext() - .WriteTo.Console() - .CreateLogger(); + var subscriptions = new Subscription[] + { + new SqsSubscription( + subscriptionName: new SubscriptionName("paramore.example.greeting"), + channelName: new ChannelName(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), + routingKey: new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), + channelType: ChannelType.PubSub, + bufferSize: 10, + timeOut: TimeSpan.FromMilliseconds(20), + messagePumpType: MessagePumpType.Reactor, + queueAttributes: new SqsAttributes(lockTimeout: TimeSpan.FromSeconds(30))), + new SqsSubscription( + subscriptionName: new SubscriptionName("paramore.example.farewell"), + channelName: new ChannelName(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)), + routingKey: new RoutingKey(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)), + channelType: ChannelType.PubSub, + bufferSize: 10, + timeOut: TimeSpan.FromMilliseconds(20), + messagePumpType: MessagePumpType.Reactor, + topicAttributes: new SnsAttributes(type: SqsType.Fifo), + queueAttributes: new SqsAttributes(lockTimeout: TimeSpan.FromSeconds(30), type: SqsType.Fifo)) + }; - var host = new HostBuilder() - .ConfigureServices((_, services) => + //create the gateway + var serviceURL = "http://localhost:4566/"; + var region = RegionEndpoint.USEast1; + var awsConnection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"), region, + cfg => { cfg.ServiceURL = serviceURL; }); + services.AddConsumers(options => { - var subscriptions = new Subscription[] - { - new SqsSubscription( - subscriptionName: new SubscriptionName("paramore.example.greeting"), - channelName: new ChannelName(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), - routingKey:new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), - channelType: ChannelType.PubSub, - bufferSize: 10, - timeOut: TimeSpan.FromMilliseconds(20), - messagePumpType: MessagePumpType.Reactor, - queueAttributes: new SqsAttributes( - lockTimeout: TimeSpan.FromSeconds(30) - )), - new SqsSubscription( - subscriptionName:new SubscriptionName("paramore.example.farewell"), - channelName: new ChannelName(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)), - routingKey: new RoutingKey(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)), - channelType: ChannelType.PubSub, - bufferSize: 10, - timeOut: TimeSpan.FromMilliseconds(20), - messagePumpType: MessagePumpType.Reactor, - queueAttributes: new SqsAttributes( - lockTimeout: TimeSpan.FromSeconds(30), - type: SqsType.Fifo - )) - }; - - //create the gateway - if (new CredentialProfileStoreChain().TryGetAWSCredentials("default", out var credentials)) - { - var serviceURL = "http://localhost:4566/"; // Environment.GetEnvironmentVariable("LOCALSTACK_SERVICE_URL"); - var region = string.IsNullOrWhiteSpace(serviceURL) - ? RegionEndpoint.EUWest1 - : RegionEndpoint.USEast1; - var awsConnection = new AWSMessagingGatewayConnection(credentials, region, - cfg => - { - if (!string.IsNullOrWhiteSpace(serviceURL)) - { - cfg.ServiceURL = serviceURL; - } - }); + options.Subscriptions = subscriptions; + options.DefaultChannelFactory = new ChannelFactory(awsConnection); + }) + .AutoFromAssemblies(); - services.AddConsumers(options => - { - options.Subscriptions = subscriptions; - options.DefaultChannelFactory = new ChannelFactory(awsConnection); - }) - .AutoFromAssemblies(); - } + services.AddHostedService(); + }) + .UseConsoleLifetime() + .UseSerilog() + .Build(); - services.AddHostedService(); - }) - .UseConsoleLifetime() - .UseSerilog() - .Build(); +Console.CancelKeyPress += (_, _) => host.StopAsync().Wait(); - await host.RunAsync(); - } -} +await host.RunAsync();