Skip to content

Commit a5db348

Browse files
committed
Modify ForceFlush to abort after timeout/cancellation
1 parent c61f4b6 commit a5db348

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1313
- Jaeger exporter was updated to use thrift v0.14.1. (#1712)
1414
- Migrate from using internally built and maintained version of the OTLP to the one hosted at `go.opentelemetry.io/proto/otlp`. (#1713)
1515
- Migrate from using `github.com/gogo/protobuf` to `google.golang.org/protobuf` to match `go.opentelemetry.io/proto/otlp`. (#1713)
16+
- Modify `BatchSpanProcessor.ForceFlush` to abort after timeout/cancellation (#1618)
1617

1718
### Removed
1819

sdk/trace/batch_span_processor.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,23 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
148148

149149
// ForceFlush exports all ended spans that have not yet been exported.
150150
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
151-
return bsp.exportSpans(ctx)
151+
var err error
152+
if bsp.e != nil {
153+
wait := make(chan struct{})
154+
go func() {
155+
if err := bsp.exportSpans(ctx); err != nil {
156+
otel.Handle(err)
157+
}
158+
close(wait)
159+
}()
160+
// Wait until the wait group is done or the context is cancelled/timed out
161+
select {
162+
case <-wait:
163+
case <-ctx.Done():
164+
err = ctx.Err()
165+
}
166+
}
167+
return err
152168
}
153169

154170
func WithMaxQueueSize(size int) BatchSpanProcessorOption {

sdk/trace/batch_span_processor_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,33 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
257257
}
258258
assert.Equal(t, 1, bp.shutdownCount)
259259
}
260+
261+
func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
262+
var bp testBatchExporter
263+
bsp := sdktrace.NewBatchSpanProcessor(&bp)
264+
265+
err := bsp.ForceFlush(context.Background())
266+
assert.Equal(t, nil, err)
267+
}
268+
269+
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
270+
var bp testBatchExporter
271+
bsp := sdktrace.NewBatchSpanProcessor(&bp)
272+
// Add timeout to context to test deadline
273+
ctx, cancel := context.WithTimeout(context.Background(), 0)
274+
defer cancel()
275+
276+
err := bsp.ForceFlush(ctx)
277+
assert.Equal(t, context.DeadlineExceeded, err)
278+
}
279+
280+
func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
281+
var bp testBatchExporter
282+
bsp := sdktrace.NewBatchSpanProcessor(&bp)
283+
ctx, cancel := context.WithCancel(context.Background())
284+
// Cancel the context
285+
cancel()
286+
287+
err := bsp.ForceFlush(ctx)
288+
assert.Equal(t, context.Canceled, err)
289+
}

0 commit comments

Comments
 (0)