Skip to content

Commit 1deccaf

Browse files
authored
[FIXED] Set OptStartSeq correctly in OrderedConsumerConfig (#1644)
Signed-off-by: Piotr Piotrowski <[email protected]>
1 parent 69922ce commit 1deccaf

File tree

4 files changed

+141
-2
lines changed

4 files changed

+141
-2
lines changed

jetstream/jetstream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
740740
doReset: make(chan struct{}, 1),
741741
}
742742
if cfg.OptStartSeq != 0 {
743-
oc.cursor.streamSeq = cfg.OptStartSeq - 1
743+
oc.cursor.streamSeq = cfg.OptStartSeq
744744
}
745745
err := oc.reset()
746746
if err != nil {

jetstream/ordered.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ func (c *orderedConsumer) reset() error {
487487
break
488488
}
489489
}
490+
490491
seq := c.cursor.streamSeq + 1
491492
c.cursor.deliverSeq = 0
492493
consumerConfig := c.getConsumerConfigForSeq(seq)
@@ -549,6 +550,8 @@ func (c *orderedConsumer) getConsumerConfigForSeq(seq uint64) *ConsumerConfig {
549550
c.cfg.DeliverPolicy == DeliverAllPolicy {
550551

551552
cfg.OptStartSeq = 0
553+
} else {
554+
cfg.OptStartSeq = c.cfg.OptStartSeq
552555
}
553556

554557
if cfg.DeliverPolicy == DeliverLastPerSubjectPolicy && len(c.cfg.FilterSubjects) == 0 {

jetstream/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig)
274274
doReset: make(chan struct{}, 1),
275275
}
276276
if cfg.OptStartSeq != 0 {
277-
oc.cursor.streamSeq = cfg.OptStartSeq - 1
277+
oc.cursor.streamSeq = cfg.OptStartSeq
278278
}
279279
err := oc.reset()
280280
if err != nil {

jetstream/test/ordered_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"errors"
1919
"fmt"
20+
"reflect"
2021
"sync"
2122
"testing"
2223
"time"
@@ -1548,3 +1549,138 @@ func TestOrderedConsumerNextOrder(t *testing.T) {
15481549
}
15491550
}
15501551
}
1552+
1553+
func TestOrderedConsumerConfig(t *testing.T) {
1554+
srv := RunBasicJetStreamServer()
1555+
defer shutdownJSServerAndRemoveStorage(t, srv)
1556+
nc, err := nats.Connect(srv.ClientURL())
1557+
if err != nil {
1558+
t.Fatalf("Unexpected error: %v", err)
1559+
}
1560+
1561+
js, err := jetstream.New(nc)
1562+
if err != nil {
1563+
t.Fatalf("Unexpected error: %v", err)
1564+
}
1565+
defer nc.Close()
1566+
1567+
s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1568+
if err != nil {
1569+
t.Fatalf("Unexpected error: %v", err)
1570+
}
1571+
1572+
tests := []struct {
1573+
name string
1574+
config jetstream.OrderedConsumerConfig
1575+
expected jetstream.ConsumerConfig
1576+
}{
1577+
{
1578+
name: "default config",
1579+
config: jetstream.OrderedConsumerConfig{},
1580+
expected: jetstream.ConsumerConfig{
1581+
DeliverPolicy: jetstream.DeliverAllPolicy,
1582+
AckPolicy: jetstream.AckNonePolicy,
1583+
MaxDeliver: -1,
1584+
MaxWaiting: 512,
1585+
InactiveThreshold: 5 * time.Minute,
1586+
Replicas: 1,
1587+
MemoryStorage: true,
1588+
},
1589+
},
1590+
{
1591+
name: "custom inactive threshold",
1592+
config: jetstream.OrderedConsumerConfig{
1593+
InactiveThreshold: 10 * time.Second,
1594+
},
1595+
expected: jetstream.ConsumerConfig{
1596+
DeliverPolicy: jetstream.DeliverAllPolicy,
1597+
AckPolicy: jetstream.AckNonePolicy,
1598+
MaxDeliver: -1,
1599+
MaxWaiting: 512,
1600+
InactiveThreshold: 10 * time.Second,
1601+
Replicas: 1,
1602+
MemoryStorage: true,
1603+
},
1604+
},
1605+
{
1606+
name: "custom opt start seq and inactive threshold",
1607+
config: jetstream.OrderedConsumerConfig{
1608+
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
1609+
OptStartSeq: 10,
1610+
InactiveThreshold: 10 * time.Second,
1611+
},
1612+
expected: jetstream.ConsumerConfig{
1613+
OptStartSeq: 10,
1614+
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
1615+
AckPolicy: jetstream.AckNonePolicy,
1616+
MaxDeliver: -1,
1617+
MaxWaiting: 512,
1618+
InactiveThreshold: 10 * time.Second,
1619+
Replicas: 1,
1620+
MemoryStorage: true,
1621+
},
1622+
},
1623+
{
1624+
name: "all fields customized, start with custom seq",
1625+
config: jetstream.OrderedConsumerConfig{
1626+
FilterSubjects: []string{"foo.a", "foo.b"},
1627+
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
1628+
OptStartSeq: 10,
1629+
ReplayPolicy: jetstream.ReplayOriginalPolicy,
1630+
InactiveThreshold: 10 * time.Second,
1631+
HeadersOnly: true,
1632+
},
1633+
expected: jetstream.ConsumerConfig{
1634+
FilterSubjects: []string{"foo.a", "foo.b"},
1635+
OptStartSeq: 10,
1636+
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
1637+
AckPolicy: jetstream.AckNonePolicy,
1638+
MaxDeliver: -1,
1639+
MaxWaiting: 512,
1640+
InactiveThreshold: 10 * time.Second,
1641+
Replicas: 1,
1642+
MemoryStorage: true,
1643+
HeadersOnly: true,
1644+
},
1645+
},
1646+
{
1647+
name: "all fields customized, start with custom time",
1648+
config: jetstream.OrderedConsumerConfig{
1649+
FilterSubjects: []string{"foo.a", "foo.b"},
1650+
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
1651+
OptStartTime: &time.Time{},
1652+
ReplayPolicy: jetstream.ReplayOriginalPolicy,
1653+
InactiveThreshold: 10 * time.Second,
1654+
HeadersOnly: true,
1655+
},
1656+
expected: jetstream.ConsumerConfig{
1657+
FilterSubjects: []string{"foo.a", "foo.b"},
1658+
OptStartTime: &time.Time{},
1659+
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
1660+
AckPolicy: jetstream.AckNonePolicy,
1661+
MaxDeliver: -1,
1662+
MaxWaiting: 512,
1663+
InactiveThreshold: 10 * time.Second,
1664+
Replicas: 1,
1665+
MemoryStorage: true,
1666+
HeadersOnly: true,
1667+
},
1668+
},
1669+
}
1670+
1671+
for _, test := range tests {
1672+
t.Run(test.name, func(t *testing.T) {
1673+
c, err := s.OrderedConsumer(context.Background(), test.config)
1674+
if err != nil {
1675+
t.Fatalf("Unexpected error: %v", err)
1676+
}
1677+
1678+
cfg := c.CachedInfo().Config
1679+
test.expected.Name = cfg.Name
1680+
1681+
if !reflect.DeepEqual(test.expected, cfg) {
1682+
t.Fatalf("Expected config %+v, got %+v", test.expected, cfg)
1683+
}
1684+
})
1685+
}
1686+
}

0 commit comments

Comments
 (0)