Skip to content

Commit 69a06d5

Browse files
authored
[fix] Only decompress the payload if it's not empty (#1280)
The message payload is optional and in some cases only message properties are sent. In this case, the message decompression would fail so we only want to do the decompression if the payload is not empty.
1 parent 7484c93 commit 69a06d5

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

pulsar/consumer_partition.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,15 +1105,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
11051105
}
11061106
}
11071107

1108-
// decryption is success, decompress the payload
1109-
uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer)
1110-
if err != nil {
1111-
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
1112-
return err
1113-
}
1108+
var uncompressedHeadersAndPayload internal.Buffer
1109+
// decryption is success, decompress the payload, but only if payload is not empty
1110+
if n := msgMeta.UncompressedSize; n != nil && *n > 0 {
1111+
uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
1112+
if err != nil {
1113+
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
1114+
return err
1115+
}
11141116

1115-
// Reset the reader on the uncompressed buffer
1116-
reader.ResetBuffer(uncompressedHeadersAndPayload)
1117+
// Reset the reader on the uncompressed buffer
1118+
reader.ResetBuffer(uncompressedHeadersAndPayload)
1119+
}
11171120

11181121
numMsgs := 1
11191122
if msgMeta.NumMessagesInBatch != nil {

0 commit comments

Comments
 (0)