From 15573d369c176d912f631dc9e7e25e606dc5aecd Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 5 Nov 2025 16:09:11 -0800 Subject: [PATCH 01/20] support passing in explain_options --- google/cloud/firestore_v1/async_pipeline.py | 21 +++++++++++++++------ google/cloud/firestore_v1/base_pipeline.py | 15 +++++++++++---- google/cloud/firestore_v1/pipeline.py | 21 +++++++++++++++------ google/cloud/firestore_v1/query_profile.py | 7 +++++++ 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index 9fe0c8756d..ec41a8164f 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -21,6 +21,7 @@ from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.async_transaction import AsyncTransaction + from google.cloud.firestore_v1.query_profile import ExplainOptions class AsyncPipeline(_BasePipeline): @@ -58,37 +59,45 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage): async def execute( self, + *, transaction: "AsyncTransaction" | None = None, + explain_options: ExplainOptions | None = None, ) -> list[PipelineResult]: """ Executes this pipeline and returns results as a list Args: - transaction - (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): + transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned list. 
""" - return [result async for result in self.stream(transaction=transaction)] + return [result async for result in self.stream(transaction=transaction, explain_options=explain_options)] async def stream( self, + *, transaction: "AsyncTransaction" | None = None, + explain_options: ExplainOptions | None = None, ) -> AsyncIterable[PipelineResult]: """ Process this pipeline as a stream, providing results through an Iterable Args: - transaction - (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): + transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. 
""" - request = self._prep_execute_request(transaction) + request = self._prep_execute_request(transaction, explain_options) async for response in await self._client._firestore_api.execute_pipeline( request ): diff --git a/google/cloud/firestore_v1/base_pipeline.py b/google/cloud/firestore_v1/base_pipeline.py index c663217931..1f19f8a060 100644 --- a/google/cloud/firestore_v1/base_pipeline.py +++ b/google/cloud/firestore_v1/base_pipeline.py @@ -37,6 +37,7 @@ from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse from google.cloud.firestore_v1.transaction import BaseTransaction + from google.cloud.firestore_v1.query_profile import ExplainOptions class _BasePipeline: @@ -87,9 +88,10 @@ def __repr__(self): stages_str = ",\n ".join([repr(s) for s in self.stages]) return f"{cls_str}(\n {stages_str}\n)" - def _to_pb(self) -> StructuredPipeline_pb: + def _to_pb(self, **options) -> StructuredPipeline_pb: return StructuredPipeline_pb( - pipeline={"stages": [s._to_pb() for s in self.stages]} + pipeline={"stages": [s._to_pb() for s in self.stages]}, + options=options, ) def _append(self, new_stage): @@ -99,7 +101,9 @@ def _append(self, new_stage): return self.__class__._create_with_stages(self._client, *self.stages, new_stage) def _prep_execute_request( - self, transaction: BaseTransaction | None + self, + transaction: BaseTransaction | None, + explain_options: ExplainOptions | None, ) -> ExecutePipelineRequest: """ shared logic for creating an ExecutePipelineRequest @@ -112,10 +116,13 @@ def _prep_execute_request( if transaction is not None else None ) + options = {} + if explain_options: + options["explain_options"] = explain_options._to_value() request = ExecutePipelineRequest( database=database_name, transaction=transaction_id, - structured_pipeline=self._to_pb(), + structured_pipeline=self._to_pb(**options) ) return request diff --git a/google/cloud/firestore_v1/pipeline.py 
b/google/cloud/firestore_v1/pipeline.py index f578e00b68..a92ff046dc 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -21,6 +21,7 @@ from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.transaction import Transaction + from google.cloud.firestore_v1.query_profile import ExplainOptions class Pipeline(_BasePipeline): @@ -55,36 +56,44 @@ def __init__(self, client: Client, *stages: stages.Stage): def execute( self, + *, transaction: "Transaction" | None = None, + explain_options: ExplainOptions | None = None, ) -> list[PipelineResult]: """ Executes this pipeline and returns results as a list Args: - transaction - (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): + transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned list. """ - return [result for result in self.stream(transaction=transaction)] + return [result for result in self.stream(transaction=transaction, explain_options=explain_options)] def stream( self, + *, transaction: "Transaction" | None = None, + explain_options: ExplainOptions | None = None, ) -> Iterable[PipelineResult]: """ Process this pipeline as a stream, providing results through an Iterable Args: - transaction - (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): + transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. 
If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + Options to enable query profiling for this query. When set, + explain_metrics will be available on the returned generator. """ - request = self._prep_execute_request(transaction) + request = self._prep_execute_request(transaction, explain_options) for response in self._client._firestore_api.execute_pipeline(request): yield from self._execute_response_helper(response) diff --git a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py index 6925f83ffa..48eeeb9952 100644 --- a/google/cloud/firestore_v1/query_profile.py +++ b/google/cloud/firestore_v1/query_profile.py @@ -19,6 +19,8 @@ from dataclasses import dataclass from google.protobuf.json_format import MessageToDict +from google.cloud.firestore_v1.types.document import MapValue +from google.cloud.firestore_v1.types.document import Value @dataclass(frozen=True) @@ -41,6 +43,11 @@ class ExplainOptions: def _to_dict(self): return {"analyze": self.analyze} + def _to_value(self): + mode_str = "analyze" if self.analyze else "explain" + value_pb = MapValue(fields={"mode": Value(string_value=mode_str)}) + return Value(map_value=value_pb) + @dataclass(frozen=True) class PlanSummary: From 406b9b23ee3013d3313408b78d4c99c872b7ff0e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 5 Nov 2025 16:44:07 -0800 Subject: [PATCH 02/20] added PipelineSnapshot and PipelineStream classes --- google/cloud/firestore_v1/pipeline.py | 13 +++++---- google/cloud/firestore_v1/pipeline_result.py | 30 +++++++++++++++++++- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index a92ff046dc..901faa021a 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ 
b/google/cloud/firestore_v1/pipeline.py @@ -16,11 +16,14 @@ from typing import Iterable, TYPE_CHECKING from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_pipeline import _BasePipeline +from google.cloud.firestore_v1.pipeline_result import PipelineStream +from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.transaction import Transaction + from google.cloud.firestore_v1.query_profile import ExplainMetrics from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -59,7 +62,7 @@ def execute( *, transaction: "Transaction" | None = None, explain_options: ExplainOptions | None = None, - ) -> list[PipelineResult]: + ) -> PipelineSnapshot[PipelineResult]: """ Executes this pipeline and returns results as a list @@ -73,14 +76,14 @@ def execute( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. """ - return [result for result in self.stream(transaction=transaction, explain_options=explain_options)] + return PipelineSnapshot._from_stream(self.stream(transaction=transaction, explain_options=explain_options)) def stream( self, *, transaction: "Transaction" | None = None, explain_options: ExplainOptions | None = None, - ) -> Iterable[PipelineResult]: + ) -> PipelineStream[PipelineResult]: """ Process this pipeline as a stream, providing results through an Iterable @@ -95,5 +98,5 @@ def stream( explain_metrics will be available on the returned generator. 
""" request = self._prep_execute_request(transaction, explain_options) - for response in self._client._firestore_api.execute_pipeline(request): - yield from self._execute_response_helper(response) + stream = self._client._firestore_api.execute_pipeline(request) + return PipelineStream(self._client, stream) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index ada855fea3..99000461f5 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import annotations -from typing import Any, MutableMapping, TYPE_CHECKING +from typing import Any, MutableMapping, Iterable, TYPE_CHECKING from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath @@ -137,3 +137,31 @@ def get(self, field_path: str | FieldPath) -> Any: ) value = get_nested_value(str_path, self._fields_pb) return _helpers.decode_value(value, self._client) + +class PipelineSnapshot(list[PipelineResult]): + def __init__(self, results_list: list[PipelineResult]): + super().__init__(results_list) + + @classmethod + def _from_stream(cls, stream: PipelineStream): + results = [r for r in stream] + return cls(results) + +class PipelineStream(Iterable[PipelineResult]): + + def __init__(self, client, rpc_stream): + self._client = client + self._stream = rpc_stream + + def __iter__(self) -> PipelineStream: + for response in self._stream: + for doc in response.results: + ref = self._client.document(doc.name) if doc.name else None + yield PipelineResult( + self._client, + doc.fields, + ref, + response._pb.execution_time, + doc._pb.create_time if doc.create_time else None, + doc._pb.update_time if doc.update_time else None, + ) \ No newline at end of file From 5aca1cf5518acd17b2afa30feb8b31c1a95a5473 Mon Sep 17 
00:00:00 2001 From: Daniel Sanche Date: Wed, 5 Nov 2025 17:13:24 -0800 Subject: [PATCH 03/20] added ExplainStats class --- google/cloud/firestore_v1/pipeline_result.py | 13 +++++- google/cloud/firestore_v1/query_profile.py | 45 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 99000461f5..dc298f6159 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -17,6 +17,7 @@ from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath +from google.cloud.firestore_v1.query_profile import ExplainStats if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.base_client import BaseClient @@ -139,22 +140,30 @@ def get(self, field_path: str | FieldPath) -> Any: return _helpers.decode_value(value, self._client) class PipelineSnapshot(list[PipelineResult]): - def __init__(self, results_list: list[PipelineResult]): + def __init__( + self, + results_list: list[PipelineResult], + explain_stats: ExplainStats | None = None + ): super().__init__(results_list) + self._explain_stats = explain_stats @classmethod def _from_stream(cls, stream: PipelineStream): results = [r for r in stream] - return cls(results) + return cls(results, ExplainStats=stream._explain_stats) class PipelineStream(Iterable[PipelineResult]): def __init__(self, client, rpc_stream): self._client = client self._stream = rpc_stream + self._explain_stats = None def __iter__(self) -> PipelineStream: for response in self._stream: + if response.explain_stats: + self._explain_stats = ExplainStats(response.explain_stats) for doc in response.results: ref = self._client.document(doc.name) if doc.name else None yield PipelineResult( diff --git a/google/cloud/firestore_v1/query_profile.py 
b/google/cloud/firestore_v1/query_profile.py index 48eeeb9952..8aa0d7cac4 100644 --- a/google/cloud/firestore_v1/query_profile.py +++ b/google/cloud/firestore_v1/query_profile.py @@ -21,6 +21,7 @@ from google.protobuf.json_format import MessageToDict from google.cloud.firestore_v1.types.document import MapValue from google.cloud.firestore_v1.types.document import Value +from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb @dataclass(frozen=True) @@ -150,3 +151,47 @@ class QueryExplainError(Exception): """ pass + +class ExplainStats: + """ + Contains query profiling statistics for a pipeline query. + + This class is not meant to be instantiated directly by the user. Instead, an + instance of `ExplainStats` may be returned by pipeline execution methods + when `explain_options` are provided. + + It provides methods to access the explain statistics in different formats. + """ + + def __init__(self, stats_pb: ExplainStats_pb): + """ + Args: + stats_pb (ExplainStats_pb): The raw protobuf message for explain stats. + """ + self._stats_pb = stats_pb + + def get_text(self) -> str: + """ + Returns the explain stats string verbatim as returned from the Firestore backend. + + Returns: + str: the string representation of the explain_stats object + + Raises: + QueryExplainError: If the explain stats cannot be parsed. + """ + try: + dict_repr = MessageToDict(self._stats_pb, preserving_proto_field_name=True) + return dict_repr["data"]["value"] + except (KeyError, TypeError) as e: + raise QueryExplainError("Failed to parse explain stats") from e + + def get_raw(self) -> ExplainStats_pb: + """ + Returns the explain stats in an encoded proto format, as returned from the Firestore backend. + The caller is responsible for unpacking this proto message. 
+ + Returns: + google.cloud.firestore_v1.types.explain_stats.ExplainStats: the proto from the backend + """ + return self._stats_pb \ No newline at end of file From 065be33d0123c5147cb0f9406361d6551f26d7b0 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 12:03:56 -0800 Subject: [PATCH 04/20] got pipeline containers set up --- google/cloud/firestore_v1/pipeline.py | 2 +- google/cloud/firestore_v1/pipeline_result.py | 72 +++++++++++++++----- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 901faa021a..b8393d7c63 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -99,4 +99,4 @@ def stream( """ request = self._prep_execute_request(transaction, explain_options) stream = self._client._firestore_api.execute_pipeline(request) - return PipelineStream(self._client, stream) \ No newline at end of file + return PipelineStream(stream, self, explain_options) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index dc298f6159..81377e64af 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -18,13 +18,21 @@ from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath from google.cloud.firestore_v1.query_profile import ExplainStats +from google.cloud.firestore_v1.query_profile import QueryExplainError if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.firestore_v1.async_client import AsyncClient + from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.base_client import BaseClient from google.cloud.firestore_v1.base_document import BaseDocumentReference from google.protobuf.timestamp_pb2 import Timestamp + from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse 
from google.cloud.firestore_v1.types.document import Value as ValueProto from google.cloud.firestore_v1.vector import Vector + from google.cloud.firestore_v1.async_pipeline import AsyncPipeline + from google.cloud.firestore_v1.base_pipeline import _BasePipeline + from google.cloud.firestore_v1.pipeline import Pipeline + from google.cloud.firestore_v1.query_profile import ExplainOptions class PipelineResult: @@ -139,38 +147,70 @@ def get(self, field_path: str | FieldPath) -> Any: value = get_nested_value(str_path, self._fields_pb) return _helpers.decode_value(value, self._client) -class PipelineSnapshot(list[PipelineResult]): - def __init__( - self, - results_list: list[PipelineResult], - explain_stats: ExplainStats | None = None - ): - super().__init__(results_list) - self._explain_stats = explain_stats +class _PipelineResultContainer: + """Helper to hold shared attributes for PipelineSnapshot and PipelineStream""" + + def __init__(self, pipeline: _BasePipeline, explain_options: ExplainOptions | None): + # public + self.pipeline: _BasePipeline = pipeline + self.execution_time: Timestamp | None = None + # private + self._started: bool = False + self._explain_stats: ExplainStats | None = None + self._explain_options: ExplainOptions | None = explain_options + + @property + def explain_stats(self) -> ExplainStats: + if self._explain_stats is not None: + return self._explain_stats + elif self._explain_options is None: + raise QueryExplainError("explain_options not set on query.") + elif not self._started: + raise QueryExplainError("stream not started") + else: + raise QueryExplainError("explain_options not found") + + +class PipelineSnapshot(_PipelineResultContainer, list[PipelineResult]): + """ + A list type that holds the result of a pipeline.execute() operation, along with related metadata + """ + def __init__(self, results_list: list[PipelineResult], *args): + super().__init__(*args) + list.__init__(self, results_list) + # snapshots are always completed + 
self._started = True @classmethod def _from_stream(cls, stream: PipelineStream): results = [r for r in stream] - return cls(results, ExplainStats=stream._explain_stats) + return cls(results, stream.pipeline, stream._explain_stats) -class PipelineStream(Iterable[PipelineResult]): +class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): + """ + An iterable stream representing the result of a pipeline.stream() operation, along with related metadata + """ - def __init__(self, client, rpc_stream): - self._client = client - self._stream = rpc_stream - self._explain_stats = None + def __init__(self, rpc_stream: Iterable[ExecutePipelineResponse], pipeline: Pipeline, *args): + super().__init__(pipeline, *args) + self._client: Client = pipeline._client + self._stream: Iterable[ExecutePipelineResponse] = rpc_stream - def __iter__(self) -> PipelineStream: + def __iter__(self): + self._started = True for response in self._stream: if response.explain_stats: self._explain_stats = ExplainStats(response.explain_stats) + execution_time = response._pb.execution_time + if execution_time and not self.execution_time: + self.execution_time = execution_time for doc in response.results: ref = self._client.document(doc.name) if doc.name else None yield PipelineResult( self._client, doc.fields, ref, - response._pb.execution_time, + execution_time, doc._pb.create_time if doc.create_time else None, doc._pb.update_time if doc.update_time else None, ) \ No newline at end of file From e0ed49fab22932247e547b8e15278988c658c197 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 12:37:01 -0800 Subject: [PATCH 05/20] added async pipeline --- google/cloud/firestore_v1/async_pipeline.py | 23 +++---- google/cloud/firestore_v1/pipeline.py | 4 +- google/cloud/firestore_v1/pipeline_result.py | 63 +++++++++++++------- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index 
ec41a8164f..0ce0f7a4f9 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -16,6 +16,8 @@ from typing import AsyncIterable, TYPE_CHECKING from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_pipeline import _BasePipeline +from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream +from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient @@ -41,7 +43,7 @@ class AsyncPipeline(_BasePipeline): ... .collection("books") ... .where(Field.of("published").gt(1980)) ... .select("title", "author") - ... async for result in pipeline.execute(): + ... async for result in pipeline.stream(): ... print(result) Use `client.pipeline()` to create instances of this class. @@ -62,7 +64,7 @@ async def execute( *, transaction: "AsyncTransaction" | None = None, explain_options: ExplainOptions | None = None, - ) -> list[PipelineResult]: + ) -> PipelineSnapshot[PipelineResult]: """ Executes this pipeline and returns results as a list @@ -76,16 +78,18 @@ async def execute( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. 
""" - return [result async for result in self.stream(transaction=transaction, explain_options=explain_options)] + stream = self.stream(transaction=transaction, explain_options=explain_options) + results = [result async for result in stream] + return await PipelineSnapshot._from_stream(results, stream) - async def stream( + def stream( self, *, transaction: "AsyncTransaction" | None = None, explain_options: ExplainOptions | None = None, - ) -> AsyncIterable[PipelineResult]: + ) -> AsyncPipelineStream[PipelineResult]: """ - Process this pipeline as a stream, providing results through an Iterable + Process this pipeline as a stream, providing results through an AsyncIterable Args: transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): @@ -98,8 +102,5 @@ async def stream( explain_metrics will be available on the returned generator. """ request = self._prep_execute_request(transaction, explain_options) - async for response in await self._client._firestore_api.execute_pipeline( - request - ): - for result in self._execute_response_helper(response): - yield result + stream = self._client._firestore_api.execute_pipeline(request) + return AsyncPipelineStream(stream, self, explain_options) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index b8393d7c63..8fa314b634 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -76,7 +76,9 @@ def execute( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. 
""" - return PipelineSnapshot._from_stream(self.stream(transaction=transaction, explain_options=explain_options)) + stream = self.stream(transaction=transaction, explain_options=explain_options) + results = [result for result in stream] + return PipelineSnapshot._from_stream(results, stream) def stream( self, diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 81377e64af..20c78fa2b1 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import annotations -from typing import Any, MutableMapping, Iterable, TYPE_CHECKING +from typing import Any, Awaitable, AsyncIterable, Iterable, MutableMapping, TYPE_CHECKING from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath @@ -150,11 +150,12 @@ def get(self, field_path: str | FieldPath) -> Any: class _PipelineResultContainer: """Helper to hold shared attributes for PipelineSnapshot and PipelineStream""" - def __init__(self, pipeline: _BasePipeline, explain_options: ExplainOptions | None): + def __init__(self, pipeline: Pipeline | AsyncPipeline, explain_options: ExplainOptions | None): # public self.pipeline: _BasePipeline = pipeline self.execution_time: Timestamp | None = None # private + self._client: Client | AsyncClient = pipeline._client self._started: bool = False self._explain_stats: ExplainStats | None = None self._explain_options: ExplainOptions | None = explain_options @@ -169,6 +170,24 @@ def explain_stats(self) -> ExplainStats: raise QueryExplainError("stream not started") else: raise QueryExplainError("explain_options not found") + + def _process_response(self, response: ExecutePipelineResponse): + """Shared logic for processing an individual response from a stream""" + if response.explain_stats: + self._explain_stats = 
ExplainStats(response.explain_stats) + execution_time = response._pb.execution_time + if execution_time and not self.execution_time: + self.execution_time = execution_time + for doc in response.results: + ref = self._client.document(doc.name) if doc.name else None + yield PipelineResult( + self._client, + doc.fields, + ref, + execution_time, + doc._pb.create_time if doc.create_time else None, + doc._pb.update_time if doc.update_time else None, + ) class PipelineSnapshot(_PipelineResultContainer, list[PipelineResult]): @@ -182,9 +201,9 @@ def __init__(self, results_list: list[PipelineResult], *args): self._started = True @classmethod - def _from_stream(cls, stream: PipelineStream): - results = [r for r in stream] - return cls(results, stream.pipeline, stream._explain_stats) + def _from_stream(cls, results: list[PipelineResult], source: _PipelineResultContainer): + return cls(results, source.pipeline, source._explain_stats) + class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): """ @@ -193,24 +212,24 @@ class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): def __init__(self, rpc_stream: Iterable[ExecutePipelineResponse], pipeline: Pipeline, *args): super().__init__(pipeline, *args) - self._client: Client = pipeline._client - self._stream: Iterable[ExecutePipelineResponse] = rpc_stream + self._stream = rpc_stream def __iter__(self): self._started = True for response in self._stream: - if response.explain_stats: - self._explain_stats = ExplainStats(response.explain_stats) - execution_time = response._pb.execution_time - if execution_time and not self.execution_time: - self.execution_time = execution_time - for doc in response.results: - ref = self._client.document(doc.name) if doc.name else None - yield PipelineResult( - self._client, - doc.fields, - ref, - execution_time, - doc._pb.create_time if doc.create_time else None, - doc._pb.update_time if doc.update_time else None, - ) \ No newline at end of file + yield from 
self._process_response(response) + +class AsyncPipelineStream(_PipelineResultContainer, AsyncIterable[PipelineResult]): + """ + An iterable stream representing the result of an async pipeline.stream() operation, along with related metadata + """ + + def __init__(self, rpc_stream: Awaitable[AsyncIterable[ExecutePipelineResponse]], pipeline: AsyncPipeline, *args): + super().__init__(pipeline, *args) + self._stream = rpc_stream + + async def __aiter__(self): + self._started = True + async for response in await self._stream: + for response in self._process_response(response): + yield response \ No newline at end of file From 129116ac08978cf84a9b989732a6ce4a820a9aef Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 13:04:46 -0800 Subject: [PATCH 06/20] moved request logic into PipelineStream class --- google/cloud/firestore_v1/async_pipeline.py | 4 +- google/cloud/firestore_v1/base_pipeline.py | 43 ---------------- google/cloud/firestore_v1/pipeline.py | 4 +- google/cloud/firestore_v1/pipeline_result.py | 52 +++++++++++++++----- 4 files changed, 42 insertions(+), 61 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index 0ce0f7a4f9..71ecb6a00c 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -101,6 +101,4 @@ def stream( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. 
""" - request = self._prep_execute_request(transaction, explain_options) - stream = self._client._firestore_api.execute_pipeline(request) - return AsyncPipelineStream(stream, self, explain_options) \ No newline at end of file + return AsyncPipelineStream(self, transaction, explain_options) \ No newline at end of file diff --git a/google/cloud/firestore_v1/base_pipeline.py b/google/cloud/firestore_v1/base_pipeline.py index 1f19f8a060..cfe699770a 100644 --- a/google/cloud/firestore_v1/base_pipeline.py +++ b/google/cloud/firestore_v1/base_pipeline.py @@ -100,49 +100,6 @@ def _append(self, new_stage): """ return self.__class__._create_with_stages(self._client, *self.stages, new_stage) - def _prep_execute_request( - self, - transaction: BaseTransaction | None, - explain_options: ExplainOptions | None, - ) -> ExecutePipelineRequest: - """ - shared logic for creating an ExecutePipelineRequest - """ - database_name = ( - f"projects/{self._client.project}/databases/{self._client._database}" - ) - transaction_id = ( - _helpers.get_transaction_id(transaction) - if transaction is not None - else None - ) - options = {} - if explain_options: - options["explain_options"] = explain_options._to_value() - request = ExecutePipelineRequest( - database=database_name, - transaction=transaction_id, - structured_pipeline=self._to_pb(**options) - ) - return request - - def _execute_response_helper( - self, response: ExecutePipelineResponse - ) -> Iterable[PipelineResult]: - """ - shared logic for unpacking an ExecutePipelineReponse into PipelineResults - """ - for doc in response.results: - ref = self._client.document(doc.name) if doc.name else None - yield PipelineResult( - self._client, - doc.fields, - ref, - response._pb.execution_time, - doc._pb.create_time if doc.create_time else None, - doc._pb.update_time if doc.update_time else None, - ) - def add_fields(self, *fields: Selectable) -> "_BasePipeline": """ Adds new fields to outputs from previous stages. 
diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 8fa314b634..3cff7320c8 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -99,6 +99,4 @@ def stream( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. """ - request = self._prep_execute_request(transaction, explain_options) - stream = self._client._firestore_api.execute_pipeline(request) - return PipelineStream(stream, self, explain_options) \ No newline at end of file + return PipelineStream(self, transaction, explain_options) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 20c78fa2b1..73eb192ee8 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -19,11 +19,15 @@ from google.cloud.firestore_v1.field_path import FieldPath from google.cloud.firestore_v1.query_profile import ExplainStats from google.cloud.firestore_v1.query_profile import QueryExplainError +from google.cloud.firestore_v1.query_profile import ExplainStats +from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.base_client import BaseClient + from google.cloud.firestore_v1.async_transaction import AsyncTransaction + from google.cloud.firestore_v1.transaction import Transaction from google.cloud.firestore_v1.base_document import BaseDocumentReference from google.protobuf.timestamp_pb2 import Timestamp from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse @@ -150,8 +154,14 @@ def get(self, field_path: str | FieldPath) -> Any: class _PipelineResultContainer: """Helper to hold shared attributes for PipelineSnapshot and 
PipelineStream""" - def __init__(self, pipeline: Pipeline | AsyncPipeline, explain_options: ExplainOptions | None): + def __init__( + self, + pipeline: Pipeline | AsyncPipeline, + transaction: Transaction | AsyncTransaction | None, + explain_options: ExplainOptions | None + ): # public + self.transaction = transaction self.pipeline: _BasePipeline = pipeline self.execution_time: Timestamp | None = None # private @@ -171,6 +181,28 @@ def explain_stats(self) -> ExplainStats: else: raise QueryExplainError("explain_options not found") + def _build_request(self) -> ExecutePipelineRequest: + """ + shared logic for creating an ExecutePipelineRequest + """ + database_name = ( + f"projects/{self._client.project}/databases/{self._client._database}" + ) + transaction_id = ( + _helpers.get_transaction_id(self.transaction) + if self.transaction is not None + else None + ) + options = {} + if self._explain_options: + options["explain_options"] = self._explain_options._to_value() + request = ExecutePipelineRequest( + database=database_name, + transaction=transaction_id, + structured_pipeline=self.pipeline._to_pb(**options) + ) + return request + def _process_response(self, response: ExecutePipelineResponse): """Shared logic for processing an individual response from a stream""" if response.explain_stats: @@ -202,7 +234,7 @@ def __init__(self, results_list: list[PipelineResult], *args): @classmethod def _from_stream(cls, results: list[PipelineResult], source: _PipelineResultContainer): - return cls(results, source.pipeline, source._explain_stats) + return cls(results, source.pipeline, source.transaction, source._explain_stats) class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): @@ -210,13 +242,11 @@ class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): An iterable stream representing the result of a pipeline.stream() operation, along with related metadata """ - def __init__(self, rpc_stream: Iterable[ExecutePipelineResponse], pipeline: 
Pipeline, *args): - super().__init__(pipeline, *args) - self._stream = rpc_stream - def __iter__(self): self._started = True - for response in self._stream: + request = self._build_request() + stream = self._client._firestore_api.execute_pipeline(request) + for response in stream: yield from self._process_response(response) class AsyncPipelineStream(_PipelineResultContainer, AsyncIterable[PipelineResult]): @@ -224,12 +254,10 @@ class AsyncPipelineStream(_PipelineResultContainer, AsyncIterable[PipelineResult An iterable stream representing the result of an async pipeline.stream() operation, along with related metadata """ - def __init__(self, rpc_stream: Awaitable[AsyncIterable[ExecutePipelineResponse]], pipeline: AsyncPipeline, *args): - super().__init__(pipeline, *args) - self._stream = rpc_stream - async def __aiter__(self): self._started = True - async for response in await self._stream: + request = self._build_request() + stream = await self._client._firestore_api.execute_pipeline(request) + async for response in stream: for response in self._process_response(response): yield response \ No newline at end of file From 2a096538d03cdd628989c929a00400d9915741c6 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 13:56:52 -0800 Subject: [PATCH 07/20] added errors if iterated multiple times --- google/cloud/firestore_v1/pipeline_result.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 73eb192ee8..3dbf60df5f 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -234,7 +234,9 @@ def __init__(self, results_list: list[PipelineResult], *args): @classmethod def _from_stream(cls, results: list[PipelineResult], source: _PipelineResultContainer): - return cls(results, source.pipeline, source.transaction, source._explain_stats) + new_instance = cls(results, source.pipeline, 
source.transaction, source._explain_stats) + new_instance._explain_stats = source._explain_stats + new_instance.execution_time = source.execution_time class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): @@ -243,6 +245,8 @@ class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): """ def __iter__(self): + if self._started: + raise RuntimeError(f"{self.__class__.__name__} can only be iterated once") self._started = True request = self._build_request() stream = self._client._firestore_api.execute_pipeline(request) @@ -255,6 +259,8 @@ class AsyncPipelineStream(_PipelineResultContainer, AsyncIterable[PipelineResult """ async def __aiter__(self): + if self._started: + raise RuntimeError(f"{self.__class__.__name__} can only be iterated once") self._started = True request = self._build_request() stream = await self._client._firestore_api.execute_pipeline(request) From 6943391b96f4c776c19e5e864672e80f6e344b60 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 14:06:16 -0800 Subject: [PATCH 08/20] simplified creation of pipelinesnapshot --- google/cloud/firestore_v1/async_pipeline.py | 2 +- google/cloud/firestore_v1/pipeline.py | 2 +- google/cloud/firestore_v1/pipeline_result.py | 13 ++++--------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index 71ecb6a00c..e6610ebd25 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -80,7 +80,7 @@ async def execute( """ stream = self.stream(transaction=transaction, explain_options=explain_options) results = [result async for result in stream] - return await PipelineSnapshot._from_stream(results, stream) + return PipelineSnapshot(results, stream) def stream( self, diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 3cff7320c8..74e79d21e6 100644 --- 
a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -78,7 +78,7 @@ def execute( """ stream = self.stream(transaction=transaction, explain_options=explain_options) results = [result for result in stream] - return PipelineSnapshot._from_stream(results, stream) + return PipelineSnapshot(results, stream) def stream( self, diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 3dbf60df5f..4f25974f95 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -14,6 +14,7 @@ from __future__ import annotations from typing import Any, Awaitable, AsyncIterable, Iterable, MutableMapping, TYPE_CHECKING +import copy from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath @@ -226,18 +227,12 @@ class PipelineSnapshot(_PipelineResultContainer, list[PipelineResult]): """ A list type that holds the result of a pipeline.execute() operation, along with related metadata """ - def __init__(self, results_list: list[PipelineResult], *args): - super().__init__(*args) + def __init__(self, results_list: list[PipelineResult], source: _PipelineResultContainer): + self.__dict__.update(copy.deepcopy(source.__dict__)) list.__init__(self, results_list) - # snapshots are always completed + # snapshots are always complete self._started = True - @classmethod - def _from_stream(cls, results: list[PipelineResult], source: _PipelineResultContainer): - new_instance = cls(results, source.pipeline, source.transaction, source._explain_stats) - new_instance._explain_stats = source._explain_stats - new_instance.execution_time = source.execution_time - class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): """ From c866d7cd915e3b0a9eea4dcc81aecdb714504007 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 14:38:56 -0800 
Subject: [PATCH 09/20] added generics --- google/cloud/firestore_v1/async_pipeline.py | 6 ++-- google/cloud/firestore_v1/pipeline.py | 6 ++-- google/cloud/firestore_v1/pipeline_result.py | 30 ++++++++++++-------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index e6610ebd25..f92763b95a 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -18,10 +18,10 @@ from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot +from google.cloud.firestore_v1.pipeline_result import PipelineResult if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient - from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.async_transaction import AsyncTransaction from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -101,4 +101,6 @@ def stream( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. 
""" - return AsyncPipelineStream(self, transaction, explain_options) \ No newline at end of file + return AsyncPipelineStream( + PipelineResult, self, transaction, explain_options + ) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 74e79d21e6..1c6c9fa92f 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -18,10 +18,10 @@ from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline_result import PipelineStream from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot +from google.cloud.firestore_v1.pipeline_result import PipelineResult if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.client import Client - from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.transaction import Transaction from google.cloud.firestore_v1.query_profile import ExplainMetrics from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -99,4 +99,6 @@ def stream( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. """ - return PipelineStream(self, transaction, explain_options) \ No newline at end of file + return PipelineStream( + PipelineResult, self, transaction, explain_options + ) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 4f25974f95..f544aefadb 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -13,7 +13,7 @@ # limitations under the License. 
from __future__ import annotations -from typing import Any, Awaitable, AsyncIterable, Iterable, MutableMapping, TYPE_CHECKING +from typing import Any, Awaitable, AsyncIterable, AsyncIterator, Iterable, Iterator, Generic, MutableMapping, Type, TypeVar, TYPE_CHECKING import copy from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value @@ -152,14 +152,19 @@ def get(self, field_path: str | FieldPath) -> Any: value = get_nested_value(str_path, self._fields_pb) return _helpers.decode_value(value, self._client) -class _PipelineResultContainer: + +T = TypeVar("T", bound=PipelineResult) + + +class _PipelineResultContainer(Generic[T]): """Helper to hold shared attributes for PipelineSnapshot and PipelineStream""" def __init__( self, + return_type: Type[T], pipeline: Pipeline | AsyncPipeline, transaction: Transaction | AsyncTransaction | None, - explain_options: ExplainOptions | None + explain_options: ExplainOptions | None, ): # public self.transaction = transaction @@ -170,6 +175,7 @@ def __init__( self._started: bool = False self._explain_stats: ExplainStats | None = None self._explain_options: ExplainOptions | None = explain_options + self._return_type = return_type @property def explain_stats(self) -> ExplainStats: @@ -204,7 +210,7 @@ def _build_request(self) -> ExecutePipelineRequest: ) return request - def _process_response(self, response: ExecutePipelineResponse): + def _process_response(self, response: ExecutePipelineResponse) -> Iterable[T]: """Shared logic for processing an individual response from a stream""" if response.explain_stats: self._explain_stats = ExplainStats(response.explain_stats) @@ -213,7 +219,7 @@ def _process_response(self, response: ExecutePipelineResponse): self.execution_time = execution_time for doc in response.results: ref = self._client.document(doc.name) if doc.name else None - yield PipelineResult( + yield self._return_type( self._client, doc.fields, ref, @@ -221,25 +227,25 @@ def 
_process_response(self, response: ExecutePipelineResponse): doc._pb.create_time if doc.create_time else None, doc._pb.update_time if doc.update_time else None, ) - -class PipelineSnapshot(_PipelineResultContainer, list[PipelineResult]): + +class PipelineSnapshot(_PipelineResultContainer[T], list[T]): """ A list type that holds the result of a pipeline.execute() operation, along with related metadata """ - def __init__(self, results_list: list[PipelineResult], source: _PipelineResultContainer): + def __init__(self, results_list: list[T], source: _PipelineResultContainer[T]): self.__dict__.update(copy.deepcopy(source.__dict__)) list.__init__(self, results_list) # snapshots are always complete self._started = True -class PipelineStream(_PipelineResultContainer, Iterable[PipelineResult]): +class PipelineStream(_PipelineResultContainer[T], Iterable[T]): """ An iterable stream representing the result of a pipeline.stream() operation, along with related metadata """ - def __iter__(self): + def __iter__(self) -> Iterator[T]: if self._started: raise RuntimeError(f"{self.__class__.__name__} can only be iterated once") self._started = True @@ -248,12 +254,12 @@ def __iter__(self): for response in stream: yield from self._process_response(response) -class AsyncPipelineStream(_PipelineResultContainer, AsyncIterable[PipelineResult]): +class AsyncPipelineStream(_PipelineResultContainer[T], AsyncIterable[T]): """ An iterable stream representing the result of an async pipeline.stream() operation, along with related metadata """ - async def __aiter__(self): + async def __aiter__(self) -> AsyncIterator[T]: if self._started: raise RuntimeError(f"{self.__class__.__name__} can only be iterated once") self._started = True From cb592cc33d0cf85c720071c7a81651071f940076 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 15:00:06 -0800 Subject: [PATCH 10/20] simplified how arguments are passed --- google/cloud/firestore_v1/async_pipeline.py | 8 ++++---- 
google/cloud/firestore_v1/pipeline.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index f92763b95a..d4f2d26ad9 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -78,7 +78,8 @@ async def execute( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. """ - stream = self.stream(transaction=transaction, explain_options=explain_options) + kwargs = {k: v for k, v in locals().items() if k != 'self'} + stream = AsyncPipelineStream(PipelineResult, self, **kwargs) results = [result async for result in stream] return PipelineSnapshot(results, stream) @@ -101,6 +102,5 @@ def stream( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. """ - return AsyncPipelineStream( - PipelineResult, self, transaction, explain_options - ) \ No newline at end of file + kwargs = {k: v for k, v in locals().items() if k != 'self'} + return AsyncPipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 1c6c9fa92f..09c298f4ef 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -76,7 +76,8 @@ def execute( Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. """ - stream = self.stream(transaction=transaction, explain_options=explain_options) + kwargs = {k: v for k, v in locals().items() if k != 'self'} + stream = PipelineStream(PipelineResult, self, **kwargs) results = [result for result in stream] return PipelineSnapshot(results, stream) @@ -99,6 +100,5 @@ def stream( Options to enable query profiling for this query. 
When set, explain_metrics will be available on the returned generator. """ - return PipelineStream( - PipelineResult, self, transaction, explain_options - ) \ No newline at end of file + kwargs = {k: v for k, v in locals().items() if k != 'self'} + return PipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file From 05a40f77e7d044c71eaa6ca4bb4f2280a8692332 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 15:34:21 -0800 Subject: [PATCH 11/20] added explain stats tests --- google/cloud/firestore_v1/pipeline_result.py | 1 - google/cloud/firestore_v1/query_profile.py | 14 +++-- tests/unit/v1/test_query_profile.py | 64 ++++++++++++++++++++ 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index f544aefadb..e5e2230b65 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -20,7 +20,6 @@ from google.cloud.firestore_v1.field_path import FieldPath from google.cloud.firestore_v1.query_profile import ExplainStats from google.cloud.firestore_v1.query_profile import QueryExplainError -from google.cloud.firestore_v1.query_profile import ExplainStats from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest if TYPE_CHECKING: # pragma: NO COVER diff --git a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py index 8aa0d7cac4..0979632e6d 100644 --- a/google/cloud/firestore_v1/query_profile.py +++ b/google/cloud/firestore_v1/query_profile.py @@ -22,7 +22,7 @@ from google.cloud.firestore_v1.types.document import MapValue from google.cloud.firestore_v1.types.document import Value from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb - +from google.protobuf.wrappers_pb2 import StringValue @dataclass(frozen=True) class ExplainOptions: @@ -180,11 +180,13 @@ def get_text(self) -> str: Raises: 
QueryExplainError: If the explain stats cannot be parsed. """ - try: - dict_repr = MessageToDict(self._stats_pb, preserving_proto_field_name=True) - return dict_repr["data"]["value"] - except (KeyError, TypeError) as e: - raise QueryExplainError("Failed to parse explain stats") from e + pb_data = self._stats_pb._pb.data + content = StringValue() + if pb_data.Unpack(content): + return content.value + raise QueryExplainError( + "Unable to decode explain stats. Did you request an output format that returns a string value, such as 'text' or 'json'?" + ) def get_raw(self) -> ExplainStats_pb: """ diff --git a/tests/unit/v1/test_query_profile.py b/tests/unit/v1/test_query_profile.py index a3b0390c61..9c7e211948 100644 --- a/tests/unit/v1/test_query_profile.py +++ b/tests/unit/v1/test_query_profile.py @@ -124,3 +124,67 @@ def test_explain_options__to_dict(): assert ExplainOptions(analyze=True)._to_dict() == {"analyze": True} assert ExplainOptions(analyze=False)._to_dict() == {"analyze": False} + +@pytest.mark.parametrize("analyze_bool,expected_str", [ + (True, "analyze"), (False, "explain") +]) +def test_explain_options__to_value(analyze_bool, expected_str): + """ + Should be able to create a Value protobuf representation of ExplainOptions + """ + from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.types.document import MapValue + from google.cloud.firestore_v1.types.document import Value + + options = ExplainOptions(analyze=analyze_bool) + expected_value = Value( + map_value=MapValue(fields={"mode": Value(string_value=expected_str)}) + ) + assert options._to_value() == expected_value + + +def test_explain_stats_get_raw(): + """ + Test ExplainStats.get_raw(). 
Should return input directly + """ + from google.cloud.firestore_v1.query_profile import ExplainStats + + input = object() + stats = ExplainStats(input) + assert stats.get_raw() is input + + +def test_explain_stats_get_text(): + """ + Test ExplainStats.get_text() + """ + from google.cloud.firestore_v1.query_profile import ExplainStats + from google.cloud.firestore_v1.types import explain_stats as explain_stats_pb2 + from google.protobuf import any_pb2 + from google.protobuf import wrappers_pb2 + + expected_text = "some text" + text_pb = any_pb2.Any() + text_pb.Pack(wrappers_pb2.StringValue(value=expected_text)) + expected_stats_pb = explain_stats_pb2.ExplainStats(data=text_pb) + stats = ExplainStats(expected_stats_pb) + assert stats.get_text() == expected_text + + +def test_explain_stats_get_text_error(): + """ + Test ExplainStats.get_text() raises QueryExplainError + """ + from google.cloud.firestore_v1.query_profile import ( + ExplainStats, + QueryExplainError, + ) + from google.cloud.firestore_v1.types import explain_stats as explain_stats_pb2 + from google.cloud.firestore_v1.types import document + from google.protobuf import any_pb2 + + expected_stats_pb = explain_stats_pb2.ExplainStats(data={}) + stats = ExplainStats(expected_stats_pb) + with pytest.raises(QueryExplainError) as exc: + stats.get_text() + assert "Unable to decode explain stats" in str(exc.value) From ee0dc0f74cee581b8f275d78abff92988f4f37f5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 21:02:06 -0800 Subject: [PATCH 12/20] got tests passing --- google/cloud/firestore_v1/pipeline_result.py | 2 +- tests/unit/v1/test_async_pipeline.py | 18 ------------ tests/unit/v1/test_pipeline.py | 29 ++++++++------------ 3 files changed, 13 insertions(+), 36 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index e5e2230b65..fb7b0da438 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ 
b/google/cloud/firestore_v1/pipeline_result.py @@ -233,7 +233,7 @@ class PipelineSnapshot(_PipelineResultContainer[T], list[T]): A list type that holds the result of a pipeline.execute() operation, along with related metadata """ def __init__(self, results_list: list[T], source: _PipelineResultContainer[T]): - self.__dict__.update(copy.deepcopy(source.__dict__)) + self.__dict__.update(source.__dict__.copy()) list.__init__(self, results_list) # snapshots are always complete self._started = True diff --git a/tests/unit/v1/test_async_pipeline.py b/tests/unit/v1/test_async_pipeline.py index 2fc39a906e..f1d6db507e 100644 --- a/tests/unit/v1/test_async_pipeline.py +++ b/tests/unit/v1/test_async_pipeline.py @@ -359,24 +359,6 @@ async def test_async_pipeline_stream_stream_equivalence(): assert stream_results[0].data()["key"] == "str_val" -@pytest.mark.asyncio -async def test_async_pipeline_stream_stream_equivalence_mocked(): - """ - pipeline.stream should call pipeline.stream internally - """ - ppl_1 = _make_async_pipeline() - expected_data = [object(), object()] - expected_arg = object() - with mock.patch.object(ppl_1, "stream") as mock_stream: - mock_stream.return_value = _async_it(expected_data) - stream_results = await ppl_1.execute(expected_arg) - assert mock_stream.call_count == 1 - assert mock_stream.call_args[0] == () - assert len(mock_stream.call_args[1]) == 1 - assert mock_stream.call_args[1]["transaction"] == expected_arg - assert stream_results == expected_data - - @pytest.mark.parametrize( "method,args,result_cls", [ diff --git a/tests/unit/v1/test_pipeline.py b/tests/unit/v1/test_pipeline.py index e203f6d691..e8ac437893 100644 --- a/tests/unit/v1/test_pipeline.py +++ b/tests/unit/v1/test_pipeline.py @@ -96,6 +96,18 @@ def test_pipeline__to_pb(): assert pb.pipeline.stages[1] == stage_2._to_pb() +def test_pipeline__to_pb_with_options(): + from google.cloud.firestore_v1.types.pipeline import StructuredPipeline + from google.cloud.firestore_v1.types.document 
import Value + + ppl = _make_pipeline() + options = {"option_1": Value(integer_value=1)} + pb = ppl._to_pb(**options) + assert isinstance(pb, StructuredPipeline) + assert pb.options["option_1"].integer_value == 1 + + + def test_pipeline_append(): """append should create a new pipeline with the additional stage""" @@ -337,23 +349,6 @@ def test_pipeline_execute_stream_equivalence(): assert execute_results[0].data()["key"] == "str_val" -def test_pipeline_execute_stream_equivalence_mocked(): - """ - pipeline.execute should call pipeline.stream internally - """ - ppl_1 = _make_pipeline() - expected_data = [object(), object()] - expected_arg = object() - with mock.patch.object(ppl_1, "stream") as mock_stream: - mock_stream.return_value = expected_data - stream_results = ppl_1.execute(expected_arg) - assert mock_stream.call_count == 1 - assert mock_stream.call_args[0] == () - assert len(mock_stream.call_args[1]) == 1 - assert mock_stream.call_args[1]["transaction"] == expected_arg - assert stream_results == expected_data - - @pytest.mark.parametrize( "method,args,result_cls", [ From 06fe8da5b03ce42d37408758e64640d3d5c913bc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 21:39:32 -0800 Subject: [PATCH 13/20] added unit tests --- google/cloud/firestore_v1/pipeline_result.py | 6 +- tests/unit/v1/test_pipeline_result.py | 132 +++++++++++++++++++ 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index fb7b0da438..06cba49ddc 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -185,7 +185,7 @@ def explain_stats(self) -> ExplainStats: elif not self._started: raise QueryExplainError("stream not started") else: - raise QueryExplainError("explain_options not found") + raise QueryExplainError("explain_stats not found") def _build_request(self) -> ExecutePipelineRequest: """ @@ -265,5 +265,5 @@ async 
def __aiter__(self) -> AsyncIterator[T]: request = self._build_request() stream = await self._client._firestore_api.execute_pipeline(request) async for response in stream: - for response in self._process_response(response): - yield response \ No newline at end of file + for result in self._process_response(response): + yield result \ No newline at end of file diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 2facf71104..a11bf69b53 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -16,6 +16,10 @@ import pytest from google.cloud.firestore_v1.pipeline_result import PipelineResult +from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot +from google.cloud.firestore_v1.pipeline_result import PipelineStream +from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream +from google.cloud.firestore_v1.query_profile import QueryExplainError class TestPipelineResult: @@ -174,3 +178,131 @@ def test_get_call(self): got = instance.get("key") decode_mock.assert_called_once_with("value", client) assert got == decode_mock.return_value + +class TestPipelineSnapshot: + + def _make_one(self, *args, **kwargs): + if not args: + # use defaults if not passed + args = [[], mock.Mock()] + return PipelineSnapshot(*args, **kwargs) + + def test_ctor(self): + in_arr = [1, 2, 3] + expected_type = object() + expected_pipeline = mock.Mock() + expected_transaction = object() + expected_options = object() + source = PipelineStream(expected_type, expected_pipeline, expected_transaction, expected_options) + instance = self._make_one(in_arr, source) + assert instance._return_type == expected_type + assert instance.pipeline == expected_pipeline + assert instance._client == expected_pipeline._client + assert instance._explain_options == expected_options + assert instance._explain_stats is None + assert instance._started is True + assert instance.execution_time is None + + def 
test_list_methods(self): + instance = self._make_one(list(range(10)), mock.Mock()) + assert isinstance(instance, list) + assert len(instance) == 10 + assert instance[0] == 0 + assert instance[-1] == 9 + + def test_explain_stats(self): + instance = self._make_one() + expected_stats = mock.Mock() + instance._explain_stats = expected_stats + assert instance.explain_stats == expected_stats + # test different failure modes + instance._explain_stats = None + instance._explain_options = None + # fail if explain_stats set without explain_options + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_options not set" in str(e) + # fail if explain_stats missing + instance._explain_options = object() + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_stats not found" in str(e) + + + +class TestPipelineStream: + def _make_one(self, *args, **kwargs): + if not args: + # use defaults if not passed + args = [PipelineResult, mock.Mock(), None, None] + return PipelineStream(*args, **kwargs) + + def test_explain_stats(self): + instance = self._make_one() + expected_stats = mock.Mock() + instance._started = True + instance._explain_stats = expected_stats + assert instance.explain_stats == expected_stats + # test different failure modes + instance._explain_stats = None + instance._explain_options = None + # fail if explain_stats set without explain_options + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_options not set" in str(e) + # fail if explain_stats missing + instance._explain_options = object() + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_stats not found" in str(e) + # fail if not started + instance._started = False + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "stream not started" in str(e) + + def test_double_iterate(self): + instance = self._make_one() + instance.pipeline._client.project 
= "project-id" + instance.pipeline._client._database = "database-id" + instance.pipeline._to_pb.return_value = {} + instance._client._firestore_api.execute_pipeline.return_value = [] + # consume the iterator + list(instance) + with pytest.raises(RuntimeError): + list(instance) + + +class TestAsyncPipelineStream: + def _make_one(self, *args, **kwargs): + if not args: + # use defaults if not passed + args = [PipelineResult, mock.Mock(), None, None] + return AsyncPipelineStream(*args, **kwargs) + + def test_explain_stats(self): + instance = self._make_one() + expected_stats = mock.Mock() + instance._started = True + instance._explain_stats = expected_stats + assert instance.explain_stats == expected_stats + # test different failure modes + instance._explain_stats = None + instance._explain_options = None + # fail if explain_stats set without explain_options + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_options not set" in str(e) + # fail if explain_stats missing + instance._explain_options = object() + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_stats not found" in str(e) + # fail if not started + instance._started = False + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "stream not started" in str(e) + + def test_double_iterate(self): + pass From bb911f824b5c167ca83c93c65d341522dba3df93 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 21:48:55 -0800 Subject: [PATCH 14/20] added more tests for pipeline streams --- tests/unit/v1/test_pipeline_result.py | 96 ++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index a11bf69b53..7f5971d4b4 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -15,13 +15,31 @@ import mock import pytest +from google.cloud.firestore_v1.types.firestore 
import ExecutePipelineResponse from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot from google.cloud.firestore_v1.pipeline_result import PipelineStream from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream from google.cloud.firestore_v1.query_profile import QueryExplainError +from google.cloud.firestore_v1.types.document import Document +from google.protobuf.timestamp_pb2 import Timestamp +_mock_stream_responses = [ + ExecutePipelineResponse( + results=[ + Document(name="projects/p/databases/d/documents/c/d1", fields={}) + ], + execution_time=Timestamp(seconds=1, nanos=2), + ), + ExecutePipelineResponse( + results=[ + Document(name="projects/p/databases/d/documents/c/d2", fields={}) + ], + execution_time=Timestamp(seconds=3, nanos=4), + ), +] + class TestPipelineResult: def _make_one(self, *args, **kwargs): if not args: @@ -261,6 +279,30 @@ def test_explain_stats(self): instance.explain_stats assert "stream not started" in str(e) + def test_iter(self): + pipeline = mock.Mock() + pipeline._client.project = "project-id" + pipeline._client._database = "database-id" + pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) + pipeline._to_pb.return_value = {} + + instance = self._make_one(PipelineResult, pipeline, None, None) + + instance._client._firestore_api.execute_pipeline.return_value = _mock_stream_responses + + results = list(instance) + + assert len(results) == 2 + assert isinstance(results[0], PipelineResult) + assert results[0].id == "d1" + assert isinstance(results[1], PipelineResult) + assert results[1].id == "d2" + + assert instance.execution_time.seconds == 1 + assert instance.execution_time.nanos == 2 + + instance._client._firestore_api.execute_pipeline.assert_called_once() + def test_double_iterate(self): instance = self._make_one() instance.pipeline._client.project = "project-id" @@ -304,5 +346,55 @@ def 
test_explain_stats(self): instance.explain_stats assert "stream not started" in str(e) - def test_double_iterate(self): - pass + @pytest.mark.asyncio + async def test_aiter(self): + pipeline = mock.Mock() + pipeline._client.project = "project-id" + pipeline._client._database = "database-id" + pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) + pipeline._to_pb.return_value = {} + + instance = self._make_one(PipelineResult, pipeline, None, None) + + async def async_gen(items): + for item in items: + yield item + + instance._client._firestore_api.execute_pipeline = mock.AsyncMock( + return_value=async_gen(_mock_stream_responses) + ) + + results = [item async for item in instance] + + assert len(results) == 2 + assert isinstance(results[0], PipelineResult) + assert results[0].id == "d1" + assert isinstance(results[1], PipelineResult) + assert results[1].id == "d2" + + assert instance.execution_time.seconds == 1 + assert instance.execution_time.nanos == 2 + + instance._client._firestore_api.execute_pipeline.assert_called_once() + + @pytest.mark.asyncio + async def test_double_iterate(self): + instance = self._make_one() + instance.pipeline._client.project = "project-id" + instance.pipeline._client._database = "database-id" + instance.pipeline._to_pb.return_value = {} + + async def async_gen(items): + for item in items: + yield item + + # mock the api call to avoid real network requests + instance._client._firestore_api.execute_pipeline = mock.AsyncMock( + return_value=async_gen([]) + ) + + # consume the iterator + [item async for item in instance] + # should fail on second attempt + with pytest.raises(RuntimeError): + [item async for item in instance] From e2137f2c70a5f20bc142f00bda703077f4a13d90 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 6 Nov 2025 22:21:30 -0800 Subject: [PATCH 15/20] added system test --- google/cloud/firestore_v1/pipeline_result.py | 2 +- tests/system/test_system.py | 74 +++++++++++++++++++ 
tests/system/test_system_async.py | 77 ++++++++++++++++++++ tests/unit/v1/test_pipeline_result.py | 4 +- 4 files changed, 154 insertions(+), 3 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 06cba49ddc..9e9e52f868 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -183,7 +183,7 @@ def explain_stats(self) -> ExplainStats: elif self._explain_options is None: raise QueryExplainError("explain_options not set on query.") elif not self._started: - raise QueryExplainError("stream not started") + raise QueryExplainError("explain_stats not available until query is complete") else: raise QueryExplainError("explain_stats not found") diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 592a73f671..f29caeee5c 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -46,6 +46,7 @@ TEST_DATABASES, TEST_DATABASES_W_ENTERPRISE, IS_KOKORO_TEST, + FIRESTORE_ENTERPRISE_DB, ) @@ -1650,6 +1651,79 @@ def test_query_stream_or_get_w_explain_options_analyze_false( explain_metrics.execution_stats +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +def test_pipeline_explain_options_explain_mode( + database, method, query_docs +): + """Explain currently not supported by backend. Expect error""" + from google.api_core.exceptions import InvalidArgument + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ExplainStats, + ) + + collection, _, _ = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + # Tests either `execute()` or `stream()`. 
+ method_under_test = getattr(pipeline, method) + explain_options = ExplainOptions(analyze=False) + + # for now, expect error on explain mode + with pytest.raises(InvalidArgument) as e: + results = method_under_test(explain_options=explain_options) + list(results) + assert "Explain execution mode is not supported" in str(e) + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +def test_pipeline_explain_options_analyze_mode( + database, method, query_docs +): + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ExplainStats, + QueryExplainError, + ) + from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb + + collection, _, allowed_vals = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + # Tests either `execute()` or `stream()`. + method_under_test = getattr(pipeline, method) + results = method_under_test(explain_options=ExplainOptions(analyze=True)) + + if method == "stream": + # check for error accessing explain stats before iterating + with pytest.raises( + QueryExplainError, + match="explain_stats not available until query is complete", + ): + results.explain_stats + + # Finish iterating results, and explain_stats should be available. + results_list = list(results) + num_results = len(results_list) + assert num_results == len(allowed_vals) + + # Verify explain_stats. 
+ explain_stats = results.explain_stats + assert isinstance(explain_stats, ExplainStats) + + assert isinstance(explain_stats.get_raw(), ExplainStats_pb) + text_stats = explain_stats.get_text() + assert "Execution:" in text_stats + + @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) def test_query_stream_w_read_time(query_docs, cleanup, database): collection, stored, allowed_vals = query_docs diff --git a/tests/system/test_system_async.py b/tests/system/test_system_async.py index f87da01121..9cc0e7aaab 100644 --- a/tests/system/test_system_async.py +++ b/tests/system/test_system_async.py @@ -57,6 +57,7 @@ TEST_DATABASES, TEST_DATABASES_W_ENTERPRISE, IS_KOKORO_TEST, + FIRESTORE_ENTERPRISE_DB, ) RETRIES = retries.AsyncRetry( @@ -1571,6 +1572,82 @@ async def test_query_stream_or_get_w_explain_options_analyze_false( _verify_explain_metrics_analyze_false(explain_metrics) +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +async def test_pipeline_explain_options_explain_mode( + database, method, query_docs +): + """Explain currently not supported by backend. 
Expect error""" + from google.api_core.exceptions import InvalidArgument + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ) + + collection, _, _ = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + method_under_test = getattr(pipeline, method) + explain_options = ExplainOptions(analyze=False) + + with pytest.raises(InvalidArgument) as e: + if method == "stream": + results = method_under_test(explain_options=explain_options) + _ = [i async for i in results] + else: + await method_under_test(explain_options=explain_options) + + assert "Explain execution mode is not supported" in str(e.value) + + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +async def test_pipeline_explain_options_analyze_mode( + database, method, query_docs +): + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ExplainStats, + QueryExplainError, + ) + from google.cloud.firestore_v1.types.explain_stats import ( + ExplainStats as ExplainStats_pb, + ) + + collection, _, allowed_vals = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + method_under_test = getattr(pipeline, method) + explain_options = ExplainOptions(analyze=True) + + if method == "execute": + results = await method_under_test(explain_options=explain_options) + num_results = len(results) + else: + results = method_under_test(explain_options=explain_options) + with pytest.raises( + QueryExplainError, + match="explain_stats not available until query is complete", + ): + results.explain_stats + + num_results = len([item async for item in results]) + + explain_stats = results.explain_stats + + assert num_results == len(allowed_vals) + + assert isinstance(explain_stats, ExplainStats) + assert isinstance(explain_stats.get_raw(), 
ExplainStats_pb) + text_stats = explain_stats.get_text() + assert "Execution:" in text_stats + + @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) async def test_query_stream_w_read_time(query_docs, cleanup, database): collection, stored, allowed_vals = query_docs diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 7f5971d4b4..49c697a473 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -277,7 +277,7 @@ def test_explain_stats(self): instance._started = False with pytest.raises(QueryExplainError) as e: instance.explain_stats - assert "stream not started" in str(e) + assert "not available until query is complete" in str(e) def test_iter(self): pipeline = mock.Mock() @@ -344,7 +344,7 @@ def test_explain_stats(self): instance._started = False with pytest.raises(QueryExplainError) as e: instance.explain_stats - assert "stream not started" in str(e) + assert "not available until query is complete" in str(e) @pytest.mark.asyncio async def test_aiter(self): From da5ef805f06d9bf2fb134ba100829db83d0e15f2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Nov 2025 09:53:22 -0800 Subject: [PATCH 16/20] added unit tests --- google/cloud/firestore_v1/async_pipeline.py | 12 ++ google/cloud/firestore_v1/pipeline.py | 12 ++ google/cloud/firestore_v1/pipeline_result.py | 11 +- tests/system/test_system.py | 57 ++++++++- tests/system/test_system_async.py | 41 +++++++ tests/unit/v1/test_pipeline_result.py | 117 ++++++++++++++----- 6 files changed, 216 insertions(+), 34 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index d4f2d26ad9..b15a492d4d 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -64,6 +64,8 @@ async def execute( *, transaction: "AsyncTransaction" | None = None, explain_options: ExplainOptions | None = None, + index_mode: str | 
None = None, + additional_options: dict[str, Value | Constant] = {}, ) -> PipelineSnapshot[PipelineResult]: """ Executes this pipeline and returns results as a list @@ -77,6 +79,10 @@ async def execute( explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. + index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. + Firestore will reject the request if there are no appropriate indexes to serve the query. + additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. + These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode) """ kwargs = {k: v for k, v in locals().items() if k != 'self'} stream = AsyncPipelineStream(PipelineResult, self, **kwargs) @@ -88,6 +94,8 @@ def stream( *, transaction: "AsyncTransaction" | None = None, explain_options: ExplainOptions | None = None, + index_mode: str | None = None, + additional_options: dict[str, Value | Constant] = {}, ) -> AsyncPipelineStream[PipelineResult]: """ Process this pipeline as a stream, providing results through an AsyncIterable @@ -101,6 +109,10 @@ explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. + index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. + Firestore will reject the request if there are no appropriate indexes to serve the query. + additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. + These options will take precedence over method argument if there is a conflict (e.g. 
explain_options, index_mode) """ kwargs = {k: v for k, v in locals().items() if k != 'self'} return AsyncPipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 09c298f4ef..98838fc0e7 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -62,6 +62,8 @@ def execute( *, transaction: "Transaction" | None = None, explain_options: ExplainOptions | None = None, + index_mode: str | None = None, + additional_options: dict[str, Value | Constant] = {}, ) -> PipelineSnapshot[PipelineResult]: """ Executes this pipeline and returns results as a list @@ -75,6 +77,10 @@ explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. + index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. + Firestore will reject the request if there are no appropriate indexes to serve the query. + additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. + These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode) """ kwargs = {k: v for k, v in locals().items() if k != 'self'} stream = PipelineStream(PipelineResult, self, **kwargs) @@ -86,6 +92,8 @@ def stream( *, transaction: "Transaction" | None = None, explain_options: ExplainOptions | None = None, + index_mode: str | None = None, + additional_options: dict[str, Value | Constant] = {}, ) -> PipelineStream[PipelineResult]: """ Process this pipeline as a stream, providing results through an Iterable @@ -99,6 +107,10 @@ explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. 
When set, explain_metrics will be available on the returned generator. + index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. + Firestore will reject the request if there are no appropriate indexes to serve the query. + additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. + These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode) """ kwargs = {k: v for k, v in locals().items() if k != 'self'} return PipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 9e9e52f868..b4153ffc80 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -21,6 +21,7 @@ from google.cloud.firestore_v1.query_profile import ExplainStats from google.cloud.firestore_v1.query_profile import QueryExplainError from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest +from google.cloud.firestore_v1.types.document import Value if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient @@ -164,6 +165,8 @@ def __init__( pipeline: Pipeline | AsyncPipeline, transaction: Transaction | AsyncTransaction | None, explain_options: ExplainOptions | None, + index_mode: str | None, + additional_options: dict[str, Constant | Value], ): # public self.transaction = transaction @@ -175,6 +178,8 @@ self._explain_stats: ExplainStats | None = None self._explain_options: ExplainOptions | None = explain_options self._return_type = return_type + self._index_mode = index_mode + self._additonal_options = {k: v if isinstance(v, Value) else v._to_pb() for k,v in additional_options.items()} @property def explain_stats(self) -> ExplainStats: @@ -195,13 +200,17 @@ 
f"projects/{self._client.project}/databases/{self._client._database}" ) transaction_id = ( - _helpers.get_transaction_id(self.transaction) + _helpers.get_transaction_id(self.transaction, read_operation=False) if self.transaction is not None else None ) options = {} if self._explain_options: options["explain_options"] = self._explain_options._to_value() + if self._index_mode: + options["index_mode"] = Value(string_value=self._index_mode) + if self._additonal_options: + options.update(self._additonal_options) request = ExecutePipelineRequest( database=database_name, transaction=transaction_id, diff --git a/tests/system/test_system.py b/tests/system/test_system.py index f29caeee5c..9c57d4aa54 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -21,6 +21,7 @@ import google.auth import pytest +import mock from google.api_core.exceptions import ( AlreadyExists, FailedPrecondition, @@ -1660,7 +1661,6 @@ def test_pipeline_explain_options_explain_mode( database, method, query_docs ): """Explain currently not supported by backend. Expect error""" - from google.api_core.exceptions import InvalidArgument from google.cloud.firestore_v1.query_profile import ( ExplainOptions, ExplainStats, @@ -1724,6 +1724,61 @@ def test_pipeline_explain_options_analyze_mode( assert "Execution:" in text_stats +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +def test_pipeline_explain_options_using_additional_options( + database, method, query_docs +): + """additional_options field allows passing in arbitrary options. 
Test with explain_options""" + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ExplainStats, + QueryExplainError, + ) + from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb + + collection, _, allowed_vals = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + # Tests either `execute()` or `stream()`. + method_under_test = getattr(pipeline, method) + + encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} + + results = method_under_test(explain_options=mock.Mock(), additional_options=encoded_options) + + # Finish iterating results, and explain_stats should be available. + results_list = list(results) + num_results = len(results_list) + assert num_results == len(allowed_vals) + + # Verify explain_stats. + explain_stats = results.explain_stats + assert isinstance(explain_stats, ExplainStats) + + assert isinstance(explain_stats.get_raw(), ExplainStats_pb) + text_stats = explain_stats.get_text() + assert "Execution:" in text_stats + +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." 
+) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +def test_pipeline_index_mode( + database, query_docs +): + """test pipeline query with explicit index mode""" + + collection, _, allowed_vals = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + with pytest.raises(InvalidArgument) as e: + pipeline.execute(index_mode="fake_index") + assert "Invalid index_mode: fake_index" in str(e) + + @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) def test_query_stream_w_read_time(query_docs, cleanup, database): collection, stored, allowed_vals = query_docs diff --git a/tests/system/test_system_async.py b/tests/system/test_system_async.py index 9cc0e7aaab..891f78ddf4 100644 --- a/tests/system/test_system_async.py +++ b/tests/system/test_system_async.py @@ -22,6 +22,7 @@ import google.auth import pytest import pytest_asyncio +import mock from google.api_core import exceptions as core_exceptions from google.api_core import retry_async as retries from google.api_core.exceptions import ( @@ -1647,6 +1648,46 @@ async def test_pipeline_explain_options_analyze_mode( text_stats = explain_stats.get_text() assert "Execution:" in text_stats +@pytest.mark.skipif( + FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." +) +@pytest.mark.parametrize("method", ["execute", "stream"]) +@pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) +async def test_pipeline_explain_options_using_additional_options( + database, method, query_docs +): + """additional_options field allows passing in arbitrary options. 
Test with explain_options""" + from google.cloud.firestore_v1.query_profile import ( + ExplainOptions, + ExplainStats, + QueryExplainError, + ) + from google.cloud.firestore_v1.types.explain_stats import ( + ExplainStats as ExplainStats_pb, + ) + + collection, _, allowed_vals = query_docs + pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() + + method_under_test = getattr(pipeline, method) + encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} + + stub = method_under_test(explain_options=mock.Mock(), additional_options=encoded_options) + if method == "execute": + results = await stub + num_results = len(results) + else: + results = stub + num_results = len([item async for item in results]) + + assert num_results == len(allowed_vals) + + explain_stats = results.explain_stats + assert isinstance(explain_stats, ExplainStats) + assert isinstance(explain_stats.get_raw(), ExplainStats_pb) + text_stats = explain_stats.get_text() + assert "Execution:" in text_stats + @pytest.mark.parametrize("database", TEST_DATABASES, indirect=True) async def test_query_stream_w_read_time(query_docs, cleanup, database): diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 49c697a473..36d9ed80bd 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -16,11 +16,14 @@ import pytest from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse +from google.cloud.firestore_v1.pipeline_expressions import Constant from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot from google.cloud.firestore_v1.pipeline_result import PipelineStream from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream from google.cloud.firestore_v1.query_profile import QueryExplainError +from google.cloud.firestore_v1.query_profile import ExplainOptions +from 
google.cloud.firestore_v1._helpers import encode_value from google.cloud.firestore_v1.types.document import Document from google.protobuf.timestamp_pb2 import Timestamp @@ -210,13 +213,17 @@ def test_ctor(self): expected_type = object() expected_pipeline = mock.Mock() expected_transaction = object() - expected_options = object() - source = PipelineStream(expected_type, expected_pipeline, expected_transaction, expected_options) + expected_explain_options = object() + expected_index_mode = "mode" + expected_addtl_options = {} + source = PipelineStream(expected_type, expected_pipeline, expected_transaction, expected_explain_options, expected_index_mode, expected_addtl_options) instance = self._make_one(in_arr, source) assert instance._return_type == expected_type assert instance.pipeline == expected_pipeline assert instance._client == expected_pipeline._client - assert instance._explain_options == expected_options + assert instance._additonal_options == expected_addtl_options + assert instance._index_mode == expected_index_mode + assert instance._explain_options == expected_explain_options assert instance._explain_stats is None assert instance._started is True assert instance.execution_time is None @@ -246,13 +253,82 @@ def test_explain_stats(self): instance.explain_stats assert "explain_stats not found" in str(e) +class SharedStreamTests: + """ + Shared test logic for PipelineStream and AsyncPipelineStream + """ + def _make_one(self, *args, **kwargs): + raise NotImplementedError + def test_explain_stats(self): + instance = self._make_one() + expected_stats = mock.Mock() + instance._started = True + instance._explain_stats = expected_stats + assert instance.explain_stats == expected_stats + # test different failure modes + instance._explain_stats = None + instance._explain_options = None + # fail if explain_stats set without explain_options + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_options not set" in str(e) + # fail if 
explain_stats missing + instance._explain_options = object() + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "explain_stats not found" in str(e) + # fail if not started + instance._started = False + with pytest.raises(QueryExplainError) as e: + instance.explain_stats + assert "not available until query is complete" in str(e) -class TestPipelineStream: + @pytest.mark.parametrize("init_kwargs,expected_options", [ + ({"index_mode": "mode"}, {"index_mode": encode_value("mode")}), + ({"explain_options": ExplainOptions(analyze=True)}, {"explain_options": encode_value({"mode": "analyze"})}), + ({"explain_options": ExplainOptions(analyze=False)}, {"explain_options": encode_value({"mode": "explain"})}), + ({"additional_options": {"explain_options": Constant("custom")}}, {"explain_options": encode_value("custom")}), + ({"additional_options": {"explain_options": encode_value("custom")}}, {"explain_options": encode_value("custom")}), + ({"explain_options": ExplainOptions(), "additional_options": {"explain_options": Constant.of("override")}}, {"explain_options": encode_value("override")}), + ({"index_mode": "mode", "additional_options": {"index_mode": Constant("new")}}, {"index_mode": encode_value("new")}), + ]) + def test_build_request_options(self, init_kwargs, expected_options): + """ + Certain Arguments to PipelineStream should be passed to `options` field in proto request + """ + from google.cloud.firestore_v1.pipeline import Pipeline + option_kwargs = { + "explain_options": None, + "index_mode": None, + "additional_options": {}, + } + option_kwargs.update(init_kwargs) + pipeline = Pipeline(mock.Mock()) + instance = self._make_one(None, pipeline, None, **option_kwargs) + request = instance._build_request() + options = dict(request.structured_pipeline.options) + assert options == expected_options + assert len(options) == len(expected_options) + + def test_build_request_transaction(self): + """Ensure transaction is passed down when building 
request""" + from google.cloud.firestore_v1.pipeline import Pipeline + from google.cloud.firestore_v1.transaction import Transaction + + pipeline = Pipeline(mock.Mock()) + expected_id = b"expected" + transaction = Transaction(mock.Mock()) + transaction._id = expected_id + instance = self._make_one(None, pipeline, transaction, None, None, {}) + request = instance._build_request() + assert request.transaction == expected_id + +class TestPipelineStream(SharedStreamTests): def _make_one(self, *args, **kwargs): if not args: # use defaults if not passed - args = [PipelineResult, mock.Mock(), None, None] + args = [PipelineResult, mock.Mock(), None, None, None, {}] return PipelineStream(*args, **kwargs) def test_explain_stats(self): @@ -286,7 +362,7 @@ def test_iter(self): pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) pipeline._to_pb.return_value = {} - instance = self._make_one(PipelineResult, pipeline, None, None) + instance = self._make_one(PipelineResult, pipeline, None, None, None, {}) instance._client._firestore_api.execute_pipeline.return_value = _mock_stream_responses @@ -315,36 +391,13 @@ def test_double_iterate(self): list(instance) -class TestAsyncPipelineStream: +class TestAsyncPipelineStream(SharedStreamTests): def _make_one(self, *args, **kwargs): if not args: # use defaults if not passed - args = [PipelineResult, mock.Mock(), None, None] + args = [PipelineResult, mock.Mock(), None, None, None, {}] return AsyncPipelineStream(*args, **kwargs) - def test_explain_stats(self): - instance = self._make_one() - expected_stats = mock.Mock() - instance._started = True - instance._explain_stats = expected_stats - assert instance.explain_stats == expected_stats - # test different failure modes - instance._explain_stats = None - instance._explain_options = None - # fail if explain_stats set without explain_options - with pytest.raises(QueryExplainError) as e: - instance.explain_stats - assert "explain_options not set" in str(e) - 
# fail if explain_stats missing - instance._explain_options = object() - with pytest.raises(QueryExplainError) as e: - instance.explain_stats - assert "explain_stats not found" in str(e) - # fail if not started - instance._started = False - with pytest.raises(QueryExplainError) as e: - instance.explain_stats - assert "not available until query is complete" in str(e) @pytest.mark.asyncio async def test_aiter(self): @@ -354,7 +407,7 @@ async def test_aiter(self): pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) pipeline._to_pb.return_value = {} - instance = self._make_one(PipelineResult, pipeline, None, None) + instance = self._make_one(PipelineResult, pipeline, None, None, None, {}) async def async_gen(items): for item in items: From 36b2d3f0a6bef05cd347dab51aaefc8857e420e0 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Nov 2025 10:33:14 -0800 Subject: [PATCH 17/20] fixed lint --- google/cloud/firestore_v1/async_pipeline.py | 10 ++- google/cloud/firestore_v1/base_pipeline.py | 8 +- google/cloud/firestore_v1/pipeline.py | 11 +-- google/cloud/firestore_v1/pipeline_result.py | 32 ++++++-- google/cloud/firestore_v1/query_profile.py | 8 +- tests/system/test_system.py | 29 ++++--- tests/system/test_system_async.py | 14 ++-- tests/unit/v1/test_pipeline.py | 1 - tests/unit/v1/test_pipeline_result.py | 84 +++++++++++++++----- tests/unit/v1/test_query_profile.py | 9 +-- 10 files changed, 130 insertions(+), 76 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index b15a492d4d..488a0f3263 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -13,7 +13,7 @@ # limitations under the License. 
from __future__ import annotations -from typing import AsyncIterable, TYPE_CHECKING +from typing import TYPE_CHECKING from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream @@ -23,6 +23,8 @@ if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.async_transaction import AsyncTransaction + from google.cloud.firestore_v1.pipeline_expressions import Constant + from google.cloud.firestore_v1.types.document import Value from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -84,7 +86,7 @@ async def execute( additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode) """ - kwargs = {k: v for k, v in locals().items() if k != 'self'} + kwargs = {k: v for k, v in locals().items() if k != "self"} stream = AsyncPipelineStream(PipelineResult, self, **kwargs) results = [result async for result in stream] return PipelineSnapshot(results, stream) @@ -114,5 +116,5 @@ def stream( additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. 
explain_options, index_mode) """ - kwargs = {k: v for k, v in locals().items() if k != 'self'} - return AsyncPipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file + kwargs = {k: v for k, v in locals().items() if k != "self"} + return AsyncPipelineStream(PipelineResult, self, **kwargs) diff --git a/google/cloud/firestore_v1/base_pipeline.py b/google/cloud/firestore_v1/base_pipeline.py index cfe699770a..1535646639 100644 --- a/google/cloud/firestore_v1/base_pipeline.py +++ b/google/cloud/firestore_v1/base_pipeline.py @@ -13,15 +13,13 @@ # limitations under the License. from __future__ import annotations -from typing import Iterable, Sequence, TYPE_CHECKING +from typing import Sequence, TYPE_CHECKING from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.types.pipeline import ( StructuredPipeline as StructuredPipeline_pb, ) from google.cloud.firestore_v1.vector import Vector from google.cloud.firestore_v1.base_vector_query import DistanceMeasure -from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest -from google.cloud.firestore_v1.pipeline_result import PipelineResult from google.cloud.firestore_v1.pipeline_expressions import ( AggregateFunction, AliasedExpression, @@ -30,14 +28,10 @@ BooleanExpression, Selectable, ) -from google.cloud.firestore_v1 import _helpers if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.client import Client from google.cloud.firestore_v1.async_client import AsyncClient - from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse - from google.cloud.firestore_v1.transaction import BaseTransaction - from google.cloud.firestore_v1.query_profile import ExplainOptions class _BasePipeline: diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index 98838fc0e7..c65ac3f20a 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -13,7 +13,7 @@ # 
limitations under the License. from __future__ import annotations -from typing import Iterable, TYPE_CHECKING +from typing import TYPE_CHECKING from google.cloud.firestore_v1 import pipeline_stages as stages from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline_result import PipelineStream @@ -22,8 +22,9 @@ if TYPE_CHECKING: # pragma: NO COVER from google.cloud.firestore_v1.client import Client + from google.cloud.firestore_v1.pipeline_expressions import Constant from google.cloud.firestore_v1.transaction import Transaction - from google.cloud.firestore_v1.query_profile import ExplainMetrics + from google.cloud.firestore_v1.types.document import Value from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -82,7 +83,7 @@ def execute( additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. explain_options, index_mode) """ - kwargs = {k: v for k, v in locals().items() if k != 'self'} + kwargs = {k: v for k, v in locals().items() if k != "self"} stream = PipelineStream(PipelineResult, self, **kwargs) results = [result for result in stream] return PipelineSnapshot(results, stream) @@ -112,5 +113,5 @@ def stream( additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. These options will take precedence over method argument if there is a conflict (e.g. 
explain_options, index_mode) """ - kwargs = {k: v for k, v in locals().items() if k != 'self'} - return PipelineStream(PipelineResult, self, **kwargs) \ No newline at end of file + kwargs = {k: v for k, v in locals().items() if k != "self"} + return PipelineStream(PipelineResult, self, **kwargs) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index b4153ffc80..d80b9c3b26 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -13,8 +13,18 @@ # limitations under the License. from __future__ import annotations -from typing import Any, Awaitable, AsyncIterable, AsyncIterator, Iterable, Iterator, Generic, MutableMapping, Type, TypeVar, TYPE_CHECKING -import copy +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Iterable, + Iterator, + Generic, + MutableMapping, + Type, + TypeVar, + TYPE_CHECKING, +) from google.cloud.firestore_v1 import _helpers from google.cloud.firestore_v1.field_path import get_nested_value from google.cloud.firestore_v1.field_path import FieldPath @@ -37,6 +47,7 @@ from google.cloud.firestore_v1.async_pipeline import AsyncPipeline from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline import Pipeline + from google.cloud.firestore_v1.pipeline_expressions import Constant from google.cloud.firestore_v1.query_profile import ExplainOptions @@ -179,7 +190,10 @@ def __init__( self._explain_options: ExplainOptions | None = explain_options self._return_type = return_type self._index_mode = index_mode - self._additonal_options = {k: v if isinstance(v, Value) else v._to_pb() for k,v in additional_options.items()} + self._additonal_options = { + k: v if isinstance(v, Value) else v._to_pb() + for k, v in additional_options.items() + } @property def explain_stats(self) -> ExplainStats: @@ -188,7 +202,9 @@ def explain_stats(self) -> ExplainStats: elif self._explain_options is None: raise 
QueryExplainError("explain_options not set on query.") elif not self._started: - raise QueryExplainError("explain_stats not available until query is complete") + raise QueryExplainError( + "explain_stats not available until query is complete" + ) else: raise QueryExplainError("explain_stats not found") @@ -214,14 +230,14 @@ def _build_request(self) -> ExecutePipelineRequest: request = ExecutePipelineRequest( database=database_name, transaction=transaction_id, - structured_pipeline=self.pipeline._to_pb(**options) + structured_pipeline=self.pipeline._to_pb(**options), ) return request def _process_response(self, response: ExecutePipelineResponse) -> Iterable[T]: """Shared logic for processing an individual response from a stream""" if response.explain_stats: - self._explain_stats = ExplainStats(response.explain_stats) + self._explain_stats = ExplainStats(response.explain_stats) execution_time = response._pb.execution_time if execution_time and not self.execution_time: self.execution_time = execution_time @@ -241,6 +257,7 @@ class PipelineSnapshot(_PipelineResultContainer[T], list[T]): """ A list type that holds the result of a pipeline.execute() operation, along with related metadata """ + def __init__(self, results_list: list[T], source: _PipelineResultContainer[T]): self.__dict__.update(source.__dict__.copy()) list.__init__(self, results_list) @@ -262,6 +279,7 @@ def __iter__(self) -> Iterator[T]: for response in stream: yield from self._process_response(response) + class AsyncPipelineStream(_PipelineResultContainer[T], AsyncIterable[T]): """ An iterable stream representing the result of an async pipeline.stream() operation, along with related metadata @@ -275,4 +293,4 @@ async def __aiter__(self) -> AsyncIterator[T]: stream = await self._client._firestore_api.execute_pipeline(request) async for response in stream: for result in self._process_response(response): - yield result \ No newline at end of file + yield result diff --git 
a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py index 0979632e6d..0d55cfd986 100644 --- a/google/cloud/firestore_v1/query_profile.py +++ b/google/cloud/firestore_v1/query_profile.py @@ -21,9 +21,12 @@ from google.protobuf.json_format import MessageToDict from google.cloud.firestore_v1.types.document import MapValue from google.cloud.firestore_v1.types.document import Value -from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb +from google.cloud.firestore_v1.types.explain_stats import ( + ExplainStats as ExplainStats_pb, +) from google.protobuf.wrappers_pb2 import StringValue + @dataclass(frozen=True) class ExplainOptions: """ @@ -152,6 +155,7 @@ class QueryExplainError(Exception): pass + class ExplainStats: """ Contains query profiling statistics for a pipeline query. @@ -196,4 +200,4 @@ def get_raw(self) -> ExplainStats_pb: Returns: google.cloud.firestore_v1.types.explain_stats.ExplainStats: the proto from the backend """ - return self._stats_pb \ No newline at end of file + return self._stats_pb diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9c57d4aa54..172a37f46e 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1657,13 +1657,10 @@ def test_query_stream_or_get_w_explain_options_analyze_false( ) @pytest.mark.parametrize("method", ["execute", "stream"]) @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) -def test_pipeline_explain_options_explain_mode( - database, method, query_docs -): +def test_pipeline_explain_options_explain_mode(database, method, query_docs): """Explain currently not supported by backend. 
Expect error""" from google.cloud.firestore_v1.query_profile import ( ExplainOptions, - ExplainStats, ) collection, _, _ = query_docs @@ -1677,7 +1674,7 @@ def test_pipeline_explain_options_explain_mode( with pytest.raises(InvalidArgument) as e: results = method_under_test(explain_options=explain_options) list(results) - assert"Explain execution mode is not supported" in str(e) + assert "Explain execution mode is not supported" in str(e) @pytest.mark.skipif( @@ -1685,15 +1682,15 @@ def test_pipeline_explain_options_explain_mode( ) @pytest.mark.parametrize("method", ["execute", "stream"]) @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) -def test_pipeline_explain_options_analyze_mode( - database, method, query_docs -): +def test_pipeline_explain_options_analyze_mode(database, method, query_docs): from google.cloud.firestore_v1.query_profile import ( ExplainOptions, ExplainStats, QueryExplainError, ) - from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb + from google.cloud.firestore_v1.types.explain_stats import ( + ExplainStats as ExplainStats_pb, + ) collection, _, allowed_vals = query_docs pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() @@ -1736,9 +1733,10 @@ def test_pipeline_explain_options_using_additional_options( from google.cloud.firestore_v1.query_profile import ( ExplainOptions, ExplainStats, - QueryExplainError, ) - from google.cloud.firestore_v1.types.explain_stats import ExplainStats as ExplainStats_pb + from google.cloud.firestore_v1.types.explain_stats import ( + ExplainStats as ExplainStats_pb, + ) collection, _, allowed_vals = query_docs pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() @@ -1748,7 +1746,9 @@ def test_pipeline_explain_options_using_additional_options( encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} - results = method_under_test(explain_options=mock.Mock(), 
additional_options=encoded_options) + results = method_under_test( + explain_options=mock.Mock(), additional_options=encoded_options + ) # Finish iterating results, and explain_stats should be available. results_list = list(results) @@ -1763,13 +1763,12 @@ def test_pipeline_explain_options_using_additional_options( text_stats = explain_stats.get_text() assert "Execution:" in text_stats + @pytest.mark.skipif( FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." ) @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) -def test_pipeline_index_mode( - database, query_docs -): +def test_pipeline_index_mode(database, query_docs): """test pipeline query with explicit index mode""" collection, _, allowed_vals = query_docs diff --git a/tests/system/test_system_async.py b/tests/system/test_system_async.py index 891f78ddf4..2a533ea7d9 100644 --- a/tests/system/test_system_async.py +++ b/tests/system/test_system_async.py @@ -1578,9 +1578,7 @@ async def test_query_stream_or_get_w_explain_options_analyze_false( ) @pytest.mark.parametrize("method", ["execute", "stream"]) @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) -async def test_pipeline_explain_options_explain_mode( - database, method, query_docs -): +async def test_pipeline_explain_options_explain_mode(database, method, query_docs): """Explain currently not supported by backend. 
Expect error""" from google.api_core.exceptions import InvalidArgument from google.cloud.firestore_v1.query_profile import ( @@ -1608,9 +1606,7 @@ async def test_pipeline_explain_options_explain_mode( ) @pytest.mark.parametrize("method", ["execute", "stream"]) @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) -async def test_pipeline_explain_options_analyze_mode( - database, method, query_docs -): +async def test_pipeline_explain_options_analyze_mode(database, method, query_docs): from google.cloud.firestore_v1.query_profile import ( ExplainOptions, ExplainStats, @@ -1648,6 +1644,7 @@ async def test_pipeline_explain_options_analyze_mode( text_stats = explain_stats.get_text() assert "Execution:" in text_stats + @pytest.mark.skipif( FIRESTORE_EMULATOR, reason="Query profile not supported in emulator." ) @@ -1660,7 +1657,6 @@ async def test_pipeline_explain_options_using_additional_options( from google.cloud.firestore_v1.query_profile import ( ExplainOptions, ExplainStats, - QueryExplainError, ) from google.cloud.firestore_v1.types.explain_stats import ( ExplainStats as ExplainStats_pb, @@ -1672,7 +1668,9 @@ async def test_pipeline_explain_options_using_additional_options( method_under_test = getattr(pipeline, method) encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} - stub = method_under_test(explain_options=mock.Mock(), additional_options=encoded_options) + stub = method_under_test( + explain_options=mock.Mock(), additional_options=encoded_options + ) if method == "execute": results = await stub num_results = len(results) diff --git a/tests/unit/v1/test_pipeline.py b/tests/unit/v1/test_pipeline.py index e8ac437893..6b16c591f7 100644 --- a/tests/unit/v1/test_pipeline.py +++ b/tests/unit/v1/test_pipeline.py @@ -107,7 +107,6 @@ def test_pipeline__to_pb_with_options(): assert pb.options["option_1"].integer_value == 1 - def test_pipeline_append(): """append should create a new pipeline with the additional 
stage""" diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 36d9ed80bd..10fd3fffdf 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -30,19 +30,16 @@ _mock_stream_responses = [ ExecutePipelineResponse( - results=[ - Document(name="projects/p/databases/d/documents/c/d1", fields={}) - ], + results=[Document(name="projects/p/databases/d/documents/c/d1", fields={})], execution_time=Timestamp(seconds=1, nanos=2), ), ExecutePipelineResponse( - results=[ - Document(name="projects/p/databases/d/documents/c/d2", fields={}) - ], + results=[Document(name="projects/p/databases/d/documents/c/d2", fields={})], execution_time=Timestamp(seconds=3, nanos=4), ), ] + class TestPipelineResult: def _make_one(self, *args, **kwargs): if not args: @@ -200,8 +197,8 @@ def test_get_call(self): decode_mock.assert_called_once_with("value", client) assert got == decode_mock.return_value -class TestPipelineSnapshot: +class TestPipelineSnapshot: def _make_one(self, *args, **kwargs): if not args: # use defaults if not passed @@ -214,9 +211,16 @@ def test_ctor(self): expected_pipeline = mock.Mock() expected_transaction = object() expected_explain_options = object() - expected_index_mode = "mode" + expected_index_mode = "mode" expected_addtl_options = {} - source = PipelineStream(expected_type, expected_pipeline, expected_transaction, expected_explain_options, expected_index_mode, expected_addtl_options) + source = PipelineStream( + expected_type, + expected_pipeline, + expected_transaction, + expected_explain_options, + expected_index_mode, + expected_addtl_options, + ) instance = self._make_one(in_arr, source) assert instance._return_type == expected_type assert instance.pipeline == expected_pipeline @@ -253,10 +257,12 @@ def test_explain_stats(self): instance.explain_stats assert "explain_stats not found" in str(e) + class SharedStreamTests: """ Shared test logic for PipelineStream and 
AsyncPipelineStream """ + def _make_one(self, *args, **kwargs): raise NotImplementedError @@ -284,20 +290,48 @@ def test_explain_stats(self): instance.explain_stats assert "not available until query is complete" in str(e) - @pytest.mark.parametrize("init_kwargs,expected_options", [ - ({"index_mode": "mode"}, {"index_mode": encode_value("mode")}), - ({"explain_options": ExplainOptions(analyze=True)}, {"explain_options": encode_value({"mode": "analyze"})}), - ({"explain_options": ExplainOptions(analyze=False)}, {"explain_options": encode_value({"mode": "explain"})}), - ({"additional_options": {"explain_options": Constant("custom")}}, {"explain_options": encode_value("custom")}), - ({"additional_options": {"explain_options": encode_value("custom")}}, {"explain_options": encode_value("custom")}), - ({"explain_options": ExplainOptions(), "additional_options": {"explain_options": Constant.of("override")}}, {"explain_options": encode_value("override")}), - ({"index_mode": "mode", "additional_options": {"index_mode": Constant("new")}}, {"index_mode": encode_value("new")}), - ]) + @pytest.mark.parametrize( + "init_kwargs,expected_options", + [ + ({"index_mode": "mode"}, {"index_mode": encode_value("mode")}), + ( + {"explain_options": ExplainOptions(analyze=True)}, + {"explain_options": encode_value({"mode": "analyze"})}, + ), + ( + {"explain_options": ExplainOptions(analyze=False)}, + {"explain_options": encode_value({"mode": "explain"})}, + ), + ( + {"additional_options": {"explain_options": Constant("custom")}}, + {"explain_options": encode_value("custom")}, + ), + ( + {"additional_options": {"explain_options": encode_value("custom")}}, + {"explain_options": encode_value("custom")}, + ), + ( + { + "explain_options": ExplainOptions(), + "additional_options": {"explain_options": Constant.of("override")}, + }, + {"explain_options": encode_value("override")}, + ), + ( + { + "index_mode": "mode", + "additional_options": {"index_mode": Constant("new")}, + }, + {"index_mode": 
encode_value("new")}, + ), + ], + ) def test_build_request_options(self, init_kwargs, expected_options): """ Certain Arguments to PipelineStream should be passed to `options` field in proto request """ from google.cloud.firestore_v1.pipeline import Pipeline + option_kwargs = { "explain_options": None, "index_mode": None, @@ -324,6 +358,7 @@ def test_build_request_transaction(self): request = instance._build_request() assert request.transaction == expected_id + class TestPipelineStream(SharedStreamTests): def _make_one(self, *args, **kwargs): if not args: @@ -359,12 +394,16 @@ def test_iter(self): pipeline = mock.Mock() pipeline._client.project = "project-id" pipeline._client._database = "database-id" - pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) + pipeline._client.document.side_effect = lambda path: mock.Mock( + id=path.split("/")[-1] + ) pipeline._to_pb.return_value = {} instance = self._make_one(PipelineResult, pipeline, None, None, None, {}) - instance._client._firestore_api.execute_pipeline.return_value = _mock_stream_responses + instance._client._firestore_api.execute_pipeline.return_value = ( + _mock_stream_responses + ) results = list(instance) @@ -398,13 +437,14 @@ def _make_one(self, *args, **kwargs): args = [PipelineResult, mock.Mock(), None, None, None, {}] return AsyncPipelineStream(*args, **kwargs) - @pytest.mark.asyncio async def test_aiter(self): pipeline = mock.Mock() pipeline._client.project = "project-id" pipeline._client._database = "database-id" - pipeline._client.document.side_effect = lambda path: mock.Mock(id=path.split("/")[-1]) + pipeline._client.document.side_effect = lambda path: mock.Mock( + id=path.split("/")[-1] + ) pipeline._to_pb.return_value = {} instance = self._make_one(PipelineResult, pipeline, None, None, None, {}) diff --git a/tests/unit/v1/test_query_profile.py b/tests/unit/v1/test_query_profile.py index 9c7e211948..840d7868ab 100644 --- a/tests/unit/v1/test_query_profile.py +++ 
b/tests/unit/v1/test_query_profile.py @@ -125,9 +125,10 @@ def test_explain_options__to_dict(): assert ExplainOptions(analyze=True)._to_dict() == {"analyze": True} assert ExplainOptions(analyze=False)._to_dict() == {"analyze": False} -@pytest.mark.parametrize("analyze_bool,expected_str", [ - (True, "analyze"), (False, "explain") -]) + +@pytest.mark.parametrize( + "analyze_bool,expected_str", [(True, "analyze"), (False, "explain")] +) def test_explain_options__to_value(analyze_bool, expected_str): """ Should be able to create a Value protobuf representation of ExplainOptions @@ -180,8 +181,6 @@ def test_explain_stats_get_text_error(): QueryExplainError, ) from google.cloud.firestore_v1.types import explain_stats as explain_stats_pb2 - from google.cloud.firestore_v1.types import document - from google.protobuf import any_pb2 expected_stats_pb = explain_stats_pb2.ExplainStats(data={}) stats = ExplainStats(expected_stats_pb) From e2fc5ce073411d686f3560e8b3d123d1935c7fc5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Nov 2025 10:45:09 -0800 Subject: [PATCH 18/20] updated docstrings --- google/cloud/firestore_v1/pipeline_result.py | 2 +- google/cloud/firestore_v1/query_profile.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index d80b9c3b26..79a2c3b284 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -168,7 +168,7 @@ def get(self, field_path: str | FieldPath) -> Any: class _PipelineResultContainer(Generic[T]): - """Helper to hold shared attributes for PipelineSnapshot and PipelineStream""" + """Base class to hold shared attributes for PipelineSnapshot and PipelineStream""" def __init__( self, diff --git a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py index 0d55cfd986..395c959ada 100644 --- a/google/cloud/firestore_v1/query_profile.py 
+++ b/google/cloud/firestore_v1/query_profile.py @@ -176,13 +176,17 @@ def __init__(self, stats_pb: ExplainStats_pb): def get_text(self) -> str: """ - Returns the explain stats string verbatim as returned from the Firestore backend. + Returns the explain stats as a string. + + This method is suitable for explain formats that have a text-based output, + such as 'text' or 'json'. Returns: - str: the string representation of the explain_stats object + str: The string representation of the explain stats. Raises: - QueryExplainError: If the explain stats cannot be parsed. + QueryExplainError: If the explain stats payload from the backend is not + a string. This can happen if a non-text output format was requested. """ pb_data = self._stats_pb._pb.data content = StringValue() From 18a78d32e55d2ee18ddde9b1bfa4bc300ad5aac8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Nov 2025 12:48:59 -0800 Subject: [PATCH 19/20] use custom type for PipelineExplainOptions --- google/cloud/firestore_v1/async_pipeline.py | 10 ++++---- google/cloud/firestore_v1/pipeline.py | 10 ++++---- google/cloud/firestore_v1/pipeline_result.py | 6 ++--- google/cloud/firestore_v1/query_profile.py | 25 ++++++++++++++++++-- tests/system/test_system.py | 12 +++++----- tests/system/test_system_async.py | 12 +++++----- tests/unit/v1/test_pipeline_result.py | 8 +++---- tests/unit/v1/test_query_profile.py | 12 ++++------ 8 files changed, 57 insertions(+), 38 deletions(-) diff --git a/google/cloud/firestore_v1/async_pipeline.py b/google/cloud/firestore_v1/async_pipeline.py index 488a0f3263..3eebf132d1 100644 --- a/google/cloud/firestore_v1/async_pipeline.py +++ b/google/cloud/firestore_v1/async_pipeline.py @@ -25,7 +25,7 @@ from google.cloud.firestore_v1.async_transaction import AsyncTransaction from google.cloud.firestore_v1.pipeline_expressions import Constant from google.cloud.firestore_v1.types.document import Value - from google.cloud.firestore_v1.query_profile import ExplainOptions + from 
google.cloud.firestore_v1.query_profile import PipelineExplainOptions class AsyncPipeline(_BasePipeline): @@ -65,7 +65,7 @@ async def execute( self, *, transaction: "AsyncTransaction" | None = None, - explain_options: ExplainOptions | None = None, + explain_options: PipelineExplainOptions | None = None, index_mode: str | None = None, additional_options: dict[str, Value | Constant] = {}, ) -> PipelineSnapshot[PipelineResult]: @@ -78,7 +78,7 @@ async def execute( If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). - explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. @@ -95,7 +95,7 @@ def stream( self, *, transaction: "AsyncTransaction" | None = None, - explain_options: ExplainOptions | None = None, + explain_options: PipelineExplainOptions | None = None, index_mode: str | None = None, additional_options: dict[str, Value | Constant] = {}, ) -> AsyncPipelineStream[PipelineResult]: @@ -108,7 +108,7 @@ def stream( If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). - explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. 
diff --git a/google/cloud/firestore_v1/pipeline.py b/google/cloud/firestore_v1/pipeline.py index c65ac3f20a..b9976e4347 100644 --- a/google/cloud/firestore_v1/pipeline.py +++ b/google/cloud/firestore_v1/pipeline.py @@ -25,7 +25,7 @@ from google.cloud.firestore_v1.pipeline_expressions import Constant from google.cloud.firestore_v1.transaction import Transaction from google.cloud.firestore_v1.types.document import Value - from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.query_profile import PipelineExplainOptions class Pipeline(_BasePipeline): @@ -62,7 +62,7 @@ def execute( self, *, transaction: "Transaction" | None = None, - explain_options: ExplainOptions | None = None, + explain_options: PipelineExplainOptions | None = None, index_mode: str | None = None, additional_options: dict[str, Value | Constant] = {}, ) -> PipelineSnapshot[PipelineResult]: @@ -75,7 +75,7 @@ def execute( If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). - explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned list. index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. @@ -92,7 +92,7 @@ def stream( self, *, transaction: "Transaction" | None = None, - explain_options: ExplainOptions | None = None, + explain_options: PipelineExplainOptions | None = None, index_mode: str | None = None, additional_options: dict[str, Value | Constant] = {}, ) -> PipelineStream[PipelineResult]: @@ -105,7 +105,7 @@ def stream( If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). 
- explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): + explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present. diff --git a/google/cloud/firestore_v1/pipeline_result.py b/google/cloud/firestore_v1/pipeline_result.py index 79a2c3b284..3873f1360d 100644 --- a/google/cloud/firestore_v1/pipeline_result.py +++ b/google/cloud/firestore_v1/pipeline_result.py @@ -48,7 +48,7 @@ from google.cloud.firestore_v1.base_pipeline import _BasePipeline from google.cloud.firestore_v1.pipeline import Pipeline from google.cloud.firestore_v1.pipeline_expressions import Constant - from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.query_profile import PipelineExplainOptions class PipelineResult: @@ -175,7 +175,7 @@ def __init__( return_type: Type[T], pipeline: Pipeline | AsyncPipeline, transaction: Transaction | AsyncTransaction | None, - explain_options: ExplainOptions | None, + explain_options: PipelineExplainOptions | None, index_mode: str | None, additional_options: dict[str, Constant | Value], ): @@ -187,7 +187,7 @@ def __init__( self._client: Client | AsyncClient = pipeline._client self._started: bool = False self._explain_stats: ExplainStats | None = None - self._explain_options: ExplainOptions | None = explain_options + self._explain_options: PipelineExplainOptions | None = explain_options self._return_type = return_type self._index_mode = index_mode self._additonal_options = { diff --git a/google/cloud/firestore_v1/query_profile.py b/google/cloud/firestore_v1/query_profile.py index 395c959ada..5e8491fc6b 100644 --- a/google/cloud/firestore_v1/query_profile.py +++ b/google/cloud/firestore_v1/query_profile.py @@ -47,9 +47,30 
@@ class ExplainOptions: def _to_dict(self): return {"analyze": self.analyze} + +@dataclass(frozen=True) +class PipelineExplainOptions: + """ + Explain options for pipeline queries. + + Set on a pipeline.execute() or pipeline.stream() call, to provide + explain_stats in the pipeline output + + :type mode: str + :param mode: Optional. The mode of operation for this explain query. + When set to 'analyze', the query will be executed and return the full + query results along with execution statistics. + + :type output_format: str | None + :param output_format: Optional. The format in which to return the explain + stats. + """ + + mode: str = "analyze" + def _to_value(self): - mode_str = "analyze" if self.analyze else "explain" - value_pb = MapValue(fields={"mode": Value(string_value=mode_str)}) + out_dict = {"mode": Value(string_value=self.mode)} + value_pb = MapValue(fields=out_dict) return Value(map_value=value_pb) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 172a37f46e..9bea54f33f 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1660,7 +1660,7 @@ def test_query_stream_or_get_w_explain_options_analyze_false( def test_pipeline_explain_options_explain_mode(database, method, query_docs): """Explain currently not supported by backend. Expect error""" from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ) collection, _, _ = query_docs @@ -1668,7 +1668,7 @@ def test_pipeline_explain_options_explain_mode(database, method, query_docs): # Tests either `execute()` or `stream()`. 
method_under_test = getattr(pipeline, method) - explain_options = ExplainOptions(analyze=False) + explain_options = PipelineExplainOptions(mode="explain") # for now, expect error on explain mode with pytest.raises(InvalidArgument) as e: @@ -1684,7 +1684,7 @@ def test_pipeline_explain_options_explain_mode(database, method, query_docs): @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) def test_pipeline_explain_options_analyze_mode(database, method, query_docs): from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ExplainStats, QueryExplainError, ) @@ -1697,7 +1697,7 @@ def test_pipeline_explain_options_analyze_mode(database, method, query_docs): # Tests either `execute()` or `stream()`. method_under_test = getattr(pipeline, method) - results = method_under_test(explain_options=ExplainOptions(analyze=True)) + results = method_under_test(explain_options=PipelineExplainOptions()) if method == "stream": # check for error accessing explain stats before iterating @@ -1731,7 +1731,7 @@ def test_pipeline_explain_options_using_additional_options( ): """additional_options field allows passing in arbitrary options. Test with explain_options""" from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ExplainStats, ) from google.cloud.firestore_v1.types.explain_stats import ( @@ -1744,7 +1744,7 @@ def test_pipeline_explain_options_using_additional_options( # Tests either `execute()` or `stream()`. 
method_under_test = getattr(pipeline, method) - encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} + encoded_options = {"explain_options": PipelineExplainOptions()._to_value()} results = method_under_test( explain_options=mock.Mock(), additional_options=encoded_options diff --git a/tests/system/test_system_async.py b/tests/system/test_system_async.py index 2a533ea7d9..21b4383b59 100644 --- a/tests/system/test_system_async.py +++ b/tests/system/test_system_async.py @@ -1582,14 +1582,14 @@ async def test_pipeline_explain_options_explain_mode(database, method, query_doc """Explain currently not supported by backend. Expect error""" from google.api_core.exceptions import InvalidArgument from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ) collection, _, _ = query_docs pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() method_under_test = getattr(pipeline, method) - explain_options = ExplainOptions(analyze=False) + explain_options = PipelineExplainOptions(mode="explain") with pytest.raises(InvalidArgument) as e: if method == "stream": @@ -1608,7 +1608,7 @@ async def test_pipeline_explain_options_explain_mode(database, method, query_doc @pytest.mark.parametrize("database", [FIRESTORE_ENTERPRISE_DB], indirect=True) async def test_pipeline_explain_options_analyze_mode(database, method, query_docs): from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ExplainStats, QueryExplainError, ) @@ -1620,7 +1620,7 @@ async def test_pipeline_explain_options_analyze_mode(database, method, query_doc pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() method_under_test = getattr(pipeline, method) - explain_options = ExplainOptions(analyze=True) + explain_options = PipelineExplainOptions() if method == "execute": results = await method_under_test(explain_options=explain_options) @@ -1655,7 +1655,7 @@ async def 
test_pipeline_explain_options_using_additional_options( ): """additional_options field allows passing in arbitrary options. Test with explain_options""" from google.cloud.firestore_v1.query_profile import ( - ExplainOptions, + PipelineExplainOptions, ExplainStats, ) from google.cloud.firestore_v1.types.explain_stats import ( @@ -1666,7 +1666,7 @@ async def test_pipeline_explain_options_using_additional_options( pipeline = collection.where(filter=FieldFilter("a", "==", 1)).pipeline() method_under_test = getattr(pipeline, method) - encoded_options = {"explain_options": ExplainOptions(analyze=True)._to_value()} + encoded_options = {"explain_options": PipelineExplainOptions()._to_value()} stub = method_under_test( explain_options=mock.Mock(), additional_options=encoded_options diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 10fd3fffdf..e4e7e70386 100644 --- a/tests/unit/v1/test_pipeline_result.py +++ b/tests/unit/v1/test_pipeline_result.py @@ -22,7 +22,7 @@ from google.cloud.firestore_v1.pipeline_result import PipelineStream from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream from google.cloud.firestore_v1.query_profile import QueryExplainError -from google.cloud.firestore_v1.query_profile import ExplainOptions +from google.cloud.firestore_v1.query_profile import PipelineExplainOptions from google.cloud.firestore_v1._helpers import encode_value from google.cloud.firestore_v1.types.document import Document from google.protobuf.timestamp_pb2 import Timestamp @@ -295,11 +295,11 @@ def test_explain_stats(self): [ ({"index_mode": "mode"}, {"index_mode": encode_value("mode")}), ( - {"explain_options": ExplainOptions(analyze=True)}, + {"explain_options": PipelineExplainOptions()}, {"explain_options": encode_value({"mode": "analyze"})}, ), ( - {"explain_options": ExplainOptions(analyze=False)}, + {"explain_options": PipelineExplainOptions(mode="explain")}, {"explain_options": encode_value({"mode": 
"explain"})}, ), ( @@ -312,7 +312,7 @@ def test_explain_stats(self): ), ( { - "explain_options": ExplainOptions(), + "explain_options": PipelineExplainOptions(), "additional_options": {"explain_options": Constant.of("override")}, }, {"explain_options": encode_value("override")}, diff --git a/tests/unit/v1/test_query_profile.py b/tests/unit/v1/test_query_profile.py index 840d7868ab..5b1e470b8e 100644 --- a/tests/unit/v1/test_query_profile.py +++ b/tests/unit/v1/test_query_profile.py @@ -126,20 +126,18 @@ def test_explain_options__to_dict(): assert ExplainOptions(analyze=False)._to_dict() == {"analyze": False} -@pytest.mark.parametrize( - "analyze_bool,expected_str", [(True, "analyze"), (False, "explain")] -) -def test_explain_options__to_value(analyze_bool, expected_str): +@pytest.mark.parametrize("mode_str", ["analyze", "explain"]) +def test_pipeline_explain_options__to_value(mode_str): """ Should be able to create a Value protobuf representation of ExplainOptions """ - from google.cloud.firestore_v1.query_profile import ExplainOptions + from google.cloud.firestore_v1.query_profile import PipelineExplainOptions from google.cloud.firestore_v1.types.document import MapValue from google.cloud.firestore_v1.types.document import Value - options = ExplainOptions(analyze=analyze_bool) + options = PipelineExplainOptions(mode=mode_str) expected_value = Value( - map_value=MapValue(fields={"mode": Value(string_value=expected_str)}) + map_value=MapValue(fields={"mode": Value(string_value=mode_str)}) ) assert options._to_value() == expected_value From 6a40bf345c175a6eb30e0c8e7a4be11c4558f7c3 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 10 Nov 2025 15:42:24 -0800 Subject: [PATCH 20/20] add check for explain_stats --- tests/unit/v1/test_pipeline_result.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/unit/v1/test_pipeline_result.py b/tests/unit/v1/test_pipeline_result.py index 23e8fa4b79..5799927410 100644 --- a/tests/unit/v1/test_pipeline_result.py 
+++ b/tests/unit/v1/test_pipeline_result.py @@ -32,6 +32,7 @@ ExecutePipelineResponse( results=[Document(name="projects/p/databases/d/documents/c/d1", fields={})], execution_time=Timestamp(seconds=1, nanos=2), + explain_stats={"data": {}}, ), ExecutePipelineResponse( results=[Document(name="projects/p/databases/d/documents/c/d2", fields={})], @@ -431,6 +432,10 @@ def test_iter(self): assert instance.execution_time.seconds == 1 assert instance.execution_time.nanos == 2 + # expect empty stats + got_stats = instance.explain_stats.get_raw().data + assert got_stats.value == b"" + instance._client._firestore_api.execute_pipeline.assert_called_once() def test_double_iterate(self): @@ -479,6 +484,10 @@ async def async_gen(items): assert instance.execution_time.seconds == 1 assert instance.execution_time.nanos == 2 + # expect empty stats + got_stats = instance.explain_stats.get_raw().data + assert got_stats.value == b"" + instance._client._firestore_api.execute_pipeline.assert_called_once() @pytest.mark.asyncio