5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -3,11 +3,12 @@
## Enhancements

- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): Added support for ListOffsets Admin API (#2086).
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Added support for the Admin API methods `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
78 changes: 59 additions & 19 deletions examples/AdminClient/Program.cs
@@ -242,59 +242,85 @@ static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
return alterations;
}

static List<TopicPartitionOffsetSpec> ParseTopicPartitionOffsetSpecs(string[] args)
static Tuple<IsolationLevel, List<TopicPartitionOffsetSpec>> ParseListOffsetsArgs(string[] args)
{
if (args.Length == 0)
{
Console.WriteLine("usage: .. <bootstrapServers> list-offsets " +
"<topic> <partition> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
Console.WriteLine("usage: .. <bootstrapServers> list-offsets <isolation_level> " +
"<topic1> <partition1> <EARLIEST/LATEST/MAX_TIMESTAMP/TIMESTAMP t1> ..");
Environment.ExitCode = 1;
return null;
}

var isolationLevel = Enum.Parse<IsolationLevel>(args[0]);
var topicPartitionOffsetSpecs = new List<TopicPartitionOffsetSpec>();
for (int i = 0; i < args.Length;) {
for (int i = 1; i < args.Length;)
{
if (args.Length < i+3)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

string topic = args[i];
var partition = Int32.Parse(args[i + 1]);
var offsetSpec = args[i + 2];
if (offsetSpec == "TIMESTAMP")
{
if (args.Length < i+4)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

var timestamp = Int64.Parse(args[i + 3]);
i = i + 1;
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.ForTimestamp(timestamp)
});
}
else if (offsetSpec == "MAXTIMESTAMP")
else if (offsetSpec == "MAX_TIMESTAMP")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.MaxTimestamp()
});
}
else if (offsetSpec == "EARLIEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Earliest()
});
}
else if (offsetSpec == "LATEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec {
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Latest()
});
}
else
{
throw new ArgumentException(
"offsetSpec can be EARLIEST, LATEST, MAXTIMESTAMP or TIMESTAMP T1.");
"offsetSpec can be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP T1.");
}
i = i + 3;
}
return topicPartitionOffsetSpecs;
return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs);
}
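// A hypothetical invocation accepted by ParseListOffsetsArgs above (broker
// address, topic name, partitions and timestamp are placeholders):
//
//   .. localhost:9092 list-offsets ReadCommitted my-topic 0 EARLIEST my-topic 1 TIMESTAMP 1672531200000
//
// which yields IsolationLevel.ReadCommitted plus two TopicPartitionOffsetSpec
// entries: (my-topic, 0, OffsetSpec.Earliest()) and
// (my-topic, 1, OffsetSpec.ForTimestamp(1672531200000)).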

static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
{
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
{
Console.WriteLine(" ListOffsetsResultInfo:");
Console.WriteLine($" TopicPartitionOffsetError: {listOffsetsResultInfo.TopicPartitionOffsetError}");
Console.WriteLine($" Timestamp: {listOffsetsResultInfo.Timestamp}");
}
}

static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
@@ -850,21 +876,35 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,

static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) {

var topicPartitionOffsetSpecs = ParseTopicPartitionOffsetSpecs(commandArgs);
var listOffsetsArgs = ParseListOffsetsArgs(commandArgs);
if (listOffsetsArgs == null) { return; }

var isolationLevel = listOffsetsArgs.Item1;
var topicPartitionOffsets = listOffsetsArgs.Item2;

var timeout = TimeSpan.FromSeconds(30);
ListOffsetsOptions options = new ListOffsetsOptions(){RequestTimeout = timeout, IsolationLevel = Confluent.Kafka.Admin.IsolationLevel.ReadUncommitted};
ListOffsetsOptions options = new ListOffsetsOptions(){ RequestTimeout = timeout, IsolationLevel = isolationLevel };

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
var ListOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsetSpecs, options);
foreach(var ListOffsetsResultInfo in ListOffsetsResult.ListOffsetsResultInfos)
try
{
TopicPartitionOffsetError topicPartition = ListOffsetsResultInfo.TopicPartitionOffsetError;
long Timestamp = ListOffsetsResultInfo.Timestamp;
Console.WriteLine($"{topicPartition.Topic} ${topicPartition.Partition} ${topicPartition.Error.Code} ${topicPartition.Offset} ${Timestamp}");
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsets, options);
Console.WriteLine("ListOffsetsResult:");
PrintListOffsetsResultInfos(listOffsetsResult.ListOffsetsResultInfos);
}
catch (ListOffsetsException e)
{
Console.WriteLine("ListOffsetsReport:");
Console.WriteLine($" Error: {e.Error}");
PrintListOffsetsResultInfos(e.Result.ListOffsetsResultInfos);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred listing offsets: {e}");
Environment.ExitCode = 1;
}
}

}
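// On success, ListOffsetsAsync above prints output of roughly this shape
// (values are placeholders; each TopicPartitionOffsetError line is whatever
// that type's own ToString() renders for the topic, partition, offset and error):
//
//   ListOffsetsResult:
//    ListOffsetsResultInfo:
//      TopicPartitionOffsetError: <my-topic [0], resolved offset, error if any>
//      Timestamp: 1672531200000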
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
36 changes: 0 additions & 36 deletions src/Confluent.Kafka/Admin/IsolationLevel.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/Confluent.Kafka/Admin/ListOffsetsException.cs
@@ -31,7 +31,7 @@ public class ListOffsetsException : KafkaException
/// (whether or not they were in error). At least one of these
/// topic partitions in the result will be in error.
/// </param>
public ListOffsetsException(ListOffsetsResult result)
public ListOffsetsException(ListOffsetsReport result)
: base(new Error(ErrorCode.Local_Partial,
"An error occurred in list offsets, check individual topic partition in result."))
{
@@ -43,6 +43,6 @@ public ListOffsetsException(ListOffsetsResult result)
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </summary>
public ListOffsetsResult Result { get; }
public ListOffsetsReport Result { get; }
}
}
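A minimal sketch of how a caller might handle this exception type, mirroring the
AdminClient example above; adminClient, specs and options are assumed to be set
up as in that example:

// using Confluent.Kafka;
// using Confluent.Kafka.Admin;
try
{
    var result = await adminClient.ListOffsetsAsync(specs, options);
    // result.ListOffsetsResultInfos holds one entry per queried partition.
}
catch (ListOffsetsException e)
{
    // e.Result is a ListOffsetsReport: it contains all queried partitions,
    // at least one of which carries an error.
    foreach (var info in e.Result.ListOffsetsResultInfos)
    {
        var tpoe = info.TopicPartitionOffsetError;
        if (tpoe.Error.IsError)
        {
            Console.WriteLine($"{tpoe.Topic} [{tpoe.Partition}] failed: {tpoe.Error}");
        }
    }
}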
27 changes: 16 additions & 11 deletions src/Confluent.Kafka/Admin/ListOffsetsReport.cs
@@ -14,36 +14,41 @@
//
// Refer to LICENSE for more information.
using System.Collections.Generic;
using System.Text;
using System.Linq;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// Represents an error that occurred during a ListOffsets request.
Contributor review comment on the summary above: "The result of ListOffsets request (including error status)."
/// </summary>
public class ListOffsetsReport
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets.
/// Result information for all the partitions queried
/// with ListOffsets. At least one of these
/// results will be in error.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }

/// <summary>
/// List of non-client level errors encountered while listing offsets.
/// Operation error status, null if successful.
/// </summary>
public Error Error { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "ListOffsetsReport :\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append($"], \"Error\": \"{Error.Code}\"}}");
return result.ToString();
}
}
}
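For illustration, the JSON produced by this ToString() has roughly the following
shape (values are placeholders; the TopicPartitionOffsetError field is whatever
that type's own ToString() returns, quoted):

{"ListOffsetsResultInfos": [ {"TopicPartitionOffsetError": "<topic partition offset error>", "Timestamp": 1672531200000}], "Error": "<ErrorCode>"}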
25 changes: 15 additions & 10 deletions src/Confluent.Kafka/Admin/ListOffsetsResult.cs
@@ -14,31 +14,36 @@
//
// Refer to LICENSE for more information.
using System.Collections.Generic;
using System.Text;
using System.Linq;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a list offsets operation.
/// Represents the result of a ListOffsets request.
/// </summary>
public class ListOffsetsResult
{
/// <summary>
/// ListOffsetsResultInfo Elements for all the TopicPartitions queried
/// for ListOffsets
/// Result information for all the partitions queried
/// with ListOffsets.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }


/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "ListOffsetsResult:\n";
foreach (var listoffsetsresultinfo in ListOffsetsResultInfos)
{
res += listoffsetsresultinfo.ToString();
}
return res;
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append("]}");
return result.ToString();
}
}
}
src/Confluent.Kafka/Admin/ListOffsetsResultInfo.cs
@@ -13,7 +13,9 @@
// limitations under the License.
//
// Refer to LICENSE for more information.
using System.Collections.Generic;

using System.Text;


namespace Confluent.Kafka.Admin
{
@@ -28,20 +30,23 @@ public class ListOffsetsResultInfo
public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }

/// <summary>
/// Timestamp Corresponding to the Offset, -1 if not set by broker.
/// Timestamp corresponding to the offset, -1 if not set by the broker.
/// </summary>
public long Timestamp { get; set; }

/// <summary>
/// Returns a human readable representation of this object.
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
string res = "TopicPartitionOffsetError:\n";
res += TopicPartitionOffsetError.ToString();
res += "\n";
res += $"Timestamp : {Timestamp}\n";
return res;
var result = new StringBuilder();
result.Append($"{{\"TopicPartitionOffsetError\": {TopicPartitionOffsetError.ToString().Quote()}");
result.Append($", \"Timestamp\": {Timestamp}");
result.Append("}");
return result.ToString();
}
}
}