
Commit 2df0a46

HaoXuAI authored and j-wine committed
feat: Local compute engine (feast-dev#5278)
Signed-off-by: Jacob Weinhold <[email protected]>
1 parent 13586cc commit 2df0a46

34 files changed: +1046 −165 lines

docs/reference/compute-engine/README.md

Lines changed: 3 additions & 2 deletions
```diff
@@ -31,10 +31,11 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations
 - Supports point-in-time joins and large-scale materialization
 - Integrates with `SparkOfflineStore` and `SparkMaterializationJob`
 
-### 🧪 LocalComputeEngine (WIP)
+### 🧪 LocalComputeEngine
 
-- Runs on Arrow + Pandas (or optionally DuckDB)
+- Runs on Arrow + Specified backend (e.g., Pandas, Polars)
 - Designed for local dev, testing, or lightweight feature generation
+- Supports `LocalMaterializationJob` and `LocalHistoricalRetrievalJob`
 
 ---
```
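The "Arrow + backend" split described above means DAG nodes exchange `pyarrow.Table`s while the actual computation runs in whichever dataframe library is configured. A minimal sketch of that round trip, using only plain `pyarrow` and `pandas` (no Feast APIs assumed):

```python
import pandas as pd
import pyarrow as pa

# DAG nodes exchange Arrow tables as their standard format...
table = pa.table({"driver_id": [1, 2], "trips": [10, 12]})

# ...while the computation itself runs in the configured backend (pandas here).
df = table.to_pandas()
df["trips_doubled"] = df["trips"] * 2

# The result is converted back to Arrow before it leaves the node.
result = pa.Table.from_pandas(df)
print(result.schema)
```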

sdk/python/feast/infra/common/__init__.py

Whitespace-only changes.
sdk/python/feast/infra/common/materialization_job.py

Lines changed: 59 additions & 0 deletions
```python
import enum
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional, Union

from tqdm import tqdm

from feast import BatchFeatureView, FeatureView, StreamFeatureView


@dataclass
class MaterializationTask:
    """
    A MaterializationTask represents a unit of data that needs to be materialized from an
    offline store to an online store.
    """

    project: str
    feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
    start_time: datetime
    end_time: datetime
    tqdm_builder: Callable[[int], tqdm]


class MaterializationJobStatus(enum.Enum):
    WAITING = 1
    RUNNING = 2
    AVAILABLE = 3
    ERROR = 4
    CANCELLING = 5
    CANCELLED = 6
    SUCCEEDED = 7
    PAUSED = 8
    RETRYING = 9


class MaterializationJob(ABC):
    """
    A MaterializationJob represents an ongoing or executed process that materializes data as per the
    definition of a materialization task.
    """

    task: MaterializationTask

    @abstractmethod
    def status(self) -> MaterializationJobStatus: ...

    @abstractmethod
    def error(self) -> Optional[BaseException]: ...

    @abstractmethod
    def should_be_retried(self) -> bool: ...

    @abstractmethod
    def job_id(self) -> str: ...

    @abstractmethod
    def url(self) -> Optional[str]: ...
```
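To show how this ABC is meant to be satisfied, here is a hypothetical minimal implementation; the class name and constructor are illustrative and not part of this commit:

```python
from typing import Optional

from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationJobStatus,
)


class InMemoryMaterializationJob(MaterializationJob):
    """Hypothetical minimal job that completes synchronously (illustrative only)."""

    def __init__(self, job_id: str, error: Optional[BaseException] = None):
        self._job_id = job_id
        self._error = error

    def status(self) -> MaterializationJobStatus:
        if self._error is not None:
            return MaterializationJobStatus.ERROR
        return MaterializationJobStatus.SUCCEEDED

    def error(self) -> Optional[BaseException]:
        return self._error

    def should_be_retried(self) -> bool:
        return False

    def job_id(self) -> str:
        return self._job_id

    def url(self) -> Optional[str]:
        return None  # no UI to link to for a local, in-process job
```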

sdk/python/feast/infra/compute_engines/base.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -4,12 +4,12 @@
 import pyarrow as pa
 
 from feast import RepoConfig
-from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
-from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
-from feast.infra.materialization.batch_materialization_engine import (
+from feast.infra.common.materialization_job import (
     MaterializationJob,
     MaterializationTask,
 )
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
+from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.registry import Registry
```

sdk/python/feast/infra/compute_engines/feature_builder.py

Lines changed: 3 additions & 5 deletions
```diff
@@ -1,11 +1,10 @@
 from abc import ABC, abstractmethod
 from typing import Union
 
-from feast import BatchFeatureView, FeatureView, StreamFeatureView
+from feast.infra.common.materialization_job import MaterializationTask
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
 from feast.infra.compute_engines.dag.node import DAGNode
 from feast.infra.compute_engines.dag.plan import ExecutionPlan
-from feast.infra.compute_engines.tasks import HistoricalRetrievalTask
-from feast.infra.materialization.batch_materialization_engine import MaterializationTask
 
 
 class FeatureBuilder(ABC):
@@ -16,10 +15,9 @@ class FeatureBuilder(ABC):
 
     def __init__(
         self,
-        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
         task: Union[MaterializationTask, HistoricalRetrievalTask],
     ):
-        self.feature_view = feature_view
+        self.feature_view = task.feature_view
         self.task = task
         self.nodes: list[DAGNode] = []
```
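The practical effect: since both `MaterializationTask` and `HistoricalRetrievalTask` carry a `feature_view`, callers no longer pass it separately. A sketch, where `MyFeatureBuilder` is a hypothetical concrete subclass:

```python
# Before this commit: feature view and task were passed separately.
# builder = MyFeatureBuilder(feature_view=my_view, task=task)

# After: the builder derives the feature view from the task itself.
builder = MyFeatureBuilder(task=task)
assert builder.feature_view is task.feature_view
```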

sdk/python/feast/infra/compute_engines/local/__init__.py

Whitespace-only changes.
Lines changed: 12 additions & 0 deletions
```python
import pyarrow as pa

from feast.infra.compute_engines.dag.model import DAGFormat
from feast.infra.compute_engines.dag.value import DAGValue


class ArrowTableValue(DAGValue):
    def __init__(self, data: pa.Table):
        super().__init__(data, DAGFormat.ARROW)

    def __repr__(self):
        return f"ArrowTableValue(schema={self.data.schema}, rows={self.data.num_rows})"
```

sdk/python/feast/infra/compute_engines/local/backends/__init__.py

Whitespace-only changes.
Lines changed: 79 additions & 0 deletions
```python
from abc import ABC, abstractmethod
from datetime import timedelta


class DataFrameBackend(ABC):
    """
    Abstract interface for DataFrame operations used by the LocalComputeEngine.

    This interface defines the contract for implementing pluggable DataFrame backends
    such as Pandas, Polars, or DuckDB. Each backend must support core table operations
    such as joins, filtering, aggregation, conversion to/from Arrow, and deduplication.

    The purpose of this abstraction is to allow seamless swapping of execution backends
    without changing DAGNode or ComputeEngine logic. All nodes operate on pyarrow.Table
    as the standard input/output format, while the backend defines how the computation
    is actually performed.

    Expected implementations include:
    - PandasBackend
    - PolarsBackend
    - DuckDBBackend (future)

    Methods
    -------
    from_arrow(table: pa.Table) -> Any
        Convert a pyarrow.Table to the backend-native DataFrame format.

    to_arrow(df: Any) -> pa.Table
        Convert a backend-native DataFrame to pyarrow.Table.

    join(left: Any, right: Any, on: List[str], how: str) -> Any
        Join two dataframes on the specified keys with the given join type.

    groupby_agg(df: Any, group_keys: List[str], agg_ops: Dict[str, Tuple[str, str]]) -> Any
        Group and aggregate the dataframe. `agg_ops` maps output column names
        to (aggregation function, source column name) pairs.

    filter(df: Any, expr: str) -> Any
        Apply a string-based filter expression to the DataFrame.

    to_timedelta_value(delta: timedelta) -> Any
        Convert a Python timedelta object to a backend-compatible value
        that can be subtracted from a timestamp column.

    drop_duplicates(df: Any, keys: List[str], sort_by: List[str], ascending: bool = False) -> Any
        Deduplicate the DataFrame by key columns, keeping the first row
        by descending or ascending sort order.

    rename_columns(df: Any, columns: Dict[str, str]) -> Any
        Rename columns in the DataFrame according to the provided mapping.
    """

    @abstractmethod
    def columns(self, df): ...

    @abstractmethod
    def from_arrow(self, table): ...

    @abstractmethod
    def join(self, left, right, on, how): ...

    @abstractmethod
    def groupby_agg(self, df, group_keys, agg_ops): ...

    @abstractmethod
    def filter(self, df, expr): ...

    @abstractmethod
    def to_arrow(self, df): ...

    @abstractmethod
    def to_timedelta_value(self, delta: timedelta): ...

    @abstractmethod
    def drop_duplicates(self, df, keys, sort_by, ascending: bool = False): ...

    @abstractmethod
    def rename_columns(self, df, columns: dict[str, str]): ...
```
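For reference, a hedged sketch of what a `PandasBackend` satisfying this contract could look like; the method bodies below are illustrative, and the commit's actual implementation may differ:

```python
from datetime import timedelta

import pandas as pd
import pyarrow as pa


class PandasBackend(DataFrameBackend):
    def columns(self, df):
        return list(df.columns)

    def from_arrow(self, table):
        return table.to_pandas()

    def to_arrow(self, df):
        return pa.Table.from_pandas(df)

    def join(self, left, right, on, how):
        return left.merge(right, on=on, how=how)

    def groupby_agg(self, df, group_keys, agg_ops):
        # agg_ops maps output column -> (aggregation function, source column).
        named = {
            out: pd.NamedAgg(column=src, aggfunc=fn)
            for out, (fn, src) in agg_ops.items()
        }
        return df.groupby(group_keys, as_index=False).agg(**named)

    def filter(self, df, expr):
        return df.query(expr)

    def to_timedelta_value(self, delta: timedelta):
        return pd.to_timedelta(delta)

    def drop_duplicates(self, df, keys, sort_by, ascending: bool = False):
        # Sort, then keep the first row per key group.
        return df.sort_values(by=sort_by, ascending=ascending).drop_duplicates(
            subset=keys, keep="first"
        )

    def rename_columns(self, df, columns: dict[str, str]):
        return df.rename(columns=columns)
```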
