Skip to content

Commit 714289e

Browse files
mx-psijpkrohling
authored andcommitted
[configgrpc] Use own compressors for zstd (open-telemetry#10323) (open-telemetry#10324)
Backport of open-telemetry#10323 Signed-off-by: Juraci Paixão Kröhling <[email protected]> Co-authored-by: Juraci Paixão Kröhling <[email protected]>
1 parent 63b2b7c commit 714289e

File tree

7 files changed

+144
-6
lines changed

7 files changed

+144
-6
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: Use own compressors for zstd
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Before this change, the zstd compressor we used didn't respect the max message size.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10323]

config/configgrpc/configgrpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
15-
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
1615
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
1716
"go.opentelemetry.io/otel"
1817
"google.golang.org/grpc"
@@ -28,6 +27,7 @@ import (
2827
"go.opentelemetry.io/collector/component"
2928
"go.opentelemetry.io/collector/config/configauth"
3029
"go.opentelemetry.io/collector/config/configcompression"
30+
grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
3131
"go.opentelemetry.io/collector/config/confignet"
3232
"go.opentelemetry.io/collector/config/configopaque"
3333
"go.opentelemetry.io/collector/config/configtelemetry"
@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
426426
case configcompression.TypeSnappy:
427427
return snappy.Name, nil
428428
case configcompression.TypeZstd:
429-
return zstd.Name, nil
429+
return grpcInternal.ZstdName, nil
430430
default:
431431
return "", fmt.Errorf("unsupported compression type %q", compressionType)
432432
}

config/configgrpc/configgrpc_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
"testing"
1111

1212
"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
13-
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
1413
"google.golang.org/grpc/codes"
1514
"google.golang.org/grpc/encoding"
1615
"google.golang.org/grpc/encoding/gzip"
1716
"google.golang.org/grpc/status"
1817

18+
"go.opentelemetry.io/collector/config/configgrpc/internal"
1919
"go.opentelemetry.io/collector/pdata/plog"
2020
"go.opentelemetry.io/collector/pdata/pmetric"
2121
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) {
2727

2828
compressors := make([]encoding.Compressor, 0)
2929
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
30-
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
30+
compressors = append(compressors, encoding.GetCompressor(internal.ZstdName))
3131
compressors = append(compressors, encoding.GetCompressor(snappy.Name))
3232

3333
for _, payload := range payloads {

config/configgrpc/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc
33
go 1.21.0
44

55
require (
6+
github.com/klauspost/compress v1.17.2
67
github.com/mostynb/go-grpc-compression v1.2.2
78
github.com/stretchr/testify v1.9.0
89
go.opentelemetry.io/collector v0.102.0
@@ -36,7 +37,6 @@ require (
3637
github.com/golang/snappy v0.0.4 // indirect
3738
github.com/hashicorp/go-version v1.7.0 // indirect
3839
github.com/json-iterator/go v1.1.12 // indirect
39-
github.com/klauspost/compress v1.17.2 // indirect
4040
github.com/knadh/koanf/maps v0.1.1 // indirect
4141
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
4242
github.com/knadh/koanf/v2 v2.1.1 // indirect

config/configgrpc/internal/zstd.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright The OpenTelemetry Authors
2+
// Copyright 2017 gRPC authors
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"
6+
7+
import (
8+
"errors"
9+
"io"
10+
"sync"
11+
12+
"github.com/klauspost/compress/zstd"
13+
"google.golang.org/grpc/encoding"
14+
)
15+
16+
const ZstdName = "zstd"
17+
18+
func init() {
19+
encoding.RegisterCompressor(NewZstdCodec())
20+
}
21+
22+
type writer struct {
23+
*zstd.Encoder
24+
pool *sync.Pool
25+
}
26+
27+
func NewZstdCodec() encoding.Compressor {
28+
c := &compressor{}
29+
c.poolCompressor.New = func() any {
30+
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
31+
return &writer{Encoder: zw, pool: &c.poolCompressor}
32+
}
33+
return c
34+
}
35+
36+
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
37+
z := c.poolCompressor.Get().(*writer)
38+
z.Encoder.Reset(w)
39+
return z, nil
40+
}
41+
42+
func (z *writer) Close() error {
43+
defer z.pool.Put(z)
44+
return z.Encoder.Close()
45+
}
46+
47+
type reader struct {
48+
*zstd.Decoder
49+
pool *sync.Pool
50+
}
51+
52+
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
53+
z, inPool := c.poolDecompressor.Get().(*reader)
54+
if !inPool {
55+
newZ, err := zstd.NewReader(r)
56+
if err != nil {
57+
return nil, err
58+
}
59+
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
60+
}
61+
if err := z.Reset(r); err != nil {
62+
c.poolDecompressor.Put(z)
63+
return nil, err
64+
}
65+
return z, nil
66+
}
67+
68+
func (z *reader) Read(p []byte) (n int, err error) {
69+
n, err = z.Decoder.Read(p)
70+
if errors.Is(err, io.EOF) {
71+
z.pool.Put(z)
72+
}
73+
return n, err
74+
}
75+
76+
func (c *compressor) Name() string {
77+
return ZstdName
78+
}
79+
80+
type compressor struct {
81+
poolCompressor sync.Pool
82+
poolDecompressor sync.Pool
83+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal
5+
6+
import (
7+
"bytes"
8+
"io"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func Test_zstdCodec_CompressDecompress(t *testing.T) {
15+
// prepare
16+
msg := []byte("Hello world.")
17+
compressed := &bytes.Buffer{}
18+
19+
// zstd header, for sanity checking
20+
header := []byte{40, 181, 47, 253}
21+
22+
c := NewZstdCodec()
23+
cWriter, err := c.Compress(compressed)
24+
require.NoError(t, err)
25+
require.NotNil(t, cWriter)
26+
27+
_, err = cWriter.Write(msg)
28+
require.NoError(t, err)
29+
cWriter.Close()
30+
31+
cReader, err := c.Decompress(compressed)
32+
require.NoError(t, err)
33+
require.NotNil(t, cReader)
34+
35+
uncompressed, err := io.ReadAll(cReader)
36+
require.NoError(t, err)
37+
require.Equal(t, msg, uncompressed)
38+
39+
// test header
40+
require.Equal(t, header, compressed.Bytes()[:4])
41+
}

receiver/otlpreceiver/otlp_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) {
726726
require.NoError(t, err)
727727

728728
td := testdata.GenerateTraces(50000)
729-
require.Error(t, exportTraces(cc, td))
729+
err = exportTraces(cc, td)
730+
require.Error(t, err)
730731
assert.NoError(t, cc.Close())
731732
require.NoError(t, recv.Shutdown(context.Background()))
732733

0 commit comments

Comments
 (0)