Skip to content

Commit 5fe852d

Browse files
cancel workflows
1 parent 5350913 commit 5fe852d

File tree

10 files changed

+278
-248
lines changed

10 files changed

+278
-248
lines changed

examples/workflows.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,20 +243,23 @@ async def execute(
243243
# ]
244244
# }'
245245
#
246-
# 2. Subscribe to SSE events (use workflow_id from step 1):
247-
# curl -N http://localhost:8111/events/<workflow_id>/sse
246+
# 2. Subscribe to SSE events (use workflow_run_id from step 1):
247+
# curl -N http://localhost:8111/events/<workflow_run_id>/sse
248248
#
249249
# 3. Send event to workflow:
250-
# curl -X POST http://localhost:8111/onboarding/<workflow_id>/event \
250+
# curl -X POST http://localhost:8111/onboarding/<workflow_run_id>/event \
251251
# -H "Content-Type: application/json" \
252-
# -d '{"type": "user_input", "value": "John Doe", "workflow_id": "<id>"}'
252+
# -d '{"type": "user_input", "value": "John Doe", "workflow_run_id": "<id>"}'
253253
#
254254
# 4. Check workflow status:
255-
# curl http://localhost:8111/onboarding/<workflow_id>
255+
# curl http://localhost:8111/onboarding/<workflow_run_id>
256256
#
257-
# 5. If service restarts, workflow resumes from current block automatically
257+
# 5. Cancel a running workflow:
258+
# curl -X POST http://localhost:8111/onboarding/<workflow_run_id>/cancel
258259
#
259-
# 6. Start email monitor workflow with plan function:
260+
# 6. If service restarts, workflow resumes from current block automatically
261+
#
262+
# 7. Start email monitor workflow with plan function:
260263
# curl -X POST http://localhost:8111/email_monitor \
261264
# -H "Content-Type: application/json" \
262265
# -d '{

fastloop/exceptions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,15 @@ class WorkflowNotFoundError(Exception):
6969
class WorkflowMaxRetriesError(Exception):
7070
def __init__(
7171
self,
72-
workflow_id: str,
72+
workflow_run_id: str,
7373
block_index: int,
7474
attempts: int,
7575
last_error: str | None = None,
7676
):
77-
self.workflow_id = workflow_id
77+
self.workflow_run_id = workflow_run_id
7878
self.block_index = block_index
7979
self.attempts = attempts
8080
self.last_error = last_error
8181
super().__init__(
82-
f"Workflow {workflow_id} block {block_index} failed after {attempts} attempts"
82+
f"Workflow run {workflow_run_id} block {block_index} failed after {attempts} attempts"
8383
)

fastloop/fastloop.py

Lines changed: 66 additions & 59 deletions
Large diffs are not rendered by default.

fastloop/loop.py

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ class WorkflowBlock(BaseModel):
159159
class WorkflowState:
160160
"""Persisted state for a running workflow."""
161161

162-
workflow_id: str
162+
workflow_run_id: str
163163
workflow_name: str | None = None
164164
created_at: float = field(default_factory=lambda: datetime.now().timestamp())
165165
status: LoopStatus = LoopStatus.PENDING
@@ -499,30 +499,30 @@ def __init__(self, state_manager: StateManager):
499499
self.state_manager = state_manager
500500

501501
async def _persist_block_attempt(
502-
self, workflow_id: str, idx: int, error: str | None = None
502+
self, workflow_run_id: str, idx: int, error: str | None = None
503503
) -> int:
504-
workflow = await self.state_manager.get_workflow(workflow_id)
504+
workflow = await self.state_manager.get_workflow(workflow_run_id)
505505
attempts = workflow.block_attempts.get(idx, 0) + 1
506506
workflow.block_attempts[idx] = attempts
507507
workflow.last_error = error
508-
await self.state_manager.update_workflow(workflow_id, workflow)
508+
await self.state_manager.update_workflow(workflow_run_id, workflow)
509509
return attempts
510510

511511
async def _mark_block_completed(
512-
self, workflow_id: str, idx: int, next_idx: int, payload: dict | None
512+
self, workflow_run_id: str, idx: int, next_idx: int, payload: dict | None
513513
) -> None:
514-
workflow = await self.state_manager.get_workflow(workflow_id)
514+
workflow = await self.state_manager.get_workflow(workflow_run_id)
515515
if idx not in workflow.completed_blocks:
516516
workflow.completed_blocks.append(idx)
517517
workflow.current_block_index = next_idx
518518
workflow.next_payload = payload
519519
workflow.block_attempts.pop(idx, None)
520-
await self.state_manager.update_workflow(workflow_id, workflow)
520+
await self.state_manager.update_workflow(workflow_run_id, workflow)
521521

522522
async def _apply_plan(
523523
self,
524524
plan_result: BlockPlan | None,
525-
workflow_id: str,
525+
workflow_run_id: str,
526526
idx: int,
527527
blocks: list[WorkflowBlock],
528528
) -> tuple[int | None, float | None]:
@@ -540,7 +540,7 @@ async def _apply_plan(
540540
logger.warning(
541541
"Plan returned invalid block index, using next sequential",
542542
extra={
543-
"workflow_id": workflow_id,
543+
"workflow_run_id": workflow_run_id,
544544
"requested_index": next_idx,
545545
"block_count": len(blocks),
546546
},
@@ -555,19 +555,21 @@ async def _apply_plan(
555555

556556
async def _schedule_delay(
557557
self,
558-
workflow_id: str,
558+
workflow_run_id: str,
559559
delay_seconds: float,
560560
block_output: Any,
561561
reason: str | None = None,
562562
) -> None:
563563
"""Schedule a workflow to resume after a delay using Redis scheduler."""
564564
wake_time = datetime.now().timestamp() + delay_seconds
565-
await self.state_manager.set_workflow_block_output(workflow_id, block_output)
566-
await self.state_manager.set_workflow_wake_time(workflow_id, wake_time)
565+
await self.state_manager.set_workflow_block_output(
566+
workflow_run_id, block_output
567+
)
568+
await self.state_manager.set_workflow_wake_time(workflow_run_id, wake_time)
567569
logger.info(
568570
"Workflow scheduled to resume",
569571
extra={
570-
"workflow_id": workflow_id,
572+
"workflow_run_id": workflow_run_id,
571573
"delay_seconds": delay_seconds,
572574
"reason": reason,
573575
},
@@ -578,17 +580,17 @@ async def _run(
578580
self,
579581
func: Callable[..., Any],
580582
context: Any,
581-
workflow_id: str,
583+
workflow_run_id: str,
582584
on_stop: Callable[..., Any] | None,
583585
on_block_complete: Callable[..., Any] | None,
584586
on_error: Callable[..., Any] | None,
585587
plan_func: Callable[..., Any] | None,
586588
retry_policy: RetryPolicy,
587589
) -> None:
588590
try:
589-
async with self.state_manager.with_workflow_claim(workflow_id):
591+
async with self.state_manager.with_workflow_claim(workflow_run_id):
590592
while True:
591-
workflow = await self.state_manager.get_workflow(workflow_id)
593+
workflow = await self.state_manager.get_workflow(workflow_run_id)
592594
if workflow.status in (LoopStatus.STOPPED, LoopStatus.FAILED):
593595
break
594596

@@ -600,7 +602,7 @@ async def _run(
600602

601603
if idx in workflow.completed_blocks:
602604
await self._mark_block_completed(
603-
workflow_id, idx, idx + 1, workflow.next_payload
605+
workflow_run_id, idx, idx + 1, workflow.next_payload
604606
)
605607
continue
606608

@@ -611,7 +613,9 @@ async def _run(
611613
context.current_block = current_block
612614
context.previous_payload = workflow.next_payload
613615
context.block_output = (
614-
await self.state_manager.get_workflow_block_output(workflow_id)
616+
await self.state_manager.get_workflow_block_output(
617+
workflow_run_id
618+
)
615619
)
616620

617621
try:
@@ -631,7 +635,7 @@ async def _run(
631635
logger.info(
632636
"Plan function returned result",
633637
extra={
634-
"workflow_id": workflow_id,
638+
"workflow_run_id": workflow_run_id,
635639
"block_index": idx,
636640
"plan": plan_result.to_dict()
637641
if isinstance(plan_result, BlockPlan)
@@ -640,7 +644,7 @@ async def _run(
640644
)
641645

642646
next_idx, delay = await self._apply_plan(
643-
plan_result, workflow_id, idx, blocks
647+
plan_result, workflow_run_id, idx, blocks
644648
)
645649

646650
if next_idx is None:
@@ -651,28 +655,28 @@ async def _run(
651655
# If staying on same block (retry/loop), don't mark completed
652656
if next_idx != idx:
653657
await self._mark_block_completed(
654-
workflow_id, idx, next_idx, None
658+
workflow_run_id, idx, next_idx, None
655659
)
656660
await _call(on_block_complete, context, current_block, None)
657661
else:
658662
# Staying on same block - just update the current index
659663
workflow = await self.state_manager.get_workflow(
660-
workflow_id
664+
workflow_run_id
661665
)
662666
workflow.current_block_index = next_idx
663667
await self.state_manager.update_workflow(
664-
workflow_id, workflow
668+
workflow_run_id, workflow
665669
)
666670

667671
if delay and delay > 0:
668672
reason = plan_result.reason if plan_result else None
669673
await self._schedule_delay(
670-
workflow_id, delay, block_output, reason
674+
workflow_run_id, delay, block_output, reason
671675
)
672676

673677
except WorkflowNextError as e:
674678
await self._mark_block_completed(
675-
workflow_id, idx, idx + 1, e.payload
679+
workflow_run_id, idx, idx + 1, e.payload
676680
)
677681
await _call(
678682
on_block_complete, context, current_block, e.payload
@@ -687,21 +691,21 @@ async def _run(
687691
logger.warning(
688692
"goto() called with invalid index, stopping",
689693
extra={
690-
"workflow_id": workflow_id,
694+
"workflow_run_id": workflow_run_id,
691695
"requested_index": next_idx,
692696
"block_count": len(blocks),
693697
},
694698
)
695699
raise LoopStoppedError() from None
696700

697701
await self._mark_block_completed(
698-
workflow_id, idx, next_idx, None
702+
workflow_run_id, idx, next_idx, None
699703
)
700704
await _call(on_block_complete, context, current_block, None)
701705

702706
if e.delay_seconds and e.delay_seconds > 0:
703707
await self._schedule_delay(
704-
workflow_id, e.delay_seconds, None, e.reason
708+
workflow_run_id, e.delay_seconds, None, e.reason
705709
)
706710

707711
except (asyncio.CancelledError, LoopPausedError, LoopStoppedError):
@@ -712,15 +716,15 @@ async def _run(
712716
logger.error(
713717
"Workflow block error",
714718
extra={
715-
"workflow_id": workflow_id,
719+
"workflow_run_id": workflow_run_id,
716720
"block_index": idx,
717721
"error": error_str,
718722
"traceback": traceback.format_exc(),
719723
},
720724
)
721725

722726
attempts = await self._persist_block_attempt(
723-
workflow_id, idx, error_str
727+
workflow_run_id, idx, error_str
724728
)
725729

726730
should_retry = False
@@ -738,7 +742,7 @@ async def _run(
738742
logger.info(
739743
"Retrying workflow block",
740744
extra={
741-
"workflow_id": workflow_id,
745+
"workflow_run_id": workflow_run_id,
742746
"block_index": idx,
743747
"attempt": attempts,
744748
"delay": delay,
@@ -748,39 +752,41 @@ async def _run(
748752
continue
749753

750754
max_retries_error = WorkflowMaxRetriesError(
751-
workflow_id, idx, attempts, error_str
755+
workflow_run_id, idx, attempts, error_str
752756
)
753757
logger.error(
754758
"Workflow block failed after max retries",
755759
extra={
756-
"workflow_id": workflow_id,
760+
"workflow_run_id": workflow_run_id,
757761
"block_index": idx,
758762
"attempts": attempts,
759763
},
760764
)
761765
await _call(on_error, context, current_block, max_retries_error)
762766
await self.state_manager.update_workflow_status(
763-
workflow_id, LoopStatus.FAILED
767+
workflow_run_id, LoopStatus.FAILED
764768
)
765769
await _call(on_stop, context)
766770
return
767771

768772
except asyncio.CancelledError:
769773
pass
770774
except LoopClaimError:
771-
logger.warning("Workflow claim failed", extra={"workflow_id": workflow_id})
775+
logger.warning(
776+
"Workflow claim failed", extra={"workflow_run_id": workflow_run_id}
777+
)
772778
except LoopStoppedError:
773779
await self.state_manager.update_workflow_status(
774-
workflow_id, LoopStatus.STOPPED
780+
workflow_run_id, LoopStatus.STOPPED
775781
)
776782
await _call(on_stop, context)
777783
except LoopPausedError:
778784
await self.state_manager.update_workflow_status(
779-
workflow_id, LoopStatus.IDLE
785+
workflow_run_id, LoopStatus.IDLE
780786
)
781787
# Don't call on_stop - workflow is just paused, not finished
782788
finally:
783-
self.tasks.pop(workflow_id, None)
789+
self.tasks.pop(workflow_run_id, None)
784790

785791
async def start(
786792
self,
@@ -794,16 +800,16 @@ async def start(
794800
plan: Callable[..., Any] | None = None,
795801
retry_policy: RetryPolicy | None = None,
796802
) -> bool:
797-
if workflow.workflow_id in self.tasks:
803+
if workflow.workflow_run_id in self.tasks:
798804
return False
799805

800806
await _call(on_start, context)
801807

802-
self.tasks[workflow.workflow_id] = asyncio.create_task(
808+
self.tasks[workflow.workflow_run_id] = asyncio.create_task(
803809
self._run(
804810
func,
805811
context,
806-
workflow.workflow_id,
812+
workflow.workflow_run_id,
807813
on_stop,
808814
on_block_complete,
809815
on_error,
@@ -813,8 +819,8 @@ async def start(
813819
)
814820
return True
815821

816-
async def stop(self, workflow_id: str) -> bool:
817-
task = self.tasks.pop(workflow_id, None)
822+
async def stop(self, workflow_run_id: str) -> bool:
823+
task = self.tasks.pop(workflow_run_id, None)
818824
if not task:
819825
return False
820826
task.cancel()

0 commit comments

Comments
 (0)