-
Notifications
You must be signed in to change notification settings - Fork 369
Description
Expected behavior
consumers under the same namespace are not blocked
Actual behavior
consumers under the same namespace are all blocked
Steps to reproduce
1.create several pulsar topics, they under same namespace, and all topic have consumers. send msg to these topics continuously.
2.create some consumers, this consumers contain retry and dlq queue, the code as below.
`
c, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
SubscriptionName: common.GetEventbridgeTargetSubscriptionNameV2(target.Name, target.ID),,
Type: pulsar.Shared,
RetryEnable: true,
NackBackoffPolicy: ExponentialNackBackoffRetryPolicy{},
DLQ: &pulsar.DLQPolicy{
MaxDeliveries: 20,
DeadLetterTopic: common.GetEventbridgeTargetDLQTopicNameV2(
common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
target.Name,
target.ID,
),
RetryLetterTopic: common.GetEventbridgeTargetRetryTopicNameV2(
common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
target.Name,
target.ID,
),
InitSubscriptionName: "default",
},
})
`
common package as below, very simple logic
`
func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d", eventbridgeName, eventbridgeId, ruleName, ruleId)
}
func GetEventbridgeTargetSubscriptionNameV2(targetName string, targetId int64) string {
return fmt.Sprintf("target-%s-%d", targetName, targetId)
}
func GetEventbridgeTargetDLQTopicNameV2(topic string, targetName string, targetId int64) string {
return fmt.Sprintf("%s-target-%s-%d-DLQ", topic, targetName, targetId)
}
func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d", eventbridgeName, eventbridgeId, ruleName, ruleId)
}
func GetEventbridgeTargetRetryTopicNameV2(topic string, targetName string, targetId int64) string {
return fmt.Sprintf("%s-target-%s-%d-RETRY", topic, targetName, targetId)
}
`
3. after consumer reviced msg, we assume handle msg occurs error, invoke unAck() method directly.
4. the producer sustain send message to topic, the message flow is very large, after several time, the consumer could not consumer any message, also under the same namespace, other topic's consumer could nerver consume any msg either(other topic msg flow is very small).
we use pulsar-client-go 0.12.0, find above problem exist, but when we use 0.15.0, the problem is disappear
so I want to know why 0.12.0 have this problem, if there is some issue that linked this problem.
the pulsar conf below, we set the unacked msg all are 0
System configuration
Pulsar Server version: both 2.8.1 and 3.0.6 have this problem
Pulsar Go Client version: 0.12.0
