Skip to content

Commit 269055e

Browse files
authored
fix: Add columns for user metadata in the tables (#2760)
* fix: Add columns for user metadata in the tables
  Signed-off-by: Achal Shah <[email protected]>
* registry -> base registry
  Signed-off-by: Achal Shah <[email protected]>
* metadata methods
  Signed-off-by: Achal Shah <[email protected]>
* metadata methods
  Signed-off-by: Achal Shah <[email protected]>
* tests
  Signed-off-by: Achal Shah <[email protected]>
* one more test assert
  Signed-off-by: Achal Shah <[email protected]>
* cr update
  Signed-off-by: Achal Shah <[email protected]>
1 parent 4339c0a commit 269055e

File tree

15 files changed, with 400 additions and 119 deletions.

sdk/python/feast/diff/registry_diff.py

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@
2626
from feast.protos.feast.core.ValidationProfile_pb2 import (
2727
ValidationReference as ValidationReferenceProto,
2828
)
29-
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
29+
from feast.registry import FEAST_OBJECT_TYPES, BaseRegistry, FeastObjectType
3030
from feast.repo_contents import RepoContents
3131

3232

@@ -161,7 +161,7 @@ def diff_registry_objects(
161161

162162

163163
def extract_objects_for_keep_delete_update_add(
164-
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
164+
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
165165
) -> Tuple[
166166
Dict[FeastObjectType, Set[FeastObject]],
167167
Dict[FeastObjectType, Set[FeastObject]],
@@ -208,7 +208,7 @@ def extract_objects_for_keep_delete_update_add(
208208

209209

210210
def diff_between(
211-
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
211+
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
212212
) -> RegistryDiff:
213213
"""
214214
Returns the difference between the current and desired repo states.
@@ -267,7 +267,10 @@ def diff_between(
267267

268268

269269
def apply_diff_to_registry(
270-
registry: Registry, registry_diff: RegistryDiff, project: str, commit: bool = True
270+
registry: BaseRegistry,
271+
registry_diff: RegistryDiff,
272+
project: str,
273+
commit: bool = True,
271274
):
272275
"""
273276
Applies the given diff to the given Feast project in the registry.

sdk/python/feast/feature_logging.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@
1717
)
1818

1919
if TYPE_CHECKING:
20-
from feast import FeatureService
21-
from feast.registry import Registry
20+
from feast.feature_service import FeatureService
21+
from feast.registry import BaseRegistry
2222

2323

2424
REQUEST_ID_FIELD = "__request_id"
@@ -33,7 +33,7 @@ class LoggingSource:
3333
"""
3434

3535
@abc.abstractmethod
36-
def get_schema(self, registry: "Registry") -> pa.Schema:
36+
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
3737
""" Generate schema for logs destination. """
3838
raise NotImplementedError
3939

@@ -48,7 +48,7 @@ def __init__(self, feature_service: "FeatureService", project: str):
4848
self._feature_service = feature_service
4949
self._project = project
5050

51-
def get_schema(self, registry: "Registry") -> pa.Schema:
51+
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
5252
fields: Dict[str, pa.DataType] = {}
5353

5454
for projection in self._feature_service.feature_view_projections:

sdk/python/feast/feature_store.py

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -74,14 +74,13 @@
7474
from feast.infra.registry_stores.sql import SqlRegistry
7575
from feast.on_demand_feature_view import OnDemandFeatureView
7676
from feast.online_response import OnlineResponse
77-
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
7877
from feast.protos.feast.serving.ServingService_pb2 import (
7978
FieldStatus,
8079
GetOnlineFeaturesResponse,
8180
)
8281
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
8382
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
84-
from feast.registry import Registry
83+
from feast.registry import BaseRegistry, Registry
8584
from feast.repo_config import RepoConfig, load_repo_config
8685
from feast.repo_contents import RepoContents
8786
from feast.request_feature_view import RequestFeatureView
@@ -113,7 +112,7 @@ class FeatureStore:
113112

114113
config: RepoConfig
115114
repo_path: Path
116-
_registry: Registry
115+
_registry: BaseRegistry
117116
_provider: Provider
118117
_go_server: "EmbeddedOnlineFeatureServer"
119118

@@ -142,8 +141,9 @@ def __init__(
142141
if registry_config.registry_type == "sql":
143142
self._registry = SqlRegistry(registry_config, None)
144143
else:
145-
self._registry = Registry(registry_config, repo_path=self.repo_path)
146-
self._registry._initialize_registry()
144+
r = Registry(registry_config, repo_path=self.repo_path)
145+
r._initialize_registry()
146+
self._registry = r
147147
self._provider = get_provider(self.config, self.repo_path)
148148
self._go_server = None
149149

@@ -153,7 +153,7 @@ def version(self) -> str:
153153
return get_version()
154154

155155
@property
156-
def registry(self) -> Registry:
156+
def registry(self) -> BaseRegistry:
157157
"""Gets the registry of this feature store."""
158158
return self._registry
159159

@@ -644,12 +644,7 @@ def _plan(
644644
# Compute the desired difference between the current infra, as stored in the registry,
645645
# and the desired infra.
646646
self._registry.refresh()
647-
current_infra_proto = (
648-
self._registry.cached_registry_proto.infra.__deepcopy__()
649-
if hasattr(self._registry, "cached_registry_proto")
650-
and self._registry.cached_registry_proto
651-
else InfraProto()
652-
)
647+
current_infra_proto = self._registry.proto().infra.__deepcopy__()
653648
desired_registry_proto = desired_repo_contents.to_registry_proto()
654649
new_infra = self._provider.plan_infra(self.config, desired_registry_proto)
655650
new_infra_proto = new_infra.to_proto()

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@
3939
RetrievalMetadata,
4040
)
4141
from feast.on_demand_feature_view import OnDemandFeatureView
42-
from feast.registry import Registry
42+
from feast.registry import BaseRegistry
4343
from feast.repo_config import FeastConfigBaseModel, RepoConfig
4444

4545
from ...saved_dataset import SavedDatasetStorage
@@ -169,7 +169,7 @@ def get_historical_features(
169169
feature_views: List[FeatureView],
170170
feature_refs: List[str],
171171
entity_df: Union[pd.DataFrame, str],
172-
registry: Registry,
172+
registry: BaseRegistry,
173173
project: str,
174174
full_feature_names: bool = False,
175175
) -> RetrievalJob:
@@ -262,7 +262,7 @@ def write_logged_features(
262262
data: Union[pyarrow.Table, Path],
263263
source: LoggingSource,
264264
logging_config: LoggingConfig,
265-
registry: Registry,
265+
registry: BaseRegistry,
266266
):
267267
destination = logging_config.destination
268268
assert isinstance(destination, BigQueryLoggingDestination)

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@
3232
_get_requested_feature_views_to_features_dict,
3333
_run_dask_field_mapping,
3434
)
35-
from feast.registry import Registry
35+
from feast.registry import BaseRegistry
3636
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3737
from feast.saved_dataset import SavedDatasetStorage
3838
from feast.usage import log_exceptions_and_usage
@@ -113,7 +113,7 @@ def get_historical_features(
113113
feature_views: List[FeatureView],
114114
feature_refs: List[str],
115115
entity_df: Union[pd.DataFrame, str],
116-
registry: Registry,
116+
registry: BaseRegistry,
117117
project: str,
118118
full_feature_names: bool = False,
119119
) -> RetrievalJob:
@@ -380,7 +380,7 @@ def write_logged_features(
380380
data: Union[pyarrow.Table, Path],
381381
source: LoggingSource,
382382
logging_config: LoggingConfig,
383-
registry: Registry,
383+
registry: BaseRegistry,
384384
):
385385
destination = logging_config.destination
386386
assert isinstance(destination, FileLoggingDestination)

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
from feast.feature_logging import LoggingConfig, LoggingSource
2626
from feast.feature_view import FeatureView
2727
from feast.on_demand_feature_view import OnDemandFeatureView
28-
from feast.registry import Registry
28+
from feast.registry import BaseRegistry
2929
from feast.repo_config import RepoConfig
3030
from feast.saved_dataset import SavedDatasetStorage
3131

@@ -211,7 +211,7 @@ def get_historical_features(
211211
feature_views: List[FeatureView],
212212
feature_refs: List[str],
213213
entity_df: Union[pd.DataFrame, str],
214-
registry: Registry,
214+
registry: BaseRegistry,
215215
project: str,
216216
full_feature_names: bool = False,
217217
) -> RetrievalJob:
@@ -252,7 +252,7 @@ def write_logged_features(
252252
data: Union[pyarrow.Table, Path],
253253
source: LoggingSource,
254254
logging_config: LoggingConfig,
255-
registry: Registry,
255+
registry: BaseRegistry,
256256
):
257257
"""
258258
Write logged features to a specified destination (taken from logging_config) in the offline store.

sdk/python/feast/infra/offline_stores/offline_utils.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@
88
from jinja2 import BaseLoader, Environment
99
from pandas import Timestamp
1010

11-
import feast
1211
from feast.errors import (
1312
EntityTimestampInferenceException,
1413
FeastEntityDFMissingColumnsError,
@@ -17,7 +16,7 @@
1716
from feast.importer import import_class
1817
from feast.infra.offline_stores.offline_store import OfflineStore
1918
from feast.infra.provider import _get_requested_feature_views_to_features_dict
20-
from feast.registry import Registry
19+
from feast.registry import BaseRegistry
2120
from feast.utils import to_naive_utc
2221

2322
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
@@ -55,8 +54,9 @@ def assert_expected_columns_in_entity_df(
5554
raise FeastEntityDFMissingColumnsError(expected_columns, missing_keys)
5655

5756

57+
# TODO: Remove project and registry from the interface and call sites.
5858
def get_expected_join_keys(
59-
project: str, feature_views: List["feast.FeatureView"], registry: Registry
59+
project: str, feature_views: List[FeatureView], registry: BaseRegistry
6060
) -> Set[str]:
6161
join_keys = set()
6262
for feature_view in feature_views:
@@ -95,7 +95,7 @@ class FeatureViewQueryContext:
9595
def get_feature_view_query_context(
9696
feature_refs: List[str],
9797
feature_views: List[FeatureView],
98-
registry: Registry,
98+
registry: BaseRegistry,
9999
project: str,
100100
entity_df_timestamp_range: Tuple[datetime, datetime],
101101
) -> List[FeatureViewQueryContext]:

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@
3838
SavedDatasetRedshiftStorage,
3939
)
4040
from feast.infra.utils import aws_utils
41-
from feast.registry import Registry
41+
from feast.registry import BaseRegistry
4242
from feast.repo_config import FeastConfigBaseModel, RepoConfig
4343
from feast.saved_dataset import SavedDatasetStorage
4444
from feast.usage import log_exceptions_and_usage
@@ -176,7 +176,7 @@ def get_historical_features(
176176
feature_views: List[FeatureView],
177177
feature_refs: List[str],
178178
entity_df: Union[pd.DataFrame, str],
179-
registry: Registry,
179+
registry: BaseRegistry,
180180
project: str,
181181
full_feature_names: bool = False,
182182
) -> RetrievalJob:
@@ -269,7 +269,7 @@ def write_logged_features(
269269
data: Union[pyarrow.Table, Path],
270270
source: LoggingSource,
271271
logging_config: LoggingConfig,
272-
registry: Registry,
272+
registry: BaseRegistry,
273273
):
274274
destination = logging_config.destination
275275
assert isinstance(destination, RedshiftLoggingDestination)

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@
4444
write_pandas,
4545
write_parquet,
4646
)
47-
from feast.registry import Registry
47+
from feast.registry import BaseRegistry
4848
from feast.repo_config import FeastConfigBaseModel, RepoConfig
4949
from feast.saved_dataset import SavedDatasetStorage
5050
from feast.usage import log_exceptions_and_usage
@@ -206,7 +206,7 @@ def get_historical_features(
206206
feature_views: List[FeatureView],
207207
feature_refs: List[str],
208208
entity_df: Union[pd.DataFrame, str],
209-
registry: Registry,
209+
registry: BaseRegistry,
210210
project: str,
211211
full_feature_names: bool = False,
212212
) -> RetrievalJob:
@@ -284,7 +284,7 @@ def write_logged_features(
284284
data: Union[pyarrow.Table, Path],
285285
source: LoggingSource,
286286
logging_config: LoggingConfig,
287-
registry: Registry,
287+
registry: BaseRegistry,
288288
):
289289
assert isinstance(logging_config.destination, SnowflakeLoggingDestination)
290290

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
)
2222
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2323
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
24-
from feast.registry import Registry
24+
from feast.registry import BaseRegistry
2525
from feast.repo_config import RepoConfig
2626
from feast.saved_dataset import SavedDataset
2727
from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute
@@ -138,7 +138,7 @@ def materialize_single_feature_view(
138138
feature_view: FeatureView,
139139
start_date: datetime,
140140
end_date: datetime,
141-
registry: Registry,
141+
registry: BaseRegistry,
142142
project: str,
143143
tqdm_builder: Callable[[int], tqdm],
144144
) -> None:
@@ -194,7 +194,7 @@ def get_historical_features(
194194
feature_views: List[FeatureView],
195195
feature_refs: List[str],
196196
entity_df: Union[pandas.DataFrame, str],
197-
registry: Registry,
197+
registry: BaseRegistry,
198198
project: str,
199199
full_feature_names: bool,
200200
) -> RetrievalJob:
@@ -240,7 +240,7 @@ def write_feature_service_logs(
240240
feature_service: FeatureService,
241241
logs: Union[pyarrow.Table, str],
242242
config: RepoConfig,
243-
registry: Registry,
243+
registry: BaseRegistry,
244244
):
245245
assert (
246246
feature_service.logging_config is not None
@@ -260,7 +260,7 @@ def retrieve_feature_service_logs(
260260
start_date: datetime,
261261
end_date: datetime,
262262
config: RepoConfig,
263-
registry: Registry,
263+
registry: BaseRegistry,
264264
) -> RetrievalJob:
265265
assert (
266266
feature_service.logging_config is not None

0 commit comments

Comments
 (0)