@@ -25,6 +25,53 @@ type compressRoundTripper struct {
2525 compressor * compressor
2626}
2727
28+ var availableDecoders = map [string ]func (body io.ReadCloser ) (io.ReadCloser , error ){
29+ "" : func (io.ReadCloser ) (io.ReadCloser , error ) {
30+ // Not a compressed payload. Nothing to do.
31+ return nil , nil
32+ },
33+ "gzip" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
34+ gr , err := gzip .NewReader (body )
35+ if err != nil {
36+ return nil , err
37+ }
38+ return gr , nil
39+ },
40+ "zstd" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
41+ zr , err := zstd .NewReader (
42+ body ,
43+ // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
44+ // for our use-case (a server accepting decoding http requests).
45+ // Disabling async improves performance (I benchmarked it previously when working
46+ // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
47+ zstd .WithDecoderConcurrency (1 ),
48+ )
49+ if err != nil {
50+ return nil , err
51+ }
52+ return zr .IOReadCloser (), nil
53+ },
54+ "zlib" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
55+ zr , err := zlib .NewReader (body )
56+ if err != nil {
57+ return nil , err
58+ }
59+ return zr , nil
60+ },
61+ "snappy" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
62+ sr := snappy .NewReader (body )
63+ sb := new (bytes.Buffer )
64+ _ , err := io .Copy (sb , sr )
65+ if err != nil {
66+ return nil , err
67+ }
68+ if err = body .Close (); err != nil {
69+ return nil , err
70+ }
71+ return io .NopCloser (sb ), nil
72+ },
73+ }
74+
2875func newCompressRoundTripper (rt http.RoundTripper , compressionType configcompression.Type ) (* compressRoundTripper , error ) {
2976 encoder , err := newCompressor (compressionType )
3077 if err != nil {
@@ -77,64 +124,27 @@ type decompressor struct {
77124// by identifying the compression format in the "Content-Encoding" header and re-writing
78125// request body so that the handlers further in the chain can work on decompressed data.
79126// It supports gzip and deflate/zlib compression.
80- func httpContentDecompressor (h http.Handler , maxRequestBodySize int64 , eh func (w http.ResponseWriter , r * http.Request , errorMsg string , statusCode int ), decoders map [string ]func (body io.ReadCloser ) (io.ReadCloser , error )) http.Handler {
127+ func httpContentDecompressor (h http.Handler , maxRequestBodySize int64 , eh func (w http.ResponseWriter , r * http.Request , errorMsg string , statusCode int ), enableDecoders [] string , decoders map [string ]func (body io.ReadCloser ) (io.ReadCloser , error )) http.Handler {
81128 errHandler := defaultErrorHandler
82129 if eh != nil {
83130 errHandler = eh
84131 }
85132
133+ enabled := map [string ]func (body io.ReadCloser ) (io.ReadCloser , error ){}
134+ for _ , dec := range enableDecoders {
135+ enabled [dec ] = availableDecoders [dec ]
136+
137+ if dec == "deflate" {
138+ enabled ["deflate" ] = availableDecoders ["zlib" ]
139+ }
140+ }
141+
86142 d := & decompressor {
87143 maxRequestBodySize : maxRequestBodySize ,
88144 errHandler : errHandler ,
89145 base : h ,
90- decoders : map [string ]func (body io.ReadCloser ) (io.ReadCloser , error ){
91- "" : func (io.ReadCloser ) (io.ReadCloser , error ) {
92- // Not a compressed payload. Nothing to do.
93- return nil , nil
94- },
95- "gzip" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
96- gr , err := gzip .NewReader (body )
97- if err != nil {
98- return nil , err
99- }
100- return gr , nil
101- },
102- "zstd" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
103- zr , err := zstd .NewReader (
104- body ,
105- // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
106- // for our use-case (a server accepting decoding http requests).
107- // Disabling async improves performance (I benchmarked it previously when working
108- // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
109- zstd .WithDecoderConcurrency (1 ),
110- )
111- if err != nil {
112- return nil , err
113- }
114- return zr .IOReadCloser (), nil
115- },
116- "zlib" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
117- zr , err := zlib .NewReader (body )
118- if err != nil {
119- return nil , err
120- }
121- return zr , nil
122- },
123- "snappy" : func (body io.ReadCloser ) (io.ReadCloser , error ) {
124- sr := snappy .NewReader (body )
125- sb := new (bytes.Buffer )
126- _ , err := io .Copy (sb , sr )
127- if err != nil {
128- return nil , err
129- }
130- if err = body .Close (); err != nil {
131- return nil , err
132- }
133- return io .NopCloser (sb ), nil
134- },
135- },
146+ decoders : enabled ,
136147 }
137- d .decoders ["deflate" ] = d .decoders ["zlib" ]
138148
139149 for key , dec := range decoders {
140150 d .decoders [key ] = dec
0 commit comments