Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -4426,6 +4426,21 @@ RD_EXPORT int rd_kafka_assignment_lost(rd_kafka_t *rk);
* or successfully scheduled if asynchronous, or failed.
* RD_KAFKA_RESP_ERR__FATAL is returned if the consumer has raised
* a fatal error.
*
* FIXME: Update below documentation.
*
* RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH is returned, when
* using `group.protocol=consumer`, if the commit failed because the
* member has switched to a new member epoch.
* This error code can be retried.
* Partition level error is also set in the \p offsets.
*
* RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID is returned, when
* using `group.protocol=consumer`, if the member has been
* removed from the consumer group.
* This error code is permanent, uncommitted messages will be
* reprocessed by this or a different member and committed there.
* Partition level error is also set in the \p offsets.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_commit(rd_kafka_t *rk,
Expand Down
63 changes: 49 additions & 14 deletions src/rdkafka_assignment.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,30 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
continue;
}

if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
if (err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH ||
rktpar->err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH) {
rd_kafka_topic_partition_t *rktpar_copy;

rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
"Adding %s [%" PRId32
"] back to pending "
"list because of stale member epoch",
rktpar->topic, rktpar->partition);

rktpar_copy = rd_kafka_topic_partition_list_add_copy(
rk->rk_consumer.assignment.pending, rktpar);
/* Need to reset offset to STORED to query for
* the committed offset again. If the offset is
* kept INVALID then auto.offset.reset will be
* triggered.
*
* Not necessary if err is UNSTABLE_OFFSET_COMMIT
* because the buffer is retried there. */
rktpar_copy->offset = RD_KAFKA_OFFSET_STORED;

} else if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
rktpar->err ==
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
/* Ongoing transactions are blocking offset retrieval.
* This is typically retried from the OffsetFetch
* handler but we can come here if the assignment
Expand Down Expand Up @@ -210,7 +232,9 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
/* Do nothing for request-level errors (err is set). */
}

if (offsets->cnt > 0)
/* In case of stale member epoch we retry to serve the
* assignment only after a successful ConsumerGroupHeartbeat. */
if (offsets->cnt > 0 && err != RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH)
rd_kafka_assignment_serve(rk);
}

Expand Down Expand Up @@ -274,18 +298,29 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk,
return;
}



if (err) {
rd_kafka_dbg(rk, CGRP, "OFFSET",
"Offset fetch error for %d partition(s): %s",
offsets->cnt, rd_kafka_err2str(err));
rd_kafka_consumer_err(
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL,
NULL, RD_KAFKA_OFFSET_INVALID,
"Failed to fetch committed offsets for "
"%d partition(s) in group \"%s\": %s",
offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err));
switch (err) {
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
rk->rk_cgrp->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
/* FALLTHRU */
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
break;
default:
rd_kafka_dbg(
rk, CGRP, "OFFSET",
"Offset fetch error for %d partition(s): %s",
offsets->cnt, rd_kafka_err2str(err));
rd_kafka_consumer_err(
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0,
NULL, NULL, RD_KAFKA_OFFSET_INVALID,
"Failed to fetch committed offsets for "
"%d partition(s) in group \"%s\": %s",
offsets->cnt, rk->rk_group_id->str,
rd_kafka_err2str(err));
}
}

/* Apply the fetched offsets to the assignment */
Expand Down
99 changes: 89 additions & 10 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2979,6 +2979,16 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
}
}

if (rkcg->rkcg_consumer_flags &
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING &&
rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
/* TODO: Check if this should be done only for the steady state?
*/
rd_kafka_assignment_serve(rk);
rkcg->rkcg_consumer_flags &=
~RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
}

if (rkcg->rkcg_next_target_assignment) {
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
Expand Down Expand Up @@ -3092,8 +3102,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
/* Re-query for coordinator */
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
Expand Down Expand Up @@ -3334,7 +3344,11 @@ static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) {
if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
return 0;

/* Check if wait-coord queue has timed out. */
/* Check if wait-coord queue has timed out.

FIXME: Remove usage of `group_session_timeout_ms` for the new
consumer group protocol implementation defined in KIP-848.
*/
if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
rkcg->rkcg_ts_terminate +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
Expand Down Expand Up @@ -3505,7 +3519,6 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
rd_kafka_op_t *rko,
const char *reason) {

/* wait_coord_q is disabled session.timeout.ms after
* group close() has been initiated. */
if (rko->rko_u.offset_commit.ts_timeout != 0 ||
Expand All @@ -3524,6 +3537,11 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
: "none");

rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;

/* FIXME: Remove `group_session_timeout_ms` for the new protocol
* defined in KIP-848 as this property is deprecated from client
* side in the new protocol.
*/
rko->rko_u.offset_commit.ts_timeout =
rd_clock() +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
Expand All @@ -3532,6 +3550,45 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
return 1;
}

/**
 * @brief Defer offset commit (rko) until coordinator is available (KIP-848).
 *
 * KIP-848 (`group.protocol=consumer`) variant: the op is parked on
 * \c rkcg_wait_coord_q and reprocessed once a coordinator is known.
 *
 * @returns 1 if the rko was deferred, or 0 if it could not be deferred
 *          (the defer queue is disabled, or the op's deferral deadline
 *          has already passed).
 */
static int rd_kafka_cgrp_consumer_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
                                                      rd_kafka_op_t *rko,
                                                      const char *reason) {
        /* wait_coord_q is disabled session.timeout.ms after
         * group close() has been initiated: don't defer then, and don't
         * re-defer an op whose deferral deadline has already expired. */
        if ((rko->rko_u.offset_commit.ts_timeout != 0 &&
             rd_clock() >= rko->rko_u.offset_commit.ts_timeout) ||
            !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
                return 0;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
                     "Group \"%s\": "
                     "unable to OffsetCommit in state %s: %s: "
                     "retrying later",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason);

        /* Mark the op for reprocessing when it is dequeued. */
        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;

        /* Set the deferral deadline only on the first deferral so that
         * repeated deferrals of the same op share one absolute timeout.
         * NOTE(review): still based on `group_session_timeout_ms`, which
         * is deprecated client-side in KIP-848 — candidate for removal. */
        if (!rko->rko_u.offset_commit.ts_timeout) {
                rko->rko_u.offset_commit.ts_timeout =
                    rd_clock() +
                    (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
        }

        /* Reset partition-level errors before retrying so stale errors
         * from the failed attempt don't leak into the retried commit. */
        rd_kafka_topic_partition_list_set_err(
            rko->rko_u.offset_commit.partitions, RD_KAFKA_RESP_ERR_NO_ERROR);

        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);

        return 1;
}

/**
* @brief Update the committed offsets for the partitions in \p offsets,
Expand Down Expand Up @@ -3730,18 +3787,23 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
rd_kafka_err2str(err));
}


/*
* Error handling
*/
switch (err) {
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
/* Revoke assignment and rebalance on unknown member */
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
rd_kafka_cgrp_revoke_all_rejoin_maybe(
rkcg, rd_true /*assignment is lost*/,
rd_true /*this consumer is initiating*/,
"OffsetCommit error: Unknown member");
if (rkcg->rkcg_group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
} else {
/* Revoke assignment and rebalance on unknown member */
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
rd_kafka_cgrp_revoke_all_rejoin_maybe(
rkcg, rd_true /*assignment is lost*/,
rd_true /*this consumer is initiating*/,
"OffsetCommit error: Unknown member");
}
break;

case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
Expand All @@ -3756,6 +3818,20 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
case RD_KAFKA_RESP_ERR__IN_PROGRESS:
return; /* Retrying */

case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
/* FIXME: Add logs.*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was an expedite HB here

Suggested change
/* FIXME: Add logs.*/
/* FIXME: Add logs.*/
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp);

rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp);
if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual"))
/* Don't retry manual commits giving this error.
* TODO: do this in a faster and cleaner way
* with a bool. */
break;

if (rd_kafka_cgrp_consumer_defer_offset_commit(
rkcg, rko_orig, rd_kafka_err2str(err)))
return;
break;

case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
Expand Down Expand Up @@ -6056,6 +6132,9 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
}
}

/**
* FIXME: Add reason and logging.
*/
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) {
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER)
return;
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_cgrp.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ typedef struct rd_kafka_cgrp_s {
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40
/** Member is fenced, rejoining */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80

/** Serve pending assignments after heartbeat */
#define RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING 0x100

/** Rejoin the group following a currently in-progress
* incremental unassign. */
Expand Down
2 changes: 0 additions & 2 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,6 @@ rd_kafka_commit0(rd_kafka_t *rk,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
* NOTE: 'offsets' may be NULL, see official documentation.
*/
Expand Down
7 changes: 6 additions & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2978,8 +2978,12 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(

/**
* @brief Creates a copy of \p rktpar and adds it to \p rktparlist
*
* @return Copy of passed partition that was added to the list
*
* @remark Ownership of returned partition remains of the list.
*/
void rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_list_t *rktparlist,
const rd_kafka_topic_partition_t *rktpar) {
rd_kafka_topic_partition_t *dst;
Expand All @@ -2988,6 +2992,7 @@ void rd_kafka_topic_partition_list_add_copy(
__FUNCTION__, __LINE__, rktparlist, rktpar->topic,
rktpar->partition, NULL, rktpar->_private);
rd_kafka_topic_partition_update(dst, rktpar);
return dst;
}


Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
const char *topic,
int32_t partition);

void rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_list_t *rktparlist,
const rd_kafka_topic_partition_t *rktpar);

Expand Down
16 changes: 14 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2301,8 +2301,20 @@ void rd_kafka_ConsumerGroupHeartbeatRequest(

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
/* FIXME:
* 1) Improve this timeout to something less than
* `rkcg_heartbeat_intvl_ms` so that the next heartbeat
* is not skipped.
* 2) Remove usage of `group_session_timeout_ms` altogether
* from the new protocol defined in KIP-848.
*/
if (rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms > 0) {
rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms, 0);
} else {
rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
}

rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}
Expand Down