Skip to content

feat(messaging): Avro Schema Registry and Kafka events (ADR-019)#91

Merged
phuongnse merged 2 commits into
mainfrom
feat/avro-schema-registry-adr-019
May 23, 2026
Merged

feat(messaging): Avro Schema Registry and Kafka events (ADR-019)#91
phuongnse merged 2 commits into
mainfrom
feat/avro-schema-registry-adr-019

Conversation

@phuongnse

@phuongnse phuongnse commented May 23, 2026

Copy link
Copy Markdown
Owner

Summary

Addresses CodeRabbit review on ADR-019 pilot: tenant-scoped WorkflowEngine upserts, deduplicated form references on publish, clearer Avro GUID parsing, CloudEvents using, and schema registration script file checks. Adds regression test for duplicate referencedFormIds in one event.

Linked spec

Requirements

  • Major: WorkflowPublishedHandler (FormBuilder) iterates deduplicated newFormIds; WorkflowEngine lookups filter by organizationId.
  • Quick wins: WorkflowBuilderEventExtensions field-level FormatException; CloudEventsEnvelopeRule uses PropertyInfo; register-avro-schemas.sh validates schema files exist.
  • Avro nested record consolidation deferred: avrogen does not resolve cross-file type refs without a bundled protocol (inline definitions kept).
Open in Web Open in Cursor 

Summary by CodeRabbit

  • New Features

    • Avro schema-based workflow lifecycle events and centralized Schema Registry support
    • Cross-module event publishing so Form Builder and Workflow Engine consume lifecycle events
  • Infrastructure Updates

    • CloudEvents metadata added to outgoing events for tracing and correlation
    • Kafka/Schema Registry wiring enabled with local-only routing option
  • Documentation

    • Progress and epic docs updated to reflect Phase 1 completion and Avro/Schema Registry rollout
  • Scripts

    • Added script to register Avro schemas in Schema Registry
  • Tests

    • Updated tests to use the new Avro-based event payloads

Review Change Stack

Add Axis.WorkflowBuilder.Contracts with Avro schemas, Confluent serializers,
CloudEvents envelope rules, and WorkflowBuilder lifecycle event routing.
Consumers use Contracts only; Testing uses local Wolverine routing.

Co-authored-by: Phuong Nguyen <phuongnse@users.noreply.github.com>
@coderabbitai

coderabbitai Bot commented May 23, 2026

Copy link
Copy Markdown

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: dbb222f0-5c8f-494a-a9d8-33176275b54f

📥 Commits

Reviewing files that changed from the base of the PR and between ec080f4 and 04d435c.

📒 Files selected for processing (6)
  • scripts/register-avro-schemas.sh
  • src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowPublishedHandler.cs
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs
  • src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowPublishedHandler.cs
  • src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs
  • tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs

📝 Walkthrough

Walkthrough

This PR establishes Avro-based contract events for WorkflowBuilder lifecycle distribution via Kafka and Schema Registry. It adds a new contracts project with Avro schemas, maps domain events to Avro integration events, implements CloudEvents + Schema Registry wiring, updates handlers/tests to consume contract events, and configures the API to publish/listen via Avro.

Changes

WorkflowBuilder Contracts and Event Infrastructure

Layer / File(s) Summary
Avro schemas and contract types
src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Axis.WorkflowBuilder.Contracts.csproj, src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/*.avsc, src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs, src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderKafkaTopics.cs
New Axis.WorkflowBuilder.Contracts project defines WorkflowPublishedEvent, WorkflowArchivedEvent, and WorkflowUnarchivedEvent Avro schemas with step and transition snapshots. Extension methods parse string UUIDs to Guid and expose form ID lists. Kafka topic constants are defined for each event type.
Domain event mapping and infrastructure updates
src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Messaging/WorkflowBuilderEventMapper.cs, src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Persistence/WorkflowBuilderUnitOfWork.cs, src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Axis.WorkflowBuilder.Infrastructure.csproj, src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Axis.FormBuilder.Infrastructure.csproj, src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Axis.WorkflowEngine.Infrastructure.csproj
WorkflowBuilderEventMapper converts domain events to contract events, mapping IDs to strings and serializing step config as camelCase JSON. WorkflowBuilderUnitOfWork now maps events before publishing. FormBuilder and WorkflowEngine infrastructure projects now reference Axis.WorkflowBuilder.Contracts instead of Axis.WorkflowBuilder.Domain.
Event handler updates across modules
src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowArchivedHandler.cs, src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowPublishedHandler.cs, src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowUnarchivedHandler.cs, src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowArchivedHandler.cs, src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowPublishedHandler.cs, src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowUnarchivedHandler.cs
All six handlers now consume contract events from axis.workflowbuilder.events. Each handler extracts workflowId and organizationId once into locals for consistent logging and queries. WorkflowEngine's WorkflowPublishedHandler adds JSON deserialization of step configJson and extended concurrency/unique-constraint error handling.
Kafka and Schema Registry infrastructure
Directory.Packages.props, src/Shared/Axis.Shared.Infrastructure/Axis.Shared.Infrastructure.csproj, src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs, src/Shared/Axis.Shared.Infrastructure/Messaging/WolverineKafkaAvroExtensions.cs
Added Apache.Avro, Confluent.SchemaRegistry, Confluent.SchemaRegistry.Serdes.Avro, and WolverineFx.Kafka NuGet packages. CloudEventsEnvelopeRule stamps CloudEvents 1.0 headers and optional tenantid on Wolverine message envelopes, plus trace correlation. WolverineKafkaAvroExtensions configures Schema Registry client, Avro serializer, and provides helpers to publish/listen via Avro or publish locally.
API configuration and Kafka wiring
Axis.sln, src/Axis.Api/Program.cs, src/Axis.Api/appsettings.json, docker-compose.yml
Solution registers the new Contracts project and its build configurations. Program.cs imports contract namespaces and calls UseAxisKafkaAvro to register CloudEvents rule, conditionally configure Kafka Avro transport for WorkflowPublishedEvent, WorkflowArchivedEvent, and WorkflowUnarchivedEvent, and publish them locally in Testing environment. appsettings.json adds Schema Registry URL; docker-compose.yml exposes it to the api container.
Handler test updates
tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowArchivedHandlerTests.cs, tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs, tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowUnarchivedHandlerTests.cs, tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowArchivedHandlerTests.cs, tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs, tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowUnarchivedHandlerTests.cs
All handler tests updated to construct contract event objects (WorkflowPublishedEvent, etc.) with string IDs, empty snapshot arrays, and Avro-compatible record types, replacing prior domain event construction calls.
Documentation, roadmap, and schema registration
docs/PROGRESS.md, docs/epics/E04-workflow-builder/README.md, docs/epics/E05-form-builder/README.md, docs/epics/E06-workflow-engine/README.md, docs/epics/README.md, docs/playbooks/patterns.md, scripts/register-avro-schemas.sh
Progress and epic readmes updated to reflect Phase 1 Infrastructure completion and Avro/Schema Registry implementation for WorkflowBuilder events (ADR-019). Pattern playbook updated to reference schema location under Axis.WorkflowBuilder.Contracts/Schemas/. New register-avro-schemas.sh script POSTs the three workflow event Avro schemas to Schema Registry.

Sequence Diagram(s)

sequenceDiagram
  participant App as Application Startup
  participant Wolverine as WolverineOptions
  participant SR as Schema Registry
  participant Kafka as Kafka Broker
  App->>Wolverine: UseAxisKafkaAvro(config, useKafkaEventTransport)
  Wolverine->>Wolverine: Register CloudEventsEnvelopeRule
  alt Kafka transport enabled
    Wolverine->>SR: Read SchemaRegistry:Url
    Wolverine->>Wolverine: Create SchemaRegistryAvroSerializer
    Wolverine->>Kafka: PublishAndListenWithAvro(topic, serializer)
  else Testing environment
    Wolverine->>Wolverine: PublishLocally for workflow events
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • phuong-labs/axis#87: Related refactor of WorkflowBuilderUnitOfWork.SaveChangesAsync that this PR further adapts by mapping domain events before publishing.
  • phuong-labs/axis#46: Earlier changes to WorkflowEngine handlers that overlap with this PR's handler updates and snapshot upsert logic.

Poem

🐰 I nibble schemas under moonlight bright,
Avro petals stitched for Kafka's flight,
Contracts sing in topics, CloudEvents hum,
Handlers wake and parse the IDs I strum,
A rabbit's hop — registry registered, done!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 1.75% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the main change: implementation of Avro, Schema Registry, and Kafka events with CloudEvents support (ADR-019), which is the primary objective of this comprehensive pull request.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/avro-schema-registry-adr-019

Comment @coderabbitai help to get the list of available commands and usage tips.

@phuongnse phuongnse marked this pull request as ready for review May 23, 2026 15:25

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (6)
src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs (1)

8-24: ⚡ Quick win

Harden GUID parsing in WorkflowBuilderEventExtensions to improve poison-message diagnostics.

Guid.Parse is used directly for workflowId, organizationId, and referencedFormIds in WorkflowBuilderEventExtensions.cs, and multiple handlers in WorkflowEngine/FormBuilder call these accessors—so invalid payloads will throw with low field-level context. Switch to Guid.TryParse and throw a FormatException that includes the field name (as proposed).

Proposed refactor
 public static class WorkflowBuilderEventExtensions
 {
-    public static Guid WorkflowId(this WorkflowPublishedEvent `@event`) => Guid.Parse(`@event.workflowId`);
+    public static Guid WorkflowId(this WorkflowPublishedEvent `@event`)
+        => ParseRequiredGuid(`@event.workflowId`, nameof(`@event.workflowId`));
 
-    public static Guid OrganizationId(this WorkflowPublishedEvent `@event`) => Guid.Parse(`@event.organizationId`);
+    public static Guid OrganizationId(this WorkflowPublishedEvent `@event`)
+        => ParseRequiredGuid(`@event.organizationId`, nameof(`@event.organizationId`));
 
     public static IReadOnlyList<Guid> ReferencedFormIds(this WorkflowPublishedEvent `@event`)
-        => `@event.referencedFormIds.Select`(Guid.Parse).ToList();
+        => `@event.referencedFormIds`
+            .Select(id => ParseRequiredGuid(id, "referencedFormIds[]"))
+            .ToList();
 
-    public static Guid WorkflowId(this WorkflowArchivedEvent `@event`) => Guid.Parse(`@event.workflowId`);
+    public static Guid WorkflowId(this WorkflowArchivedEvent `@event`)
+        => ParseRequiredGuid(`@event.workflowId`, nameof(`@event.workflowId`));
 
-    public static Guid OrganizationId(this WorkflowArchivedEvent `@event`) => Guid.Parse(`@event.organizationId`);
+    public static Guid OrganizationId(this WorkflowArchivedEvent `@event`)
+        => ParseRequiredGuid(`@event.organizationId`, nameof(`@event.organizationId`));
 
-    public static Guid WorkflowId(this WorkflowUnarchivedEvent `@event`) => Guid.Parse(`@event.workflowId`);
+    public static Guid WorkflowId(this WorkflowUnarchivedEvent `@event`)
+        => ParseRequiredGuid(`@event.workflowId`, nameof(`@event.workflowId`));
 
-    public static Guid OrganizationId(this WorkflowUnarchivedEvent `@event`) => Guid.Parse(`@event.organizationId`);
+    public static Guid OrganizationId(this WorkflowUnarchivedEvent `@event`)
+        => ParseRequiredGuid(`@event.organizationId`, nameof(`@event.organizationId`));
 
     public static IReadOnlyList<Guid> ReferencedFormIds(this WorkflowUnarchivedEvent `@event`)
-        => `@event.referencedFormIds.Select`(Guid.Parse).ToList();
+        => `@event.referencedFormIds`
+            .Select(id => ParseRequiredGuid(id, "referencedFormIds[]"))
+            .ToList();
+
+    private static Guid ParseRequiredGuid(string value, string fieldName)
+        => Guid.TryParse(value, out Guid parsed)
+            ? parsed
+            : throw new FormatException($"Invalid GUID in field '{fieldName}'.");
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs`
around lines 8 - 24, The current extension methods (WorkflowId, OrganizationId,
ReferencedFormIds) on WorkflowPublishedEvent, WorkflowArchivedEvent and
WorkflowUnarchivedEvent use Guid.Parse which throws without field-level context;
replace those parses with Guid.TryParse and, on failure, throw a FormatException
that includes the field name (e.g., "workflowId" or "organizationId") and the
offending string value to aid poison-message diagnostics; for ReferencedFormIds
iterate the collection, try-parse each entry (including index or value in the
error), collect parsed GUIDs into the same IReadOnlyList<Guid> return, and keep
method signatures (WorkflowId, OrganizationId, ReferencedFormIds) unchanged.
src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowPublishedEvent.avsc (1)

17-43: 🏗️ Heavy lift

Consolidate StepSnapshotRecord / TransitionSnapshotRecord schema definitions to avoid drift

WorkflowPublishedEvent.avsc inlines StepSnapshotRecord and TransitionSnapshotRecord for the steps/transitions arrays, and standalone schemas exist too (axis.workflowbuilder.events.StepSnapshotRecord / axis.workflowbuilder.events.TransitionSnapshotRecord). Since scripts/register-avro-schemas.sh registers only the workflow event subjects and doesn’t register/express schema references for these standalone record schemas, make one set the single source-of-truth for codegen (e.g., generate the inline records from the standalone .avsc or remove one set) so they can’t diverge.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowPublishedEvent.avsc`
around lines 17 - 43, The inline record definitions for StepSnapshotRecord and
TransitionSnapshotRecord inside WorkflowPublishedEvent.avsc are duplicating the
standalone schemas (axis.workflowbuilder.events.StepSnapshotRecord /
TransitionSnapshotRecord) and risk divergence; change
WorkflowPublishedEvent.avsc so the "steps" and "transitions" array item types
reference the existing named types instead of re-declaring them (use the named
type references "axis.workflowbuilder.events.StepSnapshotRecord" and
"axis.workflowbuilder.events.TransitionSnapshotRecord"), and update
scripts/register-avro-schemas.sh to ensure the standalone record schemas are
registered (or otherwise included) so codegen consumes the single
source-of-truth records rather than the inline copies.
scripts/register-avro-schemas.sh (2)

10-20: ⚡ Quick win

Consider adding file existence validation.

The register() function attempts to read and POST each schema file without verifying it exists. If a schema file is missing or the path is incorrect, tr will fail with a less-than-obvious error message.

🛡️ Proposed enhancement for clearer errors
 register() {
   local file="$1"
   local subject="$2"
+  if [[ ! -f "$file" ]]; then
+    echo "ERROR: schema file not found: $file" >&2
+    exit 1
+  fi
   local schema
   schema="$(tr -d '\n' <"$file" | sed 's/"/\\"/g')"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@scripts/register-avro-schemas.sh` around lines 10 - 20, The register()
function reads schema files without checking they exist; add a pre-check for the
"file" path (and readability) inside register() and handle missing files by
printing a clear error referencing the subject and file, then exit non-zero or
skip registration; update the error path to avoid calling tr/sed/curl when the
file is absent and ensure SCHEMA_REGISTRY_URL and subject are still shown in the
error message for easier debugging.

14-14: 💤 Low value

Schema JSON escaping may be fragile for complex Avro schemas.

The sed 's/"/\\"/g' approach escapes double quotes but does not handle backslashes, control characters, or other JSON special characters. For simple Avro schemas this is sufficient, but more complex schemas (with string defaults containing escape sequences) could break.

If schemas remain simple (as they are now), the current escaping is adequate. If you later encounter escaping issues, consider using jq -c to canonicalize the schema JSON instead:

schema="$(jq -c . <"$file" | sed 's/"/\\"/g')"

This normalizes the JSON and removes formatting inconsistencies before escaping.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@scripts/register-avro-schemas.sh` at line 14, The schema JSON is currently
built with the fragile line that assigns to the variable `schema` using a simple
quote-escape which doesn't handle backslashes or control characters; update the
`schema` assignment in scripts/register-avro-schemas.sh to canonicalize and
compact the JSON first (use `jq -c .` on the file) and then perform the
quote-escaping before exporting or using `schema` so backslashes and other
special characters are preserved; ensure you replace the existing `schema="..."`
assignment with this jq-based preprocessing and keep the subsequent code that
uses the `schema` variable unchanged.
src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs (1)

55-55: ⚡ Quick win

Replace fully-qualified reflection type with a using import.

Line 55 should use a using import instead of System.Reflection.PropertyInfo inline.

Suggested fix
+using System.Reflection;
 using System.Diagnostics;
 using Wolverine;
@@
-        System.Reflection.PropertyInfo? property = message.GetType().GetProperty("organizationId");
+        PropertyInfo? property = message.GetType().GetProperty("organizationId");

As per coding guidelines, "Use using declarations instead of fully-qualified names for imports in C# files."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs`
at line 55, Replace the fully-qualified System.Reflection type with a
using-import: add "using System.Reflection;" at the top of
CloudEventsEnvelopeRule.cs and change the declaration
"System.Reflection.PropertyInfo? property =
message.GetType().GetProperty(\"organizationId\");" to use the short type name
"PropertyInfo?" so the variable (property) uses PropertyInfo? from the imported
namespace; ensure the file compiles after adding the using.
tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs (1)

26-34: ⚡ Quick win

Add a regression test for duplicate referencedFormIds in one event.

Current coverage validates redelivery idempotency but not duplicated form IDs inside a single payload. Add one case to assert only one reference row is persisted for repeated IDs.

Also applies to: 85-103

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs`
around lines 26 - 34, The tests lack a regression case for duplicated
referencedFormIds in a single WorkflowPublishedEvent payload; update the test
suite by adding a new test (or extending the existing test range around
WorkflowPublishedHandlerTests) that uses BuildEvent to construct a
WorkflowPublishedEvent where referencedFormIds contains the same form GUID
repeated, then assert that only one reference row is persisted for that form ID
(i.e., deduplication happens inside the handler). Modify or overload BuildEvent
if needed to allow constructing repeated IDs (referencedFormIds), and in the new
test verify persistence/query results (or persisted rows count) for the
deduplicated referencedFormIds after invoking the handler.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowPublishedHandler.cs`:
- Line 27: The handler is inserting (workflowId, formId, organizationId) entries
for each ID in referencedFormIds and can attempt duplicate inserts when
referencedFormIds contains duplicates; change the code to deduplicate
referencedFormIds before the insert loop (e.g., construct newFormIds as a
HashSet from referencedFormIds) and iterate over that set when calling the
repository/insert logic (the variables/methods to update: referencedFormIds ->
newFormIds/HashSet<Guid>, the insert loop that uses workflowId and
organizationId), and apply the same deduplication for the other similar
loop/section around lines 36-43 so duplicates won’t trigger
UniqueConstraintException.

In
`@src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowPublishedHandler.cs`:
- Around line 36-38: The lookup in WorkflowPublishedHandler that fetches
WorkflowActiveStatus currently matches only workflowId and must be scoped by
organizationId to preserve tenant boundaries: update the query against
context.WorkflowActiveStatuses (the one assigning to variable existing) to
include w.OrganizationId == organizationId (and do the same for the second
upsert query later in the same handler) so both reads/updates match on
(WorkflowId, OrganizationId); ensure any FirstOrDefaultAsync or update/create
logic (in WorkflowPublishedHandler) uses this composite predicate to avoid
cross-tenant updates.

---

Nitpick comments:
In `@scripts/register-avro-schemas.sh`:
- Around line 10-20: The register() function reads schema files without checking
they exist; add a pre-check for the "file" path (and readability) inside
register() and handle missing files by printing a clear error referencing the
subject and file, then exit non-zero or skip registration; update the error path
to avoid calling tr/sed/curl when the file is absent and ensure
SCHEMA_REGISTRY_URL and subject are still shown in the error message for easier
debugging.
- Line 14: The schema JSON is currently built with the fragile line that assigns
to the variable `schema` using a simple quote-escape which doesn't handle
backslashes or control characters; update the `schema` assignment in
scripts/register-avro-schemas.sh to canonicalize and compact the JSON first (use
`jq -c .` on the file) and then perform the quote-escaping before exporting or
using `schema` so backslashes and other special characters are preserved; ensure
you replace the existing `schema="..."` assignment with this jq-based
preprocessing and keep the subsequent code that uses the `schema` variable
unchanged.

In
`@src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowPublishedEvent.avsc`:
- Around line 17-43: The inline record definitions for StepSnapshotRecord and
TransitionSnapshotRecord inside WorkflowPublishedEvent.avsc are duplicating the
standalone schemas (axis.workflowbuilder.events.StepSnapshotRecord /
TransitionSnapshotRecord) and risk divergence; change
WorkflowPublishedEvent.avsc so the "steps" and "transitions" array item types
reference the existing named types instead of re-declaring them (use the named
type references "axis.workflowbuilder.events.StepSnapshotRecord" and
"axis.workflowbuilder.events.TransitionSnapshotRecord"), and update
scripts/register-avro-schemas.sh to ensure the standalone record schemas are
registered (or otherwise included) so codegen consumes the single
source-of-truth records rather than the inline copies.

In
`@src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs`:
- Around line 8-24: The current extension methods (WorkflowId, OrganizationId,
ReferencedFormIds) on WorkflowPublishedEvent, WorkflowArchivedEvent and
WorkflowUnarchivedEvent use Guid.Parse which throws without field-level context;
replace those parses with Guid.TryParse and, on failure, throw a FormatException
that includes the field name (e.g., "workflowId" or "organizationId") and the
offending string value to aid poison-message diagnostics; for ReferencedFormIds
iterate the collection, try-parse each entry (including index or value in the
error), collect parsed GUIDs into the same IReadOnlyList<Guid> return, and keep
method signatures (WorkflowId, OrganizationId, ReferencedFormIds) unchanged.

In `@src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs`:
- Line 55: Replace the fully-qualified System.Reflection type with a
using-import: add "using System.Reflection;" at the top of
CloudEventsEnvelopeRule.cs and change the declaration
"System.Reflection.PropertyInfo? property =
message.GetType().GetProperty(\"organizationId\");" to use the short type name
"PropertyInfo?" so the variable (property) uses PropertyInfo? from the imported
namespace; ensure the file compiles after adding the using.

In
`@tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs`:
- Around line 26-34: The tests lack a regression case for duplicated
referencedFormIds in a single WorkflowPublishedEvent payload; update the test
suite by adding a new test (or extending the existing test range around
WorkflowPublishedHandlerTests) that uses BuildEvent to construct a
WorkflowPublishedEvent where referencedFormIds contains the same form GUID
repeated, then assert that only one reference row is persisted for that form ID
(i.e., deduplication happens inside the handler). Modify or overload BuildEvent
if needed to allow constructing repeated IDs (referencedFormIds), and in the new
test verify persistence/query results (or persisted rows count) for the
deduplicated referencedFormIds after invoking the handler.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: a8a99639-1029-4fe3-ad1a-6296c5cf33d1

📥 Commits

Reviewing files that changed from the base of the PR and between fb0c70d and ec080f4.

⛔ Files ignored due to path filters (5)
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Generated/axis/workflowbuilder/events/StepSnapshotRecord.cs is excluded by !**/generated/**
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Generated/axis/workflowbuilder/events/TransitionSnapshotRecord.cs is excluded by !**/generated/**
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Generated/axis/workflowbuilder/events/WorkflowArchivedEvent.cs is excluded by !**/generated/**
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Generated/axis/workflowbuilder/events/WorkflowPublishedEvent.cs is excluded by !**/generated/**
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Generated/axis/workflowbuilder/events/WorkflowUnarchivedEvent.cs is excluded by !**/generated/**
📒 Files selected for processing (40)
  • Axis.sln
  • Directory.Packages.props
  • docker-compose.yml
  • docs/PROGRESS.md
  • docs/epics/E04-workflow-builder/README.md
  • docs/epics/E05-form-builder/README.md
  • docs/epics/E06-workflow-engine/README.md
  • docs/epics/README.md
  • docs/playbooks/patterns.md
  • scripts/register-avro-schemas.sh
  • src/Axis.Api/Program.cs
  • src/Axis.Api/appsettings.json
  • src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Axis.FormBuilder.Infrastructure.csproj
  • src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowArchivedHandler.cs
  • src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowPublishedHandler.cs
  • src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowUnarchivedHandler.cs
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Axis.WorkflowBuilder.Contracts.csproj
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/StepSnapshotRecord.avsc
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/TransitionSnapshotRecord.avsc
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowArchivedEvent.avsc
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowPublishedEvent.avsc
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/Schemas/WorkflowUnarchivedEvent.avsc
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderEventExtensions.cs
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Contracts/WorkflowBuilderKafkaTopics.cs
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Axis.WorkflowBuilder.Infrastructure.csproj
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Messaging/WorkflowBuilderEventMapper.cs
  • src/Modules/WorkflowBuilder/Axis.WorkflowBuilder.Infrastructure/Persistence/WorkflowBuilderUnitOfWork.cs
  • src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Axis.WorkflowEngine.Infrastructure.csproj
  • src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowArchivedHandler.cs
  • src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowPublishedHandler.cs
  • src/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure/Handlers/WorkflowUnarchivedHandler.cs
  • src/Shared/Axis.Shared.Infrastructure/Axis.Shared.Infrastructure.csproj
  • src/Shared/Axis.Shared.Infrastructure/Messaging/CloudEventsEnvelopeRule.cs
  • src/Shared/Axis.Shared.Infrastructure/Messaging/WolverineKafkaAvroExtensions.cs
  • tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowArchivedHandlerTests.cs
  • tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs
  • tests/Modules/FormBuilder/Axis.FormBuilder.Infrastructure.Tests/Handlers/WorkflowUnarchivedHandlerTests.cs
  • tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowArchivedHandlerTests.cs
  • tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowPublishedHandlerTests.cs
  • tests/Modules/WorkflowEngine/Axis.WorkflowEngine.Infrastructure.Tests/Handlers/WorkflowUnarchivedHandlerTests.cs


// Remove references no longer in the workflow
HashSet<Guid> newFormIds = [.. @event.ReferencedFormIds];
HashSet<Guid> newFormIds = [.. referencedFormIds];

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Deduplicate referencedFormIds before insert loop.

If the incoming event contains duplicate form IDs, this loop can add the same (workflowId, formId, organizationId) twice in one unit of work and fall into UniqueConstraintException, skipping the sync.

Suggested fix
-        foreach (Guid formId in referencedFormIds)
+        foreach (Guid formId in newFormIds)
         {
             FormWorkflowReference? existingRef = existing.FirstOrDefault(r => r.FormId == formId);
             if (existingRef is null)
             {
                 context.FormWorkflowReferences.Add(
                     FormWorkflowReference.Create(workflowId, formId, organizationId));
                 addedCount++;
             }
             else
                 existingRef.Reactivate();
         }

Also applies to: 36-43

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/Modules/FormBuilder/Axis.FormBuilder.Infrastructure/Handlers/WorkflowPublishedHandler.cs`
at line 27, The handler is inserting (workflowId, formId, organizationId)
entries for each ID in referencedFormIds and can attempt duplicate inserts when
referencedFormIds contains duplicates; change the code to deduplicate
referencedFormIds before the insert loop (e.g., construct newFormIds as a
HashSet from referencedFormIds) and iterate over that set when calling the
repository/insert logic (the variables/methods to update: referencedFormIds ->
newFormIds/HashSet<Guid>, the insert loop that uses workflowId and
organizationId), and apply the same deduplication for the other similar
loop/section around lines 36-43 so duplicates won’t trigger
UniqueConstraintException.

Co-authored-by: Phuong Nguyen <phuongnse@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants