@zerafachris zerafachris commented Jan 6, 2025

What this PR does / why we need it:

This PR attempts to add support for additional Kafka settings on the Kafka stream source, as requested in #4894.

Which issue(s) this PR fixes:

Not applicable.

Misc

Unfortunately, this is only a partial solution, and I am looking for support with it.

I have made most of the necessary changes but unit tests are failing due to:

===================================================================== short test summary info ======================================================================
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_serialization - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_udfs - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_initialization_with_optional_fields_omitted - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_feature_view[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_feature_view_udf[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_source[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_data_sources.py::test_proto_conversion - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_source_from_repo - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_feature_view_with_inline_stream_source[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_feature_view_with_inline_stream_source_from_repo - AssertionError: stdout: b'No project found in the repository. Using project name test5578hi0q50 defined in feature_store.yaml\nApplying changes for project tes...
=============================================== 10 failed, 519 passed, 2 skipped, 558 warnings in 234.66s (0:03:54) ================================================

Any support is greatly appreciated

Signed-off-by: zerafachris PERSONAL <[email protected]>
Signed-off-by: zerafachris PERSONAL <[email protected]>
@zerafachris zerafachris requested a review from a team as a code owner January 6, 2025 21:11
@zerafachris
Author

With commit "1ef5d0afb35e3dd2f9f8aaee6da0a28f88982a82" I was able to add the proto definition for kafka_settings as a StreamFormat. I am not sure whether this is the correct type for a dict[str, str].

Signed-off-by: zerafachris PERSONAL <[email protected]>
@zerafachris zerafachris changed the title from "WIP: feat: Feature/kafkaoptions kwargs" to "feat: Feature/kafkaoptions kwargs" Jan 7, 2025
@franciscojavierarceo
Member

I'll try to pull this down on Friday. Sorry, I'm traveling for work.

@franciscojavierarceo
Member

Hey @zerafachris, mind taking a look? Do you still need this? I was planning on cutting a release and wanted to make sure we get this in if needed.

Signed-off-by: zerafachris PERSONAL <[email protected]>
@zerafachris
Author

zerafachris commented Feb 4, 2025

Hi @franciscojavierarceo, I resolved the conflicts. Ideally, yes, I would be keen to use this.

Unfortunately, I am still unable to fix the unit tests. The errors are:

===================================================================== short test summary info ======================================================================
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_serialization - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_udfs - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_stream_feature_view.py::test_stream_feature_view_initialization_with_optional_fields_omitted - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_feature_view[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_feature_view_udf[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_source[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_stream_source_from_repo - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/test_data_sources.py::test_proto_conversion - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/online_store/test_online_retrieval.py::test_get_online_features_milvus - feast.errors.FeastModuleImportError: Could not import module 'feast.infra.online_stores.milvus_online_store.milvus' while attempting to load class 'MilvusOnlin...
FAILED sdk/python/tests/unit/online_store/test_online_retrieval.py::test_milvus_lite_get_online_documents_v2 - feast.errors.FeastModuleImportError: Could not import module 'feast.infra.online_stores.milvus_online_store.milvus' while attempting to load class 'MilvusOnlin...
FAILED sdk/python/tests/unit/online_store/test_online_retrieval.py::test_milvus_native_from_feast_data - ModuleNotFoundError: No module named 'pymilvus'
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_feature_view_with_inline_stream_source[feature_store_with_local_registry] - typeguard.TypeCheckError: argument "data_source" (feast.core.DataFormat_pb2.StreamFormat) is not an instance of feast.core.DataSource_pb2.DataSource
FAILED sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py::test_apply_feature_view_with_inline_stream_source_from_repo - AssertionError: stdout: b'No project found in the repository. Using project name testht2m82xvdq defined in feature_store.yaml\nApplying changes for project tes...
=============================================== 13 failed, 520 passed, 3 skipped, 556 warnings in 205.98s (0:03:25) ================================================

@franciscojavierarceo
Member

I'll take a look at this today 👍

@ntkathole
Member

@zerafachris, can you please sign the commit so that the DCO check passes on the PR, and also rebase it? I can help with the unit test failures.

Contributor

@jyejare jyejare left a comment

ACK pending comments.

def watermark_delay_threshold(self) -> google.protobuf.duration_pb2.Duration:
    """Watermark delay threshold for stream data"""
@property
def kafka_settings(self) -> feast.core.DataFormat_pb2.StreamFormat:
Contributor

@jyejare jyejare Mar 13, 2025

The unit tests are failing because kafka_settings is of type feast.core.DataFormat_pb2.StreamFormat, while the from_proto function here expects kafka_settings (passed as a data source config) to be of type feast.core.DataSource_pb2.DataSource.

Contributor

If the code is rebased, the link in the comment above may point to different code.

To be specific:
The "here" link points to the kafka_settings parameter of the from_proto function in the KafkaSource class of the module sdk/python/feast/data_source.py.
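
To make the mismatch concrete, here is a minimal, self-contained sketch. The two classes are hypothetical stand-ins for the generated protobuf types, not Feast's real classes, and the isinstance check only imitates the kind of runtime check that typeguard performs. It is meant to illustrate the shape of the failure in the test summary, not to reproduce Feast's code.

```python
class DataSourceProto:
    """Hypothetical stand-in for feast.core.DataSource_pb2.DataSource."""


class StreamFormatProto:
    """Hypothetical stand-in for feast.core.DataFormat_pb2.StreamFormat."""


def data_source_from_proto(data_source: object) -> str:
    """Imitates a type-checked from_proto that only accepts a DataSource message."""
    if not isinstance(data_source, DataSourceProto):
        raise TypeError(
            f'argument "data_source" ({type(data_source).__name__}) '
            "is not an instance of DataSourceProto"
        )
    return "parsed data source"


def demo() -> list:
    """Call the checked function with a valid and an invalid message type."""
    results = []
    for candidate in (DataSourceProto(), StreamFormatProto()):
        try:
            results.append(data_source_from_proto(candidate))
        except TypeError as exc:
            results.append(f"TypeError: {exc}")
    return results


if __name__ == "__main__":
    for line in demo():
        print(line)
```

Passing the settings field through a converter written for a different message type fails before any parsing happens, which matches the fact that every affected test reports the same error.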

stale bot commented Jul 19, 2025

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix label Jul 19, 2025
@stale stale bot removed the wontfix label Sep 29, 2025

@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces support for additional Kafka settings to the KafkaSource data source as requested in issue #4894. The implementation adds a new kafka_settings parameter to allow users to pass additional configuration options to Kafka streams.

  • Add kafka_settings parameter to KafkaOptions and KafkaSource classes
  • Update protobuf definitions to include kafka_settings field
  • Modify Spark Kafka processor to apply additional Kafka settings when creating streams
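
As a rough illustration of the third bullet, the sketch below applies a dict of extra settings to a reader one option at a time. FakeStreamReader and apply_kafka_settings are hypothetical names introduced here, not code from this PR. The fake reader only mimics the chained .option(key, value) style of Spark's DataStreamReader so that the example runs without Spark installed, and the option keys are ordinary Spark Kafka source options chosen for illustration.

```python
from typing import Dict, Optional


class FakeStreamReader:
    """Hypothetical stand-in for a Spark DataStreamReader; it only records options."""

    def __init__(self) -> None:
        self.options: Dict[str, str] = {}

    def option(self, key: str, value: str) -> "FakeStreamReader":
        self.options[key] = value
        return self  # returning self allows chained calls, like the Spark builder


def apply_kafka_settings(reader, kafka_settings: Optional[Dict[str, str]]):
    """Apply each extra setting as a reader option; None or {} leaves the reader unchanged."""
    for key, value in (kafka_settings or {}).items():
        reader = reader.option(key, value)
    return reader


# Required options are set first, then the user-supplied extras are layered on top.
reader = (
    FakeStreamReader()
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "example_topic")
)
reader = apply_kafka_settings(
    reader,
    {"kafka.security.protocol": "SASL_SSL", "startingOffsets": "earliest"},
)
```

Because the helper accepts None, a source defined without kafka_settings would behave exactly as it did before the new parameter existed.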

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| sdk/python/feast/protos/feast/core/DataSource_pb2.pyi | Add type definitions for new kafka_settings field |
| sdk/python/feast/protos/feast/core/DataSource_pb2.py | Update protobuf compiled Python code with kafka_settings field |
| sdk/python/feast/infra/contrib/spark_kafka_processor.py | Apply kafka_settings options to Spark streaming and refactor format handling |
| sdk/python/feast/data_source.py | Add kafka_settings parameter to KafkaOptions and KafkaSource classes |
| protos/feast/core/DataSource.proto | Add kafka_settings field to KafkaOptions message definition |

Comment on lines +228 to +229
def kafka_settings(self) -> feast.core.DataFormat_pb2.StreamFormat:
    """kafka_settings field"""

Copilot AI Sep 29, 2025

The return type for kafka_settings is incorrect. It should be a dictionary-like type for key-value pairs, not StreamFormat.

topic: builtins.str = ...,
message_format: feast.core.DataFormat_pb2.StreamFormat | None = ...,
watermark_delay_threshold: google.protobuf.duration_pb2.Duration | None = ...,
kafka_settings: feast.core.DataFormat_pb2.StreamFormat | None = ...,

Copilot AI Sep 29, 2025

The type annotation for kafka_settings parameter is incorrect. It should be a dictionary-like type for key-value pairs, not StreamFormat.

  warnings.warn(
      (
-         "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
+         "The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "

Copilot AI Sep 29, 2025

Use American English spelling 'favor' instead of British English 'favour' for consistency with the codebase.

Suggested change
- "The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
+ "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "

if data_source.batch_source
else None
),
kafka_settings=(DataSource.from_proto(data_source.kafka_options.kafka_settings)

Copilot AI Sep 29, 2025

Incorrect method call. kafka_settings should be used directly as a dictionary, not converted using DataSource.from_proto() which expects a DataSource protobuf message.

Suggested change
- kafka_settings=(DataSource.from_proto(data_source.kafka_options.kafka_settings)
+ kafka_settings=(dict(data_source.kafka_options.kafka_settings)
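
One possible reading of this suggestion, sketched under assumptions: if the proto field were declared as a string-to-string map (the PR currently declares it as a StreamFormat), the conversion in both directions reduces to copying a mapping. KafkaOptionsProtoSketch and the two helper functions are hypothetical names used only for illustration. The dataclass stands in for the generated KafkaOptions message so that the example runs without Feast or protobuf installed.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class KafkaOptionsProtoSketch:
    """Hypothetical stand-in for the generated KafkaOptions message.

    kafka_settings plays the role of a string-to-string map field, which is
    an empty mapping (never None) when nothing has been set.
    """

    kafka_bootstrap_servers: str = ""
    topic: str = ""
    kafka_settings: Dict[str, str] = field(default_factory=dict)


def kafka_settings_to_proto(
    options: KafkaOptionsProtoSketch, settings: Optional[Dict[str, str]]
) -> None:
    """Write the user's settings into the map-like field, replacing old entries."""
    options.kafka_settings.clear()
    options.kafka_settings.update(settings or {})


def kafka_settings_from_proto(options: KafkaOptionsProtoSketch) -> Optional[Dict[str, str]]:
    """Copy the map-like field into a plain dict; an empty map is treated as 'not set'."""
    settings = dict(options.kafka_settings)
    return settings or None


proto = KafkaOptionsProtoSketch(kafka_bootstrap_servers="localhost:9092", topic="example_topic")
kafka_settings_to_proto(proto, {"kafka.security.protocol": "SASL_SSL"})
round_tripped = kafka_settings_from_proto(proto)
```

Treating an empty map as None is a design choice of this sketch. It keeps sources that never set kafka_settings identical before and after a round trip, at the cost of not distinguishing an explicit empty dict from an omitted one.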
