@@ -514,25 +514,25 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
514514 if strings .Contains (errMsg , errMsgTopicNotFound ) {
515515 // when topic is deleted, we should give up reconnection.
516516 p .log .Warn ("Topic not found, stop reconnecting, close the producer" )
517- p .doClose (joinErrors (ErrTopicNotfound , err ))
517+ p .doClose (errors . Join (ErrTopicNotfound , err ))
518518 return struct {}{}, nil
519519 }
520520
521521 if strings .Contains (errMsg , errMsgTopicTerminated ) {
522522 p .log .Warn ("Topic was terminated, failing pending messages, stop reconnecting, close the producer" )
523- p .doClose (joinErrors (ErrTopicTerminated , err ))
523+ p .doClose (errors . Join (ErrTopicTerminated , err ))
524524 return struct {}{}, nil
525525 }
526526
527527 if strings .Contains (errMsg , errMsgProducerBlockedQuotaExceededException ) {
528528 p .log .Warn ("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting" )
529- p .failPendingMessages (joinErrors (ErrProducerBlockedQuotaExceeded , err ))
529+ p .failPendingMessages (errors . Join (ErrProducerBlockedQuotaExceeded , err ))
530530 return struct {}{}, nil
531531 }
532532
533533 if strings .Contains (errMsg , errMsgProducerFenced ) {
534534 p .log .Warn ("Producer was fenced, failing pending messages, stop reconnecting" )
535- p .doClose (joinErrors (ErrProducerFenced , err ))
535+ p .doClose (errors . Join (ErrProducerFenced , err ))
536536 return struct {}{}, nil
537537 }
538538
@@ -1111,18 +1111,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
11111111
11121112func (p * partitionProducer ) validateMsg (msg * ProducerMessage ) error {
11131113 if msg == nil {
1114- return joinErrors (ErrInvalidMessage , fmt .Errorf ("message is nil" ))
1114+ return errors . Join (ErrInvalidMessage , fmt .Errorf ("message is nil" ))
11151115 }
11161116
11171117 if msg .Value != nil && msg .Payload != nil {
1118- return joinErrors (ErrInvalidMessage , fmt .Errorf ("can not set Value and Payload both" ))
1118+ return errors . Join (ErrInvalidMessage , fmt .Errorf ("can not set Value and Payload both" ))
11191119 }
11201120
11211121 if p .options .DisableMultiSchema {
11221122 if msg .Schema != nil && p .options .Schema != nil &&
11231123 msg .Schema .GetSchemaInfo ().hash () != p .options .Schema .GetSchemaInfo ().hash () {
11241124 p .log .Errorf ("The producer %s of the topic %s is disabled the `MultiSchema`" , p .producerName , p .topic )
1125- return joinErrors (ErrSchema , fmt .Errorf ("msg schema can not match with producer schema" ))
1125+ return errors . Join (ErrSchema , fmt .Errorf ("msg schema can not match with producer schema" ))
11261126 }
11271127 }
11281128
@@ -1138,16 +1138,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
11381138 if txn .state .Load () != int32 (TxnOpen ) {
11391139 p .log .WithField ("state" , txn .state .Load ()).Error ("Failed to send message" +
11401140 " by a non-open transaction." )
1141- return joinErrors (ErrTransaction ,
1141+ return errors . Join (ErrTransaction ,
11421142 fmt .Errorf ("failed to send message by a non-open transaction" ))
11431143 }
11441144
11451145 if err := txn .registerProducerTopic (p .topic ); err != nil {
1146- return joinErrors (ErrTransaction , err )
1146+ return errors . Join (ErrTransaction , err )
11471147 }
11481148
11491149 if err := txn .registerSendOrAckOp (); err != nil {
1150- return joinErrors (ErrTransaction , err )
1150+ return errors . Join (ErrTransaction , err )
11511151 }
11521152
11531153 sr .transaction = txn
@@ -1173,7 +1173,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
11731173 if schemaVersion == nil {
11741174 schemaVersion , err = p .getOrCreateSchema (schema .GetSchemaInfo ())
11751175 if err != nil {
1176- return joinErrors (ErrSchema , fmt .Errorf ("get schema version fail, err: %w" , err ))
1176+ return errors . Join (ErrSchema , fmt .Errorf ("get schema version fail, err: %w" , err ))
11771177 }
11781178 p .schemaCache .Put (schema .GetSchemaInfo (), schemaVersion )
11791179 }
@@ -1190,15 +1190,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
11901190 if sr .msg .Value != nil {
11911191 if sr .schema == nil {
11921192 p .log .Errorf ("Schema encode message failed %s" , sr .msg .Value )
1193- return joinErrors (ErrSchema , fmt .Errorf ("set schema value without setting schema" ))
1193+ return errors . Join (ErrSchema , fmt .Errorf ("set schema value without setting schema" ))
11941194 }
11951195
11961196 // payload and schema are mutually exclusive
11971197 // try to get payload from schema value only if payload is not set
11981198 schemaPayload , err := sr .schema .Encode (sr .msg .Value )
11991199 if err != nil {
12001200 p .log .WithError (err ).Errorf ("Schema encode message failed %s" , sr .msg .Value )
1201- return joinErrors (ErrSchema , err )
1201+ return errors . Join (ErrSchema , err )
12021202 }
12031203
12041204 sr .uncompressedPayload = schemaPayload
0 commit comments