Skip to content

Commit 235492d

Browse files
racornhuangdx0726
authored andcommitted
Fix memory leak when running topic compaction. (apache#6485)
Fixes apache#6482 ### Motivation Prevent topic compaction from leaking direct memory ### Modifications Several leaks were discovered using Netty leak detection and code review. * `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate though the `Enumeration` and release underlying buffer. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine). * Two main sources of leak in `TwoPhaseCompactor`. The `RawBacthConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the return ByteBuf of `RawBacthConverter.rebatchMessage` was not closed. The first one was easy to fix (release buffer), to fix the second one and make the code easier to read, I decided to not let `RawBacthConverter.rebatchMessage` close the message read from the topic, instead the message read from the topic can be closed in a try/finally clause surrounding most of the method body handing a message from a topic (in phase two loop). Then if a new message was produced by `RawBacthConverter.rebatchMessage` we check that after we have added the message to the compact ledger and release the message. ### Verifying this change Modified `RawReaderTest.testBatchingRebatch` to show new contract. One can run the test described to reproduce the issue, to verify no leak is detected.
1 parent 2216770 commit 235492d

File tree

4 files changed

+82
-65
lines changed

4 files changed

+82
-65
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage
9191
* Take a batched message and a filter, and returns a message with the only the sub-messages
9292
* which match the filter. Returns an empty optional if no messages match.
9393
*
94-
* This takes ownership of the passes in message, and if the returned optional is not empty,
95-
* the ownership of that message is returned also.
94+
* NOTE: this message does not alter the reference count of the RawMessage argument.
9695
*/
9796
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
9897
BiPredicate<String, MessageId> filter)
@@ -161,9 +160,9 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
161160
return Optional.empty();
162161
}
163162
} finally {
163+
uncompressedPayload.release();
164164
batchBuffer.release();
165165
metadata.recycle();
166-
msg.close();
167166
}
168167
}
169168
}

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,19 @@ private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh
164164
if (rc != BKException.Code.OK) {
165165
promise.completeExceptionally(BKException.create(rc));
166166
} else {
167-
try (RawMessage m = RawMessageImpl.deserializeFrom(
168-
seq.nextElement().getEntryBuffer())) {
169-
promise.complete(m.getMessageIdData());
170-
} catch (NoSuchElementException e) {
171-
log.error("No such entry {} in ledger {}", entryId, lh.getId());
172-
promise.completeExceptionally(e);
167+
// Need to release buffers for all entries in the sequence
168+
if (seq.hasMoreElements()) {
169+
LedgerEntry entry = seq.nextElement();
170+
try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
171+
entry.getEntryBuffer().release();
172+
while (seq.hasMoreElements()) {
173+
seq.nextElement().getEntryBuffer().release();
174+
}
175+
promise.complete(m.getMessageIdData());
176+
}
177+
} else {
178+
promise.completeExceptionally(new NoSuchElementException(
179+
String.format("No such entry %d in ledger %d", entryId, lh.getId())));
173180
}
174181
}
175182
}, null);

pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -212,77 +212,88 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId
212212

213213
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
214214
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
215+
if (promise.isDone()) {
216+
return;
217+
}
215218
reader.readNextAsync().whenCompleteAsync(
216219
(m, exception) -> {
217220
if (exception != null) {
218221
promise.completeExceptionally(exception);
219222
return;
220223
} else if (promise.isDone()) {
224+
m.close();
221225
return;
222226
}
223-
MessageId id = m.getMessageId();
224-
Optional<RawMessage> messageToAdd = Optional.empty();
225-
if (RawBatchConverter.isReadableBatch(m)) {
226-
try {
227-
messageToAdd = RawBatchConverter.rebatchMessage(
228-
m, (key, subid) -> latestForKey.get(key).equals(subid));
229-
} catch (IOException ioe) {
230-
log.info("Error decoding batch for message {}. Whole batch will be included in output",
231-
id, ioe);
232-
messageToAdd = Optional.of(m);
233-
}
234-
} else {
235-
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
236-
MessageId msg;
237-
if (keyAndSize == null) { // pass through messages without a key
238-
messageToAdd = Optional.of(m);
239-
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
240-
&& msg.equals(id)) { // consider message only if present into latestForKey map
241-
if (keyAndSize.getRight() <= 0) {
242-
promise.completeExceptionally(new IllegalArgumentException(
243-
"Compaction phase found empty record from sorted key-map"));
227+
try {
228+
MessageId id = m.getMessageId();
229+
Optional<RawMessage> messageToAdd = Optional.empty();
230+
if (RawBatchConverter.isReadableBatch(m)) {
231+
try {
232+
messageToAdd = RawBatchConverter.rebatchMessage(
233+
m, (key, subid) -> latestForKey.get(key).equals(subid));
234+
} catch (IOException ioe) {
235+
log.info("Error decoding batch for message {}. Whole batch will be included in output",
236+
id, ioe);
237+
messageToAdd = Optional.of(m);
244238
}
245-
messageToAdd = Optional.of(m);
246239
} else {
247-
m.close();
240+
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
241+
MessageId msg;
242+
if (keyAndSize == null) { // pass through messages without a key
243+
messageToAdd = Optional.of(m);
244+
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
245+
&& msg.equals(id)) { // consider message only if present into latestForKey map
246+
if (keyAndSize.getRight() <= 0) {
247+
promise.completeExceptionally(new IllegalArgumentException(
248+
"Compaction phase found empty record from sorted key-map"));
249+
}
250+
messageToAdd = Optional.of(m);
251+
}
248252
}
249-
}
250253

251-
if (messageToAdd.isPresent()) {
252-
try {
253-
outstanding.acquire();
254-
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
255-
.whenComplete((res, exception2) -> {
256-
outstanding.release();
257-
if (exception2 != null) {
258-
promise.completeExceptionally(exception2);
254+
if (messageToAdd.isPresent()) {
255+
RawMessage message = messageToAdd.get();
256+
try {
257+
outstanding.acquire();
258+
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
259+
.whenComplete((res, exception2) -> {
260+
outstanding.release();
261+
if (exception2 != null) {
262+
promise.completeExceptionally(exception2);
263+
}
264+
});
265+
if (to.equals(id)) {
266+
addFuture.whenComplete((res, exception2) -> {
267+
if (exception2 == null) {
268+
promise.complete(null);
259269
}
260270
});
261-
if (to.equals(id)) {
262-
addFuture.whenComplete((res, exception2) -> {
263-
if (exception2 == null) {
264-
promise.complete(null);
265-
}
266-
});
271+
}
272+
} catch (InterruptedException ie) {
273+
Thread.currentThread().interrupt();
274+
promise.completeExceptionally(ie);
275+
} finally {
276+
if (message != m) {
277+
message.close();
278+
}
267279
}
268-
} catch (InterruptedException ie) {
269-
Thread.currentThread().interrupt();
270-
promise.completeExceptionally(ie);
271-
}
272-
} else if (to.equals(id)) {
273-
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
274-
// present under latestForKey. Complete the compaction.
275-
try {
276-
// make sure all inflight writes have finished
277-
outstanding.acquire(MAX_OUTSTANDING);
278-
promise.complete(null);
279-
} catch (InterruptedException e) {
280-
Thread.currentThread().interrupt();
281-
promise.completeExceptionally(e);
280+
} else if (to.equals(id)) {
281+
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
282+
// not present under latestForKey. Complete the compaction.
283+
try {
284+
// make sure all inflight writes have finished
285+
outstanding.acquire(MAX_OUTSTANDING);
286+
promise.complete(null);
287+
} catch (InterruptedException e) {
288+
Thread.currentThread().interrupt();
289+
promise.completeExceptionally(e);
290+
}
291+
return;
282292
}
283-
return;
293+
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
294+
} finally {
295+
m.close();
284296
}
285-
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
286297
}, scheduler);
287298
}
288299

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,13 @@ public void testBatchingRebatch() throws Exception {
319319
}
320320

321321
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
322-
try {
323-
RawMessage m1 = reader.readNextAsync().get();
322+
try (RawMessage m1 = reader.readNextAsync().get()) {
324323
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
325324
List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
326325
Assert.assertEquals(idsAndKeys.size(), 1);
327326
Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
328327
m2.close();
328+
Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
329329
} finally {
330330
reader.closeAsync().get();
331331
}

0 commit comments

Comments
 (0)