Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .chloggen/persist-request-context-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: QueueBatchEncoding interface is changed to support marshaling and unmarshaling of request context.

# One or more tracking issues or pull requests related to the change
issues: [13188]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
28 changes: 28 additions & 0 deletions .chloggen/persist-request-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add an option to preserve request span context in the persistent queue

# One or more tracking issues or pull requests related to the change
issues: [11740]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by
a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer
collector with the feature enabled. However, the reverse is not supported: a buffer stored by a newer collector with
the feature enabled cannot be read by an older collector (or by a collector with the feature gate disabled).

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ coverage:

ignore:
- "pdata/internal/data/protogen/**/*"
- "**/*.pb.go"
- "cmd/mdatagen/third_party/**/*"
1 change: 1 addition & 0 deletions .github/workflows/utils/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@
"xexporter",
"xexporterhelper",
"xextension",
"xpdata",
"xpipeline",
"xprocessor",
"xprocessorhelper",
Expand Down
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,9 @@ genpdata:
pushd pdata/ && $(GOCMD) run ./internal/cmd/pdatagen/main.go && popd
$(MAKE) fmt

INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue
# INTERNAL_PROTO_SRC_DIRS += path/to/other/proto/dirs
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue pdata/xpdata/request/internal
INTERNAL_PROTO_FILES := $(foreach dir,$(INTERNAL_PROTO_SRC_DIRS),$(wildcard $(dir)/*.proto))
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} --go_out=${PWD}
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} -I/usr/include/github.com/gogo/protobuf -I${PWD}/$(PROTO_INTERMEDIATE_DIR) --go_out=${PWD}

.PHONY: genproto_internal
genproto_internal:
Expand Down
1 change: 1 addition & 0 deletions cmd/builder/internal/builder/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var replaceModules = []string{
"/pdata",
"/pdata/testdata",
"/pdata/pprofile",
"/pdata/xpdata",
"/pipeline",
"/pipeline/xpipeline",
"/processor",
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ replaces:
- go.opentelemetry.io/collector/pdata => ../../pdata
- go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata
- go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile
- go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata
- go.opentelemetry.io/collector/pipeline => ../../pipeline
- go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline
- go.opentelemetry.io/collector/processor => ../../processor
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ require (
go.opentelemetry.io/collector/pdata v1.34.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.128.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.128.0 // indirect
go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/pipeline v0.128.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper v0.128.0 // indirect
Expand Down Expand Up @@ -291,6 +292,8 @@ replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata

replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile

replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata

replace go.opentelemetry.io/collector/pipeline => ../../pipeline

replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline
Expand Down
3 changes: 3 additions & 0 deletions exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect
go.opentelemetry.io/collector/featuregate v1.34.0 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect
go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/pipeline v0.128.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect
go.opentelemetry.io/collector/receiver v1.34.0 // indirect
Expand Down Expand Up @@ -127,3 +128,5 @@ replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xe
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry

replace go.opentelemetry.io/collector/client => ../../client

replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] {

type fakeEncoding struct{}

func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) {
return []byte("mockRequest"), nil
}

func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
return &requesttest.FakeRequest{}, nil
func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) {
return context.Background(), &requesttest.FakeRequest{}, nil
}
15 changes: 15 additions & 0 deletions exporter/exporterhelper/internal/oteltest/tracetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
Expand All @@ -19,3 +20,17 @@ func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code)
}
}

func FakeSpanContext(t *testing.T) trace.SpanContext {
traceID, err := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10")
require.NoError(t, err)
spanID, err := trace.SpanIDFromHex("0102030405060708")
require.NoError(t, err)
return trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: 0x01,
TraceState: trace.TraceState{},
Remote: true,
})
}
21 changes: 21 additions & 0 deletions exporter/exporterhelper/internal/queue/fg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"

import "go.opentelemetry.io/collector/featuregate"

// PersistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue.
var PersistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.PersistRequestContext",
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.128.0"),
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
)

// assign the feature gate to separate valiables to make it possible to override the behavior in tests
// on write and read paths separately.
var (
PersistRequestContextOnRead = PersistRequestContextFeatureGate.IsEnabled()
PersistRequestContextOnWrite = PersistRequestContextFeatureGate.IsEnabled()
)
19 changes: 10 additions & 9 deletions exporter/exporterhelper/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
}
}

reqBuf, err := pq.set.encoding.Marshal(req)
reqBuf, err := pq.set.encoding.Marshal(ctx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don

// Read until either a successful retrieved element or no more elements in the storage.
for pq.metadata.ReadIndex != pq.metadata.WriteIndex {
index, req, consumed := pq.getNextItem(ctx)
index, req, reqCtx, consumed := pq.getNextItem(ctx)
// Ensure the used size and the channel size are in sync.
if pq.metadata.ReadIndex == pq.metadata.WriteIndex {
pq.metadata.QueueSize = 0
Expand All @@ -303,7 +303,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
if consumed {
id := indexDonePool.Get().(*indexDone)
id.reset(index, pq.set.sizer.Sizeof(req), pq)
return context.Background(), req, id, true
return reqCtx, req, id, true
}
}

Expand All @@ -316,7 +316,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
// returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, context.Context, bool) {
index := pq.metadata.ReadIndex
// Increase here, so even if errors happen below, it always iterates
pq.metadata.ReadIndex++
Expand All @@ -328,8 +328,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
getOp)

var request T
restoredCtx := context.Background()
if err == nil {
request, err = pq.set.encoding.Unmarshal(getOp.Value)
restoredCtx, request, err = pq.set.encoding.Unmarshal(getOp.Value)
}

if err != nil {
Expand All @@ -339,14 +340,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

return 0, request, false
return 0, request, restoredCtx, false
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++

return index, request, true
return index, request, restoredCtx, true
}

// onDone should be called to remove the item of the given index from the queue once processing is finished.
Expand Down Expand Up @@ -438,13 +439,13 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
continue
}
req, err := pq.set.encoding.Unmarshal(op.Value)
reqCtx, req, err := pq.set.encoding.Unmarshal(op.Value)
// If error happened or item is nil, it will be efficiently ignored
if err != nil {
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
continue
}
if pq.putInternal(ctx, req) != nil {
if pq.putInternal(reqCtx, req) != nil {
errCount++
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ func (is *itemsSizer) Sizeof(val uint64) int64 {

type uint64Encoding struct{}

func (uint64Encoding) Marshal(val uint64) ([]byte, error) {
func (uint64Encoding) Marshal(_ context.Context, val uint64) ([]byte, error) {
return binary.LittleEndian.AppendUint64([]byte{}, val), nil
}

func (uint64Encoding) Unmarshal(bytes []byte) (uint64, error) {
func (uint64Encoding) Unmarshal(bytes []byte) (context.Context, uint64, error) {
if len(bytes) < 8 {
return 0, errInvalidValue
return context.Background(), 0, errInvalidValue
}
return binary.LittleEndian.Uint64(bytes), nil
return context.Background(), binary.LittleEndian.Uint64(bytes), nil
}

func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
Expand Down Expand Up @@ -913,7 +913,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
}

func TestPersistentQueue_StorageFull(t *testing.T) {
marshaled, err := uint64Encoding{}.Marshal(uint64(50))
marshaled, err := uint64Encoding{}.Marshal(context.Background(), uint64(50))
require.NoError(t, err)
maxSizeInBytes := len(marshaled) * 5 // arbitrary small number

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

type Encoding[T any] interface {
// Marshal is a function that can marshal a request into bytes.
Marshal(T) ([]byte, error)
Marshal(context.Context, T) ([]byte, error)

// Unmarshal is a function that can unmarshal bytes into a request.
Unmarshal([]byte) (T, error)
Unmarshal([]byte) (context.Context, T, error)
}

// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type fakeEncoding struct {
mr request.Request
}

func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) {
return []byte("mockRequest"), nil
}

func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
return f.mr, nil
func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) {
return context.Background(), f.mr, nil
}

func newFakeEncoding(mr request.Request) queue.Encoding[request.Request] {
Expand Down
28 changes: 23 additions & 5 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/plog"
pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request"
"go.opentelemetry.io/collector/pipeline"
)

Expand Down Expand Up @@ -57,16 +59,32 @@

type logsEncoding struct{}

func (logsEncoding) Unmarshal(bytes []byte) (Request, error) {
var _ QueueBatchEncoding[Request] = logsEncoding{}

func (logsEncoding) Unmarshal(bytes []byte) (context.Context, Request, error) {
if queue.PersistRequestContextOnRead {
ctx, logs, err := pdatareq.UnmarshalLogs(bytes)
if errors.Is(err, pdatareq.ErrInvalidFormat) {
// fall back to unmarshaling without context
logs, err = logsUnmarshaler.UnmarshalLogs(bytes)
}
return ctx, newLogsRequest(logs), err
}

logs, err := logsUnmarshaler.UnmarshalLogs(bytes)
if err != nil {
return nil, err
var req Request
return context.Background(), req, err

Check warning on line 77 in exporter/exporterhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/logs.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
return newLogsRequest(logs), nil
return context.Background(), newLogsRequest(logs), nil
}

func (logsEncoding) Marshal(req Request) ([]byte, error) {
return logsMarshaler.MarshalLogs(req.(*logsRequest).ld)
func (logsEncoding) Marshal(ctx context.Context, req Request) ([]byte, error) {
logs := req.(*logsRequest).ld
if queue.PersistRequestContextOnWrite {
return pdatareq.MarshalLogs(ctx, logs)
}
return logsMarshaler.MarshalLogs(logs)
}

func (req *logsRequest) OnError(err error) Request {
Expand Down
Loading
Loading