Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ccclient/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (p *poller) Poll(fallbackURL *url.URL, res *http.Response, cancelChan <-cha

switch body.Entity.Status {
case JOB_QUEUED, JOB_RUNNING:
p.logger.Info("cc-job-queued-or-running", lager.Data{"status": body.Entity.Status})
case JOB_FINISHED:
p.logger.Info("cc-job-finished")
return nil
Expand Down
80 changes: 70 additions & 10 deletions cmd/cc-uploader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"code.cloudfoundry.org/tlsconfig"
"context"
"crypto/tls"
"crypto/x509"
"flag"
Expand All @@ -11,7 +12,10 @@ import (
"net"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"code.cloudfoundry.org/cc-uploader/ccclient"
Expand Down Expand Up @@ -41,6 +45,9 @@ const (
communicationTimeout = 30 * time.Second
)

// Global WaitGroup to track uploads
var uploadWaitGroup sync.WaitGroup

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
flag.Parse()
Expand All @@ -54,12 +61,28 @@ func main() {

initializeDropsonde(logger, uploaderConfig)

// Create signal channel to listen for shutdown signals
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

// Goroutine to log any signal received (without handling non-TERM signals)
go func() {
allSignals := make(chan os.Signal, 1)
signal.Notify(allSignals) // Capture all signals for logging
for sig := range allSignals {
logger.Info("received-signal", lager.Data{"signal": sig.String()})
}
}()
var nonTLSServer *http.Server
tlsServer, tlsRunner := initializeServer(logger, uploaderConfig, true)
members := grouper.Members{
{"cc-uploader-tls", initializeServer(logger, uploaderConfig, true)},
{"cc-uploader-tls", tlsRunner},
}
if !uploaderConfig.DisableNonTLS {
var nonTLSRunner ifrit.Runner
nonTLSServer, nonTLSRunner = initializeServer(logger, uploaderConfig, false)
members = append(grouper.Members{
{"cc-uploader", initializeServer(logger, uploaderConfig, false)},
{"cc-uploader", nonTLSRunner},
}, members...)
}
if uploaderConfig.DebugServerConfig.DebugAddress != "" {
Expand All @@ -73,10 +96,37 @@ func main() {
monitor := ifrit.Invoke(sigmon.New(group))
logger.Info("ready")

err = <-monitor.Wait()
if err != nil {
logger.Error("exited-with-failure", err)
os.Exit(1)
select {
case err := <-monitor.Wait(): // Handle process failure
if err != nil {
logger.Info("exited-with-failure")
os.Exit(1)
}
case sig := <-signalChan: // Handle shutdown signal
logger.Info("shutdown-signal-received", lager.Data{"signal": sig})

// Gracefully signal Ifrit monitor to stop processes
monitor.Signal(os.Interrupt)
logger.Info("graceful-shutdown-waiting-for-uploads")
// Wait for all uploads to finish before shutting down
uploadWaitGroup.Wait()
// Add a delay to ensure responses are sent to Diego before shutdown
extraWait := 30 * time.Second
logger.Info("waiting-additional-time-before-shutdown", lager.Data{"duration": extraWait})
time.Sleep(extraWait) // Ensure uploader has time to send responses
// Gracefully shutdown the HTTP server
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()
if !uploaderConfig.DisableNonTLS {
if err := nonTLSServer.Shutdown(ctx); err != nil {
logger.Error("non-tls-server-shutdown-failed", err)
}
}

if err := tlsServer.Shutdown(ctx); err != nil {
logger.Error("tls-server-shutdown-failed", err)
}

}

logger.Info("exited")
Expand Down Expand Up @@ -123,18 +173,19 @@ func initializeTlsTransport(uploaderConfig config.UploaderConfig, skipVerify boo
}
}

func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig, tlsServer bool) ifrit.Runner {
func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig, tlsServer bool) (*http.Server, ifrit.Runner) {
uploader := ccclient.NewUploader(logger, &http.Client{Transport: initializeTlsTransport(uploaderConfig, false)})

// To maintain backwards compatibility with hairpin polling URLs, skip SSL verification for now
poller := ccclient.NewPoller(logger, &http.Client{Transport: initializeTlsTransport(uploaderConfig, true)}, time.Duration(uploaderConfig.CCJobPollingInterval))

ccUploaderHandler, err := handlers.New(uploader, poller, logger)
ccUploaderHandler, err := handlers.New(uploader, poller, logger, &uploadWaitGroup)
if err != nil {
logger.Error("router-building-failed", err)
os.Exit(1)
}

var server *http.Server
if tlsServer {
clientTLSConfig, err := tlsconfig.Build(
tlsconfig.WithIdentityFromFile(uploaderConfig.MutualTLS.ServerCert, uploaderConfig.MutualTLS.ServerKey),
Expand All @@ -154,7 +205,16 @@ func initializeServer(logger lager.Logger, uploaderConfig config.UploaderConfig,
if err != nil {
logger.Fatal("failed-loading-tls-config", err)
}
return http_server.NewTLSServer(uploaderConfig.MutualTLS.ListenAddress, ccUploaderHandler, clientTLSConfig)
server = &http.Server{
Addr: uploaderConfig.MutualTLS.ListenAddress,
Handler: ccUploaderHandler,
TLSConfig: clientTLSConfig,
}
return server, http_server.NewTLSServer(uploaderConfig.MutualTLS.ListenAddress, ccUploaderHandler, clientTLSConfig)
}
server = &http.Server{
Addr: uploaderConfig.ListenAddress,
Handler: ccUploaderHandler,
}
return http_server.New(uploaderConfig.ListenAddress, ccUploaderHandler)
return server, http_server.New(uploaderConfig.ListenAddress, ccUploaderHandler)
}
5 changes: 3 additions & 2 deletions handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"net/http"
"sync"

"code.cloudfoundry.org/cc-uploader"
"code.cloudfoundry.org/cc-uploader/ccclient"
Expand All @@ -11,9 +12,9 @@ import (
"github.com/tedsuo/rata"
)

func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger) (http.Handler, error) {
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger, uploadWaitGroup *sync.WaitGroup) (http.Handler, error) {
return rata.NewRouter(ccuploader.Routes, rata.Handlers{
ccuploader.UploadDropletRoute: upload_droplet.New(uploader, poller, logger),
ccuploader.UploadDropletRoute: upload_droplet.New(uploader, poller, logger, uploadWaitGroup),
ccuploader.UploadBuildArtifactsRoute: upload_build_artifacts.New(uploader, logger),
})
}
5 changes: 3 additions & 2 deletions handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"sync"
"time"

"code.cloudfoundry.org/cc-uploader/ccclient"
Expand Down Expand Up @@ -53,8 +54,8 @@ var _ = Describe("Handlers", func() {

uploader := ccclient.NewUploader(logger, http.DefaultClient)
poller := ccclient.NewPoller(logger, http.DefaultClient, 100*time.Millisecond)

handler, err = handlers.New(uploader, poller, logger)
var wg sync.WaitGroup
handler, err = handlers.New(uploader, poller, logger, &wg)
Expect(err).NotTo(HaveOccurred())

postStatusCode = http.StatusCreated
Expand Down
21 changes: 13 additions & 8 deletions handlers/upload_droplet/upload_droplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,37 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"

"code.cloudfoundry.org/cc-uploader/ccclient"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/runtimeschema/cc_messages"
)

func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger) http.Handler {
func New(uploader ccclient.Uploader, poller ccclient.Poller, logger lager.Logger, uploadWaitGroup *sync.WaitGroup) http.Handler {
return &dropletUploader{
uploader: uploader,
poller: poller,
logger: logger,
uploader: uploader,
poller: poller,
logger: logger,
uploadWaitGroup: uploadWaitGroup, // Store reference
}
}

type dropletUploader struct {
uploader ccclient.Uploader
poller ccclient.Poller
logger lager.Logger
uploader ccclient.Uploader
poller ccclient.Poller
logger lager.Logger
uploadWaitGroup *sync.WaitGroup // Add a pointer to WaitGroup
}

var MissingCCDropletUploadUriKeyError = errors.New(fmt.Sprintf("missing %s parameter", cc_messages.CcDropletUploadUriKey))

func (h *dropletUploader) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := h.logger.Session("droplet.upload")

h.uploadWaitGroup.Add(1)
// Ensure that the WaitGroup is decremented when the function returns
defer h.uploadWaitGroup.Done()
logger.Info("extracting-droplet-upload-uri-key")
uploadUriParameter := r.URL.Query().Get(cc_messages.CcDropletUploadUriKey)
if uploadUriParameter == "" {
Expand Down
4 changes: 3 additions & 1 deletion handlers/upload_droplet/upload_droplet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"sync"
"time"

"code.cloudfoundry.org/cc-uploader/ccclient/fake_ccclient"
Expand Down Expand Up @@ -36,7 +37,8 @@ var _ = Describe("UploadDroplet", func() {

JustBeforeEach(func() {
logger = lager.NewLogger("fake-logger")
dropletUploadHandler := upload_droplet.New(&uploader, &poller, logger)
var wg sync.WaitGroup
dropletUploadHandler := upload_droplet.New(&uploader, &poller, logger, &wg)

dropletUploadHandler.ServeHTTP(responseWriter, incomingRequest)
})
Expand Down