diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 2e13daa..9959b25 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1255,7 +1255,12 @@ def _mirror_system_annotations( created_by: str | None, pipeline_name: str | None, ) -> None: - """Mirror pipeline run fields as system annotations for filter_query search""" + """Mirror pipeline run fields as system annotations for filter_query search. + + Always creates an annotation for every run, even when the source value is + None or empty (stored as ""). This ensures data parity so every run has a + row for each system key. + """ # TODO: The original pipeline_run.created_by and the pipeline name stored in # extra_data / task_spec are saved untruncated, while the annotation mirror @@ -1263,32 +1268,49 @@ def _mirror_system_annotations( # the source columns and their annotation copies. Revisit this to either # widen the annotation column or enforce the same limit at the source. - if created_by: - created_by = _truncate_for_annotation( - value=created_by, - field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + created_by_value = created_by + if created_by_value is None: + created_by_value = "" + _logger.warning( + f"Pipeline run id {pipeline_run_id} `created_by` is None, " + 'setting it to empty string "" for data parity' + ) + + created_by_value = _truncate_for_annotation( + value=created_by_value, + field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + pipeline_run_id=pipeline_run_id, + ) + + session.add( + bts.PipelineRunAnnotation( pipeline_run_id=pipeline_run_id, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + value=created_by_value, ) - session.add( - bts.PipelineRunAnnotation( - pipeline_run_id=pipeline_run_id, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - value=created_by, - ) + ) + + pipeline_name_value = pipeline_name + if pipeline_name_value is None: + pipeline_name_value = "" + _logger.warning( + f"Pipeline run id {pipeline_run_id} `pipeline_name` is None, " + 'setting it to empty string "" for data parity' ) - if pipeline_name: - pipeline_name = _truncate_for_annotation( - value=pipeline_name, - field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + + pipeline_name_value = _truncate_for_annotation( + value=pipeline_name_value, + field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + pipeline_run_id=pipeline_run_id, + ) + + session.add( + bts.PipelineRunAnnotation( pipeline_run_id=pipeline_run_id, + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + value=pipeline_name_value, ) - session.add( - bts.PipelineRunAnnotation( - pipeline_run_id=pipeline_run_id, - key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, - value=pipeline_name, - ) - ) + ) def _recursively_create_all_executions_and_artifacts_root( diff --git a/cloud_pipelines_backend/database_migrate.py b/cloud_pipelines_backend/database_migrate.py new file mode 100644 index 0000000..3771d1a --- /dev/null +++ b/cloud_pipelines_backend/database_migrate.py @@ -0,0 +1,542 @@ +import logging + +import sqlalchemy +from sqlalchemy import orm + +from . import backend_types_sql as bts +from . import filter_query_sql + +_logger = logging.getLogger(__name__) + + +def _is_pipeline_run_annotation_key_already_backfilled( + *, + session: orm.Session, + key: str, +) -> bool: + """Return True if at least one annotation with the given key exists.""" + return session.query( + sqlalchemy.exists( + sqlalchemy.select(sqlalchemy.literal(1)) + .select_from(bts.PipelineRunAnnotation) + .where( + bts.PipelineRunAnnotation.key == key, + ) + ) + ).scalar() + + +def backfill_created_by_annotations( + *, + session: orm.Session, + auto_commit: bool, +) -> int: + """Idempotent backfill: copy pipeline_run.created_by into annotations. + + Source column: pipeline_run.created_by + + INSERT INTO pipeline_run_annotation + SELECT pr.id, 'system/pipeline_run.created_by', COALESCE(pr.created_by, '') + FROM pipeline_run pr + LEFT JOIN pipeline_run_annotation ann + ON ann.pipeline_run_id = pr.id + AND ann.key = 'system/pipeline_run.created_by' + WHERE ann.pipeline_run_id IS NULL + + Starting table: + + pipeline_run + +----+------------+ + | id | created_by | + +----+------------+ + | 1 | alice | + | 2 | NULL | + | 3 | | (empty string) + | 4 | bob | + +----+------------+ + + pipeline_run_annotation (pre-existing) + +--------+--------------------------------------+-------+ + | run_id | key | value | + +--------+--------------------------------------+-------+ + | 1 | system/pipeline_run.created_by | alice | + +--------+--------------------------------------+-------+ + + Step 1 -- LEFT JOIN annotation (anti-join): + Finds runs missing a created_by annotation. + + +----+------------+--------------+ + | id | created_by | ann.run_id | + +----+------------+--------------+ + | 1 | alice | 1 | <- has annotation, SKIP + | 2 | NULL | NULL | <- missing, INSERT + | 3 | | NULL | <- missing, INSERT + | 4 | bob | NULL | <- missing, INSERT + +----+------------+--------------+ + + Step 2 -- INSERT with COALESCE: + + INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value) + +--------+--------------------------------------+-------+ + | run_id | key | value | + +--------+--------------------------------------+-------+ + | 2 | system/pipeline_run.created_by | | (COALESCE(NULL, '') = '') + | 3 | system/pipeline_run.created_by | | ('' unchanged) + | 4 | system/pipeline_run.created_by | bob | + +--------+--------------------------------------+-------+ + + Data parity: Every pipeline_run gets a created_by annotation. + - NULL or empty created_by -> annotation value = "" (empty string) + - Non-empty created_by -> annotation value = created_by + + Idempotent: Anti-join (LEFT JOIN + IS NULL) ensures runs that + already have a created_by annotation are skipped. Safe to call + multiple times -- never produces duplicates. + + Portable across databases via SQLAlchemy (SQLite, MySQL, PostgreSQL). + COALESCE is ANSI SQL, supported by all three. + + Args: + session: SQLAlchemy session. Caller controls the transaction + when auto_commit=False. + auto_commit: Must be explicitly set. True commits after insert. + False defers commit to the caller (e.g. tangle OSS + migrate_db batches all 3 then commits once). + + Returns: + Number of annotation rows inserted. + """ + _logger.info("Starting backfill for `created_by` annotations") + + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + existing_ann = orm.aliased(bts.PipelineRunAnnotation) + + stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( + ["pipeline_run_id", "key", "value"], + sqlalchemy.select( + bts.PipelineRun.id, + sqlalchemy.literal(str(key)), + # Step 2: COALESCE(created_by, '') — NULL/empty -> "" + sqlalchemy.func.coalesce(bts.PipelineRun.created_by, ""), + ) + # Step 1: LEFT JOIN annotation (anti-join) — find runs missing created_by + .outerjoin( + existing_ann, + sqlalchemy.and_( + existing_ann.pipeline_run_id == bts.PipelineRun.id, + existing_ann.key == key, + ), + ).where( + existing_ann.pipeline_run_id.is_(None), + ), + ) + result = session.execute(stmt) + rowcount = result.rowcount + _logger.info( + "Backfill created_by (source: pipeline_run.created_by): " + f"{rowcount} rows inserted" + ) + if auto_commit: + session.commit() + return rowcount + + +def backfill_pipeline_names_from_extra_data( + *, + session: orm.Session, + auto_commit: bool, +) -> int: + """Idempotent backfill: extract pipeline name from extra_data JSON. + + Source path: pipeline_run.extra_data -> 'pipeline_name' + + INSERT INTO pipeline_run_annotation + SELECT pr.id, 'system/pipeline_run.name', + SUBSTR(JSON_EXTRACT(pr.extra_data, '$.pipeline_name'), 1, 255) + FROM pipeline_run pr + LEFT JOIN pipeline_run_annotation ann + ON ann.pipeline_run_id = pr.id + AND ann.key = 'system/pipeline_run.name' + WHERE ann.pipeline_run_id IS NULL + AND JSON_EXTRACT(pr.extra_data, '$.pipeline_name') IS NOT NULL + + Starting table: + + pipeline_run + +----+--------------------------------------+ + | id | extra_data | + +----+--------------------------------------+ + | 1 | {"pipeline_name": "my-pipeline"} | + | 2 | NULL | + | 3 | {} | + | 4 | {"pipeline_name": null} | + | 5 | {"pipeline_name": ""} | + | 6 | {"pipeline_name": "already-exists"} | + +----+--------------------------------------+ + + pipeline_run_annotation (pre-existing) + +--------+---------------------------+----------------+ + | run_id | key | value | + +--------+---------------------------+----------------+ + | 6 | system/pipeline_run.name | already-exists | + +--------+---------------------------+----------------+ + + Step 1 -- LEFT JOIN annotation (anti-join): + Finds runs missing a name annotation. + + Runs 1-5: ann.run_id IS NULL -> candidates for insert + Run 6: ann.run_id = 6 -> SKIP (already has annotation) + + Step 2 -- JSON extraction + NULL filter: + + +----+--------------------------------------+------------+-----------+--------+ + | id | extra_data | ann.run_id | extracted | action | + +----+--------------------------------------+------------+-----------+--------+ + | 1 | {"pipeline_name": "my-pipeline"} | NULL | "my-pipe" | INSERT | + | 2 | NULL | NULL | NULL | SKIP | + | 3 | {} | NULL | NULL | SKIP | + | 4 | {"pipeline_name": null} | NULL | NULL | SKIP | + | 5 | {"pipeline_name": ""} | NULL | "" | INSERT | + +----+--------------------------------------+------------+-----------+--------+ + + Step 3 -- INSERT into annotations: + + +--------+---------------------------+-------------+ + | run_id | key | value | + +--------+---------------------------+-------------+ + | 1 | system/pipeline_run.name | my-pipeline | + | 5 | system/pipeline_run.name | | (empty string) + +--------+---------------------------+-------------+ + + Valid (creates annotation row): + extra_data = {"pipeline_name": "my-pipeline"} -> value = "my-pipeline" + extra_data = {"pipeline_name": ""} -> value = "" + + Skipped (no annotation row): + extra_data = NULL -> JSON_EXTRACT = NULL + extra_data = {} -> key absent, NULL + extra_data = {"pipeline_name": null} -> JSON_EXTRACT = NULL + + Idempotent: Anti-join (LEFT JOIN + IS NULL) ensures runs that + already have a name annotation are skipped. Safe to call multiple + times -- never produces duplicates. Order-independent with + backfill_pipeline_names_from_component_spec: either can run first. + + Values are truncated to 255 characters via SUBSTR to respect the + VARCHAR(255) column limit on MySQL. + + Portable across databases: SQLAlchemy's JSON path extraction + generates the correct SQL for each dialect: + - SQLite: JSON_EXTRACT(extra_data, '$.pipeline_name') + - MySQL: JSON_UNQUOTE(JSON_EXTRACT(...)) + - PostgreSQL: extra_data ->> 'pipeline_name' + + Args: + session: SQLAlchemy session. Caller controls the transaction + when auto_commit=False. + auto_commit: Must be explicitly set. True commits after insert. + False defers commit to the caller. + + Returns: + Number of annotation rows inserted. + """ + _logger.info("Starting backfill for `pipeline_name` from `extra_data` annotations") + + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + # Step 2: JSON extraction — extra_data -> 'pipeline_name' + pipeline_name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string() + # Step 3: SUBSTR truncation for VARCHAR(255) + truncated_name = sqlalchemy.func.substr( + pipeline_name_expr, + 1, + bts._STR_MAX_LENGTH, + ) + existing_ann = orm.aliased(bts.PipelineRunAnnotation) + + stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( + ["pipeline_run_id", "key", "value"], + sqlalchemy.select( + bts.PipelineRun.id, + sqlalchemy.literal(str(key)), + truncated_name, + ) + # Step 1: LEFT JOIN annotation (anti-join) — find runs missing name annotation + .outerjoin( + existing_ann, + sqlalchemy.and_( + existing_ann.pipeline_run_id == bts.PipelineRun.id, + existing_ann.key == key, + ), + ).where( + existing_ann.pipeline_run_id.is_(None), + # Step 2 cont: NULL filter — skip NULL extra_data / missing key / null value + pipeline_name_expr.isnot(None), + ), + ) + result = session.execute(stmt) + rowcount = result.rowcount + _logger.info( + f"Backfill pipeline_name (source: extra_data): {rowcount} rows inserted" + ) + if auto_commit: + session.commit() + return rowcount + + +def backfill_pipeline_names_from_component_spec( + *, + session: orm.Session, + auto_commit: bool, +) -> int: + """Idempotent backfill: extract pipeline name from component_spec JSON. + + Source path: execution_node.task_spec -> 'componentRef' -> 'spec' -> 'name' + + INSERT INTO pipeline_run_annotation + SELECT pr.id, 'system/pipeline_run.name', + SUBSTR(JSON_EXTRACT(en.task_spec, '$.componentRef.spec.name'), 1, 255) + FROM pipeline_run pr + JOIN execution_node en ON en.id = pr.root_execution_id + LEFT JOIN pipeline_run_annotation ann + ON ann.pipeline_run_id = pr.id + AND ann.key = 'system/pipeline_run.name' + WHERE ann.pipeline_run_id IS NULL + AND JSON_EXTRACT(en.task_spec, '$.componentRef.spec.name') IS NOT NULL + + Starting tables: + + pipeline_run execution_node + +----+------------------+ +--------+-------------------------------------------+ + | id | root_execution_id| | id | task_spec (JSON) | + +----+------------------+ +--------+-------------------------------------------+ + | 1 | exec_1 | | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | + | 2 | exec_2 | | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + | 5 | exec_99 | +--------+-------------------------------------------+ + +----+------------------+ (no exec_99 row) + + pipeline_run_annotation (pre-existing) + +--------+---------------------------+-------+ + | run_id | key | value | + +--------+---------------------------+-------+ + | 1 | system/pipeline_run.name | A | + | 3 | user/custom_tag | hello | + +--------+---------------------------+-------+ + + Step 1 -- JOIN execution_node (INNER JOIN): + Attaches task_spec to each run. Drops runs with no execution_node. + + FROM pipeline_run pr + JOIN execution_node en ON en.id = pr.root_execution_id + + +----+--------+-------------------------------------------+ + | id | en.id | en.task_spec | + +----+--------+-------------------------------------------+ + | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | + | 2 | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + +----+--------+-------------------------------------------+ + (run 5 dropped -- exec_99 doesn't exist) + + Step 2a -- LEFT JOIN annotation: + Attempts to match each run to an existing name annotation. + + LEFT JOIN pipeline_run_annotation ann + ON ann.pipeline_run_id = pr.id + AND ann.key = 'system/pipeline_run.name' + + +----+--------+------------------------------------------+------------------+----------+ + | id | en.id | en.task_spec | ann.run_id | ann.key | + +----+--------+------------------------------------------+------------------+----------+ + | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | 1 | sys/name | + | 2 | exec_2 | {"componentRef":{"spec":null}} | NULL | NULL | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | NULL | NULL | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | NULL | NULL | + +----+--------+------------------------------------------+------------------+----------+ + (run 1 matched -- has 'system/pipeline_run.name' annotation) + (run 3 NULL -- has 'user/custom_tag' but ON requires key = 'system/pipeline_run.name') + + Step 2b -- WHERE ann.pipeline_run_id IS NULL (anti-join filter): + Keeps only runs where the LEFT JOIN found no match. + + WHERE ann.pipeline_run_id IS NULL + + +----+--------+-------------------------------------------+ + | id | en.id | en.task_spec | + +----+--------+-------------------------------------------+ + | 2 | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + +----+--------+-------------------------------------------+ + (run 1 dropped -- ann.run_id was 1, not NULL) + + Step 3 -- JSON extraction + NULL filter: + Extracts name from JSON path, keeps only non-null (empty string is allowed). + + WHERE task_spec->'componentRef'->'spec'->>'name' IS NOT NULL + + +----+-------------------------------------------+-----------+ + | id | en.task_spec | name_expr | + +----+-------------------------------------------+-----------+ + | 2 | {"componentRef":{"spec":null}} | NULL | <- dropped + | 3 | {"componentRef":{"spec":{"name":""}}} | "" | <- kept (empty string OK) + | 4 | {"componentRef":{"spec":{"name":"B"}}} | "B" | <- kept + +----+-------------------------------------------+-----------+ + + Step 4 -- INSERT INTO pipeline_run_annotation: + Inserts one row per surviving run. + + INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value) + +--------+---------------------------+-------+ + | run_id | key | value | + +--------+---------------------------+-------+ + | 3 | system/pipeline_run.name | | + | 4 | system/pipeline_run.name | B | + +--------+---------------------------+-------+ + + The JSON path is portable across databases via SQLAlchemy: + - SQLite: JSON_EXTRACT(task_spec, '$.componentRef.spec.name') + - MySQL: JSON_UNQUOTE(JSON_EXTRACT(...)) + - PostgreSQL: task_spec -> 'componentRef' -> 'spec' ->> 'name' + + Any null at any depth (task_spec NULL, componentRef missing, + spec null, name missing) produces SQL NULL, filtered out by + IS NOT NULL. Empty string is allowed and will be inserted. + + Idempotent: Anti-join (LEFT JOIN + IS NULL) ensures runs that + already have a name annotation are skipped. Safe to call multiple + times -- never produces duplicates. Order-independent with + backfill_pipeline_names_from_extra_data: either can run first. + + Values are truncated to 255 characters via SUBSTR to respect the + VARCHAR(255) column limit on MySQL. + + Args: + session: SQLAlchemy session. Caller controls the transaction + when auto_commit=False. + auto_commit: Must be explicitly set. True commits after insert. + False defers commit to the caller. + + Returns: + Number of annotation rows inserted. + """ + _logger.info("Starting backfill for `pipeline_name` from `component_spec` annotations") + + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + # Step 3: JSON extraction — task_spec -> 'componentRef' -> 'spec' -> 'name' + name_expr = bts.ExecutionNode.task_spec[ + ("componentRef", "spec", "name") + ].as_string() + # Step 4: SUBSTR truncation for VARCHAR(255) + truncated_name = sqlalchemy.func.substr( + name_expr, + 1, + bts._STR_MAX_LENGTH, + ) + existing_ann = orm.aliased(bts.PipelineRunAnnotation) + + stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( + ["pipeline_run_id", "key", "value"], + sqlalchemy.select( + bts.PipelineRun.id, + sqlalchemy.literal(str(key)), + truncated_name, + ) + # Step 1: INNER JOIN execution_node — attach task_spec, drop missing nodes + .join( + bts.ExecutionNode, + bts.ExecutionNode.id == bts.PipelineRun.root_execution_id, + ) + # Step 2a: LEFT JOIN annotation (anti-join) — find runs missing name annotation + .outerjoin( + existing_ann, + sqlalchemy.and_( + existing_ann.pipeline_run_id == bts.PipelineRun.id, + existing_ann.key == key, + ), + ).where( + # Step 2b: anti-join filter — keep only unmatched runs + existing_ann.pipeline_run_id.is_(None), + # Step 3 cont: NULL filter — any null at any JSON depth -> skip + name_expr.isnot(None), + ), + ) + result = session.execute(stmt) + rowcount = result.rowcount + _logger.info( + f"Backfill pipeline_name (source: component_spec): {rowcount} rows inserted" + ) + if auto_commit: + session.commit() + return rowcount + + +def run_all_annotation_backfills( + *, + session: orm.Session, + do_skip_already_backfilled: bool, +) -> None: + """Run all annotation backfills in a single transaction. + + Called from migrate_db on application startup. Wraps all 3 backfill + functions with a try-catch so that failures do not block startup or + deployment. + + All 3 backfill functions are idempotent (anti-join ensures only + missing rows are inserted, never duplicates). It is always safe to + re-run them. + + Args: + session: SQLAlchemy session. Commit is called once at the end + after all 3 backfills succeed. + do_skip_already_backfilled: Must be explicitly set. + True -- skip guards check whether annotations of each key + already exist and skip the backfill if so (normal startup). + False -- force re-run of all backfills. Safe because each + backfill is idempotent (anti-join: only inserts rows that + don't already exist, never duplicates). + + Order: component_spec is called before extra_data so it is the + preferred source for pipeline names when both exist. + """ + _logger.info("Enter backfill for annotations table") + + try: + should_run_created_by = not do_skip_already_backfilled or ( + not _is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + ) + if should_run_created_by: + backfill_created_by_annotations( + session=session, + auto_commit=False, + ) + + should_run_names = not do_skip_already_backfilled or ( + not _is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + ) + ) + if should_run_names: + backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=False, + ) + backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=False, + ) + # TODO: Do we need a final catchall backfill that inserts empty string + # for all pipeline names, which happens to not have a name in + # component_spec nor extra_data? + + session.commit() + except Exception: + _logger.exception("Annotation backfill failed -- will retry on next restart") + + _logger.info("Exit backfill for annotations table") diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 3672f99..c3ea4b2 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -1,9 +1,12 @@ +import logging import sqlalchemy from sqlalchemy import orm from . import backend_types_sql as bts -from . import filter_query_sql +from . import database_migrate + +_logger = logging.getLogger(__name__) def create_db_engine_and_migrate_db( @@ -58,6 +61,8 @@ def create_db_engine( def migrate_db(db_engine: sqlalchemy.Engine): + _logger.info("Enter migrate DB") + # # Example: # sqlalchemy.Index( # "ix_pipeline_run_created_by_created_at_desc", @@ -89,301 +94,13 @@ def migrate_db(db_engine: sqlalchemy.Engine): index.create(db_engine, checkfirst=True) break - _backfill_pipeline_run_created_by_annotations(db_engine=db_engine) - # Disable backfill since it's failing. - # IntegrityError: (1062, "Duplicate entry '019860807454ece868df-system/pipeline_run.name' for key 'pipeline_run_annotation.PRIMARY'") - # _backfill_pipeline_run_name_annotations(db_engine=db_engine) - - -def _is_pipeline_run_annotation_key_already_backfilled( - *, - session: orm.Session, - key: str, -) -> bool: - """Return True if at least one annotation with the given key exists.""" - return session.query( - sqlalchemy.exists( - sqlalchemy.select(sqlalchemy.literal(1)) - .select_from(bts.PipelineRunAnnotation) - .where( - bts.PipelineRunAnnotation.key == key, - ) - ) - ).scalar() - - -def _backfill_pipeline_run_created_by_annotations( - *, - db_engine: sqlalchemy.Engine, -) -> None: - """Copy pipeline_run.created_by into pipeline_run_annotation so - annotation-based search works for created_by. - - The check and insert run in a single session/transaction to avoid - TOCTOU races between concurrent startup processes. - - Skips entirely if any created_by annotation key already exists (i.e. the - write-path is populating them, so the backfill has already run or is - no longer needed). - """ + # do_skip_already_backfilled=True: skip if annotations already exist. + # Set to False to force re-run -- safe because backfills are idempotent + # (anti-join: only inserts missing rows, never duplicates). with orm.Session(db_engine) as session: - if _is_pipeline_run_annotation_key_already_backfilled( + database_migrate.run_all_annotation_backfills( session=session, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - ): - return - - stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( - ["pipeline_run_id", "key", "value"], - sqlalchemy.select( - bts.PipelineRun.id, - sqlalchemy.literal( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - ), - bts.PipelineRun.created_by, - ).where( - bts.PipelineRun.created_by.isnot(None), - bts.PipelineRun.created_by != "", - ), + do_skip_already_backfilled=True, ) - session.execute(stmt) - session.commit() - - -def _backfill_pipeline_names_from_extra_data( - *, - session: orm.Session, -) -> None: - """Phase 1: bulk SQL backfill from extra_data['pipeline_name']. - - INSERT INTO pipeline_run_annotation - SELECT id, key, json_extract(extra_data, '$.pipeline_name') - FROM pipeline_run - WHERE json_extract(...) IS NOT NULL - - Valid (creates annotation row): - extra_data = {"pipeline_name": "my-pipeline"} -> value = "my-pipeline" - extra_data = {"pipeline_name": ""} -> value = "" - - Skipped (no annotation row): - extra_data = NULL -> JSON_EXTRACT = NULL - extra_data = {} -> key absent, NULL - extra_data = {"pipeline_name": null} -> JSON_EXTRACT = NULL - - SQLAlchemy's JSON path extraction is NULL-safe: returns SQL NULL - when extra_data is NULL or the key is absent (no Python error). - """ - pipeline_name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string() - truncated_name = sqlalchemy.func.substr( - pipeline_name_expr, - 1, - bts._STR_MAX_LENGTH, - ) - stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( - ["pipeline_run_id", "key", "value"], - sqlalchemy.select( - bts.PipelineRun.id, - sqlalchemy.literal( - filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, - ), - truncated_name, - ).where( - pipeline_name_expr.isnot(None), - ), - ) - session.execute(stmt) - - -def _backfill_pipeline_names_from_component_spec( - *, - session: orm.Session, -) -> None: - """Phase 2: Bulk SQL fallback for runs still missing a name annotation. - - Extracts the pipeline name from each run's ExecutionNode via the - JSON path: - - task_spec -> 'componentRef' -> 'spec' ->> 'name' - - Starting tables: - - pipeline_run execution_node - +----+------------------+ +--------+-------------------------------------------+ - | id | root_execution_id| | id | task_spec (JSON) | - +----+------------------+ +--------+-------------------------------------------+ - | 1 | exec_1 | | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | - | 2 | exec_2 | | exec_2 | {"componentRef":{"spec":null}} | - | 3 | exec_3 | | exec_3 | {"componentRef":{"spec":{"name":""}}} | - | 4 | exec_4 | | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | - | 5 | exec_99 | +--------+-------------------------------------------+ - +----+------------------+ (no exec_99 row) - - pipeline_run_annotation (pre-existing) - +--------+---------------------------+-------+ - | run_id | key | value | - +--------+---------------------------+-------+ - | 1 | system/pipeline_run.name | A | - | 3 | user/custom_tag | hello | - +--------+---------------------------+-------+ - - Step 1 -- JOIN execution_node (INNER JOIN): - Attaches task_spec to each run. Drops runs with no execution_node. - - FROM pipeline_run pr - JOIN execution_node en ON en.id = pr.root_execution_id - - +----+--------+-------------------------------------------+ - | id | en.id | en.task_spec | - +----+--------+-------------------------------------------+ - | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | - | 2 | exec_2 | {"componentRef":{"spec":null}} | - | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | - | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | - +----+--------+-------------------------------------------+ - (run 5 dropped -- exec_99 doesn't exist) - - Step 2a -- LEFT JOIN annotation: - Attempts to match each run to an existing name annotation. - - LEFT JOIN pipeline_run_annotation ann - ON ann.pipeline_run_id = pr.id - AND ann.key = 'system/pipeline_run.name' - - +----+--------+------------------------------------------+------------------+----------+ - | id | en.id | en.task_spec | ann.run_id | ann.key | - +----+--------+------------------------------------------+------------------+----------+ - | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | 1 | sys/name | - | 2 | exec_2 | {"componentRef":{"spec":null}} | NULL | NULL | - | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | NULL | NULL | - | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | NULL | NULL | - +----+--------+------------------------------------------+------------------+----------+ - (run 1 matched -- has 'system/pipeline_run.name' annotation) - (run 3 NULL -- has 'user/custom_tag' but ON requires key = 'system/pipeline_run.name') - - Step 2b -- WHERE ann.pipeline_run_id IS NULL (anti-join filter): - Keeps only runs where the LEFT JOIN found no match. - - WHERE ann.pipeline_run_id IS NULL - - +----+--------+-------------------------------------------+ - | id | en.id | en.task_spec | - +----+--------+-------------------------------------------+ - | 2 | exec_2 | {"componentRef":{"spec":null}} | - | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | - | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | - +----+--------+-------------------------------------------+ - (run 1 dropped -- ann.run_id was 1, not NULL) - - Step 3 -- JSON extraction + NULL filter: - Extracts name from JSON path, keeps only non-null (empty string is allowed). - - WHERE task_spec->'componentRef'->'spec'->>'name' IS NOT NULL - - +----+-------------------------------------------+-----------+ - | id | en.task_spec | name_expr | - +----+-------------------------------------------+-----------+ - | 2 | {"componentRef":{"spec":null}} | NULL | <- dropped - | 3 | {"componentRef":{"spec":{"name":""}}} | "" | <- kept (empty string OK) - | 4 | {"componentRef":{"spec":{"name":"B"}}} | "B" | <- kept - +----+-------------------------------------------+-----------+ - - Step 4 -- INSERT INTO pipeline_run_annotation: - Inserts one row per surviving run. - - INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value) - +--------+---------------------------+-------+ - | run_id | key | value | - +--------+---------------------------+-------+ - | 3 | system/pipeline_run.name | | - | 4 | system/pipeline_run.name | B | - +--------+---------------------------+-------+ - - The JSON path is portable across databases via SQLAlchemy: - - SQLite: JSON_EXTRACT(task_spec, '$.componentRef.spec.name') - - MySQL: JSON_UNQUOTE(JSON_EXTRACT(...)) - - PostgreSQL: task_spec -> 'componentRef' -> 'spec' ->> 'name' - - Any null at any depth (task_spec NULL, componentRef missing, - spec null, name missing) produces SQL NULL, filtered out by - IS NOT NULL. Empty string is allowed and will be inserted. - """ - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - name_expr = bts.ExecutionNode.task_spec[ - ("componentRef", "spec", "name") - ].as_string() - truncated_name = sqlalchemy.func.substr( - name_expr, - 1, - bts._STR_MAX_LENGTH, - ) - existing_ann = orm.aliased(bts.PipelineRunAnnotation) - - # Step 4: INSERT INTO pipeline_run_annotation - stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( - ["pipeline_run_id", "key", "value"], - sqlalchemy.select( - bts.PipelineRun.id, - sqlalchemy.literal(str(key)), - truncated_name, - ) - # Step 1: INNER JOIN execution_node - .join( - bts.ExecutionNode, - bts.ExecutionNode.id == bts.PipelineRun.root_execution_id, - ) - # Step 2a: LEFT JOIN existing annotation - .outerjoin( - existing_ann, - sqlalchemy.and_( - existing_ann.pipeline_run_id == bts.PipelineRun.id, - existing_ann.key == key, - ), - ).where( - # Step 2b: Anti-join — keep only runs with no existing annotation - existing_ann.pipeline_run_id.is_(None), - # Step 3: JSON extraction — keep only non-NULL names - name_expr.isnot(None), - ), - ) - session.execute(stmt) - - -def _backfill_pipeline_run_name_annotations( - *, - db_engine: sqlalchemy.Engine, -) -> None: - """Backfill pipeline_run_annotation with pipeline names. - - The check and both inserts run in a single session/transaction to - avoid TOCTOU races between concurrent startup processes. If anything - fails, the entire transaction rolls back automatically. - - Skips entirely if any name annotation already exists (i.e. the - write-path is populating them, so the backfill has already run or is - no longer needed). - - Phase 1 -- _backfill_pipeline_names_from_extra_data: - Bulk SQL insert from extra_data['pipeline_name']. - - Phase 2 -- _backfill_pipeline_names_from_component_spec: - Bulk SQL fallback for runs Phase 1 missed (extra_data is NULL or - missing the key). Extracts name via JSON path - task_spec -> componentRef -> spec -> name. - - Annotation creation rules (same for both phases): - Creates row: any non-NULL string, including empty string "" - Skips row: NULL at any depth in the JSON path - """ - with orm.Session(db_engine) as session: - if _is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, - ): - return - # execute() - rows in DB buffer for Phase 2 - _backfill_pipeline_names_from_extra_data(session=session) - # Phase 2 sees Phase 1's rows via the shared transaction buffer. - _backfill_pipeline_names_from_component_spec(session=session) - # Both phases become permanent atomically. - session.commit() + _logger.info("Exit migrate DB") diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index b1a2592..bf30c53 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -379,8 +379,8 @@ def test_create_mirrors_name_only(self, session_factory, service): == "solo-pipeline" ) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] + == "" ) def test_create_mirrors_created_by_only(self, session_factory, service): @@ -396,11 +396,13 @@ def test_create_mirrors_created_by_only(self, session_factory, service): == "alice" ) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "" ) - def test_create_skips_mirror_when_empty_values(self, session_factory, service): + def test_create_mirrors_empty_values_as_empty_string( + self, session_factory, service + ): run = _create_run( session_factory, service, @@ -410,27 +412,29 @@ def test_create_skips_mirror_when_empty_values(self, session_factory, service): with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "" ) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] + == "" ) - def test_create_skips_mirror_when_both_absent(self, session_factory, service): + def test_create_mirrors_absent_values_as_empty_string( + self, session_factory, service + ): task_spec = _make_task_spec("placeholder") task_spec.component_ref.spec.name = None run = _create_run(session_factory, service, root_task=task_spec) with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "" ) assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - not in annotations + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] + == "" ) @@ -541,7 +545,8 @@ def test_list_annotations_only_system(self, session_factory, service): with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) assert annotations == { - filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME: "test-pipeline" + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME: "test-pipeline", + filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY: "", } def test_set_annotation_rejects_system_key(self, session_factory, service): diff --git a/tests/test_database_migrate.py b/tests/test_database_migrate.py new file mode 100644 index 0000000..9c6603f --- /dev/null +++ b/tests/test_database_migrate.py @@ -0,0 +1,1619 @@ +"""Tests for database_migrate: backfill and annotation helpers. + +Pipeline Name Resolution Path +============================== + +Source 1 (bulk SQL -- extra_data path): + pipeline_run.extra_data -> ["pipeline_name"] -> value + | | | + +-- None +-- key missing +-- "" + v v v + SQL NULL (safe) SQL NULL (safe) valid (inserted) + +Source 2 (bulk SQL -- component_spec JSON path): + execution_node.task_spec -> 'componentRef' -> 'spec' ->> 'name' + | | | | + +-- NULL +-- key missing +-- null +-- null + v v v v + SQL NULL (safe) SQL NULL (safe) SQL NULL SQL NULL +""" + +import logging +from unittest import mock + +import pytest +import sqlalchemy +from sqlalchemy import orm +from typing import Any + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import backend_types_sql as bts +from cloud_pipelines_backend import component_structures as structures +from cloud_pipelines_backend import database_migrate +from cloud_pipelines_backend import database_ops +from cloud_pipelines_backend import filter_query_sql + + +def _make_task_spec( + *, + pipeline_name: str = "test-pipeline", +) -> structures.TaskSpec: + return structures.TaskSpec( + component_ref=structures.ComponentReference( + spec=structures.ComponentSpec( + name=pipeline_name, + implementation=structures.ContainerImplementation( + container=structures.ContainerSpec(image="test-image") + ), + ) + ) + ) + + +def _set_execution_node_task_spec( + *, + session_factory: orm.sessionmaker, + run_id: str, + task_spec: structures.TaskSpec, +) -> None: + """Replace the execution_node's task_spec JSON with the given TaskSpec. + + Use to test fallback paths where spec is None or name is None, + since the service's create() requires a valid spec to run. + """ + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run_id) + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + exec_node.task_spec = task_spec.to_json_dict() + session.commit() + + +def _set_execution_node_task_spec_raw( + *, + session_factory: orm.sessionmaker, + run_id: str, + task_spec_dict: dict[str, Any] | None, +) -> None: + """Set task_spec to an arbitrary dict (or None) bypassing Pydantic. + + Use to test JSON paths that Pydantic's TaskSpec cannot represent + (e.g. empty dict, missing componentRef, null task_spec). + """ + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run_id) + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + exec_node.task_spec = task_spec_dict + session.commit() + + +@pytest.fixture() +def session_factory() -> orm.sessionmaker: + db_engine = database_ops.create_db_engine(database_uri="sqlite://") + bts._TableBase.metadata.create_all(db_engine) + return orm.sessionmaker(db_engine) + + +def _create_run( + *, + session_factory: orm.sessionmaker, + service: api_server_sql.PipelineRunsApiService_Sql, + **kwargs: Any, +) -> api_server_sql.PipelineRunResponse: + """Create a pipeline run using a fresh session (mirrors production per-request sessions).""" + with session_factory() as session: + return service.create(session, **kwargs) + + +def _delete_annotation( + *, + session_factory: orm.sessionmaker, + run_id: str, + key: filter_query_sql.PipelineRunAnnotationSystemKey, +) -> None: + """Remove a write-path annotation so backfill can be tested in isolation.""" + with session_factory() as session: + session.execute( + sqlalchemy.delete(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == run_id, + bts.PipelineRunAnnotation.key == key, + ) + ) + session.commit() + + +def _delete_all_annotations_for_key( + *, + session_factory: orm.sessionmaker, + key: filter_query_sql.PipelineRunAnnotationSystemKey, +) -> None: + """Remove all annotations with the given key from the DB.""" + with session_factory() as session: + session.execute( + sqlalchemy.delete(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.key == key, + ) + ) + session.commit() + + +def _count_annotations( + *, + session_factory: orm.sessionmaker, + key: filter_query_sql.PipelineRunAnnotationSystemKey, +) -> int: + """Count annotation rows with the given key.""" + with session_factory() as session: + return ( + session.query(bts.PipelineRunAnnotation) + .filter(bts.PipelineRunAnnotation.key == key) + .count() + ) + + +class TestIsAnnotationKeyAlreadyBackfilled: + def test_false_on_empty_db( + self, + session_factory: orm.sessionmaker, + ) -> None: + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + is False + ) + + def test_false_with_unrelated_annotation( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + ) + _delete_all_annotations_for_key( + session_factory=session_factory, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + assert ( + _count_annotations( + session_factory=session_factory, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + == 0 + ) + with session_factory() as session: + service.set_annotation(session=session, id=run.id, key="team", value="ml") + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + is False + ) + + def test_true_when_key_exists( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + is False + ) + _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + ) + is True + ) + + def test_matches_exact_key( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Only returns True for the exact key queried, not other keys.""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + ) + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key="team", + ) + is False + ) + with session_factory() as session: + service.set_annotation(session=session, id=run.id, key="team", value="ml") + with session_factory() as session: + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key="team", + ) + is True + ) + assert ( + database_migrate._is_pipeline_run_annotation_key_already_backfilled( + session=session, + key="other_key", + ) + is False + ) + + +class TestPipelineNameBackfill: + """Integration tests for pipeline name backfill functions.""" + + # --- Basic functionality --- + + def test_backfill_populates_name_from_extra_data( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="my-pipeline"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "my-pipeline" + + def test_backfill_populates_name_from_component_spec( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="fallback-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "fallback-name" + + def test_backfill_via_run_all( + self, + session_factory: orm.sessionmaker, + ) -> None: + """run_all_annotation_backfills resolves run_a via extra_data and run_b via component_spec.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="from-extra-data"), + ) + + run_b = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="from-component-spec"), + ) + with session_factory() as session: + db_run_b = session.get(bts.PipelineRun, run_b.id) + db_run_b.extra_data = None + session.commit() + + _delete_all_annotations_for_key(session_factory=session_factory, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=True, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run_a.id)[key] + == "from-extra-data" + ) + assert ( + service.list_annotations(session=session, id=run_b.id)[key] + == "from-component-spec" + ) + + def test_anti_join_prevents_duplicate( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Component_spec's anti-join sees extra_data's insert within the same transaction.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="shared-name"), + ) + _delete_all_annotations_for_key(session_factory=session_factory, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=True, + ) + + with session_factory() as session: + count = ( + session.query(bts.PipelineRunAnnotation) + .filter( + bts.PipelineRunAnnotation.pipeline_run_id == run.id, + bts.PipelineRunAnnotation.key == key, + ) + .count() + ) + assert count == 1 + + def test_backfill_skips_when_key_already_exists( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Once any NAME annotation exists, run_all_annotation_backfills skips the name backfill.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipeline-a"), + ) + + run_b = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipeline-b"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_b.id) + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=True, + ) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_b.id) + + def test_force_backfill_fills_missing_name( + self, + session_factory: orm.sessionmaker, + ) -> None: + """do_skip_already_backfilled=False bypasses skip guard and fills missing rows.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipeline-a"), + ) + + run_b = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipeline-b"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_b.id) + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=False, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run_b.id)[key] + == "pipeline-b" + ) + + # --- Phase 1 tests (backfill_pipeline_names_from_extra_data) --- + + def test_backfill_extra_data_skips_none_extra_data( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_backfill_extra_data_skips_missing_key( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {} + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_backfill_extra_data_inserts_empty_name( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": ""} + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + def test_backfill_extra_data_skips_null_value( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": None} + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + # --- Overflow truncation tests --- + + def test_backfill_extra_data_long_name_truncated( + self, + mysql_varchar_limit_session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=mysql_varchar_limit_session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation( + session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key + ) + assert ( + _count_annotations( + session_factory=mysql_varchar_limit_session_factory, key=key + ) + == 0 + ) + + long_name = "x" * 300 + with mysql_varchar_limit_session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": long_name} + session.commit() + + with mysql_varchar_limit_session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with mysql_varchar_limit_session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "x" * bts._STR_MAX_LENGTH + + def test_backfill_extra_data_exact_255_preserved( + self, + mysql_varchar_limit_session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=mysql_varchar_limit_session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation( + session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key + ) + assert ( + _count_annotations( + session_factory=mysql_varchar_limit_session_factory, key=key + ) + == 0 + ) + + exact_name = "x" * bts._STR_MAX_LENGTH + with mysql_varchar_limit_session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": exact_name} + session.commit() + + with mysql_varchar_limit_session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with mysql_varchar_limit_session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == exact_name + + def test_backfill_component_spec_long_name_truncated( + self, + mysql_varchar_limit_session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=mysql_varchar_limit_session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation( + session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key + ) + assert ( + _count_annotations( + session_factory=mysql_varchar_limit_session_factory, key=key + ) + == 0 + ) + + long_name = "y" * 300 + with mysql_varchar_limit_session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=mysql_varchar_limit_session_factory, + run_id=run.id, + task_spec_dict={ + "componentRef": {"spec": {"name": long_name}}, + }, + ) + + with mysql_varchar_limit_session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with mysql_varchar_limit_session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "y" * bts._STR_MAX_LENGTH + + def test_backfill_component_spec_exact_255_preserved( + self, + mysql_varchar_limit_session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=mysql_varchar_limit_session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation( + session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key + ) + assert ( + _count_annotations( + session_factory=mysql_varchar_limit_session_factory, key=key + ) + == 0 + ) + + exact_name = "y" * bts._STR_MAX_LENGTH + with mysql_varchar_limit_session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=mysql_varchar_limit_session_factory, + run_id=run.id, + task_spec_dict={ + "componentRef": {"spec": {"name": exact_name}}, + }, + ) + + with mysql_varchar_limit_session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with mysql_varchar_limit_session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == exact_name + + # --- Component spec depth tests --- + + def test_component_spec_depth0_execution_node_missing( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + session.delete(exec_node) + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth1_task_spec_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict=None, + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth1_task_spec_empty( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={}, + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth2_component_ref_missing( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={"other_key": "value"}, + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth2_component_ref_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={"componentRef": None}, + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth3_spec_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec( + session_factory=session_factory, + run_id=run.id, + task_spec=structures.TaskSpec( + component_ref=structures.ComponentReference( + name="placeholder", + spec=None, + ) + ), + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth4_name_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec( + session_factory=session_factory, + run_id=run.id, + task_spec=structures.TaskSpec( + component_ref=structures.ComponentReference( + spec=structures.ComponentSpec(name=None) + ) + ), + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_component_spec_depth4_name_empty_string( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={"componentRef": {"spec": {"name": ""}}}, + ) + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + +class TestCreatedByBackfill: + def test_backfill_populates_created_by( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "alice" + + def test_backfill_skips_when_key_already_exists( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Once any CREATED_BY annotation exists, run_all skips the backfill.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + + _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + + run_bob = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="bob", + ) + _delete_annotation(session_factory=session_factory, run_id=run_bob.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_bob.id) + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=True, + ) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_bob.id) + + def test_force_backfill_fills_missing_created_by( + self, + session_factory: orm.sessionmaker, + ) -> None: + """do_skip_already_backfilled=False bypasses skip guard and fills missing rows.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + + _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + + run_bob = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="bob", + ) + _delete_annotation(session_factory=session_factory, run_id=run_bob.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_bob.id) + + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=False, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run_bob.id)[key] == "bob" + ) + + +class TestBackfillIdempotency: + """Verify each backfill function is safe to call multiple times.""" + + def test_created_by_idempotent( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + for name in ["alice", "bob", "carol"]: + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by=name, + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + count1 = database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + assert count1 == 3 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + with session_factory() as session: + count2 = database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + assert count2 == 0 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + def test_names_extra_data_idempotent( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + for _ in range(3): + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipe"), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict=None, + ) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + count1 = database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + assert count1 == 3 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + with session_factory() as session: + count2 = database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + assert count2 == 0 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + def test_names_component_spec_idempotent( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + for _ in range(3): + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="pipe"), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + count1 = database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + assert count1 == 3 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + with session_factory() as session: + count2 = database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + assert count2 == 0 + assert _count_annotations(session_factory=session_factory, key=key) == 3 + + +class TestBackfillOrderIndependence: + """Verify pipeline name backfills produce the same result regardless of call order.""" + + def _setup_two_runs( + self, + *, + session_factory: orm.sessionmaker, + ) -> tuple[str, str]: + """Create run_a (extra_data=NULL, comp_spec="comp-a") and + run_b (extra_data="extra-b", comp_spec=NULL task_spec).""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="comp-a"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_a.id, key=key) + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run_a.id) + db_run.extra_data = None + session.commit() + + run_b = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="extra-b"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run_b.id, + task_spec_dict=None, + ) + + assert _count_annotations(session_factory=session_factory, key=key) == 0 + return run_a.id, run_b.id + + def test_extra_data_first_then_component_spec( + self, + session_factory: orm.sessionmaker, + ) -> None: + run_a_id, run_b_id = self._setup_two_runs(session_factory=session_factory) + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run_a_id)[key] == "comp-a" + ) + assert ( + service.list_annotations(session=session, id=run_b_id)[key] == "extra-b" + ) + + def test_component_spec_first_then_extra_data( + self, + session_factory: orm.sessionmaker, + ) -> None: + run_a_id, run_b_id = self._setup_two_runs(session_factory=session_factory) + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run_a_id)[key] == "comp-a" + ) + assert ( + service.list_annotations(session=session, id=run_b_id)[key] == "extra-b" + ) + + def test_order_with_both_sources_extra_data_first( + self, + session_factory: orm.sessionmaker, + ) -> None: + """When a run has both sources, whichever runs first wins.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="cs-name"), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": "ed-name"} + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run.id)[key] == "ed-name" + ) + + def test_order_with_both_sources_component_spec_first( + self, + session_factory: orm.sessionmaker, + ) -> None: + """When a run has both sources, whichever runs first wins.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(pipeline_name="cs-name"), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": "ed-name"} + session.commit() + + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_component_spec( + session=session, + auto_commit=True, + ) + with session_factory() as session: + database_migrate.backfill_pipeline_names_from_extra_data( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + assert ( + service.list_annotations(session=session, id=run.id)[key] == "cs-name" + ) + + +class TestBackfillDataParity: + """Verify backfill creates annotations for NULL/empty source values.""" + + def test_created_by_null_gets_empty_annotation( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + def test_created_by_empty_gets_empty_annotation( + self, + session_factory: orm.sessionmaker, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="", + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + def test_all_backfills_data_parity( + self, + session_factory: orm.sessionmaker, + ) -> None: + """After backfill, every run has a created_by annotation (data parity).""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + + for created_by in ["alice", None, "", "bob"]: + kwargs = {"root_task": _make_task_spec()} + if created_by is not None: + kwargs["created_by"] = created_by + run = _create_run( + session_factory=session_factory, service=service, **kwargs + ) + _delete_annotation( + session_factory=session_factory, + run_id=run.id, + key=key, + ) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + with session_factory() as session: + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + with session_factory() as session: + total_runs = session.query(bts.PipelineRun).count() + annotation_count = _count_annotations( + session_factory=session_factory, + key=key, + ) + assert annotation_count == total_runs + + +class TestBackfillErrorHandling: + """Verify error behavior: functions raise, orchestrator catches.""" + + def test_run_all_backfills_logs_exception_on_error( + self, + session_factory: orm.sessionmaker, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, + ) -> None: + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + monkeypatch.setattr( + database_migrate, + "backfill_created_by_annotations", + mock.Mock(side_effect=RuntimeError("boom")), + ) + with caplog.at_level( + logging.ERROR, logger="cloud_pipelines_backend.database_migrate" + ): + with session_factory() as session: + database_migrate.run_all_annotation_backfills( + session=session, + do_skip_already_backfilled=True, + ) + assert "Annotation backfill failed" in caplog.text + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + def test_backfill_function_propagates_error( + self, + session_factory: orm.sessionmaker, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Individual backfill functions DO raise (no internal try-catch).""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + original_execute = orm.Session.execute + + def _raising_execute( + self_session: orm.Session, + statement: Any, + *args: Any, + **kwargs: Any, + ) -> Any: + stmt_str = str(statement) + if "INSERT" in stmt_str and "pipeline_run_annotation" in stmt_str: + raise RuntimeError("simulated execute error") + return original_execute(self_session, statement, *args, **kwargs) + + monkeypatch.setattr(orm.Session, "execute", _raising_execute) + + with session_factory() as session: + with pytest.raises(RuntimeError, match="simulated execute error"): + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + def test_backfill_error_does_not_commit( + self, + session_factory: orm.sessionmaker, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """If execute raises, commit is never called and no rows are inserted.""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory=session_factory, + service=service, + root_task=_make_task_spec(), + created_by="alice", + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + assert _count_annotations(session_factory=session_factory, key=key) == 0 + + commit_called = False + original_execute = orm.Session.execute + original_commit = orm.Session.commit + + def _raising_execute( + self_session: orm.Session, + statement: Any, + *args: Any, + **kwargs: Any, + ) -> Any: + stmt_str = str(statement) + if "INSERT" in stmt_str and "pipeline_run_annotation" in stmt_str: + raise RuntimeError("simulated execute error") + return original_execute(self_session, statement, *args, **kwargs) + + def _tracking_commit( + self_session: orm.Session, + ) -> None: + nonlocal commit_called + commit_called = True + return original_commit(self_session) + + monkeypatch.setattr(orm.Session, "execute", _raising_execute) + monkeypatch.setattr(orm.Session, "commit", _tracking_commit) + + with session_factory() as session: + with pytest.raises(RuntimeError, match="simulated execute error"): + database_migrate.backfill_created_by_annotations( + session=session, + auto_commit=True, + ) + + assert not commit_called + assert _count_annotations(session_factory=session_factory, key=key) == 0 diff --git a/tests/test_database_ops.py b/tests/test_database_ops.py index df91495..32b9bd3 100644 --- a/tests/test_database_ops.py +++ b/tests/test_database_ops.py @@ -1,119 +1,9 @@ -"""Tests for database_ops: backfill and annotation helpers. +"""Tests for database_ops: migrate_db and index creation.""" -Pipeline Name Resolution Path -============================== - -Phase 1 (bulk SQL -- extra_data path): - pipeline_run.extra_data -> ["pipeline_name"] -> value - | | | - +-- None +-- key missing +-- "" - v v v - SQL NULL (safe) SQL NULL (safe) filtered by != "" - -Phase 2 (bulk SQL -- component_spec JSON path): - execution_node.task_spec -> 'componentRef' -> 'spec' ->> 'name' - | | | | - +-- NULL +-- key missing +-- null +-- null - v v v v - SQL NULL (safe) SQL NULL (safe) SQL NULL SQL NULL -""" - -import pytest import sqlalchemy -from sqlalchemy import orm -from typing import Any -from cloud_pipelines_backend import api_server_sql from cloud_pipelines_backend import backend_types_sql as bts -from cloud_pipelines_backend import component_structures as structures from cloud_pipelines_backend import database_ops -from cloud_pipelines_backend import filter_query_sql - - -def _make_task_spec( - *, - pipeline_name: str = "test-pipeline", -) -> structures.TaskSpec: - return structures.TaskSpec( - component_ref=structures.ComponentReference( - spec=structures.ComponentSpec( - name=pipeline_name, - implementation=structures.ContainerImplementation( - container=structures.ContainerSpec(image="test-image") - ), - ) - ) - ) - - -def _set_execution_node_task_spec( - *, - session_factory: orm.sessionmaker, - run_id: str, - task_spec: structures.TaskSpec, -) -> None: - """Replace the execution_node's task_spec JSON with the given TaskSpec. - - Use to test Phase 2 fallback paths where spec is None or name is None, - since the service's create() requires a valid spec to run. - """ - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run_id) - exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) - exec_node.task_spec = task_spec.to_json_dict() - session.commit() - - -def _set_execution_node_task_spec_raw( - *, - session_factory: orm.sessionmaker, - run_id: str, - task_spec_dict: dict[str, Any] | None, -) -> None: - """Set task_spec to an arbitrary dict (or None) bypassing Pydantic. - - Use to test JSON paths that Pydantic's TaskSpec cannot represent - (e.g. empty dict, missing componentRef, null task_spec). - """ - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run_id) - exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) - exec_node.task_spec = task_spec_dict - session.commit() - - -@pytest.fixture() -def session_factory() -> orm.sessionmaker: - db_engine = database_ops.create_db_engine(database_uri="sqlite://") - bts._TableBase.metadata.create_all(db_engine) - return orm.sessionmaker(db_engine) - - -def _create_run( - session_factory: orm.sessionmaker, - service: api_server_sql.PipelineRunsApiService_Sql, - **kwargs, -) -> api_server_sql.PipelineRunResponse: - """Create a pipeline run using a fresh session (mirrors production per-request sessions).""" - with session_factory() as session: - return service.create(session, **kwargs) - - -def _delete_annotation( - *, - session_factory: orm.sessionmaker, - run_id: str, - key: filter_query_sql.PipelineRunAnnotationSystemKey, -) -> None: - """Remove a write-path annotation so backfill can be tested in isolation.""" - with session_factory() as session: - session.execute( - sqlalchemy.delete(bts.PipelineRunAnnotation).where( - bts.PipelineRunAnnotation.pipeline_run_id == run_id, - bts.PipelineRunAnnotation.key == key, - ) - ) - session.commit() def _get_index_names( @@ -128,8 +18,7 @@ def _get_index_names( class TestMigrateDb: """Verify migrate_db creates indexes on a pre-existing DB missing them.""" - @pytest.fixture() - def bare_engine(self) -> sqlalchemy.Engine: + def _bare_engine(self) -> sqlalchemy.Engine: """Create tables then drop all indexes so migrate_db can recreate them.""" db_engine = database_ops.create_db_engine(database_uri="sqlite://") bts._TableBase.metadata.create_all(db_engine) @@ -143,10 +32,8 @@ def bare_engine(self) -> sqlalchemy.Engine: conn.commit() return db_engine - def test_creates_execution_node_cache_key_index( - self, - bare_engine: sqlalchemy.Engine, - ) -> None: + def test_creates_execution_node_cache_key_index(self) -> None: + bare_engine = self._bare_engine() assert bts.ExecutionNode._IX_EXECUTION_NODE_CACHE_KEY not in _get_index_names( engine=bare_engine, table_name="execution_node", @@ -157,10 +44,8 @@ def test_creates_execution_node_cache_key_index( table_name="execution_node", ) - def test_creates_annotation_run_id_key_value_index( - self, - bare_engine: sqlalchemy.Engine, - ) -> None: + def test_creates_annotation_run_id_key_value_index(self) -> None: + bare_engine = self._bare_engine() assert ( bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE not in _get_index_names( @@ -175,10 +60,8 @@ def test_creates_annotation_run_id_key_value_index( ) ) - def test_idempotent( - self, - bare_engine: sqlalchemy.Engine, - ) -> None: + def test_idempotent(self) -> None: + bare_engine = self._bare_engine() database_ops.migrate_db(db_engine=bare_engine) indexes_after_first = { "execution_node": _get_index_names( @@ -200,929 +83,3 @@ def test_idempotent( ), } assert indexes_after_first == indexes_after_second - - -class TestIsAnnotationKeyAlreadyBackfilled: - def test_false_on_empty_db( - self, - session_factory: orm.sessionmaker, - ) -> None: - with session_factory() as session: - assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - ) - is False - ) - - def test_false_with_unrelated_annotation( - self, - session_factory: orm.sessionmaker, - ) -> None: - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run(session_factory, service, root_task=_make_task_spec()) - with session_factory() as session: - service.set_annotation(session=session, id=run.id, key="team", value="ml") - with session_factory() as session: - assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - ) - is False - ) - - def test_true_when_key_exists( - self, - session_factory: orm.sessionmaker, - ) -> None: - service = api_server_sql.PipelineRunsApiService_Sql() - _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="alice", - ) - with session_factory() as session: - assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - ) - is True - ) - - def test_matches_exact_key( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Only returns True for the exact key queried, not other keys.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run(session_factory, service, root_task=_make_task_spec()) - with session_factory() as session: - service.set_annotation(session=session, id=run.id, key="team", value="ml") - with session_factory() as session: - assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key="team", - ) - is True - ) - assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key="other_key", - ) - is False - ) - - -class TestPipelineNameBackfill: - """Integration tests for pipeline name backfill (Phase 1 + Phase 2).""" - - # --- Orchestration tests (full backfill flow) --- - - def test_backfill_populates_name(self, session_factory): - """P1 happy path: extra_data -> ["pipeline_name"] -> [value] OK""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="my-pipeline"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - engine = session_factory.kw["bind"] - database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "my-pipeline" - - def test_backfill_populates_name_via_both_phases(self, session_factory): - """Orchestrator resolves run_a via P1 (extra_data) and run_b via P2 (component_spec).""" - service = api_server_sql.PipelineRunsApiService_Sql() - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - - run_a = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="from-extra-data"), - ) - _delete_annotation(session_factory=session_factory, run_id=run_a.id, key=key) - - run_b = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="from-component-spec"), - ) - _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) - with session_factory() as session: - db_run_b = session.get(bts.PipelineRun, run_b.id) - db_run_b.extra_data = None - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_a.id) - assert key not in service.list_annotations(session=session, id=run_b.id) - - engine = session_factory.kw["bind"] - database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - - with session_factory() as session: - assert ( - service.list_annotations(session=session, id=run_a.id)[key] - == "from-extra-data" - ) - assert ( - service.list_annotations(session=session, id=run_b.id)[key] - == "from-component-spec" - ) - - def test_backfill_phase2_skips_phase1_insert_within_transaction( - self, session_factory - ): - """Phase 2's anti-join sees Phase 1's insert within the same transaction. - - A run that both phases could match (has extra_data AND component_spec - name). Phase 1 inserts the annotation; Phase 2 must skip it via the - anti-join, not insert a duplicate. Proves execute() writes are visible - within the shared transaction buffer. - """ - service = api_server_sql.PipelineRunsApiService_Sql() - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="shared-name"), - ) - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - engine = session_factory.kw["bind"] - database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "shared-name" - - # Verify exactly one annotation row — no duplicate from Phase 2. - with session_factory() as session: - count = ( - session.query(bts.PipelineRunAnnotation) - .filter( - bts.PipelineRunAnnotation.pipeline_run_id == run.id, - bts.PipelineRunAnnotation.key == key, - ) - .count() - ) - assert count == 1 - - def test_backfill_skips_when_key_already_exists(self, session_factory): - """Once any NAME annotation exists, subsequent backfill calls are no-ops.""" - service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - - run_a = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="pipeline-a"), - ) - _delete_annotation(session_factory=session_factory, run_id=run_a.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_a.id) - - database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - - with session_factory() as session: - assert ( - service.list_annotations(session=session, id=run_a.id)[key] - == "pipeline-a" - ) - - run_b = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="pipeline-b"), - ) - _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_b.id) - - # Backfill is a no-op because run_a's annotation already exists - database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_b.id) - - # --- Phase 1 tests (_backfill_pipeline_names_from_extra_data) --- - - def test_backfill_phase1_skips_none_extra_data(self, session_factory): - """P1 null point: [extra_data=None] FAIL""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_extra_data(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_backfill_phase1_skips_missing_key(self, session_factory): - """P1 null point: extra_data -> [key missing] FAIL""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = {} - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_extra_data(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_backfill_phase1_inserts_empty_name(self, session_factory): - """P1 valid: extra_data -> ["pipeline_name"] -> [""] - Passes: empty string is a valid name, annotation inserted with value="".""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = {"pipeline_name": ""} - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_extra_data(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "" - - # --- Overflow truncation tests --- - # Uses mysql_varchar_limit_session_factory with SQLite TRIGGER enforcement. - # Verifies that backfill truncates >255 char values to 255, - # preventing MySQL DataError 1406. - - def test_backfill_p1_long_name_truncated( - self, - mysql_varchar_limit_session_factory: orm.sessionmaker, - ) -> None: - """P1: a 300-char pipeline_name in extra_data is truncated to 255.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - mysql_varchar_limit_session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation( - session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key - ) - with mysql_varchar_limit_session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - long_name = "x" * 300 - with mysql_varchar_limit_session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = {"pipeline_name": long_name} - session.commit() - - with mysql_varchar_limit_session_factory() as session: - database_ops._backfill_pipeline_names_from_extra_data(session=session) - session.commit() - - with mysql_varchar_limit_session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "x" * bts._STR_MAX_LENGTH - - def test_backfill_p1_exact_255_preserved( - self, - mysql_varchar_limit_session_factory: orm.sessionmaker, - ) -> None: - """P1: a 255-char pipeline_name is stored without truncation.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - mysql_varchar_limit_session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation( - session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key - ) - with mysql_varchar_limit_session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - exact_name = "x" * bts._STR_MAX_LENGTH - with mysql_varchar_limit_session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = {"pipeline_name": exact_name} - session.commit() - - with mysql_varchar_limit_session_factory() as session: - database_ops._backfill_pipeline_names_from_extra_data(session=session) - session.commit() - - with mysql_varchar_limit_session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == exact_name - - def test_backfill_p2_long_name_truncated( - self, - mysql_varchar_limit_session_factory: orm.sessionmaker, - ) -> None: - """P2: a 300-char name in task_spec is truncated to 255.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - mysql_varchar_limit_session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation( - session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key - ) - with mysql_varchar_limit_session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - long_name = "y" * 300 - with mysql_varchar_limit_session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=mysql_varchar_limit_session_factory, - run_id=run.id, - task_spec_dict={ - "componentRef": {"spec": {"name": long_name}}, - }, - ) - - with mysql_varchar_limit_session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec( - session=session, - ) - session.commit() - - with mysql_varchar_limit_session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "y" * bts._STR_MAX_LENGTH - - def test_backfill_p2_exact_255_preserved( - self, - mysql_varchar_limit_session_factory: orm.sessionmaker, - ) -> None: - """P2: a 255-char name in task_spec is stored without truncation.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - mysql_varchar_limit_session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation( - session_factory=mysql_varchar_limit_session_factory, run_id=run.id, key=key - ) - with mysql_varchar_limit_session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - exact_name = "y" * bts._STR_MAX_LENGTH - with mysql_varchar_limit_session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=mysql_varchar_limit_session_factory, - run_id=run.id, - task_spec_dict={ - "componentRef": {"spec": {"name": exact_name}}, - }, - ) - - with mysql_varchar_limit_session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec( - session=session, - ) - session.commit() - - with mysql_varchar_limit_session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == exact_name - - # --- Phase 2 tests (_backfill_pipeline_names_from_component_spec) --- - # Ordered by JSON traversal depth (0 -> 4). - # Path: ExecutionNode row -> task_spec -> componentRef -> spec -> name - - def test_p2_depth0_execution_node_missing( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: [ExecutionNode row missing] -> task_spec -> componentRef -> spec -> name - Fails at: INNER JOIN finds no execution_node, row excluded.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) - session.delete(exec_node) - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth1_task_spec_null( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec=[NULL] -> componentRef -> spec -> name - Fails at: task_spec column is NULL, JSON extraction returns NULL.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=session_factory, - run_id=run.id, - task_spec_dict=None, - ) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth1_task_spec_empty( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec={} -> [componentRef missing] -> spec -> name - Fails at: task_spec is empty dict, 'componentRef' key absent.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=session_factory, - run_id=run.id, - task_spec_dict={}, - ) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth2_component_ref_missing( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> [componentRef missing] -> spec -> name - Fails at: 'componentRef' key absent from task_spec.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=session_factory, - run_id=run.id, - task_spec_dict={"other_key": "value"}, - ) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth2_component_ref_null( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> [componentRef=null] -> spec -> name - Fails at: componentRef is null, JSON extraction returns NULL.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=session_factory, - run_id=run.id, - task_spec_dict={"componentRef": None}, - ) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth3_spec_null( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> componentRef -> [spec=null] -> name - Fails at: spec is null, JSON extraction returns NULL.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec( - session_factory=session_factory, - run_id=run.id, - task_spec=structures.TaskSpec( - component_ref=structures.ComponentReference( - name="placeholder", - spec=None, - ) - ), - ) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth4_name_null( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> componentRef -> spec -> [name=null] - Fails at: name is null, JSON extraction returns NULL.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec( - session_factory=session_factory, - run_id=run.id, - task_spec=structures.TaskSpec( - component_ref=structures.ComponentReference( - spec=structures.ComponentSpec( - name=None, - ) - ) - ), - ) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_p2_depth4_name_empty_string( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> componentRef -> spec -> [name=""] - Passes: empty string is a valid name, annotation inserted with value="".""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="some-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - _set_execution_node_task_spec_raw( - session_factory=session_factory, - run_id=run.id, - task_spec_dict={ - "componentRef": {"spec": {"name": ""}}, - }, - ) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "" - - def test_p2_depth4_name_valid( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Path: task_spec -> componentRef -> spec -> [name="fallback-name"] - Passes: valid name extracted, annotation inserted.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(pipeline_name="fallback-name"), - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - db_run = session.get(bts.PipelineRun, run.id) - db_run.extra_data = None - session.commit() - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - with session_factory() as session: - database_ops._backfill_pipeline_names_from_component_spec(session=session) - session.commit() - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "fallback-name" - - -class TestCreatedByBackfill: - def test_backfill_populates_created_by( - self, - session_factory: orm.sessionmaker, - ) -> None: - """The INSERT path produces the correct annotation value.""" - service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="alice", - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "alice" - - def test_backfill_skips_null_created_by(self, session_factory): - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run(session_factory, service, root_task=_make_task_spec()) - engine = session_factory.kw["bind"] - - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - not in annotations - ) - - def test_backfill_skips_empty_created_by(self, session_factory): - """Runs with created_by='' are not backfilled.""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="", - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - engine = session_factory.kw["bind"] - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - with session_factory() as session: - annotations = service.list_annotations(session=session, id=run.id) - assert key not in annotations - - def test_backfill_skips_when_key_already_exists(self, session_factory): - """Once any CREATED_BY annotation exists, subsequent backfill calls are no-ops.""" - service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - - run_alice = _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="alice", - ) - _delete_annotation( - session_factory=session_factory, run_id=run_alice.id, key=key - ) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_alice.id) - - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - with session_factory() as session: - assert ( - service.list_annotations(session=session, id=run_alice.id)[key] - == "alice" - ) - - run_bob = _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="bob", - ) - _delete_annotation(session_factory=session_factory, run_id=run_bob.id, key=key) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_bob.id) - - # Backfill is a no-op because run_alice's annotation already exists - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run_bob.id) - - def test_backfill_uses_single_session( - self, - session_factory: orm.sessionmaker, - monkeypatch: pytest.MonkeyPatch, - ) -> None: - """Check and insert happen in the same session (single transaction).""" - service = api_server_sql.PipelineRunsApiService_Sql() - run = _create_run( - session_factory, - service, - root_task=_make_task_spec(), - created_by="alice", - ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - with session_factory() as session: - session.query(bts.PipelineRunAnnotation).filter_by( - pipeline_run_id=run.id, - key=key, - ).delete() - session.commit() - - engine = session_factory.kw["bind"] - session_count = 0 - _original_session_init = orm.Session.__init__ - - def _counting_init( - self: Any, - *args: Any, - **kwargs: Any, - ) -> None: - nonlocal session_count - session_count += 1 - _original_session_init(self, *args, **kwargs) - - monkeypatch.setattr(orm.Session, "__init__", _counting_init) - - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - - assert session_count == 1