Skip to content

Commit bd5e740

Browse files
committed
Reader Next returns on closed consumer
Signed-off-by: Gaylor Bosson <[email protected]>
1 parent 8e90873 commit bd5e740

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

pulsar/reader_impl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
171171
return nil, err
172172
}
173173
return cm.Message, nil
174+
case <-r.c.closeCh:
175+
return nil, newError(ConsumerClosed, "consumer closed")
174176
case <-ctx.Done():
175177
return nil, ctx.Err()
176178
}

pulsar/reader_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,3 +1035,27 @@ func TestReaderHasNextRetryFailed(t *testing.T) {
10351035
}
10361036

10371037
}
1038+
1039+
func TestReaderNextReturnsOnClosedConsumer(t *testing.T) {
1040+
client, err := NewClient(ClientOptions{
1041+
URL: serviceURL,
1042+
OperationTimeout: 2 * time.Second,
1043+
})
1044+
assert.NoError(t, err)
1045+
topic := newTopicName()
1046+
reader, err := client.CreateReader(ReaderOptions{
1047+
Topic: topic,
1048+
StartMessageID: EarliestMessageID(),
1049+
})
1050+
assert.Nil(t, err)
1051+
1052+
reader.Close()
1053+
1054+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1055+
defer cancel()
1056+
1057+
var e *Error
1058+
_, err = reader.Next(ctx)
1059+
assert.ErrorAs(t, err, &e)
1060+
assert.Equal(t, ConsumerClosed, e.Result())
1061+
}

0 commit comments

Comments
 (0)