Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
"""User-friendly container for Google Cloud Bigtable Table."""


from google.api_core.exceptions import Aborted
from google.api_core.exceptions import DeadlineExceeded
from google.api_core.exceptions import from_grpc_status
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
from google.cloud._helpers import _to_bytes
Expand All @@ -43,14 +39,13 @@
# google.bigtable.v2#google.bigtable.v2.MutateRowRequest)
_MAX_BULK_MUTATIONS = 100000


class _BigtableRetryableError(Exception):
"""Retry-able error expected by the default retry strategy."""


DEFAULT_RETRY = Retry(
predicate=if_exception_type(
(
Aborted,
DeadlineExceeded,
ServiceUnavailable,
),
),
predicate=if_exception_type(_BigtableRetryableError),
initial=1.0,
maximum=15.0,
multiplier=2.0,
Expand Down Expand Up @@ -425,11 +420,19 @@ def __call__(self, retry=DEFAULT_RETRY):
corresponding to success or failure of each row mutation
sent. These will be in the same order as the ``rows``.
"""
mutate_rows = self._do_mutate_retryable_rows
if retry:
mutate_rows = retry(self._do_mutate_retryable_rows)

try:
retry(self._do_mutate_retryable_rows)()
except (RetryError, ValueError) as err:
# Upon timeout or sleep generator error, return responses_statuses
mutate_rows()
except (_BigtableRetryableError, RetryError) as err:
# - _BigtableRetryableError raised when no retry strategy is used
# and a retryable error on a mutation occurred.
# - RetryError raised when retry deadline is reached.
# In both cases, just return current `responses_statuses`.
pass

return self.responses_statuses

@staticmethod
Expand All @@ -448,10 +451,8 @@ def _do_mutate_retryable_rows(self):
:class:`~google.rpc.status_pb2.Status`.
:raises: One of the following:

* :exc:`~google.api_core.exceptions.ServiceUnavailable` if any
row returned a transient error. This is "artificial" in
the sense that we intentionally raise the error because it
will be caught by the retry strategy.
* :exc:`~.table._BigtableRetryableError` if any
row returned a transient error.
* :exc:`RuntimeError` if the number of responses doesn't
match the number of rows that were retried
"""
Expand Down Expand Up @@ -485,12 +486,11 @@ def _do_mutate_retryable_rows(self):

if len(retryable_rows) != num_responses:
raise RuntimeError(
'Unexpected the number of responses', num_responses,
'Unexpected number of responses', num_responses,
'Expected', len(retryable_rows))

if num_retryable_responses:
raise from_grpc_status(StatusCode.UNAVAILABLE,
'MutateRows retryable error.')
raise _BigtableRetryableError

return self.responses_statuses

Expand Down
Loading