Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# vNext
# 2.3.0

## Enhancements

- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
Expand Down
218 changes: 208 additions & 10 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
// Copyright 2015-2016 Andreas Heider,
// 2016-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -525,7 +526,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
{
try
{
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() {
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions()
{
RequestTimeout = timeout,
MatchStates = statesList,
});
Expand All @@ -546,23 +548,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm

static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 1)
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]");
Environment.ExitCode = 1;
return;
}

var groupNames = commandArgs.ToList();
var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
var groupNames = commandArgs.Skip(3).ToList();

if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}

var timeout = TimeSpan.FromSeconds(30);
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout });
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
foreach (var group in descResult.ConsumerGroupDescriptions)
{
Console.WriteLine($" Group: {group.GroupId} {group.Error}");
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}");
Console.WriteLine($" Broker: {group.Coordinator}");
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
Expand All @@ -579,6 +610,11 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
}
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
}
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", group.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}
catch (KafkaException e)
Expand Down Expand Up @@ -757,6 +793,162 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
}
}

static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
foreach (var topic in topicDescriptions)
{
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}");
Console.WriteLine($" Partitions:");
foreach (var partition in topic.Partitions)
{
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}");
if(!partition.ISR.Any())
{
Console.WriteLine(" There is no In-Sync-Replica broker for the partition");
}
else
{
string isrs = string.Join("; ", partition.ISR);
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}");
}

if(!partition.Replicas.Any())
{
Console.WriteLine(" There is no Replica broker for the partition");
}
else
{
string replicas = string.Join("; ", partition.Replicas);
Console.WriteLine($" The Replica brokers are: {replicas}");
}

}
Console.WriteLine($" Is internal: {topic.IsInternal}");
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", topic.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}

static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}
var topicNames = commandArgs.Skip(3).ToList();

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeTopicsAsync(
TopicCollection.OfTopicNames(topicNames),
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations);
}
catch (DescribeTopicsException e)
{
// At least one TopicDescription will have an error.
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing topics: {e}");
Environment.ExitCode = 1;
}
}
}

static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});

Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}");
Console.WriteLine(" Nodes:");
foreach(var node in descResult.Nodes)
{
Console.WriteLine($" {node}");
}
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", descResult.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing cluster: {e}");
Environment.ExitCode = 1;
}
}
}

public static async Task Main(string[] args)
{
if (args.Length < 2)
Expand All @@ -768,8 +960,8 @@ public static async Task Main(string[] args)
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials"

"alter-user-scram-credentials", "describe-topics",
"describe-cluster"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -824,6 +1016,12 @@ public static async Task Main(string[] args)
case "alter-user-scram-credentials":
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
break;
case "describe-topics":
await DescribeTopicsAsync(bootstrapServers, commandArgs);
break;
case "describe-cluster":
await DescribeClusterAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
44 changes: 42 additions & 2 deletions src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Confluent Inc.
// Copyright 2022-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Confluent.Kafka.Admin
{
Expand Down Expand Up @@ -50,13 +52,51 @@ public class ConsumerGroupDescription
public ConsumerGroupState State { get; set; }

/// <summary>
/// Consumer group coordinator (broker).
/// Broker that acts as consumer group coordinator (null if not known).
/// </summary>
public Node Coordinator { get; set; }

/// <summary>
/// Members list.
/// </summary>
public List<MemberDescription> Members { get; set; }

/// <summary>
/// AclOperation list (null if not requested or not supported).
/// </summary>
public List<AclOperation> AuthorizedOperations { get; set; }

/// <summary>
/// Returns a JSON representation of this object.
/// </summary>
/// <returns>
/// A JSON representation of this object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
var members = string.Join(",",
Members.Select(member =>
member.ToString()
).ToList());
var authorizedOperations = "null";
if (AuthorizedOperations != null)
{
authorizedOperations = string.Join(",",
AuthorizedOperations.Select(authorizedOperation =>
authorizedOperation.ToString().Quote()
).ToList());
authorizedOperations = $"[{authorizedOperations}]";
}

result.Append($"{{\"GroupId\": {GroupId.Quote()}");
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");

return result.ToString();
}

}
}
Loading