Skip to content

Commit 65308ab

Browse files
committed
improve: add eventChan to execute any in the event loop
1 parent 1152cfc commit 65308ab

File tree

2 files changed

+22
-22
lines changed

2 files changed

+22
-22
lines changed

pulsar/producer_partition.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ type partitionProducer struct {
106106

107107
// Channel where app is posting messages to be published
108108
dataChan chan *sendRequest
109-
cmdChan chan interface{}
110-
connectClosedCh chan *connectionClosed
109+
eventChan chan func()
111110
publishSemaphore internal.Semaphore
112111
pendingQueue internal.BlockingQueue
113112
lastSequenceID int64
@@ -167,8 +166,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
167166
options: options,
168167
producerID: client.rpcClient.NewProducerID(),
169168
dataChan: make(chan *sendRequest, maxPendingMessages),
170-
cmdChan: make(chan interface{}, 10),
171-
connectClosedCh: make(chan *connectionClosed, 10),
169+
eventChan: make(chan func()),
172170
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
173171
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
174172
compression.Level(options.CompressionLevel)),
@@ -294,6 +292,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
294292

295293
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
296294
if err != nil {
295+
cnx.UnregisterListener(p.producerID)
297296
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
298297
if errors.Is(err, internal.ErrRequestTimeOut) {
299298
id := p.client.rpcClient.NewRequestID()
@@ -408,9 +407,10 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
408407
assignedBrokerURL = p.client.selectServiceURL(
409408
closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
410409
}
411-
p.connectClosedCh <- &connectionClosed{
410+
p.log.Info("runEventsLoop will reconnect in producer")
411+
p.reconnectToBroker(&connectionClosed{
412412
assignedBrokerURL: assignedBrokerURL,
413-
}
413+
})
414414
}
415415

416416
func (p *partitionProducer) SetRedirectedClusterURI(redirectedClusterURI string) {
@@ -545,27 +545,22 @@ func (p *partitionProducer) runEventsLoop() {
545545
return
546546
}
547547
p.internalSend(data)
548-
case cmd, ok := <-p.cmdChan:
549-
// when doClose() is call, p.dataChan will be closed, cmd will be nil
548+
case event, ok := <-p.eventChan:
549+
// when doClose() is call, p.eventChan will be closed, cmd will be nil
550550
if !ok {
551551
return
552552
}
553-
switch v := cmd.(type) {
554-
case *flushRequest:
555-
p.internalFlush(v)
556-
case *closeProducer:
557-
p.internalClose(v)
558-
return
559-
}
560-
case connectionClosed := <-p.connectClosedCh:
561-
p.log.Info("runEventsLoop will reconnect in producer")
562-
p.reconnectToBroker(connectionClosed)
553+
event()
563554
case <-p.batchFlushTicker.C:
564555
p.internalFlushCurrentBatch()
565556
}
566557
}
567558
}
568559

560+
func (p *partitionProducer) execute(fn func()) {
561+
p.eventChan <- fn
562+
}
563+
569564
func (p *partitionProducer) Topic() string {
570565
return p.topic
571566
}
@@ -1399,7 +1394,7 @@ func (p *partitionProducer) doClose(reason error) {
13991394

14001395
p.log.Info("Closing producer")
14011396
defer close(p.dataChan)
1402-
defer close(p.cmdChan)
1397+
defer close(p.eventChan)
14031398

14041399
id := p.client.rpcClient.NewRequestID()
14051400
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
@@ -1481,7 +1476,10 @@ func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
14811476
select {
14821477
case <-ctx.Done():
14831478
return ctx.Err()
1484-
case p.cmdChan <- flushReq:
1479+
default:
1480+
p.execute(func() {
1481+
p.internalFlush(flushReq)
1482+
})
14851483
}
14861484

14871485
// wait for the flush request to complete
@@ -1514,7 +1512,9 @@ func (p *partitionProducer) Close() {
15141512
}
15151513

15161514
cp := &closeProducer{doneCh: make(chan struct{})}
1517-
p.cmdChan <- cp
1515+
p.execute(func() {
1516+
p.internalClose(cp)
1517+
})
15181518

15191519
// wait for close producer request to complete
15201520
<-cp.doneCh

pulsar/producer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2336,7 +2336,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
23362336
testProducer, err := client.CreateProducer(ProducerOptions{
23372337
Topic: newTopicName(),
23382338
DisableBlockIfQueueFull: false,
2339-
BatchingMaxPublishDelay: 100000,
2339+
BatchingMaxPublishDelay: 10 * time.Second,
23402340
BatchingMaxMessages: 1000,
23412341
})
23422342

0 commit comments

Comments
 (0)