Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 82 additions & 89 deletions samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.Runtime.CredentialManagement;
using Greetings.Ports.Commands;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -14,110 +15,102 @@
using Quartz;
using Serilog;

namespace GreetingsPumper;

static class Program
{
private static async Task Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();

var host = new HostBuilder()
.ConfigureServices((hostContext, services) =>
var host = new HostBuilder()
.ConfigureServices((hostContext, services) =>
{
services
.AddSingleton<QuartzBrighterJob>()
.AddQuartz(opt =>
{
services
.AddSingleton<QuartzBrighterJob>()
.AddQuartz(opt =>
{
opt.SchedulerId = "QuartzBrighter";
opt.SchedulerName = "QuartzBrighter";
opt.UseSimpleTypeLoader();
opt.UseInMemoryStore();
})
.AddQuartzHostedService(opt =>
{
opt.WaitForJobsToComplete = true;
});
opt.SchedulerId = "QuartzBrighter";
opt.SchedulerName = "QuartzBrighter";
opt.UseSimpleTypeLoader();
opt.UseInMemoryStore();
})
.AddQuartzHostedService(opt =>
{
opt.WaitForJobsToComplete = true;
});

if (new CredentialProfileStoreChain().TryGetAWSCredentials("default", out var credentials))
var awsConnection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"), RegionEndpoint.USEast1,
cfg =>
{
var serviceURL = "http://localhost:4566/";
if (!string.IsNullOrWhiteSpace(serviceURL))
{
var awsConnection = new AWSMessagingGatewayConnection(credentials, RegionEndpoint.USEast1,
cfg =>
{
var serviceURL =
"http://localhost:4566/"; // Environment.GetEnvironmentVariable("LOCALSTACK_SERVICE_URL");
if (!string.IsNullOrWhiteSpace(serviceURL))
{
cfg.ServiceURL = serviceURL;
}
});

var producerRegistry = new SnsProducerRegistryFactory(
awsConnection,
[
new SnsPublication
{
Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
RequestType = typeof(GreetingEvent)
},
new SnsPublication
{
Topic =
new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
TopicAttributes = new SnsAttributes { Type = SqsType.Fifo }
}
]
).Create();
cfg.ServiceURL = serviceURL;
}
});

services.AddBrighter()
.AddProducers((configure) =>
{
configure.ProducerRegistry = producerRegistry;
})
.UseScheduler(provider =>
{
var factory = provider.GetRequiredService<ISchedulerFactory>();
return new QuartzSchedulerFactory(
factory.GetScheduler().GetAwaiter().GetResult());
})
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
var producerRegistry = new SnsProducerRegistryFactory(
awsConnection,
[
new SnsPublication
{
Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
RequestType = typeof(GreetingEvent)
},
new SnsPublication
{
Topic =
new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
TopicAttributes = new SnsAttributes { Type = SqsType.Fifo }
}
]
).Create();

services.AddBrighter()
.AddProducers((configure) =>
{
configure.ProducerRegistry = producerRegistry;
})
.UseScheduler(provider =>
{
var factory = provider.GetRequiredService<ISchedulerFactory>();
return new QuartzSchedulerFactory(
factory.GetScheduler().GetAwaiter().GetResult());
})
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);

services.AddHostedService<RunCommandProcessor>();
}
)
.UseConsoleLifetime()
.UseSerilog()
.Build();
services.AddHostedService<RunCommandProcessor>();
}
)
.UseConsoleLifetime()
.UseSerilog()
.Build();

await host.RunAsync();
}

internal sealed class RunCommandProcessor(IAmACommandProcessor commandProcessor, ILogger<RunCommandProcessor> logger)
Console.CancelKeyPress += (_, _) => host.StopAsync().Wait();

await host.RunAsync();

internal sealed class RunCommandProcessor(IAmACommandProcessor commandProcessor, ILogger<RunCommandProcessor> logger)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
long loop = 0;
while (!stoppingToken.IsCancellationRequested)
{
long loop = 0;
while (!stoppingToken.IsCancellationRequested)
{
loop++;

logger.LogInformation("Scheduling message #{Loop}", loop);
commandProcessor.Post(TimeSpan.FromMinutes(1), new GreetingEvent($"Scheduler message Ian #{loop}"));

if (loop % 100 != 0)
{
continue;
}
loop++;

logger.LogInformation("Pausing for breath...");
await Task.Delay(4000, stoppingToken);
logger.LogInformation("Scheduling message #{Loop}", loop);
commandProcessor.Post(TimeSpan.FromSeconds(10), new GreetingEvent($"Scheduler message Ian #{loop}"));

if (loop % 100 != 0)
{
continue;
}

logger.LogInformation("Pausing for breath...");
await Task.Delay(4000, stoppingToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
<ProjectReference Include="..\Greetings\Greetings.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" VersionOverride="$(AWSSDKExtensionsNETCoreSetup)"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ THE SOFTWARE. */
using System;
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.Runtime.CredentialManagement;
using Greetings.Ports.Commands;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -36,79 +37,57 @@ THE SOFTWARE. */
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;
using Serilog;

namespace GreetingsReceiverConsole;
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();

public class Program
{
public static async Task Main(string[] args)
var host = new HostBuilder()
.ConfigureServices((_, services) =>
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();
var subscriptions = new Subscription[]
{
new SqsSubscription<GreetingEvent>(
subscriptionName: new SubscriptionName("paramore.example.greeting"),
channelName: new ChannelName(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
routingKey: new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
channelType: ChannelType.PubSub,
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
messagePumpType: MessagePumpType.Reactor,
queueAttributes: new SqsAttributes(lockTimeout: TimeSpan.FromSeconds(30))),
new SqsSubscription<FarewellEvent>(
subscriptionName: new SubscriptionName("paramore.example.farewell"),
channelName: new ChannelName(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)),
routingKey: new RoutingKey(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)),
channelType: ChannelType.PubSub,
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
messagePumpType: MessagePumpType.Reactor,
topicAttributes: new SnsAttributes(type: SqsType.Fifo),
queueAttributes: new SqsAttributes(lockTimeout: TimeSpan.FromSeconds(30), type: SqsType.Fifo))
};

var host = new HostBuilder()
.ConfigureServices((_, services) =>
//create the gateway
var serviceURL = "http://localhost:4566/";
var region = RegionEndpoint.USEast1;
var awsConnection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"), region,
cfg => { cfg.ServiceURL = serviceURL; });

services.AddConsumers(options =>
{
var subscriptions = new Subscription[]
{
new SqsSubscription<GreetingEvent>(
subscriptionName: new SubscriptionName("paramore.example.greeting"),
channelName: new ChannelName(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
routingKey:new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
channelType: ChannelType.PubSub,
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
messagePumpType: MessagePumpType.Reactor,
queueAttributes: new SqsAttributes(
lockTimeout: TimeSpan.FromSeconds(30)
)),
new SqsSubscription<FarewellEvent>(
subscriptionName:new SubscriptionName("paramore.example.farewell"),
channelName: new ChannelName(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)),
routingKey: new RoutingKey(typeof(FarewellEvent).FullName!.ToValidSNSTopicName(true)),
channelType: ChannelType.PubSub,
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
messagePumpType: MessagePumpType.Reactor,
queueAttributes: new SqsAttributes(
lockTimeout: TimeSpan.FromSeconds(30),
type: SqsType.Fifo
))
};

//create the gateway
if (new CredentialProfileStoreChain().TryGetAWSCredentials("default", out var credentials))
{
var serviceURL = "http://localhost:4566/"; // Environment.GetEnvironmentVariable("LOCALSTACK_SERVICE_URL");
var region = string.IsNullOrWhiteSpace(serviceURL)
? RegionEndpoint.EUWest1
: RegionEndpoint.USEast1;
var awsConnection = new AWSMessagingGatewayConnection(credentials, region,
cfg =>
{
if (!string.IsNullOrWhiteSpace(serviceURL))
{
cfg.ServiceURL = serviceURL;
}
});
options.Subscriptions = subscriptions;
options.DefaultChannelFactory = new ChannelFactory(awsConnection);
})
.AutoFromAssemblies();

services.AddConsumers(options =>
{
options.Subscriptions = subscriptions;
options.DefaultChannelFactory = new ChannelFactory(awsConnection);
})
.AutoFromAssemblies();
}
services.AddHostedService<ServiceActivatorHostedService>();
})
.UseConsoleLifetime()
.UseSerilog()
.Build();

services.AddHostedService<ServiceActivatorHostedService>();
})
.UseConsoleLifetime()
.UseSerilog()
.Build();
Console.CancelKeyPress += (_, _) => host.StopAsync().Wait();

await host.RunAsync();
}
}
await host.RunAsync();
Loading