Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 62e04aa

Browse files
committed
Improve error handling for metric initialization
1 parent f7d7b41 commit 62e04aa

File tree

1 file changed

+61
-43
lines changed

1 file changed

+61
-43
lines changed

pkg/pgmodel/sql_ingest.go

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,31 @@ type insertDataRequest struct {
146146
errChan chan error
147147
}
148148

149+
func (idr *insertDataRequest) reportResult(err error) {
150+
if err != nil {
151+
select {
152+
case idr.errChan <- err:
153+
default:
154+
}
155+
}
156+
idr.finished.Done()
157+
}
158+
149159
type insertDataTask struct {
150160
finished *sync.WaitGroup
151161
errChan chan error
152162
}
153163

164+
func (idt *insertDataTask) reportResult(err error) {
165+
if err != nil {
166+
select {
167+
case idt.errChan <- err:
168+
default:
169+
}
170+
}
171+
idt.finished.Done()
172+
}
173+
154174
func (p *pgxInserter) InsertData(rows map[string][]samplesInfo) (uint64, error) {
155175
var numRows uint64
156176
workFinished := &sync.WaitGroup{}
@@ -293,50 +313,27 @@ type copyRequest struct {
293313
table string
294314
}
295315

296-
func runInserterRoutineFailure(input chan insertDataRequest, err error) {
297-
for idr := range input {
298-
select {
299-
case idr.errChan <- fmt.Errorf("The insert routine has previously failed with %w", err):
300-
default:
301-
}
302-
idr.finished.Done()
303-
}
304-
}
305-
306316
func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName string, completeMetricCreationSignal chan struct{}, errChan chan error, metricTableNames MetricCache, toCopiers chan copyRequest) {
307-
tableName, err := metricTableNames.Get(metricName)
308-
if err == ErrEntryNotFound {
309-
var possiblyNew bool
310-
tableName, possiblyNew, err = getMetricTableName(conn, metricName)
317+
var tableName string
318+
var firstReq insertDataRequest
319+
firstReqSet := false
320+
for firstReq = range input {
321+
var err error
322+
tableName, err = initializeInserterRoutine(conn, metricName, completeMetricCreationSignal, metricTableNames)
311323
if err != nil {
312324
select {
313325
case errChan <- err:
314326
default:
315327
}
316-
//won't be able to insert anyway
317-
runInserterRoutineFailure(input, err)
318-
return
328+
firstReq.reportResult(fmt.Errorf("Initializing the insert routine has failed with %w", err))
329+
} else {
330+
firstReqSet = true
331+
break
319332
}
333+
}
320334

321-
//ignone error since this is just an optimization
322-
_ = metricTableNames.Set(metricName, tableName)
323-
324-
if possiblyNew {
325-
//pass a signal if there is space
326-
select {
327-
case completeMetricCreationSignal <- struct{}{}:
328-
default:
329-
}
330-
}
331-
} else if err != nil {
332-
if err != nil {
333-
select {
334-
case errChan <- err:
335-
default:
336-
}
337-
}
338-
//won't be able to insert anyway
339-
runInserterRoutineFailure(input, err)
335+
//input channel was closed before getting a successful request
336+
if !firstReqSet {
340337
return
341338
}
342339

@@ -349,6 +346,8 @@ func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName s
349346
toCopiers: toCopiers,
350347
}
351348

349+
handler.handleReq(firstReq)
350+
352351
for {
353352
if !handler.hasPendingReqs() {
354353
stillAlive := handler.blockingHandleReq()
@@ -369,6 +368,31 @@ func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName s
369368
}
370369
}
371370

371+
func initializeInserterRoutine(conn pgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames MetricCache) (tableName string, err error) {
372+
tableName, err = metricTableNames.Get(metricName)
373+
if err == ErrEntryNotFound {
374+
var possiblyNew bool
375+
tableName, possiblyNew, err = getMetricTableName(conn, metricName)
376+
if err != nil {
377+
return "", err
378+
}
379+
380+
//ignone error since this is just an optimization
381+
_ = metricTableNames.Set(metricName, tableName)
382+
383+
if possiblyNew {
384+
//pass a signal if there is space
385+
select {
386+
case completeMetricCreationSignal <- struct{}{}:
387+
default:
388+
}
389+
}
390+
} else if err != nil {
391+
return "", err
392+
}
393+
return tableName, err
394+
}
395+
372396
func (h *insertHandler) hasPendingReqs() bool {
373397
return len(h.pending.batch.sampleInfos) > 0
374398
}
@@ -561,13 +585,7 @@ func decompressChunks(conn pgxConn, pending *pendingBuffer, table string) error
561585

562586
func (pending *pendingBuffer) reportResults(err error) {
563587
for i := 0; i < len(pending.needsResponse); i++ {
564-
if err != nil {
565-
select {
566-
case pending.needsResponse[i].errChan <- err:
567-
default:
568-
}
569-
}
570-
pending.needsResponse[i].finished.Done()
588+
pending.needsResponse[i].reportResult(err)
571589
pending.needsResponse[i] = insertDataTask{}
572590
}
573591
pending.needsResponse = pending.needsResponse[:0]

0 commit comments

Comments
 (0)