34 changes: 32 additions & 2 deletions docs/reference/beta-on-demand-feature-view.md
@@ -35,10 +35,40 @@ When defining an ODFV, you can specify the transformation mode using the `mode`

### Singleton Transformations in Native Python Mode

Native Python mode supports transformations on singleton dictionaries by setting `singleton=True`. This allows you to
write transformation functions that operate on a single row at a time, making the code more intuitive and aligning with
how data scientists typically think about data transformations.
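
For illustration, here is a minimal sketch of a singleton transformation. It is a hypothetical example, not part of this change: the source view `driver_hourly_stats_view`, the `conv_rate` input field, and the output field name are assumptions.

```python
from feast import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64

@on_demand_feature_view(
    sources=[driver_hourly_stats_view],  # assumed to be defined elsewhere
    schema=[Field(name="conv_rate_adjusted", dtype=Float64)],
    mode="python",
    singleton=True,
)
def transformed_conv_rate_singleton(inputs: dict) -> dict:
    # With singleton=True the function receives a single row as a dict and
    # returns a dict of output feature values for that row.
    return {"conv_rate_adjusted": inputs["conv_rate"] * 1.1}
```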

## Aggregations

On Demand Feature Views support aggregations that compute aggregate statistics over groups of rows. When aggregations are used, data is grouped by the entity columns (e.g., `driver_id`) and the aggregates are computed over each group before the results are returned.

**Important**: Aggregations and transformations are mutually exclusive. When aggregations are specified, they replace the transformation function.

### Usage

```python
from feast import Aggregation, Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64, Int64

@on_demand_feature_view(
    sources=[driver_hourly_stats_view],
    schema=[
        Field(name="sum_trips", dtype=Int64),
        Field(name="mean_rating", dtype=Float64),
    ],
    aggregations=[
        Aggregation(column="trips", function="sum"),
        Aggregation(column="rating", function="mean"),
    ],
)
def driver_aggregated_stats(inputs):
    # No transformation function needed when using aggregations
    pass
```

Aggregated columns are automatically named using the pattern `{function}_{column}` (e.g., `sum_trips`, `mean_rating`).
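
As a hedged illustration of how these generated names are then requested at serving time (the `store` handle and the entity value are assumptions; `driver_aggregated_stats` is the view defined above):

```python
features = store.get_online_features(
    features=[
        "driver_aggregated_stats:sum_trips",
        "driver_aggregated_stats:mean_rating",
    ],
    entity_rows=[{"driver_id": 1001}],
).to_dict()
```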

## Example
See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.com/feast-dev/on-demand-feature-views-demo) for an example of how to use on demand feature views.

@@ -7,39 +7,39 @@ Submodules
feast.infra.compute\_engines.local.backends.base module
-------------------------------------------------------

.. automodule:: feast.infra.compute_engines.local.backends.base
.. automodule:: feast.infra.compute_engines.backends.base
:members:
:undoc-members:
:show-inheritance:

feast.infra.compute\_engines.local.backends.factory module
----------------------------------------------------------

.. automodule:: feast.infra.compute_engines.local.backends.factory
.. automodule:: feast.infra.compute_engines.backends.factory
:members:
:undoc-members:
:show-inheritance:

feast.infra.compute\_engines.local.backends.pandas\_backend module
------------------------------------------------------------------

.. automodule:: feast.infra.compute_engines.local.backends.pandas_backend
.. automodule:: feast.infra.compute_engines.backends.pandas_backend
:members:
:undoc-members:
:show-inheritance:

feast.infra.compute\_engines.local.backends.polars\_backend module
------------------------------------------------------------------

.. automodule:: feast.infra.compute_engines.local.backends.polars_backend
.. automodule:: feast.infra.compute_engines.backends.polars_backend
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------

.. automodule:: feast.infra.compute_engines.local.backends
.. automodule:: feast.infra.compute_engines.backends
:members:
:undoc-members:
:show-inheritance:
@@ -7,7 +7,7 @@ Subpackages
.. toctree::
:maxdepth: 4

feast.infra.compute_engines.local.backends
feast.infra.compute_engines.backends

Submodules
----------
@@ -3,8 +3,8 @@
import pandas as pd
import pyarrow

from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.backends.pandas_backend import PandasBackend


class BackendFactory:
@@ -46,7 +46,7 @@ def _is_polars(entity_df) -> bool:

    @staticmethod
    def _get_polars_backend():
        from feast.infra.compute_engines.local.backends.polars_backend import (
        from feast.infra.compute_engines.backends.polars_backend import (
            PolarsBackend,
        )

@@ -3,7 +3,7 @@
import pandas as pd
import pyarrow as pa

from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.backends.base import DataFrameBackend


class PandasBackend(DataFrameBackend):
@@ -3,7 +3,7 @@
import polars as pl
import pyarrow as pa

from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.backends.base import DataFrameBackend


class PolarsBackend(DataFrameBackend):
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/compute_engines/local/compute.py
@@ -12,10 +12,10 @@
MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.backends.factory import BackendFactory
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.dag.context import ExecutionContext
from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.backends.factory import BackendFactory
from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder
from feast.infra.compute_engines.local.job import (
LocalMaterializationJob,
@@ -2,8 +2,8 @@

from feast.infra.common.materialization_job import MaterializationTask
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.feature_builder import FeatureBuilder
from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.nodes import (
LocalAggregationNode,
LocalDedupNode,
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/local/nodes.py
@@ -5,11 +5,11 @@

from feast import BatchFeatureView, StreamFeatureView
from feast.data_source import DataSource
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
from feast.infra.compute_engines.dag.model import DAGFormat
from feast.infra.compute_engines.dag.node import DAGNode
from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
from feast.infra.compute_engines.local.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.local_node import LocalNode
from feast.infra.compute_engines.utils import (
create_offline_store_retrieval_job,
97 changes: 96 additions & 1 deletion sdk/python/feast/utils.py
@@ -33,6 +33,7 @@
RequestDataNotFoundInEntityRowsException,
)
from feast.field import Field
from feast.infra.compute_engines.backends.pandas_backend import PandasBackend
from feast.infra.key_encoding_utils import deserialize_entity_key
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
@@ -561,6 +562,76 @@ def construct_response_feature_vector(
)


def _get_aggregate_operations(agg_specs) -> dict:
    """
    Convert Aggregation specs to agg_ops format for PandasBackend.

    Reused from LocalFeatureBuilder logic.
    TODO: This logic is duplicated from feast.infra.compute_engines.local.feature_builder.LocalFeatureBuilder._get_aggregate_operations().
    Consider refactoring to a shared utility module in the future.
    """
    agg_ops = {}
    for agg in agg_specs:
        if agg.time_window is not None:
            raise ValueError(
                "Time window aggregation is not supported in online serving."
            )
        alias = f"{agg.function}_{agg.column}"
        agg_ops[alias] = (agg.function, agg.column)
    return agg_ops


def _apply_aggregations_to_response(
    response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
    aggregations,
    group_keys: Optional[List[str]],
    mode: str,
) -> Union[pyarrow.Table, Dict[str, List[Any]]]:
    """
    Apply aggregations using PandasBackend.

    Args:
        response_data: Either a pyarrow.Table or dict of lists containing the data
        aggregations: List of Aggregation objects to apply
        group_keys: List of column names to group by (optional)
        mode: Transformation mode ("python", "pandas", or "substrait")

    Returns:
        Aggregated data in the same format as input

    TODO: Consider refactoring to support backends other than pandas in the future.
    """
    if not aggregations:
        return response_data

    backend = PandasBackend()

    # Convert to pandas DataFrame
    if isinstance(response_data, dict):
        df = pd.DataFrame(response_data)
    else:  # pyarrow.Table
        df = backend.from_arrow(response_data)

    if df.empty:
        return response_data

    # Convert aggregations to agg_ops format
    agg_ops = _get_aggregate_operations(aggregations)

    # Apply aggregations using PandasBackend
    if group_keys:
        result_df = backend.groupby_agg(df, group_keys, agg_ops)
    else:
        # No grouping - aggregate over entire dataset
        result_df = backend.groupby_agg(df, [], agg_ops)

    # Convert back to original format
    if mode == "python":
        return {col: result_df[col].tolist() for col in result_df.columns}
    else:  # pandas or substrait
        return backend.to_arrow(result_df)


def _augment_response_with_on_demand_transforms(
    online_features_response: GetOnlineFeaturesResponse,
    feature_refs: List[str],
@@ -605,7 +676,31 @@ def _augment_response_with_on_demand_transforms(
    for odfv_name, _feature_refs in odfv_feature_refs.items():
        odfv = requested_odfv_map[odfv_name]
        if not odfv.write_to_online_store:
            if odfv.mode == "python":
            # Apply aggregations if configured.
            if odfv.aggregations:
                if odfv.mode == "python":
[Review comment, Collaborator Author] Currently, if this is a Python input we don't have an aggregation function to use OOTD, so it uses the pandas DataFrame backend to aggregate. I think in the long term we need to create a real-time compute engine that supports Python-native compute operations such as aggregation, filtering, and joining.

[Reviewer] what does OOTD mean?

[Collaborator Author] OOTB :)

                    if initial_response_dict is None:
                        initial_response_dict = initial_response.to_dict()
                    initial_response_dict = _apply_aggregations_to_response(
                        initial_response_dict,
                        odfv.aggregations,
                        odfv.entities,
                        odfv.mode,
                    )
                elif odfv.mode in {"pandas", "substrait"}:
                    if initial_response_arrow is None:
                        initial_response_arrow = initial_response.to_arrow()
                    initial_response_arrow = _apply_aggregations_to_response(
                        initial_response_arrow,
                        odfv.aggregations,
                        odfv.entities,
                        odfv.mode,
                    )

            # Apply transformation. Note: aggregations and transformation configs are mutually exclusive
            # TODO: Fix to make it work for having both aggregation and transformation
            # ticket: https://github.com/feast-dev/feast/issues/5689
            elif odfv.mode == "python":
                if initial_response_dict is None:
                    initial_response_dict = initial_response.to_dict()
                transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
@@ -4,9 +4,9 @@
import pandas as pd
import pyarrow as pa

from feast.infra.compute_engines.backends.pandas_backend import PandasBackend
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend
from feast.infra.compute_engines.local.nodes import (
LocalAggregationNode,
LocalDedupNode,
@@ -0,0 +1,89 @@
# Copyright 2025 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for OnDemandFeatureView aggregations in online serving."""

import pyarrow as pa

from feast.aggregation import Aggregation
from feast.utils import _apply_aggregations_to_response


def test_aggregation_python_mode():
    """Test aggregations in Python mode (dict format)."""
    data = {
        "driver_id": [1, 1, 2, 2],
        "trips": [10, 20, 15, 25],
    }
    aggs = [Aggregation(column="trips", function="sum")]

    result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")

    assert result == {"driver_id": [1, 2], "sum_trips": [30, 40]}


def test_aggregation_pandas_mode():
    """Test aggregations in Pandas mode (Arrow table format)."""
    table = pa.table(
        {
            "driver_id": [1, 1, 2, 2],
            "trips": [10, 20, 15, 25],
        }
    )
    aggs = [Aggregation(column="trips", function="sum")]

    result = _apply_aggregations_to_response(table, aggs, ["driver_id"], "pandas")

    assert isinstance(result, pa.Table)
    result_df = result.to_pandas()
    assert list(result_df["driver_id"]) == [1, 2]
    assert list(result_df["sum_trips"]) == [30, 40]


def test_multiple_aggregations():
    """Test multiple aggregation functions."""
    data = {
        "driver_id": [1, 1, 2, 2],
        "trips": [10, 20, 15, 25],
        "revenue": [100.0, 200.0, 150.0, 250.0],
    }
    aggs = [
        Aggregation(column="trips", function="sum"),
        Aggregation(column="revenue", function="mean"),
    ]

    result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")

    assert result["driver_id"] == [1, 2]
    assert result["sum_trips"] == [30, 40]
    assert result["mean_revenue"] == [150.0, 200.0]


def test_no_aggregations_returns_original():
    """Test that no aggregations returns original data."""
    data = {"driver_id": [1, 2], "trips": [10, 20]}

    result = _apply_aggregations_to_response(data, [], ["driver_id"], "python")

    assert result == data


def test_empty_data_returns_empty():
    """Test that empty data returns empty result."""
    data = {"driver_id": [], "trips": []}
    aggs = [Aggregation(column="trips", function="sum")]

    result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")

    assert result == data