Skip to content

Commit a6e06db

Browse files
committed
feat(ingest): error when ingestion is not possible
1 parent 0233994 commit a6e06db

File tree

6 files changed

+75
-11
lines changed

6 files changed

+75
-11
lines changed

cmd/humanlog/ingest.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func ingest(
3232
getState func(*cli.Context) *state.State,
3333
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
3434
getHTTPClient func(*cli.Context) *http.Client,
35+
notifyUnableToIngest func(error),
3536
) (sink.Sink, error) {
3637
state := getState(cctx)
3738
tokenSource := getTokenSource(cctx)
@@ -65,13 +66,13 @@ func ingest(
6566
var snk sink.Sink
6667
switch sinkType := os.Getenv("HUMANLOG_SINK_TYPE"); sinkType {
6768
case "unary":
68-
snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
69+
snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
6970
case "bidi":
70-
snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
71+
snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
7172
case "stream":
7273
fallthrough // use the stream sink as default, it's the best tradeoff for performance and compatibility
7374
default:
74-
snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
75+
snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
7576
}
7677

7778
return snk, nil

cmd/humanlog/localhost.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,13 @@ func startLocalhostServer(
6161
localhostHttpClient *http.Client,
6262
ownVersion *typesv1.Version,
6363
) (localsink sink.Sink, done func(context.Context) error, err error) {
64-
localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port))
6564

65+
notifyUnableToIngest := func(err error) {
66+
ll.ErrorContext(ctx, "localhost ingestor is unable to ingest", slog.Any("err", err))
67+
// TODO: take this as a hint to become the localhost ingestor
68+
}
69+
70+
localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port))
6671
l, err := net.Listen("tcp", localhostAddr)
6772
if err != nil && !isEADDRINUSE(err) {
6873
return nil, nil, fmt.Errorf("listening on host/port: %v", err)
@@ -78,7 +83,7 @@ func startLocalhostServer(
7883
}
7984
logdebug("sending logs to localhost forwarder")
8085
client := ingestv1connect.NewIngestServiceClient(localhostHttpClient, addr.String())
81-
localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true)
86+
localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
8287
return localhostSink, func(ctx context.Context) error {
8388
logdebug("flushing localhost sink")
8489
return localhostSink.Flush(ctx)

cmd/humanlog/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,20 @@ func newApp() *cli.App {
444444
ll := getLogger(cctx)
445445
apiURL := getAPIUrl(cctx)
446446

447+
notifyUnableToIngest := func(err error) {
448+
// TODO: notify using system notification?
449+
logerror("configured to ingest, but unable to do so: %v", err)
450+
os.Exit(1)
451+
}
452+
447453
flushTimeout := 300 * time.Millisecond
448454
ingestctx, ingestcancel := context.WithCancel(context.WithoutCancel(ctx))
449455
go func() {
450456
<-ctx.Done()
451457
time.Sleep(2 * flushTimeout) // give it 2x timeout to flush before nipping the ctx entirely
452458
ingestcancel()
453459
}()
454-
remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient)
460+
remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient, notifyUnableToIngest)
455461
if err != nil {
456462
return fmt.Errorf("can't send logs: %v", err)
457463
}

pkg/sink/logsvcsink/bidistream_sink.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logsvcsink
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"log/slog"
@@ -28,7 +29,17 @@ type ConnectBidiStreamSink struct {
2829
doneFlushing chan struct{}
2930
}
3031

31-
func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1connect.IngestServiceClient, name string, machineID uint64, bufferSize int, drainBufferFor time.Duration, dropIfFull bool) *ConnectBidiStreamSink {
32+
func StartBidiStreamSink(
33+
ctx context.Context,
34+
ll *slog.Logger,
35+
client ingestv1connect.IngestServiceClient,
36+
name string,
37+
machineID uint64,
38+
bufferSize int,
39+
drainBufferFor time.Duration,
40+
dropIfFull bool,
41+
notifyUnableToIngest func(err error),
42+
) *ConnectBidiStreamSink {
3243
snk := &ConnectBidiStreamSink{
3344
ll: ll.With(
3445
slog.String("sink", name),
@@ -53,6 +64,12 @@ func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1co
5364
close(snk.doneFlushing)
5465
return
5566
}
67+
var cerr *connect.Error
68+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
69+
close(snk.doneFlushing)
70+
notifyUnableToIngest(err)
71+
return
72+
}
5673
if err != nil {
5774
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
5875
}
@@ -92,6 +109,10 @@ func (snk *ConnectBidiStreamSink) connectAndHandleBuffer(
92109
stream = client.IngestBidiStream(ctx)
93110
firstReq := &v1.IngestBidiStreamRequest{Events: buffered, MachineId: machineID, ResumeSessionId: resumeSessionID}
94111
if err := stream.Send(firstReq); err != nil {
112+
var cerr *connect.Error
113+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
114+
return false, cerr
115+
}
95116
return true, fmt.Errorf("creating ingestion stream: %w", err)
96117
}
97118
return false, nil

pkg/sink/logsvcsink/stream_sink.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logsvcsink
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"log/slog"
@@ -37,6 +38,7 @@ func StartStreamSink(
3738
bufferSize int,
3839
drainBufferFor time.Duration,
3940
dropIfFull bool,
41+
notifyUnableToIngest func(err error),
4042
) *ConnectStreamSink {
4143

4244
snk := &ConnectStreamSink{
@@ -64,6 +66,12 @@ func StartStreamSink(
6466
close(snk.doneFlushing)
6567
return
6668
}
69+
var cerr *connect.Error
70+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
71+
close(snk.doneFlushing)
72+
notifyUnableToIngest(err)
73+
return
74+
}
6775
if err != nil {
6876
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
6977
}
@@ -96,14 +104,18 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
96104
buffered []*typesv1.LogEvent,
97105
sessionID uint64,
98106
heartbeatEvery time.Duration,
99-
) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, _ error) {
107+
) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, sendErr error) {
100108
ll := snk.ll
101109
ll.DebugContext(ctx, "contacting log ingestor")
102110
var stream *connect.ClientStreamForClient[v1.IngestStreamRequest, v1.IngestStreamResponse]
103111
err := retry.Do(ctx, func(ctx context.Context) (bool, error) {
104112

105113
hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID}))
106114
if err != nil {
115+
var cerr *connect.Error
116+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
117+
return false, cerr
118+
}
107119
return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err)
108120
}
109121
heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration()
@@ -124,6 +136,13 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
124136
defer func() {
125137
res, err := stream.CloseAndReceive()
126138
if err != nil {
139+
var cerr *connect.Error
140+
if errors.Is(sendErr, io.EOF) && errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
141+
sendErr = cerr
142+
ll.ErrorContext(ctx, "no active plan, can't ingest logs", slog.Any("err", err))
143+
return
144+
}
145+
127146
ll.ErrorContext(ctx, "closing and receiving response for log ingestor session", slog.Any("err", err))
128147
return
129148
}
@@ -186,7 +205,7 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
186205
// until it's empty, then send what we have
187206
}
188207
start := time.Now()
189-
err := stream.Send(req)
208+
sendErr = stream.Send(req)
190209
dur := time.Since(start)
191210
ll.DebugContext(ctx, "sent logs",
192211
slog.String("sink", snk.name),
@@ -196,8 +215,8 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
196215
slog.Int("buffer_size", bufferSize),
197216
slog.Int64("drain_for_ms", drainBufferFor.Milliseconds()),
198217
)
199-
if err != nil {
200-
return req.Events, sessionID, heartbeatEvery, err
218+
if sendErr != nil {
219+
return req.Events, sessionID, heartbeatEvery, sendErr
201220
}
202221
req.Events = req.Events[:0:len(req.Events)]
203222
}

pkg/sink/logsvcsink/unary_sink.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logsvcsink
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"log/slog"
@@ -37,6 +38,7 @@ func StartUnarySink(
3738
bufferSize int,
3839
drainBufferFor time.Duration,
3940
dropIfFull bool,
41+
notifyUnableToIngest func(err error),
4042
) *ConnectUnarySink {
4143
snk := &ConnectUnarySink{
4244
ll: ll.With(
@@ -63,6 +65,12 @@ func StartUnarySink(
6365
close(snk.doneFlushing)
6466
return
6567
}
68+
var cerr *connect.Error
69+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
70+
close(snk.doneFlushing)
71+
notifyUnableToIngest(err)
72+
return
73+
}
6674
if err != nil {
6775
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
6876
}
@@ -101,6 +109,10 @@ func (snk *ConnectUnarySink) connectAndHandleBuffer(
101109
err := retry.Do(ctx, func(ctx context.Context) (bool, error) {
102110
hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID}))
103111
if err != nil {
112+
var cerr *connect.Error
113+
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
114+
return false, cerr
115+
}
104116
return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err)
105117
}
106118
heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration()

0 commit comments

Comments
 (0)