Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
efec02f
feat: support query profiling
Linchin Jul 17, 2024
886c3b8
collection
Linchin Jul 17, 2024
4dde816
fix unit tests
Linchin Jul 18, 2024
431db7f
unit tests
Linchin Jul 21, 2024
1ebcb56
vector get and stream, unit tests
Linchin Jul 21, 2024
7338db9
aggregation get and stream, unit tests
Linchin Jul 21, 2024
ca6adc1
docstring
Linchin Jul 22, 2024
05d7578
query profile unit tests
Linchin Jul 22, 2024
dc97653
update base classes' method signature
Linchin Jul 22, 2024
75377fc
documentsnapshotlist unit tests
Linchin Jul 22, 2024
60acc88
func signatures
Linchin Jul 23, 2024
422d966
undo client.py change
Linchin Jul 23, 2024
bb12421
transaction.get()
Linchin Jul 23, 2024
83e62bb
lint
Linchin Jul 24, 2024
b53805d
system test
Linchin Jul 24, 2024
c11f382
fix shim test
Linchin Jul 24, 2024
841a754
fix sys test
Linchin Jul 24, 2024
a5748b2
fix sys test
Linchin Jul 24, 2024
920db4b
system test
Linchin Jul 24, 2024
7023417
another system test
Linchin Jul 25, 2024
1842709
skip system test in emulator
Linchin Jul 25, 2024
74db4fe
stream generator unit tests
Linchin Jul 25, 2024
8a4c5e4
coverage
Linchin Jul 26, 2024
e7dda7c
add system tests
Linchin Jul 26, 2024
e61815f
Merge branch 'main' into query-profiling-3
Linchin Jul 26, 2024
23b88b9
small fixes
Linchin Jul 26, 2024
7912c95
undo document change
Linchin Jul 29, 2024
aa7b3d9
add system tests
Linchin Jul 29, 2024
46e5139
vector query system tests
Linchin Jul 29, 2024
299af43
format
Linchin Jul 29, 2024
41fd646
fix system test
Linchin Jul 29, 2024
ca631ec
comments
Linchin Jul 29, 2024
10cc536
add system tests
Linchin Jul 29, 2024
697775d
improve stream generator
Linchin Jul 31, 2024
30e5efc
type checking
Linchin Aug 1, 2024
981a644
adding stars
Linchin Aug 1, 2024
28abbf2
delete comment
Linchin Aug 1, 2024
9ce86be
remove coverage requirements for type checking part
Linchin Aug 1, 2024
dff947e
add explain_options to StreamGenerator
Linchin Aug 1, 2024
4019d64
yield tuple instead
Linchin Aug 5, 2024
978268b
raise exception when explain_metrics is absent
Linchin Aug 6, 2024
c293787
refactor documentsnapshotlist into queryresultslist
Linchin Aug 6, 2024
6fc1600
add comment
Linchin Aug 6, 2024
a4e87bf
improve type hint
Linchin Aug 6, 2024
cb693f8
lint
Linchin Aug 6, 2024
7439e76
move QueryResultsList to stream_generator.py
Linchin Aug 6, 2024
bb60ce0
aggregation related type annotation
Linchin Aug 6, 2024
6f86854
transaction return type hint
Linchin Aug 7, 2024
05424bf
refactor QueryResultsList
Linchin Aug 7, 2024
5e15e6a
change stream generator to return ExplainMetrics instead of yield
Linchin Aug 12, 2024
0906d65
update aggregation query to use the new generator
Linchin Aug 13, 2024
ccbb623
update query to use the new generator
Linchin Aug 13, 2024
843dc05
update vector query to use the new generator
Linchin Aug 13, 2024
066ead5
lint
Linchin Aug 13, 2024
f542ac1
type annotations
Linchin Aug 15, 2024
5fb71dd
Merge branch 'main' into query-profiling-3
Linchin Aug 15, 2024
8b82957
fix type annotation to be python 3.9 compatible
Linchin Aug 15, 2024
a400f6f
fix type hint for python 3.8
Linchin Aug 15, 2024
12d18c0
fix system test
Linchin Aug 15, 2024
9bf8b00
add test coverage
Linchin Aug 16, 2024
7ae1028
use class method get_explain_metrics() instead of property explain_me…
Linchin Aug 19, 2024
f08a35d
Merge branch 'main' into query-profiling-3
Linchin Aug 26, 2024
560ba95
address comments
Linchin Aug 27, 2024
4955a0b
remove more Optional
Linchin Aug 27, 2024
1293a36
add type hint for async stream generator
Linchin Aug 28, 2024
bd18af2
simplify yield in aggregation stream
Linchin Aug 29, 2024
ab9e3bf
stream generator type annotation
Linchin Aug 29, 2024
fa45e05
more type hints
Linchin Aug 30, 2024
bb172be
remove "Integer"
Linchin Aug 30, 2024
d231424
docstring format
Linchin Aug 30, 2024
765420a
mypy
Linchin Sep 4, 2024
71bde2a
add more input verification for query_results.py
Linchin Sep 5, 2024
cf89254
Merge branch 'main' into query-profiling-3
Linchin Sep 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/firestore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.firestore_v1 import DocumentSnapshot
from google.cloud.firestore_v1 import DocumentTransform
from google.cloud.firestore_v1 import ExistsOption
from google.cloud.firestore_v1 import ExplainOptions
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1 import GeoPoint
from google.cloud.firestore_v1 import Increment
Expand Down Expand Up @@ -78,6 +79,7 @@
"DocumentSnapshot",
"DocumentTransform",
"ExistsOption",
"ExplainOptions",
"FieldFilter",
"GeoPoint",
"Increment",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/firestore_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import CollectionGroup, Query
from google.cloud.firestore_v1.query_profile import ExplainOptions
from google.cloud.firestore_v1.transaction import Transaction, transactional
from google.cloud.firestore_v1.transforms import (
DELETE_FIELD,
Expand Down Expand Up @@ -131,6 +132,7 @@
"DocumentSnapshot",
"DocumentTransform",
"ExistsOption",
"ExplainOptions",
"FieldFilter",
"GeoPoint",
"Increment",
Expand Down
90 changes: 73 additions & 17 deletions google/cloud/firestore_v1/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
BaseAggregationQuery,
_query_response_to_result,
)
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import StreamGenerator

# Types needed only for Type Hints
if TYPE_CHECKING:
from google.cloud.firestore_v1 import transaction # pragma: NO COVER
if TYPE_CHECKING: # pragma: NO COVER
from google.cloud.firestore_v1 import transaction
from google.cloud.firestore_v1.query_profile import ExplainMetrics
from google.cloud.firestore_v1.query_profile import ExplainOptions


class AggregationQuery(BaseAggregationQuery):
Expand All @@ -54,10 +56,14 @@ def get(
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> List[AggregationResult]:
*,
explain_options: Optional[ExplainOptions] = None,
) -> QueryResultsList[AggregationResult]:
"""Runs the aggregation query.

This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
This sends a ``RunAggregationQuery`` RPC and returns a list of
aggregation results in the stream of ``RunAggregationQueryResponse``
messages.

Args:
transaction
Expand All @@ -70,20 +76,39 @@ def get(
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.

Returns:
list: The aggregation query results
QueryResultsList[AggregationResult]: The aggregation query results.

"""
result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
return list(result) # type: ignore
explain_metrics: ExplainMetrics | None = None

def _get_stream_iterator(self, transaction, retry, timeout):
result = self.stream(
transaction=transaction,
retry=retry,
timeout=timeout,
explain_options=explain_options,
)
result_list = list(result)

if explain_options is None:
explain_metrics = None
else:
explain_metrics = result.get_explain_metrics()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would it be cleaner to move this stuff into the DocumentSnapshotList constructor?

Copy link
Contributor Author

@Linchin Linchin Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed this same if/else code appears repeatedly in all get() methods. But considering we want to expand DocumentSnapshotList into a more general list class that may encompass types other than DocumentSnapshot, I wonder if the same logic checking explain_options will apply to all future types? Also if we put this logic inside DocumentSnapshotList, we would need to pass the original result (StreamGenerator) into the initialization. I personally feel like it's cleaner to initialize with the explain_metrics, but I also want to hear your opinion on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I think you're right, it's probably better to leave most of this stuff outside the init

I was thinking it could be cleaner to just pass in a StreamGenerator on init, since it should contain all the information needed by DocumentSnapshotList. But that gets complicated on the async side, because init isn't an async function. So we'd have trouble building the object there

Still though, one thing to consider: if we pass in the StreamGenerator and hold a reference to it in DocumentSnapshotList, you can delegate to that instance when calling explain_metrics(), which could help reduce some duplication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I'm trying to implement this, I still feel a bit unsure. Passing StreamGenerator feels like we are coupling it with QueryResultsList really deeply and may make these two classes less versatile in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it seems messy, don't worry about it, I was just throwing out ideas. We can just re-visit this if the duplicated code expands in the future

IMO I think it makes sense for StreamGenerator and QueryResultsList to be coupled in some way though, either through inheritance or composition. They have essentially the same use-case, QueryResultsList just has the stream fully loaded into memory

return QueryResultsList(result_list, explain_options, explain_metrics)

def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
explain_options,
)

return self._client._firestore_api.run_aggregation_query(
Expand All @@ -106,9 +131,12 @@ def _retry_query_after_exception(self, exc, retry, transaction):
def _make_stream(
self,
transaction: Optional[transaction.Transaction] = None,
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
) -> Union[Generator[List[AggregationResult], Any, None]]:
explain_options: Optional[ExplainOptions] = None,
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
"""Internal method for stream(). Runs the aggregation query.

This sends a ``RunAggregationQuery`` RPC and then returns a generator
Expand All @@ -127,16 +155,27 @@ def _make_stream(
system-specified policy.
timeout (Optional[float]): The timeout for this request. Defaults
to a system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.

Yields:
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
List[AggregationResult]:
The result of aggregations of this query.

Returns:
(Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrtics]):
The results of query profiling, if received from the service.

"""
metrics: ExplainMetrics | None = None

response_iterator = self._get_stream_iterator(
transaction,
retry,
timeout,
explain_options,
)
while True:
try:
Expand All @@ -154,15 +193,26 @@ def _make_stream(

if response is None: # EOI
break

if metrics is None and response.explain_metrics:
metrics = response.explain_metrics

result = _query_response_to_result(response)
yield result
if result:
yield result

return metrics

def stream(
self,
transaction: Optional["transaction.Transaction"] = None,
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
) -> "StreamGenerator[DocumentSnapshot]":
*,
explain_options: Optional[ExplainOptions] = None,
) -> StreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.

This sends a ``RunAggregationQuery`` RPC and then returns a generator
Expand All @@ -181,13 +231,19 @@ def stream(
system-specified policy.
timeout (Optinal[float]): The timeout for this request. Defaults
to a system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.

Returns:
`StreamGenerator[DocumentSnapshot]`: A generator of the query results.
`StreamGenerator[List[AggregationResult]]`:
A generator of the query results.
"""
inner_generator = self._make_stream(
transaction=transaction,
retry=retry,
timeout=timeout,
explain_options=explain_options,
)
return StreamGenerator(inner_generator)
return StreamGenerator(inner_generator, explain_options)
4 changes: 2 additions & 2 deletions google/cloud/firestore_v1/async_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def get(
retries.AsyncRetry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> List[AggregationResult]:
) -> List[List[AggregationResult]]:
"""Runs the aggregation query.

This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
Expand All @@ -71,7 +71,7 @@ async def get(
system-specified value.

Returns:
list: The aggregation query results
List[List[AggregationResult]]: The aggregation query results

"""
stream_result = self.stream(
Expand Down
19 changes: 11 additions & 8 deletions google/cloud/firestore_v1/async_stream_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,28 @@
Firestore API.
"""

from collections import abc
from typing import Any, AsyncGenerator, Awaitable, TypeVar


class AsyncStreamGenerator(abc.AsyncGenerator):
T = TypeVar("T")


class AsyncStreamGenerator(AsyncGenerator[T, Any]):
"""Asynchronous generator for the streamed results."""

def __init__(self, response_generator):
def __init__(self, response_generator: AsyncGenerator[T, Any]):
self._generator = response_generator

def __aiter__(self):
return self._generator
def __aiter__(self) -> AsyncGenerator[T, Any]:
return self

def __anext__(self):
def __anext__(self) -> Awaitable[T]:
return self._generator.__anext__()

def asend(self, value=None):
def asend(self, value=None) -> Awaitable[Any]:
return self._generator.asend(value)

def athrow(self, exp=None):
def athrow(self, exp=None) -> Awaitable[Any]:
return self._generator.athrow(exp)

def aclose(self):
Expand Down
64 changes: 40 additions & 24 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,7 @@

import abc
from abc import ABC
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Coroutine,
Generator,
List,
Optional,
Tuple,
Union,
)
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union

from google.api_core import gapic_v1
from google.api_core import retry as retries
Expand All @@ -47,8 +37,14 @@
)

# Types needed only for Type Hints
if TYPE_CHECKING:
from google.cloud.firestore_v1 import transaction # pragma: NO COVER
if TYPE_CHECKING: # pragma: NO COVER
from google.cloud.firestore_v1 import transaction
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
from google.cloud.firestore_v1.query_profile import ExplainOptions
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import (
StreamGenerator,
)


class AggregationResult(object):
Expand All @@ -62,7 +58,7 @@ class AggregationResult(object):
:param value: The resulting read_time
"""

def __init__(self, alias: str, value: int, read_time=None):
def __init__(self, alias: str, value: float, read_time=None):
self.alias = alias
self.value = value
self.read_time = read_time
Expand Down Expand Up @@ -211,13 +207,16 @@ def _prep_stream(
transaction=None,
retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None,
timeout: float | None = None,
explain_options: Optional[ExplainOptions] = None,
) -> Tuple[dict, dict]:
parent_path, expected_prefix = self._collection_ref._parent_info()
request = {
"parent": parent_path,
"structured_aggregation_query": self._to_protobuf(),
"transaction": _helpers.get_transaction_id(transaction),
}
if explain_options:
request["explain_options"] = explain_options._to_dict()
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -230,10 +229,17 @@ def get(
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> List[AggregationResult] | Coroutine[Any, Any, List[AggregationResult]]:
*,
explain_options: Optional[ExplainOptions] = None,
) -> (
QueryResultsList[AggregationResult]
| Coroutine[Any, Any, List[List[AggregationResult]]]
):
"""Runs the aggregation query.

This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
This sends a ``RunAggregationQuery`` RPC and returns a list of
aggregation results in the stream of ``RunAggregationQueryResponse``
messages.

Args:
transaction
Expand All @@ -246,22 +252,27 @@ def get(
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.

Returns:
list: The aggregation query results

(QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]):
The aggregation query results.
"""

@abc.abstractmethod
def stream(
self,
transaction: Optional[transaction.Transaction] = None,
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
) -> (
Generator[List[AggregationResult], Any, None]
| AsyncGenerator[List[AggregationResult], None]
):
*,
explain_options: Optional[ExplainOptions] = None,
) -> StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator:
"""Runs the aggregation query.

This sends a``RunAggregationQuery`` RPC and returns a generator in the stream of ``RunAggregationQueryResponse`` messages.
Expand All @@ -274,8 +285,13 @@ def stream(
errors, if any, should be retried. Defaults to a
system-specified policy.
timeout (Optinal[float]): The timeout for this request. Defaults
to a system-specified value.
to a system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.

Returns:
StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator:
A generator of the query results.
"""
Loading
Loading