Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,11 @@ def online_read(
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
else:
batch_size_nones = ((None, None),) * len(batch)
result.extend(batch_size_nones)

# Not all entities in a batch may have responses
# Pad with remaining values in batch that were not found
batch_size_nones = ((None, None),) * (len(batch) - len(result))
result.extend(batch_size_nones)
return result

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
DynamoDBOnlineStoreConfig,
DynamoDBTable,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from tests.utils.online_store_utils import (
_create_n_customer_test_samples,
Expand Down Expand Up @@ -175,6 +177,34 @@ def test_online_read(repo_config, n_samples):
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb2
def test_online_read_unknown_entity(repo_config):
"""Test DynamoDBOnlineStore online_read method."""
n_samples = 2
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
data = _create_n_customer_test_samples(n=n_samples)
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)

entity_keys, features, *rest = zip(*data)
# Append a nonsensical value
entity_keys = list(entity_keys)
entity_keys.append(
EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val="12359")]
)
)
features = list(features)
features.append(None)
dynamodb_store = DynamoDBOnlineStore()
returned_items = dynamodb_store.online_read(
config=repo_config,
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"),
entity_keys=entity_keys,
)
assert len(returned_items) == len(entity_keys)
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb2
def test_write_batch_non_duplicates(repo_config):
"""Test DynamoDBOnline Store deduplicate write batch request items."""
Expand Down