Skip to content

Commit 3af562b

Browse files
authored
feat: Search within date range in pipeline run API (#130)
### TL;DR Implemented search within date range in Pipeline Runs. ### What changed? #### Functional - API `GET /api/pipeline_runs/` - Search by date range in `filter_query`. Example query (not URL encoded for example): ``` /api?filter_query={ "and": [{ "time_range": { "key": "system/pipeline_run.date.created_at", "start_time": "2024-01-01T00:00:00Z", "end_time": "2024-02-01T00:00:00Z" } }] } ``` - Valid time ranges: - Start time only - End time only - (Start + End) time #### Other - Added timezone handling to convert aware datetimes to naive UTC for database compatibility (i.e. date range search will convert API timezone to UTC because DB datetime is only UTC) ### How to test? ``` uv run pytest tests/test_api_server_sql.py tests/test_filter_query_sql.py ``` The implementation includes comprehensive tests covering: - Basic time range filtering with start and end times - Start-only time ranges (no end time) - Boundary conditions (inclusive start, exclusive end) - Timezone offset handling (e.g., `+05:30` converted to UTC) - Negation with `NOT` operator - Combination with annotation-based filters - Nested logical operations (`AND`/`OR`) - Multiple time ranges in the same query ### Why make this change? - This change enables users to filter pipeline runs by creation date range. - The implementation properly handles timezone conversions to ensure consistent behavior across different client timezones while maintaining compatibility with the database's naive UTC storage format.
1 parent 6fcc19d commit 3af562b

4 files changed

Lines changed: 863 additions & 12 deletions

File tree

cloud_pipelines_backend/filter_query_models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ def key(self) -> str:
9999
return self.value_equals.key
100100

101101

102-
class TimeRangePredicate(_BaseModel):
102+
class TimeRangePredicate(KeyPredicateBase):
103103
time_range: TimeRange
104104

105+
@property
106+
def key(self) -> str:
107+
return self.time_range.key
108+
105109

106110
LeafPredicate = (
107111
KeyExistsPredicate

cloud_pipelines_backend/filter_query_sql.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import base64
2+
import datetime
23
import json
34
import enum
45
from typing import Any, Final
@@ -16,6 +17,7 @@
1617
class PipelineRunAnnotationSystemKey(enum.StrEnum):
1718
CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by"
1819
PIPELINE_NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name"
20+
CREATED_AT = f"{_PIPELINE_RUN_KEY_PREFIX}date.created_at"
1921

2022

2123
SYSTEM_KEY_SUPPORTED_PREDICATES: dict[PipelineRunAnnotationSystemKey, set[type]] = {
@@ -30,6 +32,9 @@ class PipelineRunAnnotationSystemKey(enum.StrEnum):
3032
filter_query_models.ValueContainsPredicate,
3133
filter_query_models.ValueInPredicate,
3234
},
35+
PipelineRunAnnotationSystemKey.CREATED_AT: {
36+
filter_query_models.TimeRangePredicate,
37+
},
3338
}
3439

3540
# ---------------------------------------------------------------------------
@@ -301,8 +306,9 @@ def _predicate_to_clause(
301306
return _value_contains_to_clause(predicate=predicate)
302307
case filter_query_models.ValueInPredicate():
303308
return _value_in_to_clause(predicate=predicate)
309+
case filter_query_models.TimeRangePredicate():
310+
return _time_range_to_clause(predicate=predicate)
304311
case _:
305-
# TODO: TimeRangePredicate -- not supported currently, will be supported in the future.
306312
raise NotImplementedError(
307313
f"Predicate type {type(predicate).__name__} is not yet implemented."
308314
)
@@ -363,3 +369,59 @@ def _value_in_to_clause(
363369
bts.PipelineRunAnnotation.value.in_(predicate.value_in.values),
364370
],
365371
)
372+
373+
374+
# ---------------------------------------------------------------------------
375+
# Column-based predicates (bypass annotation table)
376+
# ---------------------------------------------------------------------------
377+
378+
379+
def _time_range_to_clause(
380+
*, predicate: filter_query_models.TimeRangePredicate
381+
) -> sql.ColumnElement:
382+
"""Build a WHERE clause for pipeline_run.created_at from a time range.
383+
384+
Pydantic's AwareDatetime preserves the original timezone offset, so we
385+
must normalize to naive UTC before comparing against the DB column.
386+
387+
The DB stores "naive UTC" datetimes -- the values represent UTC but carry
388+
no timezone label. For example, the DB stores '2024-01-01 02:30:00', not
389+
'2024-01-01 02:30:00+00:00'. The UtcDateTime type decorator (in
390+
backend_types_sql.py) strips tzinfo on write and re-attaches UTC on read.
391+
392+
Conversion pipeline for input '2024-01-01T08:00:00+05:30':
393+
394+
API request (JSON string)
395+
'2024-01-01T08:00:00+05:30'
396+
|
397+
v
398+
Pydantic AwareDatetime (preserves offset)
399+
datetime(2024, 1, 1, 8, 0, 0, tzinfo=+05:30)
400+
|
401+
v .astimezone(utc) -- converts 08:00 - 05:30 = 02:30
402+
UTC-aware datetime
403+
datetime(2024, 1, 1, 2, 30, 0, tzinfo=UTC)
404+
|
405+
v .replace(tzinfo=None) -- strips timezone label
406+
Naive datetime
407+
datetime(2024, 1, 1, 2, 30, 0)
408+
|
409+
v SQLAlchemy literal_binds -- adds microsecond precision
410+
SQL string
411+
'2024-01-01 02:30:00.000000' <-- matches DB storage format
412+
"""
413+
tr = predicate.time_range
414+
if tr.key != PipelineRunAnnotationSystemKey.CREATED_AT:
415+
raise errors.ApiValidationError(
416+
"time_range only supports key "
417+
f"{PipelineRunAnnotationSystemKey.CREATED_AT!r}, got {tr.key!r}"
418+
)
419+
# Convert aware datetimes to naive UTC to match DB storage format.
420+
clauses: list[sql.ColumnElement] = []
421+
if tr.start_time is not None:
422+
start_utc = tr.start_time.astimezone(datetime.timezone.utc).replace(tzinfo=None)
423+
clauses.append(bts.PipelineRun.created_at >= start_utc)
424+
if tr.end_time is not None:
425+
end_utc = tr.end_time.astimezone(datetime.timezone.utc).replace(tzinfo=None)
426+
clauses.append(bts.PipelineRun.created_at < end_utc)
427+
return sql.and_(*clauses)

0 commit comments

Comments
 (0)