Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a5db348
Modify ForceFlush to abort after timeout/cancellation
humivo Mar 30, 2021
98ae772
Add newline to end of file
humivo Mar 30, 2021
54e1a6d
Change PR number in changelog for now
humivo Mar 30, 2021
23908b4
Merge branch 'main' into 1618-ForceFlushAborts
humivo Mar 30, 2021
65cf934
Fix up comment
humivo Mar 30, 2021
3a432e9
Merge branch '1618-ForceFlushAborts' of https://github.com/humivo/ope…
humivo Mar 30, 2021
05df2dc
Merge branch 'main' into 1618-ForceFlushAborts
humivo Mar 30, 2021
aedba66
Extend unit test to show that ForceFlush works and succeeds
humivo Mar 30, 2021
c066037
Add more options to make test more consistent
humivo Mar 30, 2021
d603409
Add space for readability
humivo Mar 30, 2021
3886019
Lower spans to make test clearer
humivo Mar 30, 2021
5495ecd
Merge branch 'main' into 1618-ForceFlushAborts
humivo Mar 30, 2021
6ed25e6
Change number of spans in test
humivo Mar 31, 2021
2403c58
Include a goroutine for the forceflush
humivo Mar 31, 2021
5a9d603
Ran gofmt on test file
humivo Mar 31, 2021
dac4b35
Try waiting for generating span to finish
humivo Mar 31, 2021
7e4bc40
Test to see if number of exported spans is within a range
humivo Mar 31, 2021
1b4271e
Improve test feedback to give more detail about what went wrong
humivo Mar 31, 2021
776fb17
Take out absolute value difference and test only if less than and wit…
humivo Mar 31, 2021
8bc967d
Merge branch 'main' into 1618-ForceFlushAborts
humivo Mar 31, 2021
93e2ec4
Merge branch 'main' into 1618-ForceFlushAborts
MrAlias Apr 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
The existing `ParentSpanID` and `HasRemoteParent` fields are removed in favor of this. (#1748)
- The `ParentContext` field of the `"go.opentelemetry.io/otel/sdk/trace".SamplingParameters` is updated to hold a `context.Context` containing the parent span.
This changes it to make `SamplingParameters` conform with the OpenTelemetry specification. (#1749)
- Modify `BatchSpanProcessor.ForceFlush` to abort after timeout/cancellation. (#1757)
- Improve OTLP/gRPC exporter connection errors. (#1737)

### Removed
Expand Down
18 changes: 17 additions & 1 deletion sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,23 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
return bsp.exportSpans(ctx)
var err error
if bsp.e != nil {
wait := make(chan struct{})
go func() {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
close(wait)
}()
// Wait until the export is finished or the context is cancelled/timed out
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
}
return err
}

func WithMaxQueueSize(size int) BatchSpanProcessorOption {
Expand Down
69 changes: 69 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package trace_test
import (
"context"
"encoding/binary"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -257,3 +258,71 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
}
assert.Equal(t, 1, bp.shutdownCount)
}

func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
te := testBatchExporter{}
tp := basicTracerProvider(t)
option := testOption{
name: "default BatchSpanProcessorOptions",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(3000),
},
wantNumSpans: 2053,
wantBatchCount: 1,
genNumSpans: 2053,
}
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOption")
generateSpan(t, option.parallel, tr, option)

// Force flush any held span batches
err := ssp.ForceFlush(context.Background())

gotNumOfSpans := te.len()
spanDifference := option.wantNumSpans - gotNumOfSpans
if spanDifference > 10 || spanDifference < 0 {
t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}
gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
assert.NoError(t, err)
}

func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
// Add timeout to context to test deadline
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
<-ctx.Done()

if err := bsp.ForceFlush(ctx); err == nil {
t.Error("expected context DeadlineExceeded error, got nil")
} else if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected context DeadlineExceeded error, got %v", err)
}
}

func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
ctx, cancel := context.WithCancel(context.Background())
// Cancel the context
cancel()

if err := bsp.ForceFlush(ctx); err == nil {
t.Error("expected context canceled error, got nil")
} else if !errors.Is(err, context.Canceled) {
t.Errorf("expected context canceled error, got %v", err)
}
}