Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
class RedisOnlineStore(OnlineStore):
_client: Optional[Union[Redis, RedisCluster]] = None

def delete_table_values(self, config: RepoConfig, table: FeatureView):
def delete_entity_values(self, config: RepoConfig, join_keys: List[str]):
client = self._get_client(config.online_store)
deleted_count = 0
pipeline = client.pipeline()
prefix = _redis_key_prefix(table.entities)
prefix = _redis_key_prefix(join_keys)

for _k in client.scan_iter(
b"".join([prefix, b"*", config.project.encode("utf8")])
Expand All @@ -85,7 +85,7 @@ def delete_table_values(self, config: RepoConfig, table: FeatureView):
deleted_count += 1
pipeline.execute()

logger.debug(f"Deleted {deleted_count} keys for {table.name}")
logger.debug(f"Deleted {deleted_count} rows for entity {', '.join(join_keys)}")

@log_exceptions_and_usage(online_store="redis")
def update(
Expand All @@ -98,10 +98,16 @@ def update(
partial: bool,
):
"""
We delete the keys in redis for tables/views being removed.
Look for join_keys (list of entities) that are not in use anymore
(usually this happens when the last feature view that was using specific compound key is deleted)
and remove all features attached to this "join_keys".
"""
for table in tables_to_delete:
self.delete_table_values(config, table)
join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep)

join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete)

for join_keys in join_keys_to_delete - join_keys_to_keep:
self.delete_entity_values(config, list(join_keys))

def teardown(
self,
Expand All @@ -112,8 +118,10 @@ def teardown(
"""
We delete the keys in redis for tables/views being removed.
"""
for table in tables:
self.delete_table_values(config, table)
join_keys_to_delete = set(tuple(table.entities) for table in tables)

for join_keys in join_keys_to_delete:
self.delete_entity_values(config, list(join_keys))

@staticmethod
def _parse_connection_string(connection_string: str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union

import pandas as pd
import yaml
Expand Down Expand Up @@ -283,7 +283,9 @@ def construct_test_environment(
execution_role_name="arn:aws:iam::402087665549:role/lambda_execution_role",
)

registry = f"s3://feast-integration-tests/registries/{project}/registry.db"
registry = (
f"s3://feast-integration-tests/registries/{project}/registry.db"
) # type: Union[str, RegistryConfig]
else:
# Note: even if it's a local feature server, the repo config does not have this configured
feature_server = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from tests.integration.feature_repos.universal.feature_views import (
create_driver_hourly_stats_feature_view,
driver_feature_view,
)
from tests.utils.data_source_utils import prep_file_source

Expand Down Expand Up @@ -503,6 +504,79 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name
)


@pytest.mark.integration
@pytest.mark.universal
def test_online_store_cleanup(environment, universal_data_sources):
"""
Some online store implementations (like Redis) keep features from different features views
but with common entities together.
This might end up with deletion of all features attached to the entity,
when only one feature view was deletion target (see https://github.com/feast-dev/feast/issues/2150).

Plan:
1. Register two feature views with common entity "driver"
2. Materialize data
3. Check if features are available (via online retrieval)
4. Delete one feature view
5. Check that features for other are still available
6. Delete another feature view (and create again)
7. Verify that features for both feature view were deleted
"""
fs = environment.feature_store
entities, datasets, data_sources = universal_data_sources
driver_stats_fv = construct_universal_feature_views(data_sources)["driver"]

df = pd.DataFrame(
{
"ts_1": [environment.end_date] * len(entities["driver"]),
"created_ts": [environment.end_date] * len(entities["driver"]),
"driver_id": entities["driver"],
"value": np.random.random(size=len(entities["driver"])),
}
)

ds = environment.data_source_creator.create_data_source(
df, destination_name="simple_driver_dataset"
)

simple_driver_fv = driver_feature_view(
data_source=ds, name="test_universal_online_simple_driver"
)

fs.apply([driver(), simple_driver_fv, driver_stats_fv])

fs.materialize(
environment.start_date - timedelta(days=1),
environment.end_date + timedelta(days=1),
)
expected_values = df.sort_values(by="driver_id")

features = [f"{simple_driver_fv.name}:value"]
entity_rows = [{"driver": driver_id} for driver_id in sorted(entities["driver"])]

online_features = fs.get_online_features(
features=features, entity_rows=entity_rows
).to_dict()
assert np.allclose(expected_values["value"], online_features["value"])

fs.apply(
objects=[simple_driver_fv], objects_to_delete=[driver_stats_fv], partial=False
)

online_features = fs.get_online_features(
features=features, entity_rows=entity_rows
).to_dict()
assert np.allclose(expected_values["value"], online_features["value"])

fs.apply(objects=[], objects_to_delete=[simple_driver_fv], partial=False)
fs.apply([simple_driver_fv])

online_features = fs.get_online_features(
features=features, entity_rows=entity_rows
).to_dict()
assert all(v is None for v in online_features["value"])


def response_feature_name(feature: str, full_feature_names: bool) -> str:
if (
feature in {"current_balance", "avg_passenger_count", "lifetime_trip_count"}
Expand Down