Skip to content

Commit 2c22ed7

Browse files
return queue is full error if sized_channel is full (#11063)
#### Description This change fixes a potential deadlock bug for persistent queue. There is a race condition in persistent queue that caused `used` in `sizedChannel` to become out of sync with `ch` len. This causes `Offer` to be deadlocked in specific race condition. For example: 1. Multiple consumers are calling Consume 2. Multiple producers are calling Offer to insert into the queue a. All elements are taken from consumers. ch is empty 3. One consumer completes consume, calls onProcessingFinished a. Inside sizedChannel, syncSize is invoked, used is reset to 0 when other consumers are still waiting for lock to consume 4. More Offer is called inserting elements -> used and ch len should equal 5. As step 3a consumers completes, used is decreased -> used is lower than ch len a. More Offer is called inserting since used is below capacity. however, ch is full. b. goroutine calling offer is holding the mutex but can’t release it as ch is full. c. no consumer can acquire mutex to complete previous onProcessingFinished This change returns an error if channel is full instead of waiting for it to unblock. #### Link to tracking issue Fixes # #11015 #### Testing - Added concurrent test in persistent queue that can reproduce the problem(note: need to re-run it 100 times as the race condition is not consistent). - Added unit test for sizedChannel #### Documentation Added comment in the block explaining it --------- Co-authored-by: Dmitrii Anoshin <[email protected]>
1 parent e2aaa77 commit 2c22ed7

File tree

5 files changed

+127
-6
lines changed

5 files changed

+127
-6
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: 'exporterqueue'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix a bug in persistent queue that Offer can becomes deadlocked when queue is almost full
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [11015]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []

exporter/internal/queue/mock_storage.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111
"sync/atomic"
1212
"syscall"
13+
"time"
1314

1415
"go.opentelemetry.io/collector/component"
1516
"go.opentelemetry.io/collector/extension/experimental/storage"
@@ -20,22 +21,31 @@ type mockStorageExtension struct {
2021
component.ShutdownFunc
2122
st sync.Map
2223
getClientError error
24+
executionDelay time.Duration
2325
}
2426

2527
func (m *mockStorageExtension) GetClient(context.Context, component.Kind, component.ID, string) (storage.Client, error) {
2628
if m.getClientError != nil {
2729
return nil, m.getClientError
2830
}
29-
return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil
31+
return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}, executionDelay: m.executionDelay}, nil
3032
}
3133

3234
func NewMockStorageExtension(getClientError error) storage.Extension {
33-
return &mockStorageExtension{getClientError: getClientError}
35+
return NewMockStorageExtensionWithDelay(getClientError, 0)
36+
}
37+
38+
func NewMockStorageExtensionWithDelay(getClientError error, executionDelay time.Duration) storage.Extension {
39+
return &mockStorageExtension{
40+
getClientError: getClientError,
41+
executionDelay: executionDelay,
42+
}
3443
}
3544

3645
type mockStorageClient struct {
37-
st *sync.Map
38-
closed *atomic.Bool
46+
st *sync.Map
47+
closed *atomic.Bool
48+
executionDelay time.Duration // simulate real storage client delay
3949
}
4050

4151
func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) {
@@ -61,6 +71,9 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
6171
if m.isClosed() {
6272
panic("client already closed")
6373
}
74+
if m.executionDelay != 0 {
75+
time.Sleep(m.executionDelay)
76+
}
6477
for _, op := range ops {
6578
switch op.Type {
6679
case storage.Get:

exporter/internal/queue/persistent_queue_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"strconv"
11+
"sync"
1112
"sync/atomic"
1213
"syscall"
1314
"testing"
@@ -531,6 +532,70 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
531532
require.Equal(t, 6, newPs.Size())
532533
}
533534

535+
func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) {
536+
req := newTracesRequest(1, 1)
537+
538+
ext := NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond)
539+
pq := createTestPersistentQueueWithItemsCapacity(t, ext, 25)
540+
541+
proWg := sync.WaitGroup{}
542+
// Sending small amount of data as windows test can't handle the test fast enough
543+
for j := 0; j < 5; j++ {
544+
proWg.Add(1)
545+
go func() {
546+
defer proWg.Done()
547+
// Put in items up to capacity
548+
for i := 0; i < 10; i++ {
549+
for {
550+
// retry infinitely so the exact amount of items are added to the queue eventually
551+
if err := pq.Offer(context.Background(), req); err == nil {
552+
break
553+
}
554+
time.Sleep(50 * time.Nanosecond)
555+
}
556+
}
557+
}()
558+
}
559+
560+
conWg := sync.WaitGroup{}
561+
for j := 0; j < 5; j++ {
562+
conWg.Add(1)
563+
go func() {
564+
defer conWg.Done()
565+
for i := 0; i < 10; i++ {
566+
require.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
567+
}
568+
}()
569+
}
570+
571+
conDone := make(chan struct{})
572+
go func() {
573+
defer close(conDone)
574+
conWg.Wait()
575+
}()
576+
577+
proDone := make(chan struct{})
578+
go func() {
579+
defer close(proDone)
580+
proWg.Wait()
581+
}()
582+
583+
doneCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
584+
defer cancel()
585+
select {
586+
case <-conDone:
587+
case <-doneCtx.Done():
588+
assert.Fail(t, "timed out waiting for consumers to complete")
589+
}
590+
591+
select {
592+
case <-proDone:
593+
case <-doneCtx.Done():
594+
assert.Fail(t, "timed out waiting for producers to complete")
595+
}
596+
assert.Zero(t, pq.sizedChannel.Size())
597+
}
598+
534599
func TestPersistentQueue_PutCloseReadClose(t *testing.T) {
535600
req := newTracesRequest(5, 10)
536601
ext := NewMockStorageExtension(nil)

exporter/internal/queue/sized_channel.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,16 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
5555
return err
5656
}
5757
}
58-
vcq.ch <- el
59-
return nil
58+
59+
select {
60+
// for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it
61+
// into the channel. If it is full, simply returns ErrQueueIsFull error. This prevents potential deadlock issues.
62+
case vcq.ch <- el:
63+
return nil
64+
default:
65+
vcq.used.Add(-size)
66+
return ErrQueueIsFull
67+
}
6068
}
6169

6270
// pop removes the element from the queue and returns it.

exporter/internal/queue/sized_channel_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,13 @@ func TestSizedCapacityChannel(t *testing.T) {
4242
assert.False(t, ok)
4343
assert.Equal(t, 0, el)
4444
}
45+
46+
func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) {
47+
q := newSizedChannel[int](1, nil, 0)
48+
assert.NoError(t, q.push(1, 1, nil))
49+
50+
q.used.Store(0)
51+
err := q.push(1, 1, nil)
52+
assert.Error(t, err)
53+
assert.Equal(t, ErrQueueIsFull, err)
54+
}

0 commit comments

Comments
 (0)