43
43
stopAfterMsgsLeft chan int
44
44
withStopAfter bool
45
45
runningFetch * fetchResult
46
+ subscription * orderedSubscription
46
47
sync.Mutex
47
48
}
48
49
@@ -92,7 +93,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
92
93
return nil , fmt .Errorf ("%w: %s" , ErrInvalidOption , err )
93
94
}
94
95
c .userErrHandler = consumeOpts .ErrHandler
95
- opts = append (opts , ConsumeErrHandler (c .errHandler (c .serial )))
96
+ opts = append (opts , consumeReconnectNotify (),
97
+ ConsumeErrHandler (c .errHandler (c .serial )))
96
98
if consumeOpts .StopAfter > 0 {
97
99
c .withStopAfter = true
98
100
c .stopAfter = consumeOpts .StopAfter
@@ -105,6 +107,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
105
107
consumer : c ,
106
108
done : make (chan struct {}, 1 ),
107
109
}
110
+ c .subscription = sub
108
111
internalHandler := func (serial int ) func (msg Msg ) {
109
112
return func (msg Msg ) {
110
113
// handler is a noop if message was delivered for a consumer with different serial
@@ -197,13 +200,13 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
197
200
return func (cc ConsumeContext , err error ) {
198
201
c .Lock ()
199
202
defer c .Unlock ()
200
- if c .userErrHandler != nil && ! errors .Is (err , errOrderedSequenceMismatch ) {
203
+ if c .userErrHandler != nil && ! errors .Is (err , errOrderedSequenceMismatch ) && ! errors . Is ( err , errConnected ) {
201
204
c .userErrHandler (cc , err )
202
205
}
203
206
if errors .Is (err , ErrNoHeartbeat ) ||
204
207
errors .Is (err , errOrderedSequenceMismatch ) ||
205
208
errors .Is (err , ErrConsumerDeleted ) ||
206
- errors .Is (err , ErrConsumerNotFound ) {
209
+ errors .Is (err , errConnected ) {
207
210
// only reset if serial matches the current consumer serial and there is no reset in progress
208
211
if serial == c .serial && atomic .LoadUint32 (& c .resetInProgress ) == 0 {
209
212
atomic .StoreUint32 (& c .resetInProgress , 1 )
@@ -235,7 +238,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
235
238
if err != nil {
236
239
return nil , fmt .Errorf ("%w: %s" , ErrInvalidOption , err )
237
240
}
238
- opts = append (opts , WithMessagesErrOnMissingHeartbeat (true ))
241
+ opts = append (opts ,
242
+ WithMessagesErrOnMissingHeartbeat (true ),
243
+ messagesReconnectNotify ())
239
244
c .stopAfterMsgsLeft = make (chan int , 1 )
240
245
if consumeOpts .StopAfter > 0 {
241
246
c .withStopAfter = true
@@ -255,6 +260,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
255
260
opts : opts ,
256
261
done : make (chan struct {}, 1 ),
257
262
}
263
+ c .subscription = sub
258
264
259
265
return sub , nil
260
266
}
@@ -367,6 +373,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
367
373
}
368
374
c .currentConsumer .Unlock ()
369
375
c .consumerType = consumerTypeFetch
376
+ sub := orderedSubscription {
377
+ consumer : c ,
378
+ done : make (chan struct {}),
379
+ }
380
+ c .subscription = & sub
370
381
err := c .reset ()
371
382
if err != nil {
372
383
return nil , err
@@ -397,6 +408,11 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
397
408
c .cursor .streamSeq = c .runningFetch .sseq
398
409
}
399
410
c .consumerType = consumerTypeFetch
411
+ sub := orderedSubscription {
412
+ consumer : c ,
413
+ done : make (chan struct {}),
414
+ }
415
+ c .subscription = & sub
400
416
err := c .reset ()
401
417
if err != nil {
402
418
return nil , err
@@ -425,6 +441,11 @@ func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) {
425
441
return nil , ErrOrderedConsumerConcurrentRequests
426
442
}
427
443
c .consumerType = consumerTypeFetch
444
+ sub := orderedSubscription {
445
+ consumer : c ,
446
+ done : make (chan struct {}),
447
+ }
448
+ c .subscription = & sub
428
449
err := c .reset ()
429
450
if err != nil {
430
451
return nil , err
@@ -481,52 +502,42 @@ func (c *orderedConsumer) reset() error {
481
502
}
482
503
consName := c .currentConsumer .CachedInfo ().Name
483
504
c .currentConsumer .Unlock ()
484
- var err error
485
- for i := 0 ; ; i ++ {
486
- if c .cfg .MaxResetAttempts > 0 && i == c .cfg .MaxResetAttempts {
487
- return fmt .Errorf ("%w: maximum number of delete attempts reached: %s" , ErrOrderedConsumerReset , err )
488
- }
505
+ go func () {
489
506
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
490
- err = c .jetStream .DeleteConsumer (ctx , c .stream , consName )
507
+ _ = c .jetStream .DeleteConsumer (ctx , c .stream , consName )
491
508
cancel ()
492
- if err != nil {
493
- if errors .Is (err , ErrConsumerNotFound ) {
494
- break
495
- }
496
- if errors .Is (err , nats .ErrTimeout ) || errors .Is (err , context .DeadlineExceeded ) {
497
- continue
498
- }
499
- return err
500
- }
501
- break
502
- }
509
+ }()
503
510
}
504
511
505
512
c .cursor .deliverSeq = 0
506
513
consumerConfig := c .getConsumerConfig ()
507
514
508
515
var err error
509
516
var cons Consumer
510
- for i := 0 ; ; i ++ {
511
- if c .cfg .MaxResetAttempts > 0 && i == c .cfg .MaxResetAttempts {
512
- return fmt .Errorf ("%w: maximum number of create consumer attempts reached: %s" , ErrOrderedConsumerReset , err )
517
+
518
+ backoffOpts := backoffOpts {
519
+ attempts : c .cfg .MaxResetAttempts ,
520
+ initialInterval : time .Second ,
521
+ factor : 2 ,
522
+ maxInterval : 10 * time .Second ,
523
+ cancel : c .subscription .done ,
524
+ }
525
+ err = retryWithBackoff (func (attempt int ) (bool , error ) {
526
+ isClosed := atomic .LoadUint32 (& c .subscription .closed ) == 1
527
+ if isClosed {
528
+ return false , nil
513
529
}
514
530
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
531
+ defer cancel ()
515
532
cons , err = c .jetStream .CreateOrUpdateConsumer (ctx , c .stream , * consumerConfig )
516
533
if err != nil {
517
- if errors .Is (err , ErrConsumerNotFound ) {
518
- cancel ()
519
- break
520
- }
521
- if errors .Is (err , nats .ErrTimeout ) || errors .Is (err , context .DeadlineExceeded ) {
522
- cancel ()
523
- continue
524
- }
525
- cancel ()
526
- return err
534
+ return true , err
527
535
}
528
- cancel ()
529
- break
536
+ c .currentConsumer = cons .(* pullConsumer )
537
+ return false , nil
538
+ }, backoffOpts )
539
+ if err != nil {
540
+ return err
530
541
}
531
542
c .currentConsumer = cons .(* pullConsumer )
532
543
return nil
@@ -548,6 +559,10 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
548
559
// otherwise, start from the next sequence
549
560
nextSeq = c .cursor .streamSeq + 1
550
561
}
562
+
563
+ if c .cfg .MaxResetAttempts == 0 {
564
+ c .cfg .MaxResetAttempts = - 1
565
+ }
551
566
name := fmt .Sprintf ("%s_%d" , c .namePrefix , c .serial )
552
567
cfg := & ConsumerConfig {
553
568
Name : name ,
@@ -564,6 +579,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
564
579
} else {
565
580
cfg .FilterSubjects = c .cfg .FilterSubjects
566
581
}
582
+ if c .cfg .InactiveThreshold != 0 {
583
+ cfg .InactiveThreshold = c .cfg .InactiveThreshold
584
+ }
567
585
568
586
if c .serial != 1 {
569
587
return cfg
@@ -589,9 +607,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
589
607
cfg .DeliverPolicy = DeliverByStartTimePolicy
590
608
cfg .OptStartTime = c .cfg .OptStartTime
591
609
}
592
- if c .cfg .InactiveThreshold != 0 {
593
- cfg .InactiveThreshold = c .cfg .InactiveThreshold
594
- }
595
610
596
611
return cfg
597
612
}
@@ -612,6 +627,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag
612
627
})
613
628
}
614
629
630
+ func consumeReconnectNotify () PullConsumeOpt {
631
+ return pullOptFunc (func (opts * consumeOpts ) error {
632
+ opts .notifyOnReconnect = true
633
+ return nil
634
+ })
635
+ }
636
+
637
+ func messagesReconnectNotify () PullMessagesOpt {
638
+ return pullOptFunc (func (opts * consumeOpts ) error {
639
+ opts .notifyOnReconnect = true
640
+ return nil
641
+ })
642
+ }
643
+
615
644
// Info returns information about the ordered consumer.
616
645
// Note that this method will fetch the latest instance of the
617
646
// consumer from the server, which can be deleted by the library at any time.
@@ -652,3 +681,91 @@ func (c *orderedConsumer) CachedInfo() *ConsumerInfo {
652
681
}
653
682
return c .currentConsumer .info
654
683
}
684
+
685
+ type backoffOpts struct {
686
+ // total retry attempts
687
+ // -1 for unlimited
688
+ attempts int
689
+ // initial interval after which first retry will be performed
690
+ // defaults to 1s
691
+ initialInterval time.Duration
692
+ // determines whether first function execution should be performed immediately
693
+ disableInitialExecution bool
694
+ // multiplier on each attempt
695
+ // defaults to 2
696
+ factor float64
697
+ // max interval between retries
698
+ // after reaching this value, all subsequent
699
+ // retries will be performed with this interval
700
+ // defaults to 1 minute
701
+ maxInterval time.Duration
702
+ // custom backoff intervals
703
+ // if set, overrides all other options except attempts
704
+ // if attempts are set, then the last interval will be used
705
+ // for all subsequent retries after reaching the limit
706
+ customBackoff []time.Duration
707
+ // cancel channel
708
+ // if set, retry will be canceled when this channel is closed
709
+ cancel <- chan struct {}
710
+ }
711
+
712
+ func retryWithBackoff (f func (int ) (bool , error ), opts backoffOpts ) error {
713
+ var err error
714
+ var shouldContinue bool
715
+ // if custom backoff is set, use it instead of other options
716
+ if len (opts .customBackoff ) > 0 {
717
+ if opts .attempts != 0 {
718
+ return fmt .Errorf ("cannot use custom backoff intervals when attempts are set" )
719
+ }
720
+ for i , interval := range opts .customBackoff {
721
+ select {
722
+ case <- opts .cancel :
723
+ return nil
724
+ case <- time .After (interval ):
725
+ }
726
+ shouldContinue , err = f (i )
727
+ if ! shouldContinue {
728
+ return err
729
+ }
730
+ }
731
+ return err
732
+ }
733
+
734
+ // set default options
735
+ if opts .initialInterval == 0 {
736
+ opts .initialInterval = 1 * time .Second
737
+ }
738
+ if opts .factor == 0 {
739
+ opts .factor = 2
740
+ }
741
+ if opts .maxInterval == 0 {
742
+ opts .maxInterval = 1 * time .Minute
743
+ }
744
+ if opts .attempts == 0 {
745
+ return fmt .Errorf ("retry attempts have to be set when not using custom backoff intervals" )
746
+ }
747
+ interval := opts .initialInterval
748
+ for i := 0 ; ; i ++ {
749
+ if i == 0 && opts .disableInitialExecution {
750
+ time .Sleep (interval )
751
+ continue
752
+ }
753
+ shouldContinue , err = f (i )
754
+ if ! shouldContinue {
755
+ return err
756
+ }
757
+ if opts .attempts > 0 && i >= opts .attempts - 1 {
758
+ break
759
+ }
760
+ select {
761
+ case <- opts .cancel :
762
+ return nil
763
+ case <- time .After (interval ):
764
+ }
765
+ interval = time .Duration (float64 (interval ) * opts .factor )
766
+ if interval >= opts .maxInterval {
767
+ interval = opts .maxInterval
768
+ }
769
+ }
770
+ return err
771
+ }
0 commit comments