Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/reference/data-sources/push.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ Push sources allow feature values to be pushed to the online store and offline s

Push sources can be used by multiple feature views. When data is pushed to a push source, Feast propagates the feature values to all the consuming feature views.

Push sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for pushing data to a batch data source such as a data warehouse table. When using a push source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.
Push sources can optionally have a batch_source specified. If provided, it enables retrieval of historical features and supports materialization from the offline store to the online store. However, if your features are generated post-training or are only needed online (e.g., embeddings), you can omit the batch_source.

When a batch_source is used, users are responsible for ensuring that data is also pushed to a batch data source, such as a data warehouse. Note that when a push source is used as a stream source in a feature view definition, a batch_source does not need to be explicitly specified in the feature view itself.

## Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

1. Raw events come in (stream 1)
Expand All @@ -20,7 +23,9 @@ Streaming data sources are important sources of feature values. A typical setup
Feast allows users to push features previously registered in a feature view to the online store for fresher features. It also allows users to push batches of stream data to the offline store by specifying that the push be directed to the offline store. This will push the data to the offline store declared in the repository configuration used to initialize the feature store.

## Example (basic)

### Defining a push source

Note that the push schema needs to also include the entity.

```python
Expand All @@ -43,7 +48,9 @@ fv = FeatureView(
```

### Pushing data

Note that the `to` parameter is optional and defaults to online but we can specify these options: `PushMode.ONLINE`, `PushMode.OFFLINE`, or `PushMode.ONLINE_AND_OFFLINE`.

```python
from feast import FeatureStore
import pandas as pd
Expand Down
19 changes: 10 additions & 9 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,13 +764,13 @@ class PushSource(DataSource):

# TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store
# TODO(adchia): consider a "mode" to support pushing raw vs transformed events
batch_source: DataSource
batch_source: Optional[DataSource] = None

def __init__(
self,
*,
name: str,
batch_source: DataSource,
batch_source: Optional[DataSource] = None,
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
Expand Down Expand Up @@ -815,8 +815,11 @@ def get_table_column_names_and_types(

@staticmethod
def from_proto(data_source: DataSourceProto):
assert data_source.HasField("batch_source")
batch_source = DataSource.from_proto(data_source.batch_source)
batch_source = (
DataSource.from_proto(data_source.batch_source)
if data_source.HasField("batch_source")
else None
)

return PushSource(
name=data_source.name,
Expand All @@ -827,19 +830,17 @@ def from_proto(data_source: DataSourceProto):
)

def to_proto(self) -> DataSourceProto:
batch_source_proto = None
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.PUSH_SOURCE,
description=self.description,
tags=self.tags,
owner=self.owner,
batch_source=batch_source_proto,
)

if self.batch_source:
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())

return data_source_proto

def get_table_query_string(self) -> str:
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ def update_data_sources_with_inferred_event_timestamp_col(
if isinstance(data_source, RequestSource):
continue
if isinstance(data_source, PushSource):
data_source = data_source.batch_source
if not isinstance(data_source.batch_source, DataSource):
continue
else:
data_source = data_source.batch_source
if data_source.timestamp_field is None or data_source.timestamp_field == "":
# prepare right match pattern for data source
ts_column_type_regex_pattern: str
Expand Down
16 changes: 16 additions & 0 deletions sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ def test_push_with_batch():
assert push_source.batch_source.name == push_source_unproto.batch_source.name


def test_push_source_without_batch_source():
# Create PushSource with no batch_source
push_source = PushSource(name="test_push_source")

# Convert to proto
push_source_proto = push_source.to_proto()

# Assert batch_source is not present in proto
assert not push_source_proto.HasField("batch_source")

# Deserialize and check again
push_source_unproto = PushSource.from_proto(push_source_proto)
assert push_source_unproto.batch_source is None
assert push_source_unproto.name == "test_push_source"


def test_request_source_primitive_type_to_proto():
schema = [
Field(name="f1", dtype=Float32),
Expand Down
Loading