Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ librdkafka v2.11.0 is a feature release:

* Fix for frequent disconnections on push telemetry requests
with particular metric configurations (#4912).
* Avoid copy outside boundaries when reading metric names in telemetry
subscription (#5105)


## Fixes
Expand All @@ -19,6 +21,10 @@ librdkafka v2.11.0 is a feature release:
some metrics are matching the producer but none the consumer
or the other way around.
Happens since 2.5.0 (#4912).
* Issues: #5102
Avoid copy outside boundaries when reading metric names in telemetry
subscription. It can cause that some metrics aren't matched.
Happens since 2.5.0 (#5105).



Expand Down
82 changes: 81 additions & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -6627,7 +6627,7 @@ void rd_kafka_handle_GetTelemetrySubscriptions(rd_kafka_t *rk,
rd_kafkap_str_t Metric;
rd_kafka_buf_read_str(rkbuf, &Metric);
rk->rk_telemetry.requested_metrics[i] =
rd_strdup(Metric.str);
RD_KAFKAP_STR_DUP(&Metric);
}
}

Expand Down Expand Up @@ -6952,13 +6952,93 @@ static int unittest_idempotent_producer(void) {
return 0;
}

/**
* @brief Test for the GetTelemetrySubscriptions response handling.
*
* @returns 1 on failure, 0 on success.
*/
static int unittest_handle_GetTelemetrySubscriptions(void) {
rd_kafka_t *rk;
rd_kafka_broker_t *rkb;
rd_kafka_buf_t *rkbuf;

RD_UT_SAY("Verifying GetTelemetrySubscriptions response handling");

rk = rd_kafka_new(RD_KAFKA_CONSUMER, NULL, NULL, 0);
rkb = rd_kafka_broker_add_logical(rk, "unittest");

rkbuf = rd_kafka_buf_new(0, 0);
rkbuf->rkbuf_rkb = rkb;
rd_kafka_buf_write_i32(rkbuf, 0); /* ThrottleTime */
rd_kafka_buf_write_i16(rkbuf, 0); /* ErrorCode */

rd_kafka_buf_write_uuid(rkbuf, &rk->rk_telemetry.client_instance_id);

rd_kafka_buf_write_i32(rkbuf, 0); /* SubscriptionId */

rd_kafka_buf_write_arraycnt(rkbuf, 2); /* #AcceptedCompressionTypes */
/* AcceptedCompressionTypes[0] */
rd_kafka_buf_write_i8(rkbuf, RD_KAFKA_COMPRESSION_GZIP);
/* AcceptedCompressionTypes[1] */
rd_kafka_buf_write_i8(rkbuf, RD_KAFKA_COMPRESSION_LZ4);

rd_kafka_buf_write_i32(rkbuf, 0); /* PushIntervalMs */
rd_kafka_buf_write_i32(rkbuf, 0); /* TelemetryMaxBytes */
rd_kafka_buf_write_bool(rkbuf, 0); /* DeltaTemporality */

rd_kafka_buf_write_arraycnt(rkbuf, 2); /* #RequestedMetrics */
/* RequestedMetrics[0] */
rd_kafka_buf_write_str(rkbuf, "metric1", -1);
/* RequestedMetrics[1] */
rd_kafka_buf_write_str(rkbuf, "metric2", -1);

/* Set up a buffer reader for sending the buffer. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);

/* Handle the response */
rd_kafka_handle_GetTelemetrySubscriptions(
rk, rkb, RD_KAFKA_RESP_ERR_NO_ERROR, rkbuf, NULL, NULL);


RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types_cnt == 2,
"Expected 2 accepted compression types, got %" PRIusz,
rk->rk_telemetry.accepted_compression_types_cnt);
RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types[0] ==
RD_KAFKA_COMPRESSION_GZIP,
"Expected 'gzip' compression type, got '%s'",
rd_kafka_compression2str(
rk->rk_telemetry.accepted_compression_types[0]));
RD_UT_ASSERT(rk->rk_telemetry.accepted_compression_types[1] ==
RD_KAFKA_COMPRESSION_LZ4,
"Expected 'lz4' compression type, got '%s'",
rd_kafka_compression2str(
rk->rk_telemetry.accepted_compression_types[1]));

RD_UT_ASSERT(rk->rk_telemetry.requested_metrics_cnt == 2,
"Expected 2 requested metrics, got %" PRIusz,
rk->rk_telemetry.requested_metrics_cnt);
RD_UT_ASSERT(
rd_strcmp(rk->rk_telemetry.requested_metrics[0], "metric1") == 0,
"Expected 'metric1', got '%s'",
rk->rk_telemetry.requested_metrics[0]);
RD_UT_ASSERT(
rd_strcmp(rk->rk_telemetry.requested_metrics[1], "metric2") == 0,
"Expected 'metric2', got '%s'",
rk->rk_telemetry.requested_metrics[1]);

rd_kafka_buf_destroy(rkbuf);
rd_kafka_destroy(rk);
return 0;
}

/**
* @brief Request/response unit tests
*/
int unittest_request(void) {
int fails = 0;

fails += unittest_idempotent_producer();
fails += unittest_handle_GetTelemetrySubscriptions();

return fails;
}
Expand Down