Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.

Commit 17e71b9

Browse files
feat: add pipelines structure (#1046)
1 parent dc808b5 commit 17e71b9

21 files changed

Lines changed: 1979 additions & 2 deletions

google/cloud/firestore_v1/_helpers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ def __ne__(self, other):
120120
else:
121121
return not equality_val
122122

123+
def __repr__(self):
    """Return ``ClassName(latitude=..., longitude=...)`` for this object."""
    cls_name = type(self).__name__
    return "{}(latitude={}, longitude={})".format(
        cls_name, self.latitude, self.longitude
    )
125+
123126

124127
def verify_path(path, is_collection) -> None:
125128
"""Verifies that a ``path`` has the correct form.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
from typing import Optional
17+
from abc import ABC
18+
from abc import abstractmethod
19+
20+
from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb
21+
from google.cloud.firestore_v1.types.document import Value
22+
from google.cloud.firestore_v1.pipeline_expressions import Expr
23+
24+
25+
class Stage(ABC):
    """Abstract base for a single step of a Firestore pipeline.

    A stage has a ``name`` plus positional arguments (and, optionally, named
    options) that are combined into a ``Pipeline.Stage`` protobuf message by
    ``_to_pb``. Subclasses provide the arguments by implementing ``_pb_args``.
    """

    def __init__(self, custom_name: Optional[str] = None):
        """Set the stage name.

        Args:
            custom_name: Name to use for the stage. When omitted (or empty),
                the lowercased class name is used instead.
        """
        if custom_name:
            self.name = custom_name
        else:
            self.name = type(self).__name__.lower()

    def _to_pb(self) -> Pipeline_pb.Stage:
        """Build the protobuf message from this stage's name, args and options."""
        pb_args = self._pb_args()
        pb_options = self._pb_options()
        return Pipeline_pb.Stage(name=self.name, args=pb_args, options=pb_options)

    @abstractmethod
    def _pb_args(self) -> list[Value]:
        """Return the ordered list of arguments this stage expects."""
        raise NotImplementedError

    def _pb_options(self) -> dict[str, Value]:
        """Return optional named arguments; the base implementation has none."""
        return dict()

    def __repr__(self):
        # ``name`` is left out of the listing; every other instance
        # attribute is shown as ``attr=repr(value)``.
        shown = [
            f"{attr}={value!r}" for attr, value in vars(self).items() if attr != "name"
        ]
        return "{}({})".format(type(self).__name__, ", ".join(shown))
53+
54+
55+
class Collection(Stage):
    """Stage that selects a collection as the pipeline's initial data source."""

    def __init__(self, path: str):
        """
        Args:
            path: Path of the collection. A leading "/" is added when missing.
        """
        super().__init__()
        # Always store the path with a leading slash.
        self.path = path if path.startswith("/") else f"/{path}"

    def _pb_args(self):
        """Return the collection path as a single reference-valued argument."""
        reference = Value(reference_value=self.path)
        return [reference]
66+
67+
68+
class GenericStage(Stage):
    """A stage identified only by its name, carrying caller-supplied parameters."""

    def __init__(self, name: str, *params: Expr | Value):
        """
        Args:
            name: The stage name.
            *params: Stage parameters, in order.
        """
        super().__init__(name)
        converted: list[Value] = []
        for param in params:
            # Expressions are converted with ``_to_pb``; anything else is
            # stored unchanged.
            converted.append(param._to_pb() if isinstance(param, Expr) else param)
        self.params: list[Value] = converted

    def _pb_args(self):
        """Return the stored parameters as the stage's argument list."""
        return self.params

    def __repr__(self):
        # Only the stage name is shown; the parameters are omitted.
        return "{}(name='{}')".format(type(self).__name__, self.name)

google/cloud/firestore_v1/async_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
from google.cloud.firestore_v1.services.firestore.transports import (
4848
grpc_asyncio as firestore_grpc_transport,
4949
)
50+
from google.cloud.firestore_v1.async_pipeline import AsyncPipeline
51+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
5052

5153
if TYPE_CHECKING: # pragma: NO COVER
5254
import datetime
@@ -427,3 +429,10 @@ def transaction(self, **kwargs) -> AsyncTransaction:
427429
A transaction attached to this client.
428430
"""
429431
return AsyncTransaction(self, **kwargs)
432+
433+
@property
def _pipeline_cls(self):
    """Return the ``AsyncPipeline`` class."""
    pipeline_type = AsyncPipeline
    return pipeline_type
436+
437+
def pipeline(self) -> PipelineSource:
    """Start a pipeline with this client.

    Returns:
        A ``PipelineSource`` constructed with this client.
    """
    source = PipelineSource(self)
    return source
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
from typing import AsyncIterable, TYPE_CHECKING
17+
from google.cloud.firestore_v1 import _pipeline_stages as stages
18+
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
19+
20+
if TYPE_CHECKING: # pragma: NO COVER
21+
from google.cloud.firestore_v1.async_client import AsyncClient
22+
from google.cloud.firestore_v1.pipeline_result import PipelineResult
23+
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
24+
25+
26+
class AsyncPipeline(_BasePipeline):
    """
    Pipelines allow for complex data transformations and queries involving
    multiple stages like filtering, projection, aggregation, and vector search.

    This class extends `_BasePipeline` and provides methods to execute the
    defined pipeline stages using an asynchronous `AsyncClient`.

    Usage Example:
        >>> from google.cloud.firestore_v1.pipeline_expressions import Field
        >>>
        >>> async def run_pipeline():
        ...     client = AsyncClient(...)
        ...     pipeline = (
        ...         client.pipeline()
        ...         .collection("books")
        ...         .where(Field.of("published").gt(1980))
        ...         .select("title", "author")
        ...     )
        ...     async for result in pipeline.stream():
        ...         print(result)

    Use `client.pipeline()` to create instances of this class.
    """

    def __init__(self, client: AsyncClient, *stages: stages.Stage):
        """
        Initializes an asynchronous Pipeline.

        Args:
            client: The asynchronous `AsyncClient` instance to use for execution.
            *stages: Initial stages for the pipeline.
        """
        # The base initializer only takes the client, so forwarding *stages
        # to it would raise TypeError; attach the initial stages here instead.
        super().__init__(client)
        self.stages = tuple(stages)

    async def execute(
        self,
        transaction: AsyncTransaction | None = None,
    ) -> list[PipelineResult]:
        """
        Executes this pipeline and returns results as a list

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).

        Returns:
            A list containing every result yielded by :meth:`stream`.
        """
        return [result async for result in self.stream(transaction=transaction)]

    async def stream(
        self,
        transaction: AsyncTransaction | None = None,
    ) -> AsyncIterable[PipelineResult]:
        """
        Process this pipeline as a stream, providing results through an
        AsyncIterable

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).

        Yields:
            One ``PipelineResult`` per result in each streamed response.
        """
        request = self._prep_execute_request(transaction)
        # The API call is awaited first; the value it resolves to is then
        # iterated asynchronously, one response at a time.
        async for response in await self._client._firestore_api.execute_pipeline(
            request
        ):
            for result in self._execute_response_helper(response):
                yield result

google/cloud/firestore_v1/base_client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
Optional,
3838
Tuple,
3939
Union,
40+
Type,
4041
)
4142

4243
import google.api_core.client_options
@@ -61,6 +62,8 @@
6162
from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions
6263
from google.cloud.firestore_v1.field_path import render_field_path
6364
from google.cloud.firestore_v1.services.firestore import client as firestore_client
65+
from google.cloud.firestore_v1.pipeline_source import PipelineSource
66+
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
6467

6568
DEFAULT_DATABASE = "(default)"
6669
"""str: The default database used in a :class:`~google.cloud.firestore_v1.client.Client`."""
@@ -500,6 +503,20 @@ def batch(self) -> BaseWriteBatch:
500503
def transaction(self, **kwargs) -> BaseTransaction:
    """Not implemented here; always raises ``NotImplementedError``."""
    raise NotImplementedError()
502505

506+
def pipeline(self) -> PipelineSource:
    """Start a pipeline with this client.

    This implementation always raises ``NotImplementedError``.

    Returns:
        :class:`~google.cloud.firestore_v1.pipeline_source.PipelineSource`:
        A pipeline that uses this client.
    """
    raise NotImplementedError()
515+
516+
@property
def _pipeline_cls(self) -> Type["_BasePipeline"]:
    """Pipeline class for this client; not implemented here.

    Accessing this property always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
519+
503520

504521
def _reference_info(references: list) -> Tuple[list, dict]:
505522
"""Get information about document references.
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
from typing import Iterable, Sequence, TYPE_CHECKING
17+
from google.cloud.firestore_v1 import _pipeline_stages as stages
18+
from google.cloud.firestore_v1.types.pipeline import (
19+
StructuredPipeline as StructuredPipeline_pb,
20+
)
21+
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
22+
from google.cloud.firestore_v1.pipeline_result import PipelineResult
23+
from google.cloud.firestore_v1.pipeline_expressions import Expr
24+
from google.cloud.firestore_v1 import _helpers
25+
26+
if TYPE_CHECKING: # pragma: NO COVER
27+
from google.cloud.firestore_v1.client import Client
28+
from google.cloud.firestore_v1.async_client import AsyncClient
29+
from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
30+
from google.cloud.firestore_v1.transaction import BaseTransaction
31+
32+
33+
class _BasePipeline:
    """
    Base class for building Firestore data transformation and query pipelines.

    This class is not intended to be instantiated directly.
    Use `client.pipeline()` to create pipeline instances.
    """

    def __init__(self, client: Client | AsyncClient):
        """
        Initializes a new pipeline.

        Pipelines should not be instantiated directly. Instead,
        call client.pipeline() to create an instance

        Args:
            client: The client associated with the pipeline
        """
        self._client = client
        # Stages are held in a tuple; ``_append`` builds a new pipeline
        # rather than modifying this one.
        self.stages: Sequence[stages.Stage] = tuple()

    @classmethod
    def _create_with_stages(
        cls, client: Client | AsyncClient, *stages
    ) -> _BasePipeline:
        """
        Initializes a new pipeline with the given stages.

        Pipeline classes should not be instantiated directly.

        Args:
            client: The client associated with the pipeline
            *stages: Initial stages for the pipeline.

        Returns:
            A new instance of ``cls`` whose ``stages`` is a tuple of the
            given stages.
        """
        new_instance = cls(client)
        new_instance.stages = tuple(stages)
        return new_instance

    def __repr__(self):
        cls_str = type(self).__name__
        if not self.stages:
            return f"{cls_str}()"
        elif len(self.stages) == 1:
            return f"{cls_str}({self.stages[0]!r})"
        else:
            # Several stages: one stage per line.
            stages_str = ",\n ".join([repr(s) for s in self.stages])
            return f"{cls_str}(\n {stages_str}\n)"

    def _to_pb(self) -> StructuredPipeline_pb:
        """
        Convert this pipeline into a StructuredPipeline protobuf message,
        serializing each stage in order.
        """
        return StructuredPipeline_pb(
            pipeline={"stages": [s._to_pb() for s in self.stages]}
        )

    def _append(self, new_stage):
        """
        Create a new Pipeline object with a new stage appended

        The current pipeline is left unchanged.
        """
        return self.__class__._create_with_stages(self._client, *self.stages, new_stage)

    def _prep_execute_request(
        self, transaction: BaseTransaction | None
    ) -> ExecutePipelineRequest:
        """
        shared logic for creating an ExecutePipelineRequest

        Args:
            transaction: Optional transaction to run in. When None, the
                request's transaction field is set to None.
        """
        database_name = (
            f"projects/{self._client.project}/databases/{self._client._database}"
        )
        transaction_id = (
            _helpers.get_transaction_id(transaction)
            if transaction is not None
            else None
        )
        request = ExecutePipelineRequest(
            database=database_name,
            transaction=transaction_id,
            structured_pipeline=self._to_pb(),
        )
        return request

    def _execute_response_helper(
        self, response: ExecutePipelineResponse
    ) -> Iterable[PipelineResult]:
        """
        shared logic for unpacking an ExecutePipelineResponse into PipelineResults
        """
        for doc in response.results:
            # Results without a name get no document reference.
            ref = self._client.document(doc.name) if doc.name else None
            yield PipelineResult(
                self._client,
                doc.fields,
                ref,
                response._pb.execution_time,
                # Falsy timestamps are reported as None.
                doc._pb.create_time if doc.create_time else None,
                doc._pb.update_time if doc.update_time else None,
            )

    def generic_stage(self, name: str, *params: Expr) -> "_BasePipeline":
        """
        Adds a generic, named stage to the pipeline with specified parameters.

        This method provides a flexible way to extend the pipeline's functionality
        by adding custom stages. Each generic stage is defined by a unique `name`
        and a set of `params` that control its behavior.

        Example:
            >>> # Assume we don't have a built-in "where" stage
            >>> pipeline = client.pipeline().collection("books")
            >>> pipeline = pipeline.generic_stage("where", Field.of("published").lt(900))
            >>> pipeline = pipeline.select("title", "author")

        Args:
            name: The name of the generic stage.
            *params: `Expr` objects representing the parameters for the stage,
                passed as separate positional arguments (not wrapped in a list).

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.GenericStage(name, *params))

0 commit comments

Comments
 (0)