Skip to content

Commit 55dd789

Browse files
jiazhaituteng
authored andcommitted
client: make SubscriptionMode a member of ConsumerConfigurationData (#6337)
Currently, SubscriptionMode is a parameter to create ConsumerImpl, but it is not exported out, and user could not set this value for consumer. This change tries to make SubscriptionMode a member of ConsumerConfigurationData, so user could set this parameter when create consumer. (cherry picked from commit 208af7c)
1 parent dffe575 commit 55dd789

File tree

11 files changed

+101
-40
lines changed

11 files changed

+101
-40
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
117117
TopicName.getPartitionIndex(conf.getSingleTopic()),
118118
false,
119119
consumerFuture,
120-
SubscriptionMode.Durable,
121120
MessageId.earliest,
122121
0 /* startMessageRollbackDurationInSec */,
123122
Schema.BYTES, null,

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
228228
*/
229229
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
230230

231+
/**
232+
* Select the subscription mode to be used when subscribing to the topic.
233+
*
234+
* <p>Options are:
235+
* <ul>
236+
* <li>{@link SubscriptionMode#Durable} (Default)</li>
237+
* <li>{@link SubscriptionMode#NonDurable}</li>
238+
* </ul>
239+
*
240+
* @param subscriptionMode
241+
* the subscription mode value
242+
* @return the consumer builder instance
243+
*/
244+
ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
245+
231246
/**
232247
* Sets a {@link MessageListener} for the consumer
233248
*
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
/**
22+
* Types of subscription mode supported by Pulsar.
23+
*/
24+
public enum SubscriptionMode {
25+
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
26+
// position
27+
Durable,
28+
29+
// Lightweight subscription mode that doesn't have a durable cursor associated
30+
NonDurable
31+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
4848
import org.apache.pulsar.client.api.Schema;
4949
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
50+
import org.apache.pulsar.client.api.SubscriptionMode;
5051
import org.apache.pulsar.client.api.SubscriptionType;
5152
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
5253
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -191,6 +192,13 @@ public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptio
191192
return this;
192193
}
193194

195+
@Override
196+
public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode subscriptionMode) {
197+
conf.setSubscriptionMode(subscriptionMode);
198+
return this;
199+
}
200+
201+
194202
@Override
195203
public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> messageListener) {
196204
conf.setMessageListener(messageListener);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.pulsar.client.api.PulsarClientException;
6464
import org.apache.pulsar.client.api.Schema;
6565
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
66+
import org.apache.pulsar.client.api.SubscriptionMode;
6667
import org.apache.pulsar.client.api.SubscriptionType;
6768
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
6869
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -150,39 +151,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
150151

151152
private final boolean createTopicIfDoesNotExist;
152153

153-
enum SubscriptionMode {
154-
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
155-
// position
156-
Durable,
157154

158-
// Lightweight subscription mode that doesn't have a durable cursor associated
159-
NonDurable
160-
}
161-
162-
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
163-
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
164-
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
165-
boolean createTopicIfDoesNotExist) {
155+
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
156+
String topic,
157+
ConsumerConfigurationData<T> conf,
158+
ExecutorService listenerExecutor,
159+
int partitionIndex,
160+
boolean hasParentConsumer,
161+
CompletableFuture<Consumer<T>> subscribeFuture,
162+
MessageId startMessageId,
163+
Schema<T> schema,
164+
ConsumerInterceptors<T> interceptors,
165+
boolean createTopicIfDoesNotExist) {
166166
if (conf.getReceiverQueueSize() == 0) {
167167
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
168168
subscribeFuture,
169-
subscriptionMode, startMessageId, schema, interceptors,
169+
startMessageId, schema, interceptors,
170170
createTopicIfDoesNotExist);
171171
} else {
172172
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
173-
subscribeFuture, subscriptionMode, startMessageId, 0 /* rollback time in sec to start msgId */,
173+
subscribeFuture, startMessageId, 0 /* rollback time in sec to start msgId */,
174174
schema, interceptors, createTopicIfDoesNotExist);
175175
}
176176
}
177177

178178
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
179179
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer,
180-
CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId,
180+
CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
181181
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
182182
boolean createTopicIfDoesNotExist) {
183183
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
184184
this.consumerId = client.newConsumerId();
185-
this.subscriptionMode = subscriptionMode;
185+
this.subscriptionMode = conf.getSubscriptionMode();
186186
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
187187
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId;
188188
this.initialStartMessageId = this.startMessageId;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
6060
import org.apache.pulsar.client.api.Schema;
6161
import org.apache.pulsar.client.api.SubscriptionType;
62-
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
6362
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
6463
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
6564
import org.apache.pulsar.client.util.ConsumerName;
@@ -833,10 +832,10 @@ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
833832
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
834833
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
835834
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
836-
configurationData, client.externalExecutorProvider().getExecutor(),
837-
partitionIndex, true, subFuture,
838-
SubscriptionMode.Durable, null, schema, interceptors,
839-
createIfDoesNotExist);
835+
configurationData, client.externalExecutorProvider().getExecutor(),
836+
partitionIndex, true, subFuture,
837+
null, schema, interceptors,
838+
createIfDoesNotExist);
840839
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
841840
return subFuture;
842841
})
@@ -847,7 +846,7 @@ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
847846

848847
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
849848
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
850-
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
849+
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, null,
851850
schema, interceptors,
852851
createIfDoesNotExist);
853852
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -1118,7 +1117,7 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
11181117
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
11191118
client, partitionName, configurationData,
11201119
client.externalExecutorProvider().getExecutor(),
1121-
partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
1120+
partitionIndex, true, subFuture, null, schema, interceptors,
11221121
true /* createTopicIfDoesNotExist */);
11231122
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
11241123
if (log.isDebugEnabled()) {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
6565
import org.apache.pulsar.client.api.AuthenticationFactory;
6666
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
67-
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
6867
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
6968
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
7069
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -354,7 +353,7 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
354353
} else {
355354
int partitionIndex = TopicName.getPartitionIndex(topic);
356355
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
357-
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
356+
consumerSubscribedFuture,null, schema, interceptors,
358357
true /* createTopicIfDoesNotExist */);
359358
}
360359

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.commons.codec.digest.DigestUtils;
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.pulsar.client.api.*;
29-
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
3029
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3130
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
3231
import org.apache.pulsar.common.naming.TopicName;
@@ -47,6 +46,7 @@ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConf
4746
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
4847
consumerConfiguration.setSubscriptionName(subscription);
4948
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
49+
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
5050
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
5151
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
5252

@@ -83,7 +83,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
8383

8484
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
8585
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
86-
listenerExecutor, partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable,
86+
listenerExecutor, partitionIdx, false, consumerFuture,
8787
readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(),
8888
schema, null, true /* createTopicIfDoesNotExist */);
8989
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
4949

5050
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
5151
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
52-
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
52+
MessageId startMessageId, Schema<T> schema,
5353
ConsumerInterceptors<T> interceptors,
5454
boolean createTopicIfDoesNotExist) {
5555
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
56-
subscriptionMode, startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
56+
startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
5757
createTopicIfDoesNotExist);
5858
}
5959

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.pulsar.client.api.MessageListener;
4444
import org.apache.pulsar.client.api.RegexSubscriptionMode;
4545
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
46+
import org.apache.pulsar.client.api.SubscriptionMode;
4647
import org.apache.pulsar.client.api.SubscriptionType;
4748

4849
@Data
@@ -59,6 +60,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
5960

6061
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
6162

63+
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
64+
6265
@JsonIgnore
6366
private MessageListener<T> messageListener;
6467

0 commit comments

Comments
 (0)