Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,40 +1255,62 @@ def _mirror_system_annotations(
created_by: str | None,
pipeline_name: str | None,
) -> None:
"""Mirror pipeline run fields as system annotations for filter_query search"""
"""Mirror pipeline run fields as system annotations for filter_query search.

Always creates an annotation for every run, even when the source value is
None or empty (stored as ""). This ensures data parity so every run has a
row for each system key.
"""

# TODO: The original pipeline_run.created_by and the pipeline name stored in
# extra_data / task_spec are saved untruncated, while the annotation mirror
# is truncated to VARCHAR(255). This creates a data parity mismatch between
# the source columns and their annotation copies. Revisit this to either
# widen the annotation column or enforce the same limit at the source.

if created_by:
created_by = _truncate_for_annotation(
value=created_by,
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
created_by_value = created_by
if created_by_value is None:
created_by_value = ""
_logger.warning(
f"Pipeline run id {pipeline_run_id} `created_by` is None, "
'setting it to empty string "" for data parity'
)

created_by_value = _truncate_for_annotation(
value=created_by_value,
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
pipeline_run_id=pipeline_run_id,
)

session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run_id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
value=created_by_value,
)
session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run_id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
value=created_by,
)
)

pipeline_name_value = pipeline_name
if pipeline_name_value is None:
pipeline_name_value = ""
_logger.warning(
f"Pipeline run id {pipeline_run_id} `pipeline_name` is None, "
'setting it to empty string "" for data parity'
)
if pipeline_name:
pipeline_name = _truncate_for_annotation(
value=pipeline_name,
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,

pipeline_name_value = _truncate_for_annotation(
value=pipeline_name_value,
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
pipeline_run_id=pipeline_run_id,
)

session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run_id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
value=pipeline_name_value,
)
session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run_id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
value=pipeline_name,
)
)
)


def _recursively_create_all_executions_and_artifacts_root(
Expand Down
Loading
Loading