From 3591fb73bc045169bda44b9fe9a211f26825423e Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Tue, 23 Apr 2024 20:20:22 +1000 Subject: [PATCH 1/2] move to using declarations --- .../Adaptors/Data/MigrationSqlHelper.cs | 14 +- samples/HelloAsyncListeners/IpFyApi.cs | 22 +- samples/HelloWorldAsync/IpFyApi.cs | 22 +- .../GreetingsWeb/Database/SchemaCreation.cs | 28 +-- .../Database/SchemaCreation.cs | 52 ++-- .../GreetingsWeb/Database/SchemaCreation.cs | 38 ++- samples/WebAPI_EFCore/GreetingsWeb/Startup.cs | 6 +- .../Database/SchemaCreation.cs | 50 ++-- .../MsSqlInbox.cs | 92 ++++--- .../MySqlInbox.cs | 100 ++++---- .../PostgreSqlInbox.cs | 40 ++- .../SqliteInbox.cs | 102 ++++---- .../AWSMessagingGateway.cs | 36 ++- .../ChannelFactory.cs | 102 ++++---- .../SqsMessageConsumer.cs | 54 ++-- .../SqsMessageProducer.cs | 24 +- .../KafkaMessagingGateway.cs | 134 +++++----- .../SqlQueues/MsSqlMessageQueue.cs | 80 +++--- .../RedisMessageConsumer.cs | 38 ++- .../RedisMessageCreator.cs | 59 +++-- .../RedisMessageGateway.cs | 6 +- .../RedisMessageProducer.cs | 32 ++- .../MsSqlOutbox.cs | 116 ++++----- .../MySqlOutbox.cs | 130 +++++----- .../PostgreSqlOutbox.cs | 110 ++++---- .../SqliteOutbox.cs | 112 ++++----- .../S3LuggageStore.cs | 12 +- src/Paramore.Brighter/CommandProcessor.cs | 238 +++++++++--------- .../When_requeueing_redrives_to_the_dlq.cs | 30 +-- ...n_throwing_defer_action_respect_redrive.cs | 30 ++- ...hen_A_Request_Logger_Is_In_The_Pipeline.cs | 40 ++- ...Request_Logger_Is_In_The_Pipeline_Async.cs | 35 ++- .../MsSqlTestHelper.cs | 60 ++--- .../MySqlTestHelper.cs | 70 ++---- .../PostgresSqlTestHelper.cs | 68 ++--- ...ng_client_configuration_via_the_gateway.cs | 32 ++- .../Outbox/SQlOutboxMigrationTests.cs | 20 +- .../SqliteTestHelper.cs | 14 +- 38 files changed, 1011 insertions(+), 1237 deletions(-) diff --git a/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs b/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs index 1330da4202..49c23caac7 100644 --- a/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs +++ b/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs @@ -12,15 +12,11 @@ public static void ApplyResource(Migration migration, MigrationBuilder migration var assembly = Assembly.GetExecutingAssembly(); var name = $"{migration.GetType().Namespace}.{resourceName}"; - using (var stream = assembly.GetManifestResourceStream(name)) - { - if (stream == null) throw new ArgumentNullException(resourceName); - using (var textStreamReader = new StreamReader(stream)) - { - var sql = textStreamReader.ReadToEnd(); - migrationBuilder.Sql(sql); - } - } + using var stream = assembly.GetManifestResourceStream(name); + if (stream == null) throw new ArgumentNullException(resourceName); + using var textStreamReader = new StreamReader(stream); + var sql = textStreamReader.ReadToEnd(); + migrationBuilder.Sql(sql); } } } diff --git a/samples/HelloAsyncListeners/IpFyApi.cs b/samples/HelloAsyncListeners/IpFyApi.cs index 33f51ae762..7ffb64c342 100644 --- a/samples/HelloAsyncListeners/IpFyApi.cs +++ b/samples/HelloAsyncListeners/IpFyApi.cs @@ -29,19 +29,17 @@ public IpFyApi(Uri endpoint) public async Task GetAsync(CancellationToken cancellationToken = default) { - using (var client = new HttpClient()) - { - client.BaseAddress = _endpoint; - client.DefaultRequestHeaders.Clear(); + using var client = new HttpClient(); + client.BaseAddress = _endpoint; + client.DefaultRequestHeaders.Clear(); - var response = await client.GetAsync("", cancellationToken); - string result; - if (response.IsSuccessStatusCode) - result = await response.Content.ReadAsStringAsync(cancellationToken); - else - result = "API returned HTTP status " + response.StatusCode; - return new IpFyApiResult(response.IsSuccessStatusCode, result); - } + var response = await client.GetAsync("", cancellationToken); + string result; + if (response.IsSuccessStatusCode) + result = await response.Content.ReadAsStringAsync(cancellationToken); + else + result = "API returned HTTP status " + response.StatusCode; + return new IpFyApiResult(response.IsSuccessStatusCode, result); } } } diff --git a/samples/HelloWorldAsync/IpFyApi.cs b/samples/HelloWorldAsync/IpFyApi.cs index 78cf597860..d9185aa959 100644 --- a/samples/HelloWorldAsync/IpFyApi.cs +++ b/samples/HelloWorldAsync/IpFyApi.cs @@ -53,19 +53,17 @@ public IpFyApi(Uri endpoint) public async Task GetAsync(CancellationToken cancellationToken = default) { - using (var client = new HttpClient()) - { - client.BaseAddress = _endpoint; - client.DefaultRequestHeaders.Clear(); + using var client = new HttpClient(); + client.BaseAddress = _endpoint; + client.DefaultRequestHeaders.Clear(); - var response = await client.GetAsync("", cancellationToken); - string result; - if (response.IsSuccessStatusCode) - result = await response.Content.ReadAsStringAsync(cancellationToken); - else - result = "API returned HTTP status " + response.StatusCode; - return new IpFyApiResult(response.IsSuccessStatusCode, result); - } + var response = await client.GetAsync("", cancellationToken); + string result; + if (response.IsSuccessStatusCode) + result = await response.Content.ReadAsStringAsync(cancellationToken); + else + result = "API returned HTTP status " + response.StatusCode; + return new IpFyApiResult(response.IsSuccessStatusCode, result); } } } diff --git a/samples/WebAPI_Dapper/GreetingsWeb/Database/SchemaCreation.cs b/samples/WebAPI_Dapper/GreetingsWeb/Database/SchemaCreation.cs index 639a46424d..3661b74700 100644 --- a/samples/WebAPI_Dapper/GreetingsWeb/Database/SchemaCreation.cs +++ b/samples/WebAPI_Dapper/GreetingsWeb/Database/SchemaCreation.cs @@ -61,22 +61,20 @@ public static bool HasBinaryMessagePayload(this IHost webHost) public static IHost MigrateDatabase(this IHost webHost) { - using (var scope = webHost.Services.CreateScope()) - { - var services = scope.ServiceProvider; + using var scope = webHost.Services.CreateScope(); + var services = scope.ServiceProvider; - try - { - var runner = services.GetRequiredService(); - runner.ListMigrations(); - runner.MigrateUp(); - } - catch (Exception ex) - { - var logger = services.GetRequiredService>(); - logger.LogError(ex, "An error occurred while migrating the database."); - throw; - } + try + { + var runner = services.GetRequiredService(); + runner.ListMigrations(); + runner.MigrateUp(); + } + catch (Exception ex) + { + var logger = services.GetRequiredService>(); + logger.LogError(ex, "An error occurred while migrating the database."); + throw; } return webHost; diff --git a/samples/WebAPI_Dapper/SalutationAnalytics/Database/SchemaCreation.cs b/samples/WebAPI_Dapper/SalutationAnalytics/Database/SchemaCreation.cs index 9ddeeaf24e..541bd64188 100644 --- a/samples/WebAPI_Dapper/SalutationAnalytics/Database/SchemaCreation.cs +++ b/samples/WebAPI_Dapper/SalutationAnalytics/Database/SchemaCreation.cs @@ -47,50 +47,44 @@ public static IHost CheckDbIsUp(this IHost host) public static IHost CreateInbox(this IHost host) { - using (var scope = host.Services.CreateScope()) - { - var services = scope.ServiceProvider; - var env = services.GetService(); - var config = services.GetService(); + using var scope = host.Services.CreateScope(); + var services = scope.ServiceProvider; + var env = services.GetService(); + var config = services.GetService(); - CreateInbox(config, env); - } + CreateInbox(config, env); return host; } public static IHost CreateOutbox(this IHost webHost, bool hasBinaryMessagePayload) { - using (var scope = webHost.Services.CreateScope()) - { - var services = scope.ServiceProvider; - var env = services.GetService(); - var config = services.GetService(); + using var scope = webHost.Services.CreateScope(); + var services = scope.ServiceProvider; + var env = services.GetService(); + var config = services.GetService(); - CreateOutbox(config, env, hasBinaryMessagePayload); - } + CreateOutbox(config, env, hasBinaryMessagePayload); return webHost; } public static IHost MigrateDatabase(this IHost host) { - using (var scope = host.Services.CreateScope()) - { - var services = scope.ServiceProvider; + using var scope = host.Services.CreateScope(); + var services = scope.ServiceProvider; - try - { - var runner = services.GetRequiredService(); - runner.ListMigrations(); - runner.MigrateUp(); - } - catch (Exception ex) - { - var logger = services.GetRequiredService>(); - logger.LogError(ex, "An error occurred while migrating the database."); - throw; - } + try + { + var runner = services.GetRequiredService(); + runner.ListMigrations(); + runner.MigrateUp(); + } + catch (Exception ex) + { + var logger = services.GetRequiredService>(); + logger.LogError(ex, "An error occurred while migrating the database."); + throw; } return host; diff --git a/samples/WebAPI_EFCore/GreetingsWeb/Database/SchemaCreation.cs b/samples/WebAPI_EFCore/GreetingsWeb/Database/SchemaCreation.cs index fe4bccbfa0..04699fcf33 100644 --- a/samples/WebAPI_EFCore/GreetingsWeb/Database/SchemaCreation.cs +++ b/samples/WebAPI_EFCore/GreetingsWeb/Database/SchemaCreation.cs @@ -22,21 +22,19 @@ public static class SchemaCreation public static IHost MigrateDatabase(this IHost webHost) { - using (var scope = webHost.Services.CreateScope()) + using var scope = webHost.Services.CreateScope(); + var services = scope.ServiceProvider; + + try + { + var db = services.GetRequiredService(); + + db.Database.Migrate(); + } + catch (Exception ex) { - var services = scope.ServiceProvider; - - try - { - var db = services.GetRequiredService(); - - db.Database.Migrate(); - } - catch (Exception ex) - { - var logger = services.GetRequiredService>(); - logger.LogError(ex, "An error occurred while migrating the database."); - } + var logger = services.GetRequiredService>(); + logger.LogError(ex, "An error occurred while migrating the database."); } return webHost; @@ -86,14 +84,12 @@ private static void WaitToConnect(string connectionString) public static IHost CreateOutbox(this IHost webHost) { - using (var scope = webHost.Services.CreateScope()) - { - var services = scope.ServiceProvider; - var env = services.GetService(); - var config = services.GetService(); + using var scope = webHost.Services.CreateScope(); + var services = scope.ServiceProvider; + var env = services.GetService(); + var config = services.GetService(); - CreateOutbox(config, env); - } + CreateOutbox(config, env); return webHost; } diff --git a/samples/WebAPI_EFCore/GreetingsWeb/Startup.cs b/samples/WebAPI_EFCore/GreetingsWeb/Startup.cs index f4ae7fdde1..53d677202a 100644 --- a/samples/WebAPI_EFCore/GreetingsWeb/Startup.cs +++ b/samples/WebAPI_EFCore/GreetingsWeb/Startup.cs @@ -94,10 +94,8 @@ private void CheckDbIsUp() //don't check this for SQlite in development if (!_env.IsDevelopment()) { - using (var conn = new MySqlConnection(connectionString)) - { - conn.Open(); - } + using var conn = new MySqlConnection(connectionString); + conn.Open(); } }); } diff --git a/samples/WebAPI_EFCore/SalutationAnalytics/Database/SchemaCreation.cs b/samples/WebAPI_EFCore/SalutationAnalytics/Database/SchemaCreation.cs index 69fe4d1c23..7c5e138b2c 100644 --- a/samples/WebAPI_EFCore/SalutationAnalytics/Database/SchemaCreation.cs +++ b/samples/WebAPI_EFCore/SalutationAnalytics/Database/SchemaCreation.cs @@ -24,21 +24,19 @@ public static class SchemaCreation public static IHost MigrateDatabase(this IHost host) { - using (var scope = host.Services.CreateScope()) + using var scope = host.Services.CreateScope(); + var services = scope.ServiceProvider; + + try + { + var db = services.GetRequiredService(); + + db.Database.Migrate(); + } + catch (Exception ex) { - var services = scope.ServiceProvider; - - try - { - var db = services.GetRequiredService(); - - db.Database.Migrate(); - } - catch (Exception ex) - { - var logger = services.GetRequiredService>(); - logger.LogError(ex, "An error occurred while migrating the database."); - } + var logger = services.GetRequiredService>(); + logger.LogError(ex, "An error occurred while migrating the database."); } return host; @@ -88,14 +86,12 @@ private static void WaitToConnect(string connectionString) public static IHost CreateInbox(this IHost host) { - using (var scope = host.Services.CreateScope()) - { - var services = scope.ServiceProvider; - var env = services.GetService(); - var config = services.GetService(); + using var scope = host.Services.CreateScope(); + var services = scope.ServiceProvider; + var env = services.GetService(); + var config = services.GetService(); - CreateInbox(config, env); - } + CreateInbox(config, env); return host; } @@ -152,14 +148,12 @@ private static void CreateInboxProduction(string connectionString) public static IHost CreateOutbox(this IHost webHost) { - using (var scope = webHost.Services.CreateScope()) - { - var services = scope.ServiceProvider; - var env = services.GetService(); - var config = services.GetService(); + using var scope = webHost.Services.CreateScope(); + var services = scope.ServiceProvider; + var env = services.GetService(); + var config = services.GetService(); - CreateOutbox(config, env); - } + CreateOutbox(config, env); return webHost; } diff --git a/src/Paramore.Brighter.Inbox.MsSql/MsSqlInbox.cs b/src/Paramore.Brighter.Inbox.MsSql/MsSqlInbox.cs index da4260222a..b63fbf1f8f 100644 --- a/src/Paramore.Brighter.Inbox.MsSql/MsSqlInbox.cs +++ b/src/Paramore.Brighter.Inbox.MsSql/MsSqlInbox.cs @@ -82,25 +82,23 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = _connectionProvider.GetConnection()) + using var connection = _connectionProvider.GetConnection(); + var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + try { - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try + sqlcmd.ExecuteNonQuery(); + } + catch (SqlException sqlException) + { + if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) { - sqlcmd.ExecuteNonQuery(); + s_logger.LogWarning( + "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; } - catch (SqlException sqlException) - { - if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) - { - s_logger.LogWarning( - "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - throw; - } + throw; } } @@ -158,25 +156,23 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = await _connectionProvider.GetConnectionAsync(cancellationToken)) + using var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + try + { + await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + } + catch (SqlException sqlException) { - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try + if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) { - await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + s_logger.LogWarning( + "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; } - catch (SqlException sqlException) - { - if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) - { - s_logger.LogWarning( - "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - throw; - } + throw; } } @@ -265,16 +261,14 @@ private T ExecuteCommand( params IDbDataParameter[] parameters ) { - using (var connection = _connectionProvider.GetConnection()) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); - - var item = execute(command); - return item; - } + using var connection = _connectionProvider.GetConnection(); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + var item = execute(command); + return item; } private async Task ExecuteCommandAsync( @@ -284,16 +278,14 @@ private async Task ExecuteCommandAsync( CancellationToken cancellationToken = default, params IDbDataParameter[] parameters) { - using (var connection = await _connectionProvider.GetConnectionAsync(cancellationToken)) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); - - var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext); - return item; - } + using var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext); + return item; } private DbCommand InitAddDbCommand(DbConnection connection, IDbDataParameter[] parameters, int timeoutInMilliseconds) diff --git a/src/Paramore.Brighter.Inbox.MySql/MySqlInbox.cs b/src/Paramore.Brighter.Inbox.MySql/MySqlInbox.cs index 58234f6469..d81c4c78bb 100644 --- a/src/Paramore.Brighter.Inbox.MySql/MySqlInbox.cs +++ b/src/Paramore.Brighter.Inbox.MySql/MySqlInbox.cs @@ -68,26 +68,24 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = GetConnection()) + using var connection = GetConnection(); + connection.Open(); + var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + try { - connection.Open(); - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try + sqlcmd.ExecuteNonQuery(); + } + catch (MySqlException sqlException) + { + if (sqlException.Number == MySqlDuplicateKeyError) { - sqlcmd.ExecuteNonQuery(); + s_logger.LogWarning( + "MySqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; } - catch (MySqlException sqlException) - { - if (sqlException.Number == MySqlDuplicateKeyError) - { - s_logger.LogWarning( - "MySqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - throw; - } + throw; } } @@ -179,26 +177,24 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = GetConnection()) + using var connection = GetConnection(); + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + try + { + await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + } + catch (MySqlException sqlException) { - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try + if (sqlException.Number == MySqlDuplicateKeyError) { - await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + s_logger.LogWarning( + "MySqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; } - catch (MySqlException sqlException) - { - if (sqlException.Number == MySqlDuplicateKeyError) - { - s_logger.LogWarning( - "MySqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - throw; - } + throw; } } @@ -254,17 +250,15 @@ private DbParameter CreateSqlParameter(string parameterName, object value) private T ExecuteCommand(Func execute, string sql, int timeoutInMilliseconds, params DbParameter[] parameters) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); - - connection.Open(); - var item = execute(command); - return item; - } + using var connection = GetConnection(); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + connection.Open(); + var item = execute(command); + return item; } private async Task ExecuteCommandAsync( @@ -274,17 +268,15 @@ private async Task ExecuteCommandAsync( CancellationToken cancellationToken = default, params DbParameter[] parameters) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); - - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext); - return item; - } + using var connection = GetConnection(); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext); + return item; } private DbConnection GetConnection() diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgreSqlInbox.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgreSqlInbox.cs index fc046b7ae4..5cd2b8baf9 100644 --- a/src/Paramore.Brighter.Inbox.Postgres/PostgreSqlInbox.cs +++ b/src/Paramore.Brighter.Inbox.Postgres/PostgreSqlInbox.cs @@ -66,10 +66,8 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) var connection = GetConnection(); try { - using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds)) - { - sqlcmd.ExecuteNonQuery(); - } + using var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + sqlcmd.ExecuteNonQuery(); } catch (PostgresException sqlException) { @@ -120,10 +118,8 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise try { - using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds)) - { - await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - } + using var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); + await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } catch (PostgresException sqlException) { @@ -242,16 +238,14 @@ private T ExecuteCommand(Func execute, string sql, int timeoutI try { - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) - command.CommandTimeout = timeoutInMilliseconds; + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) + command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); + command.CommandText = sql; + command.Parameters.AddRange(parameters); - return execute(command); - } + return execute(command); } finally { @@ -270,16 +264,14 @@ private async Task ExecuteCommandAsync( try { - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) - command.CommandTimeout = timeoutInMilliseconds; + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) + command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); + command.CommandText = sql; + command.Parameters.AddRange(parameters); - return await execute(command).ConfigureAwait(ContinueOnCapturedContext); - } + return await execute(command).ConfigureAwait(ContinueOnCapturedContext); } finally { diff --git a/src/Paramore.Brighter.Inbox.Sqlite/SqliteInbox.cs b/src/Paramore.Brighter.Inbox.Sqlite/SqliteInbox.cs index a2924dd073..d8f892a05e 100644 --- a/src/Paramore.Brighter.Inbox.Sqlite/SqliteInbox.cs +++ b/src/Paramore.Brighter.Inbox.Sqlite/SqliteInbox.cs @@ -60,26 +60,22 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = GetConnection()) + using var connection = GetConnection(); + connection.Open(); + var sqlAdd = GetAddSql(); + using var sqlcmd = connection.CreateCommand(); + FormatAddCommand(parameters, sqlcmd, sqlAdd, timeoutInMilliseconds); + try { - connection.Open(); - var sqlAdd = GetAddSql(); - using (var sqlcmd = connection.CreateCommand()) + sqlcmd.ExecuteNonQuery(); + } + catch (SqliteException sqliteException) + { + if (IsExceptionUnqiueOrDuplicateIssue(sqliteException)) { - FormatAddCommand(parameters, sqlcmd, sqlAdd, timeoutInMilliseconds); - try - { - sqlcmd.ExecuteNonQuery(); - } - catch (SqliteException sqliteException) - { - if (IsExceptionUnqiueOrDuplicateIssue(sqliteException)) - { - s_logger.LogWarning( - "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - } - } + s_logger.LogWarning( + "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); } } } @@ -159,25 +155,21 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise { var parameters = InitAddDbParameters(command, contextKey); - using (var connection = GetConnection()) + using var connection = GetConnection(); + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + var sqlAdd = GetAddSql(); + using var sqlcmd = connection.CreateCommand(); + FormatAddCommand(parameters, sqlcmd, sqlAdd, timeoutInMilliseconds); + try { - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - var sqlAdd = GetAddSql(); - using (var sqlcmd = connection.CreateCommand()) - { - FormatAddCommand(parameters, sqlcmd, sqlAdd, timeoutInMilliseconds); - try - { - await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - } - catch (SqliteException sqliteException) - { - if (!IsExceptionUnqiueOrDuplicateIssue(sqliteException)) throw; - s_logger.LogWarning( - "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - } - } + await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + } + catch (SqliteException sqliteException) + { + if (!IsExceptionUnqiueOrDuplicateIssue(sqliteException)) throw; + s_logger.LogWarning( + "MsSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); } } @@ -231,32 +223,28 @@ public DbConnection GetConnection() public T ExecuteCommand(Func execute, string sql, int timeoutInMilliseconds, params DbParameter[] parameters) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - AddParamtersParamArrayToCollection(parameters, command); - - connection.Open(); - var item = execute(command); - return item; - } + using var connection = GetConnection(); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + AddParamtersParamArrayToCollection(parameters, command); + + connection.Open(); + var item = execute(command); + return item; } public async Task ExecuteCommandAsync(Func> execute, string sql, int timeoutInMilliseconds, DbParameter[] parameters, CancellationToken cancellationToken = default) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - AddParamtersParamArrayToCollection(parameters, command); - - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - return await execute(command).ConfigureAwait(ContinueOnCapturedContext); - } + using var connection = GetConnection(); + using var command = connection.CreateCommand(); + if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; + command.CommandText = sql; + AddParamtersParamArrayToCollection(parameters, command); + + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + return await execute(command).ConfigureAwait(ContinueOnCapturedContext); } private void FormatAddCommand(DbParameter[] parameters, DbCommand sqlcmd, string sqlAdd, int timeoutInMilliseconds) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/AWSMessagingGateway.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/AWSMessagingGateway.cs index 60fac28776..44b4ab4294 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/AWSMessagingGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/AWSMessagingGateway.cs @@ -54,29 +54,27 @@ protected string EnsureTopic(RoutingKey topic, SnsAttributes attributes, TopicFi private void CreateTopic(RoutingKey topicName, SnsAttributes snsAttributes) { - using (var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region)) + using var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region); + var attributes = new Dictionary(); + if (snsAttributes != null) { - var attributes = new Dictionary(); - if (snsAttributes != null) - { - if (!string.IsNullOrEmpty(snsAttributes.DeliveryPolicy)) attributes.Add("DeliveryPolicy", snsAttributes.DeliveryPolicy); - if (!string.IsNullOrEmpty(snsAttributes.Policy)) attributes.Add("Policy", snsAttributes.Policy); - } + if (!string.IsNullOrEmpty(snsAttributes.DeliveryPolicy)) attributes.Add("DeliveryPolicy", snsAttributes.DeliveryPolicy); + if (!string.IsNullOrEmpty(snsAttributes.Policy)) attributes.Add("Policy", snsAttributes.Policy); + } - var createTopicRequest = new CreateTopicRequest(topicName) - { - Attributes = attributes, - Tags = new List {new Tag {Key = "Source", Value = "Brighter"}} - }; + var createTopicRequest = new CreateTopicRequest(topicName) + { + Attributes = attributes, + Tags = new List {new Tag {Key = "Source", Value = "Brighter"}} + }; - //create topic is idempotent, so safe to call even if topic already exists - var createTopic = snsClient.CreateTopicAsync(createTopicRequest).Result; + //create topic is idempotent, so safe to call even if topic already exists + var createTopic = snsClient.CreateTopicAsync(createTopicRequest).Result; - if (!string.IsNullOrEmpty(createTopic.TopicArn)) - ChannelTopicArn = createTopic.TopicArn; - else - throw new InvalidOperationException($"Could not create Topic topic: {topicName} on {_awsConnection.Region}"); - } + if (!string.IsNullOrEmpty(createTopic.TopicArn)) + ChannelTopicArn = createTopic.TopicArn; + else + throw new InvalidOperationException($"Could not create Topic topic: {topicName} on {_awsConnection.Region}"); } private void ValidateTopic(RoutingKey topic, TopicFindBy findTopicBy, OnMissingChannel onMissingChannel) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs index 42f10f37a1..f6818119a0 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs @@ -99,39 +99,37 @@ private void EnsureQueue() if (_subscription.MakeChannels == OnMissingChannel.Assume) return; - using (var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) - { - //Does the queue exist - this is an HTTP call, we should cache the results for a period of time - var queueName = _subscription.ChannelName.ToValidSQSQueueName(); - var topicName = _subscription.RoutingKey.ToValidSNSTopicName(); + using var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + //Does the queue exist - this is an HTTP call, we should cache the results for a period of time + var queueName = _subscription.ChannelName.ToValidSQSQueueName(); + var topicName = _subscription.RoutingKey.ToValidSNSTopicName(); - (bool exists, _) = QueueExists(sqsClient, queueName); - if (!exists) + (bool exists, _) = QueueExists(sqsClient, queueName); + if (!exists) + { + if (_subscription.MakeChannels == OnMissingChannel.Create) { - if (_subscription.MakeChannels == OnMissingChannel.Create) + if (_subscription.RedrivePolicy != null) { - if (_subscription.RedrivePolicy != null) - { - CreateDLQ(sqsClient); - } + CreateDLQ(sqsClient); + } - CreateQueue(sqsClient); + CreateQueue(sqsClient); - } - else if (_subscription.MakeChannels == OnMissingChannel.Validate) - { - var message = $"Queue does not exist: {queueName} for {topicName} on {_awsConnection.Region}"; - s_logger.LogDebug("Queue does not exist: {ChannelName} for {Topic} on {Region}", queueName, - topicName, _awsConnection.Region); - throw new QueueDoesNotExistException(message); - } } - else + else if (_subscription.MakeChannels == OnMissingChannel.Validate) { - s_logger.LogDebug("Queue exists: {ChannelName} subscribed to {Topic} on {Region}", - queueName, topicName, _awsConnection.Region); + var message = $"Queue does not exist: {queueName} for {topicName} on {_awsConnection.Region}"; + s_logger.LogDebug("Queue does not exist: {ChannelName} for {Topic} on {Region}", queueName, + topicName, _awsConnection.Region); + throw new QueueDoesNotExistException(message); } } + else + { + s_logger.LogDebug("Queue exists: {ChannelName} subscribed to {Topic} on {Region}", + queueName, topicName, _awsConnection.Region); + } } private void CreateQueue(AmazonSQSClient sqsClient) @@ -176,10 +174,8 @@ private void CreateQueue(AmazonSQSClient sqsClient) if (!string.IsNullOrEmpty(_queueUrl)) { s_logger.LogDebug("Queue created: {URL}", _queueUrl); - using (var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region)) - { - CheckSubscription(_subscription.MakeChannels, sqsClient, snsClient); - } + using var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region); + CheckSubscription(_subscription.MakeChannels, sqsClient, snsClient); } else { @@ -379,22 +375,20 @@ public void DeleteQueue() if (_subscription == null) return; - using (var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) - { - //Does the queue exist - this is an HTTP call, we should cache the results for a period of time - (bool exists, string name) queueExists = QueueExists(sqsClient, _subscription.ChannelName.ToValidSQSQueueName()); + using var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + //Does the queue exist - this is an HTTP call, we should cache the results for a period of time + (bool exists, string name) queueExists = QueueExists(sqsClient, _subscription.ChannelName.ToValidSQSQueueName()); - if (queueExists.exists) + if (queueExists.exists) + { + try { - try - { - sqsClient.DeleteQueueAsync(queueExists.name).Wait(); - } - catch (Exception) - { - //don't break on an exception here, if we can't delete, just exit - s_logger.LogError("Could not delete queue {ChannelName}", queueExists.name); - } + sqsClient.DeleteQueueAsync(queueExists.name).Wait(); + } + catch (Exception) + { + //don't break on an exception here, if we can't delete, just exit + s_logger.LogError("Could not delete queue {ChannelName}", queueExists.name); } } } @@ -404,22 +398,20 @@ public void DeleteTopic() if (_subscription == null) return; - using (var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region)) + using var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region); + (bool exists, string topicArn) = new ValidateTopicByArn(snsClient).Validate(ChannelTopicArn); + if (exists) { - (bool exists, string topicArn) = new ValidateTopicByArn(snsClient).Validate(ChannelTopicArn); - if (exists) + try { - try - { - UnsubscribeFromTopic(snsClient); + UnsubscribeFromTopic(snsClient); - DeleteTopic(snsClient); - } - catch (Exception) - { - //don't break on an exception here, if we can't delete, just exit - s_logger.LogError("Could not delete topic {TopicResourceName}", ChannelTopicArn); - } + DeleteTopic(snsClient); + } + catch (Exception) + { + //don't break on an exception here, if we can't delete, just exit + s_logger.LogError("Could not delete topic {TopicResourceName}", ChannelTopicArn); } } } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs index 64a601a54a..9ed794fe7c 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs @@ -148,14 +148,12 @@ public void Acknowledge(Message message) try { - using (var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) - { - var urlResponse = client.GetQueueUrlAsync(_queueName).Result; - client.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, receiptHandle)).Wait(); + using var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + var urlResponse = client.GetQueueUrlAsync(_queueName).Result; + client.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, receiptHandle)).Wait(); - s_logger.LogInformation("SqsMessageConsumer: Deleted the message {Id} with receipt handle {ReceiptHandle} on the queue {URL}", message.Id, receiptHandle, - urlResponse.QueueUrl); - } + s_logger.LogInformation("SqsMessageConsumer: Deleted the message {Id} with receipt handle {ReceiptHandle} on the queue {URL}", message.Id, receiptHandle, + urlResponse.QueueUrl); } catch (Exception exception) { @@ -182,17 +180,15 @@ public void Reject(Message message) message.Id, receiptHandle, _queueName ); - using (var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) + using var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + var urlResponse = client.GetQueueUrlAsync(_queueName).Result; + if (_hasDlq) { - var urlResponse = client.GetQueueUrlAsync(_queueName).Result; - if (_hasDlq) - { - client.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest(urlResponse.QueueUrl, receiptHandle, 0)).Wait(); - } - else - { - client.DeleteMessageAsync(urlResponse.QueueUrl, receiptHandle).Wait(); - } + client.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest(urlResponse.QueueUrl, receiptHandle, 0)).Wait(); + } + else + { + client.DeleteMessageAsync(urlResponse.QueueUrl, receiptHandle).Wait(); } } catch (Exception exception) @@ -209,15 +205,13 @@ public void Purge() { try { - using (var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) - { - s_logger.LogInformation("SqsMessageConsumer: Purging the queue {ChannelName}", _queueName); + using var client = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + s_logger.LogInformation("SqsMessageConsumer: Purging the queue {ChannelName}", _queueName); - var urlResponse = client.GetQueueUrlAsync(_queueName).Result; - client.PurgeQueueAsync(urlResponse.QueueUrl).Wait(); + var urlResponse = client.GetQueueUrlAsync(_queueName).Result; + client.PurgeQueueAsync(urlResponse.QueueUrl).Wait(); - s_logger.LogInformation("SqsMessageConsumer: Purged the queue {ChannelName}", _queueName); - } + s_logger.LogInformation("SqsMessageConsumer: Purged the queue {ChannelName}", _queueName); } catch (Exception exception) { @@ -262,13 +256,11 @@ public bool Requeue(Message message, int delayMilliseconds) private string FindTopicArnByName(RoutingKey topicName) { - using (var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region)) - { - var topic = snsClient.FindTopicAsync(topicName.Value).GetAwaiter().GetResult(); - if (topic == null) - throw new BrokerUnreachableException($"Unable to find a Topic ARN for {topicName.Value}"); - return topic.TopicArn; - } + using var snsClient = new AmazonSimpleNotificationServiceClient(_awsConnection.Credentials, _awsConnection.Region); + var topic = snsClient.FindTopicAsync(topicName.Value).GetAwaiter().GetResult(); + if (topic == null) + throw new BrokerUnreachableException($"Unable to find a Topic ARN for {topicName.Value}"); + return topic.TopicArn; } /// diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index 2a9369813d..f3544b4705 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -80,21 +80,19 @@ public void Send(Message message) ConfirmTopicExists(message.Header.Topic); - using (var client = new AmazonSimpleNotificationServiceClient(_connection.Credentials, _connection.Region)) + using var client = new AmazonSimpleNotificationServiceClient(_connection.Credentials, _connection.Region); + var publisher = new SqsMessagePublisher(ChannelTopicArn, client); + var messageId = publisher.Publish(message); + if (messageId != null) { - var publisher = new SqsMessagePublisher(ChannelTopicArn, client); - var messageId = publisher.Publish(message); - if (messageId != null) - { - s_logger.LogDebug( - "SQSMessageProducer: Published message with topic {Topic}, Brighter messageId {MessageId} and SNS messageId {SNSMessageId}", - message.Header.Topic, message.Id, messageId); - return; - } - - throw new InvalidOperationException( - string.Format($"Failed to publish message with topic {message.Header.Topic} and id {message.Id} and message: {message.Body}")); + s_logger.LogDebug( + "SQSMessageProducer: Published message with topic {Topic}, Brighter messageId {MessageId} and SNS messageId {SNSMessageId}", + message.Header.Topic, message.Id, messageId); + return; } + + throw new InvalidOperationException( + string.Format($"Failed to publish message with topic {message.Header.Topic} and id {message.Id} and message: {message.Body}")); } /// diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs index 963a30b871..1c4283c41d 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessagingGateway.cs @@ -67,97 +67,93 @@ protected void EnsureTopic() private async Task MakeTopic() { - using (var adminClient = new AdminClientBuilder(_clientConfig).Build()) + using var adminClient = new AdminClientBuilder(_clientConfig).Build(); + try { - try + await adminClient.CreateTopicsAsync(new List { - await adminClient.CreateTopicsAsync(new List + new TopicSpecification { - new TopicSpecification - { - Name = Topic.Value, - NumPartitions = NumPartitions, - ReplicationFactor = ReplicationFactor - } - }); - } - catch (CreateTopicsException e) - { - if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) - { - throw new ChannelFailureException( - $"An error occured creating topic {Topic.Value}: {e.Results[0].Error.Reason}"); + Name = Topic.Value, + NumPartitions = NumPartitions, + ReplicationFactor = ReplicationFactor } - - s_logger.LogDebug("Topic {Topic} already exists", Topic.Value); + }); + } + catch (CreateTopicsException e) + { + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) + { + throw new ChannelFailureException( + $"An error occured creating topic {Topic.Value}: {e.Results[0].Error.Reason}"); } + + s_logger.LogDebug("Topic {Topic} already exists", Topic.Value); } } private bool FindTopic() { - using (var adminClient = new AdminClientBuilder(_clientConfig).Build()) + using var adminClient = new AdminClientBuilder(_clientConfig).Build(); + try { - try - { - bool found = false; + bool found = false; - var metadata = adminClient.GetMetadata(Topic.Value, TimeSpan.FromMilliseconds(TopicFindTimeoutMs)); - //confirm we are in the list - var matchingTopics = metadata.Topics.Where(tp => tp.Topic == Topic.Value).ToArray(); - if (matchingTopics.Length > 0) - { - found = true; - var matchingTopic = matchingTopics[0]; + var metadata = adminClient.GetMetadata(Topic.Value, TimeSpan.FromMilliseconds(TopicFindTimeoutMs)); + //confirm we are in the list + var matchingTopics = metadata.Topics.Where(tp => tp.Topic == Topic.Value).ToArray(); + if (matchingTopics.Length > 0) + { + found = true; + var matchingTopic = matchingTopics[0]; - //was it really found? - found = matchingTopic.Error != null && matchingTopic.Error.Code != ErrorCode.UnknownTopicOrPart; - if (found) + //was it really found? + found = matchingTopic.Error != null && matchingTopic.Error.Code != ErrorCode.UnknownTopicOrPart; + if (found) + { + //is it in error, and does it have required number of partitions or replicas + bool inError = matchingTopic.Error != null && matchingTopic.Error.Code != ErrorCode.NoError; + bool matchingPartitions = matchingTopic.Partitions.Count == NumPartitions; + bool replicated = + matchingTopic.Partitions.All( + partition => partition.Replicas.Length == ReplicationFactor); + + bool valid = !inError && matchingPartitions && replicated; + + if (!valid) { - //is it in error, and does it have required number of partitions or replicas - bool inError = matchingTopic.Error != null && matchingTopic.Error.Code != ErrorCode.NoError; - bool matchingPartitions = matchingTopic.Partitions.Count == NumPartitions; - bool replicated = - matchingTopic.Partitions.All( - partition => partition.Replicas.Length == ReplicationFactor); + string error = "Topic exists but does not match publication: "; + //if topic is in error + if (inError) + { + error += $" topic is in error => {matchingTopic.Error.Reason};"; + } - bool valid = !inError && matchingPartitions && replicated; + if (!matchingPartitions) + { + error += + $"topic is misconfigured => NumPartitions should be {NumPartitions} but is {matchingTopic.Partitions.Count};"; + } - if (!valid) + if (!replicated) { - string error = "Topic exists but does not match publication: "; - //if topic is in error - if (inError) - { - error += $" topic is in error => {matchingTopic.Error.Reason};"; - } - - if (!matchingPartitions) - { - error += - $"topic is misconfigured => NumPartitions should be {NumPartitions} but is {matchingTopic.Partitions.Count};"; - } - - if (!replicated) - { - error += - $"topic is misconfigured => ReplicationFactor should be {ReplicationFactor} but is {matchingTopic.Partitions[0].Replicas.Length};"; - } - - s_logger.LogWarning(error); + error += + $"topic is misconfigured => ReplicationFactor should be {ReplicationFactor} but is {matchingTopic.Partitions[0].Replicas.Length};"; } + + s_logger.LogWarning(error); } } + } - if (found) - s_logger.LogInformation($"Topic {Topic.Value} exists"); + if (found) + s_logger.LogInformation($"Topic {Topic.Value} exists"); - return found; - } - catch (Exception e) - { - throw new ChannelFailureException($"Error finding topic {Topic.Value}", e); - } + return found; + } + catch (Exception e) + { + throw new ChannelFailureException($"Error finding topic {Topic.Value}", e); } } } diff --git a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs index 830397468e..31cf9df114 100644 --- a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs +++ b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs @@ -57,11 +57,9 @@ public void Send(T message, string topic, int timeoutInMilliseconds = -1) var parameters = InitAddDbParameters(topic, message); - using (var connection = _connectionProvider.GetConnection()) - { - var sqlCmd = InitAddDbCommand(timeoutInMilliseconds, connection, parameters); - sqlCmd.ExecuteNonQuery(); - } + using var connection = _connectionProvider.GetConnection(); + var sqlCmd = InitAddDbCommand(timeoutInMilliseconds, connection, parameters); + sqlCmd.ExecuteNonQuery(); } /// @@ -78,11 +76,9 @@ public async Task SendAsync(T message, string topic, int timeoutInMilliseconds = var parameters = InitAddDbParameters(topic, message); - using (var connection = await _connectionProvider.GetConnectionAsync(cancellationToken)) - { - var sqlCmd = InitAddDbCommand(timeoutInMilliseconds, connection, parameters); - await sqlCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - } + using var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + var sqlCmd = InitAddDbCommand(timeoutInMilliseconds, connection, parameters); + await sqlCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } /// @@ -118,18 +114,16 @@ private ReceivedResult TryReceive(string topic) var parameters = InitRemoveDbParameters(topic); - using (var connection = _connectionProvider.GetConnection()) - { - var sqlCmd = InitRemoveDbCommand(connection, parameters); - var reader = sqlCmd.ExecuteReader(); - if (!reader.Read()) - return ReceivedResult.Empty; - var json = (string) reader[0]; - var messageType = (string) reader[1]; - var id = (long) reader[3]; - var message = JsonSerializer.Deserialize(json, JsonSerialisationOptions.Options); - return new ReceivedResult(true, json, topic, messageType, id, message); - } + using var connection = _connectionProvider.GetConnection(); + var sqlCmd = InitRemoveDbCommand(connection, parameters); + var reader = sqlCmd.ExecuteReader(); + if (!reader.Read()) + return ReceivedResult.Empty; + var json = (string) reader[0]; + var messageType = (string) reader[1]; + var id = (long) reader[3]; + var message = JsonSerializer.Deserialize(json, JsonSerialisationOptions.Options); + return new ReceivedResult(true, json, topic, messageType, id, message); } /// @@ -145,19 +139,17 @@ public async Task> TryReceiveAsync(string topic, var parameters = InitRemoveDbParameters(topic); - using (var connection = await _connectionProvider.GetConnectionAsync(cancellationToken)) - { - var sqlCmd = InitRemoveDbCommand(connection, parameters); - var reader = await sqlCmd.ExecuteReaderAsync(cancellationToken) - .ConfigureAwait(ContinueOnCapturedContext); - if (!await reader.ReadAsync(cancellationToken)) - return ReceivedResult.Empty; - var json = (string) reader[0]; - var messageType = (string) reader[1]; - var id = (int) reader[3]; - var message = JsonSerializer.Deserialize(json, JsonSerialisationOptions.Options); - return new ReceivedResult(true, json, topic, messageType, id, message); - } + using var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + var sqlCmd = InitRemoveDbCommand(connection, parameters); + var reader = await sqlCmd.ExecuteReaderAsync(cancellationToken) + .ConfigureAwait(ContinueOnCapturedContext); + if (!await reader.ReadAsync(cancellationToken)) + return ReceivedResult.Empty; + var json = (string) reader[0]; + var messageType = (string) reader[1]; + var id = (int) reader[3]; + var message = JsonSerializer.Deserialize(json, JsonSerialisationOptions.Options); + return new ReceivedResult(true, json, topic, messageType, id, message); } public bool IsMessageReady(string topic) @@ -168,12 +160,10 @@ public bool IsMessageReady(string topic) public int NumberOfMessageReady(string topic) { var sql = $"select COUNT(*) from [{_configuration.QueueStoreTable}] where Topic='{topic}'"; - using (var connection = _connectionProvider.GetConnection()) - { - var sqlCmd = connection.CreateCommand(); - sqlCmd.CommandText = sql; - return (int) sqlCmd.ExecuteScalar(); - } + using var connection = _connectionProvider.GetConnection(); + var sqlCmd = connection.CreateCommand(); + sqlCmd.CommandText = sql; + return (int) sqlCmd.ExecuteScalar(); } /// @@ -183,11 +173,9 @@ public void Purge() { if (s_logger.IsEnabled(LogLevel.Debug)) s_logger.LogDebug("Purge()"); - using (var connection = _connectionProvider.GetConnection()) - { - var sqlCmd = InitPurgeDbCommand(connection); - sqlCmd.ExecuteNonQuery(); - } + using var connection = _connectionProvider.GetConnection(); + var sqlCmd = InitPurgeDbCommand(connection); + sqlCmd.ExecuteNonQuery(); } private static IDbDataParameter CreateDbDataParameter(string parameterName, object value) diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageConsumer.cs index 2f06fa8e1e..066d78e3e6 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageConsumer.cs @@ -92,12 +92,10 @@ public void Dispose() /// public void Purge() { - using (var client = Pool.Value.GetClient()) - { - s_logger.LogDebug("RmqMessageConsumer: Purging channel {ChannelName}", _queueName); - //This kills the queue, not the messages, which we assume expire - client.RemoveAllFromList(_queueName); - } + using var client = Pool.Value.GetClient(); + s_logger.LogDebug("RmqMessageConsumer: Purging channel {ChannelName}", _queueName); + //This kills the queue, not the messages, which we assume expire + client.RemoveAllFromList(_queueName); } /// @@ -169,22 +167,20 @@ public bool Requeue(Message message, int delayMilliseconds) message.Header.DelayedMilliseconds = delayMilliseconds; message.Header.HandledCount++; - using (var client = Pool.Value.GetClient()) + using var client = Pool.Value.GetClient(); + if (_inflight.ContainsKey(message.Id)) { - if (_inflight.ContainsKey(message.Id)) - { - var msgId = _inflight[message.Id]; - client.AddItemToList(_queueName, msgId); - var redisMsg = CreateRedisMessage(message); - StoreMessage(client, redisMsg, long.Parse(msgId)); - _inflight.Remove(message.Id); - return true; - } - else - { - s_logger.LogError(string.Format("Expected to find message id {0} in-flight but was not", message.Id.ToString())); - return false; - } + var msgId = _inflight[message.Id]; + client.AddItemToList(_queueName, msgId); + var redisMsg = CreateRedisMessage(message); + StoreMessage(client, redisMsg, long.Parse(msgId)); + _inflight.Remove(message.Id); + return true; + } + else + { + s_logger.LogError(string.Format("Expected to find message id {0} in-flight but was not", message.Id.ToString())); + return false; } } diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageCreator.cs index 531b5dc76e..1c3964a6d9 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageCreator.cs @@ -57,44 +57,41 @@ public Message CreateMessage(string redisMessage) { return message; } - - using (var reader = new StringReader(redisMessage)) + + using var reader = new StringReader(redisMessage); + var header = reader.ReadLine(); + if (header.TrimEnd() != ", but was {ErrorMessage}", redisMessage); - return message; - } - - var body = reader.ReadLine(); - if (body.TrimEnd() != ", but was {ErrorMessage}", redisMessage); + return message; + } - body = reader.ReadLine(); - if (body.TrimStart() != "BODY/>") - { - s_logger.LogError("Expected message to find end of BODY/>, but was {ErrorMessage}", redisMessage); - return message; - } + var body = reader.ReadLine(); + if (body.TrimEnd() != ", but was {ErrorMessage}", redisMessage); + return message; } + + message = new Message(messageHeader, messageBody); return message; } diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageGateway.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageGateway.cs index 41f15894f5..3439fd9d56 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageGateway.cs @@ -63,10 +63,8 @@ protected static string CreateRedisMessage(Message message) { //Convert the message into something we can put out via Redis i.e. a string var redisMessage = RedisMessagePublisher.EMPTY_MESSAGE; - using (var redisMessageFactory = new RedisMessagePublisher()) - { - redisMessage = redisMessageFactory.Create(message); - } + using var redisMessageFactory = new RedisMessagePublisher(); + redisMessage = redisMessageFactory.Create(message); return redisMessage; } diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageProducer.cs index c6020ffbaf..c6e3c97706 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageProducer.cs @@ -81,25 +81,23 @@ public void Dispose() /// Task. public void Send(Message message) { - using (var client = Pool.Value.GetClient()) - { - Topic = message.Header.Topic; + using var client = Pool.Value.GetClient(); + Topic = message.Header.Topic; - s_logger.LogDebug("RedisMessageProducer: Preparing to send message"); + s_logger.LogDebug("RedisMessageProducer: Preparing to send message"); - var redisMessage = CreateRedisMessage(message); - - s_logger.LogDebug("RedisMessageProducer: Publishing message with topic {Topic} and id {Id} and body: {Request}", - message.Header.Topic, message.Id.ToString(), message.Body.Value); - //increment a counter to get the next message id - var nextMsgId = IncrementMessageCounter(client); - //store the message, against that id - StoreMessage(client, redisMessage, nextMsgId); - //If there are subscriber queues, push the message to the subscriber queues - var pushedTo = PushToQueues(client, nextMsgId); - s_logger.LogDebug("RedisMessageProducer: Published message with topic {Topic} and id {Id} and body: {Request} to queues: {3}", - message.Header.Topic, message.Id.ToString(), message.Body.Value, string.Join(", ", pushedTo)); - } + var redisMessage = CreateRedisMessage(message); + + s_logger.LogDebug("RedisMessageProducer: Publishing message with topic {Topic} and id {Id} and body: {Request}", + message.Header.Topic, message.Id.ToString(), message.Body.Value); + //increment a counter to get the next message id + var nextMsgId = IncrementMessageCounter(client); + //store the message, against that id + StoreMessage(client, redisMessage, nextMsgId); + //If there are subscriber queues, push the message to the subscriber queues + var pushedTo = PushToQueues(client, nextMsgId); + s_logger.LogDebug("RedisMessageProducer: Published message with topic {Topic} and id {Id} and body: {Request} to queues: {3}", + message.Header.Topic, message.Id.ToString(), message.Body.Value, string.Join(", ", pushedTo)); } /// diff --git a/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs b/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs index f77bd3dfac..11e343c633 100644 --- a/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs +++ b/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs @@ -82,32 +82,30 @@ protected override void WriteToStore( if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + command.ExecuteNonQuery(); + } + catch (SqlException sqlException) + { + if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || + sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - command.ExecuteNonQuery(); + loggingAction.Invoke(); + return; } - catch (SqlException sqlException) - { - if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || - sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) - { - loggingAction.Invoke(); - return; - } - throw; - } - finally - { - if (transactionProvider != null) - transactionProvider.Close(); - else - connection.Close(); - } + throw; + } + finally + { + if (transactionProvider != null) + transactionProvider.Close(); + else + connection.Close(); } } @@ -126,32 +124,30 @@ protected override async Task WriteToStoreAsync( if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try + { + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + await command.ExecuteNonQueryAsync(cancellationToken); + } + catch (SqlException sqlException) { - try + if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || + sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - await command.ExecuteNonQueryAsync(cancellationToken); + loggingAction.Invoke(); + return; } - catch (SqlException sqlException) - { - if (sqlException.Number == MsSqlDuplicateKeyError_UniqueIndexViolation || - sqlException.Number == MsSqlDuplicateKeyError_UniqueConstraintViolation) - { - loggingAction.Invoke(); - return; - } - throw; - } - finally - { - if (transactionProvider != null) - transactionProvider.Close(); - else - connection.Close(); - } + throw; + } + finally + { + if (transactionProvider != null) + transactionProvider.Close(); + else + connection.Close(); } } @@ -164,16 +160,14 @@ Func resultFunc if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return resultFunc.Invoke(command.ExecuteReader()); - } - finally - { - connection.Close(); - } + return resultFunc.Invoke(command.ExecuteReader()); + } + finally + { + connection.Close(); } } @@ -187,16 +181,14 @@ CancellationToken cancellationToken if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); - } - finally - { - connection.Close(); - } + return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); + } + finally + { + connection.Close(); } } diff --git a/src/Paramore.Brighter.Outbox.MySql/MySqlOutbox.cs b/src/Paramore.Brighter.Outbox.MySql/MySqlOutbox.cs index fd702faa52..15af35a6f9 100644 --- a/src/Paramore.Brighter.Outbox.MySql/MySqlOutbox.cs +++ b/src/Paramore.Brighter.Outbox.MySql/MySqlOutbox.cs @@ -85,29 +85,27 @@ Action loggingAction if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + command.ExecuteNonQuery(); + } + catch (MySqlException sqlException) + { + if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - command.ExecuteNonQuery(); + s_logger.LogWarning( + "MsSqlOutbox: A duplicate was detected in the batch"); + return; } - catch (MySqlException sqlException) - { - if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) - { - s_logger.LogWarning( - "MsSqlOutbox: A duplicate was detected in the batch"); - return; - } - throw; - } - finally - { - transactionProvider?.Close(); - } + throw; + } + finally + { + transactionProvider?.Close(); } } @@ -127,29 +125,27 @@ CancellationToken cancellationToken if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try + { + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = await transactionProvider.GetTransactionAsync(cancellationToken); + await command.ExecuteNonQueryAsync(cancellationToken); + } + catch (MySqlException sqlException) { - try + if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = await transactionProvider.GetTransactionAsync(cancellationToken); - await command.ExecuteNonQueryAsync(cancellationToken); + s_logger.LogWarning( + "MsSqlOutbox: A duplicate was detected in the batch"); + return; } - catch (MySqlException sqlException) - { - if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) - { - s_logger.LogWarning( - "MsSqlOutbox: A duplicate was detected in the batch"); - return; - } - throw; - } - finally - { - transactionProvider?.Close(); - } + throw; + } + finally + { + transactionProvider?.Close(); } } @@ -162,16 +158,14 @@ Func resultFunc if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return resultFunc.Invoke(command.ExecuteReader()); - } - finally - { - connection.Close(); - } + return resultFunc.Invoke(command.ExecuteReader()); + } + finally + { + connection.Close(); } } @@ -184,16 +178,14 @@ protected override async Task ReadFromStoreAsync( if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); - } - finally - { - connection.Close(); - } + return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); + } + finally + { + connection.Close(); } } @@ -407,22 +399,20 @@ private Message MapAMessage(IDataReader dr) private byte[] GetBodyAsBytes(MySqlDataReader dr) { var i = dr.GetOrdinal("Body"); - using (var ms = new MemoryStream()) + using var ms = new MemoryStream(); + var buffer = new byte[1024]; + int offset = 0; + var bytesRead = dr.GetBytes(i, offset, buffer, 0, 1024); + while (bytesRead > 0) { - var buffer = new byte[1024]; - int offset = 0; - var bytesRead = dr.GetBytes(i, offset, buffer, 0, 1024); - while (bytesRead > 0) - { - ms.Write(buffer, offset, (int)bytesRead); - offset += (int)bytesRead; - bytesRead = dr.GetBytes(i, offset, buffer, 0, 1024); - } - - ms.Flush(); - var body = ms.ToArray(); - return body; + ms.Write(buffer, offset, (int)bytesRead); + offset += (int)bytesRead; + bytesRead = dr.GetBytes(i, offset, buffer, 0, 1024); } + + ms.Flush(); + var body = ms.ToArray(); + return body; } private static string GetBodyAsString(IDataReader dr) diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutbox.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutbox.cs index aa5d0402a5..13c2876f71 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutbox.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutbox.cs @@ -89,28 +89,26 @@ protected override void WriteToStore( if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + command.ExecuteNonQuery(); + } + catch (PostgresException sqlException) + { + if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - command.ExecuteNonQuery(); + loggingAction.Invoke(); + return; } - catch (PostgresException sqlException) - { - if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) - { - loggingAction.Invoke(); - return; - } - throw; - } - finally - { - transactionProvider?.Close(); - } + throw; + } + finally + { + transactionProvider?.Close(); } } @@ -129,32 +127,30 @@ protected override async Task WriteToStoreAsync( if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - await command.ExecuteNonQueryAsync(cancellationToken); - } - catch (PostgresException sqlException) - { - if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) - { - s_logger.LogWarning( - "PostgresSqlOutbox: A duplicate was detected in the batch"); - return; - } - - throw; - } - finally + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + await command.ExecuteNonQueryAsync(cancellationToken); + } + catch (PostgresException sqlException) + { + if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) { - if (transactionProvider != null) - transactionProvider.Close(); - else - connection.Close(); + s_logger.LogWarning( + "PostgresSqlOutbox: A duplicate was detected in the batch"); + return; } + + throw; + } + finally + { + if (transactionProvider != null) + transactionProvider.Close(); + else + connection.Close(); } } @@ -167,16 +163,14 @@ Func resultFunc if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return resultFunc.Invoke(command.ExecuteReader()); - } - finally - { - connection.Close(); - } + return resultFunc.Invoke(command.ExecuteReader()); + } + finally + { + connection.Close(); } } @@ -190,16 +184,14 @@ CancellationToken cancellationToken if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); - } - finally - { - connection.Close(); - } + return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); + } + finally + { + connection.Close(); } } diff --git a/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutbox.cs b/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutbox.cs index d3091edd68..7e0daa6eb6 100644 --- a/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutbox.cs +++ b/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutbox.cs @@ -86,31 +86,29 @@ Action loggingAction if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = transactionProvider.GetTransaction(); + command.ExecuteNonQuery(); + } + catch (SqliteException sqlException) + { + if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = transactionProvider.GetTransaction(); - command.ExecuteNonQuery(); + loggingAction.Invoke(); + return; } - catch (SqliteException sqlException) - { - if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) - { - loggingAction.Invoke(); - return; - } - throw; - } - finally - { - if (transactionProvider != null) - transactionProvider.Close(); - else - connection.Close(); - } + throw; + } + finally + { + if (transactionProvider != null) + transactionProvider.Close(); + else + connection.Close(); } } @@ -128,35 +126,33 @@ protected override async Task WriteToStoreAsync( if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try + if (transactionProvider != null && transactionProvider.HasOpenTransaction) + command.Transaction = await transactionProvider.GetTransactionAsync(cancellationToken); + await command.ExecuteNonQueryAsync(cancellationToken); + } + catch (SqliteException sqlException) + { + if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) { - if (transactionProvider != null && transactionProvider.HasOpenTransaction) - command.Transaction = await transactionProvider.GetTransactionAsync(cancellationToken); - await command.ExecuteNonQueryAsync(cancellationToken); + loggingAction.Invoke(); + return; } - catch (SqliteException sqlException) - { - if (IsExceptionUnqiueOrDuplicateIssue(sqlException)) - { - loggingAction.Invoke(); - return; - } - throw; - } - finally - { - if (transactionProvider != null) - transactionProvider.Close(); - else + throw; + } + finally + { + if (transactionProvider != null) + transactionProvider.Close(); + else #if NETSTANDARD2_0 connection.Close(); #else - await connection.CloseAsync(); + await connection.CloseAsync(); #endif - } } } @@ -169,16 +165,14 @@ Func resultFunc if (connection.State != ConnectionState.Open) connection.Open(); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try { - try - { - return resultFunc.Invoke(command.ExecuteReader()); - } - finally - { - connection.Close(); - } + return resultFunc.Invoke(command.ExecuteReader()); + } + finally + { + connection.Close(); } } @@ -191,20 +185,18 @@ protected override async Task ReadFromStoreAsync( if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken); - using (var command = commandFunc.Invoke(connection)) + using var command = commandFunc.Invoke(connection); + try + { + return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); + } + finally { - try - { - return await resultFunc.Invoke(await command.ExecuteReaderAsync(cancellationToken)); - } - finally - { #if NETSTANDARD2_0 connection.Close(); #else - await connection.CloseAsync(); + await connection.CloseAsync(); #endif - } } } diff --git a/src/Paramore.Brighter.Tranformers.AWS/S3LuggageStore.cs b/src/Paramore.Brighter.Tranformers.AWS/S3LuggageStore.cs index b8d90658fe..f4766ee095 100644 --- a/src/Paramore.Brighter.Tranformers.AWS/S3LuggageStore.cs +++ b/src/Paramore.Brighter.Tranformers.AWS/S3LuggageStore.cs @@ -292,13 +292,11 @@ private static async Task BucketExistsAsync(IHttpClientFactory httpClientF { var httpClient = httpClientFactory.CreateClient(); httpClient.BaseAddress = new Uri($"https://{bucketName}.s3.{bucketRegion.Value}.amazonaws.com"); - using (var headRequest = new HttpRequestMessage(HttpMethod.Head, @"/")) - { - headRequest.Headers.Add("x-amz-expected-bucket-owner", accountId); - var response = await httpClient.SendAsync(headRequest); - //If we deny public access to the bucket, but it exists we get access denied; we get not-found if it does not exist - return (response.IsSuccessStatusCode || response.StatusCode == HttpStatusCode.Forbidden); - } + using var headRequest = new HttpRequestMessage(HttpMethod.Head, @"/"); + headRequest.Headers.Add("x-amz-expected-bucket-owner", accountId); + var response = await httpClient.SendAsync(headRequest); + //If we deny public access to the bucket, but it exists we get access denied; we get not-found if it does not exist + return (response.IsSuccessStatusCode || response.StatusCode == HttpStatusCode.Forbidden); } private static async Task CreateBucketAsync( diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 608edcc1e0..e1dbb1bd55 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -221,27 +221,25 @@ public void Send(T command) where T : class, IRequest requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; - using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactorySync, _inboxConfiguration)) + using var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactorySync, _inboxConfiguration); + try { - try - { - s_logger.LogInformation("Building send pipeline for command: {CommandType} {Id}", command.GetType(), - command.Id); - var handlerChain = builder.Build(requestContext); + s_logger.LogInformation("Building send pipeline for command: {CommandType} {Id}", command.GetType(), + command.Id); + var handlerChain = builder.Build(requestContext); - AssertValidSendPipeline(command, handlerChain.Count()); + AssertValidSendPipeline(command, handlerChain.Count()); - handlerChain.First().Handle(command); - } - catch (Exception) - { - span.span?.SetStatus(ActivityStatusCode.Error); - throw; - } - finally - { - EndSpan(span.span); - } + handlerChain.First().Handle(command); + } + catch (Exception) + { + span.span?.SetStatus(ActivityStatusCode.Error); + throw; + } + finally + { + EndSpan(span.span); } } @@ -266,28 +264,26 @@ public async Task SendAsync(T command, bool continueOnCapturedContext = false requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; - using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactoryAsync, _inboxConfiguration)) + using var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactoryAsync, _inboxConfiguration); + try { - try - { - s_logger.LogInformation("Building send async pipeline for command: {CommandType} {Id}", - command.GetType(), command.Id); - var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); + s_logger.LogInformation("Building send async pipeline for command: {CommandType} {Id}", + command.GetType(), command.Id); + var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); - AssertValidSendPipeline(command, handlerChain.Count()); + AssertValidSendPipeline(command, handlerChain.Count()); - await handlerChain.First().HandleAsync(command, cancellationToken) - .ConfigureAwait(continueOnCapturedContext); - } - catch (Exception) - { - span.span?.SetStatus(ActivityStatusCode.Error); - throw; - } - finally - { - EndSpan(span.span); - } + await handlerChain.First().HandleAsync(command, cancellationToken) + .ConfigureAwait(continueOnCapturedContext); + } + catch (Exception) + { + span.span?.SetStatus(ActivityStatusCode.Error); + throw; + } + finally + { + EndSpan(span.span); } } @@ -312,43 +308,41 @@ public void Publish(T @event) where T : class, IRequest requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; - using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactorySync, _inboxConfiguration)) - { - s_logger.LogInformation("Building send pipeline for event: {EventType} {Id}", @event.GetType(), - @event.Id); - var handlerChain = builder.Build(requestContext); + using var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactorySync, _inboxConfiguration); + s_logger.LogInformation("Building send pipeline for event: {EventType} {Id}", @event.GetType(), + @event.Id); + var handlerChain = builder.Build(requestContext); - var handlerCount = handlerChain.Count(); + var handlerCount = handlerChain.Count(); - s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount, - @event.GetType(), @event.Id); + s_logger.LogInformation("Found {HandlerCount} pipelines for event: {EventType} {Id}", handlerCount, + @event.GetType(), @event.Id); - var exceptions = new List(); - foreach (var handleRequests in handlerChain) + var exceptions = new List(); + foreach (var handleRequests in handlerChain) + { + try { - try - { - handleRequests.Handle(@event); - } - catch (Exception e) - { - exceptions.Add(e); - } + handleRequests.Handle(@event); } - - if (span.created) + catch (Exception e) { - if (exceptions.Any()) - span.span?.SetStatus(ActivityStatusCode.Error); - EndSpan(span.span); + exceptions.Add(e); } + } + if (span.created) + { if (exceptions.Any()) - { - throw new AggregateException( - "Failed to publish to one more handlers successfully, see inner exceptions for details", - exceptions); - } + span.span?.SetStatus(ActivityStatusCode.Error); + EndSpan(span.span); + } + + if (exceptions.Any()) + { + throw new AggregateException( + "Failed to publish to one more handlers successfully, see inner exceptions for details", + exceptions); } } @@ -380,44 +374,42 @@ public async Task PublishAsync( requestContext.Policies = _policyRegistry; requestContext.FeatureSwitches = _featureSwitchRegistry; - using (var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactoryAsync, _inboxConfiguration)) - { - s_logger.LogInformation("Building send async pipeline for event: {EventType} {Id}", @event.GetType(), - @event.Id); + using var builder = new PipelineBuilder(_subscriberRegistry, _handlerFactoryAsync, _inboxConfiguration); + s_logger.LogInformation("Building send async pipeline for event: {EventType} {Id}", @event.GetType(), + @event.Id); - var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); - var handlerCount = handlerChain.Count(); + var handlerChain = builder.BuildAsync(requestContext, continueOnCapturedContext); + var handlerCount = handlerChain.Count(); - s_logger.LogInformation("Found {0} async pipelines for event: {EventType} {Id}", handlerCount, - @event.GetType(), @event.Id); + s_logger.LogInformation("Found {0} async pipelines for event: {EventType} {Id}", handlerCount, + @event.GetType(), @event.Id); - var exceptions = new List(); - foreach (var handler in handlerChain) + var exceptions = new List(); + foreach (var handler in handlerChain) + { + try { - try - { - await handler.HandleAsync(@event, cancellationToken).ConfigureAwait(continueOnCapturedContext); - } - catch (Exception e) - { - exceptions.Add(e); - } + await handler.HandleAsync(@event, cancellationToken).ConfigureAwait(continueOnCapturedContext); } - - - if (span.created) + catch (Exception e) { - if (exceptions.Any()) - span.span?.SetStatus(ActivityStatusCode.Error); - EndSpan(span.span); + exceptions.Add(e); } + } - if (exceptions.Count > 0) - { - throw new AggregateException( - "Failed to async publish to one more handlers successfully, see inner exceptions for details", - exceptions); - } + + if (span.created) + { + if (exceptions.Any()) + span.span?.SetStatus(ActivityStatusCode.Error); + EndSpan(span.span); + } + + if (exceptions.Count > 0) + { + throw new AggregateException( + "Failed to async publish to one more handlers successfully, see inner exceptions for details", + exceptions); } } @@ -797,43 +789,41 @@ public TResponse Call(T request, int timeOutInMilliseconds) subscription.ChannelName = new ChannelName(channelName.ToString()); subscription.RoutingKey = new RoutingKey(routingKey); - using (var responseChannel = _responseChannelFactory.CreateChannel(subscription)) - { - s_logger.LogInformation("Create reply queue for topic {ChannelName}", channelName); - request.ReplyAddress.Topic = routingKey; - request.ReplyAddress.CorrelationId = channelName.ToString(); + using var responseChannel = _responseChannelFactory.CreateChannel(subscription); + s_logger.LogInformation("Create reply queue for topic {ChannelName}", channelName); + request.ReplyAddress.Topic = routingKey; + request.ReplyAddress.CorrelationId = channelName.ToString(); - //we do this to create the channel on the broker, or we won't have anything to send to; we - //retry in case the subscription is poor. An alternative would be to extract the code from - //the channel to create the subscription, but this does not do much on a new queue - _bus.Retry(() => responseChannel.Purge()); + //we do this to create the channel on the broker, or we won't have anything to send to; we + //retry in case the subscription is poor. An alternative would be to extract the code from + //the channel to create the subscription, but this does not do much on a new queue + _bus.Retry(() => responseChannel.Purge()); - var outMessage = _bus.CreateMessageFromRequest(request); + var outMessage = _bus.CreateMessageFromRequest(request); - //We don't store the message, if we continue to fail further retry is left to the sender - //s_logger.LogDebug("Sending request with routingkey {0}", routingKey); - s_logger.LogDebug("Sending request with routingkey {ChannelName}", channelName); - _bus.CallViaExternalBus(outMessage); + //We don't store the message, if we continue to fail further retry is left to the sender + //s_logger.LogDebug("Sending request with routingkey {0}", routingKey); + s_logger.LogDebug("Sending request with routingkey {ChannelName}", channelName); + _bus.CallViaExternalBus(outMessage); - Message responseMessage = null; + Message responseMessage = null; - //now we block on the receiver to try and get the message, until timeout. - s_logger.LogDebug("Awaiting response on {ChannelName}", channelName); - _bus.Retry(() => responseMessage = responseChannel.Receive(timeOutInMilliseconds)); + //now we block on the receiver to try and get the message, until timeout. + s_logger.LogDebug("Awaiting response on {ChannelName}", channelName); + _bus.Retry(() => responseMessage = responseChannel.Receive(timeOutInMilliseconds)); - TResponse response = default(TResponse); - if (responseMessage.Header.MessageType != MessageType.MT_NONE) - { - s_logger.LogDebug("Reply received from {ChannelName}", channelName); - //map to request is map to a response, but it is a request from consumer point of view. Confusing, but... - _bus.CreateRequestFromMessage(responseMessage, out response); - Send(response); - } + TResponse response = default(TResponse); + if (responseMessage.Header.MessageType != MessageType.MT_NONE) + { + s_logger.LogDebug("Reply received from {ChannelName}", channelName); + //map to request is map to a response, but it is a request from consumer point of view. Confusing, but... + _bus.CreateRequestFromMessage(responseMessage, out response); + Send(response); + } - s_logger.LogInformation("Deleting queue for routingkey: {ChannelName}", channelName); + s_logger.LogInformation("Deleting queue for routingkey: {ChannelName}", channelName); - return response; - } //clean up everything at this point, whatever happens + return response; } /// diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_requeueing_redrives_to_the_dlq.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_requeueing_redrives_to_the_dlq.cs index 4ab1074352..d883a0069d 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_requeueing_redrives_to_the_dlq.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_requeueing_redrives_to_the_dlq.cs @@ -90,26 +90,22 @@ public void Dispose() public int GetDLQCount(string queueName) { - using(var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) + using var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + var queueUrlResponse = sqsClient.GetQueueUrlAsync(queueName).GetAwaiter().GetResult(); + var response = sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest { - var queueUrlResponse = sqsClient.GetQueueUrlAsync(queueName).GetAwaiter().GetResult(); - var response = sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest - { - QueueUrl = queueUrlResponse.QueueUrl, - WaitTimeSeconds = 5, - AttributeNames = new List { "ApproximateReceiveCount" }, - MessageAttributeNames = new List { "All" } - }).GetAwaiter().GetResult(); - - if (response.HttpStatusCode != HttpStatusCode.OK) - { - throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}"); - } - - return response.Messages.Count; + QueueUrl = queueUrlResponse.QueueUrl, + WaitTimeSeconds = 5, + AttributeNames = new List { "ApproximateReceiveCount" }, + MessageAttributeNames = new List { "All" } + }).GetAwaiter().GetResult(); + if (response.HttpStatusCode != HttpStatusCode.OK) + { + throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}"); } - + + return response.Messages.Count; } } diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_throwing_defer_action_respect_redrive.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_throwing_defer_action_respect_redrive.cs index ca3952c2f3..749a9d7618 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_throwing_defer_action_respect_redrive.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_throwing_defer_action_respect_redrive.cs @@ -114,24 +114,22 @@ public SnsReDrivePolicySDlqTests() public int GetDLQCount(string queueName) { - using (var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region)) + using var sqsClient = new AmazonSQSClient(_awsConnection.Credentials, _awsConnection.Region); + var queueUrlResponse = sqsClient.GetQueueUrlAsync(queueName).GetAwaiter().GetResult(); + var response = sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest { - var queueUrlResponse = sqsClient.GetQueueUrlAsync(queueName).GetAwaiter().GetResult(); - var response = sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest - { - QueueUrl = queueUrlResponse.QueueUrl, - WaitTimeSeconds = 5, - AttributeNames = new List { "ApproximateReceiveCount" }, - MessageAttributeNames = new List { "All" } - }).GetAwaiter().GetResult(); - - if (response.HttpStatusCode != HttpStatusCode.OK) - { - throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}"); - } - - return response.Messages.Count; + QueueUrl = queueUrlResponse.QueueUrl, + WaitTimeSeconds = 5, + AttributeNames = new List { "ApproximateReceiveCount" }, + MessageAttributeNames = new List { "All" } + }).GetAwaiter().GetResult(); + + if (response.HttpStatusCode != HttpStatusCode.OK) + { + throw new AmazonSQSException($"Failed to GetMessagesAsync for queue {queueName}. Response: {response.HttpStatusCode}"); } + + return response.Messages.Count; } diff --git a/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline.cs b/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline.cs index 23d0d12658..7ced38a71f 100644 --- a/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline.cs +++ b/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline.cs @@ -27,37 +27,33 @@ public CommandProcessorWithLoggingInPipelineTests(ITestOutputHelper output) public void When_A_Request_Logger_Is_In_The_Pipeline() { Log.Logger = new LoggerConfiguration().MinimumLevel.Information().WriteTo.TestCorrelator().CreateLogger(); - using (var context = TestCorrelator.CreateContext()) - { - var myCommand = new MyCommand(); + using var context = TestCorrelator.CreateContext(); + var myCommand = new MyCommand(); - var registry = new SubscriberRegistry(); - registry.Register>(); + var registry = new SubscriberRegistry(); + registry.Register>(); - var requestLogger = new RequestLoggingHandler(); + var requestLogger = new RequestLoggingHandler(); - var container = new ServiceCollection(); - container.AddTransient(); - container.AddTransient(typeof(RequestLoggingHandler), provider => requestLogger); + var container = new ServiceCollection(); + container.AddTransient(); + container.AddTransient(typeof(RequestLoggingHandler), provider => requestLogger); - var handlerFactory = new ServiceProviderHandlerFactory(container.BuildServiceProvider()); + var handlerFactory = new ServiceProviderHandlerFactory(container.BuildServiceProvider()); - var commandProcessor = new CommandProcessor(registry, handlerFactory: handlerFactory, - new InMemoryRequestContextFactory(), new PolicyRegistry()); + var commandProcessor = new CommandProcessor(registry, handlerFactory: handlerFactory, + new InMemoryRequestContextFactory(), new PolicyRegistry()); - commandProcessor.Send(myCommand); + commandProcessor.Send(myCommand); - //_should_log_the_request_handler_call - //_should_log_the_type_of_handler_in_the_call - - //TestCorrelator.GetLogEventsFromCurrentContext().Should().HaveCount(3); - TestCorrelator.GetLogEventsFromContextGuid(context.Guid) - .Should().Contain(x => x.MessageTemplate.Text.StartsWith("Logging handler pipeline call")) - .Which.Properties["1"].ToString().Should().Be($"\"{typeof(MyCommand)}\""); - - } + //_should_log_the_request_handler_call + //_should_log_the_type_of_handler_in_the_call + //TestCorrelator.GetLogEventsFromCurrentContext().Should().HaveCount(3); + TestCorrelator.GetLogEventsFromContextGuid(context.Guid) + .Should().Contain(x => x.MessageTemplate.Text.StartsWith("Logging handler pipeline call")) + .Which.Properties["1"].ToString().Should().Be($"\"{typeof(MyCommand)}\""); } public void Dispose() diff --git a/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline_Async.cs b/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline_Async.cs index a9a8f6094a..f59c1964a9 100644 --- a/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/Logging/When_A_Request_Logger_Is_In_The_Pipeline_Async.cs @@ -29,32 +29,29 @@ public CommandProcessorWithLoggingInPipelineAsyncTests(ITestOutputHelper output) public async Task When_A_Request_Logger_Is_In_The_Pipeline_Async() { Log.Logger = new LoggerConfiguration().MinimumLevel.Information().WriteTo.TestCorrelator().CreateLogger(); - using (var context = TestCorrelator.CreateContext()) - { - var myCommand = new MyCommand(); + using var context = TestCorrelator.CreateContext(); + var myCommand = new MyCommand(); - var registry = new SubscriberRegistry(); - registry.RegisterAsync(); + var registry = new SubscriberRegistry(); + registry.RegisterAsync(); - var container = new ServiceCollection(); - container.AddTransient(); - container.AddTransient(typeof(RequestLoggingHandlerAsync<>), typeof(RequestLoggingHandlerAsync<>)); + var container = new ServiceCollection(); + container.AddTransient(); + container.AddTransient(typeof(RequestLoggingHandlerAsync<>), typeof(RequestLoggingHandlerAsync<>)); - var handlerFactory = new ServiceProviderHandlerFactory(container.BuildServiceProvider()); + var handlerFactory = new ServiceProviderHandlerFactory(container.BuildServiceProvider()); - var commandProcessor = new CommandProcessor(registry, handlerFactory, - new InMemoryRequestContextFactory(), new PolicyRegistry()); + var commandProcessor = new CommandProcessor(registry, handlerFactory, + new InMemoryRequestContextFactory(), new PolicyRegistry()); - await commandProcessor.SendAsync(myCommand); + await commandProcessor.SendAsync(myCommand); - //_should_log_the_request_handler_call - //_should_log_the_type_of_handler_in_the_call - TestCorrelator.GetLogEventsFromContextGuid(context.Guid) - .Should().Contain(x => x.MessageTemplate.Text.StartsWith("Logging handler pipeline call")) - .Which.Properties["1"].ToString().Should().Be($"\"{typeof(MyCommand)}\""); - - } + //_should_log_the_request_handler_call + //_should_log_the_type_of_handler_in_the_call + TestCorrelator.GetLogEventsFromContextGuid(context.Guid) + .Should().Contain(x => x.MessageTemplate.Text.StartsWith("Logging handler pipeline call")) + .Which.Properties["1"].ToString().Should().Be($"\"{typeof(MyCommand)}\""); } public void Dispose() diff --git a/tests/Paramore.Brighter.MSSQL.Tests/MsSqlTestHelper.cs b/tests/Paramore.Brighter.MSSQL.Tests/MsSqlTestHelper.cs index a461fb503b..598f6c110e 100644 --- a/tests/Paramore.Brighter.MSSQL.Tests/MsSqlTestHelper.cs +++ b/tests/Paramore.Brighter.MSSQL.Tests/MsSqlTestHelper.cs @@ -66,18 +66,14 @@ public MsSqlTestHelper(bool binaryMessagePayload = false) public void CreateDatabase() { - using (var connection = _masterConnectionProvider.GetConnection()) - { - using (var command = connection.CreateCommand()) - { - command.CommandText = @" + using var connection = _masterConnectionProvider.GetConnection(); + using var command = connection.CreateCommand(); + command.CommandText = @" IF DB_ID('BrighterTests') IS NULL BEGIN CREATE DATABASE BrighterTests; END;"; - command.ExecuteNonQuery(); - } - } + command.ExecuteNonQuery(); } public void SetupMessageDb() @@ -116,48 +112,36 @@ private void CreateQueueTable() public void CleanUpDb() { - using (var connection = _connectionProvider.GetConnection()) - { - using (var command = connection.CreateCommand()) - { - command.CommandText = $@" + using var connection = _connectionProvider.GetConnection(); + using var command = connection.CreateCommand(); + command.CommandText = $@" IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'{_tableName}') AND type in (N'U')) BEGIN DROP TABLE {_tableName} END;"; - command.ExecuteNonQuery(); - } - } + command.ExecuteNonQuery(); } public void CreateOutboxTable() { - using (var connection = _connectionProvider.GetConnection()) - { - _tableName = $"[message_{_tableName}]"; - var createTableSql = SqlOutboxBuilder.GetDDL(_tableName, _binaryMessagePayload); - - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + using var connection = _connectionProvider.GetConnection(); + _tableName = $"[message_{_tableName}]"; + var createTableSql = SqlOutboxBuilder.GetDDL(_tableName, _binaryMessagePayload); + + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } public void CreateInboxTable() { - using (var connection = _connectionProvider.GetConnection()) - { - _tableName = $"[command_{_tableName}]"; - var createTableSql = SqlInboxBuilder.GetDDL(_tableName); - - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + using var connection = _connectionProvider.GetConnection(); + _tableName = $"[command_{_tableName}]"; + var createTableSql = SqlInboxBuilder.GetDDL(_tableName); + + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } } diff --git a/tests/Paramore.Brighter.MySQL.Tests/MySqlTestHelper.cs b/tests/Paramore.Brighter.MySQL.Tests/MySqlTestHelper.cs index 4eddaeedf4..0a1f60a8bd 100644 --- a/tests/Paramore.Brighter.MySQL.Tests/MySqlTestHelper.cs +++ b/tests/Paramore.Brighter.MySQL.Tests/MySqlTestHelper.cs @@ -33,15 +33,11 @@ public MySqlTestHelper(bool binaryMessagePayload = false) public void CreateDatabase() { - using (var connection = new MySqlConnection(_mysqlSettings.TestsMasterConnectionString)) - { - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = @"CREATE DATABASE IF NOT EXISTS BrighterTests;"; - command.ExecuteNonQuery(); - } - } + using var connection = new MySqlConnection(_mysqlSettings.TestsMasterConnectionString); + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = @"CREATE DATABASE IF NOT EXISTS BrighterTests;"; + command.ExecuteNonQuery(); } public void SetupMessageDb() @@ -57,49 +53,37 @@ public void SetupCommandDb() } public void CleanUpDb() - { - using (var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString)) - { - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}"; - command.ExecuteNonQuery(); - } - } - } + { + using var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString); + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}"; + command.ExecuteNonQuery(); + } public void CreateOutboxTable() { - using (var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString)) - { - _tableName = $"`message_{_tableName}`"; - var createTableSql = - MySqlOutboxBuilder.GetDDL(_tableName, hasBinaryMessagePayload: _binaryMessagePayload); + using var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString); + _tableName = $"`message_{_tableName}`"; + var createTableSql = + MySqlOutboxBuilder.GetDDL(_tableName, hasBinaryMessagePayload: _binaryMessagePayload); - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } public void CreateInboxTable() { - using (var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString)) - { - _tableName = $"`command_{_tableName}`"; - var createTableSql = MySqlInboxBuilder.GetDDL(_tableName); + using var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString); + _tableName = $"`command_{_tableName}`"; + var createTableSql = MySqlInboxBuilder.GetDDL(_tableName); - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } } diff --git a/tests/Paramore.Brighter.PostgresSQL.Tests/PostgresSqlTestHelper.cs b/tests/Paramore.Brighter.PostgresSQL.Tests/PostgresSqlTestHelper.cs index 9c4bea9578..2cd19afd73 100644 --- a/tests/Paramore.Brighter.PostgresSQL.Tests/PostgresSqlTestHelper.cs +++ b/tests/Paramore.Brighter.PostgresSQL.Tests/PostgresSqlTestHelper.cs @@ -69,15 +69,11 @@ private void CreateDatabase() if (createDatabase) try { - using (var connection = new NpgsqlConnection(_postgreSqlSettings.TestsMasterConnectionString)) - { - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = @"CREATE DATABASE brightertests"; - command.ExecuteNonQuery(); - } - } + using var connection = new NpgsqlConnection(_postgreSqlSettings.TestsMasterConnectionString); + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = @"CREATE DATABASE brightertests"; + command.ExecuteNonQuery(); } catch (PostgresException sqlException) { @@ -93,46 +89,34 @@ private void CreateDatabase() private void CreateOutboxTable() { - using (var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString)) - { - _tableName = $"message_{_tableName}"; - var createTableSql = PostgreSqlOutboxBulder.GetDDL(_tableName, Configuration.BinaryMessagePayload); - - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + using var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString); + _tableName = $"message_{_tableName}"; + var createTableSql = PostgreSqlOutboxBulder.GetDDL(_tableName, Configuration.BinaryMessagePayload); + + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } public void CreateInboxTable() { - using (var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString)) - { - _tableName = $"command_{_tableName}"; - var createTableSql = PostgreSqlInboxBuilder.GetDDL(_tableName); - - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = createTableSql; - command.ExecuteNonQuery(); - } - } + using var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString); + _tableName = $"command_{_tableName}"; + var createTableSql = PostgreSqlInboxBuilder.GetDDL(_tableName); + + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = createTableSql; + command.ExecuteNonQuery(); } public void CleanUpDb() { - using (var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString)) - { - connection.Open(); - using (var command = connection.CreateCommand()) - { - command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}"; - command.ExecuteNonQuery(); - } - } + using var connection = new NpgsqlConnection(_postgreSqlSettings.TestsBrighterConnectionString); + connection.Open(); + using var command = connection.CreateCommand(); + command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}"; + command.ExecuteNonQuery(); } } diff --git a/tests/Paramore.Brighter.Redis.Tests/MessagingGateway/When_overriding_client_configuration_via_the_gateway.cs b/tests/Paramore.Brighter.Redis.Tests/MessagingGateway/When_overriding_client_configuration_via_the_gateway.cs index 6ef821f781..fd6b5a39ad 100644 --- a/tests/Paramore.Brighter.Redis.Tests/MessagingGateway/When_overriding_client_configuration_via_the_gateway.cs +++ b/tests/Paramore.Brighter.Redis.Tests/MessagingGateway/When_overriding_client_configuration_via_the_gateway.cs @@ -29,23 +29,21 @@ public void When_overriding_client_configuration_via_the_gateway() MessageTimeToLive = TimeSpan.FromMinutes(30), VerifyMasterConnections = false }; - - using (var gateway = new TestRedisGateway(configuration)) - { - //Redis Config is static, so we can just look at the values we should have initialized - RedisConfig.BackOffMultiplier.Should().Be(configuration.BackoffMultiplier.Value); - RedisConfig.BackOffMultiplier.Should().Be(configuration.BackoffMultiplier.Value); - RedisConfig.DeactivatedClientsExpiry.Should().Be(configuration.DeactivatedClientsExpiry.Value); - RedisConfig.DefaultConnectTimeout.Should().Be(configuration.DefaultConnectTimeout.Value); - RedisConfig.DefaultIdleTimeOutSecs.Should().Be(configuration.DefaultIdleTimeOutSecs.Value); - RedisConfig.DefaultReceiveTimeout.Should().Be(configuration.DefaultReceiveTimeout.Value); - RedisConfig.DefaultSendTimeout.Should().Be(configuration.DefaultSendTimeout.Value); - RedisConfig.EnableVerboseLogging.Should().Be(!configuration.DisableVerboseLogging.Value); - RedisConfig.HostLookupTimeoutMs.Should().Be(configuration.HostLookupTimeoutMs.Value); - RedisConfig.DefaultMaxPoolSize.Should().Be(configuration.MaxPoolSize.Value); - gateway.MessageTimeToLive.Should().Be(configuration.MessageTimeToLive.Value); - RedisConfig.VerifyMasterConnections.Should().Be(configuration.VerifyMasterConnections.Value); - } + + using var gateway = new TestRedisGateway(configuration); + //Redis Config is static, so we can just look at the values we should have initialized + RedisConfig.BackOffMultiplier.Should().Be(configuration.BackoffMultiplier.Value); + RedisConfig.BackOffMultiplier.Should().Be(configuration.BackoffMultiplier.Value); + RedisConfig.DeactivatedClientsExpiry.Should().Be(configuration.DeactivatedClientsExpiry.Value); + RedisConfig.DefaultConnectTimeout.Should().Be(configuration.DefaultConnectTimeout.Value); + RedisConfig.DefaultIdleTimeOutSecs.Should().Be(configuration.DefaultIdleTimeOutSecs.Value); + RedisConfig.DefaultReceiveTimeout.Should().Be(configuration.DefaultReceiveTimeout.Value); + RedisConfig.DefaultSendTimeout.Should().Be(configuration.DefaultSendTimeout.Value); + RedisConfig.EnableVerboseLogging.Should().Be(!configuration.DisableVerboseLogging.Value); + RedisConfig.HostLookupTimeoutMs.Should().Be(configuration.HostLookupTimeoutMs.Value); + RedisConfig.DefaultMaxPoolSize.Should().Be(configuration.MaxPoolSize.Value); + gateway.MessageTimeToLive.Should().Be(configuration.MessageTimeToLive.Value); + RedisConfig.VerifyMasterConnections.Should().Be(configuration.VerifyMasterConnections.Value); } diff --git a/tests/Paramore.Brighter.Sqlite.Tests/Outbox/SQlOutboxMigrationTests.cs b/tests/Paramore.Brighter.Sqlite.Tests/Outbox/SQlOutboxMigrationTests.cs index de69a043cb..e7d6103263 100644 --- a/tests/Paramore.Brighter.Sqlite.Tests/Outbox/SQlOutboxMigrationTests.cs +++ b/tests/Paramore.Brighter.Sqlite.Tests/Outbox/SQlOutboxMigrationTests.cs @@ -63,19 +63,17 @@ private void AddHistoricMessage(Message message) new SqliteParameter("Body", message.Body.Value), }; - using (var connection = new SqliteConnection(_sqliteTestHelper.ConnectionString)) - using (var command = connection.CreateCommand()) - { - connection.Open(); + using var connection = new SqliteConnection(_sqliteTestHelper.ConnectionString); + using var command = connection.CreateCommand(); + connection.Open(); - command.CommandText = sql; - //command.Parameters.AddRange(parameters); used to work... but can't with current Sqlite lib. Iterator issue - for (var index = 0; index < parameters.Length; index++) - { - command.Parameters.Add(parameters[index]); - } - command.ExecuteNonQuery(); + command.CommandText = sql; + //command.Parameters.AddRange(parameters); used to work... but can't with current Sqlite lib. Iterator issue + for (var index = 0; index < parameters.Length; index++) + { + command.Parameters.Add(parameters[index]); } + command.ExecuteNonQuery(); } [Fact] diff --git a/tests/Paramore.Brighter.Sqlite.Tests/SqliteTestHelper.cs b/tests/Paramore.Brighter.Sqlite.Tests/SqliteTestHelper.cs index 8080a81699..2c837c00bc 100644 --- a/tests/Paramore.Brighter.Sqlite.Tests/SqliteTestHelper.cs +++ b/tests/Paramore.Brighter.Sqlite.Tests/SqliteTestHelper.cs @@ -68,16 +68,12 @@ public async Task CleanUpDbAsync() private void CreateDatabaseWithTable(string dataSourceTestDb, string createTableScript) { - using (var sqliteConnection = new SqliteConnection(dataSourceTestDb)) - { - using (var command = sqliteConnection.CreateCommand()) - { - command.CommandText = createTableScript; + using var sqliteConnection = new SqliteConnection(dataSourceTestDb); + using var command = sqliteConnection.CreateCommand(); + command.CommandText = createTableScript; - sqliteConnection.Open(); - command.ExecuteNonQuery(); - } - } + sqliteConnection.Open(); + command.ExecuteNonQuery(); } } } From 45b0ad992cde16e4b762d75f8e9f92d0cd5c843c Mon Sep 17 00:00:00 2001 From: Simon Cropp Date: Fri, 26 Apr 2024 20:22:15 +1000 Subject: [PATCH 2/2] Update MigrationSqlHelper.cs --- .../Greetings/Adaptors/Data/MigrationSqlHelper.cs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs b/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs index 49c23caac7..1330da4202 100644 --- a/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs +++ b/samples/ASBTaskQueue/Greetings/Adaptors/Data/MigrationSqlHelper.cs @@ -12,11 +12,15 @@ public static void ApplyResource(Migration migration, MigrationBuilder migration var assembly = Assembly.GetExecutingAssembly(); var name = $"{migration.GetType().Namespace}.{resourceName}"; - using var stream = assembly.GetManifestResourceStream(name); - if (stream == null) throw new ArgumentNullException(resourceName); - using var textStreamReader = new StreamReader(stream); - var sql = textStreamReader.ReadToEnd(); - migrationBuilder.Sql(sql); + using (var stream = assembly.GetManifestResourceStream(name)) + { + if (stream == null) throw new ArgumentNullException(resourceName); + using (var textStreamReader = new StreamReader(stream)) + { + var sql = textStreamReader.ReadToEnd(); + migrationBuilder.Sql(sql); + } + } } } }