Skip to content

Commit 4cc0fc7

Browse files
feat: Update ExecuteQuery to use Prepare (#1100)
* feat: update execute_query to use PrepareQuery API (#1095) * feat: Implement updated execute query protocol (#1096) * feat: Refactor Metadata, add system tests, remove preview warning (#1099) * Fix setup.py merge * fix: skip sql tests for emulator
1 parent 97e5f24 commit 4cc0fc7

31 files changed

+1871
-1131
lines changed

packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@
3535
from grpc import Channel
3636

3737
from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
38-
from google.cloud.bigtable.data.execute_query.metadata import SqlType
38+
from google.cloud.bigtable.data.execute_query.metadata import (
39+
SqlType,
40+
_pb_metadata_to_metadata_types,
41+
)
3942
from google.cloud.bigtable.data.execute_query._parameters_formatting import (
4043
_format_execute_query_params,
44+
_to_param_types,
4145
)
4246
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
4347
DEFAULT_CLIENT_INFO,
@@ -59,7 +63,7 @@
5963
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
6064
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
6165

62-
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
66+
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
6367
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
6468
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
6569
from google.cloud.bigtable.data._helpers import _retry_exception_factory
@@ -542,6 +546,12 @@ async def execute_query(
542546
ServiceUnavailable,
543547
Aborted,
544548
),
549+
prepare_operation_timeout: float = 60,
550+
prepare_attempt_timeout: float | None = 20,
551+
prepare_retryable_errors: Sequence[type[Exception]] = (
552+
DeadlineExceeded,
553+
ServiceUnavailable,
554+
),
545555
) -> "ExecuteQueryIteratorAsync":
546556
"""
547557
Executes an SQL query on an instance.
@@ -550,6 +560,10 @@ async def execute_query(
550560
Failed requests within operation_timeout will be retried based on the
551561
retryable_errors list until operation_timeout is reached.
552562
563+
Note that this makes two requests, one to ``PrepareQuery`` and one to ``ExecuteQuery``.
564+
These have separate retry configurations. ``ExecuteQuery`` is where the bulk of the
565+
work happens.
566+
553567
Args:
554568
query: Query to be run on Bigtable instance. The query can use ``@param``
555569
placeholders to use parameter interpolation on the server. Values for all
@@ -566,16 +580,26 @@ async def execute_query(
566580
an empty dict).
567581
app_profile_id: The app profile to associate with requests.
568582
https://cloud.google.com/bigtable/docs/app-profiles
569-
operation_timeout: the time budget for the entire operation, in seconds.
583+
operation_timeout: the time budget for the entire executeQuery operation, in seconds.
570584
Failed requests will be retried within the budget.
571585
Defaults to 600 seconds.
572-
attempt_timeout: the time budget for an individual network request, in seconds.
586+
attempt_timeout: the time budget for an individual executeQuery network request, in seconds.
573587
If it takes longer than this time to complete, the request will be cancelled with
574588
a DeadlineExceeded exception, and a retry will be attempted.
575589
Defaults to 20 seconds.
576590
If None, defaults to operation_timeout.
577-
retryable_errors: a list of errors that will be retried if encountered.
591+
retryable_errors: a list of errors that will be retried if encountered during executeQuery.
578592
Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted)
593+
prepare_operation_timeout: the time budget for the entire prepareQuery operation, in seconds.
594+
Failed requests will be retried within the budget.
595+
Defaults to 60 seconds.
596+
prepare_attempt_timeout: the time budget for an individual prepareQuery network request, in seconds.
597+
If it takes longer than this time to complete, the request will be cancelled with
598+
a DeadlineExceeded exception, and a retry will be attempted.
599+
Defaults to 20 seconds.
600+
If None, defaults to prepare_operation_timeout.
601+
prepare_retryable_errors: a list of errors that will be retried if encountered during prepareQuery.
602+
Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable)
579603
Returns:
580604
ExecuteQueryIteratorAsync: an asynchronous iterator that yields rows returned by the query
581605
Raises:
@@ -586,30 +610,59 @@ async def execute_query(
586610
google.cloud.bigtable.data.exceptions.ParameterTypeInferenceFailed: Raised if
587611
a parameter is passed without an explicit type, and the type cannot be inferred
588612
"""
589-
warnings.warn(
590-
"ExecuteQuery is in preview and may change in the future.",
591-
category=RuntimeWarning,
613+
instance_name = self._gapic_client.instance_path(self.project, instance_id)
614+
converted_param_types = _to_param_types(parameters, parameter_types)
615+
prepare_request = {
616+
"instance_name": instance_name,
617+
"query": query,
618+
"app_profile_id": app_profile_id,
619+
"param_types": converted_param_types,
620+
"proto_format": {},
621+
}
622+
prepare_predicate = retries.if_exception_type(
623+
*[_get_error_type(e) for e in prepare_retryable_errors]
624+
)
625+
prepare_operation_timeout, prepare_attempt_timeout = _align_timeouts(
626+
prepare_operation_timeout, prepare_attempt_timeout
627+
)
628+
prepare_sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
629+
630+
target = partial(
631+
self._gapic_client.prepare_query,
632+
request=prepare_request,
633+
timeout=prepare_attempt_timeout,
634+
retry=None,
635+
)
636+
prepare_result = await CrossSync.retry_target(
637+
target,
638+
prepare_predicate,
639+
prepare_sleep_generator,
640+
prepare_operation_timeout,
641+
exception_factory=_retry_exception_factory,
592642
)
593643

644+
prepare_metadata = _pb_metadata_to_metadata_types(prepare_result.metadata)
645+
594646
retryable_excs = [_get_error_type(e) for e in retryable_errors]
595647

596648
pb_params = _format_execute_query_params(parameters, parameter_types)
597649

598-
instance_name = self._gapic_client.instance_path(self.project, instance_id)
599-
600650
request_body = {
601651
"instance_name": instance_name,
602652
"app_profile_id": app_profile_id,
603-
"query": query,
653+
"prepared_query": prepare_result.prepared_query,
604654
"params": pb_params,
605-
"proto_format": {},
606655
}
656+
operation_timeout, attempt_timeout = _align_timeouts(
657+
operation_timeout, attempt_timeout
658+
)
607659

608660
return CrossSync.ExecuteQueryIterator(
609661
self,
610662
instance_id,
611663
app_profile_id,
612664
request_body,
665+
prepare_metadata,
613666
attempt_timeout,
614667
operation_timeout,
615668
retryable_excs=retryable_excs,

packages/google-cloud-bigtable/google/cloud/bigtable/data/_helpers.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def _get_timeouts(
136136
attempt: The timeout value to use for each attempt, in seconds.
137137
table: The table to use for default values.
138138
Returns:
139-
typle[float, float]: A tuple of (operation_timeout, attempt_timeout)
139+
tuple[float, float]: A tuple of (operation_timeout, attempt_timeout)
140140
"""
141141
# load table defaults if necessary
142142
if operation == TABLE_DEFAULT.DEFAULT:
@@ -154,15 +154,33 @@ def _get_timeouts(
154154
elif attempt == TABLE_DEFAULT.MUTATE_ROWS:
155155
attempt = table.default_mutate_rows_attempt_timeout
156156

157+
return _align_timeouts(final_operation, attempt)
158+
159+
160+
def _align_timeouts(operation: float, attempt: float | None) -> tuple[float, float]:
161+
"""
162+
Convert passed in timeout values to floats.
163+
164+
attempt will use operation value if None, or if larger than operation.
165+
166+
Will call _validate_timeouts on the outputs, and raise ValueError if the
167+
resulting timeouts are invalid.
168+
169+
Args:
170+
operation: The timeout value to use for the entire operation, in seconds.
171+
attempt: The timeout value to use for each attempt, in seconds.
172+
Returns:
173+
tuple[float, float]: A tuple of (operation_timeout, attempt_timeout)
174+
"""
157175
if attempt is None:
158176
# no timeout specified, use operation timeout for both
159-
final_attempt = final_operation
177+
final_attempt = operation
160178
else:
161179
# cap attempt timeout at operation timeout
162-
final_attempt = min(attempt, final_operation) if final_operation else attempt
180+
final_attempt = min(attempt, operation) if operation else attempt
163181

164-
_validate_timeouts(final_operation, final_attempt, allow_none=False)
165-
return final_operation, final_attempt
182+
_validate_timeouts(operation, final_attempt, allow_none=False)
183+
return operation, final_attempt
166184

167185

168186
def _validate_timeouts(

packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
from functools import partial
2727
from grpc import Channel
2828
from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
29-
from google.cloud.bigtable.data.execute_query.metadata import SqlType
29+
from google.cloud.bigtable.data.execute_query.metadata import (
30+
SqlType,
31+
_pb_metadata_to_metadata_types,
32+
)
3033
from google.cloud.bigtable.data.execute_query._parameters_formatting import (
3134
_format_execute_query_params,
35+
_to_param_types,
3236
)
3337
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
3438
DEFAULT_CLIENT_INFO,
@@ -48,7 +52,7 @@
4852
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
4953
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
5054
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
51-
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
55+
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
5256
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
5357
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
5458
from google.cloud.bigtable.data._helpers import _retry_exception_factory
@@ -404,13 +408,23 @@ def execute_query(
404408
ServiceUnavailable,
405409
Aborted,
406410
),
411+
prepare_operation_timeout: float = 60,
412+
prepare_attempt_timeout: float | None = 20,
413+
prepare_retryable_errors: Sequence[type[Exception]] = (
414+
DeadlineExceeded,
415+
ServiceUnavailable,
416+
),
407417
) -> "ExecuteQueryIterator":
408418
"""Executes an SQL query on an instance.
409419
Returns an iterator to asynchronously stream back columns from selected rows.
410420
411421
Failed requests within operation_timeout will be retried based on the
412422
retryable_errors list until operation_timeout is reached.
413423
424+
Note that this makes two requests, one to ``PrepareQuery`` and one to ``ExecuteQuery``.
425+
These have separate retry configurations. ``ExecuteQuery`` is where the bulk of the
426+
work happens.
427+
414428
Args:
415429
query: Query to be run on Bigtable instance. The query can use ``@param``
416430
placeholders to use parameter interpolation on the server. Values for all
@@ -427,16 +441,26 @@ def execute_query(
427441
an empty dict).
428442
app_profile_id: The app profile to associate with requests.
429443
https://cloud.google.com/bigtable/docs/app-profiles
430-
operation_timeout: the time budget for the entire operation, in seconds.
444+
operation_timeout: the time budget for the entire executeQuery operation, in seconds.
431445
Failed requests will be retried within the budget.
432446
Defaults to 600 seconds.
433-
attempt_timeout: the time budget for an individual network request, in seconds.
447+
attempt_timeout: the time budget for an individual executeQuery network request, in seconds.
434448
If it takes longer than this time to complete, the request will be cancelled with
435449
a DeadlineExceeded exception, and a retry will be attempted.
436450
Defaults to 20 seconds.
437451
If None, defaults to operation_timeout.
438-
retryable_errors: a list of errors that will be retried if encountered.
452+
retryable_errors: a list of errors that will be retried if encountered during executeQuery.
439453
Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted)
454+
prepare_operation_timeout: the time budget for the entire prepareQuery operation, in seconds.
455+
Failed requests will be retried within the budget.
456+
Defaults to 60 seconds.
457+
prepare_attempt_timeout: the time budget for an individual prepareQuery network request, in seconds.
458+
If it takes longer than this time to complete, the request will be cancelled with
459+
a DeadlineExceeded exception, and a retry will be attempted.
460+
Defaults to 20 seconds.
461+
If None, defaults to prepare_operation_timeout.
462+
prepare_retryable_errors: a list of errors that will be retried if encountered during prepareQuery.
463+
Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable)
440464
Returns:
441465
ExecuteQueryIterator: an asynchronous iterator that yields rows returned by the query
442466
Raises:
@@ -447,25 +471,53 @@ def execute_query(
447471
google.cloud.bigtable.data.exceptions.ParameterTypeInferenceFailed: Raised if
448472
a parameter is passed without an explicit type, and the type cannot be inferred
449473
"""
450-
warnings.warn(
451-
"ExecuteQuery is in preview and may change in the future.",
452-
category=RuntimeWarning,
474+
instance_name = self._gapic_client.instance_path(self.project, instance_id)
475+
converted_param_types = _to_param_types(parameters, parameter_types)
476+
prepare_request = {
477+
"instance_name": instance_name,
478+
"query": query,
479+
"app_profile_id": app_profile_id,
480+
"param_types": converted_param_types,
481+
"proto_format": {},
482+
}
483+
prepare_predicate = retries.if_exception_type(
484+
*[_get_error_type(e) for e in prepare_retryable_errors]
485+
)
486+
(prepare_operation_timeout, prepare_attempt_timeout) = _align_timeouts(
487+
prepare_operation_timeout, prepare_attempt_timeout
488+
)
489+
prepare_sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
490+
target = partial(
491+
self._gapic_client.prepare_query,
492+
request=prepare_request,
493+
timeout=prepare_attempt_timeout,
494+
retry=None,
453495
)
496+
prepare_result = CrossSync._Sync_Impl.retry_target(
497+
target,
498+
prepare_predicate,
499+
prepare_sleep_generator,
500+
prepare_operation_timeout,
501+
exception_factory=_retry_exception_factory,
502+
)
503+
prepare_metadata = _pb_metadata_to_metadata_types(prepare_result.metadata)
454504
retryable_excs = [_get_error_type(e) for e in retryable_errors]
455505
pb_params = _format_execute_query_params(parameters, parameter_types)
456-
instance_name = self._gapic_client.instance_path(self.project, instance_id)
457506
request_body = {
458507
"instance_name": instance_name,
459508
"app_profile_id": app_profile_id,
460-
"query": query,
509+
"prepared_query": prepare_result.prepared_query,
461510
"params": pb_params,
462-
"proto_format": {},
463511
}
512+
(operation_timeout, attempt_timeout) = _align_timeouts(
513+
operation_timeout, attempt_timeout
514+
)
464515
return CrossSync._Sync_Impl.ExecuteQueryIterator(
465516
self,
466517
instance_id,
467518
app_profile_id,
468519
request_body,
520+
prepare_metadata,
469521
attempt_timeout,
470522
operation_timeout,
471523
retryable_excs=retryable_excs,

packages/google-cloud-bigtable/google/cloud/bigtable/data/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,3 +334,7 @@ class InvalidExecuteQueryResponse(core_exceptions.GoogleAPICallError):
334334

335335
class ParameterTypeInferenceFailed(ValueError):
336336
"""Exception raised when query parameter types were not provided and cannot be inferred."""
337+
338+
339+
class EarlyMetadataCallError(RuntimeError):
340+
"""Execption raised when metadata is request from an ExecuteQueryIterator before the first row has been read, or the query has completed"""

packages/google-cloud-bigtable/google/cloud/bigtable/data/execute_query/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
)
2121
from google.cloud.bigtable.data.execute_query.metadata import (
2222
Metadata,
23-
ProtoMetadata,
2423
SqlType,
2524
)
2625
from google.cloud.bigtable.data.execute_query.values import (
@@ -39,7 +38,6 @@
3938
"QueryResultRow",
4039
"Struct",
4140
"Metadata",
42-
"ProtoMetadata",
4341
"ExecuteQueryIteratorAsync",
4442
"ExecuteQueryIterator",
4543
]

0 commit comments

Comments
 (0)