Skip to content

Commit 091d422

Browse files
authored
fix(gensupport): fix transferChunk race condition by returning response with non-cancelled context. (#3258)
The previous implementation used context.WithTimeout to manage timeouts for each chunk upload and cancelled the upload when the response was received. This introduced a race condition when the response was not yet processed by caller and context was cancelled. This fix removes the race condition by doing following things: - Starts a timer and the upload request in a separate goroutine. - Uses a select statement to wait for either the upload to complete, the timer to fire, or the context to be cancelled. - Explicitly cancels the upload request if the timer fires first, preventing the race condition by not cancelling the context when response is succeess (which waits for caller to process the body) and ensuring resources are cleaned up correctly. - Renamed two variable to make code more readable. Internal Bug: 435359905
1 parent bf38d3a commit 091d422

File tree

2 files changed

+73
-32
lines changed

2 files changed

+73
-32
lines changed

internal/gensupport/resumable.go

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -127,34 +127,65 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
127127
}
128128

129129
// transferChunk performs a single HTTP request to upload a single chunk.
130+
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout.
130131
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
131-
// rCtx is derived from a context with a defined ChunkTransferTimeout with non-zero value.
132-
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
133-
// triggering a retry of the request.
134-
var rCtx context.Context
135-
var cancel context.CancelFunc
136-
137-
rCtx = ctx
138-
if rx.ChunkTransferTimeout != 0 {
139-
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
140-
defer cancel()
132+
// If no timeout is specified, perform the request synchronously without a timer.
133+
if rx.ChunkTransferTimeout == 0 {
134+
res, err := rx.doUploadRequest(ctx, chunk, off, size, done)
135+
if err != nil {
136+
return res, err
137+
}
138+
return res, nil
141139
}
142140

143-
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
144-
if err != nil {
145-
return res, err
146-
}
141+
// Start a timer for the ChunkTransferTimeout duration.
142+
timer := time.NewTimer(rx.ChunkTransferTimeout)
147143

148-
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
149-
// this file), so we don't expect to get a 308.
150-
if res.StatusCode == 308 {
151-
return nil, errors.New("unexpected 308 response status code")
144+
// A struct to hold the result from the goroutine.
145+
type uploadResult struct {
146+
res *http.Response
147+
err error
152148
}
153149

154-
if res.StatusCode == http.StatusOK {
155-
rx.reportProgress(off, off+int64(size))
150+
// A buffered channel to receive the result of the upload.
151+
resultCh := make(chan uploadResult, 1)
152+
153+
// Create a cancellable context for the upload request. This allows us to
154+
// abort the request if the timer fires first.
155+
rCtx, cancel := context.WithCancel(ctx)
156+
// NOTE: We do NOT use `defer cancel()` here. The context must remain valid
157+
// for the caller to read the response body of a successful request.
158+
// Cancellation is handled manually on timeout paths.
159+
160+
// Starting the chunk upload in parallel.
161+
go func() {
162+
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
163+
resultCh <- uploadResult{res: res, err: err}
164+
}()
165+
166+
// Wait for timer to fire or result channel to have the uploadResult or ctx to be cancelled.
167+
select {
168+
// Note: Calling cancel() will guarantee that the goroutine finishes,
169+
// so these two cases will never block forever on draining the resultCh.
170+
case <-ctx.Done():
171+
// Context is cancelled for the overall upload.
172+
cancel()
173+
// Drain resultCh.
174+
<-resultCh
175+
return nil, ctx.Err()
176+
case <-timer.C:
177+
// Chunk Transfer timer fired before resultCh so we return context.DeadlineExceeded.
178+
cancel()
179+
// Drain resultCh.
180+
<-resultCh
181+
return nil, context.DeadlineExceeded
182+
case result := <-resultCh:
183+
// Handle the result from the upload.
184+
if result.err != nil {
185+
return result.res, result.err
186+
}
187+
return result.res, nil
156188
}
157-
return res, nil
158189
}
159190

160191
// uploadChunkWithRetries attempts to upload a single chunk, with retries
@@ -164,14 +195,14 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
164195
shouldRetry := rx.Retry.errorFunc()
165196

166197
// Configure single chunk retry deadline.
167-
retryDeadline := defaultRetryDeadline
198+
chunkRetryDeadline := defaultRetryDeadline
168199
if rx.ChunkRetryDeadline != 0 {
169-
retryDeadline = rx.ChunkRetryDeadline
200+
chunkRetryDeadline = rx.ChunkRetryDeadline
170201
}
171202

172203
// Each chunk gets its own initialized-at-zero backoff and invocation ID.
173204
bo := rx.Retry.backoff()
174-
quitAfterTimer := time.NewTimer(retryDeadline)
205+
quitAfterTimer := time.NewTimer(chunkRetryDeadline)
175206
defer quitAfterTimer.Stop()
176207
rx.attempts = 1
177208
rx.invocationID = uuid.New().String()
@@ -184,20 +215,20 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
184215
for {
185216
// Wait for the backoff period, unless the context is canceled or the
186217
// retry deadline is hit.
187-
pauseTimer := time.NewTimer(pause)
218+
backoffPauseTimer := time.NewTimer(pause)
188219
select {
189220
case <-ctx.Done():
190-
pauseTimer.Stop()
221+
backoffPauseTimer.Stop()
191222
if err == nil {
192223
err = ctx.Err()
193224
}
194225
return resp, err
195-
case <-pauseTimer.C:
226+
case <-backoffPauseTimer.C:
196227
case <-quitAfterTimer.C:
197-
pauseTimer.Stop()
228+
backoffPauseTimer.Stop()
198229
return resp, err
199230
}
200-
pauseTimer.Stop()
231+
backoffPauseTimer.Stop()
201232

202233
// Check for context cancellation or timeout once more. If more than one
203234
// case in the select statement above was satisfied at the same time, Go
@@ -233,6 +264,11 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
233264
if resp != nil {
234265
status = resp.StatusCode
235266
}
267+
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
268+
// this file), so we don't expect to get a 308.
269+
if status == 308 {
270+
return nil, errors.New("unexpected 308 response status code")
271+
}
236272
// Chunk upload should be retried if the ChunkTransferTimeout is non-zero and err is context deadline exceeded
237273
// or we encounter a retryable error.
238274
if (rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded)) || shouldRetry(status, err) {
@@ -283,7 +319,9 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
283319
if resp == nil {
284320
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDeadline", rx.URI)
285321
}
286-
322+
if resp.StatusCode == http.StatusOK {
323+
rx.reportProgress(off, off+int64(size))
324+
}
287325
if statusResumeIncomplete(resp) {
288326
// The upload is not yet complete, but the server has acknowledged this chunk.
289327
// We don't have anything to do with the response body.

internal/gensupport/resumable_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,11 @@ func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, e
7272
}
7373
ev := t.events[0]
7474
t.events = t.events[1:]
75-
if ev.delay > 0 {
76-
time.Sleep(ev.delay)
75+
stallTimer := time.NewTimer(ev.delay)
76+
select {
77+
case <-stallTimer.C:
78+
case <-req.Context().Done():
79+
return nil, req.Context().Err()
7780
}
7881
if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
7982
return nil, fmt.Errorf("byte range: got %s; want %s", got, want)

0 commit comments

Comments
 (0)