Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .chloggen/persist-request-context.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ change_type: enhancement
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Preserve request span context and client metadata in the persistent queue.
note: Preserve request span context and client information in the persistent queue.

# One or more tracking issues or pull requests related to the change
issues: [11740, 13220]
issues: [11740, 13220, 13232]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
It allows internal collector spans and client metadata to propagate through the persistent queue used by
It allows internal collector spans and client information to propagate through the persistent queue used by
the exporters. The same way as it's done for the in-memory queue.
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by
Expand Down
92 changes: 78 additions & 14 deletions pdata/xpdata/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package request // import "go.opentelemetry.io/collector/pdata/xpdata/request"

import (
"context"
"net"

"go.opentelemetry.io/otel/trace"

Expand All @@ -19,20 +20,21 @@ var readOnlyState = pdataint.StateReadOnly

// encodeContext encodes the context into a map of strings.
func encodeContext(ctx context.Context) internal.RequestContext {
return internal.RequestContext{
SpanContext: encodeSpanContext(ctx),
ClientMetadata: encodeClientMetadata(ctx),
}
rc := internal.RequestContext{}
encodeSpanContext(ctx, &rc)
encodeClientMetadata(ctx, &rc)
encodeClientAddress(ctx, &rc)
return rc
}

func encodeSpanContext(ctx context.Context) *internal.SpanContext {
func encodeSpanContext(ctx context.Context, rc *internal.RequestContext) {
spanCtx := trace.SpanContextFromContext(ctx)
if !spanCtx.IsValid() {
return nil
return
}
traceID := spanCtx.TraceID()
spanID := spanCtx.SpanID()
return &internal.SpanContext{
rc.SpanContext = &internal.SpanContext{
TraceId: traceID[:],
SpanId: spanID[:],
TraceFlags: uint32(spanCtx.TraceFlags()),
Expand All @@ -41,7 +43,7 @@ func encodeSpanContext(ctx context.Context) *internal.SpanContext {
}
}

func encodeClientMetadata(ctx context.Context) []protocommon.KeyValue {
func encodeClientMetadata(ctx context.Context, rc *internal.RequestContext) {
clientMetadata := client.FromContext(ctx).Metadata
metadataMap, metadataFound := pcommon.Map{}, false
for k := range clientMetadata.Keys() {
Expand All @@ -54,9 +56,35 @@ func encodeClientMetadata(ctx context.Context) []protocommon.KeyValue {
}
}
if metadataFound {
return *pdataint.GetOrigMap(pdataint.Map(metadataMap))
rc.ClientMetadata = *pdataint.GetOrigMap(pdataint.Map(metadataMap))
}
}

func encodeClientAddress(ctx context.Context, rc *internal.RequestContext) {
switch a := client.FromContext(ctx).Addr.(type) {
case *net.IPAddr:
rc.ClientAddress = &internal.RequestContext_Ip{Ip: &internal.IPAddr{
Ip: a.IP,
Zone: a.Zone,
}}
case *net.TCPAddr:
rc.ClientAddress = &internal.RequestContext_Tcp{Tcp: &internal.TCPAddr{
Ip: a.IP,
Port: int32(a.Port), //nolint:gosec // G115
Zone: a.Zone,
}}
case *net.UDPAddr:
rc.ClientAddress = &internal.RequestContext_Udp{Udp: &internal.UDPAddr{
Ip: a.IP,
Port: int32(a.Port), //nolint:gosec // G115
Zone: a.Zone,
}}
case *net.UnixAddr:
rc.ClientAddress = &internal.RequestContext_Unix{Unix: &internal.UnixAddr{
Name: a.Name,
Net: a.Net,
}}
}
return nil
}

// decodeContext decodes the context from the bytes map.
Expand All @@ -65,7 +93,15 @@ func decodeContext(ctx context.Context, rc *internal.RequestContext) context.Con
return ctx
}
ctx = decodeSpanContext(ctx, rc.SpanContext)
return decodeClientMetadata(ctx, rc.ClientMetadata)
metadataMap := decodeClientMetadata(rc.ClientMetadata)
clientAddress := decodeClientAddress(rc)
if len(metadataMap) > 0 || clientAddress != nil {
ctx = client.NewContext(ctx, client.Info{
Metadata: client.NewMetadata(metadataMap),
Addr: clientAddress,
})
}
return ctx
}

func decodeSpanContext(ctx context.Context, sc *internal.SpanContext) context.Context {
Expand All @@ -86,9 +122,9 @@ func decodeSpanContext(ctx context.Context, sc *internal.SpanContext) context.Co
}))
}

func decodeClientMetadata(ctx context.Context, clientMetadata []protocommon.KeyValue) context.Context {
func decodeClientMetadata(clientMetadata []protocommon.KeyValue) map[string][]string {
if len(clientMetadata) == 0 {
return ctx
return nil
}
metadataMap := make(map[string][]string, len(clientMetadata))
for k, vals := range pcommon.Map(pdataint.NewMap(&clientMetadata, &readOnlyState)).All() {
Expand All @@ -97,5 +133,33 @@ func decodeClientMetadata(ctx context.Context, clientMetadata []protocommon.KeyV
metadataMap[k][i] = v.Str()
}
}
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(metadataMap)})
return metadataMap
}

func decodeClientAddress(rc *internal.RequestContext) net.Addr {
switch a := rc.ClientAddress.(type) {
case *internal.RequestContext_Ip:
return &net.IPAddr{
IP: a.Ip.Ip,
Zone: a.Ip.Zone,
}
case *internal.RequestContext_Tcp:
return &net.TCPAddr{
IP: a.Tcp.Ip,
Port: int(a.Tcp.Port),
Zone: a.Tcp.Zone,
}
case *internal.RequestContext_Udp:
return &net.UDPAddr{
IP: a.Udp.Ip,
Port: int(a.Udp.Port),
Zone: a.Udp.Zone,
}
case *internal.RequestContext_Unix:
return &net.UnixAddr{
Name: a.Unix.Name,
Net: a.Unix.Net,
}
}
return nil
}
85 changes: 68 additions & 17 deletions pdata/xpdata/request/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package request // import "go.opentelemetry.io/collector/pdata/xpdata/request"

import (
"context"
"net"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -21,24 +22,74 @@ func TestEncodeDecodeContext(t *testing.T) {
"key1": {"value1"},
"key2": {"value2", "value3"},
})
tests := []struct {
name string
clientInfo client.Info
}{
{
name: "without_client_address",
clientInfo: client.Info{Metadata: clientMetadata},
},
{
name: "with_client_IP_address",
clientInfo: client.Info{
Metadata: clientMetadata,
Addr: &net.IPAddr{
IP: net.IPv6loopback,
Zone: "eth0",
},
},
},
{
name: "with_client_TCP_address",
clientInfo: client.Info{
Metadata: clientMetadata,
Addr: &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8080,
},
},
},
{
name: "with_client_UDP_address",
clientInfo: client.Info{
Metadata: clientMetadata,
Addr: &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8080,
},
},
},
{
name: "with_client_unix_address",
clientInfo: client.Info{
Metadata: clientMetadata,
Addr: &net.UnixAddr{
Name: "/var/run/test.sock",
Net: "unixpacket",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Encode a context with a span and client metadata
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
ctx = client.NewContext(ctx, tt.clientInfo)
reqCtx := encodeContext(ctx)
buf, err := reqCtx.Marshal()
require.NoError(t, err)

// Encode a context with a span and client metadata
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
ctx = client.NewContext(ctx, client.Info{
Metadata: clientMetadata,
})
reqCtx := encodeContext(ctx)
buf, err := reqCtx.Marshal()
require.NoError(t, err)

// Decode the context
gotReqCtx := internal.RequestContext{}
err = gotReqCtx.Unmarshal(buf)
require.NoError(t, err)
gotCtx := decodeContext(context.Background(), &gotReqCtx)
assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx))
assert.Equal(t, clientMetadata, client.FromContext(gotCtx).Metadata)
// Decode the context
gotReqCtx := internal.RequestContext{}
err = gotReqCtx.Unmarshal(buf)
require.NoError(t, err)
gotCtx := decodeContext(context.Background(), &gotReqCtx)
assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx))
assert.Equal(t, tt.clientInfo, client.FromContext(gotCtx))
})
}

// Decode nil request context
// Decode a nil context
assert.Equal(t, context.Background(), decodeContext(context.Background(), nil))
}
Loading
Loading