Skip to content

Commit 0f0d8d8

Browse files
pranavrthemasab
authored andcommitted
[KIP-848] Added new error code handling to OffsetCommit and OffsetFetch (#4681)
- Added new errors to manual commit. - improvements to OffsetCommit and OffsetFetch error code handling.
1 parent b42d0d9 commit 0f0d8d8

File tree

8 files changed

+176
-31
lines changed

8 files changed

+176
-31
lines changed

src/rdkafka.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4426,6 +4426,21 @@ RD_EXPORT int rd_kafka_assignment_lost(rd_kafka_t *rk);
44264426
* or successfully scheduled if asynchronous, or failed.
44274427
* RD_KAFKA_RESP_ERR__FATAL is returned if the consumer has raised
44284428
* a fatal error.
4429+
*
4430+
* FIXME: Update below documentation.
4431+
*
4432+
* RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH is returned, when
4433+
* using `group.protocol=consumer`, if the commit failed because the
4434+
* member has switched to a new member epoch.
4435+
* This error code can be retried.
4436+
* Partition level error is also set in the \p offsets.
4437+
*
4438+
* RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID is returned, when
4439+
* using `group.protocol=consumer`, if the member has been
4440+
* removed from the consumer group
4441+
* This error code is permanent, uncommitted messages will be
4442+
* reprocessed by this or a different member and committed there.
4443+
* Partition level error is also set in the \p offsets.
44294444
*/
44304445
RD_EXPORT rd_kafka_resp_err_t
44314446
rd_kafka_commit(rd_kafka_t *rk,

src/rdkafka_assignment.c

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,30 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
153153
continue;
154154
}
155155

156-
if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
157-
rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
156+
if (err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH ||
157+
rktpar->err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH) {
158+
rd_kafka_topic_partition_t *rktpar_copy;
159+
160+
rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
161+
"Adding %s [%" PRId32
162+
"] back to pending "
163+
"list because of stale member epoch",
164+
rktpar->topic, rktpar->partition);
165+
166+
rktpar_copy = rd_kafka_topic_partition_list_add_copy(
167+
rk->rk_consumer.assignment.pending, rktpar);
168+
/* Need to reset offset to STORED to query for
169+
* the committed offset again. If the offset is
170+
* kept INVALID then auto.offset.reset will be
171+
* triggered.
172+
*
173+
* Not necessary if err is UNSTABLE_OFFSET_COMMIT
174+
* because the buffer is retried there. */
175+
rktpar_copy->offset = RD_KAFKA_OFFSET_STORED;
176+
177+
} else if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
178+
rktpar->err ==
179+
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
158180
/* Ongoing transactions are blocking offset retrieval.
159181
* This is typically retried from the OffsetFetch
160182
* handler but we can come here if the assignment
@@ -210,7 +232,9 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
210232
/* Do nothing for request-level errors (err is set). */
211233
}
212234

213-
if (offsets->cnt > 0)
235+
/* In case of stale member epoch we retry to serve the
236+
* assignment only after a successful ConsumerGroupHeartbeat. */
237+
if (offsets->cnt > 0 && err != RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH)
214238
rd_kafka_assignment_serve(rk);
215239
}
216240

@@ -274,18 +298,29 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk,
274298
return;
275299
}
276300

277-
278-
279301
if (err) {
280-
rd_kafka_dbg(rk, CGRP, "OFFSET",
281-
"Offset fetch error for %d partition(s): %s",
282-
offsets->cnt, rd_kafka_err2str(err));
283-
rd_kafka_consumer_err(
284-
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL,
285-
NULL, RD_KAFKA_OFFSET_INVALID,
286-
"Failed to fetch committed offsets for "
287-
"%d partition(s) in group \"%s\": %s",
288-
offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err));
302+
switch (err) {
303+
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
304+
rk->rk_cgrp->rkcg_consumer_flags |=
305+
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
306+
/* Fallback */
307+
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
308+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
309+
rk->rk_cgrp);
310+
break;
311+
default:
312+
rd_kafka_dbg(
313+
rk, CGRP, "OFFSET",
314+
"Offset fetch error for %d partition(s): %s",
315+
offsets->cnt, rd_kafka_err2str(err));
316+
rd_kafka_consumer_err(
317+
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0,
318+
NULL, NULL, RD_KAFKA_OFFSET_INVALID,
319+
"Failed to fetch committed offsets for "
320+
"%d partition(s) in group \"%s\": %s",
321+
offsets->cnt, rk->rk_group_id->str,
322+
rd_kafka_err2str(err));
323+
}
289324
}
290325

291326
/* Apply the fetched offsets to the assignment */

src/rdkafka_cgrp.c

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2979,6 +2979,16 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
29792979
}
29802980
}
29812981

2982+
if (rkcg->rkcg_consumer_flags &
2983+
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING &&
2984+
rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
2985+
/* TODO: Check if this should be done only for the steady state?
2986+
*/
2987+
rd_kafka_assignment_serve(rk);
2988+
rkcg->rkcg_consumer_flags &=
2989+
~RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
2990+
}
2991+
29822992
if (rkcg->rkcg_next_target_assignment) {
29832993
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
29842994
rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
@@ -3092,8 +3102,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
30923102
/* Re-query for coordinator */
30933103
rkcg->rkcg_consumer_flags |=
30943104
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
3095-
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
30963105
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
3106+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
30973107
}
30983108

30993109
if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
@@ -3334,7 +3344,11 @@ static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) {
33343344
if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
33353345
return 0;
33363346

3337-
/* Check if wait-coord queue has timed out. */
3347+
/* Check if wait-coord queue has timed out.
3348+
3349+
FIXME: Remove usage of `group_session_timeout_ms` for the new
3350+
consumer group protocol implementation defined in KIP-848.
3351+
*/
33383352
if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
33393353
rkcg->rkcg_ts_terminate +
33403354
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
@@ -3505,7 +3519,6 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
35053519
static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
35063520
rd_kafka_op_t *rko,
35073521
const char *reason) {
3508-
35093522
/* wait_coord_q is disabled session.timeout.ms after
35103523
* group close() has been initated. */
35113524
if (rko->rko_u.offset_commit.ts_timeout != 0 ||
@@ -3524,6 +3537,11 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
35243537
: "none");
35253538

35263539
rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
3540+
3541+
/* FIXME: Remove `group_session_timeout_ms` for the new protocol
3542+
* defined in KIP-848 as this property is deprecated from client
3543+
* side in the new protocol.
3544+
*/
35273545
rko->rko_u.offset_commit.ts_timeout =
35283546
rd_clock() +
35293547
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
@@ -3532,6 +3550,45 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
35323550
return 1;
35333551
}
35343552

3553+
/**
3554+
* @brief Defer offset commit (rko) until coordinator is available (KIP-848).
3555+
*
3556+
* @returns 1 if the rko was deferred or 0 if the defer queue is disabled
3557+
* or rko already deferred.
3558+
*/
3559+
static int rd_kafka_cgrp_consumer_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
3560+
rd_kafka_op_t *rko,
3561+
const char *reason) {
3562+
/* wait_coord_q is disabled session.timeout.ms after
3563+
* group close() has been initated. */
3564+
if ((rko->rko_u.offset_commit.ts_timeout != 0 &&
3565+
rd_clock() >= rko->rko_u.offset_commit.ts_timeout) ||
3566+
!rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
3567+
return 0;
3568+
3569+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
3570+
"Group \"%s\": "
3571+
"unable to OffsetCommit in state %s: %s: "
3572+
"retrying later",
3573+
rkcg->rkcg_group_id->str,
3574+
rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason);
3575+
3576+
rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
3577+
3578+
if (!rko->rko_u.offset_commit.ts_timeout) {
3579+
rko->rko_u.offset_commit.ts_timeout =
3580+
rd_clock() +
3581+
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
3582+
}
3583+
3584+
/* Reset partition level error before retrying */
3585+
rd_kafka_topic_partition_list_set_err(
3586+
rko->rko_u.offset_commit.partitions, RD_KAFKA_RESP_ERR_NO_ERROR);
3587+
3588+
rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
3589+
3590+
return 1;
3591+
}
35353592

35363593
/**
35373594
* @brief Update the committed offsets for the partitions in \p offsets,
@@ -3730,18 +3787,23 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
37303787
rd_kafka_err2str(err));
37313788
}
37323789

3733-
37343790
/*
37353791
* Error handling
37363792
*/
37373793
switch (err) {
37383794
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
3739-
/* Revoke assignment and rebalance on unknown member */
3740-
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
3741-
rd_kafka_cgrp_revoke_all_rejoin_maybe(
3742-
rkcg, rd_true /*assignment is lost*/,
3743-
rd_true /*this consumer is initiating*/,
3744-
"OffsetCommit error: Unknown member");
3795+
if (rkcg->rkcg_group_protocol ==
3796+
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
3797+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
3798+
rk->rk_cgrp);
3799+
} else {
3800+
/* Revoke assignment and rebalance on unknown member */
3801+
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
3802+
rd_kafka_cgrp_revoke_all_rejoin_maybe(
3803+
rkcg, rd_true /*assignment is lost*/,
3804+
rd_true /*this consumer is initiating*/,
3805+
"OffsetCommit error: Unknown member");
3806+
}
37453807
break;
37463808

37473809
case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
@@ -3756,6 +3818,20 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
37563818
case RD_KAFKA_RESP_ERR__IN_PROGRESS:
37573819
return; /* Retrying */
37583820

3821+
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
3822+
/* FIXME: Add logs.*/
3823+
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp);
3824+
if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual"))
3825+
/* Don't retry manual commits giving this error.
3826+
* TODO: do this in a faster and cleaner way
3827+
* with a bool. */
3828+
break;
3829+
3830+
if (rd_kafka_cgrp_consumer_defer_offset_commit(
3831+
rkcg, rko_orig, rd_kafka_err2str(err)))
3832+
return;
3833+
break;
3834+
37593835
case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
37603836
case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
37613837
case RD_KAFKA_RESP_ERR__TRANSPORT:
@@ -6056,6 +6132,9 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
60566132
}
60576133
}
60586134

6135+
/**
6136+
* FIXME: Add reason and logging.
6137+
*/
60596138
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) {
60606139
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER)
60616140
return;

src/rdkafka_cgrp.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ typedef struct rd_kafka_cgrp_s {
305305
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40
306306
/** Member is fenced, rejoining */
307307
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80
308-
308+
/** Serve pending assignments after heartbeat */
309+
#define RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING 0x100
309310

310311
/** Rejoin the group following a currently in-progress
311312
* incremental unassign. */

src/rdkafka_offset.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,6 @@ rd_kafka_commit0(rd_kafka_t *rk,
380380
return RD_KAFKA_RESP_ERR_NO_ERROR;
381381
}
382382

383-
384-
385383
/**
386384
* NOTE: 'offsets' may be NULL, see official documentation.
387385
*/

src/rdkafka_partition.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2978,8 +2978,12 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
29782978

29792979
/**
29802980
* @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2981+
*
2982+
* @return Copy of passed partition that was added to the list
2983+
*
2984+
* @remark Ownership of returned partition remains of the list.
29812985
*/
2982-
void rd_kafka_topic_partition_list_add_copy(
2986+
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
29832987
rd_kafka_topic_partition_list_t *rktparlist,
29842988
const rd_kafka_topic_partition_t *rktpar) {
29852989
rd_kafka_topic_partition_t *dst;
@@ -2988,6 +2992,7 @@ void rd_kafka_topic_partition_list_add_copy(
29882992
__FUNCTION__, __LINE__, rktparlist, rktpar->topic,
29892993
rktpar->partition, NULL, rktpar->_private);
29902994
rd_kafka_topic_partition_update(dst, rktpar);
2995+
return dst;
29912996
}
29922997

29932998

src/rdkafka_partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
724724
const char *topic,
725725
int32_t partition);
726726

727-
void rd_kafka_topic_partition_list_add_copy(
727+
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
728728
rd_kafka_topic_partition_list_t *rktparlist,
729729
const rd_kafka_topic_partition_t *rktpar);
730730

src/rdkafka_request.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2301,8 +2301,20 @@ void rd_kafka_ConsumerGroupHeartbeatRequest(
23012301

23022302
rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
23032303

2304-
rd_kafka_buf_set_abs_timeout(
2305-
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
2304+
/* FIXME:
2305+
* 1) Improve this timeout to something less than
2306+
* `rkcg_heartbeat_intvl_ms` so that the next heartbeat
2307+
* is not skipped.
2308+
* 2) Remove usage of `group_session_timeout_ms` altogether
2309+
* from the new protocol defined in KIP-848.
2310+
*/
2311+
if (rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms > 0) {
2312+
rd_kafka_buf_set_abs_timeout(
2313+
rkbuf, rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms, 0);
2314+
} else {
2315+
rd_kafka_buf_set_abs_timeout(
2316+
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
2317+
}
23062318

23072319
rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
23082320
}

0 commit comments

Comments
 (0)