2 changes: 2 additions & 0 deletions protos/feast/core/FeatureView.proto
@@ -79,6 +79,8 @@ message FeatureViewSpec {

   // Whether these features should be written to the offline store
   bool offline = 13;
+
+  repeated FeatureViewSpec source_views = 14;
 }

 message FeatureViewMeta {
19 changes: 10 additions & 9 deletions sdk/python/feast/batch_feature_view.py
@@ -53,6 +53,7 @@ class BatchFeatureView(FeatureView):
     entities: List[str]
     ttl: Optional[timedelta]
     source: DataSource
+    sink_source: Optional[DataSource] = None

Review comment: Maybe we should just call it sink?

Author reply: Sounds good, will update it

     schema: List[Field]
     entity_columns: List[Field]
     features: List[Field]
@@ -65,7 +66,7 @@ class BatchFeatureView(FeatureView):
     materialization_intervals: List[Tuple[datetime, datetime]]
     udf: Optional[Callable[[Any], Any]]
     udf_string: Optional[str]
-    feature_transformation: Transformation
+    feature_transformation: Optional[Transformation]
     batch_engine: Optional[Field]
     aggregations: Optional[List[Aggregation]]

@@ -74,7 +75,8 @@ def __init__(
         *,
         name: str,
         mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
-        source: DataSource,
+        source: Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]],
+        sink_source: Optional[DataSource] = None,
         entities: Optional[List[Entity]] = None,
         ttl: Optional[timedelta] = None,
         tags: Optional[Dict[str, str]] = None,
@@ -83,7 +85,7 @@ def __init__(
         description: str = "",
         owner: str = "",
         schema: Optional[List[Field]] = None,
-        udf: Optional[Callable[[Any], Any]],
+        udf: Optional[Callable[[Any], Any]] = None,
         udf_string: Optional[str] = "",
         feature_transformation: Optional[Transformation] = None,
         batch_engine: Optional[Field] = None,
@@ -96,7 +98,7 @@ def __init__(
                 RuntimeWarning,
             )

-        if (
+        if isinstance(source, DataSource) and (
             type(source).__name__ not in SUPPORTED_BATCH_SOURCES
             and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
         ):
@@ -124,14 +126,13 @@ def __init__(
             description=description,
             owner=owner,
             schema=schema,
-            source=source,
+            source=source,  # type: ignore[arg-type]
+            sink_source=sink_source,
         )

-    def get_feature_transformation(self) -> Transformation:
+    def get_feature_transformation(self) -> Optional[Transformation]:
         if not self.udf:
-            raise ValueError(
-                "Either a UDF or a feature transformation must be provided for BatchFeatureView"
-            )
+            return None
         if self.mode in (
             TransformationMode.PANDAS,
             TransformationMode.PYTHON,
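To make the new `source` / `sink_source` surface concrete, here is a hypothetical definition of a base view and a derived view against this branch. The entity, file paths, and view names are illustrative only and do not come from the PR:

```python
from datetime import timedelta

from feast import BatchFeatureView, Entity, Field, FileSource
from feast.types import Float32

driver = Entity(name="driver", join_keys=["driver_id"])

# An ordinary batch view, fed directly by a DataSource.
base_view = BatchFeatureView(
    name="driver_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[Field(name="conv_rate", dtype=Float32)],
    source=FileSource(
        name="driver_stats_source",
        path="data/driver_hourly_stats.parquet",  # hypothetical path
        timestamp_field="event_timestamp",
    ),
)

# A derived view: `source` is another view (or a list of views), so there is
# no physical input source. `sink_source` says where the output is persisted;
# omitting it raises the "Derived FeatureView must specify `sink_source`."
# ValueError added in feature_view.py below.
derived_view = BatchFeatureView(
    name="driver_stats_daily",
    entities=[driver],
    ttl=timedelta(days=7),
    schema=[Field(name="avg_conv_rate", dtype=Float32)],
    source=[base_view],
    sink_source=FileSource(
        name="driver_stats_daily_sink",
        path="data/driver_stats_daily.parquet",  # hypothetical path
        timestamp_field="event_timestamp",
    ),
)
```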
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
@@ -922,7 +922,7 @@ def apply(
         for fv in itertools.chain(
             views_to_update, sfvs_to_update, odfvs_with_writes_to_update
         ):
-            if isinstance(fv, FeatureView):
+            if isinstance(fv, FeatureView) and fv.batch_source:
                 data_sources_set_to_update.add(fv.batch_source)
             if hasattr(fv, "stream_source"):
                 if fv.stream_source:
99 changes: 70 additions & 29 deletions sdk/python/feast/feature_view.py
@@ -14,7 +14,7 @@
 import copy
 import warnings
 from datetime import datetime, timedelta
-from typing import Dict, List, Optional, Tuple, Type
+from typing import Dict, List, Optional, Tuple, Type, Union

 from google.protobuf.duration_pb2 import Duration
 from google.protobuf.message import Message
@@ -90,6 +90,7 @@ class FeatureView(BaseFeatureView):
     ttl: Optional[timedelta]
     batch_source: DataSource
     stream_source: Optional[DataSource]
+    source_views: Optional[List["FeatureView"]]

Review comment (Member): In __eq__, I think we also need to compare source_views, else two FeatureViews with different source dependencies will be considered equal. Same for __copy__.

Author reply: good catch
     entity_columns: List[Field]
     features: List[Field]
     online: bool
@@ -103,7 +104,8 @@ def __init__(
         self,
         *,
         name: str,
-        source: DataSource,
+        source: Union[DataSource, "FeatureView", List["FeatureView"]],
+        sink_source: Optional[DataSource] = None,
         schema: Optional[List[Field]] = None,
         entities: Optional[List[Entity]] = None,
         ttl: Optional[timedelta] = timedelta(days=0),
@@ -144,22 +146,45 @@ def __init__(
         self.ttl = ttl
         schema = schema or []

-        # Initialize data sources.
+        # Normalize source
+        self.stream_source = None
+        self.data_source: Optional[DataSource] = None
+        self.source_views: List[FeatureView] = []
+
+        if isinstance(source, DataSource):
+            self.data_source = source
+        elif isinstance(source, FeatureView):
+            self.source_views = [source]
+        elif isinstance(source, list) and all(
+            isinstance(sv, FeatureView) for sv in source
+        ):
+            self.source_views = source
+        else:
+            raise TypeError(
+                "source must be a DataSource, a FeatureView, or a list of FeatureView."
+            )
+
+        # Set up stream, batch and derived view sources
         if (
-            isinstance(source, PushSource)
-            or isinstance(source, KafkaSource)
-            or isinstance(source, KinesisSource)
+            isinstance(self.data_source, PushSource)
+            or isinstance(self.data_source, KafkaSource)
+            or isinstance(self.data_source, KinesisSource)
         ):
-            self.stream_source = source
-            if not source.batch_source:
+            # Stream source definition
+            self.stream_source = self.data_source
+            if not self.data_source.batch_source:
                 raise ValueError(
-                    f"A batch_source needs to be specified for stream source `{source.name}`"
+                    f"A batch_source needs to be specified for stream source `{self.data_source.name}`"
                 )
             else:
-                self.batch_source = source.batch_source
+                self.batch_source = self.data_source.batch_source
+        elif self.data_source:
+            # Batch source definition
+            self.batch_source = self.data_source
         else:
-            self.stream_source = None
-            self.batch_source = source
+            # Derived view source definition
+            if not sink_source:
+                raise ValueError("Derived FeatureView must specify `sink_source`.")
+            self.batch_source = sink_source

         # Initialize features and entity columns.
         features: List[Field] = []
@@ -201,25 +226,26 @@ def __init__(
             )

         # TODO(felixwang9817): Add more robust validation of features.
-        cols = [field.name for field in schema]
-        for col in cols:
-            if (
-                self.batch_source.field_mapping is not None
-                and col in self.batch_source.field_mapping.keys()
-            ):
-                raise ValueError(
-                    f"The field {col} is mapped to {self.batch_source.field_mapping[col]} for this data source. "
-                    f"Please either remove this field mapping or use {self.batch_source.field_mapping[col]} as the "
-                    f"Entity or Feature name."
-                )
+        if self.batch_source is not None:
+            cols = [field.name for field in schema]
+            for col in cols:
+                if (
+                    self.batch_source.field_mapping is not None
+                    and col in self.batch_source.field_mapping.keys()
+                ):
+                    raise ValueError(
+                        f"The field {col} is mapped to {self.batch_source.field_mapping[col]} for this data source. "
+                        f"Please either remove this field mapping or use {self.batch_source.field_mapping[col]} as the "
+                        f"Entity or Feature name."
+                    )

         super().__init__(
             name=name,
             features=features,
             description=description,
             tags=tags,
             owner=owner,
-            source=source,
+            source=self.batch_source,
         )
         self.online = online
         self.offline = offline
@@ -348,13 +374,18 @@ def to_proto(self) -> FeatureViewProto:
         meta = self.to_proto_meta()
         ttl_duration = self.get_ttl_duration()

-        batch_source_proto = self.batch_source.to_proto()
-        batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"
+        batch_source_proto = None
+        if self.batch_source:
+            batch_source_proto = self.batch_source.to_proto()
+            batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"

         stream_source_proto = None
         if self.stream_source:
             stream_source_proto = self.stream_source.to_proto()
             stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}"
+        source_view_protos = None
+        if self.source_views:
+            source_view_protos = [view.to_proto().spec for view in self.source_views]
         spec = FeatureViewSpecProto(
             name=self.name,
             entities=self.entities,
@@ -368,6 +399,7 @@ def to_proto(self) -> FeatureViewProto:
             offline=self.offline,
             batch_source=batch_source_proto,
             stream_source=stream_source_proto,
+            source_views=source_view_protos,
         )

         return FeatureViewProto(spec=spec, meta=meta)
@@ -403,12 +435,21 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
         Returns:
             A FeatureViewProto object based on the feature view protobuf.
         """
-        batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source)
+        batch_source = (
+            DataSource.from_proto(feature_view_proto.spec.batch_source)
+            if feature_view_proto.spec.HasField("batch_source")
+            else None
+        )
         stream_source = (
             DataSource.from_proto(feature_view_proto.spec.stream_source)
             if feature_view_proto.spec.HasField("stream_source")
             else None
         )
+        source_views = [
+            FeatureView.from_proto(FeatureViewProto(spec=view_spec, meta=None))
+            for view_spec in feature_view_proto.spec.source_views
+        ]

Review comment (Member): from_proto() recursively calls itself for each source view without any depth limit. While cycle detection exists in FeatureResolver, it only runs when you use the compute engine, but proto deserialization happens much earlier, during API and registry loading. We might need to handle this in FeatureView.from_proto().

Review comment (Member): Also, do we not need to store metadata for nested feature views? meta=None?

Review comment (Member): We need both cycle detection and de-duplication during serialization. It may not cause issues for a few feature views, but if there are many feature views, it could cause slowness. Given A -> [B, C], B -> [D, E], C -> [D, E], serializing FeatureView A serializes FeatureView D and FeatureView E twice.

Author reply: Right, that makes sense. I don't have any metadata required for the compute engine at the moment; what do you think would be useful?
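
One way to address both concerns above (unbounded recursion on a cyclic graph, and shared subtrees being rebuilt repeatedly) is a guarded, memoized walk. This is a sketch only; `from_proto_guarded` and `build_feature_view` are hypothetical stand-ins, not part of this PR:

```python
from typing import Dict, Set


def from_proto_guarded(proto, stack: Set[str], done: Dict[str, "FeatureView"]):
    name = proto.spec.name
    if name in stack:
        # A -> B -> A would otherwise recurse until the interpreter overflows.
        raise ValueError(f"Cycle detected in source_views at feature view '{name}'")
    if name in done:
        # Diamonds (A -> [B, C], B -> D, C -> D) deserialize D once, not twice.
        return done[name]
    stack.add(name)
    children = [
        from_proto_guarded(FeatureViewProto(spec=view_spec, meta=None), stack, done)
        for view_spec in proto.spec.source_views
    ]
    stack.remove(name)
    view = build_feature_view(proto, children)  # stand-in for the cls(...) call below
    done[name] = view
    return view


# Entry point: from_proto_guarded(feature_view_proto, set(), {})
```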

         feature_view = cls(
             name=feature_view_proto.spec.name,
             description=feature_view_proto.spec.description,
@@ -421,7 +462,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
                 if feature_view_proto.spec.ttl.ToNanoseconds() == 0
                 else feature_view_proto.spec.ttl.ToTimedelta()
             ),
-            source=batch_source,
+            source=batch_source if batch_source else source_views,
         )
         if stream_source:
             feature_view.stream_source = stream_source
2 changes: 2 additions & 0 deletions sdk/python/feast/inference.py
@@ -26,6 +26,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
 ) -> None:
     ERROR_MSG_PREFIX = "Unable to infer DataSource timestamp_field"
     for data_source in data_sources:
+        if data_source is None:
+            continue
         if isinstance(data_source, RequestSource):
             continue
         if isinstance(data_source, PushSource):
Empty file.
43 changes: 43 additions & 0 deletions sdk/python/feast/infra/compute_engines/algorithms/topo.py
@@ -0,0 +1,43 @@
from typing import List, Set

from feast.infra.compute_engines.dag.node import DAGNode


def topo_sort(root: DAGNode) -> List[DAGNode]:
    """
    Topologically sort a DAG starting from a single root node.
    Args:
        root: The root DAGNode.
    Returns:
        A list of DAGNodes in topological order (dependencies first).
    """
    return topo_sort_multiple([root])

Review comment: Why not call it topological_sort?


def topo_sort_multiple(roots: List[DAGNode]) -> List[DAGNode]:
    """
    Topologically sort a DAG with multiple roots.
    Args:
        roots: List of root DAGNodes.
    Returns:
        A list of all reachable DAGNodes in execution-safe order.
    """
    visited: Set[int] = set()
    ordered: List[DAGNode] = []

    def dfs(node: DAGNode):
        if id(node) in visited:
            return
        visited.add(id(node))
        for input_node in node.inputs:
            dfs(input_node)
        ordered.append(node)

    for root in roots:
        dfs(root)

    return ordered
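
A quick sanity check of the ordering, assuming this branch is installed. `topo_sort_multiple` only touches the `inputs` attribute, so a small stand-in node class is enough at runtime; the diamond mirrors the A/B/C/D example from the review discussion above:

```python
from dataclasses import dataclass, field
from typing import List

from feast.infra.compute_engines.algorithms.topo import topo_sort_multiple


@dataclass
class Node:  # structural stand-in for DAGNode; only `inputs` is used
    name: str
    inputs: List["Node"] = field(default_factory=list)


d = Node("D")
b = Node("B", inputs=[d])
c = Node("C", inputs=[d])
a = Node("A", inputs=[b, c])

order = [n.name for n in topo_sort_multiple([a])]
print(order)  # ['D', 'B', 'C', 'A']: the shared node D is emitted exactly once
```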
46 changes: 2 additions & 44 deletions sdk/python/feast/infra/compute_engines/base.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import List, Optional, Sequence, Union
+from typing import List, Sequence, Union

 import pyarrow as pa

@@ -12,13 +12,12 @@
     MaterializationTask,
 )
 from feast.infra.common.retrieval_task import HistoricalRetrievalTask
-from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
+from feast.infra.compute_engines.dag.context import ExecutionContext
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
 from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.stream_feature_view import StreamFeatureView
-from feast.utils import _get_column_names


 class ComputeEngine(ABC):
@@ -124,52 +123,11 @@ def get_execution_context(
         if hasattr(task, "entity_df") and task.entity_df is not None:
             entity_df = task.entity_df

-        column_info = self.get_column_info(registry, task)
         return ExecutionContext(
             project=task.project,
             repo_config=self.repo_config,
             offline_store=self.offline_store,
             online_store=self.online_store,
             entity_defs=entity_defs,
-            column_info=column_info,
             entity_df=entity_df,
         )

-    def get_column_info(
-        self,
-        registry: BaseRegistry,
-        task: Union[MaterializationTask, HistoricalRetrievalTask],
-    ) -> ColumnInfo:
-        entities = []
-        for entity_name in task.feature_view.entities:
-            entities.append(registry.get_entity(entity_name, task.project))
-
-        join_keys, feature_cols, ts_col, created_ts_col = _get_column_names(
-            task.feature_view, entities
-        )
-        field_mapping = self.get_field_mapping(task.feature_view)
-
-        return ColumnInfo(
-            join_keys=join_keys,
-            feature_cols=feature_cols,
-            ts_col=ts_col,
-            created_ts_col=created_ts_col,
-            field_mapping=field_mapping,
-        )
-
-    def get_field_mapping(
-        self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
-    ) -> Optional[dict]:
-        """
-        Get the field mapping for a feature view.
-        Args:
-            feature_view: The feature view to get the field mapping for.
-
-        Returns:
-            A dictionary mapping field names to column names.
-        """
-        if feature_view.stream_source:
-            return feature_view.stream_source.field_mapping
-        if feature_view.batch_source:
-            return feature_view.batch_source.field_mapping
-        return None