Skip to content

Commit 18e6d99

Browse files
committed
fix data race in invocationID and attempts and unit tests
1 parent c50df37 commit 18e6d99

File tree

2 files changed

+28
-11
lines changed

2 files changed

+28
-11
lines changed

internal/gensupport/resumable.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (rx *ResumableUpload) Progress() int64 {
6464
// off specifies the offset in rx.Media from which data is drawn.
6565
// size is the number of bytes in data.
6666
// final specifies whether data is the final chunk to be uploaded.
67-
func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
67+
func (rx *ResumableUpload) doUploadRequest(ctx context.Context, invocationID string, attempts int, data io.Reader, off, size int64, final bool) (*http.Response, error) {
6868
req, err := http.NewRequest("POST", rx.URI, data)
6969
if err != nil {
7070
return nil, err
@@ -88,11 +88,11 @@ func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader,
8888
// TODO(b/274504690): Consider dropping gccl-invocation-id key since it
8989
// duplicates the X-Goog-Gcs-Idempotency-Token header (added in v0.115.0).
9090
baseXGoogHeader := "gl-go/" + GoVersion() + " gdcl/" + internal.Version
91-
invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", rx.invocationID, rx.attempts)
91+
invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", invocationID, attempts)
9292
req.Header.Set("X-Goog-Api-Client", strings.Join([]string{baseXGoogHeader, invocationHeader}, " "))
9393

9494
// Set idempotency token header which is used by GCS uploads.
95-
req.Header.Set("X-Goog-Gcs-Idempotency-Token", rx.invocationID)
95+
req.Header.Set("X-Goog-Gcs-Idempotency-Token", invocationID)
9696

9797
// Google's upload endpoint uses status code 308 for a
9898
// different purpose than the "308 Permanent Redirect"
@@ -127,14 +127,27 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
127127
}
128128

129129
// transferChunk performs a single HTTP request to upload a single chunk.
130-
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout or ChunkRetryDeadline.
131-
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunkRetryDeadline time.Duration, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
132-
if rx.ChunkTransferTimeout != 0 {
133-
chunkRetryDeadline = rx.ChunkTransferTimeout
130+
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout.
131+
func (rx *ResumableUpload) transferChunk(ctx context.Context, invocationID string, attempts int, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
132+
// If no timeout is specified, perform the request synchronously without a timer.
133+
if rx.ChunkTransferTimeout == 0 {
134+
res, err := rx.doUploadRequest(ctx, invocationID, attempts, chunk, off, size, done)
135+
if err != nil {
136+
return res, err
137+
}
138+
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
139+
// this file), so we don't expect to get a 308.
140+
if res.StatusCode == 308 {
141+
return nil, errors.New("unexpected 308 response status code")
142+
}
143+
if res.StatusCode == http.StatusOK {
144+
rx.reportProgress(off, off+int64(size))
145+
}
146+
return res, nil
134147
}
135148

136-
// Start a timer for the configured duration.
137-
timer := time.NewTimer(chunkRetryDeadline)
149+
// Start a timer for the ChunkTransferTimeout duration.
150+
timer := time.NewTimer(rx.ChunkTransferTimeout)
138151

139152
// A struct to hold the result from the goroutine.
140153
type uploadResult struct {
@@ -154,7 +167,7 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context, chunkRetryDeadline
154167

155168
// Starting the chunk upload in parallel.
156169
go func() {
157-
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
170+
res, err := rx.doUploadRequest(rCtx, invocationID, attempts, chunk, off, size, done)
158171
resultCh <- uploadResult{res: res, err: err}
159172
}()
160173

@@ -267,7 +280,7 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
267280
io.Copy(io.Discard, resp.Body)
268281
resp.Body.Close()
269282
}
270-
resp, err = rx.transferChunk(ctx, chunkRetryDeadline, chunk, off, size, done)
283+
resp, err = rx.transferChunk(ctx, rx.invocationID, rx.attempts, chunk, off, size, done)
271284
status := 0
272285
if resp != nil {
273286
status = resp.StatusCode

internal/gensupport/resumable_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"net/http"
1313
"reflect"
1414
"strings"
15+
"sync"
1516
"testing"
1617
"time"
1718
)
@@ -36,6 +37,7 @@ type event struct {
3637
// It records the incoming data, unless the corresponding event is configured to return
3738
// http.StatusServiceUnavailable.
3839
type interruptibleTransport struct {
40+
mu sync.Mutex
3941
events []event
4042
buf []byte
4143
bodies bodyTracker
@@ -67,6 +69,8 @@ func (tc *trackingCloser) Open() {
6769
}
6870

6971
func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
72+
t.mu.Lock()
73+
defer t.mu.Unlock()
7074
if len(t.events) == 0 {
7175
panic("ran out of events, but got a request")
7276
}

0 commit comments

Comments
 (0)