
fix: Robust and Idempotent Backfill for Search Pipeline Run API #166

Draft
yuechao-qin wants to merge 1 commit into master from ycq/fix-backfill

Collaborator

@yuechao-qin commented Mar 13, 2026

TL;DR

Refactored database migration logic to ensure data parity by creating system annotations for all pipeline runs, even when source values are null or empty, and moved backfill functions to a dedicated module with comprehensive error handling.

What changed?

  • System annotation mirroring now ensures data parity: The _mirror_system_annotations function always creates annotation rows for created_by and pipeline_name, storing an empty string "" when the source value is null or empty and logging a warning in the null case
  • New dedicated migration module: Created database_migrate.py containing three idempotent backfill functions:
    • backfill_created_by_annotations: Uses COALESCE to handle null values
    • backfill_pipeline_names_from_extra_data: Extracts from JSON with null filtering
    • backfill_pipeline_names_from_component_spec: Extracts from nested JSON path with anti-join logic
  • Robust orchestration: run_all_annotation_backfills wraps all backfills in a try/except block, with configurable skip guards and a single transaction commit
  • Enhanced test coverage: Added 1600+ lines of comprehensive tests covering idempotency, order independence, data parity, error handling, and edge cases
  • Updated existing tests: Modified assertions to expect empty string annotations instead of missing keys
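
To illustrate the COALESCE plus anti-join pattern described in the list above, here is a minimal sketch. It uses the stdlib sqlite3 module and raw SQL instead of the PR's SQLAlchemy statements, and the table names, column names, and the key string are hypothetical stand-ins, not the project's actual schema.

```python
import sqlite3

CREATED_BY_KEY = "system/created_by"  # hypothetical key name


def backfill_created_by(conn: sqlite3.Connection) -> None:
    """Insert a created_by annotation for every run that does not have one yet."""
    conn.execute(
        """
        INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
        SELECT r.id, :key, COALESCE(r.created_by, '')  -- NULL becomes ''
        FROM pipeline_run AS r
        LEFT JOIN pipeline_run_annotation AS a
          ON a.pipeline_run_id = r.id AND a.key = :key
        WHERE a.pipeline_run_id IS NULL                -- anti-join: only missing rows
        """,
        {"key": CREATED_BY_KEY},
    )


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE pipeline_run (id TEXT PRIMARY KEY, created_by TEXT);
    CREATE TABLE pipeline_run_annotation (
        pipeline_run_id TEXT NOT NULL,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        PRIMARY KEY (pipeline_run_id, key)
    );
    INSERT INTO pipeline_run VALUES ('run-1', 'alice'), ('run-2', NULL);
    """
)

# Running the backfill twice must not fail or create duplicates.
backfill_created_by(conn)
backfill_created_by(conn)
conn.commit()

rows = conn.execute(
    "SELECT pipeline_run_id, value FROM pipeline_run_annotation "
    "ORDER BY pipeline_run_id"
).fetchall()
```

Because the anti-join selects only runs without an existing row, the second call inserts nothing, which is the idempotency property the tests are described as checking.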

How to test?

Run the existing test suite. The new test_database_migrate.py provides extensive coverage, including:

  • Basic backfill functionality for both sources
  • Idempotency verification (safe to run multiple times)
  • Order independence between pipeline name sources
  • Data parity validation (every run gets annotations)
  • Error handling and transaction rollback scenarios
  • Truncation behavior for long values

Why make this change?

  • Data consistency: Ensures every pipeline run has system annotations for reliable filtering/querying, eliminating gaps where some runs lack annotation rows
  • Improved maintainability: Separates migration logic from general database operations with better organization and comprehensive documentation
  • Production reliability: Adds proper error handling so migration failures don't block application startup, with detailed logging for debugging
  • Database portability: Uses SQLAlchemy abstractions for cross-database compatibility (SQLite, MySQL, PostgreSQL)
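
The "failures don't block startup" behavior can be sketched roughly as follows. This is an assumption-laden illustration, not the PR's run_all_annotation_backfills: the function name run_backfills, the table, and the use of stdlib sqlite3 are all hypothetical. It shows a single commit on success and a rollback plus log entry on any failure.

```python
import logging
import sqlite3
from typing import Callable, Sequence

logger = logging.getLogger(__name__)

Backfill = Callable[[sqlite3.Connection], None]


def run_backfills(conn: sqlite3.Connection, backfills: Sequence[Backfill]) -> bool:
    """Run all backfills in one transaction; never raise to the caller."""
    try:
        for backfill in backfills:
            backfill(conn)
        conn.commit()  # single commit once every backfill has succeeded
        return True
    except Exception:
        conn.rollback()  # discard partial work from earlier backfills
        logger.exception("Annotation backfill failed; continuing startup")
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE annotation (run_id TEXT, key TEXT, value TEXT)")
conn.commit()


def good_backfill(c: sqlite3.Connection) -> None:
    c.execute("INSERT INTO annotation VALUES ('run-1', 'k', '')")


def failing_backfill(c: sqlite3.Connection) -> None:
    raise RuntimeError("simulated migration failure")


succeeded = run_backfills(conn, [good_backfill, failing_backfill])
rows_after_failure = conn.execute("SELECT COUNT(*) FROM annotation").fetchone()[0]
```

Returning a boolean instead of raising lets the caller decide whether a failed backfill should be retried later, while the rollback keeps the first backfill's rows from being committed on their own.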


service = api_server_sql.PipelineRunsApiService_Sql()
key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME

run_a = _create_run(
Comment on lines +534 to +536
# TODO: Do we need a final catchall backfill that inserts empty string
# for all pipeline names, which happens to not have a name in
# component_spec nor extra_data?
Collaborator Author

Do we need this? That is, a catch-all backfill for the pipeline name when neither 1) extra_data nor 2) the component_spec name exists (or extracting them had issues)?

Collaborator Author

Update the component spec backfill so that, if the name does not exist, it backfills with an empty string.
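
The fallback rule above, expressed in plain Python for clarity. The PR performs the extraction in SQL, so this is only a sketch of the intended semantics; the helper name and the JSON path used in the example are hypothetical.

```python
import json
from typing import Optional, Sequence


def extract_name_or_empty(raw_json: Optional[str], path: Sequence[str]) -> str:
    """Return the string found at `path` in a JSON document, or "" if unavailable."""
    if not raw_json:
        return ""
    try:
        node = json.loads(raw_json)
    except json.JSONDecodeError:
        return ""  # malformed JSON is treated like a missing name
    for part in path:
        if not isinstance(node, dict) or part not in node:
            return ""
        node = node[part]
    return node if isinstance(node, str) else ""


# Hypothetical nested path; the real component_spec layout may differ.
NAME_PATH = ["spec", "name"]

named = extract_name_or_empty('{"spec": {"name": "train-model"}}', NAME_PATH)
unnamed = extract_name_or_empty('{"spec": {}}', NAME_PATH)
missing = extract_name_or_empty(None, NAME_PATH)
```

With this rule every run receives a pipeline name annotation from the component spec pass, which would make a separate catch-all backfill unnecessary.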

session: orm.Session,
key: str,
) -> bool:
"""Return True if at least one annotation with the given key exists."""
Contributor

This is not safe anymore.
Due to changes in the API Server, pipeline names are already being inserted into the DB for new runs (while the bulk inserts for existing runs haven't been added yet), so finding one annotation with the key no longer proves that the backfill ran.

The best way to check whether the backfill is complete is to compare the number of rows in the pipeline runs table with the number of pipeline run annotations.
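
A rough sketch of the suggested count comparison, using stdlib sqlite3 and a hypothetical schema and key name. It assumes every annotation row refers to an existing run; if orphaned annotations are possible, the counts alone would not be sufficient.

```python
import sqlite3

PIPELINE_NAME_KEY = "system/pipeline_name"  # hypothetical key name


def is_backfill_complete(conn: sqlite3.Connection, key: str) -> bool:
    """Compare the number of runs with the number of runs annotated with `key`."""
    total_runs = conn.execute("SELECT COUNT(*) FROM pipeline_run").fetchone()[0]
    annotated_runs = conn.execute(
        "SELECT COUNT(DISTINCT pipeline_run_id) "
        "FROM pipeline_run_annotation WHERE key = ?",
        (key,),
    ).fetchone()[0]
    return annotated_runs == total_runs


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE pipeline_run (id TEXT PRIMARY KEY);
    CREATE TABLE pipeline_run_annotation (
        pipeline_run_id TEXT NOT NULL,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        PRIMARY KEY (pipeline_run_id, key)
    );
    INSERT INTO pipeline_run VALUES ('old-run'), ('new-run');
    -- Only the run created after the API Server change has an annotation.
    INSERT INTO pipeline_run_annotation
        VALUES ('new-run', 'system/pipeline_name', 'train');
    """
)

complete_before = is_backfill_complete(conn, PIPELINE_NAME_KEY)

conn.execute(
    "INSERT INTO pipeline_run_annotation VALUES ('old-run', ?, '')",
    (PIPELINE_NAME_KEY,),
)
complete_after = is_backfill_complete(conn, PIPELINE_NAME_KEY)
```

In the first state an "at least one annotation exists" check would report the backfill as done even though old-run has no row, which is the failure mode described above.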

)
existing_ann = orm.aliased(bts.PipelineRunAnnotation)

stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
Contributor

JFYI: If we expect most annotations to be missing, we could use the INSERT ... ON CONFLICT DO NOTHING statement: https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#sqlalchemy.dialects.sqlite.Insert.on_conflict_do_nothing
But if most annotations already exist, then the outer join might be better.
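
For comparison with the anti-join, the ON CONFLICT DO NOTHING variant might look like the sketch below. It uses raw SQL through stdlib sqlite3 (SQLite 3.24 or newer) instead of the SQLAlchemy construct linked above, and the schema and key name are hypothetical. It relies on a unique constraint over (pipeline_run_id, key).

```python
import sqlite3

CREATED_BY_KEY = "system/created_by"  # hypothetical key name

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE pipeline_run (id TEXT PRIMARY KEY, created_by TEXT);
    CREATE TABLE pipeline_run_annotation (
        pipeline_run_id TEXT NOT NULL,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        PRIMARY KEY (pipeline_run_id, key)
    );
    INSERT INTO pipeline_run VALUES ('run-1', 'alice'), ('run-2', NULL);
    -- run-1 was already annotated by the API server.
    INSERT INTO pipeline_run_annotation
        VALUES ('run-1', 'system/created_by', 'alice');
    """
)

# No join is needed: rows that would violate the primary key are skipped.
# SQLite requires a WHERE clause on the SELECT here to avoid a parsing
# ambiguity between a join's ON and ON CONFLICT.
conn.execute(
    """
    INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
    SELECT id, :key, COALESCE(created_by, '')
    FROM pipeline_run
    WHERE true
    ON CONFLICT (pipeline_run_id, key) DO NOTHING
    """,
    {"key": CREATED_BY_KEY},
)
conn.commit()

rows = conn.execute(
    "SELECT pipeline_run_id, value FROM pipeline_run_annotation "
    "ORDER BY pipeline_run_id"
).fetchall()
```

The trade-off mentioned above still applies: this form attempts an insert for every run and lets the constraint reject duplicates, whereas the outer join filters existing rows before inserting.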

Comment on lines +476 to +480
def run_all_annotation_backfills(
*,
session: orm.Session,
do_skip_already_backfilled: bool,
) -> None:
Collaborator Author

@yuechao-qin Mar 14, 2026

Alexey: Look into the different transaction isolation levels.

https://www.postgresql.org/docs/current/transaction-iso.html

For backfills, let's use the most reliable isolation level (serializable).

For example: the expectation is that if multiple migrations run at the same time, the failure should not be a duplicate-entry error but a transaction conflict error (i.e. one migration notices rows being added by another).

Try to see if this can be simulated/reproduced locally.
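
One consequence worth keeping in mind: the PostgreSQL documentation linked above notes that applications using the serializable level must be prepared to retry transactions that fail with a serialization error. A minimal, driver-agnostic sketch of such a retry loop follows. SerializationConflict is a hypothetical stand-in for whatever exception the real driver raises, and run_with_retry is an illustrative helper, not part of this PR.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class SerializationConflict(Exception):
    """Hypothetical stand-in for a driver's serialization-failure error."""


def run_with_retry(
    transaction: Callable[[], T],
    max_attempts: int = 3,
    base_delay_seconds: float = 0.0,
) -> T:
    """Re-run `transaction` from the start when it hits a conflict."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return transaction()
        except SerializationConflict:
            if attempt == max_attempts:
                raise  # give up and surface the conflict to the caller
            time.sleep(base_delay_seconds * attempt)  # simple linear backoff
    raise AssertionError("unreachable")


# Simulate a backfill that conflicts with a concurrent migration twice.
calls = []


def flaky_backfill() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise SerializationConflict("concurrent migration touched the same rows")
    return "done"


result = run_with_retry(flaky_backfill)
```

Since the backfills are idempotent, re-running the whole transaction after a conflict should be safe.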

@yuechao-qin yuechao-qin marked this pull request as ready for review March 14, 2026 01:29
@yuechao-qin yuechao-qin marked this pull request as draft March 14, 2026 01:30
Contributor

Style: Modules should be named as plural or uncountable nouns.
Example: database_migrations
