Commit f287ca5
fix: FeatureView serialization with cycle detection (#5502)
DCO failed due to co-author and don't know how to fix it

* update
* update
* fix linting
* fix doc
* fix doc
* fix doc
* fix linting
* Update docs/reference/compute-engine/README.md
* Update docs/getting-started/concepts/batch-feature-view.md
* update doc
* update doc

---------

Signed-off-by: HaoXuAI <[email protected]>
Co-authored-by: Francisco Arceo <[email protected]>
1 parent 9f85892 commit f287ca5

File tree

11 files changed: +499 −41 lines changed

docs/getting-started/architecture/feature-transformation.md

Lines changed: 98 additions & 1 deletion
@@ -18,4 +18,101 @@ when to use which transformation engine/communication pattern is extremely criti
the success of your implementation.

In general, we recommend choosing the transformation engine and network calls by aligning them with what is most appropriate for the data producer, the feature/model usage, and the overall product.

## API

### feature_transformation

`feature_transformation` (or `udf`) is the core API for defining feature transformations in Feast. It allows you to specify custom logic that is applied to the data during materialization or retrieval. Examples include:

```python
def remove_extra_spaces(df: DataFrame) -> DataFrame:
    df['name'] = df['name'].str.replace(r'\s+', ' ', regex=True)
    return df

spark_transformation = SparkTransformation(
    mode=TransformationMode.SPARK,
    udf=remove_extra_spaces,
    udf_string="remove extra spaces",
)
feature_view = FeatureView(
    feature_transformation=spark_transformation,
    ...
)
```

or:

```python
spark_transformation = Transformation(
    mode=TransformationMode.SPARK_SQL,
    udf=remove_extra_spaces_sql,
    udf_string="remove extra spaces sql",
)
feature_view = FeatureView(
    feature_transformation=spark_transformation,
    ...
)
```

or, using the decorator form:

```python
@transformation(mode=TransformationMode.SPARK)
def remove_extra_spaces_udf(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(name=df['name'].str.replace(r'\s+', ' ', regex=True))

feature_view = FeatureView(
    feature_transformation=remove_extra_spaces_udf,
    ...
)
```

### Aggregation

`Aggregation` is the built-in API for defining batch or streaming aggregations on data. It allows you to specify how data is aggregated over a time window, such as calculating the average or sum of a feature over a specified period. Examples include:

```python
from datetime import timedelta

from feast import Aggregation

feature_view = FeatureView(
    aggregations=[
        Aggregation(
            column="amount",
            function="sum",
        ),
        Aggregation(
            column="amount",
            function="avg",
            time_window=timedelta(hours=1),
        ),
    ],
    ...
)
```
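To make the windowing semantics concrete, here is a minimal pure-Python sketch of what a one-hour `avg` aggregation computes. This is illustrative only — it is not Feast's implementation, and `windowed_avg` is a hypothetical helper:

```python
from datetime import datetime, timedelta

# Toy event-timestamped rows; the third row falls outside the 1-hour window.
rows = [
    {"ts": datetime(2024, 1, 1, 10, 0), "amount": 10.0},
    {"ts": datetime(2024, 1, 1, 10, 30), "amount": 30.0},
    {"ts": datetime(2024, 1, 1, 8, 0), "amount": 100.0},
]

def windowed_avg(rows, column, time_window, now):
    # Keep only rows newer than (now - time_window), then average the column.
    cutoff = now - time_window
    vals = [r[column] for r in rows if r["ts"] > cutoff]
    return sum(vals) / len(vals) if vals else None

avg = windowed_avg(rows, "amount", timedelta(hours=1), now=datetime(2024, 1, 1, 10, 45))
print(avg)  # 20.0
```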

### Filter

`ttl` is the amount of time that features remain available for materialization or retrieval. Only entity rows whose timestamp is greater than the current time minus the `ttl` are used, which ensures that only recent data enters feature calculations. Examples include:

```python
from datetime import timedelta

feature_view = FeatureView(
    ttl=timedelta(days=1),  # Features will be available for 1 day
    ...
)
```
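As a rough illustration of the TTL cutoff rule (not Feast's actual code — `apply_ttl` is a hypothetical helper), the filter can be sketched as:

```python
from datetime import datetime, timedelta

def apply_ttl(rows, ttl, now):
    # Keep rows whose event timestamp is newer than (now - ttl).
    return [r for r in rows if r["event_timestamp"] > now - ttl]

now = datetime(2024, 6, 2, 12, 0)
rows = [
    {"event_timestamp": datetime(2024, 6, 2, 9, 0), "conv_rate": 0.5},   # within 1 day
    {"event_timestamp": datetime(2024, 5, 30, 9, 0), "conv_rate": 0.1},  # older than 1 day
]
fresh = apply_ttl(rows, timedelta(days=1), now)
print([r["conv_rate"] for r in fresh])  # [0.5]
```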

### Join

Feast can join multiple feature views together into a composite feature view. This allows you to combine features from different sources or views into a single view. Examples include:

```python
feature_view = FeatureView(
    name="composite_feature_view",
    entities=["entity_id"],
    source=[
        FeatureView(
            name="feature_view_1",
            features=["feature_1", "feature_2"],
            ...
        ),
        FeatureView(
            name="feature_view_2",
            features=["feature_3", "feature_4"],
            ...
        ),
    ],
    ...
)
```

The underlying join is an inner join by default, and the join key is the entity ID.
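The inner-join behavior on the entity key can be illustrated with a small dictionary-based sketch — this shows the semantics only, not Feast's implementation:

```python
def inner_join(left_rows, right_rows, key):
    # Inner join on the entity key: only entities present in both views survive.
    right_by_key = {r[key]: r for r in right_rows}
    return [
        {**left, **right_by_key[left[key]]}
        for left in left_rows
        if left[key] in right_by_key
    ]

fv1 = [{"entity_id": 1, "feature_1": 0.9}, {"entity_id": 2, "feature_1": 0.4}]
fv2 = [{"entity_id": 1, "feature_3": 7}]
print(inner_join(fv1, fv2, "entity_id"))
# [{'entity_id': 1, 'feature_1': 0.9, 'feature_3': 7}]
```

Entity 2 is dropped because it has no match in `fv2`, which is exactly the inner-join default described above.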

docs/getting-started/concepts/batch-feature-view.md

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
# 🧬 BatchFeatureView in Feast

`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even from other `FeatureView`s, enabling composable and reusable feature pipelines. It extends the `FeatureView` class with support for user-defined transformations, aggregations, and recursive chaining of feature logic.

---

## ✅ Key Capabilities

- **Composable DAG of FeatureViews**: Supports defining a `BatchFeatureView` on top of one or more other `FeatureView`s.
- **Transformations**: Apply [transformation](../../getting-started/architecture/feature-transformation.md) logic (`feature_transformation` or `udf`) to a raw data source; transformations can also combine multiple data sources.
- **Aggregations**: Define time-windowed aggregations (e.g. `sum`, `avg`) over event-timestamped data.
- **Feature resolution & execution**: Automatically resolves and executes DAGs of dependent views during materialization or retrieval. More details in the [Compute engine documentation](../../reference/compute-engine/README.md).
- **Materialization Sink Customization**: Specify a custom `sink_source` to define where derived feature data should be persisted.

---

## 📐 Class Signature

```python
class BatchFeatureView(FeatureView):
    def __init__(
        self,
        *,
        name: str,
        source: Union[DataSource, FeatureView, List[FeatureView]],
        sink_source: Optional[DataSource] = None,
        schema: Optional[List[Field]] = None,
        entities: Optional[List[Entity]] = None,
        aggregations: Optional[List[Aggregation]] = None,
        udf: Optional[Callable[[DataFrame], DataFrame]] = None,
        udf_string: Optional[str] = None,
        ttl: Optional[timedelta] = timedelta(days=0),
        online: bool = True,
        offline: bool = False,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
    )
```

---

## 🧠 Usage

### 1. Simple Feature View from a Data Source

```python
from feast import BatchFeatureView, Field
from feast.types import Float32, Int32
from feast import FileSource
from feast.aggregation import Aggregation
from datetime import timedelta

source = FileSource(
    path="s3://bucket/path/data.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    aggregations=[
        Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
    ],
    source=source,
)
```

---

### 2. Derived Feature View from Another View

You can build features on top of other features by deriving one feature view from another. For example:

```python
from feast import BatchFeatureView, Field
from pyspark.sql import DataFrame
from feast.types import Float32, Int32
from feast import FileSource

def transform(df: DataFrame) -> DataFrame:
    return df.withColumn("conv_rate", df["conv_rate"] * 2)

daily_driver_stats = BatchFeatureView(
    name="daily_driver_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    udf=transform,
    source=driver_fv,
    sink_source=FileSource(  # Required to specify where to sink the derived view
        name="daily_driver_stats_sink",
        path="s3://bucket/daily_stats/",
        file_format="parquet",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    ),
)
```

---

## 🔄 Execution Flow

Feast automatically resolves the DAG of `BatchFeatureView` dependencies during:

- `materialize()`: recursively resolves and executes the feature view graph.
- `get_historical_features()`: builds the execution plan for retrieving point-in-time correct features.
- `apply()`: registers the feature view DAG structure in the registry.

Each transformation and aggregation is turned into a DAG node (e.g., `SparkTransformationNode`, `SparkAggregationNode`) executed by the compute engine (e.g., `SparkComputeEngine`).

---

## ⚙️ How Materialization Works

- If the `BatchFeatureView` is backed by a base source (`FileSource`, `BigQuerySource`, `SparkSource`, etc.), the `batch_source` is used directly.
- If the source is another feature view (i.e., chained views), a `sink_source` must be provided to define the materialization target data source.
- During DAG planning, `SparkWriteNode` uses the `sink_source` as the batch sink.
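The source-vs-sink rule above can be sketched in a few lines of hypothetical Python. `View` and `batch_sink` are illustrative names standing in for Feast internals, and a plain string stands in for a `DataSource`:

```python
class View:
    def __init__(self, name, source, sink_source=None):
        self.name, self.source, self.sink_source = name, source, sink_source

def batch_sink(view):
    # Base source (a string stands in for a DataSource): use it directly.
    if isinstance(view.source, str):
        return view.source
    # Chained view: a sink_source is mandatory.
    if view.sink_source is None:
        raise ValueError(f"{view.name}: sink_source required when chaining views")
    return view.sink_source

base = View("hourly", source="s3://bucket/raw.parquet")
derived = View("daily", source=base, sink_source="s3://bucket/daily/")
print(batch_sink(base), batch_sink(derived))
```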

---

## 🧪 Example Tests

See:

- `test_spark_dag_materialize_recursive_view()`: validates the chaining of two feature views and the resulting output.
- `test_spark_compute_engine_materialize()`: validates transformation and writing of features into the offline and online stores.

---

## 🛑 Gotchas

- `sink_source` is **required** when chaining views (i.e., when `source` is another FeatureView or a list of them).
- Schema fields must be consistent with the `sink_source`, and with `batch_source.field_mapping` if field mappings exist.
- Aggregation logic must reference columns present in the raw source or in the transformed inputs.

---

## 🔮 Future Directions

- Support additional offline stores (e.g., Snowflake, Redshift) with auto-generated sink sources.
- Enable fully declarative transform logic (SQL + UDF mix).
- Introduce optimization passes for DAG pruning and fusion.

docs/reference/compute-engine/README.md

Lines changed: 71 additions & 16 deletions
@@ -13,16 +13,39 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio

## 🧠 Core Concepts

| Component | Description | API |
|--------------------|----------------------------------------------------------------------|-----|
| `ComputeEngine` | Interface for executing materialization and retrieval tasks | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py) |
| `FeatureBuilder` | Constructs a DAG from FeatureView definitions for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
| `FeatureResolver` | Resolves the feature DAG into topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |

---

## Feature resolver and builder

The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring the correct execution order. A `FeatureView` represents a logical data source, while a `DataSource` represents the physical data source (e.g., BigQuery, Spark, etc.). When defining a `FeatureView`, the source can be a physical `DataSource`, a derived `FeatureView`, or a list of `FeatureView`s.

The `FeatureResolver` walks through the `FeatureView` sources, topologically sorts the DAG nodes based on their dependencies, and returns a head node that represents the final output of the DAG. The `FeatureBuilder` then builds the DAG nodes from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).

An example of the output built by the `FeatureBuilder`:

```markdown
- Output(Agg(daily_driver_stats))
  - Agg(daily_driver_stats)
    - Filter(daily_driver_stats)
      - Transform(daily_driver_stats)
        - Agg(hourly_driver_stats)
          - Filter(hourly_driver_stats)
            - Transform(hourly_driver_stats)
              - Source(hourly_driver_stats)
```
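The topological resolution step can be illustrated with a minimal depth-first sketch. The names here are illustrative, not the `FeatureResolver`'s actual code; each view maps to the views it depends on, and base sources have no dependencies:

```python
deps = {
    "daily_driver_stats": ["hourly_driver_stats"],
    "hourly_driver_stats": [],
}

def topo_order(deps):
    # Depth-first traversal: emit each node only after all its dependencies.
    order, seen = [], set()
    def visit(node):
        if node in seen:
            return
        seen.add(node)
        for dep in deps[node]:
            visit(dep)
        order.append(node)  # dependencies first, then the node itself
    for node in deps:
        visit(node)
    return order

print(topo_order(deps))  # ['hourly_driver_stats', 'daily_driver_stats']
```

The head node of the DAG is the last entry in this order, matching the `Output(Agg(daily_driver_stats))` root shown above.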

## Diagram

![feature_dag.png](feature_dag.png)

## ✨ Available Engines

### 🔥 SparkComputeEngine

@@ -44,7 +67,7 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
SourceReadNode
 |
 v
TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
 |
 v
FilterNode (Always included; applies TTL or user-defined filters)
@@ -56,9 +79,6 @@ AggregationNode (If aggregations are defined in FeatureView)
DeduplicationNode (If no aggregation is defined for get_historical_features)
 |
 v
ValidationNode (If enable_validation = True)
 |
 v
@@ -79,20 +99,54 @@ To create your own compute engine:

```python
from feast.infra.compute_engines.base import ComputeEngine
from typing import Sequence, Union
from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView


class MyComputeEngine(ComputeEngine):
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        ...

    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        ...
```
2. Create a FeatureBuilder
```python
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
    # ... (other build_* methods elided in this diff)
    def build_dedup_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
    def build_validation_node(self, input_node): ...
```

3. Define DAGNode subclasses
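A `DAGNode` subclass might look like the following sketch. The class and method names here are illustrative, not Feast's actual base class — it only shows the shape of a node that pulls from its inputs and applies one operation:

```python
class DAGNode:
    def __init__(self, name, inputs=()):
        self.name, self.inputs = name, list(inputs)

    def execute(self, context):
        raise NotImplementedError

class MySourceNode(DAGNode):
    def execute(self, context):
        # Read raw rows from the (toy) context in place of a real DataSource.
        return context["rows"]

class MyFilterNode(DAGNode):
    def __init__(self, name, input_node, predicate):
        super().__init__(name, [input_node])
        self.predicate = predicate

    def execute(self, context):
        # Execute the upstream node, then keep rows matching the predicate.
        rows = self.inputs[0].execute(context)
        return [r for r in rows if self.predicate(r)]

src = MySourceNode("source")
flt = MyFilterNode("filter", src, lambda r: r["amount"] > 0)
print(flt.execute({"rows": [{"amount": 5}, {"amount": -1}]}))  # [{'amount': 5}]
```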
@@ -114,7 +169,7 @@
## 🚧 Roadmap
- [x] Modular, backend-agnostic DAG execution framework
- [x] Spark engine with native support for materialization + PIT joins
- [x] PyArrow + Pandas engine for local compute
- [x] Native multi-feature-view DAG optimization
- [ ] DAG validation, metrics, and debug output
- [ ] Scalable distributed backend via Ray or Polars