3 changes: 2 additions & 1 deletion protos/feast/core/DataSource.proto
@@ -148,6 +148,7 @@ message DataSource {

// Watermark delay threshold for stream data
google.protobuf.Duration watermark_delay_threshold = 4;

}

// Defines options for DataSource that sources features from Kinesis records.
@@ -276,4 +277,4 @@ message DataSource {

message DataSourceList {
repeated DataSource datasources = 1;
}
}
14 changes: 13 additions & 1 deletion sdk/python/feast/data_source.py
@@ -42,11 +42,13 @@ def __init__(
message_format: StreamFormat,
topic: str,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None,
):
self.kafka_bootstrap_servers = kafka_bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark_delay_threshold = watermark_delay_threshold or None
self.kafka_settings = kafka_settings or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
@@ -66,11 +68,15 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
)

kafka_options = cls(
kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_options_proto.kafka_settings
if kafka_options_proto.HasField("kafka_settings")
else None,
)

return kafka_options
Expand All @@ -92,6 +98,7 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=self.kafka_settings,
)

return kafka_options_proto
@@ -435,6 +442,7 @@ def __init__(
owner: Optional[str] = "",
batch_source: Optional[DataSource] = None,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None
):
"""
Creates a KafkaSource object.
@@ -458,11 +466,12 @@ def __init__(
batch_source (optional): The datasource that acts as a batch source.
watermark_delay_threshold (optional): The watermark delay threshold for stream data.
Specifically how late stream data can arrive without being discarded.
kafka_settings (optional): Additional Kafka settings, passed to the stream reader as key-value options.
"""
if bootstrap_servers:
warnings.warn(
(
"The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
"The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
Copilot AI · Sep 29, 2025

Use American English spelling 'favor' instead of British English 'favour' for consistency with the codebase.

Suggested change:
-                "The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
+                "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "

"Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
),
DeprecationWarning,
Expand All @@ -487,6 +496,7 @@ def __init__(
message_format=message_format,
topic=topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_settings
)

def __eq__(self, other):
@@ -542,6 +552,8 @@ def from_proto(data_source: DataSourceProto):
if data_source.batch_source
else None
),
kafka_settings=(DataSource.from_proto(data_source.kafka_options.kafka_settings)
Copilot AI · Sep 29, 2025

Incorrect method call. kafka_settings should be used directly as a dictionary, not converted using DataSource.from_proto(), which expects a DataSource protobuf message.

Suggested change:
-            kafka_settings=(DataSource.from_proto(data_source.kafka_options.kafka_settings)
+            kafka_settings=(dict(data_source.kafka_options.kafka_settings)

if data_source.kafka_options.kafka_settings else None )
)

def _to_proto_impl(self) -> DataSourceProto:
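For reviewers, a minimal sketch of how the new parameter could be used when declaring a KafkaSource. The broker, topic, and setting values below are illustrative assumptions, not part of this PR; any string-to-string pairs are simply forwarded to the stream reader.

from datetime import timedelta

from feast import KafkaSource
from feast.data_format import JsonFormat

# Hypothetical values for illustration only.
driver_stats_stream = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="broker1:9092",
    topic="driver_stats",
    message_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
    # New in this PR: extra reader settings as a Dict[str, str].
    kafka_settings={"kafka.security.protocol": "SASL_SSL"},
)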
69 changes: 33 additions & 36 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -80,52 +80,49 @@ def ingest_stream_feature_view(
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
"""Only supports json and avro formats currently."""
stream_df = (
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
self.data_source.kafka_options.kafka_bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
)

if self.data_source.kafka_options.kafka_settings is not None:
for k, v in self.data_source.kafka_options.kafka_settings.items():
stream_df = stream_df.option(k, v)

stream_df = stream_df.load().selectExpr("CAST(value AS STRING)")

if self.format == "json":
if not isinstance(
self.data_source.kafka_options.message_format, JsonFormat
):
raise ValueError("kafka source message format is not jsonformat")
stream_df = (
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
self.data_source.kafka_options.kafka_bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
.load()
.selectExpr("CAST(value AS STRING)")
.select(
from_json(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
.select("table.*")
raise ValueError("kafka source message format is not json format")
stream_df = stream_df.select(
from_json(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
else:
elif self.format == "avro":
if not isinstance(
self.data_source.kafka_options.message_format, AvroFormat
):
raise ValueError("kafka source message format is not avro format")
stream_df = (
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
self.data_source.kafka_options.kafka_bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
.load()
.selectExpr("CAST(value AS STRING)")
.select(
from_avro(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
.select("table.*")
stream_df = stream_df.select(
from_avro(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
else:
raise ValueError("kafka source message format is not currently supported")

stream_df = stream_df.select("table.*")

return stream_df

def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
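A note on the semantics of the refactor above: the base reader is configured once, then every kafka_settings entry is applied verbatim via .option(k, v) before .load(). Since later .option calls overwrite earlier ones for the same key, an entry such as startingOffsets in kafka_settings would take precedence over the hard-coded "latest". A standalone sketch of the equivalent flow, with hypothetical broker and topic values:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical settings; "kafka."-prefixed keys are passed through
# to the underlying Kafka consumer by the Spark Kafka source.
settings = {
    "kafka.security.protocol": "SASL_SSL",
    "startingOffsets": "earliest",  # would override the default set below
}

reader = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "driver_stats")
    .option("startingOffsets", "latest")
)
# Mirrors the loop in _ingest_stream_data: each entry becomes a reader option.
for k, v in settings.items():
    reader = reader.option(k, v)

stream_df = reader.load().selectExpr("CAST(value AS STRING)")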
9 changes: 7 additions & 2 deletions sdk/python/feast/protos/feast/core/DataSource_pb2.pyi
@@ -221,6 +221,7 @@ class DataSource(google.protobuf.message.Message):
TOPIC_FIELD_NUMBER: builtins.int
MESSAGE_FORMAT_FIELD_NUMBER: builtins.int
WATERMARK_DELAY_THRESHOLD_FIELD_NUMBER: builtins.int
KAFKA_SETTINGS_FIELD_NUMBER: builtins.int
kafka_bootstrap_servers: builtins.str
"""Comma separated list of Kafka bootstrap servers. Used for feature tables without a defined source host[:port]]"""
topic: builtins.str
@@ -231,16 +232,20 @@
@property
def watermark_delay_threshold(self) -> google.protobuf.duration_pb2.Duration:
"""Watermark delay threshold for stream data"""
@property
def kafka_settings(self) -> feast.core.DataFormat_pb2.StreamFormat:
jyejare (Contributor) · Mar 13, 2025

The unit tests are failing because kafka_settings is typed as feast.core.DataFormat_pb2.StreamFormat. The from_proto function here expects kafka_settings (as a data source config) to be of type feast.core.DataFormat_pb2.DataSource.

jyejare (Contributor)

If the code is rebased, the link mentioned here may point to different code. So, to be specific: the "here" link points to the kafka_settings parameter of the from_proto function in the module sdk/python/feast/data_source.py, class KafkaSource.
"""kafka_settings field"""
Comment on lines +236 to +237:

Copilot AI · Sep 29, 2025

The return type for kafka_settings is incorrect. It should be a dictionary-like type for key-value pairs, not StreamFormat.

def __init__(
self,
*,
kafka_bootstrap_servers: builtins.str = ...,
topic: builtins.str = ...,
message_format: feast.core.DataFormat_pb2.StreamFormat | None = ...,
watermark_delay_threshold: google.protobuf.duration_pb2.Duration | None = ...,
kafka_settings: feast.core.DataFormat_pb2.StreamFormat | None = ...,
Copilot AI · Sep 29, 2025

The type annotation for the kafka_settings parameter is incorrect. It should be a dictionary-like type for key-value pairs, not StreamFormat.

) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["message_format", b"message_format", "watermark_delay_threshold", b"watermark_delay_threshold"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["kafka_bootstrap_servers", b"kafka_bootstrap_servers", "message_format", b"message_format", "topic", b"topic", "watermark_delay_threshold", b"watermark_delay_threshold"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["kafka_settings", b"kafka_settings", "message_format", b"message_format", "watermark_delay_threshold", b"watermark_delay_threshold"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["kafka_bootstrap_servers", b"kafka_bootstrap_servers", "kafka_settings", b"kafka_settings", "message_format", b"message_format", "topic", b"topic", "watermark_delay_threshold", b"watermark_delay_threshold"]) -> None: ...

class KinesisOptions(google.protobuf.message.Message):
"""Defines options for DataSource that sources features from Kinesis records.
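Picking up the type-annotation comments above: if the intent is for kafka_settings to be a proto map<string, string>, the generated stub would typically look like the sketch below. This assumes the map shape, which is not what the PR currently declares. Note also that proto3 map fields do not support HasField, so the HasField overload added above would not apply to a map-typed kafka_settings.

# Sketch of a mypy-protobuf style stub for a map<string, string> field;
# assumes kafka_settings is declared as a map in DataSource.proto.
@property
def kafka_settings(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
    """kafka_settings field"""

def __init__(
    self,
    *,
    kafka_settings: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
) -> None: ...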