11import base64
22import json
3+ import enum
34from typing import Any , Final
45
56import sqlalchemy as sql
89from . import errors
910from . import filter_query_models
1011
12+ SYSTEM_KEY_PREFIX : Final [str ] = "system/"
13+
14+
15+ class PipelineRunAnnotationSystemKey (enum .StrEnum ):
16+ CREATED_BY = f"{ SYSTEM_KEY_PREFIX } pipeline_run.created_by"
17+
18+
19+ SYSTEM_KEY_SUPPORTED_PREDICATES : dict [PipelineRunAnnotationSystemKey , set [type ]] = {
20+ PipelineRunAnnotationSystemKey .CREATED_BY : {
21+ filter_query_models .KeyExistsPredicate ,
22+ filter_query_models .ValueEqualsPredicate ,
23+ filter_query_models .ValueInPredicate ,
24+ },
25+ }
26+
1127# ---------------------------------------------------------------------------
1228# Page-token helpers
1329# ---------------------------------------------------------------------------
@@ -44,6 +60,78 @@ def _resolve_filter_value(
4460 return filter , filter_query , offset
4561
4662
63+ # ---------------------------------------------------------------------------
64+ # PipelineRunAnnotationSystemKey validation and resolution
65+ # ---------------------------------------------------------------------------
66+
67+
68+ def _check_predicate_allowed (* , predicate : filter_query_models .Predicate ) -> None :
69+ """Raise if a system key is used with an unsupported predicate type."""
70+ if not isinstance (predicate , filter_query_models .KeyPredicateBase ):
71+ return
72+ key = predicate .key
73+
74+ try :
75+ system_key = PipelineRunAnnotationSystemKey (key )
76+ except ValueError :
77+ return
78+
79+ supported = SYSTEM_KEY_SUPPORTED_PREDICATES .get (system_key , set ())
80+ if type (predicate ) not in supported :
81+ raise errors .ApiValidationError (
82+ f"Predicate { type (predicate ).__name__ } is not supported "
83+ f"for system key { system_key !r} . "
84+ f"Supported: { [t .__name__ for t in supported ]} "
85+ )
86+
87+
88+ def _resolve_system_key_value (
89+ * ,
90+ key : str ,
91+ value : str ,
92+ current_user : str | None ,
93+ ) -> str :
94+ """Resolve special placeholder values for system keys."""
95+ if key == PipelineRunAnnotationSystemKey .CREATED_BY and value == "me" :
96+ return current_user if current_user is not None else ""
97+ return value
98+
99+
100+ def _maybe_resolve_system_values (
101+ * ,
102+ predicate : filter_query_models .ValueEqualsPredicate ,
103+ current_user : str | None ,
104+ ) -> filter_query_models .ValueEqualsPredicate :
105+ """Resolve special values in a ValueEqualsPredicate."""
106+ key = predicate .value_equals .key
107+ value = predicate .value_equals .value
108+ resolved = _resolve_system_key_value (
109+ key = key ,
110+ value = value ,
111+ current_user = current_user ,
112+ )
113+ if resolved != value :
114+ return filter_query_models .ValueEqualsPredicate (
115+ value_equals = filter_query_models .ValueEquals (key = key , value = resolved )
116+ )
117+ return predicate
118+
119+
120+ def _validate_and_resolve_predicate (
121+ * ,
122+ predicate : filter_query_models .Predicate ,
123+ current_user : str | None ,
124+ ) -> filter_query_models .Predicate :
125+ """Validate system key support, then resolve special values."""
126+ _check_predicate_allowed (predicate = predicate )
127+ if isinstance (predicate , filter_query_models .ValueEqualsPredicate ):
128+ return _maybe_resolve_system_values (
129+ predicate = predicate ,
130+ current_user = current_user ,
131+ )
132+ return predicate
133+
134+
47135# ---------------------------------------------------------------------------
48136# Public API
49137# ---------------------------------------------------------------------------
@@ -79,7 +167,12 @@ def build_list_filters(
79167
80168 if filter_query_value :
81169 parsed = filter_query_models .FilterQuery .model_validate_json (filter_query_value )
82- where_clauses .append (filter_query_to_where_clause (filter_query = parsed ))
170+ where_clauses .append (
171+ filter_query_to_where_clause (
172+ filter_query = parsed ,
173+ current_user = current_user ,
174+ )
175+ )
83176
84177 next_page_token = _encode_page_token (
85178 page_token_dict = {
@@ -95,10 +188,13 @@ def build_list_filters(
95188def filter_query_to_where_clause (
96189 * ,
97190 filter_query : filter_query_models .FilterQuery ,
191+ current_user : str | None = None ,
98192) -> sql .ColumnElement :
99193 predicates = filter_query .and_ or filter_query .or_
100194 is_and = filter_query .and_ is not None
101- clauses = [_predicate_to_clause (predicate = p ) for p in predicates ]
195+ clauses = [
196+ _predicate_to_clause (predicate = p , current_user = current_user ) for p in predicates
197+ ]
102198 return sql .and_ (* clauses ) if is_and else sql .or_ (* clauses )
103199
104200
@@ -163,17 +259,35 @@ def _build_filter_where_clauses(
163259
164260def _predicate_to_clause (
165261 * ,
166- predicate ,
262+ predicate : filter_query_models .Predicate ,
263+ current_user : str | None = None ,
167264) -> sql .ColumnElement :
265+ predicate = _validate_and_resolve_predicate (
266+ predicate = predicate ,
267+ current_user = current_user ,
268+ )
269+
168270 match predicate :
169271 case filter_query_models .AndPredicate ():
170272 return sql .and_ (
171- * [_predicate_to_clause (predicate = p ) for p in predicate .and_ ]
273+ * [
274+ _predicate_to_clause (predicate = p , current_user = current_user )
275+ for p in predicate .and_
276+ ]
172277 )
173278 case filter_query_models .OrPredicate ():
174- return sql .or_ (* [_predicate_to_clause (predicate = p ) for p in predicate .or_ ])
279+ return sql .or_ (
280+ * [
281+ _predicate_to_clause (predicate = p , current_user = current_user )
282+ for p in predicate .or_
283+ ]
284+ )
175285 case filter_query_models .NotPredicate ():
176- return sql .not_ (_predicate_to_clause (predicate = predicate .not_ ))
286+ return sql .not_ (
287+ _predicate_to_clause (
288+ predicate = predicate .not_ , current_user = current_user
289+ )
290+ )
177291 case filter_query_models .KeyExistsPredicate ():
178292 return _key_exists_to_clause (predicate = predicate )
179293 case filter_query_models .ValueEqualsPredicate ():
0 commit comments