Skip to content

Commit dc5aff2

Browse files
[KIP-848 EA] Admin API for listing consumer groups now has (#2323)
an optional filter to return only groups of given types Co-authored-by: mahajanadhitya <[email protected]>
1 parent 0981bd2 commit dc5aff2

File tree

12 files changed

+323
-23
lines changed

12 files changed

+323
-23
lines changed

CHANGELOG.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
# 2.5.3
1+
# 2.6.0
2+
3+
## Enhancements
24

3-
v2.5.3 is a maintenance release with the following fixes and enhancements:
5+
* References librdkafka.redist 2.6.0. Refer to the [librdkafka v2.6.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0) for more information.
6+
* [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#2323).
7+
8+
9+
# 2.5.3
410

511
## Enhancements
612

examples/AdminClient/Program.cs

Lines changed: 90 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,86 @@ static Tuple<IsolationLevel, List<TopicPartitionOffsetSpec>> ParseListOffsetsArg
313313
return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs);
314314
}
315315

316+
static bool ParseListConsumerGroupsArgs(string[] commandArgs,
317+
ref TimeSpan timeout,
318+
ref List<ConsumerGroupState> states,
319+
ref List<ConsumerGroupType> types,
320+
ref string error)
321+
{
322+
try
323+
{
324+
var typeArray = false;
325+
var stateArray = false;
326+
var lastArray = 0;
327+
for (var i = 0; i < commandArgs.Length; i++)
328+
{
329+
var commandArg = commandArgs[i];
330+
if (commandArg == "-states")
331+
{
332+
if (stateArray)
333+
{
334+
error = "Cannot pass the states flag (-states) more than once";
335+
return false;
336+
}
337+
stateArray = true;
338+
lastArray = 1;
339+
}
340+
else if (commandArg == "-types")
341+
{
342+
if (typeArray)
343+
{
344+
error = "Cannot pass the types flag (-types) more than once";
345+
return false;
346+
}
347+
typeArray = true;
348+
lastArray = 2;
349+
}
350+
else
351+
{
352+
if (lastArray == 1)
353+
{
354+
try
355+
{
356+
states.Add(Enum.Parse<ConsumerGroupState>(commandArg));
357+
}
358+
catch (Exception)
359+
{
360+
error = $"Invalid state: {commandArg}";
361+
return false;
362+
}
363+
}
364+
else if (lastArray == 2)
365+
{
366+
try
367+
{
368+
types.Add(Enum.Parse<ConsumerGroupType>(commandArg));
369+
}
370+
catch (Exception)
371+
{
372+
error = $"Invalid type: {commandArg}";
373+
return false;
374+
}
375+
}
376+
else if (i == 0)
377+
{
378+
timeout = TimeSpan.FromSeconds(int.Parse(commandArg));
379+
}
380+
else
381+
{
382+
error = $"Unknown argument: {commandArg}";
383+
return false;
384+
}
385+
}
386+
}
387+
return true;
388+
}
389+
catch (SystemException e)
390+
{
391+
error = e.Message;
392+
return false;
393+
}
394+
}
395+
316396
static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
317397
{
318398
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
@@ -581,24 +661,15 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[
581661
static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
582662
{
583663
var timeout = TimeSpan.FromSeconds(30);
584-
var statesList = new List<ConsumerGroupState>();
585-
try
586-
{
587-
if (commandArgs.Length > 0)
588-
{
589-
timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0]));
590-
}
591-
if (commandArgs.Length > 1)
592-
{
593-
for (int i = 1; i < commandArgs.Length; i++)
594-
{
595-
statesList.Add(Enum.Parse<ConsumerGroupState>(commandArgs[i]));
596-
}
597-
}
598-
}
599-
catch (SystemException)
664+
string error = "";
665+
var states = new List<ConsumerGroupState>();
666+
var types = new List<ConsumerGroupType>();
667+
668+
if (!ParseListConsumerGroupsArgs(commandArgs,
669+
ref timeout, ref states, ref types, ref error))
600670
{
601-
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [<timeout_seconds> <match_state_1> <match_state_2> ... <match_state_N>]");
671+
Console.WriteLine(error);
672+
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [timeout] [-states <match_state_1> .. <match_state_N>] [-types <match_type_1> .. <match_type_M>]");
602673
Environment.ExitCode = 1;
603674
return;
604675
}
@@ -610,7 +681,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
610681
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions()
611682
{
612683
RequestTimeout = timeout,
613-
MatchStates = statesList,
684+
MatchStates = states,
685+
MatchTypes = types,
614686
});
615687
Console.WriteLine(result);
616688
}

src/Confluent.Kafka/Admin/ConsumerGroupListing.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public class ConsumerGroupListing
3232
/// </summary>
3333
public ConsumerGroupState State { get; set; }
3434

35+
/// <summary>
36+
/// The type of the consumer group.
37+
/// </summary>
38+
public ConsumerGroupType Type { get; set; }
39+
3540
/// <summary>
3641
/// Whether the consumer group is simple or not.
3742
/// </summary>
@@ -42,7 +47,7 @@ public class ConsumerGroupListing
4247
/// </summary>
4348
public override string ToString()
4449
{
45-
return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}";
50+
return $"{GroupId}, State = {State}, Type = {Type}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}";
4651
}
4752
}
4853
}

src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,13 @@ public class ListConsumerGroupsOptions
4141
/// Default: null
4242
/// </summary>
4343
public IEnumerable<ConsumerGroupState> MatchStates { get; set; } = null;
44+
45+
/// <summary>
46+
/// An enumerable with the types to query, null to query for all
47+
/// the types.
48+
///
49+
/// Default: null
50+
/// </summary>
51+
public IEnumerable<ConsumerGroupType> MatchTypes { get; set; } = null;
4452
}
4553
}

src/Confluent.Kafka/AdminClient.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ private ListConsumerGroupsReport extractListConsumerGroupsResults(IntPtr resultP
282282
IsSimpleConsumerGroup =
283283
(int)Librdkafka.ConsumerGroupListing_is_simple_consumer_group(cglPtr) == 1,
284284
State = Librdkafka.ConsumerGroupListing_state(cglPtr),
285+
Type = Librdkafka.ConsumerGroupListing_type(cglPtr),
285286
};
286287
}).ToList();
287288
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka
19+
{
20+
/// <summary>
21+
/// Enumerates the different consumer group types.
22+
/// </summary>
23+
public enum ConsumerGroupType : int
24+
{
25+
/// <summary>
26+
/// Unknown type
27+
/// </summary>
28+
Unknown = 0,
29+
30+
/// <summary>
31+
/// Consumer type (KIP-848)
32+
/// </summary>
33+
Consumer = 1,
34+
35+
/// <summary>
36+
/// Classic type
37+
/// </summary>
38+
Classic = 2,
39+
};
40+
}

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,8 @@ static bool SetDelegates(Type nativeMethodsClass)
299299
_AdminOptions_set_opaque = (Action<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_opaque").CreateDelegate(typeof(Action<IntPtr, IntPtr>));
300300
_AdminOptions_set_require_stable_offsets = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_require_stable_offsets").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
301301
_AdminOptions_set_include_authorized_operations = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_include_authorized_operations").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
302-
_AdminOptions_set_match_consumer_group_states = (Func<IntPtr, ConsumerGroupState[], UIntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(Func<IntPtr, ConsumerGroupState[], UIntPtr, IntPtr>));
302+
_AdminOptions_set_match_consumer_group_states = (AdminOptions_set_match_consumer_group_states_delegate)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(AdminOptions_set_match_consumer_group_states_delegate));
303+
_AdminOptions_set_match_consumer_group_types = (AdminOptions_set_match_consumer_group_types_delegate)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_types").CreateDelegate(typeof(AdminOptions_set_match_consumer_group_types_delegate));
303304
_AdminOptions_set_isolation_level = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_isolation_level").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
304305

305306
_NewTopic_new = (Func<string, IntPtr, IntPtr, StringBuilder, UIntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_NewTopic_new").CreateDelegate(typeof(Func<string, IntPtr, IntPtr, StringBuilder, UIntPtr, IntPtr>));
@@ -410,6 +411,7 @@ static bool SetDelegates(Type nativeMethodsClass)
410411
_ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof (_ConsumerGroupListing_group_id_delegate));
411412
_ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupListing_is_simple_consumer_group_delegate));
412413
_ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof (_ConsumerGroupListing_state_delegate));
414+
_ConsumerGroupListing_type = (_ConsumerGroupListing_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_type").CreateDelegate(typeof (_ConsumerGroupListing_type_delegate));
413415
_ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof (_ListConsumerGroups_result_valid_delegate));
414416
_ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof (_ListConsumerGroups_result_errors_delegate));
415417

@@ -1336,10 +1338,16 @@ internal static IntPtr AdminOptions_set_include_authorized_operations(
13361338
IntPtr options,
13371339
IntPtr true_or_false) => _AdminOptions_set_include_authorized_operations(options, true_or_false);
13381340

1339-
private static Func<IntPtr, ConsumerGroupState[], UIntPtr, IntPtr> _AdminOptions_set_match_consumer_group_states;
1341+
private delegate IntPtr AdminOptions_set_match_consumer_group_states_delegate(IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt);
1342+
private static AdminOptions_set_match_consumer_group_states_delegate _AdminOptions_set_match_consumer_group_states;
13401343
internal static IntPtr AdminOptions_set_match_consumer_group_states(IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt)
13411344
=> _AdminOptions_set_match_consumer_group_states(options, states, statesCnt);
13421345

1346+
private delegate IntPtr AdminOptions_set_match_consumer_group_types_delegate(IntPtr options, ConsumerGroupType[] types, UIntPtr typesCnt);
1347+
private static AdminOptions_set_match_consumer_group_types_delegate _AdminOptions_set_match_consumer_group_types;
1348+
internal static IntPtr AdminOptions_set_match_consumer_group_types(IntPtr options, ConsumerGroupType[] types, UIntPtr typesCnt)
1349+
=> _AdminOptions_set_match_consumer_group_types(options, types, typesCnt);
1350+
13431351
private static Func<IntPtr, IntPtr, IntPtr> _AdminOptions_set_isolation_level;
13441352
internal static IntPtr AdminOptions_set_isolation_level(IntPtr options, IntPtr IsolationLevel)
13451353
=> _AdminOptions_set_isolation_level(options, IsolationLevel);
@@ -1883,6 +1891,11 @@ internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grp
18831891
internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist)
18841892
=> _ConsumerGroupListing_state(grplist);
18851893

1894+
private delegate ConsumerGroupType _ConsumerGroupListing_type_delegate(IntPtr grplist);
1895+
private static _ConsumerGroupListing_type_delegate _ConsumerGroupListing_type;
1896+
internal static ConsumerGroupType ConsumerGroupListing_type(IntPtr grplist)
1897+
=> _ConsumerGroupListing_type(grplist);
1898+
18861899
private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp);
18871900
private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid;
18881901
internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp)

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
564564
ConsumerGroupState[] states,
565565
UIntPtr statesCnt);
566566

567+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
568+
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
569+
IntPtr options,
570+
ConsumerGroupType[] types,
571+
UIntPtr typesCnt);
572+
567573
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
568574
internal static extern IntPtr rd_kafka_NewTopic_new(
569575
[MarshalAs(UnmanagedType.LPStr)] string topic,
@@ -1032,6 +1038,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
10321038
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10331039
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);
10341040

1041+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1042+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);
1043+
10351044
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10361045
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);
10371046

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
568568
ConsumerGroupState[] states,
569569
UIntPtr statesCnt);
570570

571+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
572+
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
573+
IntPtr options,
574+
ConsumerGroupType[] types,
575+
UIntPtr typesCnt);
576+
571577
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
572578
internal static extern IntPtr rd_kafka_NewTopic_new(
573579
[MarshalAs(UnmanagedType.LPStr)] string topic,
@@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
10361042
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10371043
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);
10381044

1045+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1046+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);
1047+
10391048
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10401049
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);
10411050

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
568568
ConsumerGroupState[] states,
569569
UIntPtr statesCnt);
570570

571+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
572+
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
573+
IntPtr options,
574+
ConsumerGroupType[] types,
575+
UIntPtr typesCnt);
576+
571577
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
572578
internal static extern IntPtr rd_kafka_NewTopic_new(
573579
[MarshalAs(UnmanagedType.LPStr)] string topic,
@@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
10361042
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10371043
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);
10381044

1045+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1046+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);
1047+
10391048
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10401049
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);
10411050

0 commit comments

Comments
 (0)