Skip to content

Commit e8439cb

Browse files
add concurrency test
1 parent 48dde9d commit e8439cb

8 files changed

Lines changed: 278 additions & 40 deletions

File tree

fastloop/fastloop.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,6 @@ async def _event_handler(request: dict[str, Any], func: Any = func):
349349
loop_instance.ctx = context
350350
await loop_instance.on_event(context, event)
351351

352-
if loop.status != LoopStatus.RUNNING:
353-
loop = await self.state_manager.update_loop_status(
354-
loop.loop_id, LoopStatus.RUNNING
355-
)
356-
357352
if loop_instance or created:
358353
func_to_run = func
359354
else:
@@ -584,11 +579,6 @@ async def _start_handler(request: dict[str, Any]):
584579
state_manager=self.state_manager,
585580
)
586581

587-
if workflow.status != LoopStatus.RUNNING:
588-
await self.state_manager.update_workflow_status(
589-
workflow.workflow_run_id, LoopStatus.RUNNING
590-
)
591-
592582
await self.workflow_manager.start(
593583
func,
594584
context,
@@ -712,15 +702,7 @@ async def restart_loop(self, loop_id: str) -> bool:
712702
loop_delay=metadata["loop_delay"],
713703
)
714704
if started:
715-
await self.state_manager.update_loop_status(
716-
loop.loop_id, LoopStatus.RUNNING
717-
)
718-
logger.info(
719-
"Restarted loop",
720-
extra={
721-
"loop_id": loop.loop_id,
722-
},
723-
)
705+
logger.info("Restarted loop", extra={"loop_id": loop.loop_id})
724706
return True
725707
else:
726708
logger.warning(
@@ -790,15 +772,9 @@ async def restart_workflow(self, workflow_run_id: str) -> bool:
790772
)
791773

792774
if started:
793-
await self.state_manager.update_workflow_status(
794-
workflow.workflow_run_id, LoopStatus.RUNNING
795-
)
796775
logger.info(
797776
"Restarted workflow",
798-
extra={
799-
"workflow_run_id": workflow.workflow_run_id,
800-
"block_index": workflow.current_block_index,
801-
},
777+
extra={"workflow_run_id": workflow.workflow_run_id},
802778
)
803779
return started
804780

fastloop/loop.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,17 @@ async def _run(
7474
context: Any,
7575
loop_id: str,
7676
delay: float,
77+
loop_start_func: Callable[..., Any] | None,
7778
loop_stop_func: Callable[..., Any] | None,
7879
) -> None:
7980
try:
8081
async with self.state_manager.with_claim(loop_id): # type: ignore
82+
await self.state_manager.update_loop_status(loop_id, LoopStatus.RUNNING)
83+
if loop_start_func:
84+
if asyncio.iscoroutinefunction(loop_start_func):
85+
await loop_start_func(context)
86+
else:
87+
loop_start_func(context) # type: ignore
8188
idle_cycles = 0
8289

8390
while not context.should_stop and not context.should_pause:
@@ -177,15 +184,10 @@ async def start(
177184
if loop.loop_id in self.loop_tasks:
178185
return False
179186

180-
if loop_start_func:
181-
if asyncio.iscoroutinefunction(loop_start_func):
182-
await loop_start_func(context)
183-
else:
184-
loop_start_func(context) # type: ignore
185-
186-
# TODO: switch out executor for thread/process based on config
187187
self.loop_tasks[loop.loop_id] = asyncio.create_task(
188-
self._run(func, context, loop.loop_id, loop_delay, loop_stop_func)
188+
self._run(
189+
func, context, loop.loop_id, loop_delay, loop_start_func, loop_stop_func
190+
)
189191
)
190192

191193
return True

fastloop/monitor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async def _check_orphaned_tasks(self) -> None:
158158
task_name=task.task_name,
159159
retry_policy=metadata.get("retry"),
160160
executor_type=metadata.get("executor"),
161-
)
161+
)
162162

163163
async def _check_scheduled_workflows(self) -> None:
164164
now = time.time()

fastloop/workflow.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ async def _run(
216216
func: Callable[..., Any],
217217
context: Any,
218218
workflow_run_id: str,
219+
on_start: Callable[..., Any] | None,
219220
on_stop: Callable[..., Any] | None,
220221
on_block_complete: Callable[..., Any] | None,
221222
on_error: Callable[..., Any] | None,
@@ -224,6 +225,10 @@ async def _run(
224225
) -> None:
225226
try:
226227
async with self.state_manager.with_workflow_claim(workflow_run_id):
228+
await self.state_manager.update_workflow_status(
229+
workflow_run_id, LoopStatus.RUNNING
230+
)
231+
await _call(on_start, context)
227232
while True:
228233
workflow = await self.state_manager.get_workflow(workflow_run_id)
229234
if workflow.status in (LoopStatus.STOPPED, LoopStatus.FAILED):
@@ -438,13 +443,12 @@ async def start(
438443
if workflow.workflow_run_id in self.tasks:
439444
return False
440445

441-
await _call(on_start, context)
442-
443446
self.tasks[workflow.workflow_run_id] = asyncio.create_task(
444447
self._run(
445448
func,
446449
context,
447450
workflow.workflow_run_id,
451+
on_start,
448452
on_stop,
449453
on_block_complete,
450454
on_error,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.100"
3+
version = "0.1.101"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

0 commit comments

Comments
 (0)