Skip to content

fix(amber): wire ExecutionReconfigurationService back to the engine#4531

Merged
Yicong-Huang merged 1 commit into
apache:mainfrom
Yicong-Huang:fix/reconfiguration-rpc-wiring
Apr 27, 2026
Merged

fix(amber): wire ExecutionReconfigurationService back to the engine#4531
Yicong-Huang merged 1 commit into
apache:mainfrom
Yicong-Huang:fix/reconfiguration-rpc-wiring

Conversation

@Yicong-Huang
Copy link
Copy Markdown
Contributor

@Yicong-Huang Yicong-Huang commented Apr 26, 2026

What changes were proposed in this PR?

Follow-up to #4220, restoring the full reconfiguration flow end-to-end. Two coupled gaps:

1. Web-service entrypoint never dispatched.
ExecutionReconfigurationService.performReconfigurationOnResume was still throwing "reconfiguration is tentatively disabled." and the body that calls controllerInterface.reconfigureWorkflow was commented out. Restored, adapted to the current proto shape (UpdateExecutorRequest(targetOpId, newExecInitInfo) — no more proto-Any boxing). Resets ExecutionReconfigurationStore with a fresh currentReconfigId after dispatch.

StateTransferFunc is dropped — the new request schema doesn't carry it.

2. Engine never reported per-worker completion.
ReconfigurationHandler collected the worker updateExecutor futures but only returned EmptyReturn when they all finished — no UpdateExecutorCompleted(worker) events were ever sent to the client. Without those, ExecutionReconfigurationService.completedReconfigurations stayed empty, the diff handler never fired, and the frontend never saw ModifyLogicCompletedEvent. The UpdateExecutorCompleted case class was effectively dead code.

Each per-worker future is now wrapped with sendToClient(UpdateExecutorCompleted(worker)) in both Fries-component branches (single-op and multi-op). The web-service client.registerCallback[UpdateExecutorCompleted] is re-enabled to advance completedReconfigurations on receipt.

Any related issues, documentation, discussions?

Follow-up to #4220. See discussion #4016.

How was this PR tested?

ExecutionReconfigurationServiceSpec (new) covers:

  • empty pending list → no dispatch, store unchanged;
  • non-empty list → one dispatch carrying the right (targetOpId, newExecInitInfo) pairs, store reset with a fresh currentReconfigId;
  • consecutive resumes get distinct reconfigurationIds;
  • worker completion (onWorkerReconfigured) updates completedReconfigurations with Set semantics (idempotent on duplicates).

The test uses three protected seams (dispatch, registerWorkerCompletionCallback, registerCompletionDiffHandler) so the service can be constructed without a live AmberClient or Workflow.

End-to-end engine path is covered by ReconfigurationSpec from #4220; the new sendToClient calls are no-ops when no callback is registered, so existing assertions are unaffected.

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

@Yicong-Huang
Copy link
Copy Markdown
Contributor Author

Added a smoke test: ExecutionReconfigurationServiceSpec verifying the empty-short-circuit path doesn't dereference client or workflow and leaves the reconfiguration store unchanged.

I deliberately scoped the unit test to that one invariant: the dispatch path that calls controllerInterface.reconfigureWorkflow is hard to mock at this layer (constructing AmberClient requires an ActorSystem plus an actor handshake at construction time). The engine-side ReconfigurationSpec already covers the controller RPC end-to-end with both Java and Python operators.

Open to refactoring the constructor to take ControllerServiceFs2Grpc[Future, Unit] instead of the whole AmberClient if reviewers prefer — that would let us stub the interface cleanly. Happy to do it in a follow-up if it's preferred separate from this fix.

@Yicong-Huang Yicong-Huang self-assigned this Apr 26, 2026
Copy link
Copy Markdown
Contributor

@zuozhiw zuozhiw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good can we leave a TODO comment to say that the state transfer function for java/scala operators are not implemented

@Yicong-Huang
Copy link
Copy Markdown
Contributor Author

looks good can we leave a TODO comment to say that the state transfer function for java/scala operators are not implemented

yup already added comment

@Yicong-Huang Yicong-Huang enabled auto-merge (squash) April 27, 2026 00:22
@Yicong-Huang Yicong-Huang force-pushed the fix/reconfiguration-rpc-wiring branch 2 times, most recently from 066d379 to 2cd1167 Compare April 27, 2026 03:50
Follow-up to apache#4220, restoring the full reconfiguration flow end-to-end.
Three coupled gaps:

**1. Web-service entrypoint never dispatched.**
`ExecutionReconfigurationService.performReconfigurationOnResume` was
still throwing `"reconfiguration is tentatively disabled."` and the body
that calls `controllerInterface.reconfigureWorkflow` was commented out.
Restored, adapted to the current proto shape — `UpdateExecutorRequest`
now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any
boxing — and resets `ExecutionReconfigurationStore` with a fresh
`currentReconfigId` after dispatch. `StateTransferFunc` is dropped
because the new request schema doesn't carry it.

**2. Engine never reported per-worker completion.**
`ReconfigurationHandler` collected the worker `updateExecutor` futures
but only returned `EmptyReturn` when they all finished — no
`UpdateExecutorCompleted(worker)` events were ever sent to the client.
Without those, `ExecutionReconfigurationService.completedReconfigurations`
stayed empty, the diff handler never fired, and the frontend never saw
`ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with
`sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component
branches, and the web-service callback is re-enabled. The multi-op
branch is also restructured so per-worker futures are added with `++=`
instead of the original `+=` that put an `Iterable` into the
`ArrayBuffer` as a single element.

**3. ECM-embedded ControlInvocation reply was misrouted.**
When a `ControlInvocation` travels in-band inside an ECM along a data
channel between two workers, the receiving worker was replying along
the (swapped) data channel — i.e. back to the upstream worker — rather
than to the actor that originated the call (the controller). The
upstream worker logged a "received unknown ControlReturn" warning and
dropped the reply, so the controller's `Future.collect` on the
per-worker futures never resolved. This was masked before because the
old multi-op `+=` bug effectively ignored those futures; with #2 above
fixing the collection, the routing bug surfaced and hung
`ReconfigurationSpec`'s source-propagation test for 2h41m before CI
killed it.

Both worker implementations now use `command.context.sender / .receiver`
— set at invocation time by `mkContext` — instead of the network
channel's from/to. For normal RPC over a control channel they're
equivalent, so the non-ECM path is unaffected.

ReconfigurationSpec's bare `Await.result(...)` calls also gain
1-minute timeouts so future hangs surface in a bounded window instead
of running out the GHA 6-hour limit.

### Tests

`ExecutionReconfigurationServiceSpec` (new) covers:
- empty pending list → no dispatch, store unchanged;
- non-empty list → one dispatch carrying the right
  `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh
  `currentReconfigId`;
- consecutive resumes get distinct `reconfigurationId`s;
- worker completion (`onWorkerReconfigured`) updates
  `completedReconfigurations` with Set semantics (idempotent on
  duplicates).

The test uses three protected seams (`dispatch`,
`registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so
the service can be constructed without a live `AmberClient` or
`Workflow`.

End-to-end engine path is covered by `ReconfigurationSpec` from apache#4220;
all five tests including the source-propagation case now pass locally
in ~1 minute.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Yicong-Huang Yicong-Huang force-pushed the fix/reconfiguration-rpc-wiring branch from 2cd1167 to 8727c03 Compare April 27, 2026 03:58
@Yicong-Huang Yicong-Huang merged commit 23901be into apache:main Apr 27, 2026
11 checks passed
@aglinxinyuan
Copy link
Copy Markdown
Contributor

The ReconfigurationSpec fails frequently. I'm not sure if it's due to this PR or #4220.

@Yicong-Huang
Copy link
Copy Markdown
Contributor Author

Yicong-Huang commented Apr 28, 2026

yeah I am looking into it. it is indeed flaky

created an issue too track it #4545

Yicong-Huang added a commit that referenced this pull request Apr 28, 2026
…4546)

### What changes were proposed in this PR?

Marks `ReconfigurationSpec`'s `"Engine should propagate reconfiguration
through a source operator in workflow"` case as `ignore` to unblock CI.
The test consistently hangs at the 1-minute `Await` because the UDF
stops making progress after processing the `EndChannel` ECM in the
multi-worker (Python source -> Python UDF) propagation path.

The other four cases in the spec (single-op python UDF reconfigure, java
operator reconfigure, source-as-target rejection, two-UDF chain) still
run and pass.

A code comment is left at the test pointing at the symptom and the
condition for re-enabling.

### Any related issues, documentation, discussions?

Put out the fire in #4545. Investigation is in parallel.

Follow-up to #4220 (which added the test) and #4531 (which restored the
web-service entrypoint but did not fix this hang).

### How was this PR tested?

`sbt "WorkflowExecutionService/testOnly *ReconfigurationSpec"` locally —
4 cases pass, the 5th is now skipped (was previously timing out at 1
min).

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants