Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}

_DATA_SOURCE_FOR_OFFLINE_STORE = {
DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.dask.DaskOfflineStore",
DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery.BigQueryOfflineStore",
DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift.RedshiftOfflineStore",
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore",
DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore",
DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore",
DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
}


@typechecked
class DataSource(ABC):
Expand Down Expand Up @@ -401,6 +411,9 @@ def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto):
self.last_updated_timestamp
)

@abstractmethod
def source_type(self) -> DataSourceProto.SourceType.ValueType: ...


@typechecked
class KafkaSource(DataSource):
Expand Down Expand Up @@ -564,6 +577,9 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_query_string(self) -> str:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.STREAM_KAFKA


@typechecked
class RequestSource(DataSource):
Expand Down Expand Up @@ -679,6 +695,9 @@ def get_table_query_string(self) -> str:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.REQUEST_SOURCE


@typechecked
class KinesisSource(DataSource):
Expand Down Expand Up @@ -811,6 +830,9 @@ def _to_proto_impl(self) -> DataSourceProto:

return data_source_proto

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.STREAM_KINESIS


class PushMode(enum.Enum):
ONLINE = 1
Expand Down Expand Up @@ -911,3 +933,6 @@ def get_table_query_string(self) -> str:
@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.PUSH_SOURCE
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
class BigQuerySource(DataSource):
"""A BigQuerySource object defines a data source that a BigQueryOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_BIGQUERY

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@


class AthenaSource(DataSource):
def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_ATHENA

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:


class ClickhouseSource(DataSource):
def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add ClickhouseSourceType to DataSourceProto
return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
class CouchbaseColumnarSource(DataSource):
"""A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add Couchbase to DataSourceProto.SourceType

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just add all of this now ? is the scope large?

Copy link
Collaborator Author

@HaoXuAI HaoXuAI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it will have to update the protobuf and someother places, which will do it in next PR

return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:
class MsSqlServerSource(DataSource):
"""A MsSqlServerSource object defines a data source that a MsSqlServerOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add MsSqlServerSource to DataSourceProto.SourceType
return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
class PostgreSQLSource(DataSource):
"""A PostgreSQLSource object defines a data source that a PostgreSQLOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add Postgres to DataSourceProto.SourceType
return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class SparkSourceFormat(Enum):
class SparkSource(DataSource):
"""A SparkSource object defines a data source that a Spark offline store can use"""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_SPARK

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def to_proto(self) -> DataSourceProto.TrinoOptions:
class TrinoSource(DataSource):
"""A TrinoSource object defines a data source that a TrinoOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_TRINO

def __init__(
self,
*,
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
class FileSource(DataSource):
"""A FileSource object defines a data source that a DaskOfflineStore or DuckDBOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_FILE

def __init__(
self,
*,
Expand Down
203 changes: 203 additions & 0 deletions sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import pandas as pd
import pyarrow

from feast import FeatureView, RepoConfig
from feast.data_source import _DATA_SOURCE_FOR_OFFLINE_STORE, DataSource
from feast.errors import FeastOfflineStoreInvalidName
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import (
FeastConfigBaseModel,
get_offline_config_from_type,
get_offline_store_type,
)


class HybridOfflineStoreConfig(FeastConfigBaseModel):
type: str = "hybrid_offline_store.HybridOfflineStore"

class OfflineStoresWithConfig(FeastConfigBaseModel):
type: str
conf: Dict[str, Any]

offline_stores: Optional[List[OfflineStoresWithConfig]]


class HybridOfflineStore(OfflineStore):
_instance: Optional["HybridOfflineStore"] = None
_initialized: bool
offline_stores: Dict[str, OfflineStore]

def __new__(cls):
if cls._instance is None:
cls._instance = super(HybridOfflineStore, cls).__new__(cls)
cls._instance._initialized = False
cls._instance.offline_stores = {}
return cls._instance

def _initialize_offline_stores(self, config: RepoConfig):
if self._initialized:
return
for store_cfg in getattr(config.offline_store, "offline_stores", []):
try:
offline_store_type = get_offline_store_type(store_cfg.type)
config_cls = get_offline_config_from_type(store_cfg.type)
config_instance = config_cls(**store_cfg.conf)
store = get_offline_store_from_config(config_instance)
self.offline_stores[offline_store_type] = store
except FeastOfflineStoreInvalidName as e:
raise FeastOfflineStoreInvalidName(
f"Failed to initialize Hybrid offline store {store_cfg.type}: {e}"
)
self._initialized = True

def get_source_key_from_type(
self, source_type: DataSourceProto.SourceType.ValueType
) -> Optional[str]:
if source_type not in list(_DATA_SOURCE_FOR_OFFLINE_STORE.keys()):
raise ValueError(
f"Unsupported DataSource type for HybridOfflineStore: {source_type}."
f"Supported types are: {list(_DATA_SOURCE_FOR_OFFLINE_STORE.keys())}"
)
return _DATA_SOURCE_FOR_OFFLINE_STORE.get(source_type, None)

def _get_offline_store_for_feature_view(
self, feature_view: FeatureView, config: RepoConfig
) -> OfflineStore:
self._initialize_offline_stores(config)
source_type = feature_view.batch_source.source_type()
store_key = self.get_source_key_from_type(source_type)
if store_key is None:
raise ValueError(
f"Unsupported FeatureView batch_source type: {source_type}"
)
return self.offline_stores[store_key]

def _get_offline_store_for_source(
self, data_source: DataSource, config: RepoConfig
) -> OfflineStore:
self._initialize_offline_stores(config)
source_type = data_source.source_type()
store_key = self.get_source_key_from_type(source_type)
if store_key is None:
raise ValueError(f"Unsupported DataSource type: {source_type}")
return self.offline_stores[store_key]

@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
store = HybridOfflineStore()._get_offline_store_for_feature_view(
feature_views[0], config
)
return store.get_historical_features(
config,
feature_views,
feature_refs,
entity_df,
registry,
project,
full_feature_names,
)

@staticmethod
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
store = HybridOfflineStore()._get_offline_store_for_source(data_source, config)
return store.pull_latest_from_table_or_query(
config,
data_source,
join_key_columns,
feature_name_columns,
timestamp_field,
created_timestamp_column,
start_date,
end_date,
)

@staticmethod
def pull_all_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
store = HybridOfflineStore()._get_offline_store_for_source(data_source, config)
return store.pull_all_from_table_or_query(
config,
data_source,
join_key_columns,
feature_name_columns,
timestamp_field,
created_timestamp_column,
start_date,
end_date,
)

@staticmethod
def write_logged_features(
config: RepoConfig,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: BaseRegistry,
):
raise NotImplementedError(
"HybridOfflineStore does not support write_logged_features. "
"Please use the specific offline store for logging."
)

@staticmethod
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
store = HybridOfflineStore()._get_offline_store_for_feature_view(
feature_view, config
)
return store.offline_write_batch(config, feature_view, table, progress)

def validate_data_source(
self,
config: RepoConfig,
data_source: DataSource,
):
store = self._get_offline_store_for_source(data_source, config)
return store.validate_data_source(config, data_source)

def get_table_column_names_and_types_from_data_source(
self,
config: RepoConfig,
data_source: DataSource,
) -> Iterable[Tuple[str, str]]:
store = self._get_offline_store_for_source(data_source, config)
return store.get_table_column_names_and_types_from_data_source(
config, data_source
)
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
class RedshiftSource(DataSource):
"""A RedshiftSource object defines a data source that a RedshiftOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
return DataSourceProto.BATCH_REDSHIFT

def __init__(
self,
*,
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def get_table_column_names_and_types(
for column in metadata
]

def source_type(self) -> DataSourceProto.SourceType.ValueType:
"""
Returns the source type of this data source.
"""
return DataSourceProto.BATCH_SNOWFLAKE


snowflake_type_code_map = {
0: "NUMBER",
Expand Down
Loading
Loading