
Commit 1d08786

authored by HaoXuAI
feat: Feast dataframe phase2 (#5612)
* add to_feast_df
* add test
* update tests

Signed-off-by: HaoXuAI <[email protected]>
1 parent 2ce4198 commit 1d08786
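For orientation, here is a minimal sketch of how the `to_feast_df` API added in this commit might be called from user code. The repo path, entity dataframe, and feature reference are hypothetical placeholders, not taken from this change:

```python
# Hypothetical usage sketch: repo path, entity columns, and the feature
# reference are illustrative, not part of this commit.
import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]),
    }
)

job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature view
)

feast_df = job.to_feast_df()  # new in this commit
# Engine depends on the offline store: ARROW by default,
# RAY or SPARK when those stores wrap their native objects.
print(feast_df.engine)
```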

File tree

4 files changed: +267 -1 lines changed

sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py

Lines changed: 23 additions & 0 deletions
```diff
@@ -16,6 +16,7 @@
 from ray.data.context import DatasetContext
 
 from feast.data_source import DataSource
+from feast.dataframe import DataFrameEngine, FeastDataFrame
 from feast.errors import (
     RequestDataNotFoundInEntityDfException,
     SavedDatasetLocationAlreadyExists,
@@ -1037,6 +1038,28 @@ def to_arrow(
         df = result.to_pandas()
         return pa.Table.from_pandas(df)
 
+    def to_feast_df(
+        self,
+        validation_reference: Optional[ValidationReference] = None,
+        timeout: Optional[int] = None,
+    ) -> FeastDataFrame:
+        """
+        Return the result as a FeastDataFrame with Ray engine.
+
+        This preserves Ray's lazy execution by wrapping the Ray Dataset directly.
+        """
+        # If we have on-demand feature views, fall back to the base class Arrow implementation
+        if self.on_demand_feature_views:
+            return super().to_feast_df(validation_reference, timeout)
+
+        # Get the Ray Dataset directly (maintains lazy execution)
+        ray_ds = self._get_ray_dataset()
+
+        return FeastDataFrame(
+            data=ray_ds,
+            engine=DataFrameEngine.RAY,
+        )
+
     def to_remote_storage(self) -> list[str]:
         if not self._staging_location:
             raise ValueError("Staging location must be set for remote materialization.")
```
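To illustrate what the Ray path now hands back, a small standalone sketch (assuming `ray` is installed; the data is made up) of wrapping a Ray Dataset the way `to_feast_df` does:

```python
# Standalone sketch (assumes `ray` is installed); the wrapped Dataset stays
# lazy until explicitly materialized, which is the point of the Ray path.
import pandas as pd
import ray

from feast.dataframe import DataFrameEngine, FeastDataFrame

ray.init(ignore_reinit_error=True)
ray_ds = ray.data.from_pandas(pd.DataFrame({"feature1": [1, 2, 3]}))

feast_df = FeastDataFrame(data=ray_ds, engine=DataFrameEngine.RAY)

# Materialize only at the end of the pipeline.
print(feast_df.data.to_pandas())
```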

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 32 additions & 1 deletion
```diff
@@ -4,7 +4,20 @@
 import warnings
 from dataclasses import asdict, dataclass
 from datetime import datetime, timezone
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
+
+if TYPE_CHECKING:
+    from feast.saved_dataset import ValidationReference
 
 import numpy as np
 import pandas
@@ -18,6 +31,7 @@
 
 from feast import FeatureView, OnDemandFeatureView
 from feast.data_source import DataSource
+from feast.dataframe import DataFrameEngine, FeastDataFrame
 from feast.errors import EntitySQLEmptyResults, InvalidEntityType
 from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL
 from feast.infra.offline_stores import offline_utils
@@ -389,6 +403,23 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         """Return dataset as pyarrow Table synchronously"""
         return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))
 
+    def to_feast_df(
+        self,
+        validation_reference: Optional["ValidationReference"] = None,
+        timeout: Optional[int] = None,
+    ) -> FeastDataFrame:
+        """
+        Return the result as a FeastDataFrame with Spark engine.
+
+        This preserves Spark's lazy execution by wrapping the Spark DataFrame directly.
+        """
+        # Get the Spark DataFrame directly (maintains lazy execution)
+        spark_df = self.to_spark_df()
+        return FeastDataFrame(
+            data=spark_df,
+            engine=DataFrameEngine.SPARK,
+        )
+
     def persist(
         self,
         storage: SavedDatasetStorage,
```
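Analogously for Spark, a standalone sketch (assuming `pyspark` is installed; session and data are illustrative) of the object `to_feast_df` wraps on this path:

```python
# Standalone sketch (assumes `pyspark` is installed); the wrapped DataFrame
# remains lazy, so no Spark job runs until an action is invoked.
from pyspark.sql import SparkSession

from feast.dataframe import DataFrameEngine, FeastDataFrame

spark = SparkSession.builder.master("local[1]").appName("sketch").getOrCreate()
spark_df = spark.createDataFrame([(1, "a"), (2, "b")], ["feature1", "feature2"])

feast_df = FeastDataFrame(data=spark_df, engine=DataFrameEngine.SPARK)

# Trigger execution explicitly with an action such as show() or toPandas().
feast_df.data.show()
```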

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 44 additions & 0 deletions
```diff
@@ -32,6 +32,7 @@
 
 from feast import flags_helper
 from feast.data_source import DataSource
+from feast.dataframe import DataFrameEngine, FeastDataFrame
 from feast.dqm.errors import ValidationFailed
 from feast.feature_logging import LoggingConfig, LoggingSource
 from feast.feature_view import FeatureView
@@ -93,6 +94,49 @@ def to_df(
             .reset_index(drop=True)
         )
 
+    def to_feast_df(
+        self,
+        validation_reference: Optional["ValidationReference"] = None,
+        timeout: Optional[int] = None,
+    ) -> FeastDataFrame:
+        """
+        Synchronously executes the underlying query and returns the result as a FeastDataFrame.
+
+        This is the new primary method that returns FeastDataFrame with proper engine detection.
+        On demand transformations will be executed. If a validation reference is provided, the dataframe
+        will be validated.
+
+        Args:
+            validation_reference (optional): The validation to apply against the retrieved dataframe.
+            timeout (optional): The query timeout if applicable.
+        """
+        # Get Arrow table as before
+        arrow_table = self.to_arrow(
+            validation_reference=validation_reference, timeout=timeout
+        )
+
+        # Prepare metadata
+        metadata = {}
+
+        # Add features to metadata if available
+        if hasattr(self, "features"):
+            metadata["features"] = self.features
+        else:
+            metadata["features"] = []
+
+        # Add on-demand feature views to metadata
+        if hasattr(self, "on_demand_feature_views") and self.on_demand_feature_views:
+            metadata["on_demand_feature_views"] = [
+                odfv.name for odfv in self.on_demand_feature_views
+            ]
+        else:
+            metadata["on_demand_feature_views"] = []
+
+        # Wrap in FeastDataFrame with Arrow engine and metadata
+        return FeastDataFrame(
+            data=arrow_table, engine=DataFrameEngine.ARROW, metadata=metadata
+        )
+
     def to_arrow(
         self,
         validation_reference: Optional["ValidationReference"] = None,
```
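Since `to_feast_df` tags every result with an engine, a consumer can dispatch on that tag to materialize a pandas frame. The helper below is a hypothetical illustration, not Feast API; the conversion calls belong to the wrapped objects, not to FeastDataFrame itself:

```python
# Hypothetical consumer-side helper: dispatch on the engine tag to get pandas.
# The conversions are methods of the wrapped objects (pyarrow.Table,
# ray.data.Dataset, pyspark.sql.DataFrame), not FeastDataFrame methods.
import pandas as pd

from feast.dataframe import DataFrameEngine, FeastDataFrame


def feast_df_to_pandas(feast_df: FeastDataFrame) -> pd.DataFrame:
    if feast_df.engine == DataFrameEngine.ARROW:
        return feast_df.data.to_pandas()  # pyarrow.Table.to_pandas()
    if feast_df.engine == DataFrameEngine.RAY:
        return feast_df.data.to_pandas()  # ray.data.Dataset.to_pandas()
    if feast_df.engine == DataFrameEngine.SPARK:
        return feast_df.data.toPandas()  # pyspark.sql.DataFrame.toPandas()
    raise ValueError(f"Unsupported engine: {feast_df.engine}")
```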
Lines changed: 168 additions & 0 deletions
```python
"""Tests for RetrievalJob FeastDataFrame integration."""

from unittest.mock import Mock

import pandas as pd
import pyarrow as pa

from feast.dataframe import DataFrameEngine, FeastDataFrame
from feast.infra.offline_stores.offline_store import RetrievalJob


class MockRetrievalJob(RetrievalJob):
    """Mock RetrievalJob for testing."""

    def __init__(
        self, arrow_table: pa.Table, features: list = None, odfvs: list = None
    ):
        self.arrow_table = arrow_table
        self.features = features or []
        self.odfvs = odfvs or []

    def _to_arrow_internal(self, timeout=None):
        return self.arrow_table

    @property
    def full_feature_names(self):
        return False

    @property
    def on_demand_feature_views(self):
        return self.odfvs


class TestRetrievalJobFeastDataFrame:
    """Test RetrievalJob FeastDataFrame integration."""

    def test_to_feast_df_basic(self):
        """Test basic to_feast_df functionality."""
        # Create test data
        test_data = pa.table(
            {
                "feature1": [1, 2, 3],
                "feature2": ["a", "b", "c"],
                "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
            }
        )

        # Create mock retrieval job
        job = MockRetrievalJob(test_data, features=["feature1", "feature2"])

        # Test to_feast_df
        feast_df = job.to_feast_df()

        # Assertions
        assert isinstance(feast_df, FeastDataFrame)
        assert feast_df.engine == DataFrameEngine.ARROW
        assert isinstance(feast_df.data, pa.Table)
        assert feast_df.data.num_rows == 3
        assert feast_df.data.num_columns == 3

    def test_to_feast_df_metadata(self):
        """Test to_feast_df metadata population."""
        # Create test data
        test_data = pa.table({"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]})

        # Create mock on-demand feature views
        mock_odfv1 = Mock()
        mock_odfv1.name = "odfv1"
        # Mock transform_arrow to return an empty table (no new columns added)
        mock_odfv1.transform_arrow.return_value = pa.table({})

        mock_odfv2 = Mock()
        mock_odfv2.name = "odfv2"
        # Mock transform_arrow to return an empty table (no new columns added)
        mock_odfv2.transform_arrow.return_value = pa.table({})

        # Create mock retrieval job with features and ODFVs
        job = MockRetrievalJob(
            test_data, features=["feature1", "feature2"], odfvs=[mock_odfv1, mock_odfv2]
        )

        # Test to_feast_df
        feast_df = job.to_feast_df()

        # Check metadata
        assert "features" in feast_df.metadata
        assert "on_demand_feature_views" in feast_df.metadata
        assert feast_df.metadata["features"] == ["feature1", "feature2"]
        assert feast_df.metadata["on_demand_feature_views"] == ["odfv1", "odfv2"]

    def test_to_feast_df_with_timeout(self):
        """Test to_feast_df with timeout parameter."""
        test_data = pa.table({"feature1": [1, 2, 3]})
        job = MockRetrievalJob(test_data)

        # Test with timeout - should not raise any errors
        feast_df = job.to_feast_df(timeout=30)

        assert isinstance(feast_df, FeastDataFrame)
        assert feast_df.engine == DataFrameEngine.ARROW

    def test_to_feast_df_empty_metadata(self):
        """Test to_feast_df with empty features and ODFVs."""
        test_data = pa.table({"feature1": [1, 2, 3]})
        job = MockRetrievalJob(test_data)  # No features or ODFVs provided

        feast_df = job.to_feast_df()

        # Should handle missing features gracefully
        assert feast_df.metadata["features"] == []
        assert feast_df.metadata["on_demand_feature_views"] == []

    def test_to_feast_df_preserves_arrow_data(self):
        """Test that to_feast_df preserves the original Arrow data."""
        # Create test data with specific types
        test_data = pa.table(
            {
                "int_feature": pa.array([1, 2, 3], type=pa.int64()),
                "float_feature": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
                "string_feature": pa.array(["a", "b", "c"], type=pa.string()),
                "bool_feature": pa.array([True, False, True], type=pa.bool_()),
            }
        )

        job = MockRetrievalJob(test_data)
        feast_df = job.to_feast_df()

        # Check that the Arrow data is exactly the same
        assert feast_df.data.equals(test_data)
        assert feast_df.data.schema == test_data.schema

        # Check column names and types are preserved
        assert feast_df.data.column_names == test_data.column_names
        for i, column in enumerate(test_data.schema):
            assert feast_df.data.schema.field(i).type == column.type

    def test_to_df_still_works(self):
        """Test that the original to_df method still works unchanged."""
        test_data = pa.table({"feature1": [1, 2, 3], "feature2": ["a", "b", "c"]})

        job = MockRetrievalJob(test_data)

        # Test to_df returns pandas DataFrame
        df = job.to_df()

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 3
        assert list(df.columns) == ["feature1", "feature2"]
        assert df["feature1"].tolist() == [1, 2, 3]
        assert df["feature2"].tolist() == ["a", "b", "c"]

    def test_both_methods_return_same_data(self):
        """Test that to_df and to_feast_df return equivalent data."""
        test_data = pa.table(
            {"feature1": [1, 2, 3, 4], "feature2": [10.5, 20.5, 30.5, 40.5]}
        )

        job = MockRetrievalJob(test_data)

        # Get data from both methods
        df = job.to_df()
        feast_df = job.to_feast_df()

        # Convert FeastDataFrame to pandas for comparison
        feast_as_pandas = feast_df.data.to_pandas().reset_index(drop=True)

        # Should be equivalent
        pd.testing.assert_frame_equal(df, feast_as_pandas)
```
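These are plain pytest-style tests with no fixtures beyond `unittest.mock`; assuming pytest is available, a selection like the following would run just this class (the `-k` expression is illustrative):

```python
# Illustrative: invoke pytest programmatically and select the new test class.
import pytest

raise SystemExit(pytest.main(["-q", "-k", "TestRetrievalJobFeastDataFrame"]))
```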
