4 changes: 2 additions & 2 deletions docs/getting-started/components/README.md
@@ -16,8 +16,8 @@
[feature-server.md](feature-server.md)
{% endcontent-ref %}

{% content-ref url="batch-materialization-engine.md" %}
[batch-materialization-engine.md](batch-materialization-engine.md)
{% content-ref url="compute-engine.md" %}
[compute-engine.md](compute-engine.md)
{% endcontent-ref %}

{% content-ref url="provider.md" %}
11 changes: 0 additions & 11 deletions docs/getting-started/components/batch-materialization-engine.md

This file was deleted.

111 changes: 111 additions & 0 deletions docs/getting-started/components/compute-engine.md
@@ -0,0 +1,111 @@
# Compute Engine (Batch Materialization Engine)

Note: Materialization is now handled through the unified compute engine interface.

A Compute Engine in Feast is a component that handles materialization and historical retrieval tasks. It is responsible
for executing the logic defined in feature views, such as aggregations, transformations, and custom user-defined
functions (UDFs).

A materialization task abstracts over the specific technology or framework used to materialize data. It allows
users to use a pure local serialized approach (the default LocalComputeEngine), or to delegate
materialization to separate components (e.g. AWS Lambda, as implemented by the LambdaComputeEngine).

If the built-in engines are not sufficient, you can create your own custom materialization engine. Please
see [this guide](../../how-to-guides/customizing-feast/creating-a-custom-compute-engine.md) for more details.

Please see [feature\_store.yaml](../../reference/feature-repository/feature-store-yaml.md#overview) for configuring
engines.

### Supported Compute Engines
| Compute Engine          | Description                                                                               | Supported | Link |
|-------------------------|-------------------------------------------------------------------------------------------|-----------|------|
| LocalComputeEngine      | Runs on Arrow + Pandas/Polars/Dask etc., designed for lightweight transformations.         | ✅        |      |
| SparkComputeEngine      | Runs on Apache Spark, designed for large-scale distributed feature generation.             | ✅        |      |
| LambdaComputeEngine     | Runs on AWS Lambda, designed for serverless feature generation.                            | ✅        |      |
| FlinkComputeEngine      | Runs on Apache Flink, designed for stream processing and real-time feature generation.     | ❌        |      |
| RayComputeEngine        | Runs on Ray, designed for distributed feature generation and machine learning workloads.   | ❌        |      |

### Batch Engine
The batch engine config can be set in the `feature_store.yaml` file, where it serves as the default configuration for all batch materialization and historical retrieval tasks. It can be overridden per feature view via the `batch_engine` argument of `BatchFeatureView`. For example, in `feature_store.yaml`:
```yaml
batch_engine:
  type: SparkComputeEngine
  config:
    spark_master: "local[*]"
    spark_app_name: "Feast Batch Engine"
    spark_conf:
      spark.sql.shuffle.partitions: 100
      spark.executor.memory: "4g"
```
And as a per-view override in `BatchFeatureView`:
```python
from feast import BatchFeatureView

fv = BatchFeatureView(
    # ... other required arguments (name, source, etc.) omitted for brevity
    batch_engine={
        "spark_conf": {
            "spark.sql.shuffle.partitions": 200,
            "spark.executor.memory": "8g",
        },
    },
)
```
Then, when you materialize the feature view, Feast uses the batch_engine configuration specified on the feature view, which overrides shuffle partitions to 200 and executor memory to 8g.
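
For illustration, here is a minimal sketch of triggering materialization from Python (this assumes an initialized feature repo in the current directory; the time window is arbitrary):

```python
from datetime import datetime, timedelta

from feast import FeatureStore

# Assumes a feature repo in the current directory.
store = FeatureStore(repo_path=".")

# Materialize the last 7 days. For each feature view, the compute engine uses
# the repo-level batch_engine config merged with any per-view overrides.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=7),
    end_date=datetime.utcnow(),
)
```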

### Stream Engine
The stream engine config can likewise be set in the `feature_store.yaml` file, where it serves as the default configuration for all stream materialization and historical retrieval tasks. It can be overridden per feature view via the `stream_engine` argument of `StreamFeatureView`. For example, in `feature_store.yaml`:
```yaml
stream_engine:
  type: SparkComputeEngine
  config:
    spark_master: "local[*]"
    spark_app_name: "Feast Stream Engine"
    spark_conf:
      spark.sql.shuffle.partitions: 100
      spark.executor.memory: "4g"
```
```python
from feast import StreamFeatureView

fv = StreamFeatureView(
    # ... other required arguments (name, source, etc.) omitted for brevity
    stream_engine={
        "spark_conf": {
            "spark.sql.shuffle.partitions": 200,
            "spark.executor.memory": "8g",
        },
    },
)
```
Similarly, when you materialize the stream feature view, Feast uses the stream_engine configuration specified on the feature view, overriding shuffle partitions to 200 and executor memory to 8g.

### API

The compute engine builds its execution plan as a DAG through a component named FeatureBuilder. It derives feature
generation from Feature View definitions, including:

1. Transformation (via the Transformation API)
2. Aggregation (via the Aggregation API)
3. Join (join with entity datasets, a customized JOIN, or a join with another Feature View)
4. Filter (point-in-time filter, TTL filter, filter by a custom expression)
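
For example, a single feature view can combine several of these steps. The following is a minimal sketch, assuming a Parquet source and illustrative entity and column names:

```python
from datetime import timedelta

from feast import BatchFeatureView, Entity, FileSource
from feast.aggregation import Aggregation

driver = Entity(name="driver", join_keys=["driver_id"])

# Illustrative batch source.
trips_source = FileSource(
    path="data/trips.parquet",
    timestamp_field="event_timestamp",
)

def add_fare_per_mile(df):
    # Transformation: derive a new column before aggregation.
    df["fare_per_mile"] = df["fare"] / df["miles"]
    return df

trips_fv = BatchFeatureView(
    name="trips_stats",
    entities=[driver],
    source=trips_source,
    udf=add_fare_per_mile,
    udf_string="add_fare_per_mile",
    aggregations=[
        # Aggregation: 1-day rolling sum over event-timestamped rows.
        Aggregation(column="fare_per_mile", function="sum", time_window=timedelta(days=1)),
    ],
    ttl=timedelta(days=3),  # Filter: rows older than the TTL are excluded.
)
```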

### Components
The compute engine is responsible for executing the materialization and retrieval tasks defined in feature views. It
builds a directed acyclic graph (DAG) of the operations that must be performed to generate the features.
The core components of the compute engine are:


#### Feature Builder

The Feature Builder constructs the backend-specific execution plan from the resolved DAG and runs the operations
defined in it. It handles the execution of transformations, aggregations, joins, and filters.

#### Feature Resolver

The Feature Resolver takes the definitions from feature views and resolves the logical DAG of operations into a
topological execution order, determining which operations must run, and in what order, to generate the features.
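
To illustrate the resolver's role, here is a conceptual sketch (not Feast's actual classes): resolving a DAG amounts to a topological sort of its nodes, after which each node can be mapped to a backend-specific operation.

```python
from graphlib import TopologicalSorter

# Hypothetical logical DAG: each node maps to the upstream nodes it depends on.
dag = {
    "read_source": set(),
    "transform": {"read_source"},
    "aggregate": {"transform"},
    "join_entities": {"aggregate"},
    "filter_point_in_time": {"join_entities"},
    "write_online_store": {"filter_point_in_time"},
}

# The resolver yields nodes in dependency order; the builder then turns each
# node into a concrete operation (e.g. a Spark stage) and executes the plan.
for node in TopologicalSorter(dag).static_order():
    print(node)
```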
8 changes: 8 additions & 0 deletions docs/getting-started/concepts/README.md
@@ -20,6 +20,14 @@
[feature-view.md](feature-view.md)
{% endcontent-ref %}

{% content-ref url="batch-feature-view.md" %}
[batch-feature-view.md](batch-feature-view.md)
{% endcontent-ref %}

{% content-ref url="stream-feature-view.md" %}
[stream-feature-view.md](stream-feature-view.md)
{% endcontent-ref %}

{% content-ref url="feature-retrieval.md" %}
[feature-retrieval.md](feature-retrieval.md)
{% endcontent-ref %}
6 changes: 6 additions & 0 deletions docs/getting-started/concepts/batch-feature-view.md
@@ -2,6 +2,12 @@

`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.
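
As a sketch of the chaining idea (all names are illustrative, and required arguments such as entities or schema are elided for brevity):

```python
from datetime import timedelta

from feast import BatchFeatureView, FileSource

raw_trips = BatchFeatureView(
    name="raw_trips",
    source=FileSource(path="data/trips.parquet", timestamp_field="event_timestamp"),
    ttl=timedelta(days=30),
)

def add_tip_ratio(df):
    # Derived column computed on top of the upstream view's output.
    df["tip_ratio"] = df["tip"] / df["fare"]
    return df

# A derived view that uses another feature view as its source.
trip_ratios = BatchFeatureView(
    name="trip_ratios",
    source=raw_trips,
    udf=add_tip_ratio,
    udf_string="add_tip_ratio",
    ttl=timedelta(days=30),
)
```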

## Supported Compute Engines
- [x] LocalComputeEngine
- [x] SparkComputeEngine
- [ ] LambdaComputeEngine
- [ ] KubernetesComputeEngine

---

## ✅ Key Capabilities
16 changes: 16 additions & 0 deletions docs/getting-started/concepts/stream-feature-view.md
@@ -0,0 +1,16 @@
# Stream Feature View
`StreamFeatureView` is a type of feature view in Feast that allows you to define features that are continuously updated from a streaming source. It is designed to handle real-time data ingestion and feature generation, making it suitable for use cases where features need to be updated frequently as new data arrives.

### Supported Compute Engines
- [x] LocalComputeEngine
- [x] SparkComputeEngine
- [ ] FlinkComputeEngine

### Key Capabilities
- **Real-time Feature Generation**: Supports defining features that are continuously updated from a streaming source.

- **Transformations**: Apply transformation logic (e.g., `feature_transformation` or `udf`) to the raw data source.

- **Aggregations**: Define time-windowed aggregations (e.g., `sum`, `avg`) over event-timestamped data.

- **Feature resolution & execution**: Automatically resolves and executes dependent views during materialization or retrieval.
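
As a minimal sketch of a stream feature view with a windowed aggregation (the Kafka source, schema, and all names are illustrative):

```python
from datetime import timedelta

from feast import Entity, FileSource, KafkaSource, StreamFeatureView
from feast.aggregation import Aggregation
from feast.data_format import JsonFormat

driver = Entity(name="driver", join_keys=["driver_id"])

# Stream source, backed by a batch source for historical retrieval.
trips_stream = KafkaSource(
    name="trips_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="trips",
    timestamp_field="event_timestamp",
    message_format=JsonFormat(
        schema_json="driver_id integer, fare double, event_timestamp timestamp"
    ),
    batch_source=FileSource(
        path="data/trips.parquet", timestamp_field="event_timestamp"
    ),
)

trips_hourly = StreamFeatureView(
    name="trips_hourly",
    entities=[driver],
    source=trips_stream,
    ttl=timedelta(days=1),
    aggregations=[
        # 1-hour rolling sum of fares per driver.
        Aggregation(column="fare", function="sum", time_window=timedelta(hours=1)),
    ],
)
```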
4 changes: 2 additions & 2 deletions docs/reference/compute-engine/README.md
@@ -19,8 +19,8 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operatio
| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
| `FeatureResolver` | Resolves feature DAG by topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link]([link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)) |
| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |

---

30 changes: 13 additions & 17 deletions sdk/python/feast/batch_feature_view.py
@@ -34,6 +34,7 @@ class BatchFeatureView(FeatureView):

Attributes:
name: The unique name of the batch feature view.
mode: The transformation mode to use for the batch feature view. This can be one of the TransformationMode values.
entities: List of entities or entity join keys.
ttl: The amount of time this group of features lives. A ttl of 0 indicates that
this group of features lives forever. Note that large ttl's or a ttl of 0
@@ -46,6 +47,13 @@ class BatchFeatureView(FeatureView):
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the batch feature view, typically the email of the primary maintainer.
udf: A user-defined function that applies transformations to the data in the batch feature view.
udf_string: A string representation of the user-defined function.
feature_transformation: A transformation object that defines how features are transformed.
Note: feature_transformation takes precedence over udf and udf_string.
batch_engine: A dictionary containing configuration for the batch engine used to process the feature view.
Note: it overrides the repo-level default batch engine config defined in the yaml file.
aggregations: A list of aggregations to be applied to the features in the batch feature view.
"""

name: str
@@ -67,7 +75,7 @@ class BatchFeatureView(FeatureView):
udf: Optional[Callable[[Any], Any]]
udf_string: Optional[str]
feature_transformation: Optional[Transformation]
batch_engine: Optional[Field]
batch_engine: Optional[Dict[str, Any]]
aggregations: Optional[List[Aggregation]]

def __init__(
Expand All @@ -88,7 +96,7 @@ def __init__(
udf: Optional[Callable[[Any], Any]] = None,
udf_string: Optional[str] = "",
feature_transformation: Optional[Transformation] = None,
batch_engine: Optional[Field] = None,
batch_engine: Optional[Dict[str, Any]] = None,
aggregations: Optional[List[Aggregation]] = None,
):
if not flags_helper.is_test():
@@ -162,21 +170,9 @@ def batch_feature_view(
schema: Optional[List[Field]] = None,
):
"""
Args:
name:
mode:
entities:
ttl:
source:
tags:
online:
offline:
description:
owner:
schema:

Returns:

Creates a BatchFeatureView object with the given user-defined function (UDF) as the transformation.
Please make sure that the udf contains all non-built-in imports within the function to ensure that the execution
of the deserialized function does not miss any imports.
"""

def mainify(obj):
31 changes: 31 additions & 0 deletions sdk/python/feast/infra/compute_engines/base.py
@@ -131,3 +131,34 @@ def get_execution_context(
entity_defs=entity_defs,
entity_df=entity_df,
)

def _get_feature_view_engine_config(
self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
) -> dict:
"""
Merge repo-level default batch engine config with runtime engine overrides defined in the feature view.

Priority:
1. Repo config (`self.repo_config.batch_engine_config`) - baseline
2. FeatureView overrides (`batch_engine` for BatchFeatureView, `stream_engine` for StreamFeatureView) - highest priority

Args:
feature_view: A BatchFeatureView or StreamFeatureView.

Returns:
dict: The merged engine configuration.
"""
default_conf = self.repo_config.batch_engine_config or {}

runtime_conf = None
if isinstance(feature_view, BatchFeatureView):
runtime_conf = feature_view.batch_engine
elif isinstance(feature_view, StreamFeatureView):
runtime_conf = feature_view.stream_engine

if runtime_conf is not None and not isinstance(runtime_conf, dict):
raise TypeError(
f"Engine config for {feature_view.name} must be a dict, got {type(runtime_conf)}."
)

return {**default_conf, **runtime_conf} if runtime_conf else dict(default_conf)
32 changes: 14 additions & 18 deletions sdk/python/feast/infra/compute_engines/spark/compute.py
@@ -3,6 +3,7 @@
from typing import Dict, Literal, Optional, Sequence, Union, cast

from pydantic import StrictStr
from pyspark.sql import SparkSession

from feast import (
BatchFeatureView,
@@ -77,20 +78,11 @@ def teardown_infra(
):
pass

def __init__(
self,
offline_store,
online_store,
repo_config,
**kwargs,
):
super().__init__(
offline_store=offline_store,
online_store=online_store,
repo_config=repo_config,
**kwargs,
)
self.spark_session = get_or_create_new_spark_session()
def _get_feature_view_spark_session(
self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
) -> SparkSession:
spark_conf = self._get_feature_view_engine_config(feature_view)
return get_or_create_new_spark_session(spark_conf)

def _materialize_one(
self,
@@ -113,11 +105,13 @@ def _materialize_one(
# ✅ 1. Build typed execution context
context = self.get_execution_context(registry, task)

spark_session = self._get_feature_view_spark_session(task.feature_view)

try:
# ✅ 2. Construct Feature Builder and run it
builder = SparkFeatureBuilder(
registry=registry,
spark_session=self.spark_session,
spark_session=spark_session,
task=task,
)
plan = builder.build()
@@ -209,18 +203,20 @@ def get_historical_features(
# ✅ 1. Build typed execution context
context = self.get_execution_context(registry, task)

spark_session = self._get_feature_view_spark_session(task.feature_view)

try:
# ✅ 2. Construct Feature Builder and run it
builder = SparkFeatureBuilder(
registry=registry,
spark_session=self.spark_session,
spark_session=spark_session,
task=task,
)
plan = builder.build()

return SparkDAGRetrievalJob(
plan=plan,
spark_session=self.spark_session,
spark_session=spark_session,
context=context,
config=self.repo_config,
full_feature_names=task.full_feature_name,
Expand All @@ -229,7 +225,7 @@ def get_historical_features(
# 🛑 Handle failure
return SparkDAGRetrievalJob(
plan=None,
spark_session=self.spark_session,
spark_session=spark_session,
context=context,
config=self.repo_config,
full_feature_names=task.full_feature_name,
@@ -47,7 +47,6 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel):

spark_conf: Optional[Dict[str, str]] = None
""" Configuration overlay for the spark session """
# sparksession is not serializable and we dont want to pass it around as an argument

staging_location: Optional[StrictStr] = None
""" Remote path for batch materialization jobs"""