
Commit 67c3d53

Fix sending buffer race by using proper reference counting (#1394)
Fixes #1371

### Motivation

The buffer is sent to the connection when the data is flushed, at this line:
https://github.com/apache/pulsar-client-go/blob/0ab28c229e4dab2adb505a135325b6ede6e0e4f4/pulsar/producer_partition.go#L910

Because of this, both the data request in the connection and the pending item hold a reference to the buffer. However, if the pending item fails due to a timeout or the producer closing, the buffer is returned to the buffer pool at this line:
https://github.com/apache/pulsar-client-go/blob/0ab28c229e4dab2adb505a135325b6ede6e0e4f4/pulsar/producer_partition.go#L1747

Yet the connection still holds the buffer, so when it handles the data request it operates on a buffer that has already been returned to the pool. This causes a data race.

This fix implements proper reference counting so that buffers are returned to the pool only when all references have been released, preventing the data race.

### Modifications

- Added `Retain()` and `Release()` methods to the `Buffer` interface
- Implemented atomic reference counting using `atomic.Int64`
- Buffers are returned to the pool only when the reference count reaches zero
- Added a new metric, `pulsar_client_sending_buffers_count`, to report the number of sending buffers
- This PR also fixes several minor issues, such as data/request cleanup when closing the connection
1 parent 9440f18 commit 67c3d53
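
For readers unfamiliar with the pattern, the sketch below distills the reference-counting idea this commit applies. The names (`refBuffer`, `Retain`, `Release`) mirror the real code, but the program is a standalone illustration, not the client's actual types: a pooled buffer goes back to its pool only after every holder has released it, so neither the pending item nor the connection write can free it out from under the other.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// refBuffer is a hypothetical pooled object guarded by a reference count.
type refBuffer struct {
	data   []byte
	refCnt atomic.Int64
	pool   *sync.Pool
}

func (b *refBuffer) Retain() { b.refCnt.Add(1) }

// Release returns the buffer to the pool only when the last holder lets go.
func (b *refBuffer) Release() {
	if b.refCnt.Add(-1) == 0 {
		b.pool.Put(b)
	}
}

func main() {
	pool := &sync.Pool{New: func() any { return &refBuffer{data: make([]byte, 1024)} }}

	buf := pool.Get().(*refBuffer)
	buf.pool = pool
	buf.Retain() // reference held by the "pending item"
	buf.Retain() // reference held by the "connection write"

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); buf.Release() }() // pending item times out or completes
	go func() { defer wg.Done(); buf.Release() }() // connection finishes writing
	wg.Wait()

	fmt.Println("refCnt:", buf.refCnt.Load()) // 0: buffer is safely back in the pool
}
```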

File tree

8 files changed (+182, −39 lines)

pulsar/consumer_multitopic_test.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -354,7 +354,7 @@ func (dummyConnection) ID() string {
 }
 
 func (dummyConnection) GetMaxMessageSize() int32 {
-	return 0
+	return 5 * 1024 * 1024 // 5MB
 }
 
 func (dummyConnection) Close() {
```

pulsar/internal/batch_builder.go

Lines changed: 1 addition & 8 deletions

```diff
@@ -29,10 +29,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/log"
 )
 
-type BuffersPool interface {
-	GetBuffer() Buffer
-}
-
 // BatcherBuilderProvider defines func which returns the BatchBuilder.
 type BatcherBuilderProvider func(
 	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
@@ -269,10 +265,7 @@ func (bc *batchContainer) Flush() *FlushBatch {
 	uncompressedSize := bc.buffer.ReadableBytes()
 	bc.msgMetadata.UncompressedSize = &uncompressedSize
 
-	buffer := bc.buffersPool.GetBuffer()
-	if buffer == nil {
-		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
-	}
+	buffer := bc.buffersPool.GetBuffer(int(uncompressedSize * 3 / 2))
 
 	sequenceID := uint64(0)
 	var err error
```

pulsar/internal/buffer.go

Lines changed: 65 additions & 0 deletions

```diff
@@ -19,10 +19,17 @@ package internal
 
 import (
 	"encoding/binary"
+	"sync"
+	"sync/atomic"
 
 	log "github.com/sirupsen/logrus"
 )
 
+type BuffersPool interface {
+	GetBuffer(initSize int) Buffer
+	Put(buf Buffer)
+}
+
 // Buffer is a variable-sized buffer of bytes with Read and Write methods.
 // The zero value for Buffer is an empty buffer ready to use.
 type Buffer interface {
@@ -69,15 +76,56 @@ type Buffer interface {
 	Resize(newSize uint32)
 	ResizeIfNeeded(spaceNeeded uint32)
 
+	Retain()
+	Release()
+	RefCnt() int64
+
 	// Clear will clear the current buffer data.
 	Clear()
 }
 
+type bufferPoolImpl struct {
+	sync.Pool
+}
+
+func NewBufferPool() BuffersPool {
+	return &bufferPoolImpl{
+		Pool: sync.Pool{},
+	}
+}
+
+func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
+	sendingBuffersCount.Inc()
+	b, ok := p.Get().(*buffer)
+	if ok {
+		b.Clear()
+	} else {
+		b = &buffer{
+			data:      make([]byte, initSize),
+			readerIdx: 0,
+			writerIdx: 0,
+		}
+	}
+	b.pool = p
+	b.Retain()
+	return b
+}
+
+func (p *bufferPoolImpl) Put(buf Buffer) {
+	sendingBuffersCount.Dec()
+	if b, ok := buf.(*buffer); ok {
+		p.Pool.Put(b)
+	}
+}
+
 type buffer struct {
 	data []byte
 
 	readerIdx uint32
 	writerIdx uint32
+
+	refCnt atomic.Int64
+	pool   BuffersPool
 }
 
 // NewBuffer creates and initializes a new Buffer using buf as its initial contents.
@@ -213,7 +261,24 @@ func (b *buffer) Put(writerIdx uint32, s []byte) {
 	copy(b.data[writerIdx:], s)
 }
 
+func (b *buffer) Retain() {
+	b.refCnt.Add(1)
+}
+
+func (b *buffer) Release() {
+	if b.refCnt.Add(-1) == 0 {
+		if b.pool != nil {
+			b.pool.Put(b)
+		}
+	}
+}
+
+func (b *buffer) RefCnt() int64 {
+	return b.refCnt.Load()
+}
+
 func (b *buffer) Clear() {
 	b.readerIdx = 0
 	b.writerIdx = 0
+	b.refCnt.Store(0)
 }
```
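
Taken together, the new API implies a simple lifecycle contract: `GetBuffer` hands back a buffer that already holds one reference, each additional holder calls `Retain`, and the final `Release` routes the buffer back through `Put`. Below is a hypothetical illustration of that contract (it assumes code living in the `internal` package and is not part of the commit):

```go
package internal

// exampleLifecycle is a hypothetical illustration of the contract implied by
// the diff above; it is not part of the commit.
func exampleLifecycle() {
	pool := NewBufferPool()

	buf := pool.GetBuffer(1024) // GetBuffer retains once: refCnt == 1
	buf.Retain()                // a second holder (e.g. a pending write): refCnt == 2

	done := make(chan struct{})
	go func() {
		defer close(done)
		buf.Release() // the second holder finishes: refCnt == 1
	}()
	<-done

	buf.Release() // last reference dropped: Put returns buf to the pool
}
```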

pulsar/internal/connection.go

Lines changed: 24 additions & 0 deletions

```diff
@@ -385,13 +385,20 @@ func (c *connection) failLeftRequestsWhenClose() {
 		// if other requests come in after the nil message
 		// then the RPC client will time out
 		ch <- nil
+		c.writeRequestsCh <- nil
 	}()
 	for req := range ch {
 		if nil == req {
 			break // we have drained the requests
 		}
 		c.internalSendRequest(req)
 	}
+	for req := range c.writeRequestsCh {
+		if nil == req {
+			break
+		}
+		req.data.Release()
+	}
 }
 
 func (c *connection) run() {
@@ -432,6 +439,7 @@ func (c *connection) run() {
 				return
 			}
 			c.internalWriteData(req.ctx, req.data)
+			req.data.Release()
 
 		case <-pingSendTicker.C:
 			c.sendPing()
@@ -457,13 +465,23 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
 }
 
 func (c *connection) WriteData(ctx context.Context, data Buffer) {
+	writeToQueue := false
+	defer func() {
+		if !writeToQueue {
+			data.Release()
+		}
+	}()
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Channel is not full
+		writeToQueue = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
 		return
+	case <-c.closeCh:
+		c.log.Debug("Write data connection closed")
+		return
 	default:
 		// Channel full, fallback to probe if connection is closed
 	}
@@ -472,10 +490,14 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Successfully wrote on the channel
+		writeToQueue = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
 		return
+	case <-c.closeCh:
+		c.log.Debug("Write data connection closed")
+		return
 	case <-time.After(100 * time.Millisecond):
 		// The channel is either:
 		// 1. blocked, in which case we need to wait until we have space
@@ -497,6 +519,8 @@ func (c *connection) internalWriteData(ctx context.Context, data Buffer) {
 	select {
 	case <-ctx.Done():
 		return
+	case <-c.closeCh:
+		return
 	default:
 		if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
 			c.log.WithError(err).Warn("Failed to write on connection")
```
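
The `WriteData` changes follow a transfer-on-enqueue convention: the caller's reference passes to the write queue only when the request is actually enqueued, every other exit path drops it via the deferred `Release`, and the queue consumer (`run` or `failLeftRequestsWhenClose`) releases after use. Here is a generic, self-contained sketch of that idiom, with hypothetical names:

```go
package main

import "fmt"

// resource stands in for a reference-counted buffer.
type resource struct{ refs int }

func (r *resource) Release() {
	r.refs--
	fmt.Println("refs:", r.refs)
}

// enqueue transfers ownership of res to ch only on success; on every other
// path the deferred Release drops the caller's reference, so nothing leaks.
func enqueue(ch chan *resource, closeCh chan struct{}, res *resource) {
	enqueued := false
	defer func() {
		if !enqueued {
			res.Release()
		}
	}()
	select {
	case ch <- res:
		enqueued = true // the consumer now owns this reference
	case <-closeCh:
		// connection closed: the deferred Release cleans up
	}
}

func main() {
	ch := make(chan *resource, 1)
	closeCh := make(chan struct{})
	enqueue(ch, closeCh, &resource{refs: 1})
	res := <-ch
	res.Release() // the consumer releases after using the resource
}
```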

pulsar/internal/key_based_batch_builder_test.go

Lines changed: 1 addition & 8 deletions

```diff
@@ -30,13 +30,6 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
-type mockBufferPool struct {
-}
-
-func (m *mockBufferPool) GetBuffer() Buffer {
-	return nil
-}
-
 type mockEncryptor struct {
 }
 
@@ -53,7 +46,7 @@ func TestKeyBasedBatcherOrdering(t *testing.T) {
 		1,
 		pb.CompressionType_NONE,
 		compression.Level(0),
-		&mockBufferPool{},
+		&bufferPoolImpl{},
 		log.NewLoggerWithLogrus(logrus.StandardLogger()),
 		&mockEncryptor{},
 	)
```

pulsar/internal/metrics.go

Lines changed: 16 additions & 2 deletions

```diff
@@ -21,6 +21,18 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+var (
+	defaultConstLabels = map[string]string{
+		"client": "go",
+	}
+
+	sendingBuffersCount = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name:        "pulsar_client_sending_buffers_count",
+		Help:        "Number of sending buffers",
+		ConstLabels: defaultConstLabels,
+	})
+)
+
 type Metrics struct {
 	metricsLevel      int
 	messagesPublished *prometheus.CounterVec
@@ -99,8 +111,9 @@ type LeveledMetrics struct {
 // NewMetricsProvider returns metrics registered to registerer.
 func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string,
 	registerer prometheus.Registerer) *Metrics {
-	constLabels := map[string]string{
-		"client": "go",
+	constLabels := make(map[string]string)
+	for k, v := range defaultConstLabels {
+		constLabels[k] = v
 	}
 	for k, v := range userDefinedLabels {
 		constLabels[k] = v
@@ -535,6 +548,7 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
 			metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
 		}
 	}
+	_ = registerer.Register(sendingBuffersCount)
 	return metrics
 }
```
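
One plausible way to sanity-check the new gauge is a test inside the `internal` package using the prometheus `testutil` helpers; the following is a sketch under that assumption, not a test from the commit:

```go
package internal

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestSendingBuffersGauge is a hypothetical sketch: the gauge should rise on
// GetBuffer and fall back once the last reference is released.
func TestSendingBuffersGauge(t *testing.T) {
	before := testutil.ToFloat64(sendingBuffersCount)

	pool := NewBufferPool()
	buf := pool.GetBuffer(64)
	if got := testutil.ToFloat64(sendingBuffersCount); got != before+1 {
		t.Fatalf("expected gauge %v, got %v", before+1, got)
	}

	buf.Release() // refCnt reaches zero, so Put decrements the gauge
	if got := testutil.ToFloat64(sendingBuffersCount); got != before {
		t.Fatalf("expected gauge %v, got %v", before, got)
	}
}
```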

pulsar/producer_partition.go

Lines changed: 12 additions & 19 deletions

```diff
@@ -69,7 +69,7 @@ var (
 	ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
 	ErrProducerFenced               = newError(ProducerFenced, "producer fenced")
 
-	buffersPool     sync.Pool
+	buffersPool     internal.BuffersPool
 	sendRequestPool *sync.Pool
 )
 
@@ -86,6 +86,7 @@ func init() {
 			return &sendRequest{}
 		},
 	}
+	buffersPool = internal.NewBufferPool()
 }
 
 type partitionProducer struct {
@@ -96,7 +97,7 @@ type partitionProducer struct {
 	topic string
 	log   log.Logger
 
-	conn         uAtomic.Value
+	conn         atomic.Pointer[internal.Connection]
 	cnxKeySuffix int32
 
 	options *ProducerOptions
@@ -363,7 +364,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
 	p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
 		maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
 		compression.Level(p.options.CompressionLevel),
-		p,
+		buffersPool,
 		p.log,
 		p.encryptor)
 	if err != nil {
@@ -393,6 +394,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
 		pi.Lock()
 		pi.sentAt = time.Now()
 		pi.Unlock()
+		pi.buffer.Retain() // Retain for writing to the connection
 		p.pendingQueue.Put(pi)
 		p._getConn().WriteData(pi.ctx, pi.buffer)
 
@@ -412,14 +414,6 @@ func (cc *connectionClosed) HasURL() bool {
 	return len(cc.assignedBrokerURL) > 0
 }
 
-func (p *partitionProducer) GetBuffer() internal.Buffer {
-	b, ok := buffersPool.Get().(internal.Buffer)
-	if ok {
-		b.Clear()
-	}
-	return b
-}
-
 func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProducer) {
 	// Trigger reconnection in the produce goroutine
 	p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
@@ -796,10 +790,7 @@ func (p *partitionProducer) internalSingleSend(
 	payloadBuf := internal.NewBuffer(len(compressedPayload))
 	payloadBuf.Write(compressedPayload)
 
-	buffer := p.GetBuffer()
-	if buffer == nil {
-		buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
-	}
+	buffer := buffersPool.GetBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
 
 	sid := *mm.SequenceId
 	var useTxn bool
@@ -889,6 +880,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
 	select {
 	case <-p.ctx.Done():
+		buffer.Release()
 		for _, cb := range callbacks {
 			if sr, ok := cb.(*sendRequest); ok {
 				sr.done(nil, ErrProducerClosed)
@@ -898,6 +890,7 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 	default:
 		now := time.Now()
 		ctx, cancel := context.WithCancel(context.Background())
+		buffer.Retain()
 		p.pendingQueue.Put(&pendingItem{
 			ctx:    ctx,
 			cancel: cancel,
@@ -1744,7 +1737,7 @@ func (i *pendingItem) done(err error) {
 
 	i.isDone = true
 	// return the buffer to the pool after all callbacks have been called.
-	defer buffersPool.Put(i.buffer)
+	defer i.buffer.Release()
 	if i.flushCallback != nil {
 		i.flushCallback(err)
 	}
@@ -1757,16 +1750,16 @@ func (i *pendingItem) done(err error) {
 // _setConn sets the internal connection field of this partition producer atomically.
 // Note: should only be called by this partition producer when a new connection is available.
 func (p *partitionProducer) _setConn(conn internal.Connection) {
-	p.conn.Store(conn)
+	p.conn.Store(&conn)
}
 
 // _getConn returns internal connection field of this partition producer atomically.
 // Note: should only be called by this partition producer before attempting to use the connection
 func (p *partitionProducer) _getConn() internal.Connection {
-	// Invariant: p.conn must be non-nil for the lifetime of the partitionProducer.
+	// Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
 	// For this reason we leave this cast unchecked and panic() if the
 	// invariant is broken
-	return p.conn.Load().(internal.Connection)
+	return *p.conn.Load()
 }
 
 type chunkRecorder struct {
```
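
The `conn` field migration from `uAtomic.Value` to `atomic.Pointer[internal.Connection]` replaces a runtime type assertion with compile-time typing; storing through a typed pointer also sidesteps `atomic.Value`'s panic when successive stores carry different concrete types. Below is a standalone sketch of the same pattern, with a hypothetical `Conn` interface standing in for `internal.Connection`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Conn is a hypothetical stand-in for internal.Connection.
type Conn interface{ ID() string }

type tcpConn struct{ id string }

func (c tcpConn) ID() string { return c.id }

type holder struct {
	// atomic.Pointer[Conn] stores a *Conn, so Load needs no type assertion.
	conn atomic.Pointer[Conn]
}

func (h *holder) setConn(c Conn) { h.conn.Store(&c) }

// getConn panics if no connection was ever stored, mirroring the invariant
// the producer relies on.
func (h *holder) getConn() Conn { return *h.conn.Load() }

func main() {
	var h holder
	h.setConn(tcpConn{id: "cnx-1"})
	fmt.Println(h.getConn().ID()) // cnx-1
}
```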
