Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
self.max_row_bytes = max_row_bytes

def mutate(self, row):
    """Queue a single row for mutation, flushing the batch as needed.

    Pre-flushes when adding the row would push the pending batch past
    the per-request mutation ceiling, then flushes again if the byte
    or row-count budget is reached after the append.

    :raises: :exc:`MaxMutationsError` if this one row alone carries
             more mutations than the service allows.
    """
    mutation_count = len(row._get_mutations())
    if mutation_count > MAX_MUTATIONS:
        raise MaxMutationsError(
            "The row key {} exceeds the number of mutations {}.".format(
                row.row_key, mutation_count
            )
        )

    # Flush ahead of time so the pending batch never crosses the
    # per-request mutation ceiling once this row is appended.
    if (self.total_mutation_count + mutation_count) >= MAX_MUTATIONS:
        self.flush()

    self.rows.append(row)
    self.total_mutation_count += mutation_count
    self.total_size += row.get_mutations_size()

    # Flush when either the row-count budget or the byte budget is hit.
    if len(self.rows) >= self.flush_count or self.total_size >= self.max_row_bytes:
        self.flush()

async def async_mutate(self, row):
"""Add a row to the batch. If the current batch meets one of the size
limits, the batch is sent synchronously.

Expand Down Expand Up @@ -95,16 +114,20 @@ def mutate(self, row):
)

if (self.total_mutation_count + mutation_count) >= MAX_MUTATIONS:
self.flush()
await self.async_flush()

self.rows.append(row)
self.total_mutation_count += mutation_count
self.total_size += row.get_mutations_size()

if self.total_size >= self.max_row_bytes or len(self.rows) >= self.flush_count:
self.flush()
await self.async_flush()

def mutate_rows(self, rows):
    """Queue every row in *rows* for mutation via :meth:`mutate`."""
    for each_row in rows:
        self.mutate(each_row)

async def async_mutate_rows(self, rows):
"""Add multiple rows to the batch. If the current batch meets one of the size
limits, the batch is sent synchronously.

Expand All @@ -127,9 +150,16 @@ def mutate_rows(self, rows):
mutations count.
"""
for row in rows:
self.mutate(row)
await self.async_mutate(row)

def flush(self):
    """Send any buffered rows to Cloud Bigtable and reset the batch.

    A no-op when nothing is buffered; otherwise delegates the bulk
    write to the table and zeroes the running counters.
    """
    if self.rows:
        self.table.mutate_rows(self.rows)
        self.total_mutation_count = 0
        self.total_size = 0
        self.rows = []

async def async_flush(self):
"""Sends the current. batch to Cloud Bigtable.
For example:

Expand All @@ -140,7 +170,7 @@ def flush(self):

"""
if len(self.rows) != 0:
self.table.mutate_rows(self.rows)
await self.table.async_mutate_rows(self.rows)
self.total_mutation_count = 0
self.total_size = 0
self.rows = []
22 changes: 21 additions & 1 deletion google/cloud/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@

from google.cloud import bigtable_v2
from google.cloud import bigtable_admin_v2
from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport
from google.cloud.bigtable_v2.services.bigtable.transports import (
BigtableGrpcTransport,
BigtableGrpcAsyncIOTransport,
)
from google.cloud.bigtable_admin_v2.services.bigtable_instance_admin.transports import (
BigtableInstanceAdminGrpcTransport,
)
Expand Down Expand Up @@ -145,6 +148,7 @@ class Client(ClientWithProject):
"""

_table_data_client = None
_async_table_data_client = None
_table_admin_client = None
_instance_admin_client = None

Expand Down Expand Up @@ -341,6 +345,22 @@ def table_data_client(self):
self._table_data_client = klass(self)
return self._table_data_client

@property
def async_table_data_client(self):
    """Getter for the asyncio gRPC data client.

    Only ``mutate_rows`` is supported here. The client is built
    lazily on first access and cached on the instance thereafter.
    """
    if self._async_table_data_client is not None:
        return self._async_table_data_client

    transport = self._create_gapic_client_channel(
        bigtable_v2.BigtableAsyncClient,
        BigtableGrpcAsyncIOTransport,
    )
    factory = _create_gapic_client(
        bigtable_v2.BigtableAsyncClient,
        client_options=self._client_options,
        transport=transport,
    )
    self._async_table_data_client = factory(self)
    return self._async_table_data_client

@property
def table_admin_client(self):
"""Getter for the gRPC stub used for the Table Admin API.
Expand Down
147 changes: 147 additions & 0 deletions google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.api_core.gapic_v1.method import DEFAULT
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud._helpers import _to_bytes # type: ignore
from google.cloud.bigtable.backup import Backup
from google.cloud.bigtable.column_family import _gc_rule_from_pb
Expand Down Expand Up @@ -91,6 +92,14 @@ class _BigtableRetryableError(Exception):
Used by :meth:`~google.cloud.bigtable.table.Table.mutate_rows`.
"""

# Default retry policy for Table.async_mutate_rows: retries only when the
# worker raises _BigtableRetryableError (transient per-row failures), using
# exponential backoff (1s initial, x2 multiplier, capped at 15s per sleep).
ASYNC_DEFAULT_RETRY = AsyncRetry(
    predicate=if_exception_type(_BigtableRetryableError),
    initial=1.0,
    maximum=15.0,
    multiplier=2.0,
    deadline=120.0,  # 2 minutes
)


class TableMismatchError(ValueError):
"""Row from another table."""
Expand Down Expand Up @@ -677,6 +686,20 @@ def yield_rows(self, **kwargs):
)
return self.read_rows(**kwargs)

async def async_mutate_rows(self, rows, retry=ASYNC_DEFAULT_RETRY, timeout=DEFAULT):
    """Asynchronously mutate multiple rows in bulk (async ``mutate_rows``).

    Delegates per-row status tracking and retry bookkeeping to
    :class:`_AsyncRetryableMutateRowsWorker` and returns its result.
    """
    effective_timeout = self.mutation_timeout if timeout is DEFAULT else timeout

    worker = _AsyncRetryableMutateRowsWorker(
        self._instance._client,
        self.name,
        rows,
        app_profile_id=self._app_profile_id,
        timeout=effective_timeout,
    )
    return await worker(retry=retry)

def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
"""Mutates multiple rows in bulk.

Expand Down Expand Up @@ -1179,6 +1202,130 @@ def _do_mutate_retryable_rows(self):
return self.responses_statuses


class _AsyncRetryableMutateRowsWorker(object):
    """Retry async mutate rows.

    Attempts a bulk ``MutateRows`` call and keeps one status slot per row
    so that only rows which failed with a transient (retryable) error are
    re-sent on subsequent attempts.
    """

    def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None):
        # ``client`` must expose ``async_table_data_client`` (used in
        # ``_do_mutate_retryable_rows``).
        self.client = client
        self.table_name = table_name
        self.rows = rows
        self.app_profile_id = app_profile_id
        # One slot per row; ``None`` means "not attempted yet", which
        # ``_is_retryable`` treats as eligible for (re)sending.
        self.responses_statuses = [None] * len(self.rows)
        self.timeout = timeout

    async def __call__(self, retry=ASYNC_DEFAULT_RETRY):
        """Attempt to mutate all rows and retry rows with transient errors.

        Will retry the rows with transient errors until all rows succeed or
        ``deadline`` specified in the `retry` is reached.

        :rtype: list
        :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
                  corresponding to success or failure of each row mutation
                  sent. These will be in the same order as the ``rows``.
        """
        mutate_rows = self._do_mutate_retryable_rows
        if retry:
            # Wrapping in the retry object re-invokes the coroutine
            # function each time it raises _BigtableRetryableError.
            mutate_rows = retry(self._do_mutate_retryable_rows)

        try:
            await mutate_rows()
        except (_BigtableRetryableError, RetryError):
            # - _BigtableRetryableError raised when no retry strategy is used
            #   and a retryable error on a mutation occurred.
            # - RetryError raised when retry deadline is reached.
            # In both cases, just return current `responses_statuses`.
            pass

        return self.responses_statuses

    @staticmethod
    def _is_retryable(status):
        # ``None`` (never attempted) counts as retryable.
        return status is None or status.code in RETRYABLE_CODES

    async def _do_mutate_retryable_rows(self):
        """Mutate all the rows that are eligible for retry.

        A row is eligible for retry if it has not been tried or if it resulted
        in a transient error in a previous call.

        :rtype: list
        :return: The responses statuses, which is a list of
                 :class:`~google.rpc.status_pb2.Status`.
        :raises: One of the following:

                 * :exc:`~.table._BigtableRetryableError` if any
                   row returned a transient error.
                 * :exc:`RuntimeError` if the number of responses doesn't
                   match the number of rows that were retried
        """
        # Collect the rows still needing work, remembering each one's
        # position in the full ``self.rows`` list.
        retryable_rows = []
        index_into_all_rows = []
        for index, status in enumerate(self.responses_statuses):
            if self._is_retryable(status):
                retryable_rows.append(self.rows[index])
                index_into_all_rows.append(index)

        if not retryable_rows:
            # All mutations are either successful or non-retryable now.
            return self.responses_statuses

        entries = _compile_mutation_entries(self.table_name, retryable_rows)
        data_client = self.client.async_table_data_client

        kwargs = {}
        if self.timeout is not None:
            # NOTE(review): ``timeout`` here is presumably the
            # ``google.api_core.timeout`` module imported at the top of the
            # file (the instance value is ``self.timeout``) — confirm.
            kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout)

        try:
            # GAPIC-level retry is disabled (retry=None); this worker
            # implements its own per-row retry policy instead.
            responses = await data_client.mutate_rows(
                table_name=self.table_name,
                entries=entries,
                app_profile_id=self.app_profile_id,
                retry=None,
                **kwargs
            )
        except RETRYABLE_MUTATION_ERRORS as exc:
            # If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is
            # returned from the initial call, consider
            # it to be retryable. Wrap as a Bigtable Retryable Error.
            # For InternalServerError, it is only retriable if the message is related to RST Stream messages
            if _retriable_internal_server_error(exc) or not isinstance(
                exc, InternalServerError
            ):
                raise _BigtableRetryableError
            else:
                # re-raise the original exception
                raise

        num_responses = 0
        num_retryable_responses = 0

        # NOTE(review): this awaits a single response message from the
        # streaming call; confirm the server delivers all entries in one
        # MutateRowsResponse, otherwise later entries would be dropped.
        data = await responses.read()

        for entry in data.entries:
            num_responses += 1
            # ``entry.index`` is relative to ``retryable_rows``; map it back
            # to the row's position in the full list.
            index = index_into_all_rows[entry.index]
            self.responses_statuses[index] = entry.status
            if self._is_retryable(entry.status):
                num_retryable_responses += 1
            if entry.status.code == 0:
                # Success: clear the row's pending mutations so a later
                # retry pass does not re-apply them.
                self.rows[index].clear()

        if len(retryable_rows) != num_responses:
            raise RuntimeError(
                "Unexpected number of responses",
                num_responses,
                "Expected",
                len(retryable_rows),
            )

        if num_retryable_responses:
            # Signal the retry wrapper (or the caller) that at least one
            # row still needs another attempt.
            raise _BigtableRetryableError

        return self.responses_statuses


class ClusterState(object):
"""Representation of a Cluster State.

Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable_v2/services/bigtable/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ async def mutate_row(
# Done; return the response.
return response

def mutate_rows(
async def mutate_rows(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be fixed in gapic generator.
Related PR: googleapis/gapic-generator-python#1502

self,
request: Optional[Union[bigtable.MutateRowsRequest, dict]] = None,
*,
Expand Down Expand Up @@ -626,7 +626,7 @@ def mutate_rows(
)

# Send the request.
response = rpc(
response = await rpc(
request,
retry=retry,
timeout=timeout,
Expand Down