Skip to content

Commit 16dd34a

Browse files
authored
KIP 848: Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config (#2366)
1 parent 6b37a82 commit 16dd34a

File tree

3 files changed

+35
-4
lines changed

3 files changed

+35
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Enhancements
44

55
* References librdkafka.redist 2.10.0. Refer to the [librdkafka v2.10.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) for more information.
6+
* [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#2366)
67

78

89
# 2.9.0

src/Confluent.Kafka/Admin/ConfigSource.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public enum ConfigSource : int
5050
/// <summary>
5151
/// Default
5252
/// </summary>
53-
DefaultConfig = 5
53+
DefaultConfig = 5,
54+
55+
/// <summary>
56+
/// GROUP
57+
/// </summary>
58+
GroupConfig = 8
5459
};
5560
}

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_IncrementalAlterConfigs.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Collections.Generic;
2020
using Confluent.Kafka.Admin;
2121
using Xunit;
22+
using Confluent.Kafka.TestsCommon;
2223

2324

2425
namespace Confluent.Kafka.IntegrationTests
@@ -90,7 +91,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
9091
Assert.Equal("10001", describeConfigsResult[0].Entries["flush.ms"].Value);
9192
Assert.Equal("delete,compact", describeConfigsResult[0].Entries["cleanup.policy"].Value);
9293

93-
// 4. test ValidateOnly = true does not update config entry.
94+
// 5. test ValidateOnly = true does not update config entry.
9495
toUpdate = new Dictionary<ConfigResource, List<ConfigEntry>>
9596
{
9697
{ configResource, new List<ConfigEntry> { new ConfigEntry { Name = "flush.ms", Value = "20002" , IncrementalOperation = AlterConfigOpType.Set } } }
@@ -100,7 +101,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
100101
describeConfigsResult = adminClient.DescribeConfigsAsync(new List<ConfigResource> { configResource }).Result;
101102
Assert.Equal("10001", describeConfigsResult[0].Entries["flush.ms"].Value);
102103

103-
// 5. test updating broker resource.
104+
// 6. test updating broker resource.
104105
toUpdate = new Dictionary<ConfigResource, List<ConfigEntry>>
105106
{
106107
{
@@ -110,7 +111,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
110111
};
111112
adminClient.IncrementalAlterConfigsAsync(toUpdate).Wait();
112113

113-
// 6. test updating more than one resource.
114+
// 7. test updating more than one resource.
114115
var configResource2 = new ConfigResource { Name = topicName2, Type = ResourceType.Topic };
115116
toUpdate = new Dictionary<ConfigResource, List<ConfigEntry>>
116117
{
@@ -123,6 +124,30 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
123124
Assert.Equal(2, describeConfigsResult.Count);
124125
Assert.Equal("222", describeConfigsResult[0].Entries["flush.ms"].Value);
125126
Assert.Equal("333", describeConfigsResult[1].Entries["flush.ms"].Value);
127+
128+
// TODO: enable this test for the classic run too, when
129+
// Confluent Platform test cluster is upgraded to 8.0.0.
130+
if(!TestConsumerGroupProtocol.IsClassic()) {
131+
// 8. test updating ResourceType.Group
132+
string groupName = Guid.NewGuid().ToString();
133+
LogToFile($"Testing IncrementalAlterConfigs for consumer group {groupName}");
134+
var groupConfigResource = new ConfigResource { Name = groupName, Type = ResourceType.Group };
135+
var groupToUpdate = new Dictionary<ConfigResource, List<ConfigEntry>>
136+
{
137+
{
138+
groupConfigResource,
139+
new List<ConfigEntry> {
140+
new ConfigEntry { Name = "consumer.session.timeout.ms", Value = "50000", IncrementalOperation = AlterConfigOpType.Set }
141+
}
142+
}
143+
};
144+
adminClient.IncrementalAlterConfigsAsync(groupToUpdate).Wait();
145+
Thread.Sleep(TimeSpan.FromMilliseconds(200));
146+
var describeGroupConfigsResult = adminClient.DescribeConfigsAsync(new List<ConfigResource> { groupConfigResource }).Result;
147+
Assert.Single(describeGroupConfigsResult);
148+
Assert.Equal("50000", describeGroupConfigsResult[0].Entries["consumer.session.timeout.ms"].Value);
149+
LogToFile($"Successfully updated consumer.group {groupName} config");
150+
}
126151
}
127152

128153
Assert.Equal(0, Library.HandleCount);

0 commit comments

Comments
 (0)