Skip to content

Commit aa090be

Browse files
authored
[Issue 1223] Support ZeroQueueConsumer (#1225)
Fixes #1223 ### Motivation Support ZeroQueueConsumer, refer to Java [ZeroQueueConsumerImpl](https://github.com/apache/pulsar/blob/8c50a6c2e91c81dbf187ce5e66cb39e2758a741e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L42) ### Modifications - The consumer add a new optional parameter `EnableZeroQueueConsumer` - Add a new `zeroQueueConsumer`
1 parent a6e28dc commit aa090be

File tree

5 files changed

+860
-46
lines changed

5 files changed

+860
-46
lines changed

pulsar/consumer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ type ConsumerOptions struct {
160160
// Default value is `1000` messages and should be good for most use cases.
161161
ReceiverQueueSize int
162162

163+
// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.
164+
// Notice: only non-partitioned topic is supported.
165+
// Default is false.
166+
EnableZeroQueueConsumer bool
167+
163168
// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
164169
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
165170
// receive queue can be scaled.

pulsar/consumer_impl.go

Lines changed: 69 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ import (
2222
"fmt"
2323
"math/rand"
2424
"strconv"
25+
"strings"
2526
"sync"
2627
"time"
2728

29+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
30+
2831
"github.com/apache/pulsar-client-go/pulsar/crypto"
2932
"github.com/apache/pulsar-client-go/pulsar/internal"
3033
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -81,6 +84,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
8184
options.ReceiverQueueSize = defaultReceiverQueueSize
8285
}
8386

87+
if options.EnableZeroQueueConsumer {
88+
options.ReceiverQueueSize = 0
89+
}
90+
8491
if options.Interceptors == nil {
8592
options.Interceptors = defaultConsumerInterceptors
8693
}
@@ -236,7 +243,24 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
236243
}
237244

238245
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
239-
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
246+
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
247+
partitions, err := client.TopicPartitions(topic)
248+
if err != nil {
249+
return nil, err
250+
}
251+
252+
if len(partitions) > 1 && options.EnableZeroQueueConsumer {
253+
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
254+
}
255+
256+
if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
257+
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
258+
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
259+
}
260+
261+
if len(partitions) == 1 && options.EnableZeroQueueConsumer {
262+
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
263+
}
240264

241265
consumer := &consumer{
242266
topic: topic,
@@ -253,7 +277,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
253277
metrics: client.metrics.GetLeveledMetrics(topic),
254278
}
255279

256-
err := consumer.internalTopicSubscribeToPartitions()
280+
err = consumer.internalTopicSubscribeToPartitions()
257281
if err != nil {
258282
return nil, err
259283
}
@@ -343,10 +367,6 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
343367
consumer *partitionConsumer
344368
}
345369

346-
receiverQueueSize := c.options.ReceiverQueueSize
347-
metadata := c.options.Properties
348-
subProperties := c.options.SubscriptionProperties
349-
350370
startPartition := oldNumPartitions
351371
partitionsToAdd := newNumPartitions - oldNumPartitions
352372

@@ -364,45 +384,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
364384

365385
go func(idx int, pt string) {
366386
defer wg.Done()
367-
368-
var nackRedeliveryDelay time.Duration
369-
if c.options.NackRedeliveryDelay == 0 {
370-
nackRedeliveryDelay = defaultNackRedeliveryDelay
371-
} else {
372-
nackRedeliveryDelay = c.options.NackRedeliveryDelay
373-
}
374-
opts := &partitionConsumerOpts{
375-
topic: pt,
376-
consumerName: c.consumerName,
377-
subscription: c.options.SubscriptionName,
378-
subscriptionType: c.options.Type,
379-
subscriptionInitPos: c.options.SubscriptionInitialPosition,
380-
partitionIdx: idx,
381-
receiverQueueSize: receiverQueueSize,
382-
nackRedeliveryDelay: nackRedeliveryDelay,
383-
nackBackoffPolicy: c.options.NackBackoffPolicy,
384-
metadata: metadata,
385-
subProperties: subProperties,
386-
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
387-
startMessageID: c.options.startMessageID,
388-
startMessageIDInclusive: c.options.StartMessageIDInclusive,
389-
subscriptionMode: c.options.SubscriptionMode,
390-
readCompacted: c.options.ReadCompacted,
391-
interceptors: c.options.Interceptors,
392-
maxReconnectToBroker: c.options.MaxReconnectToBroker,
393-
backoffPolicy: c.options.BackoffPolicy,
394-
keySharedPolicy: c.options.KeySharedPolicy,
395-
schema: c.options.Schema,
396-
decryption: c.options.Decryption,
397-
ackWithResponse: c.options.AckWithResponse,
398-
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage,
399-
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
400-
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
401-
consumerEventListener: c.options.EventListener,
402-
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
403-
ackGroupingOptions: c.options.AckGroupingOptions,
404-
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
405-
}
387+
opts := newPartitionConsumerOpts(pt, c.consumerName, idx, c.options)
406388
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
407389
ch <- ConsumerError{
408390
err: err,
@@ -444,6 +426,48 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
444426
return nil
445427
}
446428

429+
func newPartitionConsumerOpts(topic, consumerName string, idx int, options ConsumerOptions) *partitionConsumerOpts {
430+
431+
var nackRedeliveryDelay time.Duration
432+
if options.NackRedeliveryDelay == 0 {
433+
nackRedeliveryDelay = defaultNackRedeliveryDelay
434+
} else {
435+
nackRedeliveryDelay = options.NackRedeliveryDelay
436+
}
437+
return &partitionConsumerOpts{
438+
topic: topic,
439+
consumerName: consumerName,
440+
subscription: options.SubscriptionName,
441+
subscriptionType: options.Type,
442+
subscriptionInitPos: options.SubscriptionInitialPosition,
443+
partitionIdx: idx,
444+
receiverQueueSize: options.ReceiverQueueSize,
445+
nackRedeliveryDelay: nackRedeliveryDelay,
446+
nackBackoffPolicy: options.NackBackoffPolicy,
447+
metadata: options.Properties,
448+
subProperties: options.SubscriptionProperties,
449+
replicateSubscriptionState: options.ReplicateSubscriptionState,
450+
startMessageID: options.startMessageID,
451+
startMessageIDInclusive: options.StartMessageIDInclusive,
452+
subscriptionMode: options.SubscriptionMode,
453+
readCompacted: options.ReadCompacted,
454+
interceptors: options.Interceptors,
455+
maxReconnectToBroker: options.MaxReconnectToBroker,
456+
backoffPolicy: options.BackoffPolicy,
457+
keySharedPolicy: options.KeySharedPolicy,
458+
schema: options.Schema,
459+
decryption: options.Decryption,
460+
ackWithResponse: options.AckWithResponse,
461+
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
462+
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
463+
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
464+
consumerEventListener: options.EventListener,
465+
enableBatchIndexAck: options.EnableBatchIndexAcknowledgment,
466+
ackGroupingOptions: options.AckGroupingOptions,
467+
autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize,
468+
}
469+
}
470+
447471
func (c *consumer) Subscription() string {
448472
return c.options.SubscriptionName
449473
}

0 commit comments

Comments
 (0)