Skip to content

Commit 073379e

Browse files
handle looping better
1 parent 5fe852d commit 073379e

7 files changed

Lines changed: 47 additions & 26 deletions

File tree

fastloop/fastloop.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,14 @@ async def lifespan(_: FastAPI):
109109
allow_headers=cors_config.get("allow_headers", ["*"]),
110110
)
111111

112-
@self.get("/events/{loop_id}/history")
113-
async def events_history_endpoint(loop_id: str): # type: ignore
114-
events = await self.state_manager.get_event_history(loop_id)
115-
return [event.to_dict() for event in events] # type: ignore
116-
117-
@self.get("/events/{loop_id}/sse")
118-
async def events_sse_endpoint(loop_id: str): # type: ignore
119-
return await self.loop_manager.events_sse(loop_id)
112+
@self.get("/events/{entity_id}/history")
113+
async def events_history_endpoint(entity_id: str): # type: ignore
114+
events = await self.state_manager.get_event_history(entity_id)
115+
return events
116+
117+
@self.get("/events/{entity_id}/sse")
118+
async def events_sse_endpoint(entity_id: str): # type: ignore
119+
return await self.loop_manager.events_sse(entity_id)
120120

121121
@property
122122
def config(self) -> BaseConfig:

fastloop/loop.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -369,28 +369,31 @@ async def active_loop_ids(self) -> set[str]:
369369

370370
return {loop_id for loop_id, _ in self.loop_tasks.items()}
371371

372-
async def events_sse(self, loop_id: str):
372+
async def events_sse(self, entity_id: str):
373373
"""
374374
SSE endpoint for streaming events to clients.
375375
Works for both loops and workflows.
376376
"""
377-
# Check if it's a loop or workflow
377+
# Check if it's a loop or workflow, get creation time for event filtering
378+
created_at = None
378379
try:
379-
await self.state_manager.get_loop(loop_id)
380+
loop = await self.state_manager.get_loop(entity_id)
381+
created_at = loop.created_at
380382
except LoopNotFoundError:
381383
try:
382-
await self.state_manager.get_workflow(loop_id)
384+
workflow = await self.state_manager.get_workflow(entity_id)
385+
created_at = workflow.created_at
383386
except WorkflowNotFoundError as e:
384387
raise HTTPException(
385388
status_code=404, detail="Loop/workflow not found"
386389
) from e
387390

388-
connection_time = int(datetime.now().timestamp())
391+
connection_time = int(created_at) if created_at else 0
389392
last_sent_nonce = 0
390393
connection_id = str(uuid.uuid4())
391394

392-
await self.state_manager.register_client_connection(loop_id, connection_id)
393-
pubsub = await self.state_manager.subscribe_to_events(loop_id)
395+
await self.state_manager.register_client_connection(entity_id, connection_id)
396+
pubsub = await self.state_manager.subscribe_to_events(entity_id)
394397

395398
async def _event_generator():
396399
nonlocal last_sent_nonce
@@ -400,7 +403,7 @@ async def _event_generator():
400403
all_events: list[
401404
dict[str, Any]
402405
] = await self.state_manager.get_events_since(
403-
loop_id, connection_time
406+
entity_id, connection_time
404407
)
405408
server_events = [
406409
e
@@ -429,24 +432,24 @@ async def _event_generator():
429432

430433
# Refresh connection TTL periodically
431434
await self.state_manager.refresh_client_connection(
432-
loop_id, connection_id
435+
entity_id, connection_id
433436
)
434437

435438
except asyncio.CancelledError:
436439
pass
437440
except BaseException as e:
438441
logger.error(
439-
"Error in SSE stream for loop",
442+
"Error in SSE stream",
440443
extra={
441-
"loop_id": loop_id,
444+
"entity_id": entity_id,
442445
"error": str(e),
443446
"traceback": traceback.format_exc(),
444447
},
445448
)
446449
yield f'data: {{"type": "error", "message": "{e!s}"}}\n\n'
447450
finally:
448451
await self.state_manager.unregister_client_connection(
449-
loop_id, connection_id
452+
entity_id, connection_id
450453
)
451454
if pubsub is not None:
452455
await pubsub.unsubscribe() # type: ignore
@@ -514,9 +517,17 @@ async def _mark_block_completed(
514517
workflow = await self.state_manager.get_workflow(workflow_run_id)
515518
if idx not in workflow.completed_blocks:
516519
workflow.completed_blocks.append(idx)
520+
517521
workflow.current_block_index = next_idx
518522
workflow.next_payload = payload
519523
workflow.block_attempts.pop(idx, None)
524+
525+
# If going backwards, clear completed status for blocks we're rewinding to
526+
if next_idx <= idx:
527+
workflow.completed_blocks = [
528+
b for b in workflow.completed_blocks if b < next_idx
529+
]
530+
520531
await self.state_manager.update_workflow(workflow_run_id, workflow)
521532

522533
async def _apply_plan(

fastloop/state/state_redis.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,11 @@ async def get_events_since(
620620
Get events that occurred since the given timestamp.
621621
"""
622622
all_events = await self.get_event_history(loop_id)
623-
return [event for event in all_events if event["timestamp"] >= since_timestamp]
623+
return [
624+
event
625+
for event in all_events
626+
if float(event["timestamp"]) >= since_timestamp
627+
]
624628

625629
async def subscribe_to_events(self, loop_id: str) -> Any:
626630
"""Subscribe to event notifications for a specific loop"""

fastloop/state/state_s3.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,9 @@ async def get_events_since(
447447
self, loop_id: str, since_timestamp: float
448448
) -> list[dict[str, Any]]:
449449
history = await self.get_event_history(loop_id)
450-
return [event for event in history if event["timestamp"] >= since_timestamp]
450+
return [
451+
event for event in history if float(event["timestamp"]) >= since_timestamp
452+
]
451453

452454
async def pop_server_event(self, loop_id: str) -> dict[str, Any] | None:
453455
queue_key = S3Keys.loop_event_queue_server(self.prefix, self.app_name, loop_id)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.94"
3+
version = "0.1.97"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

tests/test_workflows.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,12 @@ async def test_filter_by_status(self, state_manager):
570570
workflow_name="stopped", blocks=[{"type": "s", "text": ""}]
571571
)
572572

573-
await state_manager.update_workflow_status(w1.workflow_run_id, LoopStatus.RUNNING)
574-
await state_manager.update_workflow_status(w2.workflow_run_id, LoopStatus.STOPPED)
573+
await state_manager.update_workflow_status(
574+
w1.workflow_run_id, LoopStatus.RUNNING
575+
)
576+
await state_manager.update_workflow_status(
577+
w2.workflow_run_id, LoopStatus.STOPPED
578+
)
575579

576580
running = await state_manager.get_all_workflows(status=LoopStatus.RUNNING)
577581
assert len(running) == 1

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)