@@ -467,6 +467,14 @@ func (p *partitionProducer) Name() string {
467
467
return p .producerName
468
468
}
469
469
470
+ func runCallback (cb func (MessageID , * ProducerMessage , error ), id MessageID , msg * ProducerMessage , err error ) {
471
+ if cb == nil {
472
+ return
473
+ }
474
+
475
+ cb (id , msg , err )
476
+ }
477
+
470
478
func (p * partitionProducer ) internalSend (request * sendRequest ) {
471
479
p .log .Debug ("Received send request: " , * request .msg )
472
480
@@ -480,7 +488,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
480
488
var err error
481
489
if msg .Value != nil && msg .Payload != nil {
482
490
p .log .Error ("Can not set Value and Payload both" )
483
- request .callback ( nil , request .msg , errors .New ("can not set Value and Payload both" ))
491
+ runCallback ( request .callback , nil , request .msg , errors .New ("can not set Value and Payload both" ))
484
492
return
485
493
}
486
494
@@ -494,7 +502,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
494
502
if msg .Schema != nil && p .options .Schema != nil &&
495
503
msg .Schema .GetSchemaInfo ().hash () != p .options .Schema .GetSchemaInfo ().hash () {
496
504
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
497
- request .callback ( nil , request .msg , fmt .Errorf ("msg schema can not match with producer schema" ))
505
+ runCallback ( request .callback , nil , request .msg , fmt .Errorf ("msg schema can not match with producer schema" ))
498
506
p .log .WithError (err ).Errorf ("The producer %s of the topic %s is disabled the `MultiSchema`" , p .producerName , p .topic )
499
507
return
500
508
}
@@ -513,7 +521,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
513
521
schemaPayload , err = schema .Encode (msg .Value )
514
522
if err != nil {
515
523
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
516
- request .callback ( nil , request .msg , newError (SchemaFailure , err .Error ()))
524
+ runCallback ( request .callback , nil , request .msg , newError (SchemaFailure , err .Error ()))
517
525
p .log .WithError (err ).Errorf ("Schema encode message failed %s" , msg .Value )
518
526
return
519
527
}
@@ -530,7 +538,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
530
538
if err != nil {
531
539
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
532
540
p .log .WithError (err ).Error ("get schema version fail" )
533
- request .callback ( nil , request .msg , fmt .Errorf ("get schema version fail, err: %w" , err ))
541
+ runCallback ( request .callback , nil , request .msg , fmt .Errorf ("get schema version fail, err: %w" , err ))
534
542
return
535
543
}
536
544
p .schemaCache .Put (schema .GetSchemaInfo (), schemaVersion )
@@ -589,7 +597,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
589
597
// if msg is too large and chunking is disabled
590
598
if checkSize > maxMessageSize && ! p .options .EnableChunking {
591
599
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
592
- request .callback ( nil , request .msg , errMessageTooLarge )
600
+ runCallback ( request .callback , nil , request .msg , errMessageTooLarge )
593
601
p .log .WithError (errMessageTooLarge ).
594
602
WithField ("size" , checkSize ).
595
603
WithField ("properties" , msg .Properties ).
@@ -608,7 +616,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
608
616
payloadChunkSize = int (p ._getConn ().GetMaxMessageSize ()) - proto .Size (mm )
609
617
if payloadChunkSize <= 0 {
610
618
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
611
- request .callback ( nil , msg , errMetaTooLarge )
619
+ runCallback ( request .callback , nil , msg , errMetaTooLarge )
612
620
p .log .WithError (errMetaTooLarge ).
613
621
WithField ("metadata size" , proto .Size (mm )).
614
622
WithField ("properties" , msg .Properties ).
@@ -683,7 +691,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
683
691
if ok := addRequestToBatch (smm , p , uncompressedPayload , request , msg , deliverAt , schemaVersion ,
684
692
multiSchemaEnabled ); ! ok {
685
693
p .releaseSemaphoreAndMem (uncompressedPayloadSize )
686
- request .callback ( nil , request .msg , errFailAddToBatch )
694
+ runCallback ( request .callback , nil , request .msg , errFailAddToBatch )
687
695
p .log .WithField ("size" , uncompressedSize ).
688
696
WithField ("properties" , msg .Properties ).
689
697
Error ("unable to add message to batch" )
@@ -835,7 +843,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
835
843
)
836
844
}
837
845
if err != nil {
838
- request .callback ( nil , request .msg , err )
846
+ runCallback ( request .callback , nil , request .msg , err )
839
847
p .releaseSemaphoreAndMem (int64 (len (msg .Payload )))
840
848
p .log .WithError (err ).Errorf ("Single message serialize failed %s" , msg .Value )
841
849
return
@@ -875,7 +883,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
875
883
if err != nil {
876
884
for _ , cb := range callbacks {
877
885
if sr , ok := cb .(* sendRequest ); ok {
878
- sr .callback ( nil , sr .msg , err )
886
+ runCallback ( sr .callback , nil , sr .msg , err )
879
887
}
880
888
}
881
889
if errors .Is (err , internal .ErrExceedMaxMessageSize ) {
@@ -985,7 +993,7 @@ func (p *partitionProducer) failTimeoutMessages() {
985
993
986
994
if sr .callback != nil {
987
995
sr .callbackOnce .Do (func () {
988
- sr .callback ( nil , sr .msg , errSendTimeout )
996
+ runCallback ( sr .callback , nil , sr .msg , errSendTimeout )
989
997
})
990
998
}
991
999
if sr .transaction != nil {
@@ -1018,7 +1026,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
1018
1026
if errs [i ] != nil {
1019
1027
for _ , cb := range callbacks [i ] {
1020
1028
if sr , ok := cb .(* sendRequest ); ok {
1021
- sr .callback ( nil , sr .msg , errs [i ])
1029
+ runCallback ( sr .callback , nil , sr .msg , errs [i ])
1022
1030
}
1023
1031
}
1024
1032
if errors .Is (errs [i ], internal .ErrExceedMaxMessageSize ) {
@@ -1106,34 +1114,34 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
1106
1114
1107
1115
func (p * partitionProducer ) internalSendAsync (ctx context.Context , msg * ProducerMessage ,
1108
1116
callback func (MessageID , * ProducerMessage , error ), flushImmediately bool ) {
1109
- //Register transaction operation to transaction and the transaction coordinator.
1117
+ // Register transaction operation to transaction and the transaction coordinator.
1110
1118
var newCallback func (MessageID , * ProducerMessage , error )
1111
1119
if msg .Transaction != nil {
1112
1120
transactionImpl := (msg .Transaction ).(* transaction )
1113
1121
if transactionImpl .state != TxnOpen {
1114
1122
p .log .WithField ("state" , transactionImpl .state ).Error ("Failed to send message" +
1115
1123
" by a non-open transaction." )
1116
- callback ( nil , msg , newError (InvalidStatus , "Failed to send message by a non-open transaction." ))
1124
+ runCallback ( callback , nil , msg , newError (InvalidStatus , "Failed to send message by a non-open transaction." ))
1117
1125
return
1118
1126
}
1119
1127
1120
1128
if err := transactionImpl .registerProducerTopic (p .topic ); err != nil {
1121
- callback ( nil , msg , err )
1129
+ runCallback ( callback , nil , msg , err )
1122
1130
return
1123
1131
}
1124
1132
if err := transactionImpl .registerSendOrAckOp (); err != nil {
1125
- callback ( nil , msg , err )
1133
+ runCallback ( callback , nil , msg , err )
1126
1134
}
1127
1135
newCallback = func (id MessageID , producerMessage * ProducerMessage , err error ) {
1128
- callback ( id , producerMessage , err )
1136
+ runCallback ( callback , id , producerMessage , err )
1129
1137
transactionImpl .endSendOrAckOp (err )
1130
1138
}
1131
1139
} else {
1132
1140
newCallback = callback
1133
1141
}
1134
1142
if p .getProducerState () != producerReady {
1135
1143
// Producer is closing
1136
- newCallback ( nil , msg , errProducerClosed )
1144
+ runCallback ( newCallback , nil , msg , errProducerClosed )
1137
1145
return
1138
1146
}
1139
1147
@@ -1253,9 +1261,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
1253
1261
}
1254
1262
1255
1263
if sr .totalChunks <= 1 || sr .chunkID == sr .totalChunks - 1 {
1256
- if sr .callback != nil {
1257
- sr .callback (msgID , sr .msg , nil )
1258
- }
1264
+ runCallback (sr .callback , msgID , sr .msg , nil )
1259
1265
p .options .Interceptors .OnSendAcknowledgement (p , sr .msg , msgID )
1260
1266
}
1261
1267
}
@@ -1406,27 +1412,23 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
1406
1412
func (p * partitionProducer ) canAddToQueue (sr * sendRequest , uncompressedPayloadSize int64 ) bool {
1407
1413
if p .options .DisableBlockIfQueueFull {
1408
1414
if ! p .publishSemaphore .TryAcquire () {
1409
- if sr .callback != nil {
1410
- sr .callback (nil , sr .msg , errSendQueueIsFull )
1411
- }
1415
+ runCallback (sr .callback , nil , sr .msg , errSendQueueIsFull )
1412
1416
return false
1413
1417
}
1414
1418
if ! p .client .memLimit .TryReserveMemory (uncompressedPayloadSize ) {
1415
1419
p .publishSemaphore .Release ()
1416
- if sr .callback != nil {
1417
- sr .callback (nil , sr .msg , errMemoryBufferIsFull )
1418
- }
1420
+ runCallback (sr .callback , nil , sr .msg , errMemoryBufferIsFull )
1419
1421
return false
1420
1422
}
1421
1423
1422
1424
} else {
1423
1425
if ! p .publishSemaphore .Acquire (sr .ctx ) {
1424
- sr .callback ( nil , sr .msg , errContextExpired )
1426
+ runCallback ( sr .callback , nil , sr .msg , errContextExpired )
1425
1427
return false
1426
1428
}
1427
1429
if ! p .client .memLimit .ReserveMemory (sr .ctx , uncompressedPayloadSize ) {
1428
1430
p .publishSemaphore .Release ()
1429
- sr .callback ( nil , sr .msg , errContextExpired )
1431
+ runCallback ( sr .callback , nil , sr .msg , errContextExpired )
1430
1432
return false
1431
1433
}
1432
1434
}
0 commit comments