Skip to content

Commit 5a94ee1

Browse files
authored
Merge branch 'ArroyoSystems:master' into master
2 parents 3c87e6d + 4f56baf commit 5a94ee1

File tree

50 files changed

+802
-138
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

50 files changed

+802
-138
lines changed

crates/arroyo-connectors/src/blackhole/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,7 @@ impl Connector for BlackholeConnector {
9999
format: None,
100100
bad_data: None,
101101
framing: None,
102+
metadata_fields: vec![],
102103
};
103104

104105
Ok(Connection {

crates/arroyo-connectors/src/filesystem/delta.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -123,6 +123,7 @@ impl Connector for DeltaLakeConnector {
123123
format: Some(format),
124124
bad_data: schema.bad_data.clone(),
125125
framing: schema.framing.clone(),
126+
metadata_fields: schema.metadata_fields(),
126127
};
127128

128129
Ok(Connection {

crates/arroyo-connectors/src/filesystem/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -168,6 +168,7 @@ impl Connector for FileSystemConnector {
168168
format: Some(format),
169169
bad_data: schema.bad_data.clone(),
170170
framing: schema.framing.clone(),
171+
metadata_fields: schema.metadata_fields(),
171172
};
172173

173174
Ok(Connection {

crates/arroyo-connectors/src/filesystem/source.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -367,7 +367,7 @@ impl FileSystemSourceFunc {
367367
line = line_reader.next() => {
368368
match line.transpose()? {
369369
Some(line) => {
370-
ctx.deserialize_slice(line.as_bytes(), SystemTime::now()).await?;
370+
ctx.deserialize_slice(line.as_bytes(), SystemTime::now(), None).await?;
371371
records_read += 1;
372372
if ctx.should_flush() {
373373
ctx.flush_buffer().await?;

crates/arroyo-connectors/src/fluvio/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -154,6 +154,7 @@ impl Connector for FluvioConnector {
154154
format: Some(format),
155155
bad_data: schema.bad_data.clone(),
156156
framing: schema.framing.clone(),
157+
metadata_fields: schema.metadata_fields(),
157158
};
158159

159160
Ok(Connection {

crates/arroyo-connectors/src/fluvio/source.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,7 @@ impl FluvioSourceFunc {
166166
match message {
167167
Some((_, Ok(msg))) => {
168168
let timestamp = from_millis(msg.timestamp().max(0) as u64);
169-
ctx.deserialize_slice(msg.value(), timestamp).await?;
169+
ctx.deserialize_slice(msg.value(), timestamp, None).await?;
170170

171171
if ctx.should_flush() {
172172
ctx.flush_buffer().await?;

crates/arroyo-connectors/src/impulse/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -166,6 +166,7 @@ impl Connector for ImpulseConnector {
166166
format: None,
167167
bad_data: None,
168168
framing: None,
169+
metadata_fields: vec![],
169170
};
170171

171172
Ok(Connection {

crates/arroyo-connectors/src/kafka/mod.rs

Lines changed: 28 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,8 @@
11
use anyhow::{anyhow, bail};
2+
use arrow::datatypes::DataType;
23
use arroyo_formats::de::ArrowDeserializer;
34
use arroyo_formats::ser::ArrowSerializer;
4-
use arroyo_operator::connector::Connection;
5+
use arroyo_operator::connector::{Connection, MetadataDef};
56
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
67
use arroyo_rpc::df::ArroyoSchema;
78
use arroyo_rpc::formats::{BadData, Format, JsonFormat};
@@ -214,6 +215,7 @@ impl Connector for KafkaConnector {
214215
format: Some(format),
215216
bad_data: schema.bad_data.clone(),
216217
framing: schema.framing.clone(),
218+
metadata_fields: schema.metadata_fields(),
217219
};
218220

219221
Ok(Connection {
@@ -306,6 +308,27 @@ impl Connector for KafkaConnector {
306308
}
307309
}
308310

311+
/// Lists the per-record metadata keys the Kafka connector exposes, together
/// with the Arrow data type declared for each key.
///
/// The key names here are the same strings the Kafka source matches on when it
/// fills in metadata values ("offset_id", "partition", "topic", "timestamp");
/// the source panics via `unreachable!` on any other key, so the two lists
/// must be kept in sync.
fn metadata_defs(&self) -> &'static [MetadataDef] {
312+
&[
313+
// Record offset; the source fills this from `msg.offset()`.
MetadataDef {
314+
name: "offset_id",
315+
data_type: DataType::Int64,
316+
},
317+
// Partition the record was read from; filled from `msg.partition()`.
MetadataDef {
318+
name: "partition",
319+
data_type: DataType::Int32,
320+
},
321+
// Topic name; filled from `msg.topic()`.
MetadataDef {
322+
name: "topic",
323+
data_type: DataType::Utf8,
324+
},
325+
// Record timestamp as a raw integer; the source passes the same value to
// `from_millis`, so it is presumably epoch milliseconds — TODO confirm.
MetadataDef {
326+
name: "timestamp",
327+
data_type: DataType::Int64,
328+
},
329+
]
330+
}
331+
309332
fn from_options(
310333
&self,
311334
name: &str,
@@ -383,6 +406,7 @@ impl Connector for KafkaConnector {
383406
.unwrap_or(u32::MAX),
384407
)
385408
.unwrap(),
409+
metadata_fields: config.metadata_fields,
386410
})))
387411
}
388412
TableType::Sink {
@@ -622,7 +646,7 @@ impl KafkaTester {
622646
let mut builders = aschema.builders();
623647

624648
let mut error = deserializer
625-
.deserialize_slice(&mut builders, &msg, SystemTime::now())
649+
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
626650
.await
627651
.into_iter()
628652
.next();
@@ -644,7 +668,7 @@ impl KafkaTester {
644668
let mut builders = aschema.builders();
645669

646670
let mut error = deserializer
647-
.deserialize_slice(&mut builders, &msg, SystemTime::now())
671+
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
648672
.await
649673
.into_iter()
650674
.next();
@@ -678,7 +702,7 @@ impl KafkaTester {
678702
let mut builders = aschema.builders();
679703

680704
let mut error = deserializer
681-
.deserialize_slice(&mut builders, &msg, SystemTime::now())
705+
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
682706
.await
683707
.into_iter()
684708
.next();

crates/arroyo-connectors/src/kafka/source/mod.rs

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,8 @@
1+
use arroyo_formats::de::FieldValueType;
12
use arroyo_rpc::formats::{BadData, Format, Framing};
23
use arroyo_rpc::grpc::rpc::TableConfig;
34
use arroyo_rpc::schema_resolver::SchemaResolver;
4-
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp};
5+
use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, ControlResp, MetadataField};
56

67
use arroyo_operator::context::ArrowContext;
78
use arroyo_operator::operator::SourceOperator;
@@ -35,6 +36,7 @@ pub struct KafkaSourceFunc {
3536
pub schema_resolver: Option<Arc<dyn SchemaResolver + Sync>>,
3637
pub client_configs: HashMap<String, String>,
3738
pub messages_per_second: NonZeroU32,
39+
pub metadata_fields: Vec<MetadataField>,
3840
}
3941

4042
#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
@@ -178,7 +180,26 @@ impl KafkaSourceFunc {
178180
.ok_or_else(|| UserError::new("Failed to read timestamp from Kafka record",
179181
"The message read from Kafka did not contain a message timestamp"))?;
180182

181-
ctx.deserialize_slice(v, from_millis(timestamp as u64)).await?;
183+
let topic = msg.topic();
184+
185+
let connector_metadata = if !self.metadata_fields.is_empty() {
186+
let mut connector_metadata = HashMap::new();
187+
for f in &self.metadata_fields {
188+
connector_metadata.insert(&f.field_name, match f.key.as_str() {
189+
"offset_id" => FieldValueType::Int64(msg.offset()),
190+
"partition" => FieldValueType::Int32(msg.partition()),
191+
"topic" => FieldValueType::String(topic),
192+
"timestamp" => FieldValueType::Int64(timestamp),
193+
k => unreachable!("Invalid metadata key '{}'", k),
194+
});
195+
}
196+
Some(connector_metadata)
197+
} else {
198+
None
199+
};
200+
201+
ctx.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?;
202+
182203

183204
if ctx.should_flush() {
184205
ctx.flush_buffer().await?;

crates/arroyo-connectors/src/kafka/source/test.rs

Lines changed: 100 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ use arroyo_operator::operator::SourceOperator;
1818
use arroyo_rpc::df::ArroyoSchema;
1919
use arroyo_rpc::formats::{Format, RawStringFormat};
2020
use arroyo_rpc::grpc::rpc::{CheckpointMetadata, OperatorCheckpointMetadata, OperatorMetadata};
21-
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp};
21+
use arroyo_rpc::{CheckpointCompleted, ControlMessage, ControlResp, MetadataField};
2222
use arroyo_types::{
2323
single_item_hash_map, to_micros, ArrowMessage, CheckpointBarrier, SignalMessage, TaskInfo,
2424
};
@@ -87,6 +87,7 @@ impl KafkaTopicTester {
8787
schema_resolver: None,
8888
client_configs: HashMap::new(),
8989
messages_per_second: NonZeroU32::new(100).unwrap(),
90+
metadata_fields: vec![],
9091
});
9192

9293
let (to_control_tx, control_rx) = channel(128);
@@ -342,3 +343,101 @@ async fn test_kafka() {
342343
)
343344
.await;
344345
}
346+
347+
/// Integration test: runs a `KafkaSourceFunc` configured with one metadata
/// field (Kafka key "offset_id" exposed as the column "offset"), produces 21
/// records to a dedicated topic, checks that the tester's reader receives the
/// record values, and then requests a graceful stop.
///
/// The broker address is hard-coded to "0.0.0.0:9092", so this presumably needs
/// a Kafka broker listening there — TODO confirm how CI provides it.
#[tokio::test]
348+
async fn test_kafka_with_metadata_fields() {
349+
// Dedicated topic name for this test.
let mut kafka_topic_tester = KafkaTopicTester {
350+
topic: "__arroyo-source-test_metadata".to_string(),
351+
server: "0.0.0.0:9092".to_string(),
352+
group_id: Some("test-consumer-group".to_string()),
353+
};
354+
355+
// Randomise the job id so each run gets a distinct "kafka-job-<n>" value.
let mut task_info = arroyo_types::get_test_task_info();
356+
task_info.job_id = format!("kafka-job-{}", random::<u64>());
357+
358+
kafka_topic_tester.create_topic().await;
359+
360+
// Prepare metadata fields: map the connector key "offset_id" onto the
// output column named "offset".
let metadata_fields = vec![MetadataField {
362+
field_name: "offset".to_string(),
363+
key: "offset_id".to_string(),
364+
}];
365+
366+
// Set metadata fields in KafkaSourceFunc; the remaining settings read the
// topic from the earliest offset as raw strings.
let mut kafka = KafkaSourceFunc {
368+
bootstrap_servers: kafka_topic_tester.server.clone(),
369+
topic: kafka_topic_tester.topic.clone(),
370+
group_id: kafka_topic_tester.group_id.clone(),
371+
group_id_prefix: None,
372+
offset_mode: SourceOffset::Earliest,
373+
format: Format::RawString(RawStringFormat {}),
374+
framing: None,
375+
bad_data: None,
376+
schema_resolver: None,
377+
client_configs: HashMap::new(),
378+
messages_per_second: NonZeroU32::new(100).unwrap(),
379+
metadata_fields,
380+
};
381+
382+
// Channels required by ArrowContext. The underscore-prefixed ends are bound
// but never used again in this function.
// NOTE(review): `_recv` is the receiving side of this source's data channel
// and is never read, so the contents of the "offset" column are not
// asserted anywhere in this test — confirm whether that is intended.
let (_to_control_tx, control_rx) = channel(128);
383+
let (command_tx, _from_control_rx) = channel(128);
384+
let (data_tx, _recv) = batch_bounded(128);
385+
386+
let checkpoint_metadata = None;
387+
388+
// Output schema: the timestamp column, the raw string value, and the extra
// Int64 "offset" column matching the metadata field declared above.
let mut ctx = ArrowContext::new(
389+
task_info.clone(),
390+
checkpoint_metadata,
391+
control_rx,
392+
command_tx,
393+
1,
394+
vec![],
395+
Some(ArroyoSchema::new_unkeyed(
396+
Arc::new(Schema::new(vec![
397+
Field::new(
398+
"_timestamp",
399+
DataType::Timestamp(TimeUnit::Nanosecond, None),
400+
false,
401+
),
402+
Field::new("value", DataType::Utf8, false),
403+
Field::new("offset", DataType::Int64, false),
404+
])),
405+
0,
406+
)),
407+
None,
408+
vec![vec![data_tx]],
409+
kafka.tables(),
410+
)
411+
.await;
412+
413+
// Run the metadata-enabled source in the background; the join handle is
// dropped, so the task is detached.
tokio::spawn(async move {
414+
kafka.run(&mut ctx).await;
415+
});
416+
417+
// NOTE(review): this reader comes from the tester helper, not from the
// `kafka` source built above; whether that helper's source carries any
// metadata fields cannot be determined from this function — verify.
let mut reader = kafka_topic_tester
418+
.get_source_with_reader(task_info.clone(), None)
419+
.await;
420+
let mut producer = kafka_topic_tester.get_producer();
421+
422+
// Send test data: 21 records (i = 1..=21), keeping the JSON form of each as
// the expected value.
let expected_messages: Vec<_> = (1u64..=21)
424+
.map(|i| {
425+
let data = TestData { i };
426+
producer.send_data(data.clone());
427+
serde_json::to_string(&data).unwrap()
428+
})
429+
.collect();
430+
431+
// Verify received messages
reader
433+
.assert_next_message_record_values(expected_messages.into())
434+
.await;
435+
436+
// Ask the reader's source to stop gracefully.
reader
437+
.to_control_tx
438+
.send(ControlMessage::Stop {
439+
mode: arroyo_rpc::grpc::rpc::StopMode::Graceful,
440+
})
441+
.await
442+
.unwrap();
443+
}

0 commit comments

Comments
 (0)