Skip to content
98 changes: 68 additions & 30 deletions internal/gensupport/resumable.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,34 +127,65 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
}

// transferChunk performs a single HTTP request to upload a single chunk.
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout.
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
// rCtx is derived from a context with a defined ChunkTransferTimeout with non-zero value.
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
// triggering a retry of the request.
var rCtx context.Context
var cancel context.CancelFunc

rCtx = ctx
if rx.ChunkTransferTimeout != 0 {
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
defer cancel()
// If no timeout is specified, perform the request synchronously without a timer.
if rx.ChunkTransferTimeout == 0 {
res, err := rx.doUploadRequest(ctx, chunk, off, size, done)
if err != nil {
return res, err
}
return res, nil
}

res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
if err != nil {
return res, err
}
// Start a timer for the ChunkTransferTimeout duration.
timer := time.NewTimer(rx.ChunkTransferTimeout)

// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
// this file), so we don't expect to get a 308.
if res.StatusCode == 308 {
return nil, errors.New("unexpected 308 response status code")
// A struct to hold the result from the goroutine.
type uploadResult struct {
res *http.Response
err error
}

if res.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
// A buffered channel to receive the result of the upload.
resultCh := make(chan uploadResult, 1)

// Create a cancellable context for the upload request. This allows us to
// abort the request if the timer fires first.
rCtx, cancel := context.WithCancel(ctx)
// NOTE: We do NOT use `defer cancel()` here. The context must remain valid
// for the caller to read the response body of a successful request.
// Cancellation is handled manually on timeout paths.

// Starting the chunk upload in parallel.
go func() {
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
resultCh <- uploadResult{res: res, err: err}
}()

// Wait for timer to fire or result channel to have the uploadResult or ctx to be cancelled.
select {
// Note: Calling cancel() will guarantee that the goroutine finishes,
// so these two cases will never block forever on draining the resultCh.
case <-ctx.Done():
// Context is cancelled for the overall upload.
cancel()
// Drain resultCh.
<-resultCh
return nil, ctx.Err()
case <-timer.C:
// Chunk Transfer timer fired before resultCh so we return context.DeadlineExceeded.
cancel()
// Drain resultCh.
<-resultCh
return nil, context.DeadlineExceeded
case result := <-resultCh:
// Handle the result from the upload.
if result.err != nil {
return result.res, result.err
}
return result.res, nil
}
return res, nil
}

// uploadChunkWithRetries attempts to upload a single chunk, with retries
Expand All @@ -164,14 +195,14 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
shouldRetry := rx.Retry.errorFunc()

// Configure single chunk retry deadline.
retryDeadline := defaultRetryDeadline
chunkRetryDeadline := defaultRetryDeadline
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
chunkRetryDeadline = rx.ChunkRetryDeadline
}

// Each chunk gets its own initialized-at-zero backoff and invocation ID.
bo := rx.Retry.backoff()
quitAfterTimer := time.NewTimer(retryDeadline)
quitAfterTimer := time.NewTimer(chunkRetryDeadline)
defer quitAfterTimer.Stop()
rx.attempts = 1
rx.invocationID = uuid.New().String()
Expand All @@ -184,20 +215,20 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
for {
// Wait for the backoff period, unless the context is canceled or the
// retry deadline is hit.
pauseTimer := time.NewTimer(pause)
backoffPauseTimer := time.NewTimer(pause)
select {
case <-ctx.Done():
pauseTimer.Stop()
backoffPauseTimer.Stop()
if err == nil {
err = ctx.Err()
}
return resp, err
case <-pauseTimer.C:
case <-backoffPauseTimer.C:
case <-quitAfterTimer.C:
pauseTimer.Stop()
backoffPauseTimer.Stop()
return resp, err
}
pauseTimer.Stop()
backoffPauseTimer.Stop()

// Check for context cancellation or timeout once more. If more than one
// case in the select statement above was satisfied at the same time, Go
Expand Down Expand Up @@ -233,6 +264,11 @@ func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.
if resp != nil {
status = resp.StatusCode
}
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
// this file), so we don't expect to get a 308.
if status == 308 {
return nil, errors.New("unexpected 308 response status code")
}
// Chunk upload should be retried if the ChunkTransferTimeout is non-zero and err is context deadline exceeded
// or we encounter a retryable error.
if (rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded)) || shouldRetry(status, err) {
Expand Down Expand Up @@ -283,7 +319,9 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
if resp == nil {
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDeadline", rx.URI)
}

if resp.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
}
if statusResumeIncomplete(resp) {
// The upload is not yet complete, but the server has acknowledged this chunk.
// We don't have anything to do with the response body.
Expand Down
7 changes: 5 additions & 2 deletions internal/gensupport/resumable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, e
}
ev := t.events[0]
t.events = t.events[1:]
if ev.delay > 0 {
time.Sleep(ev.delay)
stallTimer := time.NewTimer(ev.delay)
select {
case <-stallTimer.C:
case <-req.Context().Done():
return nil, req.Context().Err()
}
if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
Expand Down