Skip to content

Commit 531ccbb

Browse files
committed
improve: add eventChan to execute any in the event loop
1 parent 1152cfc commit 531ccbb

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

pulsar/producer_partition.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ type partitionProducer struct {
106106

107107
// Channel where app is posting messages to be published
108108
dataChan chan *sendRequest
109-
cmdChan chan interface{}
110-
connectClosedCh chan *connectionClosed
109+
eventChan chan func()
111110
publishSemaphore internal.Semaphore
112111
pendingQueue internal.BlockingQueue
113112
lastSequenceID int64
@@ -167,8 +166,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
167166
options: options,
168167
producerID: client.rpcClient.NewProducerID(),
169168
dataChan: make(chan *sendRequest, maxPendingMessages),
170-
cmdChan: make(chan interface{}, 10),
171-
connectClosedCh: make(chan *connectionClosed, 10),
169+
eventChan: make(chan func()),
172170
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
173171
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
174172
compression.Level(options.CompressionLevel)),
@@ -294,6 +292,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
294292

295293
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
296294
if err != nil {
295+
cnx.UnregisterListener(p.producerID)
297296
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
298297
if errors.Is(err, internal.ErrRequestTimeOut) {
299298
id := p.client.rpcClient.NewRequestID()
@@ -408,9 +407,12 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
408407
assignedBrokerURL = p.client.selectServiceURL(
409408
closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
410409
}
411-
p.connectClosedCh <- &connectionClosed{
412-
assignedBrokerURL: assignedBrokerURL,
413-
}
410+
p.execute(func() {
411+
p.log.Info("runEventsLoop will reconnect in producer")
412+
p.reconnectToBroker(&connectionClosed{
413+
assignedBrokerURL: assignedBrokerURL,
414+
})
415+
})
414416
}
415417

416418
func (p *partitionProducer) SetRedirectedClusterURI(redirectedClusterURI string) {
@@ -545,27 +547,22 @@ func (p *partitionProducer) runEventsLoop() {
545547
return
546548
}
547549
p.internalSend(data)
548-
case cmd, ok := <-p.cmdChan:
549-
// when doClose() is call, p.dataChan will be closed, cmd will be nil
550+
case event, ok := <-p.eventChan:
551+
// when doClose() is call, p.eventChan will be closed, cmd will be nil
550552
if !ok {
551553
return
552554
}
553-
switch v := cmd.(type) {
554-
case *flushRequest:
555-
p.internalFlush(v)
556-
case *closeProducer:
557-
p.internalClose(v)
558-
return
559-
}
560-
case connectionClosed := <-p.connectClosedCh:
561-
p.log.Info("runEventsLoop will reconnect in producer")
562-
p.reconnectToBroker(connectionClosed)
555+
event()
563556
case <-p.batchFlushTicker.C:
564557
p.internalFlushCurrentBatch()
565558
}
566559
}
567560
}
568561

562+
func (p *partitionProducer) execute(fn func()) {
563+
p.eventChan <- fn
564+
}
565+
569566
func (p *partitionProducer) Topic() string {
570567
return p.topic
571568
}
@@ -1399,7 +1396,7 @@ func (p *partitionProducer) doClose(reason error) {
13991396

14001397
p.log.Info("Closing producer")
14011398
defer close(p.dataChan)
1402-
defer close(p.cmdChan)
1399+
defer close(p.eventChan)
14031400

14041401
id := p.client.rpcClient.NewRequestID()
14051402
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
@@ -1481,7 +1478,10 @@ func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
14811478
select {
14821479
case <-ctx.Done():
14831480
return ctx.Err()
1484-
case p.cmdChan <- flushReq:
1481+
default:
1482+
p.execute(func() {
1483+
p.internalFlush(flushReq)
1484+
})
14851485
}
14861486

14871487
// wait for the flush request to complete
@@ -1514,7 +1514,9 @@ func (p *partitionProducer) Close() {
15141514
}
15151515

15161516
cp := &closeProducer{doneCh: make(chan struct{})}
1517-
p.cmdChan <- cp
1517+
p.execute(func() {
1518+
p.internalClose(cp)
1519+
})
15181520

15191521
// wait for close producer request to complete
15201522
<-cp.doneCh

pulsar/producer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2336,7 +2336,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
23362336
testProducer, err := client.CreateProducer(ProducerOptions{
23372337
Topic: newTopicName(),
23382338
DisableBlockIfQueueFull: false,
2339-
BatchingMaxPublishDelay: 100000,
2339+
BatchingMaxPublishDelay: 10 * time.Second,
23402340
BatchingMaxMessages: 1000,
23412341
})
23422342

0 commit comments

Comments
 (0)