Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 976c046

Browse files
Add ability to stop routines on partial error.
Signed-off-by: Harkishen-Singh <[email protected]>
1 parent abfa986 commit 976c046

File tree

2 files changed

+70
-48
lines changed

2 files changed

+70
-48
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/jackc/pgtype v1.10.0
2525
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d
2626
github.com/jaegertracing/jaeger v1.31.0
27+
github.com/oklog/run v1.1.0 // indirect
2728
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.45.1
2829
github.com/opentracing-contrib/go-stdlib v1.0.0
2930
github.com/opentracing/opentracing-go v1.2.0

pkg/runner/runner.go

Lines changed: 69 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"fmt"
1313
"net"
1414
"net/http"
15-
"sync"
15+
"os"
16+
"os/signal"
1617
"time"
1718

1819
_ "github.com/jackc/pgx/v4/stdlib"
20+
"github.com/oklog/run"
1921
"go.opentelemetry.io/collector/model/otlpgrpc"
2022
"go.opentelemetry.io/otel"
2123
"google.golang.org/grpc"
@@ -126,9 +128,7 @@ func Run(cfg *Config) error {
126128
return fmt.Errorf("error registering metrics for telemetry: %w", err)
127129
}
128130

129-
log.Info("msg", "Started Prometheus HTTP server", "listening-port", cfg.ListenAddr)
130-
131-
var wg sync.WaitGroup
131+
var group run.Group
132132
if len(cfg.ThanosStoreAPIListenAddr) > 0 {
133133
srv := thanos.NewStorage(client.Queryable())
134134
options := make([]grpc.ServerOption, 0)
@@ -143,22 +143,20 @@ func Run(cfg *Config) error {
143143
grpcServer := grpc.NewServer(options...)
144144
storepb.RegisterStoreServer(grpcServer, srv)
145145

146-
wg.Add(1)
147-
go func() {
148-
listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr)
149-
if err != nil {
150-
wg.Done()
151-
log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err)
152-
return
153-
}
154-
155-
log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr)
156-
wg.Done()
157-
if err := grpcServer.Serve(listener); err != nil {
158-
log.Error("msg", "Starting the Thanos store failed", "err", err)
159-
return
160-
}
161-
}()
146+
group.Add(
147+
func() error {
148+
listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr)
149+
if err != nil {
150+
log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err)
151+
return err
152+
}
153+
log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr)
154+
return grpcServer.Serve(listener)
155+
}, func(error) {
156+
log.Info("msg", "Stopping Thanos StoreAPI GRPC server")
157+
grpcServer.Stop()
158+
},
159+
)
162160
}
163161

164162
if len(cfg.OTLPGRPCListenAddr) > 0 {
@@ -195,40 +193,63 @@ func Run(cfg *Config) error {
195193
return err
196194
}
197195

198-
wg.Add(1)
199-
go func() {
200-
listener, err := net.Listen("tcp", cfg.OTLPGRPCListenAddr)
201-
if err != nil {
202-
wg.Done()
203-
log.Error("msg", "Listening for OTLP GRPC server failed", "err", err)
204-
return
205-
}
206-
207-
log.Info("msg", "Started OpenTelemetry OTLP GRPC server", "listening-port", cfg.OTLPGRPCListenAddr)
208-
wg.Done()
209-
if err := grpcServer.Serve(listener); err != nil {
210-
log.Error("msg", "Starting the OTLP GRPC server failed", "err", err)
211-
return
212-
}
213-
}()
196+
group.Add(
197+
func() error {
198+
listener, err := net.Listen("tcp", cfg.OTLPGRPCListenAddr)
199+
if err != nil {
200+
log.Error("msg", "Listening for OpenTelemetry OTLP GRPC server failed", "err", err)
201+
return err
202+
}
203+
log.Info("msg", "Started OpenTelemetry OTLP GRPC server", "listening-port", cfg.OTLPGRPCListenAddr)
204+
return grpcServer.Serve(listener)
205+
}, func(error) {
206+
log.Info("msg", "Stopping OpenTelemetry OTLP GRPC server")
207+
grpcServer.Stop()
208+
},
209+
)
214210
}
215211

216212
mux := http.NewServeMux()
217213
mux.Handle("/", router)
218214

219-
wg.Wait()
220-
log.Info("msg", "All components are ready!")
221-
222-
if cfg.TLSCertFile != "" {
223-
err = http.ListenAndServeTLS(cfg.ListenAddr, cfg.TLSCertFile, cfg.TLSKeyFile, mux)
224-
} else {
225-
err = http.ListenAndServe(cfg.ListenAddr, mux)
215+
server := http.Server{
216+
Addr: cfg.ListenAddr,
217+
Handler: mux,
226218
}
227-
219+
group.Add(
220+
func() error {
221+
var err error
222+
log.Info("msg", "Started Prometheus remote-storage HTTP server", "listening-port", cfg.ListenAddr)
223+
if cfg.TLSCertFile != "" {
224+
err = server.ListenAndServeTLS(cfg.TLSCertFile, cfg.TLSKeyFile)
225+
} else {
226+
err = server.ListenAndServe()
227+
}
228+
return err
229+
}, func(error) {
230+
log.Info("msg", "Stopping Prometheus remote-storage HTTP server")
231+
err = server.Shutdown(context.Background())
232+
if err != nil {
233+
log.Error("msg", "unable to shutdown Prometheus remote-storage HTTP server", "err", err.Error())
234+
}
235+
},
236+
)
237+
238+
// Listen to OS interrupt signals.
239+
c := make(chan os.Signal, 1)
240+
signal.Notify(c, os.Interrupt)
241+
group.Add(
242+
func() error {
243+
<-c
244+
return nil
245+
}, func(err error) {
246+
close(c)
247+
},
248+
)
249+
250+
err = group.Run()
228251
if err != nil {
229-
log.Error("msg", "Listen failure", "err", err)
230-
return startupError
252+
log.Error("msg", "Execution failure, stopping Promscale", "err", err)
231253
}
232-
233-
return nil
254+
return err
234255
}

0 commit comments

Comments
 (0)