Skip to content

Commit 9a4bd63

Browse files
authored
fix: Add PushSource proto and Python class (#2428)
* fix: Add PushSource proto and Python class Signed-off-by: Achal Shah <[email protected]> * tests Signed-off-by: Achal Shah <[email protected]> * remove pdb Signed-off-by: Achal Shah <[email protected]> * cr Signed-off-by: Achal Shah <[email protected]> * cr Signed-off-by: Achal Shah <[email protected]> * cr Signed-off-by: Achal Shah <[email protected]>
1 parent 3c41f94 commit 9a4bd63

File tree

3 files changed

+120
-3
lines changed

3 files changed

+120
-3
lines changed

protos/feast/core/DataSource.proto

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
2626
import "feast/types/Value.proto";
2727

2828
// Defines a Data Source that can be used source Feature data
29-
// Next available id: 22
29+
// Next available id: 23
3030
message DataSource {
3131
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
3232
// but they are going to be reserved for backwards compatibility.
3333
reserved 6 to 10;
3434

3535
// Type of Data Source.
36-
// Next available id: 9
36+
// Next available id: 10
3737
enum SourceType {
3838
INVALID = 0;
3939
BATCH_FILE = 1;
@@ -44,7 +44,7 @@ message DataSource {
4444
STREAM_KINESIS = 4;
4545
CUSTOM_SOURCE = 6;
4646
REQUEST_SOURCE = 7;
47-
47+
PUSH_SOURCE = 9;
4848
}
4949

5050
// Unique name of data source within the project
@@ -169,6 +169,16 @@ message DataSource {
169169
map<string, feast.types.ValueType.Enum> schema = 2;
170170
}
171171

172+
// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
173+
// the online store on-demand, such as by stream consumers.
174+
message PushOptions {
175+
// Mapping of feature name to type
176+
map<string, feast.types.ValueType.Enum> schema = 1;
177+
// Optional batch source for the push source for historical features and materialization.
178+
DataSource batch_source = 2;
179+
}
180+
181+
172182
// DataSource options.
173183
oneof options {
174184
FileOptions file_options = 11;
@@ -179,5 +189,6 @@ message DataSource {
179189
RequestDataOptions request_data_options = 18;
180190
CustomSourceOptions custom_options = 16;
181191
SnowflakeOptions snowflake_options = 19;
192+
PushOptions push_options = 22;
182193
}
183194
}

sdk/python/feast/data_source.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,3 +522,75 @@ def to_proto(self) -> DataSourceProto:
522522
data_source_proto.date_partition_column = self.date_partition_column
523523

524524
return data_source_proto
525+
526+
527+
class PushSource(DataSource):
528+
"""
529+
PushSource that can be used to ingest features on request
530+
531+
Args:
532+
name: Name of the push source
533+
schema: Schema mapping from the input feature name to a ValueType
534+
"""
535+
536+
name: str
537+
schema: Dict[str, ValueType]
538+
batch_source: Optional[DataSource]
539+
540+
def __init__(
541+
self,
542+
name: str,
543+
schema: Dict[str, ValueType],
544+
batch_source: Optional[DataSource] = None,
545+
):
546+
"""Creates a PushSource object."""
547+
super().__init__(name)
548+
self.schema = schema
549+
self.batch_source = batch_source
550+
551+
def validate(self, config: RepoConfig):
552+
pass
553+
554+
def get_table_column_names_and_types(
555+
self, config: RepoConfig
556+
) -> Iterable[Tuple[str, str]]:
557+
pass
558+
559+
@staticmethod
560+
def from_proto(data_source: DataSourceProto):
561+
schema_pb = data_source.push_options.schema
562+
schema = {}
563+
for key, value in schema_pb.items():
564+
schema[key] = value
565+
566+
batch_source = None
567+
if data_source.push_options.HasField("batch_source"):
568+
batch_source = DataSource.from_proto(data_source.push_options.batch_source)
569+
570+
return PushSource(
571+
name=data_source.name, schema=schema, batch_source=batch_source
572+
)
573+
574+
def to_proto(self) -> DataSourceProto:
575+
schema_pb = {}
576+
for key, value in self.schema.items():
577+
schema_pb[key] = value
578+
batch_source_proto = None
579+
if self.batch_source:
580+
batch_source_proto = self.batch_source.to_proto()
581+
582+
options = DataSourceProto.PushOptions(
583+
schema=schema_pb, batch_source=batch_source_proto
584+
)
585+
data_source_proto = DataSourceProto(
586+
name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
587+
)
588+
589+
return data_source_proto
590+
591+
def get_table_query_string(self) -> str:
592+
raise NotImplementedError
593+
594+
@staticmethod
595+
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
596+
raise NotImplementedError
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from feast.data_source import PushSource
2+
from feast.infra.offline_stores.bigquery_source import BigQuerySource
3+
from feast.protos.feast.types.Value_pb2 import ValueType
4+
5+
6+
def test_push_no_batch():
7+
push_source = PushSource(
8+
name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}
9+
)
10+
push_source_proto = push_source.to_proto()
11+
assert push_source_proto.push_options is not None
12+
assert not push_source_proto.push_options.HasField("batch_source")
13+
push_source_unproto = PushSource.from_proto(push_source_proto)
14+
15+
assert push_source.name == push_source_unproto.name
16+
assert push_source.schema == push_source_unproto.schema
17+
assert push_source.batch_source == push_source_unproto.batch_source
18+
19+
20+
def test_push_with_batch():
21+
push_source = PushSource(
22+
name="test",
23+
schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64},
24+
batch_source=BigQuerySource(table="test.test"),
25+
)
26+
push_source_proto = push_source.to_proto()
27+
assert push_source_proto.push_options is not None
28+
assert push_source_proto.push_options.HasField("batch_source")
29+
30+
push_source_unproto = PushSource.from_proto(push_source_proto)
31+
32+
assert push_source.name == push_source_unproto.name
33+
assert push_source.schema == push_source_unproto.schema
34+
assert push_source.batch_source.name == push_source_unproto.batch_source.name

0 commit comments

Comments
 (0)