Skip to content

Commit f5f1866

Browse files
jpkrohlingmx-psi
authored andcommitted
[configgrpc] Use own compressors for zstd (open-telemetry#10323)
Uses our own version of the zstd compressor for gRPC servers. The code for it is based on the gzip compressor that comes built-in with gRPC. Benchmarks before this PR: ``` Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc sm_log_requestgoos: linux goarch: amd64 pkg: go.opentelemetry.io/collector/config/configgrpc cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16 71594 19066 ns/op 615 B/op 4 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16 151503 8544 ns/op 640 B/op 6 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16 3632570 303.8 ns/op 304 B/op 3 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16 68114 16938 ns/op 748 B/op 4 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16 138091 8047 ns/op 896 B/op 6 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16 3081198 402.5 ns/op 400 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16 43414 27174 ns/op 386 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16 117534 9903 ns/op 10112 B/op 6 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16 1000000 1190 ns/op 528 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16 67275 17508 ns/op 700 B/op 4 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16 196862 6137 ns/op 848 B/op 6 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16 3595815 331.7 ns/op 272 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16 64105 19104 ns/op 844 B/op 4 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16 169221 6929 ns/op 1120 B/op 6 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16 2602239 473.0 ns/op 336 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16 33861 36473 ns/op 904 B/op 4 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16 107828 10596 ns/op 16832 B/op 6 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16 725080 1540 ns/op 689 B/op 2 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16 76315 16394 ns/op 496 B/op 4 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16 193314 5957 ns/op 688 B/op 6 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16 3558649 345.2 ns/op 208 B/op 2 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16 68497 18413 ns/op 699 B/op 4 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16 177841 6520 ns/op 1136 B/op 6 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16 2354102 497.4 ns/op 272 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16 21943 54603 ns/op 1941 B/op 5 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16 71260 16077 ns/op 25312 B/op 6 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 335415 3026 ns/op 1200 B/op 2 allocs/op PASS ok go.opentelemetry.io/collector/config/configgrpc 37.766s ``` After this version: ``` Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc sm_log_requestgoos: linux goarch: amd64 pkg: go.opentelemetry.io/collector/config/configgrpc cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16 74952 15710 ns/op 603 B/op 4 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16 156784 6966 ns/op 208 B/op 2 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16 2216174 510.4 ns/op 308 B/op 3 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16 68095 18569 ns/op 736 B/op 4 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16 150705 8849 ns/op 294 B/op 2 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16 2149710 556.8 ns/op 406 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16 40040 26159 ns/op 368 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16 123043 10254 ns/op 299 B/op 2 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16 726780 1457 ns/op 533 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16 64660 18186 ns/op 701 B/op 4 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16 193225 6267 ns/op 273 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16 2925073 418.2 ns/op 276 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16 61320 20641 ns/op 846 B/op 4 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16 190965 6440 ns/op 321 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16 2051575 656.8 ns/op 341 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16 30097 40680 ns/op 907 B/op 4 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16 127027 8437 ns/op 363 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16 716541 1803 ns/op 694 B/op 2 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16 82287 15054 ns/op 496 B/op 4 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16 230558 5470 ns/op 221 B/op 2 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16 2759403 417.1 ns/op 211 B/op 2 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16 58208 18925 ns/op 702 B/op 4 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16 199226 6247 ns/op 256 B/op 2 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16 2065202 609.8 ns/op 276 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16 20583 59762 ns/op 1945 B/op 5 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16 98254 13152 ns/op 728 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 389401 3976 ns/op 1209 B/op 2 allocs/op PASS ok go.opentelemetry.io/collector/config/configgrpc 40.394s ``` Signed-off-by: Juraci Paixão Kröhling <[email protected]> --------- Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent 0ab388b commit f5f1866

File tree

7 files changed

+144
-6
lines changed

7 files changed

+144
-6
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: Use own compressors for zstd
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Before this change, the zstd compressor we used didn't respect the max message size.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10323]

config/configgrpc/configgrpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
15-
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
1615
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
1716
"go.opentelemetry.io/otel"
1817
"google.golang.org/grpc"
@@ -28,6 +27,7 @@ import (
2827
"go.opentelemetry.io/collector/component"
2928
"go.opentelemetry.io/collector/config/configauth"
3029
"go.opentelemetry.io/collector/config/configcompression"
30+
grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
3131
"go.opentelemetry.io/collector/config/confignet"
3232
"go.opentelemetry.io/collector/config/configopaque"
3333
"go.opentelemetry.io/collector/config/configtelemetry"
@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
426426
case configcompression.TypeSnappy:
427427
return snappy.Name, nil
428428
case configcompression.TypeZstd:
429-
return zstd.Name, nil
429+
return grpcInternal.ZstdName, nil
430430
default:
431431
return "", fmt.Errorf("unsupported compression type %q", compressionType)
432432
}

config/configgrpc/configgrpc_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
"testing"
1111

1212
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
13-
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
1413
"google.golang.org/grpc/codes"
1514
"google.golang.org/grpc/encoding"
1615
"google.golang.org/grpc/encoding/gzip"
1716
"google.golang.org/grpc/status"
1817

18+
"go.opentelemetry.io/collector/config/configgrpc/internal"
1919
"go.opentelemetry.io/collector/pdata/plog"
2020
"go.opentelemetry.io/collector/pdata/pmetric"
2121
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) {
2727

2828
compressors := make([]encoding.Compressor, 0)
2929
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
30-
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
30+
compressors = append(compressors, encoding.GetCompressor(internal.ZstdName))
3131
compressors = append(compressors, encoding.GetCompressor(snappy.Name))
3232

3333
for _, payload := range payloads {

config/configgrpc/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc
33
go 1.21.0
44

55
require (
6+
github.com/klauspost/compress v1.17.2
67
github.com/mostynb/go-grpc-compression v1.2.2
78
github.com/stretchr/testify v1.9.0
89
go.opentelemetry.io/collector v0.102.0
@@ -36,7 +37,6 @@ require (
3637
github.com/golang/snappy v0.0.4 // indirect
3738
github.com/hashicorp/go-version v1.7.0 // indirect
3839
github.com/json-iterator/go v1.1.12 // indirect
39-
github.com/klauspost/compress v1.17.2 // indirect
4040
github.com/knadh/koanf/maps v0.1.1 // indirect
4141
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
4242
github.com/knadh/koanf/v2 v2.1.1 // indirect

config/configgrpc/internal/zstd.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright 2017 gRPC authors
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"
6+
7+
import (
8+
"errors"
9+
"io"
10+
"sync"
11+
12+
"github.com/klauspost/compress/zstd"
13+
"google.golang.org/grpc/encoding"
14+
)
15+
16+
const ZstdName = "zstd"
17+
18+
func init() {
19+
encoding.RegisterCompressor(NewZstdCodec())
20+
}
21+
22+
type writer struct {
23+
*zstd.Encoder
24+
pool *sync.Pool
25+
}
26+
27+
func NewZstdCodec() encoding.Compressor {
28+
c := &compressor{}
29+
c.poolCompressor.New = func() any {
30+
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
31+
return &writer{Encoder: zw, pool: &c.poolCompressor}
32+
}
33+
return c
34+
}
35+
36+
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
37+
z := c.poolCompressor.Get().(*writer)
38+
z.Encoder.Reset(w)
39+
return z, nil
40+
}
41+
42+
func (z *writer) Close() error {
43+
defer z.pool.Put(z)
44+
return z.Encoder.Close()
45+
}
46+
47+
type reader struct {
48+
*zstd.Decoder
49+
pool *sync.Pool
50+
}
51+
52+
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
53+
z, inPool := c.poolDecompressor.Get().(*reader)
54+
if !inPool {
55+
newZ, err := zstd.NewReader(r)
56+
if err != nil {
57+
return nil, err
58+
}
59+
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
60+
}
61+
if err := z.Reset(r); err != nil {
62+
c.poolDecompressor.Put(z)
63+
return nil, err
64+
}
65+
return z, nil
66+
}
67+
68+
func (z *reader) Read(p []byte) (n int, err error) {
69+
n, err = z.Decoder.Read(p)
70+
if errors.Is(err, io.EOF) {
71+
z.pool.Put(z)
72+
}
73+
return n, err
74+
}
75+
76+
func (c *compressor) Name() string {
77+
return ZstdName
78+
}
79+
80+
type compressor struct {
81+
poolCompressor sync.Pool
82+
poolDecompressor sync.Pool
83+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal
5+
6+
import (
7+
"bytes"
8+
"io"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func Test_zstdCodec_CompressDecompress(t *testing.T) {
15+
// prepare
16+
msg := []byte("Hello world.")
17+
compressed := &bytes.Buffer{}
18+
19+
// zstd header, for sanity checking
20+
header := []byte{40, 181, 47, 253}
21+
22+
c := NewZstdCodec()
23+
cWriter, err := c.Compress(compressed)
24+
require.NoError(t, err)
25+
require.NotNil(t, cWriter)
26+
27+
_, err = cWriter.Write(msg)
28+
require.NoError(t, err)
29+
cWriter.Close()
30+
31+
cReader, err := c.Decompress(compressed)
32+
require.NoError(t, err)
33+
require.NotNil(t, cReader)
34+
35+
uncompressed, err := io.ReadAll(cReader)
36+
require.NoError(t, err)
37+
require.Equal(t, msg, uncompressed)
38+
39+
// test header
40+
require.Equal(t, header, compressed.Bytes()[:4])
41+
}

receiver/otlpreceiver/otlp_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) {
726726
require.NoError(t, err)
727727

728728
td := testdata.GenerateTraces(50000)
729-
require.Error(t, exportTraces(cc, td))
729+
err = exportTraces(cc, td)
730+
require.Error(t, err)
730731
assert.NoError(t, cc.Close())
731732
require.NoError(t, recv.Shutdown(context.Background()))
732733

0 commit comments

Comments
 (0)