@@ -710,17 +710,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
710
710
uncompressedPayload []byte ,
711
711
request * sendRequest , msg * ProducerMessage , deliverAt time.Time ,
712
712
schemaVersion []byte , multiSchemaEnabled bool ) bool {
713
- var ok bool
713
+ var useTxn bool
714
+ var mostSigBits uint64
715
+ var leastSigBits uint64
714
716
if request .transaction != nil {
715
717
txnID := request .transaction .GetTxnID ()
716
- ok = p .batchBuilder .Add (smm , p .sequenceIDGenerator , uncompressedPayload , request ,
717
- msg .ReplicationClusters , deliverAt , schemaVersion , multiSchemaEnabled , true , txnID .MostSigBits ,
718
- txnID .LeastSigBits )
719
- } else {
720
- ok = p .batchBuilder .Add (smm , p .sequenceIDGenerator , uncompressedPayload , request ,
721
- msg .ReplicationClusters , deliverAt , schemaVersion , multiSchemaEnabled , false , 0 , 0 )
718
+ useTxn = true
719
+ mostSigBits = txnID .MostSigBits
720
+ leastSigBits = txnID .LeastSigBits
722
721
}
723
- return ok
722
+
723
+ return p .batchBuilder .Add (smm , p .sequenceIDGenerator , uncompressedPayload , request ,
724
+ msg .ReplicationClusters , deliverAt , schemaVersion , multiSchemaEnabled , useTxn , mostSigBits ,
725
+ leastSigBits )
724
726
}
725
727
726
728
func (p * partitionProducer ) genMetadata (msg * ProducerMessage ,
@@ -764,6 +766,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro
764
766
}
765
767
}
766
768
769
+ func (p * partitionProducer ) updateSingleMessageMetadataSeqID (smm * pb.SingleMessageMetadata , msg * ProducerMessage ) {
770
+ if msg .SequenceID != nil {
771
+ smm .SequenceId = proto .Uint64 (uint64 (* msg .SequenceID ))
772
+ } else {
773
+ smm .SequenceId = proto .Uint64 (internal .GetAndAdd (p .sequenceIDGenerator , 1 ))
774
+ }
775
+ }
776
+
767
777
func (p * partitionProducer ) genSingleMessageMetadataInBatch (msg * ProducerMessage ,
768
778
uncompressedSize int ) (smm * pb.SingleMessageMetadata ) {
769
779
smm = & pb.SingleMessageMetadata {
@@ -786,14 +796,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage
786
796
smm .Properties = internal .ConvertFromStringMap (msg .Properties )
787
797
}
788
798
789
- var sequenceID uint64
790
- if msg .SequenceID != nil {
791
- sequenceID = uint64 (* msg .SequenceID )
792
- } else {
793
- sequenceID = internal .GetAndAdd (p .sequenceIDGenerator , 1 )
794
- }
795
-
796
- smm .SequenceId = proto .Uint64 (sequenceID )
799
+ p .updateSingleMessageMetadataSeqID (smm , msg )
797
800
798
801
return
799
802
}
@@ -813,35 +816,30 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
813
816
}
814
817
815
818
sid := * mm .SequenceId
816
- var err error
819
+ var useTxn bool
820
+ var mostSigBits uint64
821
+ var leastSigBits uint64
822
+
817
823
if request .transaction != nil {
818
824
txnID := request .transaction .GetTxnID ()
819
- err = internal .SingleSend (
820
- buffer ,
821
- p .producerID ,
822
- sid ,
823
- mm ,
824
- payloadBuf ,
825
- p .encryptor ,
826
- maxMessageSize ,
827
- true ,
828
- txnID .MostSigBits ,
829
- txnID .LeastSigBits ,
830
- )
831
- } else {
832
- err = internal .SingleSend (
833
- buffer ,
834
- p .producerID ,
835
- sid ,
836
- mm ,
837
- payloadBuf ,
838
- p .encryptor ,
839
- maxMessageSize ,
840
- false ,
841
- 0 ,
842
- 0 ,
843
- )
844
- }
825
+ useTxn = true
826
+ mostSigBits = txnID .MostSigBits
827
+ leastSigBits = txnID .LeastSigBits
828
+ }
829
+
830
+ err := internal .SingleSend (
831
+ buffer ,
832
+ p .producerID ,
833
+ sid ,
834
+ mm ,
835
+ payloadBuf ,
836
+ p .encryptor ,
837
+ maxMessageSize ,
838
+ useTxn ,
839
+ mostSigBits ,
840
+ leastSigBits ,
841
+ )
842
+
845
843
if err != nil {
846
844
runCallback (request .callback , nil , request .msg , err )
847
845
p .releaseSemaphoreAndMem (int64 (len (msg .Payload )))
@@ -1001,7 +999,7 @@ func (p *partitionProducer) failTimeoutMessages() {
1001
999
}
1002
1000
}
1003
1001
1004
- // flag the send has completed with error, flush make no effect
1002
+ // flag the sending has completed with error, flush make no effect
1005
1003
pi .Complete ()
1006
1004
pi .Unlock ()
1007
1005
@@ -1116,8 +1114,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
1116
1114
callback func (MessageID , * ProducerMessage , error ), flushImmediately bool ) {
1117
1115
// Register transaction operation to transaction and the transaction coordinator.
1118
1116
var newCallback func (MessageID , * ProducerMessage , error )
1117
+ var txn * transaction
1119
1118
if msg .Transaction != nil {
1120
1119
transactionImpl := (msg .Transaction ).(* transaction )
1120
+ txn = transactionImpl
1121
1121
if transactionImpl .state != TxnOpen {
1122
1122
p .log .WithField ("state" , transactionImpl .state ).Error ("Failed to send message" +
1123
1123
" by a non-open transaction." )
@@ -1150,10 +1150,6 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
1150
1150
1151
1151
// callbackOnce make sure the callback is only invoked once in chunking
1152
1152
callbackOnce := & sync.Once {}
1153
- var txn * transaction
1154
- if msg .Transaction != nil {
1155
- txn = (msg .Transaction ).(* transaction )
1156
- }
1157
1153
sr := & sendRequest {
1158
1154
ctx : ctx ,
1159
1155
msg : msg ,
@@ -1398,7 +1394,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
1398
1394
// _getConn returns internal connection field of this partition producer atomically.
1399
1395
// Note: should only be called by this partition producer before attempting to use the connection
1400
1396
func (p * partitionProducer ) _getConn () internal.Connection {
1401
- // Invariant: The conn must be non-nill for the lifetime of the partitionProducer.
1397
+ // Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
1402
1398
// For this reason we leave this cast unchecked and panic() if the
1403
1399
// invariant is broken
1404
1400
return p .conn .Load ().(internal.Connection )
0 commit comments