Skip to content
This repository was archived by the owner on Mar 2, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9129306
Revert "remvoved unready stages and expressions"
daniel-sanche May 3, 2025
c0ebc00
Merge branch 'pipeline_queries_3_stable_stages' into pipeline_queries…
daniel-sanche May 6, 2025
ca38160
fixed docstrings
daniel-sanche May 6, 2025
0051d57
Merge branch 'pipeline_queries_3_stable_stages' into pipeline_queries…
daniel-sanche May 8, 2025
6d29008
Merge branch 'pipeline_queries_3_stable_stages' into pipeline_queries…
daniel-sanche May 13, 2025
60a770c
fixed lint
daniel-sanche May 13, 2025
44c7533
Merge branch 'pipeline_queries_3_stable_stages' into pipeline_queries…
daniel-sanche Jun 12, 2025
699fc05
added unit tests
daniel-sanche Jun 12, 2025
a1c3537
fixed lint
daniel-sanche Jun 12, 2025
dc3564c
Merge branch 'pipeline_queries_3_stable_stages' into pipeline_queries…
daniel-sanche Jun 13, 2025
9f8b1c2
Merge branch 'pipeline_queries_3_5_query_conversion' into pipeline_qu…
daniel-sanche Jun 25, 2025
046b803
Merge branch 'pipeline_queries_3_5_query_conversion' into pipeline_qu…
daniel-sanche Jul 11, 2025
8404765
Merge branch 'pipeline_queries_3_5_query_conversion' into pipeline_qu…
daniel-sanche Jul 11, 2025
407aa85
renamed classes
daniel-sanche Jul 12, 2025
8233f47
renamed expressions
daniel-sanche Oct 16, 2025
60924fe
added new math expressions
daniel-sanche Oct 16, 2025
39af626
Merge branch 'pipeline_queries_3_5_query_conversion' into pipeline_qu…
daniel-sanche Oct 17, 2025
bbc8915
Merge branch 'pipeline_queries_4_all' into pipeline_queries_5_cleanup
daniel-sanche Oct 17, 2025
3217b00
fixed tests
daniel-sanche Oct 17, 2025
e1a2f15
fixed lint
daniel-sanche Oct 17, 2025
4de3908
added test for AliasedAggregate
daniel-sanche Oct 17, 2025
5fd0ba4
removed unused expressions
daniel-sanche Oct 17, 2025
5a3eb14
improved array functions
daniel-sanche Oct 17, 2025
3a35035
removed duplicate code
daniel-sanche Oct 18, 2025
32c4e4b
added map expressions
daniel-sanche Oct 18, 2025
d5e854c
renamed if to conditional
daniel-sanche Oct 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions google/cloud/firestore_v1/_pipeline_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.pipeline_expressions import (
Accumulator,
AggregateFunction,
Expr,
ExprWithAlias,
AliasedAggregate,
AliasedExpr,
Field,
FilterCondition,
BooleanExpr,
Selectable,
Ordering,
)
Expand Down Expand Up @@ -164,8 +165,8 @@ class Aggregate(Stage):

def __init__(
self,
*args: ExprWithAlias[Accumulator],
accumulators: Sequence[ExprWithAlias[Accumulator]] = (),
*args: AliasedExpr[AggregateFunction],
accumulators: Sequence[AliasedAggregate] = (),
groups: Sequence[str | Selectable] = (),
):
super().__init__()
Expand Down Expand Up @@ -350,6 +351,26 @@ def _pb_args(self) -> list[Value]:
return [f._to_pb() for f in self.fields]


class Replace(Stage):
"""Replaces the document content with the value of a specified field."""

class Mode(Enum):
FULL_REPLACE = 0
MERGE_PREFER_NEXT = 1
MERGE_PREFER_PARENT = 2

def __repr__(self):
return f"Replace.Mode.{self.name.upper()}"

def __init__(self, field: Selectable | str, mode: Mode | str = Mode.FULL_REPLACE):
super().__init__()
self.field = Field(field) if isinstance(field, str) else field
self.mode = self.Mode[mode.upper()] if isinstance(mode, str) else mode

def _pb_args(self):
return [self.field._to_pb(), Value(string_value=self.mode.name.lower())]


class Sample(Stage):
"""Performs pseudo-random sampling of documents."""

Expand Down Expand Up @@ -439,7 +460,7 @@ def _pb_options(self):
class Where(Stage):
"""Filters documents based on a specified condition."""

def __init__(self, condition: FilterCondition):
def __init__(self, condition: BooleanExpr):
super().__init__()
self.condition = condition

Expand Down
8 changes: 4 additions & 4 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
from google.cloud.firestore_v1.types import (
StructuredAggregationQuery,
)
from google.cloud.firestore_v1.pipeline_expressions import Accumulator
from google.cloud.firestore_v1.pipeline_expressions import AggregateFunction
from google.cloud.firestore_v1.pipeline_expressions import Count
from google.cloud.firestore_v1.pipeline_expressions import ExprWithAlias
from google.cloud.firestore_v1.pipeline_expressions import AliasedExpr
from google.cloud.firestore_v1.pipeline_expressions import Field

# Types needed only for Type Hints
Expand Down Expand Up @@ -86,7 +86,7 @@ def _to_protobuf(self):
@abc.abstractmethod
def _to_pipeline_expr(
self, autoindexer: Iterable[int]
) -> ExprWithAlias[Accumulator]:
) -> AliasedExpr[AggregateFunction]:
"""
Convert this instance to a pipeline expression for use with pipeline.aggregate()

Expand Down Expand Up @@ -162,7 +162,7 @@ def _to_protobuf(self):
return aggregation_pb

def _to_pipeline_expr(self, autoindexer: Iterable[int]):
return Field.of(self.field_ref).avg().as_(self._pipeline_alias(autoindexer))
return Field.of(self.field_ref).average().as_(self._pipeline_alias(autoindexer))


def _query_response_to_result(
Expand Down
67 changes: 57 additions & 10 deletions google/cloud/firestore_v1/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.pipeline_expressions import (
Accumulator,
AliasedAggregate,
Expr,
ExprWithAlias,
Field,
FilterCondition,
BooleanExpr,
Selectable,
)
from google.cloud.firestore_v1 import _helpers
Expand Down Expand Up @@ -220,14 +219,14 @@ def select(self, *selections: str | Selectable) -> "_BasePipeline":
"""
return self._append(stages.Select(*selections))

def where(self, condition: FilterCondition) -> "_BasePipeline":
def where(self, condition: BooleanExpr) -> "_BasePipeline":
"""
Filters the documents from previous stages to only include those matching
the specified `FilterCondition`.
the specified `BooleanExpr`.

This stage allows you to apply conditions to the data, similar to a "WHERE"
clause in SQL. You can filter documents based on their field values, using
implementations of `FilterCondition`, typically including but not limited to:
implementations of `BooleanExpr`, typically including but not limited to:
- field comparators: `eq`, `lt` (less than), `gt` (greater than), etc.
- logical operators: `And`, `Or`, `Not`, etc.
- advanced functions: `regex_matches`, `array_contains`, etc.
Expand All @@ -252,7 +251,7 @@ def where(self, condition: FilterCondition) -> "_BasePipeline":


Args:
condition: The `FilterCondition` to apply.
condition: The `BooleanExpr` to apply.

Returns:
A new Pipeline object with this stage appended to the stage list
Expand Down Expand Up @@ -343,6 +342,54 @@ def sort(self, *orders: stages.Ordering) -> "_BasePipeline":
"""
return self._append(stages.Sort(*orders))

def replace(
self,
field: Selectable,
mode: stages.Replace.Mode = stages.Replace.Mode.FULL_REPLACE,
) -> "_BasePipeline":
"""
Replaces the entire document content with the value of a specified field,
typically a map.

This stage allows you to emit a map value as the new document structure.
Each key of the map becomes a field in the output document, containing the
corresponding value.

Example:
Input document:
```json
{
"name": "John Doe Jr.",
"parents": {
"father": "John Doe Sr.",
"mother": "Jane Doe"
}
}
```

>>> from google.cloud.firestore_v1.pipeline_expressions import Field
>>> pipeline = client.pipeline().collection("people")
>>> # Emit the 'parents' map as the document
>>> pipeline = pipeline.replace(Field.of("parents"))

Output document:
```json
{
"father": "John Doe Sr.",
"mother": "Jane Doe"
}
```

Args:
field: The `Selectable` field containing the map whose content will
replace the document.
mode: The replacement mode

Returns:
A new Pipeline object with this stage appended to the stage list
"""
return self._append(stages.Replace(field, mode))

def sample(self, limit_or_options: int | stages.SampleOptions) -> "_BasePipeline":
"""
Performs a pseudo-random sampling of the documents from the previous stage.
Expand Down Expand Up @@ -531,7 +578,7 @@ def limit(self, limit: int) -> "_BasePipeline":

def aggregate(
self,
*accumulators: ExprWithAlias[Accumulator],
*accumulators: AliasedAggregate,
groups: Sequence[str | Selectable] = (),
) -> "_BasePipeline":
"""
Expand All @@ -541,7 +588,7 @@ def aggregate(
This stage allows you to calculate aggregate values (like sum, average, count,
min, max) over a set of documents.

- **Accumulators:** Define the aggregation calculations using `Accumulator`
- **AggregateFunctions:** Define the aggregation calculations using `AggregateFunction`
expressions (e.g., `sum()`, `avg()`, `count()`, `min()`, `max()`) combined
with `as_()` to name the result field.
- **Groups:** Optionally specify fields (by name or `Selectable`) to group
Expand Down Expand Up @@ -569,7 +616,7 @@ def aggregate(


Args:
*accumulators: One or more `ExprWithAlias[Accumulator]` expressions defining
*accumulators: One or more `AliasedAggregate` expressions defining
the aggregations to perform and their output names.
groups: An optional sequence of field names (str) or `Selectable`
expressions to group by before aggregating.
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/firestore_v1/base_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ def pipeline(self):
# Filters
for filter_ in self._field_filters:
ppl = ppl.where(
pipeline_expressions.FilterCondition._from_query_filter_pb(
pipeline_expressions.BooleanExpr._from_query_filter_pb(
filter_, self._client
)
)
Expand Down
Loading