@RatulDawar (Contributor) commented Jun 7, 2025

Adding a connector for Apache Iggy.
We tried to keep the connector similar to the Apache Kafka connector.
It currently supports the HTTP transport protocol and the unstructured JSON data type.
Thanks to @Vinamra7 for helping contribute the source connector.
This refers to issue #818.

@mwylde (Member) left a comment

Apologies for the long delay on the review, and thanks for the contribution! This is a good start that needs a little more work before we'll be able to merge it.

In addition to the changes requested in the PR, it will also need state handling in the source. In order to provide at-least-once semantics, Arroyo needs to manage its own offset tracking as part of its state. Otherwise, if we're relying on auto-committing, we might drop messages on restart. Look at how this is implemented in the Kafka connector.
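
To make the request concrete, here is a minimal sketch of the idea. It is not the Kafka connector's actual code. The source records the next offset to read for each partition, that map is captured at checkpoint time, and on restart the source resumes from the restored values rather than from whatever the broker auto-committed. `OffsetState` and its methods are hypothetical names used only for illustration; Arroyo's real state tables look different.

```rust
use std::collections::HashMap;

/// Hypothetical per-source offset state (illustration only).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct OffsetState {
    /// partition id -> next offset to read
    next_offset: HashMap<u32, u64>,
}

impl OffsetState {
    /// Record that the message at `offset` on `partition` was emitted downstream.
    pub fn record(&mut self, partition: u32, offset: u64) {
        let next = offset + 1;
        let entry = self.next_offset.entry(partition).or_insert(next);
        // Never move backwards if messages are recorded out of order.
        if *entry < next {
            *entry = next;
        }
    }

    /// Offset to resume from after a restart; 0 for a partition never seen.
    pub fn resume_from(&self, partition: u32) -> u64 {
        self.next_offset.get(&partition).copied().unwrap_or(0)
    }

    /// Snapshot written as part of a checkpoint, sorted by partition id.
    pub fn snapshot(&self) -> Vec<(u32, u64)> {
        let mut entries: Vec<(u32, u64)> =
            self.next_offset.iter().map(|(p, o)| (*p, *o)).collect();
        entries.sort();
        entries
    }

    /// Rebuild the state from the last successful checkpoint.
    pub fn restore(snapshot: &[(u32, u64)]) -> Self {
        Self {
            next_offset: snapshot.iter().copied().collect(),
        }
    }
}
```

Anything emitted after the last checkpoint is read again after a restart, which is what gives at-least-once rather than at-most-once behavior.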

if (format == 'json') {
schemaTypeOptions.push({ name: 'Unstructured JSON', value: 'unstructured' });
}
const schemaTypeOptions = useMemo(() => {
Member

Can you explain the reason for this change? In general, all formats should be supported for any connector, as the format system is responsible for deserializing the bytes after they've been read from the source. We also want to avoid introducing special cases for particular connectors in the UI.
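
A rough sketch of the separation described here, under the assumption that a connector only hands payload bytes to whichever format the user configured. The `Format` trait, `RawString`, `Lines`, and `handle_payload` are hypothetical stand-ins and not Arroyo's actual format API.

```rust
/// Hypothetical stand-in for a format: turns raw bytes into records.
pub trait Format {
    fn deserialize(&self, bytes: &[u8]) -> Result<Vec<String>, String>;
}

/// Treats each payload as a single UTF-8 record.
pub struct RawString;

/// Treats each payload as newline-delimited records.
pub struct Lines;

impl Format for RawString {
    fn deserialize(&self, bytes: &[u8]) -> Result<Vec<String>, String> {
        String::from_utf8(bytes.to_vec())
            .map(|s| vec![s])
            .map_err(|e| e.to_string())
    }
}

impl Format for Lines {
    fn deserialize(&self, bytes: &[u8]) -> Result<Vec<String>, String> {
        let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
        Ok(text.lines().map(str::to_owned).collect())
    }
}

/// Hypothetical source-side hook: the connector never inspects the payload,
/// so any format works with any connector.
pub fn handle_payload(format: &dyn Format, payload: &[u8]) -> Result<Vec<String>, String> {
    format.deserialize(payload)
}
```

With this shape, supporting another format needs no connector-specific code and no special case in the UI.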

@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="utf-8"?>
Member

If possible, this should be converted to black and white to match the other connector images. (If that's too much trouble, I can just push up a monochrome version.)

"username": {
"title": "Username",
"type": "string",
"description": "Username for authentication"
Member

It's useful for username to also be a var-str, even though it's not generally sensitive, as that allows users to manage usernames and passwords together.

]
},
"consumer_id": {
"type": "integer",
Member

I think in general users shouldn't need to set a consumer ID, since in Arroyo we manage horizontal scaling using low-level APIs rather than high-level consumer group APIs.

It also looks like Iggy supports both numeric and textual consumers; I think many users would prefer to use names so perhaps it makes sense to support both types here by taking a string?
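
One way the string-typed option could work, as a sketch: take a string from the config and treat it as a numeric ID when it parses as one, otherwise as a name. `ConsumerId` and `parse_consumer_id` are hypothetical names for illustration; how the result maps onto the Iggy client's identifier type is left open here.

```rust
/// Hypothetical config-side representation of a consumer identifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumerId {
    Numeric(u32),
    Named(String),
}

/// Parse the user-supplied string: all-digit values that fit in a u32
/// become numeric IDs, everything else is treated as a name.
pub fn parse_consumer_id(raw: &str) -> Result<ConsumerId, String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Err("consumer id must not be empty".to_string());
    }
    match trimmed.parse::<u32>() {
        Ok(id) => Ok(ConsumerId::Numeric(id)),
        Err(_) => Ok(ConsumerId::Named(trimmed.to_string())),
    }
}
```

A value such as "42" yields a numeric ID and "orders-reader" yields a name, so existing numeric configurations would keep working.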

"title": "Consumer ID",
"description": "The ID of the consumer to use for polling messages"
},
"partition_id": {
Member

I'm not sure we want to have the user choose a particular partition to read from; generally you want to read from all partitions. At the very least I think we need an option to read from all partitions.
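
As a sketch of what reading from all partitions could look like when the source runs with several parallel subtasks: each subtask takes every `parallelism`-th partition, so every partition is read by exactly one subtask. `assigned_partitions` is a hypothetical helper, and the list of partition IDs is assumed to come from the topic's metadata.

```rust
/// Partitions a given subtask should read, assigned round-robin by position
/// in `partition_ids`. Hypothetical helper for illustration.
pub fn assigned_partitions(
    partition_ids: &[u32],
    task_index: usize,
    parallelism: usize,
) -> Vec<u32> {
    assert!(parallelism > 0, "parallelism must be at least 1");
    assert!(task_index < parallelism, "task_index must be below parallelism");
    partition_ids
        .iter()
        .enumerate()
        // Position-based assignment works whether IDs start at 0 or 1.
        .filter(|(position, _)| position % parallelism == task_index)
        .map(|(_, id)| *id)
        .collect()
}
```

A subtask that ends up with an empty list (more subtasks than partitions) would simply idle.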

async fn init_client(&mut self) -> Result<()> {
info!("Creating Iggy client for {}", self.endpoint);

let client = IggyClient::default();
Member

We're not passing the configured endpoint (self.endpoint) to the client here.

})?;
table.insert(self.partition_id, state).await;

let client_temp = self.client.take();
Member

Why is this here?

let mut additional_fields = HashMap::new();
for field in &self.metadata_fields {
match field.key.as_str() {
"offset" => {
Member

This can be made a bit more succinct by pulling the insert call out of each match arm (since it's always the same).
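
Roughly the shape being suggested, as a self-contained sketch: each arm only computes the value, and a single insert follows the match. The types `FieldValue`, `MetadataField`, and `MessageMeta`, and the "partition" and "topic" keys, are hypothetical stand-ins for whatever the connector actually uses.

```rust
use std::collections::HashMap;

/// Hypothetical value type for metadata columns.
#[derive(Debug, Clone, PartialEq)]
pub enum FieldValue {
    Int(i64),
    Str(String),
}

/// Hypothetical mapping from an output column to a metadata key.
pub struct MetadataField {
    pub field_name: String,
    pub key: String,
}

/// Hypothetical per-message metadata.
pub struct MessageMeta {
    pub offset: u64,
    pub partition: u32,
    pub topic: String,
}

pub fn collect_metadata<'a>(
    fields: &'a [MetadataField],
    meta: &MessageMeta,
) -> HashMap<&'a str, FieldValue> {
    let mut additional_fields = HashMap::new();
    for field in fields {
        // Each arm yields only the value; unknown keys are skipped.
        let value = match field.key.as_str() {
            "offset" => FieldValue::Int(meta.offset as i64),
            "partition" => FieldValue::Int(i64::from(meta.partition)),
            "topic" => FieldValue::Str(meta.topic.clone()),
            _ => continue,
        };
        // The insert is written once, after the match.
        additional_fields.insert(field.field_name.as_str(), value);
    }
    additional_fields
}
```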

.await
{
Ok(_) => {
info!("Sent {} messages to Iggy", messages_to_send.len());
Member

This should be downgraded to debug or trace; otherwise it will be very noisy.

}

if let Err(e) = self.send_messages(messages, ctx).await {
error!("Failed to send messages to Iggy: {:?}", e);
Member

This is currently dropping messages on failure, which is generally the wrong default for a stream processing system like Arroyo. Instead, we should retry some number of times and then fail the pipeline by panicking.
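
A minimal sketch of the retry-then-fail behavior, assuming a bounded number of attempts with exponential backoff. It is written synchronously with `std::thread::sleep` to stay self-contained; the real sink is async and would use an async sleep instead. `retry_with_backoff` and `send_or_fail` are hypothetical helpers, and the attempt count and delay are placeholder values.

```rust
use std::fmt::Debug;
use std::thread::sleep;
use std::time::Duration;

/// Run `op` up to `max_attempts` times, doubling the delay after each failure.
/// Returns the last error if every attempt fails.
pub fn retry_with_backoff<T, E: Debug>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "max_attempts must be at least 1");
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the error back to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(e) => {
                eprintln!("attempt {attempt}/{max_attempts} failed: {e:?}; retrying in {delay:?}");
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}

/// Caller side: exhausting the retries fails the pipeline instead of
/// silently dropping the batch.
pub fn send_or_fail<E: Debug>(op: impl FnMut() -> Result<(), E>) {
    if let Err(e) = retry_with_backoff(5, Duration::from_millis(100), op) {
        panic!("failed to send messages to Iggy after retries: {e:?}");
    }
}
```

Panicking here lets the pipeline restart from its last checkpoint, so the failed batch is retried rather than lost.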
