Skip to content

Stream configuration #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 26, 2020

Conversation

LaPetiteSouris
Copy link
Contributor

@LaPetiteSouris LaPetiteSouris commented May 21, 2020

Statement of purpose

The PR is to add support the customizing stream's configuration upon creation.

Currently the following configurations StreamConfiguration are set once for all on broker's level.

When a CreateStream request is created, client should have the chance to set the configuration by themselves, which will overwrite the default configuration on the broker level for that stream. This gives the client a lot of flexibility for stream configuration and usage.

Implementation details

The idea is to add a new section in internal.pb.proto of the server for stream level configuration.

// Internal Protocal CustomStreamConfig
message NullableInt64 {
    int64 value = 1; 
}

message NullableInt32 {
    int32 value = 1; 
}

message NullableBool {
    bool value = 1; 
}

message CustomStreamConfig  {
    NullableInt64     RetentionMaxBytes     = 1;
    NullableInt64     RetentionMaxMessages  = 2;
    NullableInt64     RetentionMaxAge       = 3;  
    NullableInt64     CleanerInterval       = 4;
    NullableInt64     SegmentMaxBytes       = 5;
    NullableInt64     SegmentMaxAge         = 6;
    NullableInt32     CompactMaxGoroutines  = 7;
    NullableBool      CompactEnabled        = 8;
}

And then enrich CreateStreamRequest

message NullableInt64 {
    int64 value = 1; 
}

message NullableInt32 {
    int32 value = 1; 
}

message NullableBool {
    bool value = 1; 
}

// CreateStreamRequest is sent to create a new stream.
message CreateStreamRequest {
    string                  subject              = 1; // Stream NATS subject
    string                  name                 = 2;  // Stream name (unique per subject)
    string                  group                = 3;  // Partitions NATS subject amongst group members
    int32                   replicationFactor    = 4;  // Number of stream replicas
    int32                   partitions           = 5;  // Number of stream partitions
    NullableInt64           RetentionMaxBytes    = 6;  // The maximum size a stream's log can grow to, in bytes.
    NullableInt64           RetentionMaxMessages = 7;  // The maximum size a stream's log can grow to, in messages
    NullableInt64           RetentionMaxAge      = 8;  // The TTL for stream log segment files 
    NullableInt64           CleanerInterval      = 9;  // The frequency to check for log cleaner
    NullableInt64           SegmentMaxBytes      = 10; // The maximum size of a single stream log segment file in bytes
    NullableInt64           SegmentMaxAge        = 11; // The maximum time before a new stream log segment is rolled out
    NullableInt32           CompactMaxGoroutines = 12; // The maximum number of concurrent goroutines to use for compaction on a stream log
    NullableBool            CompactEnabled       = 13; // CompactEnabled controls compaction for a stream.
}

On the API level, the api shall parse the custom stream configuration from proto.request to set the configuration properly upon stream's partition creation. Default value (set on broker levels) will be used in case these values are unset upon CreateStream request .

mermaild

Summaries of notable changes:

  • signature of metadata/addPartition now changes:
func (m *metadataAPI) addPartition(stream *stream, protoPartition *proto.Partition, recovered bool, protoStreamsConfig *proto.CustomStreamConfig) error
  • signature of partition/newPartition now changes to, which takes protoStreamConfig into account:
func (s *Server) newPartition(protoPartition *proto.Partition, recovered bool, protoStreamsConfig *proto.CustomStreamConfig) (*partition, error)
  • in metadata/AddStream, before calling addPartition, it tries to parse the embedded custom stream config from protoStream

Notes on duration values.

All duration type is not supported due to complexity issue with google/protobuf.

As of know, duration configuration is used simple int64 as value and consider this value in millisecond time unit

Required

Required PRs:
api
go-client

@LaPetiteSouris
Copy link
Contributor Author

Just update the PR description to give accurate context and implementation overview of the PR

@LaPetiteSouris LaPetiteSouris force-pushed the stream-configuration branch 3 times, most recently from 73acd8f to 3bdfe16 Compare June 14, 2020 08:00
@LaPetiteSouris
Copy link
Contributor Author

Updated the description with detailed implementation , hope this help a bit in going through the PR.

partition, err := m.newPartition(partition.Partition, recovered)
// nil given as configuration means we do not set
// custom stream's configuration. The default broker configuration
// will be used.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this on partition resume?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as (* server) newParition() signature changes to

func (s *Server) newPartition(protoPartition *proto.Partition, recovered bool, protoStreamsConfig *proto.CustomStreamConfig) (*partition, error)

so I have to pass nil here

@tylertreat tylertreat merged commit 2c55e3e into liftbridge-io:master Jun 26, 2020
@LaPetiteSouris LaPetiteSouris deleted the stream-configuration branch November 14, 2022 16:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants