Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit c231ed2

Browse files
committed
Support for Jaeger tags with binary value
Since we are using the OTEL translator, when ingesting Jaeger traces all the tags with `VType=ValueType_BINARY` are being encoded into base64 and stored as strings. The context of the type of value is lost after this translation occurs, meaning that, when we return those tags in a Jaeger query we return the base64 string as `VType=ValueType_STRING`. To preserve the type of the value, the prefix `data:application/octet-stream; base64,` will be added to the base64 string before storing in the DB. When retrieving a trace we look for tags that are string and have this prefix, we remove it and return a `ValueType_BINARY` with the result of decoding the base64 string. The prefix was choosen from the `The "data" URL scheme` RFC https://www.rfc-editor.org/rfc/rfc2397#section-2 . Jaeger allows these type of tags for Spans, Process and Logs.
1 parent 0f12a84 commit c231ed2

File tree

11 files changed

+1728
-35
lines changed

11 files changed

+1728
-35
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ We use the following categories for changes:
2727
### Changed
2828
- Log throughput in the same line for samples, spans and metric metadata [#1643]
2929
- The `chunks_created` metrics was removed. [#1634]
30+
- When querying for Jaeger tags with binary values the binary data will be
31+
returned instead of the base64 representation of the string [#1649].
3032

3133
### Fixed
3234
- Do not collect telemetry if `timescaledb.telemetry_level=off` [#1612]

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/jackc/pgtype v1.12.0
2626
github.com/jackc/pgx/v4 v4.17.0
2727
github.com/jaegertracing/jaeger v1.38.0
28+
github.com/kr/pretty v0.3.0
2829
github.com/oklog/run v1.1.0
2930
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.57.2
3031
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
@@ -48,7 +49,7 @@ require (
4849
go.opentelemetry.io/otel v1.9.0
4950
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
5051
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0
51-
go.opentelemetry.io/otel/sdk v1.7.0
52+
go.opentelemetry.io/otel/sdk v1.8.0
5253
go.opentelemetry.io/otel/trace v1.9.0
5354
go.uber.org/atomic v1.9.0
5455
go.uber.org/automaxprocs v1.5.1
@@ -127,6 +128,7 @@ require (
127128
github.com/jpillora/backoff v1.0.0 // indirect
128129
github.com/json-iterator/go v1.1.12 // indirect
129130
github.com/julienschmidt/httprouter v1.3.0 // indirect
131+
github.com/kr/text v0.2.0 // indirect
130132
github.com/magiconair/properties v1.8.6 // indirect
131133
github.com/mailru/easyjson v0.7.7 // indirect
132134
github.com/mattn/go-colorable v0.1.12 // indirect
@@ -156,6 +158,7 @@ require (
156158
github.com/prometheus/common/sigv4 v0.1.0 // indirect
157159
github.com/prometheus/exporter-toolkit v0.7.1 // indirect
158160
github.com/prometheus/procfs v0.8.0 // indirect
161+
github.com/rogpeppe/go-internal v1.6.2 // indirect
159162
github.com/rs/cors v1.8.2 // indirect
160163
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
161164
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect

go.sum

Lines changed: 1346 additions & 1 deletion
Large diffs are not rendered by default.

pkg/jaeger/store/binary_tags.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package store
2+
3+
import (
4+
"encoding/base64"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/jaegertracing/jaeger/model"
9+
)
10+
11+
const MEDIA_TYPE_ENCODED_BINARY = "data:application/octet-stream; base64,"
12+
const MEDIA_TYPE_ENCODED_BINARY_LEN = len(MEDIA_TYPE_ENCODED_BINARY)
13+
14+
// spanBinaryTags is the set of keys from Jaeger model.KeyValue items used in
15+
// model.Span.Tags, model.Process.Tags and model.Log.Fields that have
16+
// `VType=ValueType_BINARY` and had to be encoded to be store as strings.
17+
type spanBinaryTags struct {
18+
spanID int64
19+
spanTags map[string]struct{}
20+
processTags map[string]struct{}
21+
logsTags map[int]map[string]struct{}
22+
}
23+
24+
func (t *spanBinaryTags) isEmpty() bool {
25+
return len(t.spanTags) == 0 && len(t.logsTags) == 0 && len(t.processTags) == 0
26+
}
27+
28+
func isEncodedBinaryValue(v interface{}) bool {
29+
if val, isStr := v.(string); isStr {
30+
if strings.HasPrefix(val, MEDIA_TYPE_ENCODED_BINARY) {
31+
return true
32+
}
33+
}
34+
return false
35+
}
36+
37+
// decodeSpanBinaryTags decodes the tags with binary values that are present in
38+
// the binaryTags sets for the span, process and logs.
39+
//
40+
// The spanBinaryTags struct is necesary in order to not go through all the
41+
// tags for each Span that the query is returning. We are taking advantage on
42+
// the fact that when translating from OTEL to Jaeger we had to visit all the
43+
// tags and were able to construct this set.
44+
//
45+
// When writing binary tags we encode the slice of bytes into a base64 string
46+
// representation and add the prefix `__ValueType_BINARY__`. Decoding implies
47+
// removing the prefix and decoding the base64 string.
48+
func decodeSpanBinaryTags(span *model.Span, binaryTags *spanBinaryTags) {
49+
if len(binaryTags.spanTags) != 0 {
50+
decodeBinaryTags(span.Tags, binaryTags.spanTags)
51+
}
52+
53+
if len(binaryTags.processTags) != 0 {
54+
decodeBinaryTags(span.Process.Tags, binaryTags.processTags)
55+
}
56+
57+
if len(binaryTags.logsTags) != 0 {
58+
for i, logsTags := range binaryTags.logsTags {
59+
decodeBinaryTags(span.Logs[i].Fields, logsTags)
60+
}
61+
}
62+
}
63+
64+
func decodeBinaryTags(actualTags []model.KeyValue, binaryTagsToDecode map[string]struct{}) {
65+
for i, tag := range actualTags {
66+
_, ok := binaryTagsToDecode[tag.Key]
67+
if !ok {
68+
continue
69+
}
70+
71+
if tag.GetVType() != model.ValueType_STRING {
72+
continue
73+
}
74+
75+
encoded := tag.VStr
76+
if !strings.HasPrefix(encoded, MEDIA_TYPE_ENCODED_BINARY) {
77+
continue
78+
}
79+
80+
vBin, err := decodeBinaryTagValue(encoded)
81+
82+
// If we can't decode it means that we didn't encode it in the
83+
// first place, so we should keep it as is.
84+
if err != nil {
85+
continue
86+
}
87+
actualTags[i] = model.KeyValue{
88+
Key: tag.Key,
89+
VType: model.ValueType_BINARY,
90+
VBinary: vBin,
91+
}
92+
}
93+
}
94+
95+
func decodeBinaryTagValue(encoded string) ([]byte, error) {
96+
v := encoded[MEDIA_TYPE_ENCODED_BINARY_LEN:]
97+
return base64.StdEncoding.DecodeString(v)
98+
}
99+
100+
func encodeBinaryTagToStr(tag model.KeyValue) model.KeyValue {
101+
value := fmt.Sprintf("%s%s", MEDIA_TYPE_ENCODED_BINARY, base64.StdEncoding.EncodeToString(tag.GetVBinary()))
102+
return model.KeyValue{
103+
Key: tag.Key,
104+
VType: model.ValueType_STRING,
105+
VStr: value,
106+
}
107+
}
108+
109+
func encodeBinaryTags(span *model.Span) {
110+
for i, tag := range span.Tags {
111+
if !isBinaryTag(tag) {
112+
continue
113+
}
114+
span.Tags[i] = encodeBinaryTagToStr(tag)
115+
}
116+
117+
for _, log := range span.Logs {
118+
for i, tag := range log.Fields {
119+
if !isBinaryTag(tag) {
120+
continue
121+
}
122+
log.Fields[i] = encodeBinaryTagToStr(tag)
123+
}
124+
}
125+
126+
for i, tag := range span.Process.Tags {
127+
if !isBinaryTag(tag) {
128+
continue
129+
}
130+
span.Process.Tags[i] = encodeBinaryTagToStr(tag)
131+
}
132+
}
133+
134+
func isBinaryTag(tag model.KeyValue) bool {
135+
return tag.GetVType() == model.ValueType_BINARY
136+
}

pkg/jaeger/store/find_traces.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,19 @@ func findTraces(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *
3636

3737
func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
3838
traces := ptrace.NewTraces()
39+
spansBinaryTags := map[int64]*spanBinaryTags{}
3940
for rows.Next() {
4041
if rows.Err() != nil {
4142
return nil, fmt.Errorf("trace row iterator: %w", rows.Err())
4243
}
43-
if err := ScanRow(rows, &traces); err != nil {
44+
var err error
45+
spanBinaryTags, err := ScanRow(rows, &traces)
46+
if err != nil {
4447
return nil, fmt.Errorf("error scanning trace: %w", err)
4548
}
49+
if !spanBinaryTags.isEmpty() {
50+
spansBinaryTags[spanBinaryTags.spanID] = spanBinaryTags
51+
}
4652

4753
}
4854
if rows.Err() != nil {
@@ -54,10 +60,10 @@ func scanTraces(rows pgxconn.PgxRows) ([]*model.Trace, error) {
5460
return nil, fmt.Errorf("internal-traces-to-jaeger-proto: %w", err)
5561
}
5662

57-
return batchSliceToTraceSlice(batch), nil
63+
return batchSliceToTraceSlice(batch, spansBinaryTags), nil
5864
}
5965

60-
func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
66+
func batchSliceToTraceSlice(bSlice []*model.Batch, spansWithBinaryTag map[int64]*spanBinaryTags) []*model.Trace {
6167
// Mostly Copied from Jaeger's grpc_client.go
6268
// https://github.com/jaegertracing/jaeger/blob/067dff713ab635ade66315bbd05518d7b28f40c6/plugin/storage/grpc/shared/grpc_client.go#L179
6369
traces := make([]*model.Trace, 0)
@@ -74,6 +80,9 @@ func batchSliceToTraceSlice(bSlice []*model.Batch) []*model.Trace {
7480
}
7581
//copy over the process from the batch
7682
span.Process = batch.Process
83+
if binaryTags, ok := spansWithBinaryTag[int64(span.SpanID)]; ok {
84+
decodeSpanBinaryTags(span, binaryTags)
85+
}
7786
trace.Spans = append(trace.Spans, span)
7887
}
7988
}

pkg/jaeger/store/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (p *Store) StreamingSpanWriter() spanstore.Writer {
4949
}
5050

5151
func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
52+
encodeBinaryTags(span)
5253
batches := []*model.Batch{
5354
{
5455
Spans: []*model.Span{span},

0 commit comments

Comments
 (0)