Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: Use own compressors for zstd

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Before this change, the zstd compressor we used didn't respect the max message size.

# One or more tracking issues or pull requests related to the change
issues: [10323]
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
Expand All @@ -28,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
case configcompression.TypeSnappy:
return snappy.Name, nil
case configcompression.TypeZstd:
return zstd.Name, nil
return grpcInternal.ZstdName, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
}
Expand Down
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"testing"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) {

compressors := make([]encoding.Compressor, 0)
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
compressors = append(compressors, encoding.GetCompressor(internal.ZstdName))
compressors = append(compressors, encoding.GetCompressor(snappy.Name))

for _, payload := range payloads {
Expand Down
2 changes: 1 addition & 1 deletion config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc
go 1.21.0

require (
github.com/klauspost/compress v1.17.2
github.com/mostynb/go-grpc-compression v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.102.0
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
Expand Down
83 changes: 83 additions & 0 deletions config/configgrpc/internal/zstd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// Copyright 2017 gRPC authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"

import (
"errors"
"io"
"sync"

"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)

const ZstdName = "zstd"

func init() {
encoding.RegisterCompressor(NewZstdCodec())
}

type writer struct {
*zstd.Encoder
pool *sync.Pool
}

func NewZstdCodec() encoding.Compressor {
c := &compressor{}
c.poolCompressor.New = func() any {
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
return &writer{Encoder: zw, pool: &c.poolCompressor}
}
return c
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
}

func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Encoder.Close()
}

type reader struct {
*zstd.Decoder
pool *sync.Pool
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := zstd.NewReader(r)
if err != nil {
return nil, err
}
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if errors.Is(err, io.EOF) {
z.pool.Put(z)
}
return n, err
}

func (c *compressor) Name() string {
return ZstdName
}

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}
41 changes: 41 additions & 0 deletions config/configgrpc/internal/zstd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/require"
)

func Test_zstdCodec_CompressDecompress(t *testing.T) {
// prepare
msg := []byte("Hello world.")
compressed := &bytes.Buffer{}

// zstd header, for sanity checking
header := []byte{40, 181, 47, 253}

c := NewZstdCodec()
cWriter, err := c.Compress(compressed)
require.NoError(t, err)
require.NotNil(t, cWriter)

_, err = cWriter.Write(msg)
require.NoError(t, err)
cWriter.Close()

cReader, err := c.Decompress(compressed)
require.NoError(t, err)
require.NotNil(t, cReader)

uncompressed, err := io.ReadAll(cReader)
require.NoError(t, err)
require.Equal(t, msg, uncompressed)

// test header
require.Equal(t, header, compressed.Bytes()[:4])
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) {
require.NoError(t, err)

td := testdata.GenerateTraces(50000)
require.Error(t, exportTraces(cc, td))
err = exportTraces(cc, td)
require.Error(t, err)
assert.NoError(t, cc.Close())
require.NoError(t, recv.Shutdown(context.Background()))

Expand Down