From feeee68eefd71f03a8274115baabf1e5a1eee05d Mon Sep 17 00:00:00 2001 From: Julien Mounier Date: Wed, 17 Sep 2025 18:52:58 +0200 Subject: [PATCH] feat(EFCore): enable optimistic concurrency support --- docs/guide/durability/efcore/sagas.md | 5 +- .../Optimistic_concurrency_with_ef_core.cs | 111 ++++++++++++++++++ .../Codegen/EFCorePersistenceFrameProvider.cs | 72 +++++++++++- 3 files changed, 185 insertions(+), 3 deletions(-) create mode 100644 src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs diff --git a/docs/guide/durability/efcore/sagas.md b/docs/guide/durability/efcore/sagas.md index e0e4af151..29b075ce8 100644 --- a/docs/guide/durability/efcore/sagas.md +++ b/docs/guide/durability/efcore/sagas.md @@ -112,9 +112,12 @@ public class OrdersDbContext : DbContext modelBuilder.Entity(map => { map.ToTable("orders", "sample"); - map.HasKey(x => x.Id); map.Property(x => x.OrderStatus) .HasConversion(v => v.ToString(), v => Enum.Parse(v)); + + // enable optimistic concurrency + map.Property(x => x.Version) + .IsConcurrencyToken(); }); } } diff --git a/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs new file mode 100644 index 000000000..7de6002bf --- /dev/null +++ b/src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs @@ -0,0 +1,111 @@ +using EfCoreTests.MultiTenancy; +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Data.SqlClient; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using SharedPersistenceModels.Items; +using Shouldly; +using Weasel.Core; +using Weasel.SqlServer; +using Weasel.SqlServer.Tables; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.EntityFrameworkCore; +using Wolverine.Runtime.Handlers; +using Wolverine.SqlServer; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace EfCoreTests; + +[Collection("sqlserver")] +public class Optimistic_concurrency_with_ef_core +{ + private readonly ITestOutputHelper _output; + + public Optimistic_concurrency_with_ef_core(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task detect_concurrency_exception_as_SagaConcurrencyException() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opt => + { + opt.Services.AddDbContextWithWolverineIntegration(o => + { + o.UseSqlServer(Servers.SqlServerConnectionString); + }); + + opt.Services.AddScoped(); + + opt.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString); + opt.UseEntityFrameworkCoreTransactions(); + opt.Policies.UseDurableLocalQueues(); + opt.Policies.AutoApplyTransactions(); + }).StartAsync(); + + var table = new Table("ConcurrencyTestSagas"); + table.AddColumn("id").AsPrimaryKey(); + table.AddColumn("value"); + table.AddColumn("version"); + await using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + + var migration = await SchemaMigration.DetermineAsync(conn, table); + await new SqlServerMigrator().ApplyAllAsync(conn, migration, AutoCreate.All); + + using var scope = host.Services.CreateScope(); + var dbContext = scope.ServiceProvider.GetRequiredService(); + await dbContext.Database.EnsureCreatedAsync(); + + await conn.CloseAsync(); + + await dbContext.ConcurrencyTestSagas.AddAsync(new() + { + Id = Guid.NewGuid(), + Value = "initial value", + Version = 0, + }); + await dbContext.SaveChangesAsync(); + + Should.ThrowAsync(() => host.InvokeMessageAndWaitAsync(new UpdateConcurrencyTestSaga(Guid.NewGuid(), "updated value"))); + } +} + +public class OptConcurrencyDbContext : DbContext +{ + public OptConcurrencyDbContext(DbContextOptions options) : base(options) + { + } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity() + .Property("Version") + .IsConcurrencyToken(); + } + + public DbSet ConcurrencyTestSagas { get; set; } +} + +public record UpdateConcurrencyTestSaga(Guid Id, string NewValue); + +public class ConcurrencyTestSaga : Saga +{ + public Guid Id { get; set; } + public string Value { get; set; } + public void Handle(UpdateConcurrencyTestSaga order, OptConcurrencyDbContext ctx) + { + // Fake 999 updates of the saga while this event is being handled + ctx.ConcurrencyTestSagas.Entry(this).Property("Version").OriginalValue = 999; + + Value = order.NewValue; + } +} diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs index 6d4873d3e..27f547025 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Codegen/EFCorePersistenceFrameProvider.cs @@ -71,12 +71,13 @@ public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container) call.CommentText = "Committing any pending entity changes to the database"; call.ReturnVariable!.OverrideName(call.ReturnVariable.Usage + "1"); - return call; + return new WrapSagaConcurrencyException(saga, call); } public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container) { - return new CommentFrame("No explicit update necessary with EF Core"); + var dbContextType = DetermineDbContextType(saga.VariableType, container); + return new IncrementSagaVersionIfNecessary(dbContextType, saga); } public Frame DetermineDeleteFrame(Variable sagaId, Variable saga, IServiceContainer container) @@ -342,6 +343,73 @@ public override IEnumerable FindVariables(IMethodVariables chain) } } + public class IncrementSagaVersionIfNecessary : SyncFrame + { + private readonly Type _dbContextType; + private readonly Variable _saga; + private Variable? _context; + + public IncrementSagaVersionIfNecessary(Type dbContextType, Variable saga) + { + _dbContextType = dbContextType; + _saga = saga; + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + yield return _saga; + + _context = chain.FindVariable(_dbContextType); + yield return _context; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine(""); + writer.WriteComment("If the saga state changed, then increment it's version to support optimistic concurrency"); + writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}.Type == EntityState.Modified) {{ {_saga.Usage}.Version += 1; }}"); + + Next?.GenerateCode(method, writer); + } + } + + public class WrapSagaConcurrencyException : SyncFrame + { + private readonly Variable _saga; + private readonly Frame _frame; + + public WrapSagaConcurrencyException(Variable saga, Frame frame) + { + _saga = saga; + _frame = frame; + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + foreach (var variable in _frame.FindVariables(chain)) yield return variable; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine("BLOCK:try"); + _frame.GenerateCode(method, writer); + writer.FinishBlock(); + + writer.WriteLine("BLOCK:catch (DbUpdateConcurrencyException error)"); + writer.WriteComment("Only intercepts concurrency error on the saga itself"); + + writer.WriteLine($"BLOCK:if (error.Entries.Any(e => e.Entity == ${_saga.Usage})"); + writer.WriteLine($"throw new SagaConcurrencyException($\"Saga of type {_saga.VariableType.FullNameInCode()} and id {{ {SagaChain.SagaIdVariableName} }} cannot be updated because of optimistic concurrency violations\");"); + writer.FinishBlock(); + + writer.WriteComment("Rethrow any other exception"); + writer.WriteLine("throw;"); + writer.FinishBlock(); + + Next?.GenerateCode(method, writer); + } + } + public class CommitDbContextTransactionIfNecessary : SyncFrame { private Variable? _envelopeTransaction;