
Commit 64bdafd

Authored by HaoXuAI
feat: Compute Engine Initial Implementation (#5223)
* add compute engine
* fix linting
* fix linting
* fix linting
* fix linting
* add doc
* add test
* add integration test
* update API
* update API
* update API
* update API
* update API
* fix linting
* update doc
* update doc
* update test
* update doc

Signed-off-by: HaoXuAI <[email protected]>
1 parent b976f27 commit 64bdafd

File tree

21 files changed (+1696, -5 lines)

docs/getting-started/architecture/feature-transformation.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -8,7 +8,7 @@ Feature transformations can be executed by three types of "transformation engine
 1. The Feast Feature Server
 2. An Offline Store (e.g., Snowflake, BigQuery, DuckDB, Spark, etc.)
-3. A Stream processor (e.g., Flink or Spark Streaming)
+3. [A Compute Engine](../../reference/compute-engine/README.md)
 
 The three transformation engines are coupled with the [communication pattern used for writes](write-patterns.md).
```
docs/reference/compute-engine/README.md

Lines changed: 119 additions & 0 deletions
# 🧠 ComputeEngine (WIP)

The `ComputeEngine` is Feast’s pluggable abstraction for executing feature pipelines — including transformations, aggregations, joins, and materializations/get_historical_features — on a backend of your choice (e.g., Spark, PyArrow, Pandas, Ray).

It powers both:

- `materialize()` – for batch and stream generation of features to offline/online stores
- `get_historical_features()` – for point-in-time correct training dataset retrieval

This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations, enabling modular and scalable workflows.
---

## 🧠 Core Concepts

| Component          | Description                                                           |
|--------------------|-----------------------------------------------------------------------|
| `ComputeEngine`    | Interface for executing materialization and retrieval tasks           |
| `FeatureBuilder`   | Constructs a DAG from a Feature View definition for a specific backend |
| `DAGNode`          | Represents a logical operation (read, aggregate, join, etc.)          |
| `ExecutionPlan`    | Executes nodes in dependency order and stores intermediate outputs    |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs         |

---
## ✨ Available Engines

### 🔥 SparkComputeEngine

- Distributed DAG execution via Apache Spark
- Supports point-in-time joins and large-scale materialization
- Integrates with `SparkOfflineStore` and `SparkMaterializationJob`

### 🧪 LocalComputeEngine (WIP)

- Runs on Arrow + Pandas (or optionally DuckDB)
- Designed for local dev, testing, or lightweight feature generation

---
## 🛠️ Feature Builder Flow

```markdown
SourceReadNode
      |
      v
JoinNode (Only for get_historical_features with entity df)
      |
      v
FilterNode (Always included; applies TTL or user-defined filters)
      |
      v
AggregationNode (If aggregations are defined in FeatureView)
      |
      v
DeduplicationNode (If no aggregation is defined for get_historical_features)
      |
      v
TransformationNode (If feature_transformation is defined)
      |
      v
ValidationNode (If enable_validation = True)
      |
      v
Output
  ├──> RetrievalOutput (For get_historical_features)
  └──> OnlineStoreWrite / OfflineStoreWrite (For materialize)
```
Each step is implemented as a `DAGNode`. An `ExecutionPlan` executes these nodes in topological order, caching `DAGValue` outputs.
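To make that pattern concrete, below is a minimal, self-contained sketch of the node/plan contract. It is illustrative only: Feast's actual `DAGNode`, `DAGValue`, and `ExecutionPlan` classes carry more structure (typed values, validation, context plumbing).

```python
# Minimal, illustrative sketch of the DAGNode/ExecutionPlan pattern described
# above. Not Feast's real implementation; names mirror the docs for clarity.
from typing import Dict, List, Optional


class DAGValue:
    """Wraps a node's output (e.g., an Arrow table) for downstream nodes."""

    def __init__(self, data):
        self.data = data


class DAGNode:
    def __init__(self, name: str, inputs: Optional[List["DAGNode"]] = None):
        self.name = name
        self.inputs = inputs or []

    def execute(self, context, input_values: List[DAGValue]) -> DAGValue:
        raise NotImplementedError


class ExecutionPlan:
    """Runs nodes in topological order, caching each node's DAGValue output."""

    def __init__(self, nodes: List[DAGNode]):
        # Assumes `nodes` is already topologically sorted, as the docs describe.
        self.nodes = nodes

    def execute(self, context) -> DAGValue:
        cache: Dict[str, DAGValue] = {}
        for node in self.nodes:
            # Upstream nodes ran earlier, so their outputs are already cached.
            inputs = [cache[dep.name] for dep in node.inputs]
            cache[node.name] = node.execute(context, inputs)
        return cache[self.nodes[-1].name]
```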
---
## 🧩 Implementing a Custom Compute Engine

To create your own compute engine:

1. **Implement the interface**

```python
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
from feast.infra.materialization.batch_materialization_engine import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.offline_stores.offline_store import RetrievalJob


class MyComputeEngine(ComputeEngine):
    def materialize(self, task: MaterializationTask) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        ...
```
2. **Create a FeatureBuilder**

```python
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
    def build_join_node(self, input_node): ...
    def build_filter_node(self, input_node): ...
    def build_dedup_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
```
3. **Define DAGNode subclasses** (see the sketch after this list)
   * `ReadNode`, `AggregationNode`, `JoinNode`, `WriteNode`, etc.
   * Each `DAGNode.execute(context) -> DAGValue`

4. **Return an ExecutionPlan**
   * `ExecutionPlan` stores DAG nodes in topological order
   * Automatically handles intermediate value caching
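As an example of the kind of logic a node's `execute()` wraps, here is a hedged sketch of the TTL filtering that the `FilterNode` step performs. It is not Feast's actual node code; `apply_ttl_filter` and the toy data are illustrative.

```python
# Hypothetical illustration of a FilterNode's core logic: TTL filtering on a
# pyarrow Table. Feast's real DAG nodes wrap logic like this in execute().
from datetime import datetime, timedelta

import pyarrow as pa
import pyarrow.compute as pc


def apply_ttl_filter(table: pa.Table, ts_col: str, now: datetime, ttl: timedelta) -> pa.Table:
    """Keep only rows whose event timestamp falls within [now - ttl, now]."""
    ts = table[ts_col]
    mask = pc.and_(
        pc.greater_equal(ts, pa.scalar(now - ttl)),
        pc.less_equal(ts, pa.scalar(now)),
    )
    return table.filter(mask)


# Toy usage: the row outside the 1-day TTL window is dropped.
events = pa.table(
    {
        "driver_id": [1, 2],
        "event_timestamp": [datetime(2024, 12, 25), datetime(2025, 1, 2)],
    }
)
fresh = apply_ttl_filter(events, "event_timestamp", now=datetime(2025, 1, 2), ttl=timedelta(days=1))
assert fresh["driver_id"].to_pylist() == [2]  # the Dec 25 row falls outside the window
```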
## 🚧 Roadmap

- [x] Modular, backend-agnostic DAG execution framework
- [x] Spark engine with native support for materialization + PIT joins
- [ ] PyArrow + Pandas engine for local compute
- [ ] Native multi-feature-view DAG optimization
- [ ] DAG validation, metrics, and debug output
- [ ] Scalable distributed backend via Ray or Polars

sdk/python/feast/batch_feature_view.py

Lines changed: 17 additions & 2 deletions
```diff
@@ -6,6 +6,7 @@
 import dill
 
 from feast import flags_helper
+from feast.aggregation import Aggregation
 from feast.data_source import DataSource
 from feast.entity import Entity
 from feast.feature_view import FeatureView
@@ -40,7 +41,8 @@ class BatchFeatureView(FeatureView):
         schema: The schema of the feature view, including feature, timestamp, and entity
             columns. If not specified, can be inferred from the underlying data source.
         source: The batch source of data where this group of features is stored.
-        online: A boolean indicating whether online retrieval is enabled for this feature view.
+        online: A boolean indicating whether online retrieval and write to online store is enabled for this feature view.
+        offline: A boolean indicating whether offline retrieval and write to offline store is enabled for this feature view.
         description: A human-readable description.
         tags: A dictionary of key-value pairs to store arbitrary metadata.
         owner: The owner of the batch feature view, typically the email of the primary maintainer.
@@ -55,6 +57,7 @@ class BatchFeatureView(FeatureView):
     entity_columns: List[Field]
     features: List[Field]
     online: bool
+    offline: bool
     description: str
     tags: Dict[str, str]
     owner: str
@@ -63,6 +66,8 @@ class BatchFeatureView(FeatureView):
     udf: Optional[Callable[[Any], Any]]
     udf_string: Optional[str]
     feature_transformation: Transformation
+    batch_engine: Optional[Field]
+    aggregations: Optional[List[Aggregation]]
 
     def __init__(
         self,
@@ -73,13 +78,16 @@ def __init__(
         entities: Optional[List[Entity]] = None,
         ttl: Optional[timedelta] = None,
         tags: Optional[Dict[str, str]] = None,
-        online: bool = True,
+        online: bool = False,
+        offline: bool = True,
         description: str = "",
         owner: str = "",
         schema: Optional[List[Field]] = None,
         udf: Optional[Callable[[Any], Any]],
         udf_string: Optional[str] = "",
         feature_transformation: Optional[Transformation] = None,
+        batch_engine: Optional[Field] = None,
+        aggregations: Optional[List[Aggregation]] = None,
     ):
         if not flags_helper.is_test():
             warnings.warn(
@@ -103,13 +111,16 @@ def __init__(
         self.feature_transformation = (
             feature_transformation or self.get_feature_transformation()
         )
+        self.batch_engine = batch_engine
+        self.aggregations = aggregations or []
 
         super().__init__(
             name=name,
             entities=entities,
             ttl=ttl,
             tags=tags,
             online=online,
+            offline=offline,
             description=description,
             owner=owner,
             schema=schema,
@@ -144,18 +155,21 @@ def batch_feature_view(
     source: Optional[DataSource] = None,
     tags: Optional[Dict[str, str]] = None,
     online: bool = True,
+    offline: bool = True,
     description: str = "",
     owner: str = "",
     schema: Optional[List[Field]] = None,
 ):
     """
     Args:
         name:
+        mode:
         entities:
         ttl:
         source:
         tags:
         online:
+        offline:
         description:
         owner:
         schema:
@@ -181,6 +195,7 @@ def decorator(user_function):
             source=source,
             tags=tags,
             online=online,
+            offline=offline,
             description=description,
             owner=owner,
             schema=schema,
```
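Taken together, the new parameters compose as in the hedged sketch below. The entity, source, and field names are placeholders; note that `BatchFeatureView.__init__` now defaults to `online=False, offline=True`, while plain `FeatureView` keeps `online=True, offline=False`.

```python
# Sketch of a BatchFeatureView using the parameters added in this commit.
# The data source, entity, and field names are illustrative placeholders.
from datetime import timedelta

from feast import Entity, Field, FileSource
from feast.aggregation import Aggregation
from feast.batch_feature_view import BatchFeatureView
from feast.types import Float32

driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[Field(name="conv_rate", dtype=Float32)],
    source=FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    ),
    online=False,   # new class default: skip writes to the online store
    offline=True,   # write materialized output to the offline store
    aggregations=[
        Aggregation(column="conv_rate", function="avg", time_window=timedelta(hours=1)),
    ],
    udf=None,       # no row-level transformation in this sketch
    udf_string="",
)
```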

sdk/python/feast/feature_view.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -93,6 +93,7 @@ class FeatureView(BaseFeatureView):
     entity_columns: List[Field]
     features: List[Field]
     online: bool
+    offline: bool
     description: str
     tags: Dict[str, str]
     owner: str
@@ -107,6 +108,7 @@ def __init__(
         entities: Optional[List[Entity]] = None,
         ttl: Optional[timedelta] = timedelta(days=0),
         online: bool = True,
+        offline: bool = False,
         description: str = "",
         tags: Optional[Dict[str, str]] = None,
         owner: str = "",
@@ -127,6 +129,8 @@ def __init__(
                 can result in extremely computationally intensive queries.
             online (optional): A boolean indicating whether online retrieval is enabled for
                 this feature view.
+            offline (optional): A boolean indicating whether write to offline store is enabled for
+                this feature view.
             description (optional): A human-readable description.
             tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
             owner (optional): The owner of the feature view, typically the email of the
@@ -218,6 +222,7 @@ def __init__(
             source=source,
         )
         self.online = online
+        self.offline = offline
         self.materialization_intervals = []
 
     def __hash__(self):
```
sdk/python/feast/infra/compute_engines/base.py

Lines changed: 83 additions & 0 deletions
```python
from abc import ABC
from typing import Union

import pyarrow as pa

from feast import RepoConfig
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
from feast.infra.materialization.batch_materialization_engine import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.registry import Registry
from feast.utils import _get_column_names


class ComputeEngine(ABC):
    """
    The interface that Feast uses to control the compute system that handles
    materialization and get_historical_features.

    Each engine must implement:
    - materialize(): to generate and persist features
    - get_historical_features(): to perform point-in-time correct joins

    Engines should use FeatureBuilder and DAGNode abstractions to build modular,
    pluggable workflows.
    """

    def __init__(
        self,
        *,
        registry: Registry,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        **kwargs,
    ):
        self.registry = registry
        self.repo_config = repo_config
        self.offline_store = offline_store
        self.online_store = online_store

    def materialize(self, task: MaterializationTask) -> MaterializationJob:
        raise NotImplementedError

    def get_historical_features(self, task: HistoricalRetrievalTask) -> pa.Table:
        raise NotImplementedError

    def get_execution_context(
        self,
        task: Union[MaterializationTask, HistoricalRetrievalTask],
    ) -> ExecutionContext:
        entity_defs = [
            self.registry.get_entity(name, task.project)
            for name in task.feature_view.entities
        ]
        entity_df = None
        if hasattr(task, "entity_df") and task.entity_df is not None:
            entity_df = task.entity_df

        column_info = self.get_column_info(task)
        return ExecutionContext(
            project=task.project,
            repo_config=self.repo_config,
            offline_store=self.offline_store,
            online_store=self.online_store,
            entity_defs=entity_defs,
            column_info=column_info,
            entity_df=entity_df,
        )

    def get_column_info(
        self,
        task: Union[MaterializationTask, HistoricalRetrievalTask],
    ) -> ColumnInfo:
        join_keys, feature_cols, ts_col, created_ts_col = _get_column_names(
            task.feature_view, self.registry.list_entities(task.project)
        )
        return ColumnInfo(
            join_keys=join_keys,
            feature_cols=feature_cols,
            ts_col=ts_col,
            created_ts_col=created_ts_col,
        )
```
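For orientation, here is a hypothetical minimal subclass showing where the shared helpers slot into the two entry points; `NaiveComputeEngine` is illustrative and not one of the engines shipped in this commit.

```python
# Hypothetical subclass sketch: where the base-class helpers slot into the two
# entry points. Not one of the engines shipped in this commit.
import pyarrow as pa

from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
from feast.infra.materialization.batch_materialization_engine import (
    MaterializationJob,
    MaterializationTask,
)


class NaiveComputeEngine(ComputeEngine):
    def materialize(self, task: MaterializationTask) -> MaterializationJob:
        # Shared context: repo config, stores, entity defs, resolved columns.
        context = self.get_execution_context(task)
        # A real engine would have its FeatureBuilder turn task.feature_view
        # into an ExecutionPlan and execute it against `context` here.
        raise NotImplementedError

    def get_historical_features(self, task: HistoricalRetrievalTask) -> pa.Table:
        context = self.get_execution_context(task)
        # context.column_info tells the engine which columns to join/filter on:
        # .join_keys, .feature_cols, .ts_col, and .created_ts_col.
        raise NotImplementedError
```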
