Skip to content

Commit 2bc58f0

Browse files
committed
fix(storage): Make Writer thread-safe.
storage.Writer took an assumption that CloseWithError() could be called more than once, and was thread-safe with respect to concurrent Write(), Flush(), and Close() calls. This was not honored in the refactor in #12422. Modify Writer so that it is thread-safe to provide these behaviors, and support repeated Close() and CloseWithError() calls. To address this, we start the sender goroutine earlier, and gather the first buffer in that goroutine. It's possible that some workloads which gather less than one buffer worth of data with a sequence of small writes will observe a performance hit here, since those writes used to be direct copies but will now be a channel ping-pong. If that's an issue, it could be improved by wrapping the buffer in a mutex and doing more explicit concurrency control.
1 parent ce9d29b commit 2bc58f0

File tree

2 files changed

+268
-131
lines changed

2 files changed

+268
-131
lines changed

storage/client_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,115 @@ func TestWriterSmallFlushEmulated(t *testing.T) {
14901490
})
14911491
}
14921492

1493+
func TestWriterAsyncCancelEmulated(t *testing.T) {
1494+
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1495+
// Create test bucket.
1496+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
1497+
Name: bucket,
1498+
}, nil)
1499+
if err != nil {
1500+
t.Fatalf("client.CreateBucket: %v", err)
1501+
}
1502+
objName := fmt.Sprintf("object-%d", time.Now().Nanosecond())
1503+
1504+
vc := &Client{tc: client}
1505+
1506+
ctx, cancel := context.WithCancel(ctx)
1507+
defer cancel()
1508+
1509+
w := vc.Bucket(bucket).Object(objName).NewWriter(ctx)
1510+
defer w.Close()
1511+
1512+
if _, err := w.Write(randomBytes3MiB); err != nil {
1513+
t.Fatalf("first w.Write(): %v", err)
1514+
}
1515+
// Cancel concurrently with an additional write
1516+
go cancel()
1517+
// We don't actually care if this succeeds or fails - it will blow up under
1518+
// the race detector if writes and cancels are not thread-safe.
1519+
w.Write(randomBytes3MiB)
1520+
// Make sure the cancel got processed.
1521+
<-ctx.Done()
1522+
1523+
// The writer close should observe the cancelled error.
1524+
if err := w.Close(); err != context.Canceled {
1525+
t.Errorf("w.Close(): got %v, want %v", err, context.Canceled)
1526+
}
1527+
})
1528+
}
1529+
1530+
func TestWriterCloseTwiceEmulated(t *testing.T) {
1531+
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1532+
// Create test bucket.
1533+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
1534+
Name: bucket,
1535+
}, nil)
1536+
if err != nil {
1537+
t.Fatalf("client.CreateBucket: %v", err)
1538+
}
1539+
objName := fmt.Sprintf("object-%d", time.Now().Nanosecond())
1540+
1541+
vc := &Client{tc: client}
1542+
obj := vc.Bucket(bucket).Object(objName)
1543+
w := obj.NewWriter(ctx)
1544+
if err := w.Close(); err != nil {
1545+
t.Fatalf("closing writer: %v", err)
1546+
}
1547+
// Closing a writer twice is allowed!
1548+
if err := w.Close(); err != nil {
1549+
t.Fatalf("closing writer: %v", err)
1550+
}
1551+
1552+
// The object is present with 0 contents.
1553+
attrs, err := obj.Attrs(ctx)
1554+
if err != nil {
1555+
t.Fatalf("obj.Attrs: %v", err)
1556+
}
1557+
if attrs.Size != 0 {
1558+
t.Errorf("incorrect object size; got %v, want 0", attrs.Size)
1559+
}
1560+
})
1561+
}
1562+
1563+
func TestWriterCloseWithErrorTwiceEmulated(t *testing.T) {
1564+
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1565+
// Create test bucket.
1566+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
1567+
Name: bucket,
1568+
}, nil)
1569+
if err != nil {
1570+
t.Fatalf("client.CreateBucket: %v", err)
1571+
}
1572+
objName := fmt.Sprintf("object-%d", time.Now().Nanosecond())
1573+
1574+
vc := &Client{tc: client}
1575+
obj := vc.Bucket(bucket).Object(objName)
1576+
w := obj.NewWriter(ctx)
1577+
if _, err := w.Write(randomBytes3MiB); err != nil {
1578+
t.Errorf("w.Write(): %v", err)
1579+
}
1580+
1581+
errOne := errors.New("the first error")
1582+
if err := w.CloseWithError(errOne); err != nil {
1583+
// CloseWithError always returns nil
1584+
t.Fatalf("w.CloseWithError(errOne): %v", err)
1585+
}
1586+
if err := w.Close(); err != errOne {
1587+
t.Errorf("first w.Close(); got %v, want %v", err, errOne)
1588+
}
1589+
1590+
errTwo := errors.New("the second error")
1591+
if err := w.CloseWithError(errTwo); err != nil {
1592+
// CloseWithError always returns nil
1593+
t.Fatalf("w.CloseWithError(errOne): %v", err)
1594+
}
1595+
// The error is _not_ replaced by a subsequent call.
1596+
if err := w.Close(); err != errOne {
1597+
t.Errorf("second w.Close(); got %v, want %v", err, errTwo)
1598+
}
1599+
})
1600+
}
1601+
14931602
// customObjSizeReadStream intercepts BidiReadObjectResponse messages and
14941603
// changes the object size in the BidiReadObjectResponse.Metadata to
14951604
// customRecvSize.

0 commit comments

Comments
 (0)