
Commit 2ec036b

Update documentation (#1055)
Co-authored-by: gwaramadze <[email protected]>
1 parent 6f91f2b commit 2ec036b

File tree: 1 file changed (+145, −0 lines)

docs/api-reference/quixstreams.md

Lines changed: 145 additions & 0 deletions
@@ -7974,6 +7974,151 @@ Implements retry logic to handle concurrent write conflicts.

- `batch`: The batch of data to write.

<a id="quixstreams.sinks.community.kafka"></a>

## quixstreams.sinks.community.kafka

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink"></a>

### KafkaReplicatorSink

```python
class KafkaReplicatorSink(BaseSink)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L22)

A sink that produces data to an external Kafka cluster.

This sink uses the same serialization approach as the Quix Application.

Example Snippet:

```python
from quixstreams import Application
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

app = Application(
    consumer_group="group",
)

topic = app.topic("input-topic")

# Define the external Kafka cluster configuration
kafka_sink = KafkaReplicatorSink(
    broker_address="external-kafka:9092",
    topic_name="output-topic",
    value_serializer="json",
    key_serializer="bytes",
)

sdf = app.dataframe(topic=topic)
sdf.sink(kafka_sink)

app.run()
```
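With `value_serializer="json"` and `key_serializer="bytes"` as in the snippet above, each record is serialized into raw Kafka bytes roughly as follows. This is a hand-rolled sketch of the idea only, not the library's actual serializer classes:

```python
import json
from typing import Any, Optional


def serialize_value(value: Any) -> bytes:
    # "json" serializer: dump the value to a compact UTF-8 JSON payload.
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def serialize_key(key: Any) -> Optional[bytes]:
    # "bytes" serializer: pass bytes through unchanged; None stays None.
    if key is None or isinstance(key, bytes):
        return key
    raise TypeError(f"expected bytes or None, got {type(key).__name__}")


payload = serialize_value({"temperature": 21.5})
print(payload)  # b'{"temperature":21.5}'
```

Swapping `value_serializer` changes only this encoding step; the rest of the produce path stays the same.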

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.__init__"></a>

#### KafkaReplicatorSink.\_\_init\_\_

```python
def __init__(
    broker_address: Union[str, ConnectionConfig],
    topic_name: str,
    value_serializer: SerializerType = "json",
    key_serializer: SerializerType = "bytes",
    producer_extra_config: Optional[dict] = None,
    flush_timeout: float = 10.0,
    origin_topic: Optional[Topic] = None,
    auto_create_sink_topic: bool = True,
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L55)

**Arguments**:

- `broker_address`: The connection settings for the external Kafka cluster.
Accepts a string with the Kafka broker host and port formatted as `<host>:<port>`,
or a ConnectionConfig object if authentication is required.
- `topic_name`: The topic name to produce to on the external Kafka cluster.
- `value_serializer`: The serializer type for values.
Default - `json`.
- `key_serializer`: The serializer type for keys.
Default - `bytes`.
- `producer_extra_config`: A dictionary with additional options that
will be passed to `confluent_kafka.Producer` as-is.
Default - `None`.
- `flush_timeout`: The time in seconds the producer waits for all messages
to be delivered during flush.
Default - `10.0`.
- `origin_topic`: If auto-creating the sink topic, you can optionally pass the
source topic to reuse its configuration.
- `auto_create_sink_topic`: Whether to try to create the sink topic on startup.
Default - `True`.
- `on_client_connect_success`: An optional callback made after successful
client authentication, primarily for additional logging.
- `on_client_connect_failure`: An optional callback made after failed
client authentication (which should raise an Exception).
The callback should accept the raised Exception as an argument.
The callback must resolve (or propagate/re-raise) the Exception.
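The two connect callbacks follow a simple contract. The sketch below shows plausible implementations; the exact signatures are assumptions inferred from the descriptions above, not taken from the library:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kafka-sink")


def on_connect_success() -> None:
    # Assumed to take no arguments; useful mainly for extra logging.
    logger.info("Connected to the external Kafka cluster")


def on_connect_failure(exc: Exception) -> None:
    # Receives the raised Exception and must resolve or re-raise it.
    logger.error("Kafka connection failed: %s", exc)
    raise exc  # propagate so the failure is not silently swallowed
```

Under these assumptions, they would be passed as `on_client_connect_success=on_connect_success` and `on_client_connect_failure=on_connect_failure`.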

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.setup"></a>

#### KafkaReplicatorSink.setup

```python
def setup()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L111)

Initialize the InternalProducer and Topic for serialization.

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.add"></a>

#### KafkaReplicatorSink.add

```python
def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int) -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L146)

Add a message to be produced to the external Kafka cluster.

This method converts the provided data into a Row object and uses
the InternalProducer to serialize and produce it.

**Arguments**:

- `value`: The message value.
- `key`: The message key.
- `timestamp`: The message timestamp in milliseconds.
- `headers`: The message headers.
- `topic`: The source topic name.
- `partition`: The source partition.
- `offset`: The source offset.
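To make the per-message contract concrete, here is a hypothetical stand-in that merely records what each `add()` call carries; the real sink serializes the row and produces it to the external cluster instead:

```python
from typing import Any, List, Tuple


class RecordingSink:
    """Hypothetical stand-in for illustration; NOT KafkaReplicatorSink."""

    def __init__(self) -> None:
        self.rows: List[Tuple[Any, ...]] = []

    def add(self, value: Any, key: Any, timestamp: int, headers: list,
            topic: str, partition: int, offset: int) -> None:
        # Timestamps are epoch milliseconds; headers are (name, value) tuples.
        self.rows.append((key, value, timestamp, headers, topic, partition, offset))


sink = RecordingSink()
sink.add(
    value={"temperature": 21.5},
    key=b"sensor-1",
    timestamp=1700000000000,
    headers=[("source", b"edge")],
    topic="input-topic",
    partition=0,
    offset=42,
)
print(len(sink.rows))  # 1
```

Note that `topic`, `partition`, and `offset` describe where the message came *from*; the destination topic is fixed by `topic_name` at construction time.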

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.flush"></a>

#### KafkaReplicatorSink.flush

```python
def flush() -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L190)

Flush the producer to ensure all messages are delivered.

This method is triggered by the Checkpoint class when it commits.
If flush fails, the checkpoint will be aborted.
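The commit-time behavior can be sketched with a stand-in producer. `confluent_kafka.Producer.flush()` returns the number of messages still undelivered after waiting, and a non-zero result here translates into a failure that aborts the checkpoint (the names below are illustrative, not the library's internals):

```python
class FakeProducer:
    """Stand-in mimicking confluent_kafka.Producer.flush(), which returns
    the number of messages still in the delivery queue after waiting."""

    def __init__(self, queued: int, delivered_per_flush: int) -> None:
        self.queued = queued
        self.delivered_per_flush = delivered_per_flush

    def flush(self, timeout: float) -> int:
        self.queued = max(0, self.queued - self.delivered_per_flush)
        return self.queued


def checkpoint_flush(producer: FakeProducer, flush_timeout: float = 10.0) -> None:
    # Mirrors the documented contract: if messages remain undelivered
    # after `flush_timeout` seconds, fail so the checkpoint is aborted
    # rather than committing offsets for data that was never delivered.
    remaining = producer.flush(timeout=flush_timeout)
    if remaining > 0:
        raise TimeoutError(f"{remaining} messages undelivered after {flush_timeout}s")


checkpoint_flush(FakeProducer(queued=100, delivered_per_flush=100))  # all delivered: ok
```

Failing loudly here is the safe choice: aborting the checkpoint means the source offsets are not committed, so the undelivered messages will be reprocessed.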

<a id="quixstreams.sinks.community.pubsub"></a>

## quixstreams.sinks.community.pubsub
