Skip to content

Commit e3ae6e6

Browse files
authored
[KIP-714] Fix: Metric name from the protocol is read outside boundaries (#5105)
* Failing test * Fixes #5102
1 parent 99e9abb commit e3ae6e6

File tree

2 files changed

+87
-1
lines changed

2 files changed

+87
-1
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ librdkafka v2.11.0 is a feature release:
44

55
* Fix for frequent disconnections on push telemetry requests
66
with particular metric configurations (#4912).
7+
* Avoid copy outside boundaries when reading metric names in telemetry
8+
subscription (#5105)
79

810

911
## Fixes
@@ -19,6 +21,10 @@ librdkafka v2.11.0 is a feature release:
1921
some metrics are matching the producer but none the consumer
2022
or the other way around.
2123
Happens since 2.5.0 (#4912).
24+
* Issues: #5102
25+
Avoid copy outside boundaries when reading metric names in telemetry
26+
subscription. It can cause that some metrics aren't matched.
27+
Happens since 2.5.0 (#5105).
2228

2329

2430

src/rdkafka_request.c

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6627,7 +6627,7 @@ void rd_kafka_handle_GetTelemetrySubscriptions(rd_kafka_t *rk,
66276627
rd_kafkap_str_t Metric;
66286628
rd_kafka_buf_read_str(rkbuf, &Metric);
66296629
rk->rk_telemetry.requested_metrics[i] =
6630-
rd_strdup(Metric.str);
6630+
RD_KAFKAP_STR_DUP(&Metric);
66316631
}
66326632
}
66336633

@@ -6952,13 +6952,93 @@ static int unittest_idempotent_producer(void) {
69526952
return 0;
69536953
}
69546954

6955+
/**
6956+
* @brief Test for the GetTelemetrySubscriptions response handling.
6957+
*
6958+
* @returns 1 on failure, 0 on success.
6959+
*/
6960+
static int unittest_handle_GetTelemetrySubscriptions(void) {
6961+
rd_kafka_t *rk;
6962+
rd_kafka_broker_t *rkb;
6963+
rd_kafka_buf_t *rkbuf;
6964+
6965+
RD_UT_SAY("Verifying GetTelemetrySubscriptions response handling");
6966+
6967+
rk = rd_kafka_new(RD_KAFKA_CONSUMER, NULL, NULL, 0);
6968+
rkb = rd_kafka_broker_add_logical(rk, "unittest");
6969+
6970+
rkbuf = rd_kafka_buf_new(0, 0);
6971+
rkbuf->rkbuf_rkb = rkb;
6972+
rd_kafka_buf_write_i32(rkbuf, 0); /* ThrottleTime */
6973+
rd_kafka_buf_write_i16(rkbuf, 0); /* ErrorCode */
6974+
6975+
rd_kafka_buf_write_uuid(rkbuf, &rk->rk_telemetry.client_instance_id);
6976+
6977+
rd_kafka_buf_write_i32(rkbuf, 0); /* SubscriptionId */
6978+
6979+
rd_kafka_buf_write_arraycnt(rkbuf, 2); /* #AcceptedCompressionTypes */
6980+
/* AcceptedCompressionTypes[0] */
6981+
rd_kafka_buf_write_i8(rkbuf, RD_KAFKA_COMPRESSION_GZIP);
6982+
/* AcceptedCompressionTypes[1] */
6983+
rd_kafka_buf_write_i8(rkbuf, RD_KAFKA_COMPRESSION_LZ4);
6984+
6985+
rd_kafka_buf_write_i32(rkbuf, 0); /* PushIntervalMs */
6986+
rd_kafka_buf_write_i32(rkbuf, 0); /* TelemetryMaxBytes */
6987+
rd_kafka_buf_write_bool(rkbuf, 0); /* DeltaTemporality */
6988+
6989+
rd_kafka_buf_write_arraycnt(rkbuf, 2); /* #RequestedMetrics */
6990+
/* RequestedMetrics[0] */
6991+
rd_kafka_buf_write_str(rkbuf, "metric1", -1);
6992+
/* RequestedMetrics[1] */
6993+
rd_kafka_buf_write_str(rkbuf, "metric2", -1);
6994+
6995+
/* Set up a buffer reader for sending the buffer. */
6996+
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
6997+
6998+
/* Handle the response */
6999+
rd_kafka_handle_GetTelemetrySubscriptions(
7000+
rk, rkb, RD_KAFKA_RESP_ERR_NO_ERROR, rkbuf, NULL, NULL);
7001+
7002+
7003+
RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types_cnt == 2,
7004+
"Expected 2 accepted compression types, got %" PRIusz,
7005+
rk->rk_telemetry.accepted_compression_types_cnt);
7006+
RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types[0] ==
7007+
RD_KAFKA_COMPRESSION_GZIP,
7008+
"Expected 'gzip' compression type, got '%s'",
7009+
rd_kafka_compression2str(
7010+
rk->rk_telemetry.accepted_compression_types[0]));
7011+
RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types[1] ==
7012+
RD_KAFKA_COMPRESSION_LZ4,
7013+
"Expected 'lz4' compression type, got '%s'",
7014+
rd_kafka_compression2str(
7015+
rk->rk_telemetry.accepted_compression_types[1]));
7016+
7017+
RD_UT_ASSERT(rk->rk_telemetry.requested_metrics_cnt == 2,
7018+
"Expected 2 requested metrics, got %" PRIusz,
7019+
rk->rk_telemetry.requested_metrics_cnt);
7020+
RD_UT_ASSERT(
7021+
rd_strcmp(rk->rk_telemetry.requested_metrics[0], "metric1") == 0,
7022+
"Expected 'metric1', got '%s'",
7023+
rk->rk_telemetry.requested_metrics[0]);
7024+
RD_UT_ASSERT(
7025+
rd_strcmp(rk->rk_telemetry.requested_metrics[1], "metric2") == 0,
7026+
"Expected 'metric2', got '%s'",
7027+
rk->rk_telemetry.requested_metrics[1]);
7028+
7029+
rd_kafka_buf_destroy(rkbuf);
7030+
rd_kafka_destroy(rk);
7031+
return 0;
7032+
}
7033+
69557034
/**
69567035
* @brief Request/response unit tests
69577036
*/
69587037
int unittest_request(void) {
69597038
int fails = 0;
69607039

69617040
fails += unittest_idempotent_producer();
7041+
fails += unittest_handle_GetTelemetrySubscriptions();
69627042

69637043
return fails;
69647044
}

0 commit comments

Comments
 (0)