Description
Expected behavior
I would expect more log output (for example the underlying error) to help debug what is going wrong with decompression.
Actual behavior
We get this message:
{
  "log": {
    "consumerID": 95,
    "level": "ERROR",
    "msg": "Discarding corrupted message",
    "msgID": {"entryId": 1792, "ledgerId": 758404, "partition": -1},
    "name": "",
    "subscription": "reader-czjug",
    "time": "2025-02-12T10:46:13.616639169Z",
    "topic": "persistent://XXXX/wav/084b74e6-f4c6-4ff8-9bff-d35370e6a77b",
    "validationError": 1
  },
  "stream": "stdout",
  "timestamp": 1739357173616
}
which looks like it is coming from here
pulsar-client-go/pulsar/consumer_partition.go
Lines 2172 to 2182 in 4e71a47
    func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
        validationError pb.CommandAck_ValidationError) {
        if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
            pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
                "by closing or closed consumer")
            return
        }
        pc.log.WithFields(log.Fields{
            "msgID":           msgID,
            "validationError": validationError,
        }).Error("Discarding corrupted message")
validationError:1 appears to be a decompression error
    CommandAck_DecompressionError CommandAck_ValidationError = 1
which suggests it is coming from here:
pulsar-client-go/pulsar/consumer_partition.go
Lines 1228 to 1232 in 4e71a47
    uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
    if err != nil {
        pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
        return err
    }
and since there are no other log lines, the likely cause is this call returning an error that is never logged:
pulsar-client-go/pulsar/consumer_partition.go
Lines 2148 to 2151 in 4e71a47
    uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
    if err != nil {
        return nil, err
    }
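As a side note, the log only carries the numeric code, which made this harder to trace than it needed to be. Below is a minimal, self-contained sketch of printing the code by name. The `validationError` type and its constants are my own stand-ins that mirror, as far as I can tell, the `ValidationError` enum nested in `CommandAck` in `PulsarApi.proto`; they are not the client's generated types.

```go
package main

import "fmt"

// validationError is a hypothetical stand-in for pb.CommandAck_ValidationError.
// The values below are assumed to match the enum in PulsarApi.proto.
type validationError int32

const (
	uncompressedSizeCorruption validationError = 0
	decompressionError         validationError = 1
	checksumMismatch           validationError = 2
	batchDeSerializeError      validationError = 3
	decryptionError            validationError = 4
)

// String returns a readable name so log output does not need a lookup.
func (v validationError) String() string {
	switch v {
	case uncompressedSizeCorruption:
		return "UncompressedSizeCorruption"
	case decompressionError:
		return "DecompressionError"
	case checksumMismatch:
		return "ChecksumMismatch"
	case batchDeSerializeError:
		return "BatchDeSerializeError"
	case decryptionError:
		return "DecryptionError"
	}
	return fmt.Sprintf("Unknown(%d)", int32(v))
}

func main() {
	// The code seen in the log line above.
	fmt.Println(validationError(1)) // prints "DecompressionError"
}
```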
Steps to reproduce
Sorry, I don't have reproduction steps, but maybe we could add some logging here?
pulsar-client-go/pulsar/consumer_partition.go
Lines 2148 to 2151 in 4e71a47
    uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
    if err != nil {
        return nil, err
    }
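To make the suggestion concrete, here is a rough sketch of the kind of context I have in mind: wrap the error with the codec name and the compressed and expected sizes before returning it, so whoever logs it sees the underlying cause. This is not a patch against the client. The `provider` interface only mirrors the shape of the `Decompress` call above, and `zlibProvider`, `compressZlib` and `decompressWithContext` are hypothetical names used for illustration.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
)

// provider is a stand-in that mirrors the shape of the Decompress call in
// the snippet above; it is not the client's actual interface.
type provider interface {
	Decompress(dst, src []byte, originalSize int) ([]byte, error)
}

// zlibProvider is a hypothetical provider backed by the standard library.
type zlibProvider struct{}

func (zlibProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	if len(out) != originalSize {
		return nil, fmt.Errorf("size mismatch: got %d bytes, want %d", len(out), originalSize)
	}
	return append(dst, out...), nil
}

// compressZlib exists only to produce a valid payload for the demo.
func compressZlib(data []byte) []byte {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	_, _ = w.Write(data) // writes to a bytes.Buffer do not fail
	_ = w.Close()
	return buf.Bytes()
}

// decompressWithContext wraps a decompression failure with the details that
// are missing from the current log line, keeping the original error via %w.
func decompressWithContext(p provider, codec string, payload []byte, uncompressedSize int) ([]byte, error) {
	out, err := p.Decompress(nil, payload, uncompressedSize)
	if err != nil {
		return nil, fmt.Errorf("decompress failed (codec=%s, compressedSize=%d, uncompressedSize=%d): %w",
			codec, len(payload), uncompressedSize, err)
	}
	return out, nil
}

func main() {
	// A valid payload round-trips.
	good := compressZlib([]byte("hello"))
	if out, err := decompressWithContext(zlibProvider{}, "ZLIB", good, 5); err == nil {
		fmt.Println(string(out))
	}

	// A corrupted payload now reports the codec, sizes and root cause.
	_, err := decompressWithContext(zlibProvider{}, "ZLIB", []byte("not zlib"), 5)
	fmt.Println(err)
}
```

In the client itself, the same effect could probably be achieved by passing the error into `discardCorruptedMessage` and adding it to the log fields, but I have not checked how that fits with the existing call sites.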
System configuration
Pulsar version: 2.11.3 and 3.3.2 (we upgraded brokers and still seeing same issue)
Pulsar Golang client: 0.14.0