Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<!-- Temporarily disable security warnings for transitive packages. -->
<NoWarn>NU1901;NU1902;NU1903;NU1904</NoWarn>
<UseArtifactsOutput>true</UseArtifactsOutput>
</PropertyGroup>

<!-- Enable OpenAI OpenTelemetry so OpenAI calls participate in tracing and metrics. -->
Expand Down
11 changes: 0 additions & 11 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@

<!-- Ensure individual warnings are shown when publishing -->
<TrimmerSingleWarn>false</TrimmerSingleWarn>
<!-- But ignore the single warn files marked below to suppress their known warnings. -->
<NoWarn>$(NoWarn);IL2104</NoWarn>

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer needed now that we are on RabbitMQ.Client v7

</PropertyGroup>

<Target Name="ConfigureTrimming"
BeforeTargets="PrepareForILLink">
<!-- Single warn the following assemblies, which have known warnings, so the warnings can be suppressed for now. -->
<ItemGroup>
<!-- https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1410 is tracking fixing the one EventSource warning in RabbitMQ. -->
<IlcArg Include="--singlewarnassembly:RabbitMQ.Client" />
</ItemGroup>
</Target>

</Project>
51 changes: 25 additions & 26 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,62 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
<AspnetVersion>10.0.0-preview.7.25380.108</AspnetVersion>
<MicrosoftExtensionsVersion>10.0.0-preview.7.25380.108</MicrosoftExtensionsVersion>
<AspireVersion>9.5.0</AspireVersion>
<AspireUnstablePackagesVersion>9.5.0-preview.1.25474.7</AspireUnstablePackagesVersion>
<AspnetVersion>10.0.0</AspnetVersion>
<MicrosoftExtensionsVersion>10.0.0</MicrosoftExtensionsVersion>
<AspireVersion>13.0.1</AspireVersion>
<AspireUnstablePackagesVersion>13.0.0-preview.1.25560.3</AspireUnstablePackagesVersion>
<GrpcVersion>2.71.0</GrpcVersion>
<DuendeVersion>7.3.1</DuendeVersion>
<ApiVersioningVersion>8.1.0</ApiVersioningVersion>
</PropertyGroup>
<ItemGroup>
<!-- Version together with Aspire -->
<PackageVersion Include="Aspire.Hosting.AppHost" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Hosting.Azure.CognitiveServices" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Hosting.PostgreSQL" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Hosting.RabbitMQ" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Hosting.Redis" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Hosting.Yarp" Version="$(AspireUnstablePackagesVersion)" />
<PackageVersion Include="Aspire.Hosting.Yarp" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Npgsql" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Npgsql.EntityFrameworkCore.PostgreSQL" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.RabbitMQ.Client" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.StackExchange.Redis" Version="$(AspireVersion)" />
<PackageVersion Include="Aspire.Azure.AI.OpenAI" Version="$(AspireUnstablePackagesVersion)" />
<PackageVersion Include="Microsoft.Extensions.ApiDescription.Server" Version="10.0.0-rc.1.25451.107">
<PackageVersion Include="Microsoft.Extensions.ApiDescription.Server" Version="10.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageVersion>
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="$(AspireVersion)" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery.Yarp" Version="$(AspireVersion)" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="$(MicrosoftExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery.Yarp" Version="$(MicrosoftExtensionsVersion)" />
<!-- Version together with Asp.Versioning -->
<PackageVersion Include="Asp.Versioning.Http" Version="$(ApiVersioningVersion)" />
<PackageVersion Include="Asp.Versioning.Http.Client" Version="$(ApiVersioningVersion)" />
<PackageVersion Include="Asp.Versioning.Mvc.ApiExplorer" Version="$(ApiVersioningVersion)" />
<!-- Version together with ASP.NET -->
<PackageVersion Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Components.QuickGrid" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Components.Web" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Identity.UI" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.OpenApi" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.Extensions.Identity.Stores" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="9.9.0" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Components.QuickGrid" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Components.Web" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Identity.UI" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.OpenApi" Version="10.0.0" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.Identity.Stores" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="10.0.0" />
<PackageVersion Include="Microsoft.OpenApi" Version="2.3.2" />
<PackageVersion Include="MSTest.TestFramework" Version="3.10.4" />
<PackageVersion Include="MSTest.TestAdapter" Version="3.10.4" />
<!-- Version together with EF -->
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0-rc.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Tools" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Tools" Version="10.0.0" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="NSubstitute.Analyzers.CSharp" Version="1.0.17" />
<PackageVersion Include="Pgvector" Version="0.3.2" />
<PackageVersion Include="Pgvector.EntityFrameworkCore" Version="0.2.2" />
<!-- Version together with runtime -->
<PackageVersion Include="Microsoft.Extensions.Options" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0-rc.1.25451.107" />
<PackageVersion Include="Microsoft.Extensions.Options" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
<!-- Xabaril packages -->
<PackageVersion Include="AspNetCore.HealthChecks.Uris" Version="9.0.0" />
<!-- AI -->
Expand Down Expand Up @@ -89,7 +88,7 @@
<PackageVersion Include="Dapper" Version="2.1.35" />
<PackageVersion Include="FluentValidation" Version="12.0.0" />
<PackageVersion Include="FluentValidation.DependencyInjectionExtensions" Version="12.0.0" />
<PackageVersion Include="Google.Protobuf" Version="3.32.1" />
<PackageVersion Include="Google.Protobuf" Version="3.33.0" />
<PackageVersion Include="Microsoft.Web.LibraryManager.Build" Version="3.0.71" />
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
Expand Down
59 changes: 31 additions & 28 deletions src/EventBusRabbitMQ/RabbitMQEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public sealed class RabbitMQEventBus(
private readonly EventBusSubscriptionInfo _subscriptionInfo = subscriptionOptions.Value;
private IConnection _rabbitMQConnection;

private IModel _consumerChannel;
private IChannel _consumerChannel;

public Task PublishAsync(IntegrationEvent @event)
public async Task PublishAsync(IntegrationEvent @event)
{
var routingKey = @event.GetType().Name;

Expand All @@ -37,22 +37,24 @@ public Task PublishAsync(IntegrationEvent @event)
logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, routingKey);
}

using var channel = _rabbitMQConnection?.CreateModel() ?? throw new InvalidOperationException("RabbitMQ connection is not open");
using var channel = (await _rabbitMQConnection?.CreateChannelAsync()) ?? throw new InvalidOperationException("RabbitMQ connection is not open");

if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
}

channel.ExchangeDeclare(exchange: ExchangeName, type: "direct");
await channel.ExchangeDeclareAsync(
exchange: ExchangeName,
type: "direct");

var body = SerializeMessage(@event);

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md
var activityName = $"{routingKey} publish";

return _pipeline.Execute(() =>
await _pipeline.Execute(async () =>
{
using var activity = _activitySource.StartActivity(activityName, ActivityKind.Client);

Expand All @@ -70,9 +72,10 @@ public Task PublishAsync(IntegrationEvent @event)
contextToInject = Activity.Current.Context;
}

var properties = channel.CreateBasicProperties();
// persistent
properties.DeliveryMode = 2;
var properties = new BasicProperties()
{
DeliveryMode = DeliveryModes.Persistent
};

static void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value)
{
Expand All @@ -91,14 +94,12 @@ static void InjectTraceContextIntoBasicProperties(IBasicProperties props, string

try
{
channel.BasicPublish(
await channel.BasicPublishAsync(
exchange: ExchangeName,
routingKey: routingKey,
mandatory: true,
basicProperties: properties,
body: body);

return Task.CompletedTask;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -130,7 +131,7 @@ public void Dispose()

private async Task OnMessageReceived(object sender, BasicDeliverEventArgs eventArgs)
{
static IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
static IEnumerable<string> ExtractTraceContextFromBasicProperties(IReadOnlyBasicProperties props, string key)
{
if (props.Headers.TryGetValue(key, out var value))
{
Expand Down Expand Up @@ -176,7 +177,7 @@ static IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperti
// Even on exception we take the message off the queue.
// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
// For more information see: https://www.rabbitmq.com/dlx.html
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
await _consumerChannel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false);
}

private async Task ProcessEvent(string eventName, string message)
Expand Down Expand Up @@ -224,9 +225,8 @@ private byte[] SerializeMessage(IntegrationEvent @event)

public Task StartAsync(CancellationToken cancellationToken)
{
// Messaging is async so we don't need to wait for it to complete. On top of this
// the APIs are blocking, so we need to run this on a background thread.
_ = Task.Factory.StartNew(() =>
// Messaging is async so we don't need to wait for it to complete.
_ = Task.Factory.StartNew(async () =>
Comment thread
eerhardt marked this conversation as resolved.
{
try
{
Expand All @@ -243,21 +243,24 @@ public Task StartAsync(CancellationToken cancellationToken)
logger.LogTrace("Creating RabbitMQ consumer channel");
}

_consumerChannel = _rabbitMQConnection.CreateModel();
_consumerChannel = await _rabbitMQConnection.CreateChannelAsync();

_consumerChannel.CallbackException += (sender, ea) =>
_consumerChannel.CallbackExceptionAsync += (sender, ea) =>
{
logger.LogWarning(ea.Exception, "Error with RabbitMQ consumer channel");
return Task.CompletedTask;
};

_consumerChannel.ExchangeDeclare(exchange: ExchangeName,
type: "direct");
await _consumerChannel.ExchangeDeclareAsync(
exchange: ExchangeName,
type: "direct");

_consumerChannel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
await _consumerChannel.QueueDeclareAsync(
queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

if (logger.IsEnabled(LogLevel.Trace))
{
Expand All @@ -266,16 +269,16 @@ public Task StartAsync(CancellationToken cancellationToken)

var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

consumer.Received += OnMessageReceived;
consumer.ReceivedAsync += OnMessageReceived;

_consumerChannel.BasicConsume(
await _consumerChannel.BasicConsumeAsync(
queue: _queueName,
autoAck: false,
consumer: consumer);

foreach (var (eventName, _) in _subscriptionInfo.EventTypes)
{
_consumerChannel.QueueBind(
await _consumerChannel.QueueBindAsync(
queue: _queueName,
exchange: ExchangeName,
routingKey: eventName);
Expand Down
11 changes: 1 addition & 10 deletions src/EventBusRabbitMQ/RabbitMqDependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using eShop.EventBusRabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using System.Diagnostics.CodeAnalysis;

namespace Microsoft.Extensions.Hosting;

Expand All @@ -16,18 +14,11 @@ public static class RabbitMqDependencyInjectionExtensions

private const string SectionName = "EventBus";

[UnconditionalSuppressMessage("Trimming", "IL2026:RequiresUnreferencedCode",
Justification = "EventBusOptions is a simple POCO with public properties that are safe for trimming.")]
[UnconditionalSuppressMessage("AOT", "IL3050:RequiresDynamicCode",
Justification = "EventBusOptions is a simple POCO with public properties that are safe for AOT.")]
public static IEventBusBuilder AddRabbitMqEventBus(this IHostApplicationBuilder builder, string connectionName)
{
ArgumentNullException.ThrowIfNull(builder);

builder.AddRabbitMQClient(connectionName, configureConnectionFactory: factory =>
{
((ConnectionFactory)factory).DispatchConsumersAsync = true;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RabbitMQ v7 is all async now, so this property no longer exists.

});
builder.AddRabbitMQClient(connectionName);

// RabbitMQ.Client doesn't have built-in support for OpenTelemetry, so we need to add it ourselves
builder.Services.AddOpenTelemetry()
Expand Down
24 changes: 15 additions & 9 deletions src/eShop.AppHost/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Aspire.Hosting.Lifecycle;
using Aspire.Hosting.Eventing;
using Aspire.Hosting.Lifecycle;
using Aspire.Hosting.Yarp;
using Aspire.Hosting.Yarp.Transforms;
using Yarp.ReverseProxy.Configuration;
Expand All @@ -20,21 +21,26 @@ internal static class Extensions
/// </summary>
public static IDistributedApplicationBuilder AddForwardedHeaders(this IDistributedApplicationBuilder builder)
{
builder.Services.TryAddLifecycleHook<AddForwardHeadersHook>();
builder.Services.TryAddEventingSubscriber<AddForwardHeadersSubscriber>();
return builder;
}

private class AddForwardHeadersHook : IDistributedApplicationLifecycleHook
private class AddForwardHeadersSubscriber : IDistributedApplicationEventingSubscriber

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we need this anymore.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you added it in e77b039. I'm not sure what this does TBH.

{
public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationToken cancellationToken = default)
public Task SubscribeAsync(IDistributedApplicationEventing eventing, DistributedApplicationExecutionContext executionContext, CancellationToken cancellationToken)
{
foreach (var p in appModel.GetProjectResources())
eventing.Subscribe<BeforeStartEvent>((@event, ct) =>
{
p.Annotations.Add(new EnvironmentCallbackAnnotation(context =>
foreach (var p in @event.Model.GetProjectResources())
{
context.EnvironmentVariables["ASPNETCORE_FORWARDEDHEADERS_ENABLED"] = "true";
}));
}
p.Annotations.Add(new EnvironmentCallbackAnnotation(context =>
{
context.EnvironmentVariables["ASPNETCORE_FORWARDEDHEADERS_ENABLED"] = "true";
}));
}

return Task.CompletedTask;
});

return Task.CompletedTask;
}
Expand Down
4 changes: 1 addition & 3 deletions src/eShop.AppHost/eShop.AppHost.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Sdk Name="Aspire.AppHost.Sdk" Version="9.5.0" />
<Project Sdk="Aspire.AppHost.Sdk/13.0.0">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand All @@ -10,7 +9,6 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting.RabbitMQ" />
<PackageReference Include="Aspire.Hosting.Redis" />
<PackageReference Include="Aspire.Hosting.PostgreSQL" />
Expand Down
4 changes: 1 addition & 3 deletions tests/Catalog.FunctionalTests/Catalog.FunctionalTests.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Sdk Name="Aspire.AppHost.Sdk" Version="9.5.0" />
<Project Sdk="Aspire.AppHost.Sdk/13.0.0">

<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
Expand All @@ -9,7 +8,6 @@

<ItemGroup>
<PackageReference Include="Asp.Versioning.Http.Client" />
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting.PostgreSQL" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" />
<PackageReference Include="Microsoft.AspNetCore.TestHost" />
Expand Down
Loading