[Data] Add Bounded Kafka Datasource #58653

@owenowenisme

Description


Overview

Implemented a bounded read for the Kafka datasource.

To keep things simple, we initially support only offset-range-based reads; timestamp-range-based reads will be added shortly.

API

def read_kafka(
    topics: Union[str, List[str]],
    *,
    bootstrap_servers: Union[str, List[str]],
    trigger: Literal["once"] = "once",
    start_offset: Optional[Union[int, str]] = "earliest",
    end_offset: Optional[Union[int, str]] = "latest",
    authentication: Optional[Dict[str, Any]] = None,
    parallelism: int = -1,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    override_num_blocks: Optional[int] = None,
    timeout_ms: int = 10000,
) -> Dataset:
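
A minimal usage sketch of the proposed API, assuming it is exposed as ray.data.read_kafka (the broker address and topic name below are placeholders):

```python
import ray

# Bounded read of one topic between explicit offsets.
# "localhost:9092" and "events" are placeholder values.
ds = ray.data.read_kafka(
    "events",
    bootstrap_servers="localhost:9092",
    start_offset=100,
    end_offset=500,
)
ds.show(5)
```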

Bounded Read Support

  • Reads messages between configurable start and end offsets
  • Supports multiple offset specification formats (illustrated in the sketch after this list):
    • Integer offsets: Direct offset numbers (e.g., start_offset=100)
    • Named positions: "earliest" or "latest"
    • Numeric strings: Offsets passed as strings (e.g., "100")
  • Default behavior: reads from "earliest" to "latest" if not specified
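
For illustration, the same bounded read expressed with each accepted offset format (a sketch against the signature above; the topic and broker values are placeholders):

```python
import ray

BROKER = "localhost:9092"  # placeholder broker address

# Integer offsets.
ds = ray.data.read_kafka("events", bootstrap_servers=BROKER, start_offset=100, end_offset=500)

# Named positions ("earliest"/"latest"), which are also the defaults.
ds = ray.data.read_kafka("events", bootstrap_servers=BROKER, start_offset="earliest", end_offset="latest")

# Offsets passed as strings.
ds = ray.data.read_kafka("events", bootstrap_servers=BROKER, start_offset="100", end_offset="500")
```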

Multi-Topic and Multi-Partition Support

  • Can read from single or multiple topics simultaneously
  • Automatically discovers all partitions for each topic
  • Creates one read task per partition
  • Each task reads independently from its assigned partition (see the sketch after this list)
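
To illustrate the fan-out described above, here is a rough sketch of per-partition task planning using kafka-python's metadata APIs. This is only an illustration of the idea, not the actual implementation (the topics and broker address are placeholders):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")  # placeholder broker

read_tasks = []
for topic in ["clicks", "views"]:  # placeholder topics
    # Discover every partition of the topic.
    for partition_id in sorted(consumer.partitions_for_topic(topic) or []):
        tp = TopicPartition(topic, partition_id)
        # Resolve "earliest"/"latest" to concrete offsets for this partition.
        start = consumer.beginning_offsets([tp])[tp]
        end = consumer.end_offsets([tp])[tp]
        # One independent read task per (topic, partition) with its own offset range.
        read_tasks.append((topic, partition_id, start, end))

print(f"planned {len(read_tasks)} read tasks (one per partition)")
```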

Timeout

If a task reaches the timeout before hitting the end_offset, it stops and returns the data read so far.

The timeout is configured with timeout_ms.
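
For example, to cap each read task at five seconds (a usage sketch; the broker and topic are placeholders):

```python
import ray

# Each read task stops after 5 seconds even if end_offset has not been reached,
# returning whatever messages it consumed up to that point.
ds = ray.data.read_kafka(
    "events",
    bootstrap_servers="localhost:9092",
    end_offset="latest",
    timeout_ms=5000,
)
```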

Schema

Each record includes complete Kafka message metadata:

{
    "offset": int64,           # Message offset within partition
    "key": string,             # Message key (UTF-8 decoded)
    "value": bytes,           # Message value
    "topic": string,           # Topic name
    "partition": int32,        # Partition ID
    "timestamp": int64,        # Message timestamp (milliseconds)
    "timestamp_type": int32,   # 0=CreateTime, 1=LogAppendTime
    "headers": map<string, bytes>  # Message headers
}

Currently the value is returned as raw bytes; a custom value deserializer can be added later.
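
Since the value comes back as raw bytes, deserialization is left to the user for now; a sketch of decoding JSON-encoded values with a map step (the topic and broker are placeholders):

```python
import json

import ray

ds = ray.data.read_kafka("events", bootstrap_servers="localhost:9092")  # placeholders

def decode_value(row):
    # The datasource leaves "value" as raw bytes; deserialize it here.
    row["value"] = json.loads(row["value"])
    return row

decoded = ds.map(decode_value)
```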

Others

  • We do not commit offsets when reading, since this is a bounded read for now

