Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
region: StrictStr
"""AWS Region Name"""

sort_response: bool = True
"""Whether or not to sort BatchGetItem response."""

table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""

Expand Down Expand Up @@ -204,9 +201,6 @@ def online_read(
"""
Retrieve feature values from the online DynamoDB store.

Note: This method is currently not optimized to retrieve a lot of data at a time
as it does sequential gets from the DynamoDB table.

Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
Expand All @@ -224,7 +218,6 @@ def online_read(
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]
batch_size = online_config.batch_size
sort_response = online_config.sort_response
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))
Expand All @@ -243,20 +236,27 @@ def online_read(
response = response.get("Responses")
table_responses = response.get(table_instance.name)
if table_responses:
if sort_response:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
)
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
)
entity_idx = 0
for tbl_res in table_responses:
entity_id = tbl_res["entity_id"]
while entity_id != batch[entity_idx]:
result.append((None, None))
entity_idx += 1
res = {}
for feature_name, value_bin in tbl_res["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
else:
batch_size_nones = ((None, None),) * len(batch)
result.extend(batch_size_nones)
entity_idx += 1

# Not all entities in a batch may have responses
# Pad with remaining values in batch that were not found
batch_size_nones = ((None, None),) * (len(batch) - len(result))
result.extend(batch_size_nones)
return result

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def online_read(
entity_keys: a list of entity keys that should be read from the FeatureStore.
requested_features: (Optional) A subset of the features that should be read from the FeatureStore.
Returns:
Data is returned as a list, one item per entity key. Each item in the list is a tuple
of event_ts for the row, and the feature data as a dict from feature names to values.
Values are returned as Value proto message.
Data is returned as a list, one item per entity key in the original order as the entity_keys argument.
Each item in the list is a tuple of event_ts for the row, and the feature data as a dict from feature names
to values. Values are returned as Value proto message.
Comment on lines +79 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be highlighted - also smells like a fragile interface. But that discussion is out of scope for this PR.

"""
...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
DynamoDBOnlineStoreConfig,
DynamoDBTable,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from tests.utils.online_store_utils import (
_create_n_customer_test_samples,
Expand Down Expand Up @@ -49,7 +51,6 @@ def test_online_store_config_default():
assert dynamodb_store_config.batch_size == 40
assert dynamodb_store_config.endpoint_url is None
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response is True
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"


Expand All @@ -70,20 +71,17 @@ def test_online_store_config_custom_params():
aws_region = "us-west-2"
batch_size = 20
endpoint_url = "http://localhost:8000"
sort_response = False
table_name_template = "feast_test.dynamodb_table"
dynamodb_store_config = DynamoDBOnlineStoreConfig(
region=aws_region,
batch_size=batch_size,
endpoint_url=endpoint_url,
sort_response=sort_response,
table_name_template=table_name_template,
)
assert dynamodb_store_config.type == "dynamodb"
assert dynamodb_store_config.batch_size == batch_size
assert dynamodb_store_config.endpoint_url == endpoint_url
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response == sort_response
assert dynamodb_store_config.table_name_template == table_name_template


Expand Down Expand Up @@ -175,6 +173,42 @@ def test_online_read(repo_config, n_samples):
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb2
def test_online_read_unknown_entity(repo_config):
"""Test DynamoDBOnlineStore online_read method."""
n_samples = 2
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
data = _create_n_customer_test_samples(n=n_samples)
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)

entity_keys, features, *rest = zip(*data)
# Append a nonsensical entity to search for
entity_keys = list(entity_keys)
features = list(features)
dynamodb_store = DynamoDBOnlineStore()

# Have the unknown entity be in the beginning, middle, and end of the list of entities.
for pos in range(len(entity_keys)):
entity_keys_with_unknown = deepcopy(entity_keys)
entity_keys_with_unknown.insert(
pos,
EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val="12359")]
),
)
features_with_none = deepcopy(features)
features_with_none.insert(pos, None)
returned_items = dynamodb_store.online_read(
config=repo_config,
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"),
entity_keys=entity_keys_with_unknown,
)
assert len(returned_items) == len(entity_keys_with_unknown)
assert [item[1] for item in returned_items] == list(features_with_none)
# The order should match the original entity key order
assert returned_items[pos] == (None, None)


@mock_dynamodb2
def test_write_batch_non_duplicates(repo_config):
"""Test DynamoDBOnline Store deduplicate write batch request items."""
Expand Down