94 changes: 94 additions & 0 deletions docs/reference/offline-stores/hybrid.md
@@ -0,0 +1,94 @@
# Hybrid Offline Store

## Description
The HybridOfflineStore routes offline feature operations to different offline store backends based on the `batch_source` of each FeatureView. This lets a single Feast deployment support multiple offline store backends, each configured independently and selected dynamically at runtime.
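
Conceptually, each `DataSource` reports a `source_type()`, and the hybrid store looks up the backend registered for that type. Below is a minimal sketch of that dispatch idea — `store_class_for` and `_STORE_FOR_SOURCE` are hypothetical names, not the actual implementation; the mapping mirrors the `_DATA_SOURCE_FOR_OFFLINE_STORE` table this change adds to `sdk/python/feast/data_source.py`:

```python
import importlib

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Illustrative routing table, mirroring the _DATA_SOURCE_FOR_OFFLINE_STORE
# mapping added to sdk/python/feast/data_source.py in this PR.
_STORE_FOR_SOURCE = {
    DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore",
    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore",
}


def store_class_for(data_source):
    """Resolve the offline store class that serves a batch source (sketch only)."""
    path = _STORE_FOR_SOURCE[data_source.source_type()]
    module_name, class_name = path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)
```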

## Getting started
To use the HybridOfflineStore, install Feast with the dependencies for every offline store backend you plan to route to. For example, for Spark and Snowflake:

```bash
pip install 'feast[spark,snowflake]'
```

## Example

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
offline_store:
  type: hybrid_offline_store.HybridOfflineStore
  offline_stores:
    - type: spark
      conf:
        spark_master: local[*]
        spark_app_name: feast_spark_app
    - type: snowflake
      conf:
        account: my_snowflake_account
        user: feast_user
        password: feast_password
        database: feast_database
        schema: feast_schema
```
{% endcode %}

### Example FeatureView
```python
from feast import Entity, FeatureView, ValueType
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)
from feast.infra.offline_stores.snowflake_source import SnowflakeSource


entity = Entity(name="user_id", value_type=ValueType.INT64, join_keys=["user_id"])

feature_view1 = FeatureView(
    name="user_features",
    entities=[entity],
    ttl=None,
    schema=[
        # Define your features here
    ],
    source=SparkSource(
        name="user_features_source",
        path="s3://my-bucket/user_features_data",
        file_format="parquet",  # required when `path` is set
        timestamp_field="event_timestamp",
    ),
)

feature_view2 = FeatureView(
    name="user_activity",
    entities=[entity],
    ttl=None,
    schema=[
        # Define your features here
    ],
    source=SnowflakeSource(
        # SnowflakeSource reads from a table (or query), not a file path
        table="user_activity",
        timestamp_field="event_timestamp",
    ),
)
```
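
Before materializing, these objects need to be registered with the feature store — for example with the Python SDK's `apply` (the equivalent of running `feast apply` in the repo):

```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Register the entity and both feature views with the registry.
store.apply([entity, feature_view1, feature_view2])
```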

You can then call the `materialize` API; each FeatureView's data is materialized from the offline store that matches its `batch_source`.

```python
from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")
store.materialize(
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 7, 31),
    feature_views=["user_features", "user_activity"],  # feature view names
)
```

## Functionality Matrix
| Feature/Functionality | Supported |
|---------------------------------------------------|----------------------------|
| pull_latest_from_table_or_query | Yes |
| pull_all_from_table_or_query | Yes |
| offline_write_batch | Yes |
| validate_data_source | Yes |
| get_table_column_names_and_types_from_data_source | Yes |
| write_logged_features | No |
| get_historical_features | Only with same data source |
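
As the matrix notes, `get_historical_features` is only supported when all requested feature views share the same data source. A sketch of such a retrieval — the entity DataFrame and the `user_features:age` feature reference are illustrative:

```python
from datetime import datetime

import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# All requested feature views must be served by the same offline store backend.
entity_df = pd.DataFrame(
    {
        "user_id": [1001, 1002],
        "event_timestamp": [datetime(2025, 7, 1), datetime(2025, 7, 2)],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:age"],  # hypothetical feature name
).to_df()
```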
25 changes: 25 additions & 0 deletions sdk/python/feast/data_source.py
@@ -163,6 +163,16 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
    DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}

# Maps a batch data source type to the offline store implementation that can
# serve it; the HybridOfflineStore uses this table to route operations.
_DATA_SOURCE_FOR_OFFLINE_STORE = {
    DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.dask.DaskOfflineStore",
    DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery.BigQueryOfflineStore",
    DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift.RedshiftOfflineStore",
    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore",
    DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore",
    DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore",
    DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
}


@typechecked
class DataSource(ABC):
@@ -401,6 +411,9 @@ def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto):
self.last_updated_timestamp
)

    @abstractmethod
    def source_type(self) -> DataSourceProto.SourceType.ValueType: ...


@typechecked
class KafkaSource(DataSource):
@@ -564,6 +577,9 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
    def get_table_query_string(self) -> str:
        raise NotImplementedError

    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        return DataSourceProto.STREAM_KAFKA


@typechecked
class RequestSource(DataSource):
@@ -679,6 +695,9 @@ def get_table_query_string(self) -> str:
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        raise NotImplementedError

    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        return DataSourceProto.REQUEST_SOURCE


@typechecked
class KinesisSource(DataSource):
@@ -811,6 +830,9 @@ def _to_proto_impl(self) -> DataSourceProto:

        return data_source_proto

    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        return DataSourceProto.STREAM_KINESIS


class PushMode(enum.Enum):
    ONLINE = 1
@@ -911,3 +933,6 @@ def get_table_query_string(self) -> str:
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        raise NotImplementedError

    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        return DataSourceProto.PUSH_SOURCE
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
@@ -23,6 +23,9 @@
class BigQuerySource(DataSource):
"""A BigQuerySource object defines a data source that a BigQueryOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_BIGQUERY

def __init__(
self,
*,
@@ -17,6 +17,9 @@


class AthenaSource(DataSource):
    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        return DataSourceProto.BATCH_ATHENA

    def __init__(
        self,
        *,
@@ -56,6 +56,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:


class ClickhouseSource(DataSource):
    def source_type(self) -> DataSourceProto.SourceType.ValueType:
        # TODO: Add ClickhouseSourceType to DataSourceProto
        return DataSourceProto.CUSTOM_SOURCE

    def __init__(
        self,
        name: Optional[str] = None,
@@ -26,6 +26,10 @@
class CouchbaseColumnarSource(DataSource):
"""A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add Couchbase to DataSourceProto.SourceType

        return DataSourceProto.CUSTOM_SOURCE

> **Reviewer:** Why not just add all of this now? Is the scope large?
>
> **@HaoXuAI (author, Jul 14, 2025):** Yeah, it will have to update the protobuf and some other places; that will be done in a follow-up PR.

    def __init__(
        self,
        name: Optional[str] = None,
@@ -113,6 +113,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:
class MsSqlServerSource(DataSource):
"""A MsSqlServerSource object defines a data source that a MsSqlServerOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add MsSqlServerSource to DataSourceProto.SourceType
return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: str,
@@ -20,6 +20,10 @@
class PostgreSQLSource(DataSource):
"""A PostgreSQLSource object defines a data source that a PostgreSQLOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add Postgres to DataSourceProto.SourceType
return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
@@ -31,6 +31,9 @@ class SparkSourceFormat(Enum):
class SparkSource(DataSource):
"""A SparkSource object defines a data source that a Spark offline store can use"""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_SPARK

def __init__(
self,
*,
@@ -86,6 +86,9 @@ def to_proto(self) -> DataSourceProto.TrinoOptions:
class TrinoSource(DataSource):
"""A TrinoSource object defines a data source that a TrinoOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_TRINO

def __init__(
self,
*,
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/file_source.py
@@ -29,6 +29,9 @@
class FileSource(DataSource):
"""A FileSource object defines a data source that a DaskOfflineStore or DuckDBOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_FILE

def __init__(
self,
*,