Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/Http/Wolverine.Http.Tests/Samples/ExternalHttpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
var transport = new HttpTransport();
opts.Transports.Add(transport);
// Publish all messages to the external http endpoint using the named http client
// The .SendInline() method fails, so do not use it here
// This will publish anarray of envelopes to the external endpoint
opts.PublishAllMessages().ToHttpEndpoint(httpNamedClient);

// To publish individual messages instead of batches, use this instead
// opts.PublishAllMessages().ToHttpEndpoint(httpNamedClient).SendInline();

// If the httpendpoint supports native scheduled sends, use this instead
// opts.PublishAllMessages().ToHttpEndpoint(httpNamedClient, supportsNativeScheduledSend: true).SendInline();
});
builder.Services.AddWolverineHttp();
// Configure the named http client to point to the external wolverine server
Expand All @@ -24,9 +30,12 @@
client.BaseAddress = new Uri("https://where-ever-you want-message-to-go/");
//client.DefaultRequestHeaders.Add("Authorization", $"Bearer eyJ***");
});
// To handle the messages over HTTP, send them to https://where-your-app-with-message-handlers/_wolverine/batch/queue
// To handle the array of messages over HTTP, send them to https://where-your-app-with-message-handlers/_wolverine/batch/queue
// To handle single message over HTTP, send them to https://where-your-app-with-message-handlers/_wolverine/invoke
// Register the WolverineHttpTransportClient for sending messages over HTTP
builder.Services.AddScoped<WolverineHttpTransportClient>();
builder.Services.AddScoped<IWolverineHttpTransportClient, WolverineHttpTransportClient>();
// You can have your own implementation of IWolverineHttpTransportClient if you need custom behavior
// builder.Services.AddScoped<IWolverineHttpTransportClient, MyWolverineHttpTransportClient>();
var app = builder.Build();
app.MapWolverineEndpoints();
app.MapPost(
Expand Down
97 changes: 97 additions & 0 deletions src/Http/Wolverine.Http.Tests/Transport/HttpEndpointTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System.Reflection;
using JasperFx.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Http.Transport;
using Wolverine.Runtime;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Xunit;

namespace Wolverine.Http.Tests.Transport;

public class HttpEndpointTests
{
private readonly Uri _uri = "http://localhost:5000".ToUri();
private readonly HttpEndpoint _endpoint;

public HttpEndpointTests()
{
_endpoint = new HttpEndpoint(_uri, EndpointRole.Application);
}

private ISender InvokeCreateSender(IWolverineRuntime runtime)
{
var method = typeof(HttpEndpoint).GetMethod("CreateSender", BindingFlags.Instance | BindingFlags.NonPublic);
return (ISender)method.Invoke(_endpoint, new object[] { runtime });
}

private bool InvokeSupportsMode(EndpointMode mode)
{
var method = typeof(HttpEndpoint).GetMethod("supportsMode", BindingFlags.Instance | BindingFlags.NonPublic);
return (bool)method.Invoke(_endpoint, new object[] { mode });
}

[Fact]
public void constructor_sets_uri_and_role()
{
_endpoint.Uri.ShouldBe(_uri);
_endpoint.Role.ShouldBe(EndpointRole.Application);
}

[Fact]
public async Task build_listener_returns_nullo_listener()
{
var runtime = Substitute.For<IWolverineRuntime>();
var receiver = Substitute.For<IReceiver>();

var listener = await _endpoint.BuildListenerAsync(runtime, receiver);

listener.ShouldBeOfType<NulloListener>();
listener.Address.ShouldBe(_uri);
}

[Fact]
public void create_sender_inline_mode()
{
var runtime = Substitute.For<IWolverineRuntime>();
runtime.Services.Returns(Substitute.For<IServiceProvider>());

_endpoint.Mode = EndpointMode.Inline;

var sender = InvokeCreateSender(runtime);

sender.ShouldBeOfType<InlineHttpSender>();
}

[Fact]
public void create_sender_buffered_mode()
{
var runtime = Substitute.For<IWolverineRuntime>();
runtime.Services.Returns(Substitute.For<IServiceProvider>());
runtime.LoggerFactory.Returns(NullLoggerFactory.Instance);

_endpoint.Mode = EndpointMode.BufferedInMemory;

var sender = InvokeCreateSender(runtime);

sender.ShouldBeOfType<BatchedSender>();
}

[Fact]
public void describe_properties_returns_base_properties()
{
var properties = _endpoint.DescribeProperties();
properties.ShouldNotBeNull();
}

[Fact]
public void supports_all_modes()
{
InvokeSupportsMode(EndpointMode.Inline).ShouldBeTrue();
InvokeSupportsMode(EndpointMode.BufferedInMemory).ShouldBeTrue();
InvokeSupportsMode(EndpointMode.Durable).ShouldBeTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task send_envelope_async()
_handler.LastRequest.ShouldNotBeNull();
_handler.LastRequest.Method.ShouldBe(HttpMethod.Post);
_handler.LastRequest.RequestUri.ToString().ShouldBe(uri);
_handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransportExecutor.EnvelopeContentType);
_handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransport.EnvelopeContentType);

var expectedData = EnvelopeSerializer.Serialize(envelope);
_handler.LastContent.ShouldBe(expectedData);
Expand All @@ -67,7 +67,7 @@ public async Task send_batch_async()
_handler.LastRequest.ShouldNotBeNull();
_handler.LastRequest.Method.ShouldBe(HttpMethod.Post);
_handler.LastRequest.RequestUri.ToString().ShouldBe("https://target-url/");
_handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransportExecutor.EnvelopeBatchContentType);
_handler.LastRequest.Content.Headers.ContentType.MediaType.ShouldBe(HttpTransport.EnvelopeBatchContentType);

var expectedData = EnvelopeSerializer.Serialize(envelopes);
_handler.LastContent.ShouldBe(expectedData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task publish_multiple_messages()

var (tracked, result) = await TrackedHttpCall(s =>
{
s.Post.ByteArray(data).ToUrl("/_wolverine/batch/one").ContentType(HttpTransportExecutor.EnvelopeBatchContentType);
s.Post.ByteArray(data).ToUrl("/_wolverine/batch/one").ContentType(HttpTransport.EnvelopeBatchContentType);
});

tracked.Executed.SingleMessage<HttpMessage1>().Name.ShouldBe("one");
Expand All @@ -59,7 +59,7 @@ public async Task invoke_one_with_no_response()

var (tracked, result) = await TrackedHttpCall(s =>
{
s.Post.ByteArray(data).ToUrl("/_wolverine/invoke").ContentType(HttpTransportExecutor.EnvelopeContentType);
s.Post.ByteArray(data).ToUrl("/_wolverine/invoke").ContentType(HttpTransport.EnvelopeContentType);
});

tracked.Executed.SingleMessage<HttpMessage1>().Name.ShouldBe("Mat Cauthon");
Expand All @@ -80,8 +80,8 @@ public async Task invoke_one_with_expected_response()

var (tracked, result) = await TrackedHttpCall(s =>
{
s.Post.ByteArray(data).ToUrl("/_wolverine/invoke").ContentType(HttpTransportExecutor.EnvelopeContentType);
s.ContentTypeShouldBe(HttpTransportExecutor.EnvelopeContentType);
s.Post.ByteArray(data).ToUrl("/_wolverine/invoke").ContentType(HttpTransport.EnvelopeContentType);
s.ContentTypeShouldBe(HttpTransport.EnvelopeContentType);
});

var resultData = await result.Context.Response.Body.ReadAllBytesAsync();
Expand Down
18 changes: 11 additions & 7 deletions src/Http/Wolverine.Http/Transport/HttpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public HttpEndpoint(Uri uri, EndpointRole role) : base(uri, role)
{
}

internal bool SupportsNativeScheduledSend { get; set; }
public string OutboundUri { get; set; }

public override ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
Expand All @@ -21,11 +22,13 @@ public override ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtim

protected override ISender CreateSender(IWolverineRuntime runtime)
{
return new BatchedSender(
this,
new HttpSenderProtocol(this, runtime.Services),
runtime.Cancellation,
runtime.LoggerFactory.CreateLogger<HttpSenderProtocol>());
return Mode == EndpointMode.Inline
? new InlineHttpSender(this, runtime, runtime.Services)
: new BatchedSender(
this,
new HttpSenderProtocol(this, runtime.Services),
runtime.Cancellation,
runtime.LoggerFactory.CreateLogger<HttpSenderProtocol>());
}

public override IDictionary<string, object> DescribeProperties()
Expand All @@ -35,6 +38,7 @@ public override IDictionary<string, object> DescribeProperties()

protected override bool supportsMode(EndpointMode mode)
{
return mode != EndpointMode.Inline;
return true;
}
}
}

4 changes: 2 additions & 2 deletions src/Http/Wolverine.Http/Transport/HttpSenderProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ internal class HttpSenderProtocol : ISenderProtocol
{
private readonly HttpEndpoint _endpoint;
private readonly IServiceProvider _services;
private readonly IHttpClientFactory _clientFactory;

public HttpSenderProtocol(HttpEndpoint endpoint, IServiceProvider services)
{
Expand All @@ -19,7 +18,8 @@ public HttpSenderProtocol(HttpEndpoint endpoint, IServiceProvider services)
public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch batch)
{
using var scope = _services.CreateScope();
var client = scope.ServiceProvider.GetRequiredService<WolverineHttpTransportClient>();
var client = scope.ServiceProvider.GetRequiredService<IWolverineHttpTransportClient>() ??
throw new InvalidOperationException("IWolverineHttpTransportClient is not registered in the service container");
await client.SendBatchAsync(_endpoint.OutboundUri, batch);
}
}
3 changes: 3 additions & 0 deletions src/Http/Wolverine.Http/Transport/HttpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public HttpTransport() : base("https", "HTTP Transport")
{
}

public const string EnvelopeContentType = "binary/wolverine-envelope";
public const string EnvelopeBatchContentType = "binary/wolverine-envelopes";

protected override IEnumerable<HttpEndpoint> endpoints()
{
return _endpoints;
Expand Down
7 changes: 2 additions & 5 deletions src/Http/Wolverine.Http/Transport/HttpTransportExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ namespace Wolverine.Http.Transport;

internal class HttpTransportExecutor
{
public static readonly string EnvelopeContentType = "binary/wolverine-envelope";
public static readonly string EnvelopeBatchContentType = "binary/wolverine-envelopes";

private readonly WolverineRuntime _runtime;
private readonly ILogger<HttpTransportExecutor> _logger;

Expand All @@ -29,7 +26,7 @@ public async Task<IResult> ExecuteBatchAsync(HttpContext httpContext)
{
if (httpContext.Request.Headers.TryGetValue("content-type", out var values))
{
if (values[0] != EnvelopeBatchContentType)
if (values[0] != HttpTransport.EnvelopeBatchContentType)
{
return Results.StatusCode(415);
}
Expand Down Expand Up @@ -74,7 +71,7 @@ public async Task<IResult> InvokeAsync(HttpContext httpContext)
{
if (httpContext.Request.Headers.TryGetValue("content-type", out var values))
{
if (values[0] != EnvelopeContentType)
if (values[0] != HttpTransport.EnvelopeContentType)
{
return Results.StatusCode(415);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Http/Wolverine.Http/Transport/HttpTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static RouteGroupBuilder MapWolverineHttpTransportEndpoints(this IEndpoin
/// <param name="publishing"></param>
/// <param name="url"></param>
/// <returns></returns>
public static HttpTransportSubscriberConfiguration ToHttpEndpoint(this IPublishToExpression publishing, string url)
public static HttpTransportSubscriberConfiguration ToHttpEndpoint(this IPublishToExpression publishing, string url, bool supportsNativeScheduledSend = false)
{
var transports = publishing.As<PublishingExpression>().Parent.Transports;
var transport = transports.GetOrCreate<HttpTransport>();
Expand All @@ -42,7 +42,7 @@ public static HttpTransportSubscriberConfiguration ToHttpEndpoint(this IPublishT

// This is necessary unfortunately to hook up the subscription rules
publishing.To(endpoint.Uri);

endpoint.SupportsNativeScheduledSend = supportsNativeScheduledSend;
return new HttpTransportSubscriberConfiguration(endpoint);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Wolverine.Transports;

namespace Wolverine.Http.Transport;

public interface IWolverineHttpTransportClient
{
Task SendBatchAsync(string uri, OutgoingMessageBatch batch);
Task SendAsync(string uri, Envelope envelope);
}
34 changes: 34 additions & 0 deletions src/Http/Wolverine.Http/Transport/InlineHttpSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Transports.Sending;

namespace Wolverine.Http.Transport;

internal class InlineHttpSender(HttpEndpoint endpoint, IWolverineRuntime runtime, IServiceProvider services) : ISender
{
public async ValueTask SendAsync(Envelope envelope)
{
try
{
using var scope = services.CreateScope();
var client = scope.ServiceProvider.GetRequiredService<IWolverineHttpTransportClient>() ??
throw new InvalidOperationException("IWolverineHttpTransportClient is not registered in the service container");
await client.SendAsync(endpoint.OutboundUri, envelope);
}
catch (Exception ex)
{
var logger = runtime.LoggerFactory.CreateLogger<InlineHttpSender>();
logger.LogError(
ex,
"Failed to send message {MessageId} to {Uri}",
envelope.Id,
endpoint.OutboundUri);
}
}

public bool SupportsNativeScheduledSend => endpoint.SupportsNativeScheduledSend;
public Uri Destination => endpoint.Uri;
public Task<bool> PingAsync() => Task.FromResult(true);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

namespace Wolverine.Http.Transport;

public class WolverineHttpTransportClient(IHttpClientFactory clientFactory)
public class WolverineHttpTransportClient(IHttpClientFactory clientFactory) : IWolverineHttpTransportClient
{
public async Task SendBatchAsync(string uri, OutgoingMessageBatch batch)
{
var client = clientFactory.CreateClient(uri);
var content = new ByteArrayContent(EnvelopeSerializer.Serialize(batch.Messages));
content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransportExecutor.EnvelopeBatchContentType);
content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransport.EnvelopeBatchContentType);
await client.PostAsync(client.BaseAddress, content);
}

public async Task SendAsync(string uri, Envelope envelope)
{
var client = clientFactory.CreateClient(uri);
var content = new ByteArrayContent(EnvelopeSerializer.Serialize(envelope));
content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransportExecutor.EnvelopeContentType);
await client.PostAsync(uri, content);
content.Headers.ContentType = new MediaTypeHeaderValue(HttpTransport.EnvelopeContentType);
await client.PostAsync(client.BaseAddress, content);
}
}
Loading