Skip to content

Commit 228b93d

Browse files
committed
Address comments
1 parent a1fd555 commit 228b93d

File tree

7 files changed

+55
-35
lines changed

7 files changed

+55
-35
lines changed

CONFIGURATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ partition.assignment.strategy | C | | range,roundro
110110
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
111111
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
112112
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
113+
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
114+
group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null <br>*Type: string*
113115
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
114116
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
115117
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*

src/rdkafka.c

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2187,7 +2187,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
21872187
int ret_errno = 0;
21882188
const char *conf_err;
21892189
char *group_remote_assignor_override = NULL;
2190-
rd_kafka_assignor_t *cooperative_assignor;
21912190
#ifndef _WIN32
21922191
sigset_t newset, oldset;
21932192
#endif
@@ -2379,28 +2378,51 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
23792378
goto fail;
23802379
}
23812380

2382-
/* Detect if chosen assignor is cooperative */
2383-
cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky");
2384-
rk->rk_conf.partition_assignors_cooperative =
2385-
!rk->rk_conf.partition_assignors.rl_cnt ||
2386-
(cooperative_assignor && cooperative_assignor->rkas_enabled);
2387-
23882381
if (!rk->rk_conf.group_remote_assignor) {
2389-
/* Default remote assignor to the chosen local one. */
2390-
if (rk->rk_conf.partition_assignors_cooperative) {
2391-
group_remote_assignor_override = rd_strdup("uniform");
2392-
rk->rk_conf.group_remote_assignor =
2393-
group_remote_assignor_override;
2394-
} else {
2395-
rd_kafka_assignor_t *range_assignor =
2396-
rd_kafka_assignor_find(rk, "range");
2397-
if (range_assignor && range_assignor->rkas_enabled) {
2382+
rd_kafka_assignor_t *cooperative_assignor;
2383+
2384+
/* Detect if chosen assignor is cooperative */
2385+
cooperative_assignor =
2386+
rd_kafka_assignor_find(rk, "cooperative-sticky");
2387+
rk->rk_conf.partition_assignors_cooperative =
2388+
!rk->rk_conf.partition_assignors.rl_cnt ||
2389+
(cooperative_assignor &&
2390+
cooperative_assignor->rkas_enabled);
2391+
2392+
if (rk->rk_conf.group_protocol ==
2393+
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
2394+
/* Default remote assignor to the chosen local one. */
2395+
if (rk->rk_conf.partition_assignors_cooperative) {
23982396
group_remote_assignor_override =
2399-
rd_strdup("range");
2397+
rd_strdup("uniform");
24002398
rk->rk_conf.group_remote_assignor =
24012399
group_remote_assignor_override;
2400+
} else {
2401+
rd_kafka_assignor_t *range_assignor =
2402+
rd_kafka_assignor_find(rk, "range");
2403+
if (range_assignor &&
2404+
range_assignor->rkas_enabled) {
2405+
group_remote_assignor_override =
2406+
rd_strdup("range");
2407+
rk->rk_conf.group_remote_assignor =
2408+
group_remote_assignor_override;
2409+
} else {
2410+
rd_kafka_log(
2411+
rk, LOG_WARNING, "ASSIGNOR",
2412+
"roundrobin assignor isn't "
2413+
"available"
2414+
"with group protocol CONSUMER, "
2415+
"reverting group protocol "
2416+
"to CLASSIC");
2417+
rk->rk_conf.group_protocol =
2418+
RD_KAFKA_GROUP_PROTOCOL_CLASSIC;
2419+
}
24022420
}
24032421
}
2422+
} else {
2423+
/* When users starts setting properties of the new protocol,
2424+
* they can only use incremental_assign/unassign. */
2425+
rk->rk_conf.partition_assignors_cooperative = rd_true;
24042426
}
24052427

24062428
/* Create Mock cluster */

src/rdkafka_cgrp.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6015,9 +6015,6 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
60156015

60166016
case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
60176017
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
6018-
/* If an updated/next subscription is available, schedule it. */
6019-
if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg))
6020-
break;
60216018

60226019
if (rkcg->rkcg_rebalance_rejoin) {
60236020
rkcg->rkcg_rebalance_rejoin = rd_false;

src/rdkafka_cgrp.h

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -288,24 +288,23 @@ typedef struct rd_kafka_cgrp_s {
288288
* target assignment. Cleared when an HB succeeds
289289
* after reconciliation finishes. */
290290
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK 0x1
291+
/** Member is sending an acknowledgement for a reconciled assignment */
292+
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x2
291293
/** A new subscription needs to be sent to the Coordinator. */
292-
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x2
294+
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION 0x4
293295
/** A new subscription is being sent to the Coordinator. */
294-
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x4
296+
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION 0x8
295297
/** Consumer has subscribed at least once,
296298
* if it didn't happen rebalance protocol is still
297299
* considered NONE, otherwise it depends on the
298300
* configured partition assignors. */
299-
#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x8
300-
/** Send a complete request in next heartbeat,
301-
* but don't send the acknowledgement if it's not required */
302-
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x10
301+
#define RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE 0x10
302+
/** Send a complete request in next heartbeat */
303+
#define RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST 0x20
303304
/** Member is fenced, need to rejoin */
304-
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x20
305+
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40
305306
/** Member is fenced, rejoining */
306-
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x40
307-
/** Member is sending an acknowledgement for a reconciled assignment */
308-
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x80
307+
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80
309308

310309

311310
/** Rejoin the group following a currently in-progress

src/rdkafka_conf.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,7 +1136,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
11361136
"the only supported group "
11371137
"protocol type is `consumer`.",
11381138
.sdef = "consumer"},
1139-
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I,
1139+
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.protocol", _RK_C_S2I,
11401140
_RK(group_protocol),
11411141
"Group protocol to use. Use `classic` for the original protocol and "
11421142
"`consumer` for the new "
@@ -1146,8 +1146,8 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
11461146
.vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC,
11471147
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"},
11481148
{RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}},
1149-
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
1150-
_RK_C_STR, _RK(group_remote_assignor),
1149+
{_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.remote.assignor", _RK_C_STR,
1150+
_RK(group_remote_assignor),
11511151
"Server side assignor to use. Keep it null to make server select a "
11521152
"suitable assignor for the group. "
11531153
"Available assignors: uniform or range. Default is null",

src/rdkafka_partition.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3252,7 +3252,7 @@ int rd_kafka_topic_partition_list_find_idx(
32523252
* @brief Search 'rktparlist' for \p topic_id and \p partition.
32533253
* @returns the elems[] index or -1 on miss.
32543254
*/
3255-
int rd_kafka_topic_partition_list_find_by_id_idx(
3255+
int rd_kafka_topic_partition_list_find_idx_by_id(
32563256
const rd_kafka_topic_partition_list_t *rktparlist,
32573257
rd_kafka_Uuid_t topic_id,
32583258
int32_t partition) {

src/rdkafka_partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_by_id(
773773
rd_kafka_Uuid_t topic_id,
774774
int32_t partition);
775775

776-
int rd_kafka_topic_partition_list_find_by_id_idx(
776+
int rd_kafka_topic_partition_list_find_idx_by_id(
777777
const rd_kafka_topic_partition_list_t *rktparlist,
778778
rd_kafka_Uuid_t topic_id,
779779
int32_t partition);

0 commit comments

Comments
 (0)