Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ message DataSource {

// Watermark delay threshold for stream data
google.protobuf.Duration watermark_delay_threshold = 4;

}

// Defines options for DataSource that sources features from Kinesis records.
Expand Down Expand Up @@ -271,4 +272,4 @@ message DataSource {

message DataSourceList {
repeated DataSource datasources = 1;
}
}
14 changes: 13 additions & 1 deletion sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def __init__(
message_format: StreamFormat,
topic: str,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None,
):
self.kafka_bootstrap_servers = kafka_bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark_delay_threshold = watermark_delay_threshold or None
self.kafka_settings = kafka_settings or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Expand All @@ -65,11 +67,15 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
)

kafka_options = cls(
kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_options_proto.kafka_settings
if kafka_options_proto.HasField("kafka_settings")
else None,
)

return kafka_options
Expand All @@ -91,6 +97,7 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=self.kafka_settings,
)

return kafka_options_proto
Expand Down Expand Up @@ -361,6 +368,7 @@ def __init__(
owner: Optional[str] = "",
batch_source: Optional[DataSource] = None,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None
):
"""
Creates a KafkaSource object.
Expand All @@ -384,11 +392,12 @@ def __init__(
batch_source (optional): The datasource that acts as a batch source.
watermark_delay_threshold (optional): The watermark delay threshold for stream data.
Specifically how late stream data can arrive without being discarded.
kafka_settings (optional): Optional kafka settings
"""
if bootstrap_servers:
warnings.warn(
(
"The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
"The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
Copy link

Copilot AI Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use American English spelling 'favor' instead of British English 'favour' for consistency with the codebase.

Suggested change
"The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
"The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "

Copilot uses AI. Check for mistakes.

"Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
),
DeprecationWarning,
Expand All @@ -413,6 +422,7 @@ def __init__(
message_format=message_format,
topic=topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_settings
)

def __eq__(self, other):
Expand Down Expand Up @@ -468,6 +478,8 @@ def from_proto(data_source: DataSourceProto):
if data_source.batch_source
else None
),
kafka_settings=(dict(data_source.kafka_options.kafka_settings)
Copy link

Copilot AI Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect method call. kafka_settings should be used directly as a dictionary, not converted using DataSource.from_proto() which expects a DataSource protobuf message.

Suggested change
kafka_settings=(DataSource.from_proto(data_source.kafka_options.kafka_settings)
kafka_settings=(dict(data_source.kafka_options.kafka_settings)

Copilot uses AI. Check for mistakes.

if data_source.kafka_options.kafka_settings else None )
)

def to_proto(self) -> DataSourceProto:
Expand Down
69 changes: 33 additions & 36 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,49 @@ def ingest_stream_feature_view(
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
    """Read the Kafka topic into a streaming DataFrame of parsed records.

    Only JSON and Avro message formats are currently supported. The raw
    Kafka ``value`` column is cast to a string and deserialized with the
    schema carried by the data source's message format.

    Returns:
        StreamTable: a streaming DataFrame whose columns are the fields of
        the deserialized message (the ``table.*`` projection).

    Raises:
        ValueError: if ``self.format`` does not match the data source's
            message format class, or is neither "json" nor "avro".
    """
    # Base reader: bootstrap servers + topic, starting from the latest offsets.
    stream_df = (
        self.spark.readStream.format("kafka")
        .option(
            "kafka.bootstrap.servers",
            self.data_source.kafka_options.kafka_bootstrap_servers,
        )
        .option("subscribe", self.data_source.kafka_options.topic)
        .option("startingOffsets", "latest")  # Query start
    )

    # Forward any user-supplied Kafka settings (e.g. security options)
    # verbatim as reader options before loading the stream.
    if self.data_source.kafka_options.kafka_settings is not None:
        for k, v in self.data_source.kafka_options.kafka_settings.items():
            stream_df = stream_df.option(k, v)

    stream_df = stream_df.load().selectExpr("CAST(value AS STRING)")

    if self.format == "json":
        if not isinstance(
            self.data_source.kafka_options.message_format, JsonFormat
        ):
            raise ValueError("kafka source message format is not json format")
        stream_df = stream_df.select(
            from_json(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    elif self.format == "avro":
        if not isinstance(
            self.data_source.kafka_options.message_format, AvroFormat
        ):
            raise ValueError("kafka source message format is not avro format")
        stream_df = stream_df.select(
            from_avro(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    else:
        raise ValueError("kafka source message format is not currently supported")

    # Flatten the parsed struct into top-level columns (shared by both branches).
    stream_df = stream_df.select("table.*")

    return stream_df

def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
Expand Down
Loading
Loading