Description
Expected behavior
When chunking is enabled, the Go consumer client sometimes removes a chunked message context and clears its data through the logic below:
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
    lastChunkedMsgID := -1
    totalChunks := -1
    if ctx != nil {
        lastChunkedMsgID = int(ctx.lastChunkedMsgID)
        totalChunks = int(ctx.totalChunks)
        ctx.chunkedMsgBuffer.Clear()
    }
    pc.log.Warnf(fmt.Sprintf(
        "Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
        msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
    pc.chunkedMsgCtxMap.remove(uuid)
    pc.availablePermits.inc()
    return nil
}
If pulsar-client-go discards such a chunked message context, some signal should go back to the broker so that the message can be re-attempted (redelivered).
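For comparison, here is a minimal sketch (public-API usage only, written as a standalone helper package; the process callback is hypothetical) of the redelivery contract we expected: when the application negatively acknowledges a message it has actually received, the broker redelivers it after nackRedeliveryDelay. The discard branch above runs before Receive ever returns a message, so the application has no way to send an equivalent signal for a dropped chunked message:

package repro // illustrative helper package, not part of the real pipeline

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

// handleOne receives a single message and either acks or nacks it.
// Nack causes the broker to redeliver after NackRedeliveryDelay; a chunk
// context discarded inside the client never reaches this code path.
func handleOne(consumer pulsar.Consumer, process func(pulsar.Message) error) error {
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        return err
    }
    if perr := process(msg); perr != nil {
        log.Printf("processing failed for %v, requesting redelivery: %v", msg.ID(), perr)
        consumer.Nack(msg) // broker redelivers this message later
        return nil
    }
    consumer.Ack(msg)
    return nil
}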
Actual behavior
When the consumer client discards such a message, the broker never retries sending it, and the msgBacklog output from the CLI (./pulsar-admin topics partitioned-stats persistent://dummy/my_topic/my_namespace | grep msgBacklog) stays non-zero, sometimes at 5, 10, or even just 1 message.
Steps to reproduce
Enable producer- and consumer-side chunking with the following configs; a hedged Go sketch of the equivalent pulsar-client-go options follows each config block.
Backend Consumer config
consumer:
  topic: "my_topic"
  namespace: "my_namespace"
  subscriptionName: "my_subscription"
  subscriptionType: "key_shared"
  receiverQueueSize: 1000
  enableAutoScaledReceiverQueueSizeLimit: "true"
  nackRedeliveryDelay: 30s
  maxPendingChunkMessages: 100
  expiryTimeOfIncompleteChunkSec: 120
  autoAckIncompleteChunkEnabled: "false"
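For reference, a minimal Go sketch of roughly how this config maps onto pulsar-client-go ConsumerOptions (option names are taken from the v0.15 client; the service URL and the full topic name are assumptions):

package main

import (
    "context"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650", // assumed; matches the broker compose file below
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                             "persistent://dummy/my_namespace/my_topic", // assumed full topic name
        SubscriptionName:                  "my_subscription",
        Type:                              pulsar.KeyShared,
        ReceiverQueueSize:                 1000,
        EnableAutoScaledReceiverQueueSize: true,
        NackRedeliveryDelay:               30 * time.Second,
        MaxPendingChunkedMessage:          100,
        ExpireTimeOfIncompleteChunk:       120 * time.Second,
        AutoAckIncompleteChunk:            false,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Plain receive/ack loop; chunked messages are reassembled inside the client
    // before Receive returns them here.
    for {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        consumer.Ack(msg)
    }
}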
Backend Producer config
producer:
  topic: "my_topic"
  pulsarClientIsolationEnabled: false
  enabledChunking: "true"
  compressionType: "lz4"
  compressionLevel: "faster"
  sendTimeout: 60s
  disableBlockIfQueFull: true
  maxPendingMessages: 1000
  hashingScheme: "murmur3_32_hash"
  sendAsync: true
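And a matching minimal sketch for the producer side (again pulsar-client-go v0.15 option names; DisableBatching is our addition here since chunking is used with batching disabled; the URL and the full topic name are assumptions):

package main

import (
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650", // assumed broker URL
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:                   "persistent://dummy/my_namespace/my_topic", // assumed full topic name
        EnableChunking:          true,
        DisableBatching:         true, // chunking is used with batching disabled
        CompressionType:         pulsar.LZ4,
        CompressionLevel:        pulsar.Faster,
        SendTimeout:             60 * time.Second,
        DisableBlockIfQueueFull: true,
        MaxPendingMessages:      1000,
        HashingScheme:           pulsar.Murmur3_32Hash,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Messages are published with producer.SendAsync (see the reproduction sketch
    // further below), which corresponds to the sendAsync: true setting above.
}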
Broker configuration
broker:
  image: apachepulsar/pulsar:latest
  container_name: broker
  hostname: broker
  restart: no
  environment:
    - metadataStoreUrl=zk:zookeeper:2181
    - zookeeperServers=zookeeper:2181
    - clusterName=cluster-a
    - managedLedgerDefaultEnsembleSize=1
    - managedLedgerDefaultWriteQuorum=1
    - managedLedgerDefaultAckQuorum=1
    - advertisedAddress=broker
    - advertisedListeners=external:pulsar://broker:6650
    - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=2048m
    - maxMessageSize=1242880
  depends_on:
    zookeeper:
      condition: service_healthy
    bookie:
      condition: service_started
  ports:
    - "6650:6650"
    - "8080:8080"
  command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
  healthcheck:
    test: [ "CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters" ]
    interval: 5s
    timeout: 10s
    retries: 5
The ingestion payload size is 15 MB.
Send this request continuously for 300 seconds; the "Received unexpected chunk messageId" warning shows up within 10-15 seconds on a local machine.
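A minimal load-generation sketch of this step (the URL and topic name are assumptions, and producer options not relevant to chunking are omitted; see the producer sketch above):

package main

import (
    "context"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) // assumed broker URL
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Remaining producer options (compression, timeouts, hashing, etc.) as in the
    // producer sketch above; only the chunking-relevant ones are repeated here.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           "persistent://dummy/my_namespace/my_topic", // assumed full topic name
        EnableChunking:  true,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    payload := make([]byte, 15*1024*1024) // ~15 MB, well above the broker's maxMessageSize=1242880, so it gets chunked
    deadline := time.Now().Add(300 * time.Second)
    for time.Now().Before(deadline) {
        producer.SendAsync(context.Background(), &pulsar.ProducerMessage{Payload: payload},
            func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
                if err != nil {
                    log.Printf("async send failed: %v", err)
                }
            })
    }
    producer.Flush()
}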
Note: currently we are not hitting the go c.discardChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk) and go c.discardOldestChunkMessage(c.pc.options.autoAckIncompleteChunk) discard paths. The same potential problem exists there as well, since those paths also give the broker no mechanism to retry.
System configuration
Pulsar version: 3.3.2 for broker and 0.15.1 for pulsar-client-go
Can we get some help here? The majority of our pipelines are currently impacted by this flow.