Merged
19 commits
90cbb39
WIP: Add custom streams config to proto
LaPetiteSouris May 3, 2020
e6e7371
Use gogo/protobuf for generating duration
LaPetiteSouris May 8, 2020
36c892f
Add TestParseStreamConfig
LaPetiteSouris May 8, 2020
b87a521
Drop support for Compact
LaPetiteSouris May 9, 2020
6c3081c
Parse CreateStream request for configuration
LaPetiteSouris May 9, 2020
fb63ee5
Check for 0 value before set StreamConfig
LaPetiteSouris May 9, 2020
00a7e68
Renaming CustomStreamConfig
LaPetiteSouris May 21, 2020
781527f
Add implementation readme for custom stream
LaPetiteSouris May 21, 2020
38933f3
WIP: use string for duration stream config
LaPetiteSouris May 23, 2020
d3e2753
Merge remote-tracking branch 'upstream/master' into stream-configuration
LaPetiteSouris May 23, 2020
5d0c2eb
Use int64 as configuration type for stream duration configs
LaPetiteSouris May 31, 2020
6b08a2e
Add CompactEnabled into stream request parsing
LaPetiteSouris May 31, 2020
e8d3437
Add test for log compaction
LaPetiteSouris May 31, 2020
6b3f207
Edit documentation
LaPetiteSouris Jun 1, 2020
aede9da
Use nested message for CustomStreamConfiguration
LaPetiteSouris Jun 13, 2020
924f0ab
Change server_test.go to use time.Duration for custom stream config
LaPetiteSouris Jun 14, 2020
db16bae
Update client implementation guidance docs
LaPetiteSouris Jun 14, 2020
f860ddd
Merge remote-tracking branch 'upstream/master' into stream-configuration
LaPetiteSouris Jun 14, 2020
2c55e3e
Support nullabletype in custom stream config
LaPetiteSouris Jun 21, 2020
65 changes: 59 additions & 6 deletions documentation/client_implementation.md
@@ -155,6 +155,15 @@ configure a stream. Supported options are:
| MaxReplication | bool | Sets the stream replication factor equal to the current number of servers in the cluster. This means all partitions for the stream will be fully replicated within the cluster. | false |
| ReplicationFactor | int | Sets the replication factor for the stream. The replication factor controls the number of servers a stream's partitions should be replicated to. For example, a value of 1 would mean only 1 server would have the data, and a value of 3 would mean 3 servers would have it. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster (i.e. MaxReplication). | 1 |
| Partitions | int | Sets the number of partitions for the stream. | 1 |
| RetentionMaxBytes | int64 | The maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. | |
| RetentionMaxMessages | int64 | The maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. | |
| RetentionMaxAge | time.Duration | The TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. | |
| CleanerInterval | time.Duration | The frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. | |
| SegmentMaxBytes | int64 | The maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. | |
| SegmentMaxAge | time.Duration | The maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. | |
| CompactMaxGoroutines | int32 | The maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). | |
| CompactEnabled | bool | Enables message compaction by key on the server for this stream. | |


`CreateStream` returns/throws an error if the operation fails, specifically
`ErrStreamExists` if a stream with the given name already exists.
@@ -285,7 +294,7 @@ configure a subscription. Supported options are:
| StartAtOffset | int | Sets the subscription start position to the first message with an offset greater than or equal to the given offset. | |
| StartAtTime | timestamp | Sets the subscription start position to the first message with a timestamp greater than or equal to the given time. | |
| StartAtTimeDelta | time duration | Sets the subscription start position to the first message with a timestamp greater than or equal to `now - delta`. | |
| ReadISRReplica | bool | Sets the subscription to a random ISR replica instead of subscribing to the partition's leader. | false |

Currently, `Subscribe` can only subscribe to a single partition. In the future,
there will be functionality for consuming all partitions.
@@ -883,6 +892,41 @@ it using the [resilient RPC method](#rpcs) described above. If the
`AlreadyExists` gRPC error is returned, an `ErrStreamExists` error/exception is
thrown. Otherwise, any other error/exception is thrown if the operation failed.

The client can also set custom configurations for the stream to be created. The exhaustive list of supported stream configuration options is:

```
RetentionMaxBytes,
RetentionMaxMessages,
RetentionMaxAge,
CleanerInterval,
SegmentMaxBytes,
SegmentMaxAge,
CompactEnabled,
CompactMaxGoroutines,
```

Refer to [Stream Configuration](configuration.md#streams-configuration-settings) for more details.
Note that these options are optional; if not given, the broker's default configurations are used instead.

To differentiate between custom configuration and the server's default configuration, three nullable wrapper types are used when setting options for the `CreateStreamRequest`. These custom types are:

```proto
message NullableInt64 {
int64 value = 1;
}

message NullableInt32 {
int32 value = 1;
}

message NullableBool {
bool value = 1;
}

```

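The point of these wrappers is to distinguish "explicitly set to zero" from "not set at all", which a plain scalar field cannot express. A minimal sketch of that semantics — the type here is defined locally to mirror the proto message, not imported from the generated code, and the apply function is hypothetical:

```go
package main

import "fmt"

// NullableInt64 mirrors the proto wrapper: a nil pointer means "not set",
// while a non-nil pointer with Value 0 means "explicitly set to zero".
type NullableInt64 struct {
	Value int64
}

// applyRetention applies the request value over the server default only
// when the field was actually set, illustrating the server-side parse.
func applyRetention(defaultBytes int64, req *NullableInt64) int64 {
	if req == nil {
		return defaultBytes // field absent: keep the broker default
	}
	return req.Value // field present, even if zero ("no limit")
}

func main() {
	fmt.Println(applyRetention(1024, nil))                      // 1024: default kept
	fmt.Println(applyRetention(1024, &NullableInt64{Value: 0})) // 0: explicit no-limit
}
```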
Note: if `CompactMaxGoroutines` is configured, you must manually make sure that `CompactEnabled` is also set. If compaction is not enabled explicitly, the server will fall back to its default configuration, which may disable compaction on the server side and leave `CompactMaxGoroutines` unused.
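A client could guard against that foot-gun before sending the request. The validation helper below is a suggestion, not part of this PR; the wrapper types are defined locally to mirror the proto messages:

```go
package main

import (
	"errors"
	"fmt"
)

// NullableInt32 and NullableBool mirror the proto wrappers (nil = unset).
type NullableInt32 struct{ Value int32 }
type NullableBool struct{ Value bool }

// validateCompactOpts returns an error when CompactMaxGoroutines is set
// but compaction was not explicitly enabled, since the server would then
// fall back to its default and the goroutine setting could go unused.
func validateCompactOpts(maxGoroutines *NullableInt32, enabled *NullableBool) error {
	if maxGoroutines != nil && (enabled == nil || !enabled.Value) {
		return errors.New("CompactMaxGoroutines requires CompactEnabled to be set to true")
	}
	return nil
}

func main() {
	err := validateCompactOpts(&NullableInt32{Value: 10}, nil)
	fmt.Println(err != nil) // true: misconfiguration caught
	err = validateCompactOpts(&NullableInt32{Value: 10}, &NullableBool{Value: true})
	fmt.Println(err == nil) // true: valid combination
}
```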

```go
// CreateStream creates a new stream attached to a NATS subject. Subject is the
// NATS subject the stream is attached to, and name is the stream identifier,
@@ -897,12 +941,21 @@ func (c *client) CreateStream(ctx context.Context, subject, name string, options
}

req := &proto.CreateStreamRequest{
Subject: subject,
Name: name,
ReplicationFactor: opts.ReplicationFactor,
Group: opts.Group,
Partitions: opts.Partitions,
RetentionMaxAge: opts.RetentionMaxAge,
RetentionMaxBytes: opts.RetentionMaxBytes,
RetentionMaxMessages: opts.RetentionMaxMessages,
CleanerInterval: opts.CleanerInterval,
SegmentMaxBytes: opts.SegmentMaxBytes,
SegmentMaxAge: opts.SegmentMaxAge,
CompactMaxGoroutines: opts.CompactMaxGoroutines,
CompactEnabled: opts.CompactEnabled,
}

err := c.doResilientRPC(func(client proto.APIClient) error {
_, err := client.CreateStream(ctx, req)
return err
3 changes: 1 addition & 2 deletions documentation/configuration.md
@@ -198,5 +198,4 @@ the configuration file.
|:----|:----|:----|:----|:----|:----|
| stream.enabled | | Enables the activity stream. This will create an internal stream called `__activity` which events will be published to. | bool | false | |
| stream.publish.timeout | | The timeout for publishes to the activity stream. This is the time to wait for an ack from the activity stream, which means it's related to `stream.publish.ack.policy`. If the ack policy is `none`, this has no effect. | duration | 5s | |
| stream.publish.ack.policy | | The ack policy to use for publishes to the activity stream. The value `none` means publishes will not wait for an ack, `leader` means publishes will wait for the ack sent when the leader has committed the event, and `all` means publishes will wait for the ack sent when all replicas have committed the event. | string | all | [none, leader, all] |
1 change: 1 addition & 0 deletions go.sum
@@ -63,6 +63,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
41 changes: 38 additions & 3 deletions server/api.go
@@ -55,11 +55,46 @@ func (a *apiServer) CreateStream(ctx context.Context, req *client.CreateStreamRe
Id: i,
}
}
// set custom stream config
streamConfig := &proto.CustomStreamConfig{}

if req.RetentionMaxAge != nil {
streamConfig.RetentionMaxAge = &proto.NullableInt64{Value: req.RetentionMaxAge.GetValue()}
}

if req.CleanerInterval != nil {
streamConfig.CleanerInterval = &proto.NullableInt64{Value: req.CleanerInterval.GetValue()}
}

if req.SegmentMaxBytes != nil {
streamConfig.SegmentMaxBytes = &proto.NullableInt64{Value: req.SegmentMaxBytes.GetValue()}
}

if req.SegmentMaxAge != nil {
streamConfig.SegmentMaxAge = &proto.NullableInt64{Value: req.SegmentMaxAge.GetValue()}
}

if req.CompactMaxGoroutines != nil {
streamConfig.CompactMaxGoroutines = &proto.NullableInt32{Value: req.CompactMaxGoroutines.GetValue()}
}

if req.RetentionMaxBytes != nil {
streamConfig.RetentionMaxBytes = &proto.NullableInt64{Value: req.RetentionMaxBytes.GetValue()}
}

if req.RetentionMaxMessages != nil {
streamConfig.RetentionMaxMessages = &proto.NullableInt64{Value: req.RetentionMaxMessages.GetValue()}
}

if req.CompactEnabled != nil {
streamConfig.CompactEnabled = &proto.NullableBool{Value: req.CompactEnabled.GetValue()}
}

stream := &proto.Stream{
Name: req.Name,
Subject: req.Subject,
Partitions: partitions,
CustomStreamConfig: streamConfig,
}

if e := a.metadata.CreateStream(ctx, &proto.CreateStreamOp{Stream: stream}); e != nil {
45 changes: 43 additions & 2 deletions server/config.go
@@ -9,13 +9,13 @@ import (

"github.com/dustin/go-humanize"
"github.com/hako/durafmt"
client "github.com/liftbridge-io/liftbridge-api/go"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"

client "github.com/liftbridge-io/liftbridge-api/go"
)

const (
@@ -179,6 +179,47 @@ func (l StreamsConfig) RetentionString() string {
return str
}

// ParseCustomStreamConfig parses the custom stream config from the
// request into the StreamsConfig struct. If a value is present in the
// request's config section, it is set on the StreamsConfig.
func (l *StreamsConfig) ParseCustomStreamConfig(c *proto.CustomStreamConfig) {
if c == nil {
return
}
// By default, duration configurations are interpreted as milliseconds.
retentionMaxAge := c.GetRetentionMaxAge()
if retentionMaxAge != nil {
l.RetentionMaxAge = time.Duration(retentionMaxAge.GetValue()) * time.Millisecond
}
cleanerInterval := c.GetCleanerInterval()
if cleanerInterval != nil {
l.CleanerInterval = time.Duration(cleanerInterval.GetValue()) * time.Millisecond
}
segmentMaxAge := c.GetSegmentMaxAge()
if segmentMaxAge != nil {
l.SegmentMaxAge = time.Duration(segmentMaxAge.GetValue()) * time.Millisecond
}
if c.GetRetentionMaxBytes() != nil {
l.RetentionMaxBytes = c.GetRetentionMaxBytes().GetValue()
}
if c.GetRetentionMaxMessages() != nil {
l.RetentionMaxMessages = c.GetRetentionMaxMessages().GetValue()
}

if c.GetSegmentMaxBytes() != nil {
l.SegmentMaxBytes = c.GetSegmentMaxBytes().GetValue()
}

if c.GetCompactEnabled() != nil {
l.Compact = c.GetCompactEnabled().GetValue()
}

if c.GetCompactMaxGoroutines() != nil {
l.CompactMaxGoroutines = int(c.GetCompactMaxGoroutines().GetValue())
}

}

// ClusteringConfig contains settings for controlling cluster behavior.
type ClusteringConfig struct {
ServerID string
105 changes: 105 additions & 0 deletions server/config_test.go
@@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"

client "github.com/liftbridge-io/liftbridge-api/go"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

// Ensure NewConfig properly parses config files.
@@ -122,3 +123,107 @@ func TestNewConfigUnknownSetting(t *testing.T) {
_, err := NewConfig("configs/unknown-setting.yaml")
require.Error(t, err)
}

// Ensure the custom stream config can be parsed correctly:
// if a given value is present in the custom stream config it should
// be set; otherwise, default values should be kept.
func TestParseCustomStreamConfig(t *testing.T) {
// Given custom stream config
// duration configuration is in millisecond
customStreamConfig := &proto.CustomStreamConfig{
SegmentMaxBytes: &proto.NullableInt64{Value: 1024},
SegmentMaxAge: &proto.NullableInt64{Value: 1000000},
RetentionMaxBytes: &proto.NullableInt64{Value: 2048},
RetentionMaxMessages: &proto.NullableInt64{Value: 1000},
RetentionMaxAge: &proto.NullableInt64{Value: 1000000},
CleanerInterval: &proto.NullableInt64{Value: 1000000},
CompactMaxGoroutines: &proto.NullableInt32{Value: 10},
}
streamConfig := StreamsConfig{}

streamConfig.ParseCustomStreamConfig(customStreamConfig)

s, _ := time.ParseDuration("1000s")

// Expect custom stream config overwrites default stream config
require.Equal(t, int64(1024), streamConfig.SegmentMaxBytes)
require.Equal(t, s, streamConfig.SegmentMaxAge)
require.Equal(t, int64(2048), streamConfig.RetentionMaxBytes)
require.Equal(t, int64(1000), streamConfig.RetentionMaxMessages)
require.Equal(t, s, streamConfig.RetentionMaxAge)
require.Equal(t, s, streamConfig.CleanerInterval)
require.Equal(t, 10, streamConfig.CompactMaxGoroutines)

}

// Ensure default stream configs are always present,
// this should be the case when custom's stream configs are not set
func TestDefaultCustomStreamConfig(t *testing.T) {
s, _ := time.ParseDuration("1000s")
// Given a default stream config
streamConfig := StreamsConfig{SegmentMaxBytes: 2048, SegmentMaxAge: s}

// Given custom configs
customStreamConfig := &proto.CustomStreamConfig{
RetentionMaxBytes: &proto.NullableInt64{Value: 1024},
RetentionMaxMessages: &proto.NullableInt64{Value: 1000},
RetentionMaxAge: &proto.NullableInt64{Value: 1000000},
CleanerInterval: &proto.NullableInt64{Value: 1000000},
CompactMaxGoroutines: &proto.NullableInt32{Value: 10},
}

streamConfig.ParseCustomStreamConfig(customStreamConfig)

// Ensure that in case of non-overlap values, default configs
// remain present
require.Equal(t, int64(2048), streamConfig.SegmentMaxBytes)
require.Equal(t, s, streamConfig.SegmentMaxAge)

// Ensure values from custom configs overwrite default configs
require.Equal(t, int64(1024), streamConfig.RetentionMaxBytes)
require.Equal(t, int64(1000), streamConfig.RetentionMaxMessages)
require.Equal(t, s, streamConfig.RetentionMaxAge)
require.Equal(t, s, streamConfig.CleanerInterval)
require.Equal(t, 10, streamConfig.CompactMaxGoroutines)

}

// Ensure compact activation is correctly parsed
func TestCompactEnabledInCustomStreamConfig(t *testing.T) {
// Given a default stream config
streamConfig := StreamsConfig{}

// Given custom configs with option to disable compact
customStreamConfig := &proto.CustomStreamConfig{
CompactEnabled: &proto.NullableBool{Value: false},
}

streamConfig.ParseCustomStreamConfig(customStreamConfig)

// Ensure that the stream config correctly disables the compact option
require.Equal(t, false, streamConfig.Compact)

// Given a default stream config
streamConfig2 := StreamsConfig{}
// Given custom configs with option to enable compact
customStreamConfig2 := &proto.CustomStreamConfig{
CompactEnabled: &proto.NullableBool{Value: true},
}

streamConfig2.ParseCustomStreamConfig(customStreamConfig2)

// Ensure that the stream config correctly enables the compact option
require.Equal(t, true, streamConfig2.Compact)

// Given a default stream config with default compaction disabled
streamConfig3 := StreamsConfig{}

// Given custom configs with NO option to configure compact
customStreamConfig3 := &proto.CustomStreamConfig{}

streamConfig3.ParseCustomStreamConfig(customStreamConfig3)

// Ensure that the default stream config is retained when no compact
// option is given (streamConfig3 was initialized with compaction disabled)
require.Equal(t, false, streamConfig3.Compact)
}
16 changes: 12 additions & 4 deletions server/metadata.go
@@ -655,24 +655,29 @@ func (m *metadataAPI) AddStream(protoStream *proto.Stream, recovered bool) (*str
stream := newStream(protoStream.Name, protoStream.Subject)
m.streams[protoStream.Name] = stream

// Get the custom stream configuration
customStreamsConfig := protoStream.GetCustomStreamConfig()

for _, partition := range protoStream.Partitions {
if err := m.addPartition(stream, partition, recovered, customStreamsConfig); err != nil {
delete(m.streams, protoStream.Name)
return nil, err
}
}

return stream, nil
}

func (m *metadataAPI) addPartition(stream *stream, protoPartition *proto.Partition, recovered bool, protoStreamsConfig *proto.CustomStreamConfig) error {
if p := stream.GetPartition(protoPartition.Id); p != nil {
// Partition already exists for stream.
return fmt.Errorf("partition %d already exists for stream %s",
protoPartition.Id, protoPartition.Stream)
}

// This will initialize/recover the durable commit log.
partition, err := m.newPartition(protoPartition, recovered)
partition, err := m.newPartition(protoPartition, recovered, protoStreamsConfig)
if err != nil {
return err
}
@@ -713,7 +718,10 @@ func (m *metadataAPI) ResumePartition(streamName string, id int32, recovered boo
}

// Resume the partition by replacing it.
partition, err := m.newPartition(partition.Partition, recovered)
// nil given as the configuration means we do not set a
// custom stream configuration; the default broker
// configuration will be used.
A maintainer (Member) asked:

Why do we do this on partition resume?

The author (Contributor) replied:

as (*Server) newPartition()'s signature changed to

func (s *Server) newPartition(protoPartition *proto.Partition, recovered bool, protoStreamsConfig *proto.CustomStreamConfig) (*partition, error)

so I have to pass nil here.

partition, err := m.newPartition(partition.Partition, recovered, nil)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion server/metadata_test.go
@@ -101,7 +101,7 @@ func TestMetadataAddPartitionAlreadyExists(t *testing.T) {
Stream: "foo",
Subject: "foo",
Id: 0,
}, false, nil)
require.Error(t, err)
}
