Skip to content

Commit b06e83d

Browse files
Support nullabletype in custom stream config
1 parent f860ddd commit b06e83d

File tree

8 files changed

+409
-287
lines changed

8 files changed

+409
-287
lines changed

documentation/client_implementation.md

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ configure a stream. Supported options are:
162162
| SegmentMaxBytes | int64 |The maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. | |
163163
| SegmentMaxAge | time.Duration |The maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. | |
164164
| CompactMaxGoroutines | int32| The maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). | |
165-
| EnableCompact | | Enable message compaction by key on the server for this stream | |
166-
| DisableCompact | | Disable message compaction by key on the server for this stream | |
165+
| SetCompactEnabled | bool | Enable message compaction by key on the server for this stream | |
166+
167167

168168
`CreateStream` returns/throws an error if the operation fails, specifically
169169
`ErrStreamExists` if a stream with the given name already exists.
@@ -894,7 +894,6 @@ thrown. Otherwise, any other error/exception is thrown if the operation failed.
894894

895895
Also, client can set custom configurations for the stream to be created. The exhaustive list of supported stream configuration are:
896896

897-
898897
```
899898
RetentionMaxBytes,
900899
RetentionMaxMessages,
@@ -907,7 +906,24 @@ CompactMaxGoroutines,
907906
```
908907

909908
Refer to [Sream Configuration](configuration.md#streams-configuration-settings) for more details
910-
Note that these opts are optional, if not given, the default configurations of the broker will be used instead.
909+
Note that these opts are optional, if not given, the default configurations of the broker will be used instead.
910+
911+
In order to differentiate between custom configuration and default server's configuration, we use 3 custom `NullableType` in setting options for the `CreateStreamRequest`. These custom types are
912+
913+
```proto
914+
message NullableInt64 {
915+
int64 value = 1;
916+
}
917+
918+
message NullableInt32 {
919+
int32 value = 1;
920+
}
921+
922+
message NullableBool {
923+
bool value = 1;
924+
}
925+
926+
```
911927

912928
Note: if `CompactMaxGoroutines` is configured, you have to make sure manually that `CompacEnabled` is also set. The reason is that if this is not enabled explicitly, the servier will use default configuration and that may be to disable compaction on the service side, which renders `CompactMaxGoroutines` to be unused.
913929

@@ -930,21 +946,14 @@ func (c *client) CreateStream(ctx context.Context, subject, name string, options
930946
ReplicationFactor: opts.ReplicationFactor,
931947
Group: opts.Group,
932948
Partitions: opts.Partitions,
933-
RetentionMaxAge: opts.RetentionMaxAge.Milliseconds(),
934-
CleanerInterval: opts.CleanerInterval.Milliseconds(),
949+
RetentionMaxAge: opts.RetentionMaxAge,
950+
RetentionMaxBytes: opts.RetentionMaxBytes,
951+
RetentionMaxMessages: opts.RetentionMaxMessages,
952+
CleanerInterval: opts.CleanerInterval,
935953
SegmentMaxBytes: opts.SegmentMaxBytes,
936-
SegmentMaxAge: opts.SegmentMaxAge.Milliseconds(),
954+
SegmentMaxAge: opts.SegmentMaxAge,
937955
CompactMaxGoroutines: opts.CompactMaxGoroutines,
938-
}
939-
940-
if opts.RetentionMaxBytes != nil {
941-
req.RetentionMaxBytes = opts.RetentionMaxBytes
942-
}
943-
if opts.RetentionMaxMessages != nil {
944-
req.RetentionMaxMessages = opts.RetentionMaxMessages
945-
}
946-
if opts.CompactEnabled != nil {
947-
req.CompactEnabled = opts.CompactEnabled
956+
CompactEnabled: opts.CompactEnabled,
948957
}
949958

950959
err := c.doResilientRPC(func(client proto.APIClient) error {

server/api.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,38 @@ func (a *apiServer) CreateStream(ctx context.Context, req *client.CreateStreamRe
5656
}
5757
}
5858
// set custom stream config
59-
streamConfig := &proto.CustomStreamConfig{
60-
RetentionMaxAge: req.RetentionMaxAge,
61-
CleanerInterval: req.CleanerInterval,
62-
SegmentMaxBytes: req.SegmentMaxBytes,
63-
SegmentMaxAge: req.SegmentMaxAge,
64-
CompactMaxGoroutines: req.CompactMaxGoroutines,
59+
streamConfig := &proto.CustomStreamConfig{}
60+
61+
if req.RetentionMaxAge != nil {
62+
streamConfig.RetentionMaxAge = &proto.NullableInt64{Value: req.RetentionMaxAge.GetValue()}
63+
}
64+
65+
if req.CleanerInterval != nil {
66+
streamConfig.CleanerInterval = &proto.NullableInt64{Value: req.CleanerInterval.GetValue()}
67+
}
68+
69+
if req.SegmentMaxBytes != nil {
70+
streamConfig.SegmentMaxBytes = &proto.NullableInt64{Value: req.SegmentMaxBytes.GetValue()}
71+
}
72+
73+
if req.SegmentMaxAge != nil {
74+
streamConfig.SegmentMaxAge = &proto.NullableInt64{Value: req.SegmentMaxAge.GetValue()}
75+
}
76+
77+
if req.CompactMaxGoroutines != nil {
78+
streamConfig.CompactMaxGoroutines = &proto.NullableInt32{Value: req.CompactMaxGoroutines.GetValue()}
6579
}
6680

6781
if req.RetentionMaxBytes != nil {
68-
streamConfig.RetentionMaxBytes = &proto.RetentionMaxBytes{Value: req.RetentionMaxBytes.GetValue()}
82+
streamConfig.RetentionMaxBytes = &proto.NullableInt64{Value: req.RetentionMaxBytes.GetValue()}
6983
}
7084

7185
if req.RetentionMaxMessages != nil {
72-
streamConfig.RetentionMaxMessages = &proto.RetentionMaxMessages{Value: req.RetentionMaxMessages.GetValue()}
86+
streamConfig.RetentionMaxMessages = &proto.NullableInt64{Value: req.RetentionMaxMessages.GetValue()}
7387
}
7488

7589
if req.CompactEnabled != nil {
76-
streamConfig.CompactEnabled = &proto.CompactEnabled{Value: req.CompactEnabled.GetValue()}
90+
streamConfig.CompactEnabled = &proto.NullableBool{Value: req.CompactEnabled.GetValue()}
7791
}
7892

7993
stream := &proto.Stream{

server/config.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,16 @@ func (l *StreamsConfig) ParseCustomStreamConfig(c *proto.CustomStreamConfig) {
188188
}
189189
// By default, duration configuration a considered as millisecon
190190
retentionMaxAge := c.GetRetentionMaxAge()
191-
if retentionMaxAge != 0 {
192-
l.RetentionMaxAge = time.Duration(retentionMaxAge) * time.Millisecond
191+
if retentionMaxAge != nil {
192+
l.RetentionMaxAge = time.Duration(retentionMaxAge.GetValue()) * time.Millisecond
193193
}
194194
cleanerInterval := c.GetCleanerInterval()
195-
if cleanerInterval != 0 {
196-
l.CleanerInterval = time.Duration(cleanerInterval) * time.Millisecond
195+
if cleanerInterval != nil {
196+
l.CleanerInterval = time.Duration(cleanerInterval.GetValue()) * time.Millisecond
197197
}
198198
segmentMaxAge := c.GetSegmentMaxAge()
199-
if segmentMaxAge != 0 {
200-
l.SegmentMaxAge = time.Duration(segmentMaxAge) * time.Millisecond
199+
if segmentMaxAge != nil {
200+
l.SegmentMaxAge = time.Duration(segmentMaxAge.GetValue()) * time.Millisecond
201201
}
202202
if c.GetRetentionMaxBytes() != nil {
203203
l.RetentionMaxBytes = c.GetRetentionMaxBytes().GetValue()
@@ -206,16 +206,16 @@ func (l *StreamsConfig) ParseCustomStreamConfig(c *proto.CustomStreamConfig) {
206206
l.RetentionMaxMessages = c.GetRetentionMaxMessages().GetValue()
207207
}
208208

209-
if c.GetSegmentMaxBytes() != 0 {
210-
l.SegmentMaxBytes = c.GetSegmentMaxBytes()
209+
if c.GetSegmentMaxBytes() != nil {
210+
l.SegmentMaxBytes = c.GetSegmentMaxBytes().GetValue()
211211
}
212212

213213
if c.GetCompactEnabled() != nil {
214214
l.Compact = c.GetCompactEnabled().GetValue()
215215
}
216216

217-
if c.GetCompactMaxGoroutines() != 0 {
218-
l.CompactMaxGoroutines = int(c.GetCompactMaxGoroutines())
217+
if c.GetCompactMaxGoroutines() != nil {
218+
l.CompactMaxGoroutines = int(c.GetCompactMaxGoroutines().GetValue())
219219
}
220220

221221
}

server/config_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ func TestParseCustomStreamConfig(t *testing.T) {
131131
// Given custom stream config
132132
// duration configuration is in millisecond
133133
customStreamConfig := &proto.CustomStreamConfig{
134-
SegmentMaxBytes: 1024,
135-
SegmentMaxAge: 1000000,
136-
RetentionMaxBytes: &proto.RetentionMaxBytes{Value: 2048},
137-
RetentionMaxMessages: &proto.RetentionMaxMessages{Value: 1000},
138-
RetentionMaxAge: 1000000,
139-
CleanerInterval: 1000000,
140-
CompactMaxGoroutines: 10,
134+
SegmentMaxBytes: &proto.NullableInt64{Value: 1024},
135+
SegmentMaxAge: &proto.NullableInt64{Value: 1000000},
136+
RetentionMaxBytes: &proto.NullableInt64{Value: 2048},
137+
RetentionMaxMessages: &proto.NullableInt64{Value: 1000},
138+
RetentionMaxAge: &proto.NullableInt64{Value: 1000000},
139+
CleanerInterval: &proto.NullableInt64{Value: 1000000},
140+
CompactMaxGoroutines: &proto.NullableInt32{Value: 10},
141141
}
142142
streamConfig := StreamsConfig{}
143143

@@ -165,11 +165,11 @@ func TestDefaultCustomStreamConfig(t *testing.T) {
165165

166166
// Given custom configs
167167
customStreamConfig := &proto.CustomStreamConfig{
168-
RetentionMaxBytes: &proto.RetentionMaxBytes{Value: 1024},
169-
RetentionMaxMessages: &proto.RetentionMaxMessages{Value: 1000},
170-
RetentionMaxAge: 1000000,
171-
CleanerInterval: 1000000,
172-
CompactMaxGoroutines: 10,
168+
RetentionMaxBytes: &proto.NullableInt64{Value: 1024},
169+
RetentionMaxMessages: &proto.NullableInt64{Value: 1000},
170+
RetentionMaxAge: &proto.NullableInt64{Value: 1000000},
171+
CleanerInterval: &proto.NullableInt64{Value: 1000000},
172+
CompactMaxGoroutines: &proto.NullableInt32{Value: 10},
173173
}
174174

175175
streamConfig.ParseCustomStreamConfig(customStreamConfig)
@@ -195,7 +195,7 @@ func TestCompactEnabledInCustomStreamConfig(t *testing.T) {
195195

196196
// Given custom configs with option to disable compact
197197
customStreamConfig := &proto.CustomStreamConfig{
198-
CompactEnabled: &proto.CompactEnabled{Value: false},
198+
CompactEnabled: &proto.NullableBool{Value: false},
199199
}
200200

201201
streamConfig.ParseCustomStreamConfig(customStreamConfig)
@@ -207,7 +207,7 @@ func TestCompactEnabledInCustomStreamConfig(t *testing.T) {
207207
streamConfig2 := StreamsConfig{}
208208
// Given custom configs with option to enable compact
209209
customStreamConfig2 := &proto.CustomStreamConfig{
210-
CompactEnabled: &proto.CompactEnabled{Value: true},
210+
CompactEnabled: &proto.NullableBool{Value: true},
211211
}
212212

213213
streamConfig2.ParseCustomStreamConfig(customStreamConfig2)

server/partition_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ func TestPartitionWithCustomConfigNoError(t *testing.T) {
578578
defer cleanupStorage(t)
579579
server := createServer(false)
580580
customStreamConfig := &proto.CustomStreamConfig{
581-
RetentionMaxMessages: &proto.RetentionMaxMessages{Value: 1000},
581+
RetentionMaxMessages: &proto.NullableInt64{Value: 1000},
582582
}
583583
p, err := server.newPartition(&proto.Partition{
584584
Subject: "foo",

0 commit comments

Comments
 (0)