Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ray.data.context import DatasetContext

from feast.data_source import DataSource
from feast.dataframe import DataFrameEngine, FeastDataFrame
from feast.errors import (
RequestDataNotFoundInEntityDfException,
SavedDatasetLocationAlreadyExists,
Expand Down Expand Up @@ -1037,6 +1038,28 @@ def to_arrow(
df = result.to_pandas()
return pa.Table.from_pandas(df)

def to_feast_df(
    self,
    validation_reference: Optional[ValidationReference] = None,
    timeout: Optional[int] = None,
) -> FeastDataFrame:
    """Return the retrieval result as a FeastDataFrame backed by a Ray Dataset.

    Wrapping the Ray Dataset directly keeps Ray's lazy execution intact;
    nothing is materialized here.  When on-demand feature views are
    present, the base-class (Arrow) implementation is used instead, since
    those transformations require materialized data.
    """
    if self.on_demand_feature_views:
        # On-demand transformations need the Arrow path of the base class.
        return super().to_feast_df(validation_reference, timeout)

    # Still lazy: no Ray execution is triggered by this call.
    lazy_dataset = self._get_ray_dataset()
    return FeastDataFrame(data=lazy_dataset, engine=DataFrameEngine.RAY)

def to_remote_storage(self) -> list[str]:
if not self._staging_location:
raise ValueError("Staging location must be set for remote materialization.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@
import warnings
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Union,
cast,
)

if TYPE_CHECKING:
from feast.saved_dataset import ValidationReference

import numpy as np
import pandas
Expand All @@ -18,6 +31,7 @@

from feast import FeatureView, OnDemandFeatureView
from feast.data_source import DataSource
from feast.dataframe import DataFrameEngine, FeastDataFrame
from feast.errors import EntitySQLEmptyResults, InvalidEntityType
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL
from feast.infra.offline_stores import offline_utils
Expand Down Expand Up @@ -389,6 +403,23 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""Return dataset as pyarrow Table synchronously"""
return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))

def to_feast_df(
    self,
    validation_reference: Optional["ValidationReference"] = None,
    timeout: Optional[int] = None,
) -> FeastDataFrame:
    """Return the retrieval result as a FeastDataFrame backed by a Spark DataFrame.

    The Spark DataFrame is wrapped as-is, so Spark's lazy execution is
    preserved — no computation is triggered by this call.
    """
    return FeastDataFrame(data=self.to_spark_df(), engine=DataFrameEngine.SPARK)

def persist(
self,
storage: SavedDatasetStorage,
Expand Down
44 changes: 44 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from feast import flags_helper
from feast.data_source import DataSource
from feast.dataframe import DataFrameEngine, FeastDataFrame
from feast.dqm.errors import ValidationFailed
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -93,6 +94,49 @@ def to_df(
.reset_index(drop=True)
)

def to_feast_df(
    self,
    validation_reference: Optional["ValidationReference"] = None,
    timeout: Optional[int] = None,
) -> FeastDataFrame:
    """
    Synchronously executes the underlying query and returns the result as a FeastDataFrame.

    This is the new primary method that returns FeastDataFrame with proper engine detection.
    On demand transformations will be executed. If a validation reference is provided, the
    dataframe will be validated.

    Args:
        validation_reference (optional): The validation to apply against the retrieved dataframe.
        timeout (optional): The query timeout if applicable.
    """
    # Materialize the result as an Arrow table (runs ODFV transforms,
    # applies validation if a reference was given).
    arrow_table = self.to_arrow(
        validation_reference=validation_reference, timeout=timeout
    )

    # getattr with a default replaces the hasattr/else branches; the
    # `or []` additionally guards against a falsy (None/empty) attribute,
    # matching the original truthiness check for on-demand views.
    metadata = {
        "features": getattr(self, "features", []),
        "on_demand_feature_views": [
            odfv.name
            for odfv in (getattr(self, "on_demand_feature_views", None) or [])
        ],
    }

    # Wrap in FeastDataFrame with Arrow engine and metadata.
    return FeastDataFrame(
        data=arrow_table, engine=DataFrameEngine.ARROW, metadata=metadata
    )

def to_arrow(
self,
validation_reference: Optional["ValidationReference"] = None,
Expand Down
168 changes: 168 additions & 0 deletions sdk/python/tests/unit/test_retrieval_job_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Tests for RetrievalJob FeastDataFrame integration."""

from typing import Optional
from unittest.mock import Mock

import pandas as pd
import pyarrow as pa

from feast.dataframe import DataFrameEngine, FeastDataFrame
from feast.infra.offline_stores.offline_store import RetrievalJob


class MockRetrievalJob(RetrievalJob):
    """Minimal RetrievalJob implementation backed by a canned Arrow table."""

    def __init__(
        self,
        arrow_table: pa.Table,
        features: Optional[list] = None,
        odfvs: Optional[list] = None,
    ):
        # Fixed annotations: the defaults are None, so the parameters are
        # Optional[list], not list.  Behavior is unchanged.
        self.arrow_table = arrow_table
        self.features = features or []
        self.odfvs = odfvs or []

    def _to_arrow_internal(self, timeout=None):
        # Return the canned table regardless of timeout.
        return self.arrow_table

    @property
    def full_feature_names(self):
        # This mock always behaves as if full feature names are disabled.
        return False

    @property
    def on_demand_feature_views(self):
        return self.odfvs


class TestRetrievalJobFeastDataFrame:
    """Tests covering the FeastDataFrame integration on RetrievalJob."""

    def test_to_feast_df_basic(self):
        """to_feast_df wraps the Arrow result with the ARROW engine."""
        source_table = pa.table(
            {
                "feature1": [1, 2, 3],
                "feature2": ["a", "b", "c"],
                "timestamp": pd.to_datetime(
                    ["2023-01-01", "2023-01-02", "2023-01-03"]
                ),
            }
        )
        job = MockRetrievalJob(source_table, features=["feature1", "feature2"])

        result = job.to_feast_df()

        assert isinstance(result, FeastDataFrame)
        assert result.engine == DataFrameEngine.ARROW
        assert isinstance(result.data, pa.Table)
        assert result.data.num_rows == 3
        assert result.data.num_columns == 3

    def test_to_feast_df_metadata(self):
        """Feature names and ODFV names are carried in the metadata."""
        source_table = pa.table(
            {"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]}
        )

        # Build two mock on-demand feature views; transform_arrow returns
        # an empty table so no columns are added.
        odfv_mocks = []
        for odfv_name in ("odfv1", "odfv2"):
            odfv = Mock()
            odfv.name = odfv_name
            odfv.transform_arrow.return_value = pa.table({})
            odfv_mocks.append(odfv)

        job = MockRetrievalJob(
            source_table,
            features=["feature1", "feature2"],
            odfvs=odfv_mocks,
        )

        result = job.to_feast_df()

        assert "features" in result.metadata
        assert "on_demand_feature_views" in result.metadata
        assert result.metadata["features"] == ["feature1", "feature2"]
        assert result.metadata["on_demand_feature_views"] == ["odfv1", "odfv2"]

    def test_to_feast_df_with_timeout(self):
        """Passing a timeout is accepted and does not change the result type."""
        job = MockRetrievalJob(pa.table({"feature1": [1, 2, 3]}))

        result = job.to_feast_df(timeout=30)

        assert isinstance(result, FeastDataFrame)
        assert result.engine == DataFrameEngine.ARROW

    def test_to_feast_df_empty_metadata(self):
        """Missing features/ODFVs produce empty metadata lists."""
        job = MockRetrievalJob(pa.table({"feature1": [1, 2, 3]}))

        result = job.to_feast_df()

        assert result.metadata["features"] == []
        assert result.metadata["on_demand_feature_views"] == []

    def test_to_feast_df_preserves_arrow_data(self):
        """The wrapped table is byte-identical to the source, schema included."""
        source_table = pa.table(
            {
                "int_feature": pa.array([1, 2, 3], type=pa.int64()),
                "float_feature": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
                "string_feature": pa.array(["a", "b", "c"], type=pa.string()),
                "bool_feature": pa.array([True, False, True], type=pa.bool_()),
            }
        )

        result = MockRetrievalJob(source_table).to_feast_df()

        assert result.data.equals(source_table)
        assert result.data.schema == source_table.schema
        assert result.data.column_names == source_table.column_names
        for expected_field, actual_field in zip(
            source_table.schema, result.data.schema
        ):
            assert actual_field.type == expected_field.type

    def test_to_df_still_works(self):
        """The legacy to_df path keeps returning a pandas DataFrame."""
        source_table = pa.table(
            {"feature1": [1, 2, 3], "feature2": ["a", "b", "c"]}
        )

        frame = MockRetrievalJob(source_table).to_df()

        assert isinstance(frame, pd.DataFrame)
        assert len(frame) == 3
        assert list(frame.columns) == ["feature1", "feature2"]
        assert frame["feature1"].tolist() == [1, 2, 3]
        assert frame["feature2"].tolist() == ["a", "b", "c"]

    def test_both_methods_return_same_data(self):
        """to_df and to_feast_df agree on the returned rows."""
        source_table = pa.table(
            {"feature1": [1, 2, 3, 4], "feature2": [10.5, 20.5, 30.5, 40.5]}
        )
        job = MockRetrievalJob(source_table)

        pandas_frame = job.to_df()
        feast_frame = job.to_feast_df()

        # Materialize the FeastDataFrame to pandas for the comparison.
        materialized = feast_frame.data.to_pandas().reset_index(drop=True)

        pd.testing.assert_frame_equal(pandas_frame, materialized)
Loading