Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

## Added

- Added `Marshler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586)
### Removed

- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
Expand Down
24 changes: 20 additions & 4 deletions exporters/otlp/otlphttp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"strings"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
Expand All @@ -37,7 +40,8 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)

const contentType = "application/x-protobuf"
const contentTypeProto = "application/x-protobuf"
const contentTypeJSON = "application/json"

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
Expand Down Expand Up @@ -142,7 +146,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
pbRequest := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: rms,
}
rawRequest, err := pbRequest.Marshal()
rawRequest, err := d.marshal(pbRequest)
if err != nil {
return err
}
Expand All @@ -158,13 +162,21 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
pbRequest := &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
}
rawRequest, err := pbRequest.Marshal()
rawRequest, err := d.marshal(pbRequest)
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
}

func (d *driver) marshal(msg proto.Message) ([]byte, error) {
if d.cfg.marshaler == MarshalJSON {
s, err := (&jsonpb.Marshaler{}).MarshalToString(msg)
return []byte(s), err
}
return proto.Marshal(msg)
}

func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
var cancel context.CancelFunc
Expand Down Expand Up @@ -267,7 +279,11 @@ func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Head
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
headers.Set("Content-Type", contentType)
if d.cfg.marshaler == MarshalJSON {
headers.Set("Content-Type", contentTypeJSON)
} else {
headers.Set("Content-Type", contentTypeProto)
}
requestReader := bytes.NewBuffer(rawRequest)
switch d.cfg.compression {
case NoCompression:
Expand Down
6 changes: 6 additions & 0 deletions exporters/otlp/otlphttp/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func TestEndToEnd(t *testing.T) {
ExpectedHeaders: testHeaders,
},
},
{
name: "with json encoding",
opts: []otlphttp.Option{
otlphttp.WithMarshal(otlphttp.MarshalJSON),
},
},
}

for _, tc := range tests {
Expand Down
34 changes: 28 additions & 6 deletions exporters/otlp/otlphttp/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -109,15 +110,25 @@ func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectormetricpb.ExportMetricsServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
request, err := unmarshalMetricsRequest(rawRequest, r.Header.Get("content-type"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.metricLock.Lock()
defer c.metricLock.Unlock()
c.metricsStorage.AddMetrics(&request)
c.metricsStorage.AddMetrics(request)
}

func unmarshalMetricsRequest(rawRequest []byte, contentType string) (*collectormetricpb.ExportMetricsServiceRequest, error) {
request := &collectormetricpb.ExportMetricsServiceRequest{}
if contentType == "application/json" {
err := jsonpb.UnmarshalString(string(rawRequest), request)
return request, err
}
err := request.Unmarshal(rawRequest)
return request, err
}

func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -140,15 +151,26 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectortracepb.ExportTraceServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {

request, err := unmarshalTraceRequest(rawRequest, r.Header.Get("content-type"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.spanLock.Lock()
defer c.spanLock.Unlock()
c.spansStorage.AddSpans(&request)
c.spansStorage.AddSpans(request)
}

func unmarshalTraceRequest(rawRequest []byte, contentType string) (*collectortracepb.ExportTraceServiceRequest, error) {
request := &collectortracepb.ExportTraceServiceRequest{}
if contentType == "application/json" {
err := jsonpb.UnmarshalString(string(rawRequest), request)
return request, err
}
err := request.Unmarshal(rawRequest)
return request, err
}

func (c *mockCollector) checkHeaders(r *http.Request) bool {
Expand Down
24 changes: 24 additions & 0 deletions exporters/otlp/otlphttp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ const (
DefaultBackoff time.Duration = 300 * time.Millisecond
)

// Marshaler describes the kind of message format sent to the collector
type Marshaler int

const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)

type config struct {
endpoint string
compression Compression
Expand All @@ -58,6 +68,7 @@ type config struct {
tlsCfg *tls.Config
insecure bool
headers map[string]string
marshaler Marshaler
}

// Option applies an option to the HTTP driver.
Expand Down Expand Up @@ -201,3 +212,16 @@ func (headersOption) private() {}
func WithHeaders(headers map[string]string) Option {
return (headersOption)(headers)
}

type marshalerOption Marshaler

func (o marshalerOption) Apply(cfg *config) {
cfg.marshaler = Marshaler(o)
}
func (marshalerOption) private() {}

// WithMarshal tells the driver which wire format to use when sending to the
// collector. If unset, MarshalProto will be used
func WithMarshal(m Marshaler) Option {
return marshalerOption(m)
}