-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[KIP-848] Added new error code handling to OffsetCommit and OffsetFetch #4681
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-848] Added new error code handling to OffsetCommit and OffsetFetch #4681
Conversation
270f7f7
to
228b93d
Compare
ff78ef0
to
1d26a60
Compare
src/rdkafka_assignment.c
Outdated
@@ -230,6 +254,7 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk, | |||
rd_kafka_buf_t *reply, | |||
rd_kafka_buf_t *request, | |||
void *opaque) { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this new empty line
src/rdkafka_assignment.c
Outdated
offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err)); | ||
switch (err) { | ||
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: | ||
rd_assert(rk->rk_cgrp->rkcg_group_protocol == |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the assert here too
src/rdkafka_cgrp.c
Outdated
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: | ||
/* FIXME: Add logs.*/ | ||
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can retry the commit in this case, provided it is not a manual commit: the member epoch will have been updated by then, and even if the commit order changes, the op has set_offsets
set to true, so the retry will pick up the latest stored offsets for the given partitions.
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: | |
/* FIXME: Add logs.*/ | |
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp); | |
break; | |
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: | |
/* FIXME: Add logs.*/ | |
if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual")) | |
/* Don't retry manual commits giving this error. | |
* TODO: do this in a faster and cleaner way | |
* with a bool. */ | |
break; | |
if (rd_kafka_cgrp_consumer_defer_offset_commit( | |
rkcg, rko_orig, rd_kafka_err2str(err))) | |
return; | |
break; |
/**
 * @brief Defer offset commit (rko) until coordinator is available (KIP-848).
 *
 * The op is flagged for reprocessing, its partition-level errors are reset
 * and it is enqueued on \c rkcg_wait_coord_q. The first time an op is
 * deferred, a retry deadline of session.timeout.ms from now is stored in it;
 * later deferrals of the same op reuse that deadline.
 *
 * @param rkcg   Consumer group handle.
 * @param rko    OffsetCommit op to defer.
 * @param reason Human-readable reason, only used in the debug log line.
 *
 * @returns 1 if the rko was deferred (enqueued on the wait-coord queue),
 *          or 0 if the op's retry deadline has already passed or the
 *          defer queue is not ready (disabled).
 */
static int rd_kafka_cgrp_consumer_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
                                                      rd_kafka_op_t *rko,
                                                      const char *reason) {
        /* wait_coord_q is disabled session.timeout.ms after
         * group close() has been initiated. */
        if ((rko->rko_u.offset_commit.ts_timeout != 0 &&
             rd_clock() >= rko->rko_u.offset_commit.ts_timeout) ||
            !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
                return 0;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
                     "Group \"%s\": "
                     "unable to OffsetCommit in state %s: %s: "
                     "retrying later",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason);

        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;

        /* Only set the deadline on the first deferral so that repeated
         * retries cannot extend it indefinitely. */
        if (!rko->rko_u.offset_commit.ts_timeout) {
                /* Scale the timeout in 64-bit arithmetic: with plain int
                 * math a large session.timeout.ms multiplied by 1000
                 * overflows (signed overflow is undefined behavior) and
                 * would yield a bogus deadline.
                 * NOTE(review): the *1000 presumably converts ms to the
                 * microsecond unit of rd_clock() - confirm. */
                rko->rko_u.offset_commit.ts_timeout =
                    rd_clock() +
                    (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms *
                     1000LL);
        }

        /* Reset partition level error before retrying */
        rd_kafka_topic_partition_list_set_err(
            rko->rko_u.offset_commit.partitions,
            RD_KAFKA_RESP_ERR_NO_ERROR);

        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);

        return 1;
}
228b93d
to
215ec6e
Compare
@@ -3692,6 +3815,19 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, | |||
case RD_KAFKA_RESP_ERR__IN_PROGRESS: | |||
return; /* Retrying */ | |||
|
|||
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: | |||
/* FIXME: Add logs.*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was an expedite HB here
/* FIXME: Add logs.*/ | |
/* FIXME: Add logs.*/ | |
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp); |
36518f2
to
58aca9a
Compare
…fsetCommit and OffsetFetch error code handling.
58aca9a
to
90ca8e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks, I will merge it after CI passes. Please apply the style formatter first.
…ch (#4681) - Added new errors to manual commit. - improvements to OffsetCommit and OffsetFetch error code handling.
…ch (#4681) - Added new errors to manual commit. - improvements to OffsetCommit and OffsetFetch error code handling.
No description provided.