Skip to content

Commit 9927591

Browse files
Fix bug when send multi messages in pulsar sender (#277)
- Fix bug when send multi messages in pulsar sender - Add corresponding test
1 parent 80676b9 commit 9927591

File tree

2 files changed

+59
-30
lines changed

2 files changed

+59
-30
lines changed

pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -245,28 +245,27 @@ public Builder messageProps(Map<String, Object> messagePropsMap) {
245245

246246
void sender(byte[] message) {
247247
if (closeCalled) throw new ClosedSenderException();
248-
sendMessage(message);
248+
try {
249+
get().newMessage()
250+
.value(message)
251+
.loadConf(messageProps)
252+
.sendAsync();
253+
} catch (Exception e) {
254+
cleanup();
255+
throw new RuntimeException("Pulsar producer send message failed." + e.getMessage(), e);
256+
}
249257
}
250258

251-
void sendMessage(byte[] message) {
259+
Producer<byte[]> get() {
252260
if (client == null) {
253261
synchronized (this) {
254262
if (client == null) {
255263
client = createClient();
256264
producer = createProducer(client);
257-
258-
try {
259-
producer.newMessage()
260-
.value(message)
261-
.loadConf(messageProps)
262-
.sendAsync();
263-
} catch (Exception e) {
264-
cleanup();
265-
throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e);
266-
}
267265
}
268266
}
269267
}
268+
return producer;
270269
}
271270

272271
Producer<byte[]> createProducer(PulsarClient client) {

pulsar-client/src/test/java/zipkin2/reporter/pulsar/ITPulsarSender.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package zipkin2.reporter.pulsar;
66

77
import org.apache.pulsar.client.api.Consumer;
8+
import org.apache.pulsar.client.api.Message;
89
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
910
import org.junit.jupiter.api.BeforeEach;
1011
import org.junit.jupiter.api.Tag;
@@ -97,6 +98,28 @@ class ITPulsarSender {
9798
}
9899
}
99100

101+
@Test void send_multiple_JSON_messages() throws Exception {
102+
try (PulsarSender sender = pulsar.newSenderBuilder(testName)
103+
.encoding(Encoding.JSON)
104+
.build()) {
105+
int size = 10;
106+
for (int i = 0; i < size; i++) {
107+
send(sender, CLIENT_SPAN);
108+
}
109+
for (int i = 0; i < size; i++) {
110+
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
111+
.hasSize(1).containsExactly(CLIENT_SPAN);
112+
}
113+
114+
send(sender, CLIENT_SPAN);
115+
send(sender, CLIENT_SPAN, CLIENT_SPAN);
116+
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
117+
.hasSize(1).containsExactly(CLIENT_SPAN);
118+
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
119+
.hasSize(2).containsExactly(CLIENT_SPAN, CLIENT_SPAN);
120+
}
121+
}
122+
100123
@Test void illegalToSendWhenClosed() throws IOException {
101124
try (PulsarSender sender = pulsar.newSenderBuilder(testName).build()) {
102125
sender.close();
@@ -144,27 +167,34 @@ byte[] readMessage(PulsarSender sender) throws Exception {
144167
final CountDownLatch countDown = new CountDownLatch(1);
145168
final AtomicReference<byte[]> result = new AtomicReference<>();
146169

147-
try (Consumer<byte[]> ignored = sender.client.newConsumer()
170+
Consumer<byte[]> consumer = null;
171+
Message<byte[]> message = null;
172+
try {
173+
consumer = sender.client.newConsumer()
148174
.topic(sender.topic)
149175
.subscriptionName("zipkin-subscription")
150176
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
151-
.messageListener((consumer, message) -> {
152-
try {
153-
result.set(message.getData());
154-
countDown.countDown();
155-
consumer.acknowledge(message);
156-
} catch (Exception e) {
157-
consumer.negativeAcknowledge(message);
158-
}
159-
}).subscribe()) {
160-
161-
assertThat(countDown.await(10, TimeUnit.SECONDS))
162-
.withFailMessage("Timed out waiting to read message.")
163-
.isTrue();
164-
assertThat(result)
165-
.withFailMessage("Message data is null in Pulsar consumer.")
166-
.isNotNull();
167-
return result.get();
177+
.subscribe();
178+
message = consumer.receive(10, TimeUnit.SECONDS);
179+
result.set(message.getData());
180+
countDown.countDown();
181+
consumer.acknowledge(message);
182+
} catch (Exception e) {
183+
if (consumer != null) {
184+
consumer.negativeAcknowledge(message);
185+
}
186+
} finally {
187+
if (consumer != null) {
188+
consumer.close();
189+
}
168190
}
191+
192+
assertThat(countDown.await(10, TimeUnit.SECONDS))
193+
.withFailMessage("Timed out waiting to read message.")
194+
.isTrue();
195+
assertThat(result)
196+
.withFailMessage("Message data is null in Pulsar consumer.")
197+
.isNotNull();
198+
return result.get();
169199
}
170200
}

0 commit comments

Comments
 (0)