Closed
cowpaths/orbit.go
#3
Description
When processing group updates, there's a condition where a consumer is created, but the consume loop is not started. This results in a broken state where the only thing that will fix it is a member change.
You can pretty easily recreate this issue by adding a few consumers to the group when it's already running.
The issue comes down to the logic here:
orbit.go/pcgroups/elastic_stream_consumer_group.go
Lines 709 to 727 in 4cc1632
    if errors.Is(err, jetstream.ErrConsumerExists) {
        // try to delete the consumer if we can't create it to our desired config, we or someone else will try to re-create it within 5 seconds
        err := instance.js.DeleteConsumer(ctx, composeCGSName(instance.StreamName, instance.ConsumerGroupName), instance.MemberName)
        if err != nil {
            log.Printf("Warning: error trying to delete our member's consumer after trying to create it to our desired config: %v\n", err)
            // will try again later
            return
        } else {
            instance.consumer, err = instance.js.CreateConsumer(ctx, composeCGSName(instance.StreamName, instance.ConsumerGroupName), config)
            if err != nil {
                // will try again later
                return
            }
        }
    }
    // just return in any case
    // not logging here because some errors can happen during normal operation
    // e.g. JS API error: filtered consumer not unique on workqueue stream can happen because all the members cannot be perfectly synchronized processing membership changes
    return
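Basically, if there's an error with the first create, we unconditionally return, even if the delete and the second create succeed. The consumer then exists, but the code never reaches the point where the consume loop is started.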
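One possible shape for a fix, as a minimal self-contained sketch rather than a patch against the real file: return early only when no consumer was obtained, and let the success path of the retry fall through to starting the consume loop. Everything named here (`consumerAPI`, `ensureConsumer`, `joinGroup`, `fakeAPI`, `errConsumerExists`, and the string standing in for a consumer handle) is hypothetical and only mimics the create/delete/create sequence quoted above; it does not use the actual `jetstream` API.

```go
package main

import (
	"errors"
	"fmt"
)

// errConsumerExists is a hypothetical stand-in for jetstream.ErrConsumerExists.
var errConsumerExists = errors.New("consumer exists")

// consumerAPI is a hypothetical, trimmed-down view of the create/delete calls.
type consumerAPI interface {
	Create() (string, error)
	Delete() error
}

// ensureConsumer returns a consumer handle, or an error when the caller
// should give up for now and retry later.
func ensureConsumer(api consumerAPI) (string, error) {
	c, err := api.Create()
	if err == nil {
		return c, nil
	}
	if !errors.Is(err, errConsumerExists) {
		// Other errors can occur in normal operation; retry later.
		return "", err
	}
	// The existing consumer does not match the desired config: delete it and retry once.
	if err := api.Delete(); err != nil {
		return "", fmt.Errorf("delete stale consumer: %w", err)
	}
	return api.Create()
}

// joinGroup starts the consume loop whenever a consumer was obtained,
// including when it came from the second create. It reports whether it did.
func joinGroup(api consumerAPI, startConsumeLoop func(string)) bool {
	c, err := ensureConsumer(api)
	if err != nil {
		return false
	}
	startConsumeLoop(c)
	return true
}

// fakeAPI fails Create with errConsumerExists until Delete has been called.
type fakeAPI struct{ stale bool }

func (f *fakeAPI) Create() (string, error) {
	if f.stale {
		return "", errConsumerExists
	}
	return "consumer", nil
}

func (f *fakeAPI) Delete() error {
	f.stale = false
	return nil
}

func main() {
	started := joinGroup(&fakeAPI{stale: true}, func(c string) {
		fmt.Println("consume loop started for", c)
	})
	fmt.Println("started:", started)
}
```

The point of the sketch is that the decision to return is driven by whether a consumer handle exists, not by whether the first create attempt failed.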