Commit 2eb23bb

fix(storage): fix MultiRangeDownloader deadlock (#12548)
This fixes potential deadlocks in MultiRangeDownloader caused by not releasing a lock on an early return, or by holding it when it wasn't needed (while sending on a channel).
1 parent 5ef6068 commit 2eb23bb
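
For readers who want the shape of the bug outside the diff: below is a minimal, self-contained Go sketch of the two hazards this commit fixes, namely sending on an unbuffered channel while holding a mutex the receiver also needs, and returning early without releasing a lock. All names in the sketch (downloader, safeSend, process, work) are illustrative only and are not part of the storage package's API.

package main

import (
	"fmt"
	"sync"
)

type downloader struct {
	mu     sync.Mutex
	active map[int]string
	work   chan int // unbuffered: its receiver also takes mu
}

// safeSend updates shared state under the lock, then releases the lock
// *before* the potentially blocking channel send. Holding mu across the
// send could deadlock if the receiver needs mu before it can receive.
func (d *downloader) safeSend(id int) {
	d.mu.Lock()
	d.active[id] = "pending"
	d.mu.Unlock()
	d.work <- id
}

// process wraps the locked region in an anonymous function so that defer
// releases mu on every return path, including the early "not found" return.
func (d *downloader) process(id int) {
	func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		if _, ok := d.active[id]; !ok {
			return // early return no longer leaks the lock
		}
		delete(d.active, id)
	}()
}

func main() {
	d := &downloader{active: map[int]string{}, work: make(chan int)}
	done := make(chan struct{})
	go func() {
		defer close(done)
		d.process(<-d.work) // the receiver needs mu as well
	}()
	d.safeSend(1)
	<-done
	fmt.Println("remaining ranges:", len(d.active))
}

The actual fix in storage/grpc_client.go below follows the same two patterns: unlock before the channel send in add(), and defer the unlock inside a per-iteration closure in the response loop.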

3 files changed (+73, -30 lines)

storage/client_test.go

Lines changed: 2 additions & 2 deletions
@@ -1564,8 +1564,8 @@ func TestMultiRangeDownloaderEmulated(t *testing.T) {
 	reader.Wait()
 	for _, k := range res {
 		if !bytes.Equal(k.buf.Bytes(), content[k.offset:k.offset+k.limit]) {
-			t.Errorf("Error in read range offset %v, limit %v, got: %v; want: %v",
-				k.offset, k.limit, k.buf.Bytes(), content[k.offset:k.offset+k.limit])
+			t.Errorf("Error in read range offset %v, limit %v, got: %v bytes; want: %v bytes",
+				k.offset, k.limit, len(k.buf.Bytes()), len(content[k.offset:k.offset+k.limit]))
 		}
 		if k.err != nil {
 			t.Errorf("read range %v to %v : %v", k.offset, k.limit, k.err)

storage/grpc_client.go

Lines changed: 32 additions & 28 deletions
@@ -1269,6 +1269,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
 		if err == nil {
 			mrd.mu.Lock()
 			if len(mrd.activeRanges) == 0 && mrd.numActiveRanges == 0 {
+				mrd.mu.Unlock()
 				mrd.closeReceiver <- true
 				mrd.closeSender <- true
 				return
@@ -1277,34 +1278,37 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
 		arr := resp.GetObjectDataRanges()
 		for _, val := range arr {
 			id := val.GetReadRange().GetReadId()
-			mrd.mu.Lock()
-			_, ok := mrd.activeRanges[id]
-			if !ok {
-				// it's ok to ignore responses for read_id not in map as user would have been notified by callback.
-				continue
-			}
-			_, err = mrd.activeRanges[id].writer.Write(val.GetChecksummedData().GetContent())
-			if err != nil {
-				mrd.activeRanges[id].callback(mrd.activeRanges[id].offset, mrd.activeRanges[id].totalBytesWritten, err)
-				mrd.numActiveRanges--
-				delete(mrd.activeRanges, id)
-			} else {
-				mrd.activeRanges[id] = mrdRange{
-					readID:              mrd.activeRanges[id].readID,
-					writer:              mrd.activeRanges[id].writer,
-					offset:              mrd.activeRanges[id].offset,
-					limit:               mrd.activeRanges[id].limit,
-					currentBytesWritten: mrd.activeRanges[id].currentBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
-					totalBytesWritten:   mrd.activeRanges[id].totalBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
-					callback:            mrd.activeRanges[id].callback,
+			func() {
+				mrd.mu.Lock()
+				defer mrd.mu.Unlock()
+				currRange, ok := mrd.activeRanges[id]
+				if !ok {
+					// it's ok to ignore responses for read_id not in map as user would have been notified by callback.
+					return
 				}
-			}
-			if val.GetRangeEnd() {
-				mrd.activeRanges[id].callback(mrd.activeRanges[id].offset, mrd.activeRanges[id].totalBytesWritten, nil)
-				mrd.numActiveRanges--
-				delete(mrd.activeRanges, id)
-			}
-			mrd.mu.Unlock()
+				_, err = currRange.writer.Write(val.GetChecksummedData().GetContent())
+				if err != nil {
+					currRange.callback(currRange.offset, currRange.totalBytesWritten, err)
+					mrd.numActiveRanges--
+					delete(mrd.activeRanges, id)
+				} else {
+					currRange = mrdRange{
+						readID:              currRange.readID,
+						writer:              currRange.writer,
+						offset:              currRange.offset,
+						limit:               currRange.limit,
+						currentBytesWritten: currRange.currentBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
+						totalBytesWritten:   currRange.totalBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
+						callback:            currRange.callback,
+					}
+					mrd.activeRanges[id] = currRange
+				}
+				if val.GetRangeEnd() {
+					currRange.callback(currRange.offset, currRange.totalBytesWritten, nil)
+					mrd.numActiveRanges--
+					delete(mrd.activeRanges, id)
+				}
+			}()
 		}
 	}
 }
@@ -1459,8 +1463,8 @@ func (mrd *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback f
 	spec := mrdRange{readID: id, writer: output, offset: offset, limit: limit, currentBytesWritten: 0, totalBytesWritten: 0, callback: callback}
 	mrd.mu.Lock()
 	mrd.numActiveRanges++
-	mrd.rangesToRead <- []mrdRange{spec}
 	mrd.mu.Unlock()
+	mrd.rangesToRead <- []mrdRange{spec}
 } else {
 	callback(offset, 0, errors.New("storage: cannot add range because the stream is closed"))
 }
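
One detail of the NewMultiRangeDownloader hunk above worth calling out: the loop body is wrapped in an immediately invoked func(){ ... }() rather than deferring the unlock directly inside the for loop, because defer runs when the enclosing function returns, not at the end of each iteration. A small stand-alone sketch of that pattern follows; the names mu and items are illustrative, not from the storage package.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	items := []string{"a", "skip", "c"}
	for _, it := range items {
		// The anonymous function gives defer a per-iteration scope: the
		// lock is released at the end of every pass, and an early return
		// inside the body still unlocks exactly once.
		func() {
			mu.Lock()
			defer mu.Unlock()
			if it == "skip" {
				return // analogous to the "unknown read_id" case above
			}
			fmt.Println("processed", it)
		}()
	}
}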

storage/integration_test.go

Lines changed: 39 additions & 0 deletions
@@ -416,6 +416,45 @@ func TestIntegration_MultiRangeDownloader(t *testing.T) {
 	})
 }
 
+// Test many concurrent reads on the same MultiRangeDownloader to try to detect
+// potential deadlocks.
+func TestIntegration_MRDManyReads(t *testing.T) {
+	multiTransportTest(skipAllButBidi(context.Background(), "Bidi Read API test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
+		content := make([]byte, 5<<20)
+		rand.New(rand.NewSource(0)).Read(content)
+		objName := "MultiRangeDownloaderManyReads"
+		// Upload test data.
+		obj := client.Bucket(bucket).Object(objName)
+		if err := writeObject(ctx, obj, "text/plain", content); err != nil {
+			t.Fatal(err)
+		}
+		defer func() {
+			if err := obj.Delete(ctx); err != nil {
+				log.Printf("failed to delete test object: %v", err)
+			}
+		}()
+		reader, err := obj.NewMultiRangeDownloader(ctx)
+		if err != nil {
+			t.Fatalf("NewMultiRangeDownloader: %v", err)
+		}
+		// Previously 100 ranges here worked in a few seconds, but 1000 caused
+		// a deadlock. After this PR, 1000 also works in under 10s.
+		// 1000 is larger than the size of the buffer for the new ranges
+		// to be added.
+		for i := 0; i < 1000; i++ {
+			reader.Add(io.Discard, 0, 100, func(_ int64, _ int64, err error) {
+				if err != nil {
+					t.Errorf("error in call %v: %v", i, err)
+				}
+			})
+		}
+		reader.Wait()
+		if err = reader.Close(); err != nil {
+			t.Fatalf("Error while closing reader %v", err)
+		}
+	})
+}
+
 // TestIntegration_MRDCallbackReturnsDataLength tests if the callback returns the correct data
 // read length or not.
 func TestIntegration_MRDCallbackReturnsDataLength(t *testing.T) {
