Skip to content

Commit ef28e16

Browse files
danielbdiasjfermi
andauthored
chore(examples): add grpc stream propagation example (#3908)
* chore(examples): add context propagation with gRPC streams * add telemetry to example * update example and README * Apply suggestions from code review Co-authored-by: Julianne Fermi <[email protected]> * small update on demo code --------- Co-authored-by: Julianne Fermi <[email protected]>
1 parent be7c461 commit ef28e16

26 files changed

+2140
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ dist/tracetest-docker-$(TAG).tar dist/tracetest-agent-docker-$(TAG).tar: $(CLI_S
3737
docker save --output dist/tracetest-agent-docker-$(TAG).tar "kubeshop/tracetest-agent:$(TAG)"
3838

3939
help: Makefile ## show list of commands
40-
@echo "Choose a command run:"
40+
@echo "Choose a command to run:"
4141
@echo ""
4242
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort
4343

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
## gRPC Stream Propagation
2+
3+
This example shows a system with two components working together to process payment data in a Producer/Consumer fashion. Every component is instrumented with OpenTelemetry and sends trace data to a Jaeger instance. The system is composed of:
4+
- a **Producer API** that receives payment data from customers, enqueues it internally and publishes it through a gRPC stream.
5+
- a **Consumer worker** that reads a gRPC stream of payment data and processes it.
6+
- an **OTel Collector** that receives trace data from both components and forwards it to a Jaeger instance.
7+
- a **Jaeger instance** that stores and displays trace data.
8+
9+
```mermaid
10+
flowchart LR
11+
User
12+
ProducerAPI["Producer API"]
13+
ConsumerWorker["Consumer Worker"]
14+
OTelCollector["OTel Collector"]
15+
Jaeger
16+
17+
User -- send payment --> ProducerAPI
18+
subgraph PaymentSystem
19+
ProducerAPI -- enqueue payment --> ProducerAPI
20+
ProducerAPI -- send notification --> ConsumerWorker
21+
22+
end
23+
PaymentSystem -- send telemetry --> OTelCollector
24+
OTelCollector --> Jaeger
25+
```
26+
27+
### Running the Example
28+
29+
To run this example, you need to have the following tools installed:
30+
- [Docker](https://www.docker.com/)
31+
- [Tracetest CLI](https://docs.tracetest.io/getting-started/installation#install-the-tracetest-cli)
32+
- [grpcurl](https://github.com/fullstorydev/grpcurl?tab=readme-ov-file#installation)
33+
34+
You can run this example using your environment on [Tracetest](https://app.tracetest.io), or using Tracetest Core. When you have a [Tracetest Agent](https://docs.tracetest.io/configuration/agent) configured on Tracetest UI and an API Key, you can run the following commands:
35+
36+
```sh
37+
TRACETEST_API_KEY="your-api-key" docker compose up
38+
```
39+
40+
This command will start the Producer API, Consumer Worker, OTel Collector, Tracetest Agent, and Jaeger instance.
41+
42+
You can execute a gRPC call to the Producer API with the following command:
43+
44+
```sh
45+
grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment
46+
47+
# Expected output
48+
# {
49+
# "received": true
50+
# }
51+
```
52+
53+
This will start the entire process of receiving and notifying a payment, which you can see in the logs of the Producer API and Consumer Worker:
54+
```sh
55+
consumer-worker-1 | 2024/06/12 19:37:03 Received payment notification: payment:{customerId:"1234" amount:50000} highValuePayment:true metadata:{key:"traceparent" value:"00-ac8e8ed08353f23cbf1028ef42e7d10f-a95f55a8f9d9e29c-01"}
56+
```
57+
58+
You can access the Jaeger UI at http://localhost:16686 and see the trace generated by this call.
59+
60+
### Testing the Example
61+
62+
You can also run the tests of this example to guarantee that everything is working as expected. First, configure the Tracetest CLI to connect to your environment:
63+
```sh
64+
tracetest configure
65+
# and follow the instructions shown by the CLI
66+
```
67+
68+
Configure it to read traces from Jaeger with the command:
69+
70+
And finally, run the test:
71+
```sh
72+
tracetest run test -f ./trace-based-test.yaml
73+
74+
# Expected output
75+
# ✔ RunGroup: #ISI8sDUSR (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/run/ISI8sDUSR)
76+
# Summary: 1 passed, 0 failed, 0 pending
77+
# ✔ Test gRPC Stream Propagation (https://app.tracetest.io/organizations/your-org-id/environments/your-env-id/test/pprDfSUSg/run/1/test) - trace id: 808e8592f1ec08bda8701c3dcea5810c
78+
# ✔ It should call ReceivePayment gRPC endpoint
79+
# ✔ In should enqueue the payment to send it in a stream
80+
# ✔ It should send the a payment notification through a gRPC stream
81+
# ✔ It should receive a PaymentNotification through a stream and process it
82+
# ✔ The trace shape is correct
83+
```
84+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM golang:alpine as builder
2+
ENV GO111MODULE=on
3+
RUN apk update && apk add --no-cache git
4+
5+
WORKDIR /app
6+
COPY go.mod ./
7+
COPY go.sum ./
8+
RUN go mod download
9+
COPY . .
10+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/main .
11+
12+
FROM scratch
13+
COPY --from=builder /app/bin/main .
14+
CMD ["./main"]
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Dependencies:
2+
# https://grpc.io/docs/protoc-installation/
3+
# go install google.golang.org/protobuf/cmd/[email protected]
4+
# go install google.golang.org/grpc/cmd/[email protected]
5+
6+
REQUIRED_BINS := protoc protoc-gen-go protoc-gen-go-grpc
7+
8+
help: ## show list of commands
9+
@echo "Choose a command to run:"
10+
@echo ""
11+
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort
12+
13+
build-proto: ensure-dependencies clean-proto ## generate gRPC code from proto files
14+
@protoc \
15+
-I=../proto \
16+
--go_out=./proto \
17+
--go_opt=paths=source_relative \
18+
--go-grpc_out=./proto \
19+
--go-grpc_opt=paths=source_relative \
20+
../proto/paymentreceiver.proto
21+
22+
ensure-dependencies: ## check if required binaries are installed
23+
$(foreach bin,$(REQUIRED_BINS),\
24+
$(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)` or run `make install-grpc-tools`)))
25+
26+
install-grpc-tools: ## install required binaries
27+
go install google.golang.org/protobuf/cmd/[email protected]
28+
go install google.golang.org/grpc/cmd/[email protected]
29+
30+
clean-proto: ## remove generated gRPC code
31+
@rm -f proto/*.go
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker
2+
3+
go 1.21
4+
5+
require (
6+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
7+
go.opentelemetry.io/otel v1.27.0
8+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
9+
go.opentelemetry.io/otel/sdk v1.27.0
10+
go.opentelemetry.io/otel/trace v1.27.0
11+
google.golang.org/grpc v1.64.0
12+
google.golang.org/protobuf v1.34.1
13+
)
14+
15+
require (
16+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
17+
github.com/go-logr/logr v1.4.1 // indirect
18+
github.com/go-logr/stdr v1.2.2 // indirect
19+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
20+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect
21+
go.opentelemetry.io/otel/metric v1.27.0 // indirect
22+
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
23+
golang.org/x/net v0.25.0 // indirect
24+
golang.org/x/sys v0.20.0 // indirect
25+
golang.org/x/text v0.15.0 // indirect
26+
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
27+
)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
2+
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
6+
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
7+
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
8+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
9+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
10+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
11+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
12+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
13+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
14+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
15+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
16+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
17+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
18+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck=
19+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0=
20+
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
21+
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
22+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc=
23+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s=
24+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA=
25+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ=
26+
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
27+
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
28+
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
29+
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
30+
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
31+
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
32+
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
33+
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
34+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
35+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
36+
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
37+
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
38+
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
39+
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
40+
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
41+
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
42+
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
43+
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
44+
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
45+
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
46+
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
47+
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
48+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
49+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"io"
6+
"log"
7+
8+
pb "github.com/kubeshop/tracetest/quick-start-grpc-stream-propagation/consumer-worker/proto"
9+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
10+
"go.opentelemetry.io/otel/trace"
11+
grpc "google.golang.org/grpc"
12+
"google.golang.org/grpc/credentials/insecure"
13+
)
14+
15+
func main() {
16+
ctx := context.Background()
17+
18+
producerAPIAddress := getEnvVar("PRODUCER_API_ADDRESS", "localhost:8080")
19+
otelExporterEndpoint := getEnvVar("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")
20+
otelServiceName := getEnvVar("OTEL_SERVICE_NAME", "producer-api")
21+
22+
tracer, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName)
23+
if err != nil {
24+
log.Fatalf("failed to initialize OpenTelemetry: %v", err)
25+
return
26+
}
27+
28+
grpcClient, err := grpc.NewClient(
29+
producerAPIAddress,
30+
grpc.WithTransportCredentials(insecure.NewCredentials()),
31+
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
32+
)
33+
if err != nil {
34+
log.Fatalf("could not connect to producer API: %v", err)
35+
}
36+
37+
log.Printf("Connected to producer API at %s", producerAPIAddress)
38+
39+
client := pb.NewPaymentReceiverClient(grpcClient)
40+
41+
stream, err := client.NotifyPayment(ctx, &pb.Empty{}, grpc.WaitForReady(true))
42+
if err != nil {
43+
log.Fatalf("could not receive payment notifications: %v", err)
44+
}
45+
46+
log.Printf("Listening for payment notifications...")
47+
48+
for {
49+
notification, err := stream.Recv()
50+
if err == io.EOF {
51+
log.Printf("No more payment notifications")
52+
return
53+
}
54+
if err != nil {
55+
log.Fatalf("could not receive payment notification: %v", err)
56+
}
57+
58+
processPaymentNotification(tracer, notification)
59+
}
60+
}
61+
62+
func processPaymentNotification(tracer trace.Tracer, notification *pb.PaymentNotification) {
63+
messageProcessingCtx := injectMetadataIntoContext(context.Background(), notification.Metadata)
64+
_, span := tracer.Start(messageProcessingCtx, "ProcessPaymentNotification")
65+
defer span.End()
66+
67+
log.Printf("Received payment notification: %v", notification)
68+
}

0 commit comments

Comments
 (0)