Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgr
go 1.23.0

require (
github.com/MrAlias/semconv-go v0.0.0-20250423184919-541c4b3517ee
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/metric v1.35.0
Expand Down
8 changes: 8 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
github.com/MrAlias/semconv-go v0.0.0-20250423175828-6e12f68bba93 h1:KTRJAnSqrCVeMcvnALrpyqSzIulOSdRvdDaufVo/Lx8=
github.com/MrAlias/semconv-go v0.0.0-20250423175828-6e12f68bba93/go.mod h1:WttzAU035NFUz87x02joKWkfKgjA5CkcYPIduTRGHNI=
github.com/MrAlias/semconv-go v0.0.0-20250423182713-dbae8bcc6cf4 h1:wBQ66sUvbSiVEFiGPVHiw0INW6KItxwY8RZuvONe3z0=
github.com/MrAlias/semconv-go v0.0.0-20250423182713-dbae8bcc6cf4/go.mod h1:WttzAU035NFUz87x02joKWkfKgjA5CkcYPIduTRGHNI=
github.com/MrAlias/semconv-go v0.0.0-20250423184450-beb08c266ed0 h1:r2lGWFKrBs30m7bo5sezjE++cuPJhZsogjgM7f1P0J4=
github.com/MrAlias/semconv-go v0.0.0-20250423184450-beb08c266ed0/go.mod h1:WttzAU035NFUz87x02joKWkfKgjA5CkcYPIduTRGHNI=
github.com/MrAlias/semconv-go v0.0.0-20250423184919-541c4b3517ee h1:m7DtUMvggKI+bMP6gxYVf4wCRrZsAr9YEEYblgUQm4Q=
github.com/MrAlias/semconv-go v0.0.0-20250423184919-541c4b3517ee/go.mod h1:WttzAU035NFUz87x02joKWkfKgjA5CkcYPIduTRGHNI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
144 changes: 46 additions & 98 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/MrAlias/semconv-go/rpcconv"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand All @@ -17,7 +18,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

Expand All @@ -38,11 +38,11 @@ type serverHandler struct {

tracer trace.Tracer

duration metric.Float64Histogram
inSize metric.Int64Histogram
outSize metric.Int64Histogram
inMsg metric.Int64Histogram
outMsg metric.Int64Histogram
duration rpcconv.ServerDuration
inSize rpcconv.ServerRequestSize
outSize rpcconv.ServerResponseSize
inMsg rpcconv.ServerRequestsPerRPC
outMsg rpcconv.ServerResponsesPerRPC
}

// NewServerHandler creates a stats.Handler for a gRPC server.
Expand All @@ -62,66 +62,30 @@ func NewServerHandler(opts ...Option) stats.Handler {
)

var err error
h.duration, err = meter.Float64Histogram(
"rpc.server.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"),
)
h.duration, err = rpcconv.NewServerDuration(meter)
if err != nil {
otel.Handle(err)
if h.duration == nil {
h.duration = noop.Float64Histogram{}
}
}

h.inSize, err = meter.Int64Histogram(
"rpc.server.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"),
)
h.inSize, err = rpcconv.NewServerRequestSize(meter)
if err != nil {
otel.Handle(err)
if h.inSize == nil {
h.inSize = noop.Int64Histogram{}
}
}

h.outSize, err = meter.Int64Histogram(
"rpc.server.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"),
)
h.outSize, err = rpcconv.NewServerResponseSize(meter)
if err != nil {
otel.Handle(err)
if h.outSize == nil {
h.outSize = noop.Int64Histogram{}
}
}

h.inMsg, err = meter.Int64Histogram(
"rpc.server.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"),
)
h.inMsg, err = rpcconv.NewServerRequestsPerRPC(meter)
if err != nil {
otel.Handle(err)
if h.inMsg == nil {
h.inMsg = noop.Int64Histogram{}
}
}

h.outMsg, err = meter.Int64Histogram(
"rpc.server.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"),
)
h.outMsg, err = rpcconv.NewServerResponsesPerRPC(meter)
if err != nil {
otel.Handle(err)
if h.outMsg == nil {
h.outMsg = noop.Int64Histogram{}
}
}

return h
}

Expand Down Expand Up @@ -165,19 +129,28 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.handleRPC(ctx, rs, h.duration, h.inSize, h.outSize, h.inMsg, h.outMsg, serverStatus)
h.handleRPC(
ctx,
rs,
h.duration.Inst(),
h.inSize,
h.outSize,
h.inMsg.Inst(),
h.outMsg.Inst(),
serverStatus,
)
}

type clientHandler struct {
*config

tracer trace.Tracer

duration metric.Float64Histogram
inSize metric.Int64Histogram
outSize metric.Int64Histogram
inMsg metric.Int64Histogram
outMsg metric.Int64Histogram
duration rpcconv.ClientDuration
inSize rpcconv.ClientResponseSize
outSize rpcconv.ClientRequestSize
inMsg rpcconv.ClientResponsesPerRPC
outMsg rpcconv.ClientRequestsPerRPC
}

// NewClientHandler creates a stats.Handler for a gRPC client.
Expand All @@ -197,66 +170,30 @@ func NewClientHandler(opts ...Option) stats.Handler {
)

var err error
h.duration, err = meter.Float64Histogram(
"rpc.client.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"),
)
h.duration, err = rpcconv.NewClientDuration(meter)
if err != nil {
otel.Handle(err)
if h.duration == nil {
h.duration = noop.Float64Histogram{}
}
}

h.outSize, err = meter.Int64Histogram(
"rpc.client.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"),
)
h.inSize, err = rpcconv.NewClientResponseSize(meter)
if err != nil {
otel.Handle(err)
if h.outSize == nil {
h.outSize = noop.Int64Histogram{}
}
}

h.inSize, err = meter.Int64Histogram(
"rpc.client.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"),
)
h.outSize, err = rpcconv.NewClientRequestSize(meter)
if err != nil {
otel.Handle(err)
if h.inSize == nil {
h.inSize = noop.Int64Histogram{}
}
}

h.outMsg, err = meter.Int64Histogram(
"rpc.client.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"),
)
h.inMsg, err = rpcconv.NewClientResponsesPerRPC(meter)
if err != nil {
otel.Handle(err)
if h.outMsg == nil {
h.outMsg = noop.Int64Histogram{}
}
}

h.inMsg, err = meter.Int64Histogram(
"rpc.client.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"),
)
h.outMsg, err = rpcconv.NewClientRequestsPerRPC(meter)
if err != nil {
otel.Handle(err)
if h.inMsg == nil {
h.inMsg = noop.Int64Histogram{}
}
}

return h
}

Expand Down Expand Up @@ -290,7 +227,13 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.handleRPC(
ctx, rs, h.duration, h.inSize, h.outSize, h.inMsg, h.outMsg,
ctx,
rs,
h.duration.Inst(),
h.inSize,
h.outSize,
h.inMsg.Inst(),
h.outMsg.Inst(),
func(s *status.Status) (codes.Code, string) {
return codes.Error, s.Message()
},
Expand All @@ -307,11 +250,16 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

type int64Hist interface {
Record(context.Context, int64, ...attribute.KeyValue)
}

func (c *config) handleRPC(
ctx context.Context,
rs stats.RPCStats,
duration metric.Float64Histogram,
inSize, outSize, inMsg, outMsg metric.Int64Histogram,
inSize, outSize int64Hist,
inMsg, outMsg metric.Int64Histogram,
recordStatus func(*status.Status) (codes.Code, string),
) {
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
Expand All @@ -327,7 +275,7 @@ func (c *config) handleRPC(
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.inMessages, 1)
inSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.metricAttrs...))
inSize.Record(ctx, int64(rs.Length), gctx.metricAttrs...)
}

if c.ReceivedEvent && span.IsRecording() {
Expand All @@ -343,7 +291,7 @@ func (c *config) handleRPC(
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.outMessages, 1)
outSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.metricAttrs...))
outSize.Record(ctx, int64(rs.Length), gctx.metricAttrs...)
}

if c.SentEvent && span.IsRecording() {
Expand Down