From 8727c03284e26c49afcb07cd5ddab5868c179cdf Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 26 Apr 2026 20:48:21 -0700 Subject: [PATCH] fix(amber): wire ExecutionReconfigurationService back to the engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #4220, restoring the full reconfiguration flow end-to-end. Three coupled gaps: **1. Web-service entrypoint never dispatched.** `ExecutionReconfigurationService.performReconfigurationOnResume` was still throwing `"reconfiguration is tentatively disabled."` and the body that calls `controllerInterface.reconfigureWorkflow` was commented out. Restored, adapted to the current proto shape — `UpdateExecutorRequest` now carries `(targetOpId, newExecInitInfo)` directly, no proto-Any boxing — and resets `ExecutionReconfigurationStore` with a fresh `currentReconfigId` after dispatch. `StateTransferFunc` is dropped because the new request schema doesn't carry it. **2. Engine never reported per-worker completion.** `ReconfigurationHandler` collected the worker `updateExecutor` futures but only returned `EmptyReturn` when they all finished — no `UpdateExecutorCompleted(worker)` events were ever sent to the client. Without those, `ExecutionReconfigurationService.completedReconfigurations` stayed empty, the diff handler never fired, and the frontend never saw `ModifyLogicCompletedEvent`. Each per-worker future is now wrapped with `sendToClient(UpdateExecutorCompleted(worker))` in both Fries-component branches, and the web-service callback is re-enabled. The multi-op branch is also restructured so per-worker futures are added with `++=` instead of the original `+=` that put an `Iterable` into the `ArrayBuffer` as a single element. **3. ECM-embedded ControlInvocation reply was misrouted.** When a `ControlInvocation` travels in-band inside an ECM along a data channel between two workers, the receiving worker was replying along the (swapped) data channel — i.e. back to the upstream worker — rather than to the actor that originated the call (the controller). The upstream worker logged a "received unknown ControlReturn" warning and dropped the reply, so the controller's `Future.collect` on the per-worker futures never resolved. This was masked before because the old multi-op `+=` bug effectively ignored those futures; with #2 above fixing the collection, the routing bug surfaced and hung `ReconfigurationSpec`'s source-propagation test for 2h41m before CI killed it. Both worker implementations now use `command.context.sender / .receiver` — set at invocation time by `mkContext` — instead of the network channel's from/to. For normal RPC over a control channel they're equivalent, so the non-ECM path is unaffected. ReconfigurationSpec's bare `Await.result(...)` calls also gain 1-minute timeouts so future hangs surface in a bounded window instead of running out the GHA 6-hour limit. ### Tests `ExecutionReconfigurationServiceSpec` (new) covers: - empty pending list → no dispatch, store unchanged; - non-empty list → one dispatch carrying the right `(targetOpId, newExecInitInfo)` pairs, store reset with a fresh `currentReconfigId`; - consecutive resumes get distinct `reconfigurationId`s; - worker completion (`onWorkerReconfigured`) updates `completedReconfigurations` with Set semantics (idempotent on duplicates). The test uses three protected seams (`dispatch`, `registerWorkerCompletionCallback`, `registerCompletionDiffHandler`) so the service can be constructed without a live `AmberClient` or `Workflow`. End-to-end engine path is covered by `ReconfigurationSpec` from #4220; all five tests including the source-propagation case now pass locally in ~1 minute. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/architecture/rpc/async_rpc_server.py | 21 ++- .../ReconfigurationHandler.scala | 47 ++++-- .../architecture/worker/DataProcessor.scala | 11 +- .../ExecutionReconfigurationService.scala | 136 +++++++++------- .../engine/e2e/ReconfigurationSpec.scala | 18 ++- .../ExecutionReconfigurationServiceSpec.scala | 149 ++++++++++++++++++ 6 files changed, 299 insertions(+), 83 deletions(-) create mode 100644 amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py index d7763070301..49dc5f05472 100644 --- a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py +++ b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py @@ -121,10 +121,23 @@ def receive(self, from_: ChannelIdentity, control_invocation: ControlInvocation) if self._no_reply_needed(control_invocation.command_id): return - # Reply to the sender. - target_channel_id = ChannelIdentity( - from_.to_worker_id, from_.from_worker_id, True - ) + # Reply to the actor that originated this ControlInvocation, identified + # by control_invocation.context.sender. For a normal RPC over a + # control channel this matches `from_.from_worker_id`; for an + # invocation carried in-band by an ECM along a data channel, `from_` + # is the data channel between two workers and the original sender + # lives only in the invocation's context. + # When the context is unset (e.g. unit-test inputs that construct + # ControlInvocation directly), fall back to swapping `from_`. + ctx = control_invocation.context + if ctx.sender.name and ctx.receiver.name: + target_channel_id = ChannelIdentity( + ctx.receiver, ctx.sender, is_control=True + ) + else: + target_channel_id = ChannelIdentity( + from_.to_worker_id, from_.from_worker_id, is_control=True + ) logger.debug( f"PYTHON returns a ReturnInvocation {payload}, replying the command" f" {command}" diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala index 210d7c5b98c..7653f873c13 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala @@ -21,16 +21,21 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity } -import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerAsyncRPCHandlerInitializer, + UpdateExecutorCompleted +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, + ControlInvocation, WorkflowReconfigureRequest } -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, EmptyReturn} import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.util.VirtualIdentityUtils @@ -64,7 +69,12 @@ trait ReconfigurationHandler { .getLatestOperatorExecution(updateExecutorRequest.targetOpId) .getWorkerIds workerIds.foreach { worker => - futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker))) + futures.append( + notifyOnComplete( + workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)), + worker + ) + ) } } else { val channelScope = cp.workflowExecution.getRunningRegionExecutions @@ -88,20 +98,21 @@ trait ReconfigurationHandler { } } val finalScope = channelScope ++ controlChannels - val cmdMapping = + val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, Future[ControlReturn])] = friesComponent.reconfigurations.flatMap { updateReq => val workers = cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds - workers.map(worker => - worker.name -> createInvocation( - METHOD_UPDATE_EXECUTOR.getBareMethodName, - updateReq, - worker - ) - ) - }.toMap - futures += cmdMapping.map { - case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture + workers.map { worker => + val (invocation, future) = + createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker) + (worker, invocation, future) + } + }.toSeq + val cmdMapping: Map[String, ControlInvocation] = workerCommands.map { + case (worker, invocation, _) => worker.name -> invocation + }.toMap + futures ++= workerCommands.map { + case (worker, _, future) => notifyOnComplete(future, worker) } friesComponent.sources.foreach { source => cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker => @@ -109,7 +120,7 @@ trait ReconfigurationHandler { EmbeddedControlMessageIdentity(msg.reconfigurationId), ALL_ALIGNMENT, finalScope.toSet, - cmdMapping.map(x => (x._1, x._2._1)), + cmdMapping, ChannelIdentity(actorId, worker, isControl = true) ) } @@ -121,4 +132,10 @@ trait ReconfigurationHandler { } } + // After a worker's updateExecutor completes, notify the client so the + // ExecutionReconfigurationService can advance completedReconfigurations + // and emit ModifyLogicCompletedEvent on the websocket. + private def notifyOnComplete[T](future: Future[T], worker: ActorVirtualIdentity): Future[T] = + future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker))) + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a46..84f1e8ec659 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -242,7 +242,16 @@ class DataProcessor( // invoke the control command carried with the ECM logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = $command") if (command.isDefined) { - asyncRPCServer.receive(command.get, channelId.fromWorkerId) + // The reply must go back to the actor that originated the invocation + // (recorded in command.context.sender), not to channelId.fromWorkerId. + // For ECM-embedded commands those differ: channelId is the data + // channel between two workers, while the originator is typically the + // controller. Fall back to the channel sender when the context is + // unset (e.g. unit-test inputs). + val ctx = command.get.context + val replyTo = + if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId + asyncRPCServer.receive(command.get, replyTo) } // if this worker is not the final destination of the ECM, pass it downstream val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == actorId).toSet diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala index e5867277fc9..e7617fdfe16 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala @@ -19,7 +19,12 @@ package org.apache.texera.web.service -import org.apache.texera.amber.engine.architecture.controller.Workflow +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted, Workflow} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + UpdateExecutorRequest, + WorkflowReconfigureRequest +} import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.web.SubscriptionManager import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent @@ -28,8 +33,9 @@ import org.apache.texera.web.model.websocket.response.{ ModifyLogicCompletedEvent, ModifyLogicResponse } -import org.apache.texera.web.storage.ExecutionStateStore +import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore} +import java.util.UUID import scala.util.{Failure, Success} class ExecutionReconfigurationService( @@ -39,34 +45,11 @@ class ExecutionReconfigurationService( ) extends SubscriptionManager { // monitors notification from the engine that a reconfiguration on a worker is completed - // client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => { - // stateStore.reconfigurationStore.updateState(old => { - // old.copy(completedReconfigurations = old.completedReconfigurations + evt.id) - // }) - // }) + registerWorkerCompletionCallback() // monitors the reconfiguration state (completed workers) change, // notifies the frontend when all workers of an operator complete reconfiguration - addSubscription( - stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => { - if ( - oldState.completedReconfigurations != newState.completedReconfigurations - && oldState.currentReconfigId == newState.currentReconfigId - ) { - val diff = newState.completedReconfigurations -- oldState.completedReconfigurations - val newlyCompletedOps = diff - .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id) - .map(opId => opId.logicalOpId.id) - if (newlyCompletedOps.nonEmpty) { - List(ModifyLogicCompletedEvent(newlyCompletedOps.toList)) - } else { - List() - } - } else { - List() - } - }) - ) + registerCompletionDiffHandler() // handles reconfigure workflow logic from frontend // validate the modify logic request and notifies the frontend @@ -96,42 +79,77 @@ class ExecutionReconfigurationService( // actually performs all reconfiguration requests the user made during pause // sends ModifyLogic messages to operators and workers, - // there are two modes: transactional or non-transactional - // in the transactional mode, reconfigurations on multiple operators will be synchronized - // in the non-transaction mode, they are not synchronized, this is faster, but can lead to consistency issues - // for details, see the Fries reconfiguration paper + // see the Fries reconfiguration paper for the algorithm. + // Note: StateTransferFunc is currently not threaded through to the engine — + // the new UpdateExecutorRequest only carries (targetOpId, newOpExecInitInfo). def performReconfigurationOnResume(): Unit = { val reconfigurations = stateStore.reconfigurationStore.getState.unscheduledReconfigurations if (reconfigurations.isEmpty) { return } - throw new RuntimeException("reconfiguration is tentatively disabled.") - // // schedule all pending reconfigurations to the engine - // val reconfigurationId = UUID.randomUUID().toString - // val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map { - // case (op, stateTransferFunc) => - // val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get - // val protoAny = Any.of( - // "org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo", - // ByteString.copyFrom(bytes) - // ) - // val stateTransferFuncOpt = stateTransferFunc.map { func => - // val bytes = AmberRuntime.serde.serialize(func).get - // Any.of( - // "org.apache.texera.workflow.common.operators.StateTransferFunc", - // ByteString.copyFrom(bytes) - // ) - // } - // UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt) - // }) - // client.controllerInterface.reconfigureWorkflow( - // WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId), - // () - // ) - // - // // clear all un-scheduled reconfigurations, start a new reconfiguration ID - // stateStore.reconfigurationStore.updateState(_ => - // ExecutionReconfigurationStore(Some(reconfigurationId)) - // ) + + val reconfigurationId = UUID.randomUUID().toString + val updateExecutorRequests = reconfigurations.map { + case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo) + } + dispatch( + WorkflowReconfigureRequest( + reconfiguration = updateExecutorRequests, + reconfigurationId = reconfigurationId + ) + ) + + // clear all un-scheduled reconfigurations, start a new reconfiguration ID + stateStore.reconfigurationStore.updateState(_ => + ExecutionReconfigurationStore(currentReconfigId = Some(reconfigurationId)) + ) + } + + // Seam for unit testing the dispatch path without spinning up an AmberClient. + protected def dispatch(request: WorkflowReconfigureRequest): Unit = { + client.controllerInterface.reconfigureWorkflow(request, ()) + } + + // Seam for unit testing — production wires the engine's UpdateExecutorCompleted + // events into the reconfiguration store so the diff handler above can fire + // ModifyLogicCompletedEvent for the frontend. + protected def registerWorkerCompletionCallback(): Unit = { + client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => { + onWorkerReconfigured(evt.id) + }) + } + + // Exposed (instead of inlined in the callback) so tests can drive the + // completion path directly. + private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): Unit = { + stateStore.reconfigurationStore.updateState(old => + old.copy(completedReconfigurations = old.completedReconfigurations + worker) + ) + } + + // Seam for unit testing — the diff handler dereferences workflow.physicalPlan + // to map worker → logical op, which makes constructing a service in tests + // require a full Workflow. Tests override to no-op. + protected def registerCompletionDiffHandler(): Unit = { + addSubscription( + stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => { + if ( + oldState.completedReconfigurations != newState.completedReconfigurations + && oldState.currentReconfigId == newState.currentReconfigId + ) { + val diff = newState.completedReconfigurations -- oldState.completedReconfigurations + val newlyCompletedOps = diff + .map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id) + .map(opId => opId.logicalOpId.id) + if (newlyCompletedOps.nonEmpty) { + List(ModifyLogicCompletedEvent(newlyCompletedOps.toList)) + } else { + List() + } + } else { + List() + } + }) + ) } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala index 6f344caae3a..92dfba19de1 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala @@ -150,9 +150,15 @@ class ReconfigurationSpec completion.setDone() } }) - Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) + Await.result( + client.controllerInterface.startWorkflow(EmptyRequest(), ()), + Duration.fromSeconds(5) + ) val pausedReached = stateReached(client, PAUSED) - Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) + Await.result( + client.controllerInterface.pauseWorkflow(EmptyRequest(), ()), + Duration.fromSeconds(5) + ) Await.result(pausedReached, Duration.fromSeconds(10)) val physicalOps = targetOps.flatMap(op => workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier) @@ -164,9 +170,13 @@ class ReconfigurationSpec reconfigurationId = "test-reconfigure-1" ), () - ) + ), + Duration.fromSeconds(5) + ) + Await.result( + client.controllerInterface.resumeWorkflow(EmptyRequest(), ()), + Duration.fromSeconds(5) ) - Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) Await.result(completion, Duration.fromMinutes(1)) result } diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala new file mode 100644 index 00000000000..974db13286d --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ExecutionIdentity, + OperatorIdentity, + PhysicalOpIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.PhysicalOp +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.WorkflowReconfigureRequest +import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.collection.mutable.ArrayBuffer + +/** + * Web-service-layer tests for ExecutionReconfigurationService. + * + * The end-to-end engine path (reconfigureWorkflow → Fries algorithm → + * UpdateExecutor on workers) is covered by ReconfigurationSpec. + * This spec focuses on the wiring inside performReconfigurationOnResume: + * empty short-circuit, request construction, and store reset semantics. + */ +class ExecutionReconfigurationServiceSpec extends AnyFlatSpec with Matchers { + + private def mkPhysicalOp(name: String): PhysicalOp = + PhysicalOp( + id = PhysicalOpIdentity(OperatorIdentity(name), "main"), + workflowId = WorkflowIdentity(0L), + executionId = ExecutionIdentity(0L), + opExecInitInfo = OpExecWithClassName(s"$name.Class", "") + ) + + /** Service variant that records dispatched requests and skips the AmberClient + * registration / workflow-dependent diff handler so it can be constructed + * without a live engine. + */ + private class RecordingService(stateStore: ExecutionStateStore) + extends ExecutionReconfigurationService(client = null, stateStore, workflow = null) { + val captured: ArrayBuffer[WorkflowReconfigureRequest] = ArrayBuffer.empty + override protected def dispatch(request: WorkflowReconfigureRequest): Unit = + captured += request + override protected def registerWorkerCompletionCallback(): Unit = () + override protected def registerCompletionDiffHandler(): Unit = () + } + + "performReconfigurationOnResume" should + "return without dispatching when no reconfigurations are pending" in { + val stateStore = new ExecutionStateStore() + val service = new RecordingService(stateStore) + + noException should be thrownBy service.performReconfigurationOnResume() + + service.captured shouldBe empty + val state = stateStore.reconfigurationStore.getState + state.unscheduledReconfigurations shouldBe empty + state.currentReconfigId shouldBe None + state.completedReconfigurations shouldBe empty + } + + it should "dispatch one request carrying every pending reconfiguration and reset the store" in { + val stateStore = new ExecutionStateStore() + val service = new RecordingService(stateStore) + + val op1 = mkPhysicalOp("op-1") + val op2 = mkPhysicalOp("op-2") + stateStore.reconfigurationStore.updateState(_ => + ExecutionReconfigurationStore(unscheduledReconfigurations = List((op1, None), (op2, None))) + ) + + service.performReconfigurationOnResume() + + service.captured should have size 1 + val request = service.captured.head + request.reconfigurationId should not be empty + request.reconfiguration.map(_.targetOpId) should contain theSameElementsInOrderAs Seq( + op1.id, + op2.id + ) + request.reconfiguration.map(_.newExecInitInfo) should contain theSameElementsInOrderAs Seq( + op1.opExecInitInfo, + op2.opExecInitInfo + ) + + val state = stateStore.reconfigurationStore.getState + state.unscheduledReconfigurations shouldBe empty + state.currentReconfigId shouldBe Some(request.reconfigurationId) + state.completedReconfigurations shouldBe empty + } + + it should "use a fresh reconfigurationId on each dispatch" in { + val stateStore = new ExecutionStateStore() + val service = new RecordingService(stateStore) + + def queueAndDispatch(opName: String): String = { + stateStore.reconfigurationStore.updateState(old => + old.copy(unscheduledReconfigurations = List((mkPhysicalOp(opName), None))) + ) + service.performReconfigurationOnResume() + service.captured.last.reconfigurationId + } + + val firstId = queueAndDispatch("op-a") + val secondId = queueAndDispatch("op-b") + + firstId should not be secondId + stateStore.reconfigurationStore.getState.currentReconfigId shouldBe Some(secondId) + } + + "onWorkerReconfigured" should + "add the worker id to completedReconfigurations so the diff handler can fire" in { + val stateStore = new ExecutionStateStore() + val service = new RecordingService(stateStore) + + val w1 = ActorVirtualIdentity("Worker:WF1-E1-op-main-0") + val w2 = ActorVirtualIdentity("Worker:WF1-E1-op-main-1") + service.onWorkerReconfigured(w1) + service.onWorkerReconfigured(w2) + // duplicate completion is idempotent (Set semantics). + service.onWorkerReconfigured(w1) + + stateStore.reconfigurationStore.getState.completedReconfigurations should contain theSameElementsAs Set( + w1, + w2 + ) + } +}