Skip to content

Commit be35740

Browse files
Gleiphir2769shenjiaqi.2769
andauthored
[fix] [issue 877] Fix ctx in partitionProducer.Send() is not performing as expected (#1053)
Fixes #877 ### Motivation The original PR is #878. Because the original author @billowqiu has not continued to reply to the review comments for a long time, resubmit the fix here. ### Modifications - Add select for ctx and doneCh in partitionProducer.Send() --------- Co-authored-by: shenjiaqi.2769 <[email protected]>
1 parent dd920ef commit be35740

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

pulsar/producer_partition.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,9 +1095,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
10951095
}, true)
10961096

10971097
// wait for send request to finish
1098-
<-doneCh
1099-
1100-
return msgID, err
1098+
select {
1099+
case <-ctx.Done():
1100+
return nil, ctx.Err()
1101+
case <-doneCh:
1102+
// send request has been finished
1103+
return msgID, err
1104+
}
11011105
}
11021106

11031107
func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

pulsar/producer_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,3 +2024,29 @@ func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) {
20242024
assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
20252025
assert.Equal(t, msg.Properties, recvMsg.Properties())
20262026
}
2027+
2028+
func TestProducerSendWithContext(t *testing.T) {
2029+
client, err := NewClient(ClientOptions{
2030+
URL: lookupURL,
2031+
})
2032+
assert.NoError(t, err)
2033+
defer client.Close()
2034+
2035+
topicName := newTopicName()
2036+
// create producer
2037+
producer, err := client.CreateProducer(ProducerOptions{
2038+
Topic: topicName,
2039+
DisableBatching: true,
2040+
})
2041+
assert.Nil(t, err)
2042+
defer producer.Close()
2043+
2044+
ctx, cancel := context.WithCancel(context.Background())
2045+
// Make ctx be canceled to invalidate the context immediately
2046+
cancel()
2047+
_, err = producer.Send(ctx, &ProducerMessage{
2048+
Payload: make([]byte, 1024*1024),
2049+
})
2050+
// producer.Send should fail and return err context.Canceled
2051+
assert.True(t, errors.Is(err, context.Canceled))
2052+
}

0 commit comments

Comments
 (0)