Skip to content

Commit bd0f632

Browse files
authored
More resilient droplet upload (#195)
* More resilient droplet upload, ensure draining for complete upload process
1 parent f6b54a6 commit bd0f632

File tree

7 files changed

+107
-25
lines changed

7 files changed

+107
-25
lines changed

ccclient/poller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (p *poller) Poll(fallbackURL *url.URL, res *http.Response, cancelChan <-cha
4747

4848
switch body.Entity.Status {
4949
case JOB_QUEUED, JOB_RUNNING:
50+
p.logger.Info("cc-job-queued-or-running", lager.Data{"status": body.Entity.Status})
5051
case JOB_FINISHED:
5152
p.logger.Info("cc-job-finished")
5253
return nil

cmd/cc-uploader/main.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"code.cloudfoundry.org/tlsconfig"
5+
"context"
56
"crypto/tls"
67
"crypto/x509"
78
"flag"
@@ -11,7 +12,10 @@ import (
1112
"net"
1213
"net/http"
1314
"os"
15+
"os/signal"
1416
"runtime"
17+
"sync"
18+
"syscall"
1519
"time"
1620

1721
"code.cloudfoundry.org/cc-uploader/ccclient"
@@ -41,6 +45,9 @@ const (
4145
communicationTimeout = 30 * time.Second
4246
)
4347

48+
// Global WaitGroup to track uploads
49+
var uploadWaitGroup sync.WaitGroup
50+
4451
func main() {
4552
runtime.GOMAXPROCS(runtime.NumCPU())
4653
flag.Parse()
@@ -54,12 +61,20 @@ func main() {
5461

5562
initializeDropsonde(logger, uploaderConfig)
5663

64+
// Create signal channel to listen for shutdown signals
65+
signalChan := make(chan os.Signal, 1)
66+
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
67+
68+
var nonTLSServer *http.Server
69+
tlsServer, tlsRunner := initializeServer(logger, uploaderConfig, true)
5770
members := grouper.Members{
58-
{"cc-uploader-tls", initializeServer(logger, uploaderConfig, true)},
71+
{"cc-uploader-tls", tlsRunner},
5972
}
6073
if !uploaderConfig.DisableNonTLS {
74+
var nonTLSRunner ifrit.Runner
75+
nonTLSServer, nonTLSRunner = initializeServer(logger, uploaderConfig, false)
6176
members = append(grouper.Members{
62-
{"cc-uploader", initializeServer(logger, uploaderConfig, false)},
77+
{"cc-uploader", nonTLSRunner},
6378
}, members...)
6479
}
6580
if uploaderConfig.DebugServerConfig.DebugAddress != "" {
@@ -73,10 +88,32 @@ func main() {
7388
monitor := ifrit.Invoke(sigmon.New(group))
7489
logger.Info("ready")
7590

76-
err = <-monitor.Wait()
77-
if err != nil {
78-
logger.Error("exited-with-failure", err)
79-
os.Exit(1)
91+
select {
92+
case err := <-monitor.Wait():
93+
if err != nil {
94+
logger.Info("exited-with-failure")
95+
os.Exit(1)
96+
}
97+
case sig := <-signalChan:
98+
logger.Info("shutdown-signal-received", lager.Data{"signal": sig})
99+
100+
// Gracefully signal Ifrit monitor to stop processes
101+
monitor.Signal(os.Interrupt)
102+
logger.Info("graceful-shutdown-waiting-for-uploads")
103+
// Wait for all uploads to finish before shutting down
104+
uploadWaitGroup.Wait()
105+
// Gracefully shutdown the HTTP server
106+
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
107+
defer cancel()
108+
if !uploaderConfig.DisableNonTLS {
109+
if err := nonTLSServer.Shutdown(ctx); err != nil {
110+
logger.Error("non-tls-server-shutdown-failed", err)
111+
}
112+
}
113+
114+
if err := tlsServer.Shutdown(ctx); err != nil {
115+
logger.Error("tls-server-shutdown-failed", err)
116+
}
80117
}
81118

82119
logger.Info("exited")
@@ -123,18 +160,19 @@ func initializeTlsTransport(uploaderConfig config.UploaderConfig, skipVerify boo
123160
}
124161
}
125162

126-
func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig, tlsServer bool) ifrit.Runner {
163+
func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig, tlsServer bool) (*http.Server, ifrit.Runner) {
127164
uploader := ccclient.NewUploader(logger, &http.Client{Transport: initializeTlsTransport(uploaderConfig, false)})
128165

129166
// To maintain backwards compatibility with hairpin polling URLs, skip SSL verification for now
130167
poller := ccclient.NewPoller(logger, &http.Client{Transport: initializeTlsTransport(uploaderConfig, true)}, time.Duration(uploaderConfig.CCJobPollingInterval))
131168

132-
ccUploaderHandler, err := handlers.New(uploader, poller, logger)
169+
ccUploaderHandler, err := handlers.New(uploader, poller, logger, &uploadWaitGroup)
133170
if err != nil {
134171
logger.Error("router-building-failed", err)
135172
os.Exit(1)
136173
}
137174

175+
var server *http.Server
138176
if tlsServer {
139177
clientTLSConfig, err := tlsconfig.Build(
140178
tlsconfig.WithIdentityFromFile(uploaderConfig.MutualTLS.ServerCert, uploaderConfig.MutualTLS.ServerKey),
@@ -151,10 +189,16 @@ func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig,
151189
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
152190
}
153191

154-
if err != nil {
155-
logger.Fatal("failed-loading-tls-config", err)
192+
server = &http.Server{
193+
Addr: uploaderConfig.MutualTLS.ListenAddress,
194+
Handler: ccUploaderHandler,
195+
TLSConfig: clientTLSConfig,
156196
}
157-
return http_server.NewTLSServer(uploaderConfig.MutualTLS.ListenAddress, ccUploaderHandler, clientTLSConfig)
197+
return server, http_server.NewTLSServer(uploaderConfig.MutualTLS.ListenAddress, ccUploaderHandler, clientTLSConfig)
198+
}
199+
server = &http.Server{
200+
Addr: uploaderConfig.ListenAddress,
201+
Handler: ccUploaderHandler,
158202
}
159-
return http_server.New(uploaderConfig.ListenAddress, ccUploaderHandler)
203+
return server, http_server.New(uploaderConfig.ListenAddress, ccUploaderHandler)
160204
}

cmd/cc-uploader/main_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,34 @@ var _ = Describe("CC Uploader", func() {
257257
Expect(len(fakeCC.UploadedDroplets[appGuid])).To(Equal(contentLength))
258258
})
259259
})
260+
260261
})
261262
})
263+
264+
Describe("Handling shutdown signals", func() {
265+
It("should handle SIGTERM and drain ongoing uploads before shutting down", func() {
266+
ccUploaderAddress := fmt.Sprintf("http://localhost:%d", httpListenPort)
267+
emitter := NewEmitter(100) // large, slow upload
268+
postRequest := dropletUploadRequest(appGuid, emitter, 100, ccUploaderAddress)
269+
270+
go func() {
271+
_, err := http.DefaultClient.Do(postRequest)
272+
Expect(err).NotTo(HaveOccurred())
273+
}()
274+
275+
// Give it a moment to start the upload
276+
time.Sleep(200 * time.Millisecond)
277+
278+
// Send SIGTERM to the cc-uploader process
279+
session.Signal(os.Interrupt)
280+
281+
// Expect shutdown logs
282+
Eventually(session, 1*time.Second).Should(gbytes.Say("shutdown-signal-received"))
283+
Eventually(session, 1*time.Second).Should(gbytes.Say("graceful-shutdown-waiting-for-uploads"))
284+
285+
// Wait for the process to exit cleanly
286+
Eventually(session, 2*time.Second).Should(gexec.Exit(0))
287+
})
288+
})
289+
262290
})

handlers/handlers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package handlers
22

33
import (
44
"net/http"
5+
"sync"
56

67
"code.cloudfoundry.org/cc-uploader"
78
"code.cloudfoundry.org/cc-uploader/ccclient"
@@ -11,9 +12,9 @@ import (
1112
"github.com/tedsuo/rata"
1213
)
1314

14-
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger) (http.Handler, error) {
15+
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger, uploadWaitGroup *sync.WaitGroup) (http.Handler, error) {
1516
return rata.NewRouter(ccuploader.Routes, rata.Handlers{
16-
ccuploader.UploadDropletRoute: upload_droplet.New(uploader, poller, logger),
17+
ccuploader.UploadDropletRoute: upload_droplet.New(uploader, poller, logger, uploadWaitGroup),
1718
ccuploader.UploadBuildArtifactsRoute: upload_build_artifacts.New(uploader, logger),
1819
})
1920
}

handlers/handlers_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http/httptest"
99
"net/url"
1010
"strconv"
11+
"sync"
1112
"time"
1213

1314
"code.cloudfoundry.org/cc-uploader/ccclient"
@@ -53,8 +54,8 @@ var _ = Describe("Handlers", func() {
5354

5455
uploader := ccclient.NewUploader(logger, http.DefaultClient)
5556
poller := ccclient.NewPoller(logger, http.DefaultClient, 100*time.Millisecond)
56-
57-
handler, err = handlers.New(uploader, poller, logger)
57+
var wg sync.WaitGroup
58+
handler, err = handlers.New(uploader, poller, logger, &wg)
5859
Expect(err).NotTo(HaveOccurred())
5960

6061
postStatusCode = http.StatusCreated

handlers/upload_droplet/upload_droplet.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,37 @@ import (
66
"net/http"
77
"net/url"
88
"strconv"
9+
"sync"
910
"time"
1011

1112
"code.cloudfoundry.org/cc-uploader/ccclient"
1213
"code.cloudfoundry.org/lager/v3"
1314
"code.cloudfoundry.org/runtimeschema/cc_messages"
1415
)
1516

16-
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger) http.Handler {
17+
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger, uploadWaitGroup *sync.WaitGroup) http.Handler {
1718
return &dropletUploader{
18-
uploader: uploader,
19-
poller: poller,
20-
logger: logger,
19+
uploader: uploader,
20+
poller: poller,
21+
logger: logger,
22+
uploadWaitGroup: uploadWaitGroup,
2123
}
2224
}
2325

2426
type dropletUploader struct {
25-
uploader ccclient.Uploader
26-
poller ccclient.Poller
27-
logger lager.Logger
27+
uploader ccclient.Uploader
28+
poller ccclient.Poller
29+
logger lager.Logger
30+
uploadWaitGroup *sync.WaitGroup
2831
}
2932

3033
var MissingCCDropletUploadUriKeyError = errors.New(fmt.Sprintf("missing %s parameter", cc_messages.CcDropletUploadUriKey))
3134

3235
func (h *dropletUploader) ServeHTTP(w http.ResponseWriter, r *http.Request) {
3336
logger := h.logger.Session("droplet.upload")
34-
37+
h.uploadWaitGroup.Add(1)
38+
// Ensure that the WaitGroup is decremented when the function returns
39+
defer h.uploadWaitGroup.Done()
3540
logger.Info("extracting-droplet-upload-uri-key")
3641
uploadUriParameter := r.URL.Query().Get(cc_messages.CcDropletUploadUriKey)
3742
if uploadUriParameter == "" {

handlers/upload_droplet/upload_droplet_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"net/http/httptest"
99
"net/url"
10+
"sync"
1011
"time"
1112

1213
"code.cloudfoundry.org/cc-uploader/ccclient/fake_ccclient"
@@ -36,7 +37,8 @@ var _ = Describe("UploadDroplet", func() {
3637

3738
JustBeforeEach(func() {
3839
logger = lager.NewLogger("fake-logger")
39-
dropletUploadHandler := upload_droplet.New(&uploader, &poller, logger)
40+
var wg sync.WaitGroup
41+
dropletUploadHandler := upload_droplet.New(&uploader, &poller, logger, &wg)
4042

4143
dropletUploadHandler.ServeHTTP(responseWriter, incomingRequest)
4244
})

0 commit comments

Comments
 (0)