Skip to content

Commit 005a6f2

Browse files
authored
[IMPROVED] Ordered consumer creation and initial config settings (#1645)
This changes a few things around creating ordered consumers: - initial ordered consumer creation is now done without retries - fixed an issue where start seq could be invalid when resetting a consumer which did not receive any messages - simplified getConsumerConfig() Signed-off-by: Piotr Piotrowski <[email protected]>
1 parent 1deccaf commit 005a6f2

File tree

4 files changed

+123
-13
lines changed

4 files changed

+123
-13
lines changed

jetstream/jetstream.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -739,13 +739,12 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
739739
namePrefix: nuid.Next(),
740740
doReset: make(chan struct{}, 1),
741741
}
742-
if cfg.OptStartSeq != 0 {
743-
oc.cursor.streamSeq = cfg.OptStartSeq
744-
}
745-
err := oc.reset()
742+
consCfg := oc.getConsumerConfig()
743+
cons, err := js.CreateOrUpdateConsumer(ctx, stream, *consCfg)
746744
if err != nil {
747745
return nil, err
748746
}
747+
oc.currentConsumer = cons.(*pullConsumer)
749748

750749
return oc, nil
751750
}

jetstream/ordered.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -488,9 +488,8 @@ func (c *orderedConsumer) reset() error {
488488
}
489489
}
490490

491-
seq := c.cursor.streamSeq + 1
492491
c.cursor.deliverSeq = 0
493-
consumerConfig := c.getConsumerConfigForSeq(seq)
492+
consumerConfig := c.getConsumerConfig()
494493

495494
var err error
496495
var cons Consumer
@@ -519,13 +518,27 @@ func (c *orderedConsumer) reset() error {
519518
return nil
520519
}
521520

522-
func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
521+
func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
523522
c.serial++
523+
var nextSeq uint64
524+
525+
// if stream sequence is not initialized, no message was consumed yet
526+
// therefore, start from the beginning (either from 1 or from the provided sequence)
527+
if c.cursor.streamSeq == 0 {
528+
if c.cfg.OptStartSeq != 0 {
529+
nextSeq = c.cfg.OptStartSeq
530+
} else {
531+
nextSeq = 1
532+
}
533+
} else {
534+
// otherwise, start from the next sequence
535+
nextSeq = c.cursor.streamSeq + 1
536+
}
524537
name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial)
525538
cfg := &ConsumerConfig{
526539
Name: name,
527540
DeliverPolicy: DeliverByStartSequencePolicy,
528-
OptStartSeq: seq,
541+
OptStartSeq: nextSeq,
529542
AckPolicy: AckNonePolicy,
530543
InactiveThreshold: 5 * time.Minute,
531544
Replicas: 1,
@@ -538,7 +551,7 @@ func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
538551
cfg.FilterSubjects = c.cfg.FilterSubjects
539552
}
540553

541-
if seq != c.cfg.OptStartSeq+1 {
554+
if c.serial != 1 {
542555
return cfg
543556
}
544557

jetstream/stream.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,12 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig)
273273
namePrefix: nuid.Next(),
274274
doReset: make(chan struct{}, 1),
275275
}
276-
if cfg.OptStartSeq != 0 {
277-
oc.cursor.streamSeq = cfg.OptStartSeq
278-
}
279-
err := oc.reset()
276+
consCfg := oc.getConsumerConfig()
277+
cons, err := s.CreateOrUpdateConsumer(ctx, *consCfg)
280278
if err != nil {
281279
return nil, err
282280
}
281+
oc.currentConsumer = cons.(*pullConsumer)
283282

284283
return oc, nil
285284
}

jetstream/test/ordered_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,105 @@ func TestOrderedConsumerConsume(t *testing.T) {
8686
l.Stop()
8787
})
8888

89+
t.Run("reset consumer before receiving any messages", func(t *testing.T) {
90+
srv := RunBasicJetStreamServer()
91+
defer shutdownJSServerAndRemoveStorage(t, srv)
92+
nc, err := nats.Connect(srv.ClientURL())
93+
if err != nil {
94+
t.Fatalf("Unexpected error: %v", err)
95+
}
96+
97+
js, err := jetstream.New(nc)
98+
if err != nil {
99+
t.Fatalf("Unexpected error: %v", err)
100+
}
101+
defer nc.Close()
102+
103+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
104+
defer cancel()
105+
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
106+
if err != nil {
107+
t.Fatalf("Unexpected error: %v", err)
108+
}
109+
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
110+
if err != nil {
111+
t.Fatalf("Unexpected error: %v", err)
112+
}
113+
114+
wg := &sync.WaitGroup{}
115+
l, err := c.Consume(func(msg jetstream.Msg) {
116+
wg.Done()
117+
})
118+
if err != nil {
119+
t.Fatalf("Unexpected error: %v", err)
120+
}
121+
time.Sleep(500 * time.Millisecond)
122+
123+
name := c.CachedInfo().Name
124+
if err := s.DeleteConsumer(ctx, name); err != nil {
125+
t.Fatal(err)
126+
}
127+
wg.Add(len(testMsgs))
128+
publishTestMsgs(t, nc)
129+
wg.Wait()
130+
131+
l.Stop()
132+
})
133+
134+
t.Run("reset consumer before receiving any messages with custom start seq", func(t *testing.T) {
135+
srv := RunBasicJetStreamServer()
136+
defer shutdownJSServerAndRemoveStorage(t, srv)
137+
nc, err := nats.Connect(srv.ClientURL())
138+
if err != nil {
139+
t.Fatalf("Unexpected error: %v", err)
140+
}
141+
142+
js, err := jetstream.New(nc)
143+
if err != nil {
144+
t.Fatalf("Unexpected error: %v", err)
145+
}
146+
defer nc.Close()
147+
148+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
149+
defer cancel()
150+
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
151+
if err != nil {
152+
t.Fatalf("Unexpected error: %v", err)
153+
}
154+
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{DeliverPolicy: jetstream.DeliverByStartSequencePolicy, OptStartSeq: 3})
155+
if err != nil {
156+
t.Fatalf("Unexpected error: %v", err)
157+
}
158+
159+
wg := &sync.WaitGroup{}
160+
l, err := c.Consume(func(msg jetstream.Msg) {
161+
wg.Done()
162+
})
163+
if err != nil {
164+
t.Fatalf("Unexpected error: %v", err)
165+
}
166+
time.Sleep(500 * time.Millisecond)
167+
168+
name := c.CachedInfo().Name
169+
if err := s.DeleteConsumer(ctx, name); err != nil {
170+
t.Fatal(err)
171+
}
172+
// should receive messages with sequences 3, 4 and 5
173+
wg.Add(len(testMsgs) - 2)
174+
publishTestMsgs(t, nc)
175+
wg.Wait()
176+
177+
// now delete consumer again and publish some more messages, all should be received normally
178+
name = c.CachedInfo().Name
179+
if err := s.DeleteConsumer(ctx, name); err != nil {
180+
t.Fatal(err)
181+
}
182+
wg.Add(len(testMsgs))
183+
publishTestMsgs(t, nc)
184+
wg.Wait()
185+
l.Stop()
186+
})
187+
89188
t.Run("base usage, server shutdown", func(t *testing.T) {
90189
srv := RunBasicJetStreamServer()
91190
defer shutdownJSServerAndRemoveStorage(t, srv)

0 commit comments

Comments
 (0)