Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions apps/sim/executor/execution/edge-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,176 @@ describe('EdgeManager', () => {
})
})

describe('Multiple error ports to same target', () => {
it('should mark target ready when one source errors and another succeeds', () => {
// This tests the case where a node has multiple incoming error edges
// from different sources. When one source errors (activating its error edge)
// and another source succeeds (deactivating its error edge), the target
// should become ready after both sources complete.
//
// Workflow 1 (errors) ─── error ───┐
// ├──→ Error Handler
// Workflow 7 (succeeds) ─ error ───┘

const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'

const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])

const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

// Workflow 1 errors first - error edge activates
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
error: 'Something went wrong',
})
// Error handler should NOT be ready yet (waiting for workflow 7)
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)

// Workflow 7 succeeds - error edge deactivates
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
result: 'success',
})
// Error handler SHOULD be ready now (workflow 1's error edge activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})

it('should mark target ready when first source succeeds then second errors', () => {
// Opposite order: first source succeeds, then second errors

const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'

const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])

const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

// Workflow 1 succeeds first - error edge deactivates
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
result: 'success',
})
// Error handler should NOT be ready yet (waiting for workflow 7)
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)

// Workflow 7 errors - error edge activates
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
error: 'Something went wrong',
})
// Error handler SHOULD be ready now (workflow 7's error edge activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})

it('should NOT mark target ready when all sources succeed (no errors)', () => {
// When neither source errors, the error handler should NOT run

const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'

const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])

const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

// Both workflows succeed - both error edges deactivate
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
result: 'success',
})
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)

const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
result: 'success',
})
// Error handler should NOT be ready (no errors occurred)
expect(readyAfterWorkflow7).not.toContain(errorHandlerId)
})

it('should mark target ready when both sources error', () => {
// When both sources error, the error handler should run

const workflow1Id = 'workflow-1'
const workflow7Id = 'workflow-7'
const errorHandlerId = 'error-handler'

const workflow1Node = createMockNode(workflow1Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const workflow7Node = createMockNode(workflow7Id, [
{ target: errorHandlerId, sourceHandle: 'error' },
])

const errorHandlerNode = createMockNode(errorHandlerId, [], [workflow1Id, workflow7Id])

const nodes = new Map<string, DAGNode>([
[workflow1Id, workflow1Node],
[workflow7Id, workflow7Node],
[errorHandlerId, errorHandlerNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

// Workflow 1 errors
const readyAfterWorkflow1 = edgeManager.processOutgoingEdges(workflow1Node, {
error: 'Error 1',
})
expect(readyAfterWorkflow1).not.toContain(errorHandlerId)

// Workflow 7 errors
const readyAfterWorkflow7 = edgeManager.processOutgoingEdges(workflow7Node, {
error: 'Error 2',
})
// Error handler SHOULD be ready (both edges activated)
expect(readyAfterWorkflow7).toContain(errorHandlerId)
})
})

describe('Chained conditions', () => {
it('should handle sequential conditions (condition1 → condition2)', () => {
const condition1Id = 'condition-1'
Expand Down
29 changes: 28 additions & 1 deletion apps/sim/executor/execution/edge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const logger = createLogger('EdgeManager')

export class EdgeManager {
private deactivatedEdges = new Set<string>()
private nodesWithActivatedEdge = new Set<string>()

constructor(private dag: DAG) {}

Expand Down Expand Up @@ -35,6 +36,11 @@ export class EdgeManager {
activatedTargets.push(edge.target)
}

// Track nodes that have received at least one activated edge
for (const targetId of activatedTargets) {
this.nodesWithActivatedEdge.add(targetId)
}

const cascadeTargets = new Set<string>()
for (const { target, handle } of edgesToDeactivate) {
this.deactivateEdgeAndDescendants(node.id, target, handle, cascadeTargets)
Expand Down Expand Up @@ -71,6 +77,18 @@ export class EdgeManager {
}
}

// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}

return readyNodes
}

Expand All @@ -90,6 +108,7 @@ export class EdgeManager {

clearDeactivatedEdges(): void {
this.deactivatedEdges.clear()
this.nodesWithActivatedEdge.clear()
}

/**
Expand All @@ -108,6 +127,10 @@ export class EdgeManager {
for (const edgeKey of edgesToRemove) {
this.deactivatedEdges.delete(edgeKey)
}
// Also clear activated edge tracking for these nodes
for (const nodeId of nodeIds) {
this.nodesWithActivatedEdge.delete(nodeId)
}
}

private isTargetReady(targetId: string): boolean {
Expand Down Expand Up @@ -210,7 +233,11 @@ export class EdgeManager {
cascadeTargets?.add(targetId)
}

if (this.hasActiveIncomingEdges(targetNode, edgeKey)) {
// Don't cascade if node has active incoming edges OR has received an activated edge
if (
this.hasActiveIncomingEdges(targetNode, edgeKey) ||
this.nodesWithActivatedEdge.has(targetId)
) {
return
}

Expand Down