Code and tests fixes to make the full test suite pass #4970
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Leaving partial review comments.
  rkt = test_create_producer_topic(rk, topic, "message.timeout.ms",
-                                  "2000", NULL);
+                                  "3000", NULL);

  TEST_SAY("Auto-creating topic %s\n", topic);
  test_auto_create_topic_rkt(rk, rkt, tmout_multip(5000));
This wouldn't be needed anymore as we're creating using mock broker
case 1:
        expected_metadata_requests = 7;
        break;
default:
nit: Just fail the test if the case isn't one of the expected variations; otherwise a reader may think the default branch is a valid case too.
Another partial review, 2 commits are still pending review.
@@ -116,7 +116,12 @@ int main_0093_holb_consumer(int argc, char **argv) {

  test_conf_set(conf, "session.timeout.ms", "6000");
  test_conf_set(conf, "max.poll.interval.ms", "20000");
  test_conf_set(conf, "socket.timeout.ms", "3000");
/* Socket timeout must be greater than |
For the requests sent to the group coordinator (the JoinGroup request), don't we set the request timeout differently?
/* Absolute timeout */
rd_kafka_buf_set_abs_timeout_force(
rkbuf,
/* Request timeout is max.poll.interval.ms + grace
* if the broker supports it, else
* session.timeout.ms + grace. */
(ApiVersion >= 1 ? rk->rk_conf.max_poll_interval_ms
: rk->rk_conf.group_session_timeout_ms) +
3000 /* 3s grace period*/,
0);
In particular, isn't this test designed to verify that JoinGroup requests may block the group coordinator connection for some time T, where socket.timeout.ms < T < max.poll.interval.ms, without causing any failures?
From commit that added this test,
"Since JoinGroupRequests may block for up to max.poll.interval.ms,
which may be set very high (hours..), any sub-sequent requests
on the same connection, such as Metadata refreshes, would time out
and tear down the connection, triggering another rebalance."
I cannot reproduce this test failure anymore, so I'll remove the test change. It's fair that socket.timeout.ms shouldn't be used and max.poll.interval.ms should be used instead. This value is still lower, but it shouldn't be necessary to increase it.
@@ -87,6 +87,9 @@ static void do_test_fetch_max_bytes(void) {
    Test::Fail("Failed to create KafkaConsumer: " + errstr);
  delete conf;

  /* For next consumer */
  test_wait_topic_exists(c->c_ptr(), topic.c_str(), 5000);
question: given that we've already produced to said topic, this is pretty much just rd_sleep(1), right? To propagate the topic?
It's similar, for the moment. We may change how we wait for a topic to exist on all brokers by using JMX requests.
static int32_t
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
nit: Add a @brief description above this method; it makes it a bit easier for the reader if it includes what we're returning too.
src/rdkafka_cgrp.c (outdated)
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
                               rd_kafka_topic_partition_list_t *rktparlist) {
        int32_t ret = rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
nit
- int32_t ret = rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
+ int32_t new_subscription_version = rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
src/rdkafka_int.h (outdated)
rd_kafka_metadata_internal_t
    *rk_full_metadata;           /* Last full metadata. */
rd_ts_t rk_ts_full_metadata;     /* Timestamp of .. */
nit
/* Timestamp of most recent full metadata */
 * avoid retrying it on this same broker.
 * This is to prevent the client hanging
 * until it can connect to this broker again. */
if (!request->rkbuf_u.Metadata.decr &&
Wouldn't we need to hold Metadata.decr_lock while reading it?
No, it's only needed if we need to decrease the integer it points to, which can be rkmc_full_brokers_sent or rkmc_full_topics_sent. The lock is acquired when the buffer is destroyed. I can add a comment for this.
skip events generated before the assignment that lead to a test failure
- avoid full metadata refresh during metadata propagation time after topic creation
- rebalance events order after max.poll.interval.ms exceeded
…ssages verification. Log warnings for the errors to identify the cause.
calls cause an unknown topic or partition error
As for the `rd_kafka_toppar_t`, this follows the same strategy as the Java client, considering:
- when the topic id changes, partition metadata is taken entirely from the new one.
- when the leader epoch is greater than or equal to the one in the cache, or null (-1), partition metadata is taken from the new one.
- when the leader epoch is less than the one in the cache, partition metadata remains the same.
Also, when full metadata is necessary, the cache is used for storing it and for matching topics against the regex, removing the need to store the full metadata result, which is about the same size, while the cache is updated more accurately.
by reducing or skipping tests with a large number of elements
on an unreachable broker prevents refreshing the controller or the coordinator until that broker becomes reachable again
It's due to the metadata propagation period: even after producing to the topic succeeds, the metadata is not yet propagated to all brokers. As in `test_wait_metadata_update`, we wait 1 s for propagation. TBD: check the JMX metrics about metadata propagation to tell exactly when metadata has been propagated.
This issue happens when a broker is being destroyed and there are enqueued operations that add buffers to it. If those operations are executed after the buffers are purged, they aren't destroyed, and the refcnt held through `rkbuf_rkb` prevents the broker, and the whole instance, from being finally destroyed.
on `rd_kafka_toppar_delegate_to_leader`
produces a _TIMED_OUT error and then after retrying the delivery callbacks are called with _MSG_TIMED_OUT.
for them, as this way it won't be requested later and will skip the metadata refresh with a log containing "already being requested". The related issue is solved with subscription versions, and tests 0143 and 0146 still pass. Replace wait-cache hints in case the consumer group metadata refresh wasn't sent, so they are requested again. Avoid joining the group if not all topics are in the cache but the metadata request couldn't be sent.
version to set the released one
…after AK version upgrade to rc4
Reviewed individual commits and fixups. Please merge with all commits intact. Thank you for the fixes!
failing after the produce requests with a _STATE error.
Test 0146 is less dependent on timing.
both variations to track first metadata request as well
I reviewed the newly added commits.
Includes a task to run the test suite on demand on Semaphore CI.
A description can be found in each commit message.
Closes #4964, #4778, #4907, #4884