Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions amber/src/main/python/core/architecture/rpc/async_rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,23 @@ def receive(self, from_: ChannelIdentity, control_invocation: ControlInvocation)
if self._no_reply_needed(control_invocation.command_id):
return

# Reply to the sender.
target_channel_id = ChannelIdentity(
from_.to_worker_id, from_.from_worker_id, True
)
# Reply to the actor that originated this ControlInvocation, identified
# by control_invocation.context.sender. For a normal RPC over a
# control channel this matches `from_.from_worker_id`; for an
# invocation carried in-band by an ECM along a data channel, `from_`
# is the data channel between two workers and the original sender
# lives only in the invocation's context.
# When the context is unset (e.g. unit-test inputs that construct
# ControlInvocation directly), fall back to swapping `from_`.
ctx = control_invocation.context
if ctx.sender.name and ctx.receiver.name:
target_channel_id = ChannelIdentity(
ctx.receiver, ctx.sender, is_control=True
)
else:
target_channel_id = ChannelIdentity(
from_.to_worker_id, from_.from_worker_id, is_control=True
)
logger.debug(
f"PYTHON returns a ReturnInvocation {payload}, replying the command"
f" {command}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import org.apache.texera.amber.core.virtualidentity.{
ActorVirtualIdentity,
ChannelIdentity,
EmbeddedControlMessageIdentity
}
import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
UpdateExecutorCompleted
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
ControlInvocation,
WorkflowReconfigureRequest
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, EmptyReturn}
import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.util.VirtualIdentityUtils
Expand Down Expand Up @@ -64,7 +69,12 @@ trait ReconfigurationHandler {
.getLatestOperatorExecution(updateExecutorRequest.targetOpId)
.getWorkerIds
workerIds.foreach { worker =>
futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)))
futures.append(
notifyOnComplete(
workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)),
worker
)
)
}
} else {
val channelScope = cp.workflowExecution.getRunningRegionExecutions
Expand All @@ -88,28 +98,29 @@ trait ReconfigurationHandler {
}
}
val finalScope = channelScope ++ controlChannels
val cmdMapping =
val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, Future[ControlReturn])] =
friesComponent.reconfigurations.flatMap { updateReq =>
val workers =
cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds
workers.map(worker =>
worker.name -> createInvocation(
METHOD_UPDATE_EXECUTOR.getBareMethodName,
updateReq,
worker
)
)
}.toMap
futures += cmdMapping.map {
case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture
workers.map { worker =>
val (invocation, future) =
createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker)
(worker, invocation, future)
}
}.toSeq
val cmdMapping: Map[String, ControlInvocation] = workerCommands.map {
case (worker, invocation, _) => worker.name -> invocation
}.toMap
futures ++= workerCommands.map {
case (worker, _, future) => notifyOnComplete(future, worker)
}
friesComponent.sources.foreach { source =>
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker =>
sendECM(
EmbeddedControlMessageIdentity(msg.reconfigurationId),
ALL_ALIGNMENT,
finalScope.toSet,
cmdMapping.map(x => (x._1, x._2._1)),
cmdMapping,
ChannelIdentity(actorId, worker, isControl = true)
)
}
Expand All @@ -121,4 +132,10 @@ trait ReconfigurationHandler {
}
}

// After a worker's updateExecutor completes, notify the client so the
// ExecutionReconfigurationService can advance completedReconfigurations
// and emit ModifyLogicCompletedEvent on the websocket.
private def notifyOnComplete[T](future: Future[T], worker: ActorVirtualIdentity): Future[T] =
future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker)))

}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,16 @@ class DataProcessor(
// invoke the control command carried with the ECM
logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = $command")
if (command.isDefined) {
asyncRPCServer.receive(command.get, channelId.fromWorkerId)
// The reply must go back to the actor that originated the invocation
// (recorded in command.context.sender), not to channelId.fromWorkerId.
// For ECM-embedded commands those differ: channelId is the data
// channel between two workers, while the originator is typically the
// controller. Fall back to the channel sender when the context is
// unset (e.g. unit-test inputs).
val ctx = command.get.context
val replyTo =
if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId
asyncRPCServer.receive(command.get, replyTo)
}
// if this worker is not the final destination of the ECM, pass it downstream
val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == actorId).toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

package org.apache.texera.web.service

import org.apache.texera.amber.engine.architecture.controller.Workflow
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted, Workflow}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
UpdateExecutorRequest,
WorkflowReconfigureRequest
}
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.web.SubscriptionManager
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
Expand All @@ -28,8 +33,9 @@ import org.apache.texera.web.model.websocket.response.{
ModifyLogicCompletedEvent,
ModifyLogicResponse
}
import org.apache.texera.web.storage.ExecutionStateStore
import org.apache.texera.web.storage.{ExecutionReconfigurationStore, ExecutionStateStore}

import java.util.UUID
import scala.util.{Failure, Success}

class ExecutionReconfigurationService(
Expand All @@ -39,34 +45,11 @@ class ExecutionReconfigurationService(
) extends SubscriptionManager {

// monitors notification from the engine that a reconfiguration on a worker is completed
// client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
// stateStore.reconfigurationStore.updateState(old => {
// old.copy(completedReconfigurations = old.completedReconfigurations + evt.id)
// })
// })
registerWorkerCompletionCallback()

// monitors the reconfiguration state (completed workers) change,
// notifies the frontend when all workers of an operator complete reconfiguration
addSubscription(
stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
if (
oldState.completedReconfigurations != newState.completedReconfigurations
&& oldState.currentReconfigId == newState.currentReconfigId
) {
val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
val newlyCompletedOps = diff
.map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
.map(opId => opId.logicalOpId.id)
if (newlyCompletedOps.nonEmpty) {
List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
} else {
List()
}
} else {
List()
}
})
)
registerCompletionDiffHandler()

// handles reconfigure workflow logic from frontend
// validate the modify logic request and notifies the frontend
Expand Down Expand Up @@ -96,42 +79,77 @@ class ExecutionReconfigurationService(

// actually performs all reconfiguration requests the user made during pause
// sends ModifyLogic messages to operators and workers,
// there are two modes: transactional or non-transactional
// in the transactional mode, reconfigurations on multiple operators will be synchronized
// in the non-transaction mode, they are not synchronized, this is faster, but can lead to consistency issues
// for details, see the Fries reconfiguration paper
// see the Fries reconfiguration paper for the algorithm.
// Note: StateTransferFunc is currently not threaded through to the engine —
// the new UpdateExecutorRequest only carries (targetOpId, newOpExecInitInfo).
def performReconfigurationOnResume(): Unit = {
val reconfigurations = stateStore.reconfigurationStore.getState.unscheduledReconfigurations
if (reconfigurations.isEmpty) {
return
}
throw new RuntimeException("reconfiguration is tentatively disabled.")
// // schedule all pending reconfigurations to the engine
// val reconfigurationId = UUID.randomUUID().toString
// val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map {
// case (op, stateTransferFunc) =>
// val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get
// val protoAny = Any.of(
// "org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo",
// ByteString.copyFrom(bytes)
// )
// val stateTransferFuncOpt = stateTransferFunc.map { func =>
// val bytes = AmberRuntime.serde.serialize(func).get
// Any.of(
// "org.apache.texera.workflow.common.operators.StateTransferFunc",
// ByteString.copyFrom(bytes)
// )
// }
// UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt)
// })
// client.controllerInterface.reconfigureWorkflow(
// WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId),
// ()
// )
//
// // clear all un-scheduled reconfigurations, start a new reconfiguration ID
// stateStore.reconfigurationStore.updateState(_ =>
// ExecutionReconfigurationStore(Some(reconfigurationId))
// )

val reconfigurationId = UUID.randomUUID().toString
val updateExecutorRequests = reconfigurations.map {
case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo)
}
dispatch(
WorkflowReconfigureRequest(
reconfiguration = updateExecutorRequests,
reconfigurationId = reconfigurationId
)
)

// clear all un-scheduled reconfigurations, start a new reconfiguration ID
stateStore.reconfigurationStore.updateState(_ =>
ExecutionReconfigurationStore(currentReconfigId = Some(reconfigurationId))
)
}

// Seam for unit testing the dispatch path without spinning up an AmberClient.
protected def dispatch(request: WorkflowReconfigureRequest): Unit = {
client.controllerInterface.reconfigureWorkflow(request, ())
}

// Seam for unit testing — production wires the engine's UpdateExecutorCompleted
// events into the reconfiguration store so the diff handler above can fire
// ModifyLogicCompletedEvent for the frontend.
protected def registerWorkerCompletionCallback(): Unit = {
client.registerCallback[UpdateExecutorCompleted]((evt: UpdateExecutorCompleted) => {
onWorkerReconfigured(evt.id)
})
}

// Exposed (instead of inlined in the callback) so tests can drive the
// completion path directly.
private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): Unit = {
stateStore.reconfigurationStore.updateState(old =>
old.copy(completedReconfigurations = old.completedReconfigurations + worker)
)
}

// Seam for unit testing — the diff handler dereferences workflow.physicalPlan
// to map worker → logical op, which makes constructing a service in tests
// require a full Workflow. Tests override to no-op.
protected def registerCompletionDiffHandler(): Unit = {
addSubscription(
stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) => {
if (
oldState.completedReconfigurations != newState.completedReconfigurations
&& oldState.currentReconfigId == newState.currentReconfigId
) {
val diff = newState.completedReconfigurations -- oldState.completedReconfigurations
val newlyCompletedOps = diff
.map(workerId => workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
.map(opId => opId.logicalOpId.id)
if (newlyCompletedOps.nonEmpty) {
List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
} else {
List()
}
} else {
List()
}
})
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ class ReconfigurationSpec
completion.setDone()
}
})
Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
Await.result(
client.controllerInterface.startWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
)
val pausedReached = stateReached(client, PAUSED)
Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
Await.result(
client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
)
Await.result(pausedReached, Duration.fromSeconds(10))
val physicalOps = targetOps.flatMap(op =>
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
Expand All @@ -164,9 +170,13 @@ class ReconfigurationSpec
reconfigurationId = "test-reconfigure-1"
),
()
)
),
Duration.fromSeconds(5)
)
Await.result(
client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
)
Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
Await.result(completion, Duration.fromMinutes(1))
result
}
Expand Down
Loading
Loading