Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 9b81289

Browse files
feat: implement mutate rows (#769)
1 parent 3de7a68 commit 9b81289

15 files changed

Lines changed: 2596 additions & 114 deletions

google/cloud/bigtable/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from google.cloud.bigtable.mutations_batcher import MutationsBatcher
3030
from google.cloud.bigtable.mutations import Mutation
31-
from google.cloud.bigtable.mutations import BulkMutationsEntry
31+
from google.cloud.bigtable.mutations import RowMutationEntry
3232
from google.cloud.bigtable.mutations import SetCell
3333
from google.cloud.bigtable.mutations import DeleteRangeFromColumn
3434
from google.cloud.bigtable.mutations import DeleteAllFromFamily
@@ -47,7 +47,7 @@
4747
"RowRange",
4848
"MutationsBatcher",
4949
"Mutation",
50-
"BulkMutationsEntry",
50+
"RowMutationEntry",
5151
"SetCell",
5252
"DeleteRangeFromColumn",
5353
"DeleteAllFromFamily",

google/cloud/bigtable/_helpers.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
#
14+
from __future__ import annotations
15+
16+
from typing import Callable, Any
17+
from inspect import iscoroutinefunction
18+
import time
19+
20+
from google.api_core import exceptions as core_exceptions
21+
from google.cloud.bigtable.exceptions import RetryExceptionGroup
22+
23+
"""
24+
Helper functions used in various places in the library.
25+
"""
26+
27+
28+
def _make_metadata(
29+
table_name: str, app_profile_id: str | None
30+
) -> list[tuple[str, str]]:
31+
"""
32+
Create properly formatted gRPC metadata for requests.
33+
"""
34+
params = []
35+
params.append(f"table_name={table_name}")
36+
if app_profile_id is not None:
37+
params.append(f"app_profile_id={app_profile_id}")
38+
params_str = ",".join(params)
39+
return [("x-goog-request-params", params_str)]
40+
41+
42+
def _attempt_timeout_generator(
43+
per_request_timeout: float | None, operation_timeout: float
44+
):
45+
"""
46+
Generator that yields the timeout value for each attempt of a retry loop.
47+
48+
Will return per_request_timeout until the operation_timeout is approached,
49+
at which point it will return the remaining time in the operation_timeout.
50+
51+
Args:
52+
- per_request_timeout: The timeout value to use for each request, in seconds.
53+
If None, the operation_timeout will be used for each request.
54+
- operation_timeout: The timeout value to use for the entire operationm in seconds.
55+
Yields:
56+
- The timeout value to use for the next request, in seonds
57+
"""
58+
per_request_timeout = (
59+
per_request_timeout if per_request_timeout is not None else operation_timeout
60+
)
61+
deadline = operation_timeout + time.monotonic()
62+
while True:
63+
yield max(0, min(per_request_timeout, deadline - time.monotonic()))
64+
65+
66+
def _convert_retry_deadline(
67+
func: Callable[..., Any],
68+
timeout_value: float | None = None,
69+
retry_errors: list[Exception] | None = None,
70+
):
71+
"""
72+
Decorator to convert RetryErrors raised by api_core.retry into
73+
DeadlineExceeded exceptions, indicating that the underlying retries have
74+
exhaused the timeout value.
75+
Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__,
76+
detailing the failed exceptions associated with each retry.
77+
78+
Supports both sync and async function wrapping.
79+
80+
Args:
81+
- func: The function to decorate
82+
- timeout_value: The timeout value to display in the DeadlineExceeded error message
83+
- retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__
84+
"""
85+
timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else ""
86+
error_str = f"operation_timeout{timeout_str} exceeded"
87+
88+
def handle_error():
89+
new_exc = core_exceptions.DeadlineExceeded(
90+
error_str,
91+
)
92+
source_exc = None
93+
if retry_errors:
94+
source_exc = RetryExceptionGroup(retry_errors)
95+
new_exc.__cause__ = source_exc
96+
raise new_exc from source_exc
97+
98+
# separate wrappers for async and sync functions
99+
async def wrapper_async(*args, **kwargs):
100+
try:
101+
return await func(*args, **kwargs)
102+
except core_exceptions.RetryError:
103+
handle_error()
104+
105+
def wrapper(*args, **kwargs):
106+
try:
107+
return func(*args, **kwargs)
108+
except core_exceptions.RetryError:
109+
handle_error()
110+
111+
return wrapper_async if iscoroutinefunction(func) else wrapper
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
from __future__ import annotations
16+
17+
from typing import TYPE_CHECKING
18+
import functools
19+
20+
from google.api_core import exceptions as core_exceptions
21+
from google.api_core import retry_async as retries
22+
import google.cloud.bigtable.exceptions as bt_exceptions
23+
from google.cloud.bigtable._helpers import _make_metadata
24+
from google.cloud.bigtable._helpers import _convert_retry_deadline
25+
from google.cloud.bigtable._helpers import _attempt_timeout_generator
26+
27+
if TYPE_CHECKING:
28+
from google.cloud.bigtable_v2.services.bigtable.async_client import (
29+
BigtableAsyncClient,
30+
)
31+
from google.cloud.bigtable.client import Table
32+
from google.cloud.bigtable.mutations import RowMutationEntry
33+
34+
35+
class _MutateRowsIncomplete(RuntimeError):
36+
"""
37+
Exception raised when a mutate_rows call has unfinished work.
38+
"""
39+
40+
pass
41+
42+
43+
class _MutateRowsOperation:
44+
"""
45+
MutateRowsOperation manages the logic of sending a set of row mutations,
46+
and retrying on failed entries. It manages this using the _run_attempt
47+
function, which attempts to mutate all outstanding entries, and raises
48+
_MutateRowsIncomplete if any retryable errors are encountered.
49+
50+
Errors are exposed as a MutationsExceptionGroup, which contains a list of
51+
exceptions organized by the related failed mutation entries.
52+
"""
53+
54+
def __init__(
55+
self,
56+
gapic_client: "BigtableAsyncClient",
57+
table: "Table",
58+
mutation_entries: list["RowMutationEntry"],
59+
operation_timeout: float,
60+
per_request_timeout: float | None,
61+
):
62+
"""
63+
Args:
64+
- gapic_client: the client to use for the mutate_rows call
65+
- table: the table associated with the request
66+
- mutation_entries: a list of RowMutationEntry objects to send to the server
67+
- operation_timeout: the timeout t o use for the entire operation, in seconds.
68+
- per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds.
69+
If not specified, the request will run until operation_timeout is reached.
70+
"""
71+
# create partial function to pass to trigger rpc call
72+
metadata = _make_metadata(table.table_name, table.app_profile_id)
73+
self._gapic_fn = functools.partial(
74+
gapic_client.mutate_rows,
75+
table_name=table.table_name,
76+
app_profile_id=table.app_profile_id,
77+
metadata=metadata,
78+
)
79+
# create predicate for determining which errors are retryable
80+
self.is_retryable = retries.if_exception_type(
81+
# RPC level errors
82+
core_exceptions.DeadlineExceeded,
83+
core_exceptions.ServiceUnavailable,
84+
# Entry level errors
85+
_MutateRowsIncomplete,
86+
)
87+
# build retryable operation
88+
retry = retries.AsyncRetry(
89+
predicate=self.is_retryable,
90+
timeout=operation_timeout,
91+
initial=0.01,
92+
multiplier=2,
93+
maximum=60,
94+
)
95+
retry_wrapped = retry(self._run_attempt)
96+
self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout)
97+
# initialize state
98+
self.timeout_generator = _attempt_timeout_generator(
99+
per_request_timeout, operation_timeout
100+
)
101+
self.mutations = mutation_entries
102+
self.remaining_indices = list(range(len(self.mutations)))
103+
self.errors: dict[int, list[Exception]] = {}
104+
105+
async def start(self):
106+
"""
107+
Start the operation, and run until completion
108+
109+
Raises:
110+
- MutationsExceptionGroup: if any mutations failed
111+
"""
112+
try:
113+
# trigger mutate_rows
114+
await self._operation()
115+
except Exception as exc:
116+
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
117+
incomplete_indices = self.remaining_indices.copy()
118+
for idx in incomplete_indices:
119+
self._handle_entry_error(idx, exc)
120+
finally:
121+
# raise exception detailing incomplete mutations
122+
all_errors = []
123+
for idx, exc_list in self.errors.items():
124+
if len(exc_list) == 0:
125+
raise core_exceptions.ClientError(
126+
f"Mutation {idx} failed with no associated errors"
127+
)
128+
elif len(exc_list) == 1:
129+
cause_exc = exc_list[0]
130+
else:
131+
cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
132+
entry = self.mutations[idx]
133+
all_errors.append(
134+
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
135+
)
136+
if all_errors:
137+
raise bt_exceptions.MutationsExceptionGroup(
138+
all_errors, len(self.mutations)
139+
)
140+
141+
async def _run_attempt(self):
142+
"""
143+
Run a single attempt of the mutate_rows rpc.
144+
145+
Raises:
146+
- _MutateRowsIncomplete: if there are failed mutations eligible for
147+
retry after the attempt is complete
148+
- GoogleAPICallError: if the gapic rpc fails
149+
"""
150+
request_entries = [
151+
self.mutations[idx]._to_dict() for idx in self.remaining_indices
152+
]
153+
# track mutations in this request that have not been finalized yet
154+
active_request_indices = {
155+
req_idx: orig_idx for req_idx, orig_idx in enumerate(self.remaining_indices)
156+
}
157+
self.remaining_indices = []
158+
if not request_entries:
159+
# no more mutations. return early
160+
return
161+
# make gapic request
162+
try:
163+
result_generator = await self._gapic_fn(
164+
timeout=next(self.timeout_generator),
165+
entries=request_entries,
166+
)
167+
async for result_list in result_generator:
168+
for result in result_list.entries:
169+
# convert sub-request index to global index
170+
orig_idx = active_request_indices[result.index]
171+
entry_error = core_exceptions.from_grpc_status(
172+
result.status.code,
173+
result.status.message,
174+
details=result.status.details,
175+
)
176+
if result.status.code != 0:
177+
# mutation failed; update error list (and remaining_indices if retryable)
178+
self._handle_entry_error(orig_idx, entry_error)
179+
# remove processed entry from active list
180+
del active_request_indices[result.index]
181+
except Exception as exc:
182+
# add this exception to list for each mutation that wasn't
183+
# already handled, and update remaining_indices if mutation is retryable
184+
for idx in active_request_indices.values():
185+
self._handle_entry_error(idx, exc)
186+
# bubble up exception to be handled by retry wrapper
187+
raise
188+
# check if attempt succeeded, or needs to be retried
189+
if self.remaining_indices:
190+
# unfinished work; raise exception to trigger retry
191+
raise _MutateRowsIncomplete
192+
193+
def _handle_entry_error(self, idx: int, exc: Exception):
194+
"""
195+
Add an exception to the list of exceptions for a given mutation index,
196+
and add the index to the list of remaining indices if the exception is
197+
retryable.
198+
199+
Args:
200+
- idx: the index of the mutation that failed
201+
- exc: the exception to add to the list
202+
"""
203+
entry = self.mutations[idx]
204+
self.errors.setdefault(idx, []).append(exc)
205+
if (
206+
entry.is_idempotent()
207+
and self.is_retryable(exc)
208+
and idx not in self.remaining_indices
209+
):
210+
self.remaining_indices.append(idx)

0 commit comments

Comments
 (0)