Skip to content

Commit 285cdd3

Browse files
feat: expose retryable error codes to users (#879)
1 parent b191451 commit 285cdd3

File tree

9 files changed

+514
-204
lines changed

9 files changed

+514
-204
lines changed

google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
#
1515
from __future__ import annotations
1616

17-
from typing import TYPE_CHECKING
17+
from typing import Sequence, TYPE_CHECKING
1818
import asyncio
1919
from dataclasses import dataclass
2020
import functools
@@ -66,6 +66,7 @@ def __init__(
6666
mutation_entries: list["RowMutationEntry"],
6767
operation_timeout: float,
6868
attempt_timeout: float | None,
69+
retryable_exceptions: Sequence[type[Exception]] = (),
6970
):
7071
"""
7172
Args:
@@ -96,8 +97,7 @@ def __init__(
9697
# create predicate for determining which errors are retryable
9798
self.is_retryable = retries.if_exception_type(
9899
# RPC level errors
99-
core_exceptions.DeadlineExceeded,
100-
core_exceptions.ServiceUnavailable,
100+
*retryable_exceptions,
101101
# Entry level errors
102102
bt_exceptions._MutateRowsIncomplete,
103103
)

google/cloud/bigtable/data/_async/_read_rows.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515

1616
from __future__ import annotations
1717

18-
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable
18+
from typing import (
19+
TYPE_CHECKING,
20+
AsyncGenerator,
21+
AsyncIterable,
22+
Awaitable,
23+
Sequence,
24+
)
1925

2026
from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
2127
from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
@@ -74,6 +80,7 @@ def __init__(
7480
table: "TableAsync",
7581
operation_timeout: float,
7682
attempt_timeout: float,
83+
retryable_exceptions: Sequence[type[Exception]] = (),
7784
):
7885
self.attempt_timeout_gen = _attempt_timeout_generator(
7986
attempt_timeout, operation_timeout
@@ -88,11 +95,7 @@ def __init__(
8895
else:
8996
self.request = query._to_pb(table)
9097
self.table = table
91-
self._predicate = retries.if_exception_type(
92-
core_exceptions.DeadlineExceeded,
93-
core_exceptions.ServiceUnavailable,
94-
core_exceptions.Aborted,
95-
)
98+
self._predicate = retries.if_exception_type(*retryable_exceptions)
9699
self._metadata = _make_metadata(
97100
table.table_name,
98101
table.app_profile_id,

google/cloud/bigtable/data/_async/client.py

Lines changed: 108 additions & 32 deletions
Large diffs are not rendered by default.

google/cloud/bigtable/data/_async/mutations_batcher.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
#
1515
from __future__ import annotations
1616

17-
from typing import Any, TYPE_CHECKING
17+
from typing import Any, Sequence, TYPE_CHECKING
1818
import asyncio
1919
import atexit
2020
import warnings
@@ -23,6 +23,7 @@
2323
from google.cloud.bigtable.data.mutations import RowMutationEntry
2424
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
2525
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
26+
from google.cloud.bigtable.data._helpers import _get_retryable_errors
2627
from google.cloud.bigtable.data._helpers import _get_timeouts
2728
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
2829

@@ -192,6 +193,8 @@ def __init__(
192193
flow_control_max_bytes: int = 100 * _MB_SIZE,
193194
batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
194195
batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
196+
batch_retryable_errors: Sequence[type[Exception]]
197+
| TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
195198
):
196199
"""
197200
Args:
@@ -208,10 +211,16 @@ def __init__(
208211
- batch_attempt_timeout: timeout for each individual request, in seconds.
209212
If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout.
210213
If None, defaults to batch_operation_timeout.
214+
- batch_retryable_errors: a list of errors that will be retried if encountered.
215+
Defaults to the Table's default_mutate_rows_retryable_errors.
211216
"""
212217
self._operation_timeout, self._attempt_timeout = _get_timeouts(
213218
batch_operation_timeout, batch_attempt_timeout, table
214219
)
220+
self._retryable_errors: list[type[Exception]] = _get_retryable_errors(
221+
batch_retryable_errors, table
222+
)
223+
215224
self.closed: bool = False
216225
self._table = table
217226
self._staged_entries: list[RowMutationEntry] = []
@@ -349,6 +358,7 @@ async def _execute_mutate_rows(
349358
batch,
350359
operation_timeout=self._operation_timeout,
351360
attempt_timeout=self._attempt_timeout,
361+
retryable_exceptions=self._retryable_errors,
352362
)
353363
await operation.start()
354364
except MutationsExceptionGroup as e:

google/cloud/bigtable/data/_helpers.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14+
"""
15+
Helper functions used in various places in the library.
16+
"""
1417
from __future__ import annotations
1518

16-
from typing import Callable, List, Tuple, Any
19+
from typing import Callable, Sequence, List, Tuple, Any, TYPE_CHECKING
1720
import time
1821
import enum
1922
from collections import namedtuple
@@ -22,6 +25,10 @@
2225
from google.api_core import exceptions as core_exceptions
2326
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
2427

28+
if TYPE_CHECKING:
29+
import grpc
30+
from google.cloud.bigtable.data import TableAsync
31+
2532
"""
2633
Helper functions used in various places in the library.
2734
"""
@@ -142,7 +149,9 @@ def wrapper(*args, **kwargs):
142149

143150

144151
def _get_timeouts(
145-
operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table
152+
operation: float | TABLE_DEFAULT,
153+
attempt: float | None | TABLE_DEFAULT,
154+
table: "TableAsync",
146155
) -> tuple[float, float]:
147156
"""
148157
Convert passed in timeout values to floats, using table defaults if necessary.
@@ -209,3 +218,21 @@ def _validate_timeouts(
209218
elif attempt_timeout is not None:
210219
if attempt_timeout <= 0:
211220
raise ValueError("attempt_timeout must be greater than 0")
221+
222+
223+
def _get_retryable_errors(
224+
call_codes: Sequence["grpc.StatusCode" | int | type[Exception]] | TABLE_DEFAULT,
225+
table: "TableAsync",
226+
) -> list[type[Exception]]:
227+
# load table defaults if necessary
228+
if call_codes == TABLE_DEFAULT.DEFAULT:
229+
call_codes = table.default_retryable_errors
230+
elif call_codes == TABLE_DEFAULT.READ_ROWS:
231+
call_codes = table.default_read_rows_retryable_errors
232+
elif call_codes == TABLE_DEFAULT.MUTATE_ROWS:
233+
call_codes = table.default_mutate_rows_retryable_errors
234+
235+
return [
236+
e if isinstance(e, type) else type(core_exceptions.from_grpc_status(e, ""))
237+
for e in call_codes
238+
]

tests/unit/data/_async/test__mutate_rows.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ def _make_one(self, *args, **kwargs):
4646
if not args:
4747
kwargs["gapic_client"] = kwargs.pop("gapic_client", mock.Mock())
4848
kwargs["table"] = kwargs.pop("table", AsyncMock())
49-
kwargs["mutation_entries"] = kwargs.pop("mutation_entries", [])
5049
kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5)
5150
kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1)
51+
kwargs["retryable_exceptions"] = kwargs.pop("retryable_exceptions", ())
52+
kwargs["mutation_entries"] = kwargs.pop("mutation_entries", [])
5253
return self._target_class()(*args, **kwargs)
5354

5455
async def _mock_stream(self, mutation_list, error_dict):
@@ -78,15 +79,21 @@ def test_ctor(self):
7879
from google.cloud.bigtable.data._async._mutate_rows import _EntryWithProto
7980
from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete
8081
from google.api_core.exceptions import DeadlineExceeded
81-
from google.api_core.exceptions import ServiceUnavailable
82+
from google.api_core.exceptions import Aborted
8283

8384
client = mock.Mock()
8485
table = mock.Mock()
8586
entries = [_make_mutation(), _make_mutation()]
8687
operation_timeout = 0.05
8788
attempt_timeout = 0.01
89+
retryable_exceptions = ()
8890
instance = self._make_one(
89-
client, table, entries, operation_timeout, attempt_timeout
91+
client,
92+
table,
93+
entries,
94+
operation_timeout,
95+
attempt_timeout,
96+
retryable_exceptions,
9097
)
9198
# running gapic_fn should trigger a client call
9299
assert client.mutate_rows.call_count == 0
@@ -110,8 +117,8 @@ def test_ctor(self):
110117
assert next(instance.timeout_generator) == attempt_timeout
111118
# ensure predicate is set
112119
assert instance.is_retryable is not None
113-
assert instance.is_retryable(DeadlineExceeded("")) is True
114-
assert instance.is_retryable(ServiceUnavailable("")) is True
120+
assert instance.is_retryable(DeadlineExceeded("")) is False
121+
assert instance.is_retryable(Aborted("")) is False
115122
assert instance.is_retryable(_MutateRowsIncomplete("")) is True
116123
assert instance.is_retryable(RuntimeError("")) is False
117124
assert instance.remaining_indices == list(range(len(entries)))
@@ -232,7 +239,7 @@ async def test_mutate_rows_exception(self, exc_type):
232239

233240
@pytest.mark.parametrize(
234241
"exc_type",
235-
[core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable],
242+
[core_exceptions.DeadlineExceeded, RuntimeError],
236243
)
237244
@pytest.mark.asyncio
238245
async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type):
@@ -256,7 +263,12 @@ async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type):
256263
) as attempt_mock:
257264
attempt_mock.side_effect = [expected_cause] * num_retries + [None]
258265
instance = self._make_one(
259-
client, table, entries, operation_timeout, operation_timeout
266+
client,
267+
table,
268+
entries,
269+
operation_timeout,
270+
operation_timeout,
271+
retryable_exceptions=(exc_type,),
260272
)
261273
await instance.start()
262274
assert attempt_mock.call_count == num_retries + 1

0 commit comments

Comments
 (0)