Skip to content

Commit a88cd30

Browse files
authored
feat: Offline push endpoint for pushing to offline stores (#2837)
* Skaffolding for offline store push Signed-off-by: Kevin Zhang <[email protected]> * LInt Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * File source offline push Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Address review comments Signed-off-by: Kevin Zhang <[email protected]> * Add redshift function Signed-off-by: Kevin Zhang <[email protected]> * Add redshift Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Lint Signed-off-by: Kevin Zhang <[email protected]> * fix Signed-off-by: Kevin Zhang <[email protected]> * fix Signed-off-by: Kevin Zhang <[email protected]> * Fix test Signed-off-by: Kevin Zhang <[email protected]> * Fix test Signed-off-by: Kevin Zhang <[email protected]> * Fix test Signed-off-by: Kevin Zhang <[email protected]> * Fix interface Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Update Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Fix rebase Signed-off-by: Kevin Zhang <[email protected]> * Fix naming Signed-off-by: Kevin Zhang <[email protected]> * Fix Signed-off-by: Kevin Zhang <[email protected]> * Uncomment Signed-off-by: Kevin Zhang <[email protected]>
1 parent fde7075 commit a88cd30

File tree

7 files changed

+113
-12
lines changed

7 files changed

+113
-12
lines changed

docs/reference/feature-servers/python-feature-server.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## Overview
44

5-
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.
5+
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.
66

77
## CLI
88

@@ -155,6 +155,10 @@ curl -X POST \
155155
### Pushing features to the online store
156156
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):
157157

158+
You can also specify a push mode to choose where the pushed data is written: the online store, the offline store, or both. The feature server will throw an error if the online/offline
159+
store does not support the push API.
160+
161+
The push mode is set with the string request parameter `to`, which accepts one of `"online"`, `"offline"`, or `"both"` and defaults to `"online"` when omitted.
158162
```text
159163
curl -X POST "http://localhost:6566/push" -d '{
160164
"push_source_name": "driver_hourly_stats_push_source",
@@ -187,9 +191,10 @@ event_dict = {
187191
}
188192
push_data = {
189193
"push_source_name":"driver_stats_push_source",
190-
"df":event_dict
194+
"df":event_dict,
195+
"to":"online",
191196
}
192197
requests.post(
193-
"http://localhost:6566/push",
198+
"http://localhost:6566/push",
194199
data=json.dumps(push_data))
195200
```

sdk/python/feast/data_source.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,12 @@ def to_proto(self) -> DataSourceProto:
914914
return data_source_proto
915915

916916

917+
class PushMode(enum.Enum):
    """Destination selector for data pushed through a push source.

    ``FeatureStore.push`` compares its ``to`` argument against these members
    to decide which store(s) receive the pushed rows.
    """

    # Write to the online store only (the default for ``FeatureStore.push``).
    ONLINE = 1
    # Write to the offline store only.
    OFFLINE = 2
    # Write to both the online and the offline store.
    ONLINE_AND_OFFLINE = 3
921+
922+
917923
@typechecked
918924
class PushSource(DataSource):
919925
"""

sdk/python/feast/feature_server.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import feast
1414
from feast import proto_json
15+
from feast.data_source import PushMode
1516
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
1617

1718

@@ -26,6 +27,7 @@ class PushFeaturesRequest(BaseModel):
2627
push_source_name: str
2728
df: dict
2829
allow_registry_cache: bool = True
30+
to: str = "online"
2931

3032

3133
def get_app(store: "feast.FeatureStore"):
@@ -80,10 +82,17 @@ def push(body=Depends(get_body)):
8082
try:
8183
request = PushFeaturesRequest(**json.loads(body))
8284
df = pd.DataFrame(request.df)
85+
if request.to == "offline":
86+
to = PushMode.OFFLINE
87+
elif request.to == "online":
88+
to = PushMode.ONLINE
89+
else:
90+
to = PushMode.ONLINE_AND_OFFLINE
8391
store.push(
8492
push_source_name=request.push_source_name,
8593
df=df,
8694
allow_registry_cache=request.allow_registry_cache,
95+
to=to,
8796
)
8897
except Exception as e:
8998
# Print the original exception on the server side

sdk/python/feast/feature_store.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from feast import feature_server, flags, flags_helper, ui_server, utils
4444
from feast.base_feature_view import BaseFeatureView
4545
from feast.batch_feature_view import BatchFeatureView
46-
from feast.data_source import DataSource
46+
from feast.data_source import DataSource, PushMode
4747
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
4848
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
4949
from feast.dqm.errors import ValidationFailed
@@ -1341,15 +1341,20 @@ def tqdm_builder(length):
13411341

13421342
@log_exceptions_and_usage
13431343
def push(
1344-
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
1344+
self,
1345+
push_source_name: str,
1346+
df: pd.DataFrame,
1347+
allow_registry_cache: bool = True,
1348+
to: PushMode = PushMode.ONLINE,
13451349
):
13461350
"""
13471351
Push features to a push source. This updates all the feature views that have the push source as stream source.
13481352
13491353
Args:
13501354
push_source_name: The name of the push source we want to push data to.
1351-
df: the data being pushed.
1352-
allow_registry_cache: whether to allow cached versions of the registry.
1355+
df: The data being pushed.
1356+
allow_registry_cache: Whether to allow cached versions of the registry.
1357+
to: Whether to push to online or offline store. Defaults to online store only.
13531358
"""
13541359
warnings.warn(
13551360
"Push source is an experimental feature. "
@@ -1373,9 +1378,14 @@ def push(
13731378
}
13741379

13751380
for fv in fvs_with_push_sources:
1376-
self.write_to_online_store(
1377-
fv.name, df, allow_registry_cache=allow_registry_cache
1378-
)
1381+
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
1382+
self.write_to_online_store(
1383+
fv.name, df, allow_registry_cache=allow_registry_cache
1384+
)
1385+
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
1386+
self._write_to_offline_store(
1387+
fv.name, df, allow_registry_cache=allow_registry_cache
1388+
)
13791389

13801390
@log_exceptions_and_usage
13811391
def write_to_online_store(

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def offline_write_batch(
110110
set_usage_attribute("provider", self.__class__.__name__)
111111

112112
if self.offline_store:
113-
self.offline_store.offline_write_batch(config, feature_view, data, progress)
113+
self.offline_store.__class__.offline_write_batch(
114+
config, feature_view, data, progress
115+
)
114116

115117
@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
116118
def online_read(

sdk/python/tests/integration/offline_store/test_offline_write.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
123123
Field(name="acc_rate", dtype=Float32),
124124
],
125125
source=data_sources.driver,
126-
ttl=timedelta(minutes=10),
126+
ttl=timedelta(
127+
minutes=10
128+
), # This is to make sure all offline store data is out of date since get_historical_features() only searches backwards for a ttl window.
127129
)
128130

129131
now = datetime.utcnow()
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import datetime
2+
3+
import numpy as np
4+
import pandas as pd
5+
import pytest
6+
7+
from feast.data_source import PushMode
8+
from tests.integration.feature_repos.repo_configuration import (
9+
construct_universal_feature_views,
10+
)
11+
from tests.integration.feature_repos.universal.entities import (
12+
customer,
13+
driver,
14+
location,
15+
)
16+
17+
18+
@pytest.mark.integration
@pytest.mark.universal_offline_stores(only=["file", "redshift"])
@pytest.mark.universal_online_stores(only=["sqlite"])
def test_push_features_and_read_from_offline_store(environment, universal_data_sources):
    """Push one row with ``PushMode.OFFLINE`` and read it back historically.

    The test fetches ``pushable_location_stats:temperature`` for a single
    entity row, pushes a new row for the same ``location_id`` through
    ``location_stats_push_source`` to the offline store only, and then checks
    that ``get_historical_features`` returns the pushed temperature.
    """
    store = environment.feature_store

    (_, _, data_sources) = universal_data_sources
    feature_views = construct_universal_feature_views(data_sources)
    # Rounded to milliseconds before being used as both event and created timestamp.
    now = pd.Timestamp(datetime.datetime.utcnow()).round("ms")

    store.apply([driver(), customer(), location(), *feature_views.values()])
    entity_df = pd.DataFrame.from_dict({"location_id": [1], "event_timestamp": [now]})

    before_df = store.get_historical_features(
        entity_df=entity_df,
        features=["pushable_location_stats:temperature"],
        full_feature_names=False,
    ).to_df()

    data = {
        "event_timestamp": [now],
        "location_id": [1],
        "temperature": [4],
        "created": [now],
    }
    df_ingest = pd.DataFrame(data)

    # NOTE: single-argument np.where(cond) returns a non-empty tuple of index
    # arrays, which is always truthy, so `assert np.where(...)` can never
    # fail. Reduce the element-wise comparison explicitly instead.
    expected_location_id = df_ingest["location_id"].reset_index(drop=True)
    expected_temperature = df_ingest["temperature"].reset_index(drop=True)

    # Before the push: same entity, but the pushed temperature is not there yet.
    assert np.all(before_df["location_id"].reset_index(drop=True) == expected_location_id)
    assert np.all(before_df["temperature"].reset_index(drop=True) != expected_temperature)

    store.push("location_stats_push_source", df_ingest, to=PushMode.OFFLINE)

    df = store.get_historical_features(
        entity_df=entity_df,
        features=["pushable_location_stats:temperature"],
        full_feature_names=False,
    ).to_df()

    # After the push: the historical read returns the pushed row's values.
    assert np.all(df["location_id"].reset_index(drop=True) == expected_location_id)
    assert np.all(df["temperature"].reset_index(drop=True) == expected_temperature)

0 commit comments

Comments
 (0)