From 1ad287bb9e38401f158f44568b79025f2c365bfa Mon Sep 17 00:00:00 2001 From: Aneep Tandel Date: Fri, 22 Dec 2017 18:42:45 +0530 Subject: [PATCH 1/3] Create retry logic on MutateRow on Row.commit() --- bigtable/google/cloud/bigtable/row.py | 43 ++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/bigtable/google/cloud/bigtable/row.py b/bigtable/google/cloud/bigtable/row.py index da9678cdf892..0f113a9722e2 100644 --- a/bigtable/google/cloud/bigtable/row.py +++ b/bigtable/google/cloud/bigtable/row.py @@ -1,4 +1,4 @@ -# Copyright 2015 Google LLC +# Copyright 2015 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import six +import concurrent.futures + from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _microseconds_from_datetime from google.cloud._helpers import _to_bytes @@ -26,6 +28,8 @@ data_pb2 as data_v2_pb2) from google.cloud.bigtable._generated import ( bigtable_pb2 as messages_v2_pb2) +from google.api_core import retry +from google.cloud import exceptions _PACK_I64 = struct.Struct('>q').pack @@ -236,6 +240,26 @@ def _delete_cells(self, column_family_id, columns, time_range=None, mutations_list.extend(to_append) +class RetryCommit: + + def __init__(self, table, request_pb): + """ + + :param table: + :param request_pb: + """ + self.table = table + self.request_pb = request_pb + + def __call__(self): + """ + + :return: raise exception for MutateRow + """ + client = self.table._instance._client + client._data_stub.MutateRow(self.request_pb) + + class DirectRow(_SetDeleteRow): """Google Cloud Bigtable Row for sending "direct" mutations. @@ -269,6 +293,7 @@ class DirectRow(_SetDeleteRow): def __init__(self, row_key, table): super(DirectRow, self).__init__(row_key, table) self._pb_mutations = [] + self.request_pb = None def _get_mutations(self, state): # pylint: disable=unused-argument """Gets the list of mutations for a given state. @@ -412,9 +437,19 @@ def commit(self): row_key=self._row_key, mutations=mutations_list, ) - # We expect a `google.protobuf.empty_pb2.Empty` - client = self._table._instance._client - client._data_stub.MutateRow(request_pb) + + retry_commit = RetryCommit(self._table, request_pb) + retry_ = retry.Retry( + predicate=retry.if_exception_type(exceptions.GrpcRendezvous), + deadline=30) + + try: + retry_(retry_commit)() + except exceptions.RetryError: + raise concurrent.futures.TimeoutError( + 'Operation did not complete within the designated ' + 'timeout.') + self.clear() def clear(self): From c569ae7991c99a4b41731bcdbafe2a78f123dc4c Mon Sep 17 00:00:00 2001 From: Aneep Tandel Date: Mon, 25 Dec 2017 09:49:28 +0530 Subject: [PATCH 2/3] Import RetryError exception on row.py --- bigtable/google/cloud/bigtable/row.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigtable/google/cloud/bigtable/row.py b/bigtable/google/cloud/bigtable/row.py index 0f113a9722e2..dbc382cb6794 100644 --- a/bigtable/google/cloud/bigtable/row.py +++ b/bigtable/google/cloud/bigtable/row.py @@ -30,6 +30,7 @@ bigtable_pb2 as messages_v2_pb2) from google.api_core import retry from google.cloud import exceptions +from google.api_core.exceptions import RetryError _PACK_I64 = struct.Struct('>q').pack @@ -445,7 +446,7 @@ def commit(self): try: retry_(retry_commit)() - except exceptions.RetryError: + except RetryError: raise concurrent.futures.TimeoutError( 'Operation did not complete within the designated ' 'timeout.') From ead703976e0c2f10faea10c0603b60372e723fe9 Mon Sep 17 00:00:00 2001 From: Aneep Tandel Date: Mon, 25 Dec 2017 09:57:21 +0530 Subject: [PATCH 3/3] Import RetryError exception on exceptions.py --- bigtable/google/cloud/bigtable/row.py | 3 +-- core/google/cloud/exceptions.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigtable/google/cloud/bigtable/row.py b/bigtable/google/cloud/bigtable/row.py index dbc382cb6794..0f113a9722e2 100644 --- a/bigtable/google/cloud/bigtable/row.py +++ b/bigtable/google/cloud/bigtable/row.py @@ -30,7 +30,6 @@ bigtable_pb2 as messages_v2_pb2) from google.api_core import retry from google.cloud import exceptions -from google.api_core.exceptions import RetryError _PACK_I64 = struct.Struct('>q').pack @@ -446,7 +445,7 @@ def commit(self): try: retry_(retry_commit)() - except RetryError: + except exceptions.RetryError: raise concurrent.futures.TimeoutError( 'Operation did not complete within the designated ' 'timeout.') diff --git a/core/google/cloud/exceptions.py b/core/google/cloud/exceptions.py index 36ee6d14fcab..ddfe7ec09cb3 100644 --- a/core/google/cloud/exceptions.py +++ b/core/google/cloud/exceptions.py @@ -55,5 +55,6 @@ BadGateway = exceptions.BadGateway ServiceUnavailable = exceptions.ServiceUnavailable GatewayTimeout = exceptions.GatewayTimeout +RetryError = exceptions.RetryError from_http_status = exceptions.from_http_status from_http_response = exceptions.from_http_response