@@ -137,7 +137,16 @@ func main() {

 	http.Handle(cfg.telemetryPath, promhttp.Handler())

-	elector = initElector(cfg)
+	elector, err = initElector(cfg)
+
+	if err != nil {
+		log.Error("msg", err.Error())
+		os.Exit(1)
+	}
+
+	if elector == nil {
+		log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode")
+	}

 	// migrate has to happen after elector started
 	if cfg.migrate {
@@ -153,8 +162,8 @@ func main() {
 	}
 	defer client.Close()

-	http.Handle("/write", timeHandler("write", write(client)))
-	http.Handle("/read", timeHandler("read", read(client)))
+	http.Handle("/write", timeHandler(httpRequestDuration, "write", write(client)))
+	http.Handle("/read", timeHandler(httpRequestDuration, "read", read(client)))
 	http.Handle("/healthz", health(client))

 	log.Info("msg", "Starting up...")
@@ -189,26 +198,22 @@ func parseFlags() *config {
 	return cfg
 }

-func initElector(cfg *config) *util.Elector {
+func initElector(cfg *config) (*util.Elector, error) {
 	if cfg.restElection && cfg.haGroupLockID != 0 {
-		log.Error("msg", "Use either REST or PgAdvisoryLock for the leader election")
-		os.Exit(1)
+		return nil, fmt.Errorf("Use either REST or PgAdvisoryLock for the leader election")
 	}
 	if cfg.restElection {
-		return util.NewElector(util.NewRestElection())
+		return util.NewElector(util.NewRestElection()), nil
 	}
 	if cfg.haGroupLockID == 0 {
-		log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode")
-		return nil
+		return nil, nil
 	}
 	if cfg.prometheusTimeout == -1 {
-		log.Error("msg", "Prometheus timeout configuration must be set when using PG advisory lock")
-		os.Exit(1)
+		return nil, fmt.Errorf("Prometheus timeout configuration must be set when using PG advisory lock")
 	}
 	lock, err := util.NewPgAdvisoryLock(cfg.haGroupLockID, cfg.pgmodelCfg.GetConnectionStr())
 	if err != nil {
-		log.Error("msg", "Error creating advisory lock", "haGroupLockId", cfg.haGroupLockID, "err", err)
-		os.Exit(1)
+		return nil, fmt.Errorf("Error creating advisory lock\nhaGroupLockId: %d\nerr: %s\n", cfg.haGroupLockID, err)
 	}
 	scheduledElector := util.NewScheduledElector(lock, cfg.electionInterval)
 	log.Info("msg", "Initialized leader election based on PostgreSQL advisory lock")
@@ -221,7 +226,7 @@ func initElector(cfg *config) *util.Elector {
 			}
 		}()
 	}
-	return &scheduledElector.Elector
+	return &scheduledElector.Elector, nil
 }

 func migrate(cfg *pgclient.Config) {
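
Returning an error instead of calling os.Exit makes initElector straightforward to unit-test. A minimal sketch of such a test, assuming only the config fields that appear in this diff (the test name, file placement, and field values are illustrative):

	package main

	import "testing"

	// TestInitElectorConflictingFlags checks that REST election and a
	// PG advisory group lock cannot be enabled at the same time.
	func TestInitElectorConflictingFlags(t *testing.T) {
		cfg := &config{restElection: true, haGroupLockID: 1}
		elector, err := initElector(cfg)
		if err == nil {
			t.Fatal("expected an error when both REST election and a group lock id are set")
		}
		if elector != nil {
			t.Fatal("expected no elector to be returned on error")
		}
	}
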
@@ -261,15 +266,6 @@ func migrate(cfg *pgclient.Config) {

 func write(writer pgmodel.DBInserter) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		compressed, err := ioutil.ReadAll(r.Body)
-		if err != nil {
-			log.Error("msg", "Read error", "err", err.Error())
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		atomic.StoreInt64(&lastRequestUnixNano, time.Now().UnixNano())
-
 		shouldWrite, err := isWriter()
 		if err != nil {
 			leaderGauge.Set(0)
@@ -283,6 +279,16 @@ func write(writer pgmodel.DBInserter) http.Handler {
 		}

 		leaderGauge.Set(1)
+
+		compressed, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			log.Error("msg", "Read error", "err", err.Error())
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		atomic.StoreInt64(&lastRequestUnixNano, time.Now().UnixNano())
+
 		reqBuf, err := snappy.Decode(nil, compressed)
 		if err != nil {
 			log.Error("msg", "Decode error", "err", err.Error())
@@ -310,6 +316,7 @@ func write(writer pgmodel.DBInserter) http.Handler {
 		numSamples, err := writer.Ingest(req.GetTimeseries())
 		if err != nil {
 			log.Warn("msg", "Error sending samples to remote storage", "err", err, "num_samples", numSamples)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
 			failedSamples.Add(float64(receivedBatchCount))
 			return
 		}
@@ -415,12 +422,12 @@ func health(hc pgmodel.HealthChecker) http.Handler {
 }

 // timeHandler uses Prometheus histogram to track request time
-func timeHandler(path string, handler http.Handler) http.Handler {
+func timeHandler(histogramVec prometheus.ObserverVec, path string, handler http.Handler) http.Handler {
 	f := func(w http.ResponseWriter, r *http.Request) {
 		start := time.Now()
 		handler.ServeHTTP(w, r)
-		elapsedMs := time.Since(start).Nanoseconds() / int64(time.Millisecond)
-		httpRequestDuration.WithLabelValues(path).Observe(float64(elapsedMs))
+		elapsedMs := time.Since(start).Milliseconds()
+		histogramVec.WithLabelValues(path).Observe(float64(elapsedMs))
 	}
 	return http.HandlerFunc(f)
 }
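
Taking the ObserverVec as a parameter decouples timeHandler from the package-level httpRequestDuration metric, so callers and tests can supply their own histogram. A rough usage sketch, assuming only the new signature; the metric name, label name, and variable names are illustrative:

	// Any prometheus.ObserverVec with a single label works here;
	// *prometheus.HistogramVec satisfies the interface.
	durations := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "http_request_duration_ms",
			Help: "HTTP request duration in milliseconds.",
		},
		[]string{"path"},
	)
	noop := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
	handler := timeHandler(durations, "write", noop)
	// handler can now be driven with net/http/httptest and the recorded
	// observations inspected without touching global state.
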