Skip to content

Commit f086771

Browse files
committed
refactor: Function to get execution status stats
1 parent b6873b2 commit f086771

2 files changed

Lines changed: 115 additions & 7 deletions

File tree

cloud_pipelines_backend/api_server_sql.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,10 @@ def create_pipeline_run_response(
241241
pipeline_name = component_spec.name
242242
response.pipeline_name = pipeline_name
243243
if include_execution_stats:
244-
execution_status_stats = self._calculate_execution_status_stats(
245-
session=session, root_execution_id=pipeline_run.root_execution_id
244+
response.execution_status_stats = self._get_execution_status_stats(
245+
session=session,
246+
root_execution_id=pipeline_run.root_execution_id,
246247
)
247-
response.execution_status_stats = {
248-
status.value: count
249-
for status, count in execution_status_stats.items()
250-
}
251248
return response
252249

253250
return ListPipelineJobsResponse(
@@ -258,6 +255,16 @@ def create_pipeline_run_response(
258255
next_page_token=next_page_token,
259256
)
260257

258+
def _get_execution_status_stats(
259+
self,
260+
session: orm.Session,
261+
root_execution_id: bts.IdType,
262+
) -> dict[str, int]:
263+
stats = self._calculate_execution_status_stats(
264+
session=session, root_execution_id=root_execution_id
265+
)
266+
return {status.value: count for status, count in stats.items()}
267+
261268
def _calculate_execution_status_stats(
262269
self, session: orm.Session, root_execution_id: bts.IdType
263270
) -> dict[bts.ContainerExecutionStatus, int]:

tests/test_api_server_sql.py

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,50 @@
1+
from sqlalchemy import orm
2+
13
from cloud_pipelines_backend import backend_types_sql as bts
2-
from cloud_pipelines_backend.api_server_sql import ExecutionStatusSummary
4+
from cloud_pipelines_backend import database_ops
5+
from cloud_pipelines_backend.api_server_sql import (
6+
ExecutionStatusSummary,
7+
PipelineRunsApiService_Sql,
8+
)
9+
10+
11+
def _initialize_db_and_get_session_factory():
12+
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
13+
return lambda: orm.Session(bind=db_engine)
14+
15+
16+
def _create_execution_node(session, task_spec=None, status=None, parent=None):
17+
"""Helper to create an ExecutionNode with optional status and parent."""
18+
node = bts.ExecutionNode(task_spec=task_spec or {})
19+
if parent is not None:
20+
node.parent_execution = parent
21+
if status is not None:
22+
node.container_execution_status = status
23+
session.add(node)
24+
session.flush()
25+
return node
26+
27+
28+
def _link_ancestor(session, execution_node, ancestor_node):
29+
"""Create an ExecutionToAncestorExecutionLink."""
30+
link = bts.ExecutionToAncestorExecutionLink(
31+
ancestor_execution=ancestor_node,
32+
execution=execution_node,
33+
)
34+
session.add(link)
35+
session.flush()
36+
37+
38+
def _create_pipeline_run(session, root_execution, created_by=None, annotations=None):
39+
"""Helper to create a PipelineRun linked to a root execution node."""
40+
run = bts.PipelineRun(root_execution=root_execution)
41+
if created_by:
42+
run.created_by = created_by
43+
if annotations:
44+
run.annotations = annotations
45+
session.add(run)
46+
session.flush()
47+
return run
348

449

550
class TestExecutionStatusSummary:
@@ -55,3 +100,59 @@ def test_accumulate_all_statuses(self):
55100
assert summary.total_nodes == expected_total
56101
assert summary.ended_nodes == expected_ended
57102
assert summary.has_ended == (expected_ended == expected_total)
103+
104+
105+
class TestPipelineRunServiceList:
106+
def test_list_empty(self):
107+
session_factory = _initialize_db_and_get_session_factory()
108+
service = PipelineRunsApiService_Sql()
109+
with session_factory() as session:
110+
result = service.list(session=session)
111+
assert result.pipeline_runs == []
112+
assert result.next_page_token is None
113+
114+
def test_list_returns_pipeline_runs(self):
115+
session_factory = _initialize_db_and_get_session_factory()
116+
service = PipelineRunsApiService_Sql()
117+
with session_factory() as session:
118+
root = _create_execution_node(session)
119+
root_id = root.id
120+
_create_pipeline_run(session, root, created_by="user1")
121+
session.commit()
122+
123+
with session_factory() as session:
124+
result = service.list(session=session)
125+
assert len(result.pipeline_runs) == 1
126+
assert result.pipeline_runs[0].root_execution_id == root_id
127+
assert result.pipeline_runs[0].created_by == "user1"
128+
assert result.pipeline_runs[0].execution_status_stats is None
129+
130+
def test_list_with_execution_stats(self):
131+
session_factory = _initialize_db_and_get_session_factory()
132+
service = PipelineRunsApiService_Sql()
133+
with session_factory() as session:
134+
root = _create_execution_node(session)
135+
root_id = root.id
136+
child1 = _create_execution_node(
137+
session,
138+
parent=root,
139+
status=bts.ContainerExecutionStatus.SUCCEEDED,
140+
)
141+
child2 = _create_execution_node(
142+
session,
143+
parent=root,
144+
status=bts.ContainerExecutionStatus.RUNNING,
145+
)
146+
_link_ancestor(session, child1, root)
147+
_link_ancestor(session, child2, root)
148+
_create_pipeline_run(session, root)
149+
session.commit()
150+
151+
with session_factory() as session:
152+
result = service.list(session=session, include_execution_stats=True)
153+
assert len(result.pipeline_runs) == 1
154+
assert result.pipeline_runs[0].root_execution_id == root_id
155+
stats = result.pipeline_runs[0].execution_status_stats
156+
assert stats is not None
157+
assert stats["SUCCEEDED"] == 1
158+
assert stats["RUNNING"] == 1

0 commit comments

Comments
 (0)