Skip to content

Commit c0704c7

Browse files
[exporter/awss3exporter] Added IsArchiveFormat for Sumo IC (open-telemetry#43574)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The sumo_ic marshaler was incorrectly blocked from using compression. Sumo Logic's Installed Collector compresses archive files before S3 upload, but the OpenTelemetry exporter prevented this, causing format incompatibility. Root cause: The exporter used ContentEncoding: "gzip" for all compressed uploads, which signals HTTP transfer compression (auto-decompressed by AWS). Archive formats like sumo_ic need files to stay compressed permanently. Solution: Added isArchiveFormat flag to distinguish between archive formats (files stay compressed) and transfer formats (HTTP optimization). The flag controls whether ContentEncoding header is set during upload. <!--Describe what testing was performed and which tests were added.--> #### Testing - Manual testing: Downloaded files from S3, verified gzip compression with file and gunzip commands - Unit test passed: ``` ➜ awss3exporter git:(sumoCompression) ✗ make test /Users/timothy.chan/Desktop/opentelemetry-collector-contrib/.tools/gotestsum --rerun-fails=1 --packages="./..." -- -race -timeout 900s -parallel 4 --tags="" ∅ internal/metadata ✓ internal/upload (cached) ✓ . (2.662s) DONE 98 tests in 2.662s ``` <!--Describe the documentation added.--> #### Documentation - Removed outdated README that gzip doesn't support sumo_ic marshaler <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 8abefcc commit c0704c7

File tree

11 files changed

+66
-17
lines changed

11 files changed

+66
-17
lines changed

.chloggen/sumoCompression.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/awss3
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support compression with the sumo_ic marshaller
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43574]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/awss3exporter/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/
8282

8383
### Compression
8484
- `none` (default): No compression will be applied
85-
- `gzip`: Files will be compressed with gzip. **This does not support `sumo_ic`marshaler.**
85+
- `gzip`: Files will be compressed with gzip.
8686

8787
### resource_attrs_to_s3
8888
- `s3_bucket`: Defines which resource attribute's value should be used as the S3 bucket.

exporter/awss3exporter/config.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ func (c *Config) Validate() error {
146146
if compression != configcompression.TypeGzip {
147147
errs = multierr.Append(errs, errors.New("unknown compression type"))
148148
}
149-
150-
if c.MarshalerName == SumoIC {
151-
errs = multierr.Append(errs, errors.New("marshaler does not support compression"))
152-
}
153149
}
154150

155151
if c.S3Uploader.RetryMode != "nop" && c.S3Uploader.RetryMode != "standard" && c.S3Uploader.RetryMode != "adaptive" {

exporter/awss3exporter/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (e *s3Exporter) start(ctx context.Context, host component.Host) error {
7575

7676
e.marshaler = m
7777

78-
up, err := newUploadManager(ctx, e.config, e.signalType, m.format())
78+
up, err := newUploadManager(ctx, e.config, e.signalType, m.format(), m.compressed())
7979
if err != nil {
8080
return err
8181
}

exporter/awss3exporter/internal/upload/partition.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type PartitionKeyBuilder struct {
4949
// generating a new unique string to avoid collisions on file upload
5050
// across many different instances.
5151
UniqueKeyFunc func() string
52+
// IsCompressed when true keeps files compressed in S3
53+
// by omitting ContentEncoding headers. When false, ContentEncoding
54+
// is set for HTTP transfer compression (AWS auto-decompresses).
55+
IsCompressed bool
5256
}
5357

5458
func (pki *PartitionKeyBuilder) Build(ts time.Time, overridePrefix string) string {

exporter/awss3exporter/internal/upload/writer.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ func (sw *s3manager) Upload(ctx context.Context, data []byte, opts *UploadOption
6464
}
6565

6666
encoding := ""
67-
if sw.builder.Compression.IsCompressed() {
67+
// Only use ContentEncoding for non-archive formats
68+
// Archive formats store files compressed permanently (like .tar.gz)
69+
// while ContentEncoding is for HTTP transfer compression
70+
if sw.builder.Compression.IsCompressed() && !sw.builder.IsCompressed {
6871
encoding = string(sw.builder.Compression)
6972
}
7073

@@ -79,15 +82,20 @@ func (sw *s3manager) Upload(ctx context.Context, data []byte, opts *UploadOption
7982
}
8083
}
8184

82-
_, err = sw.uploader.Upload(ctx, &s3.PutObjectInput{
83-
Bucket: aws.String(overrideBucket),
84-
Key: aws.String(sw.builder.Build(now, overridePrefix)),
85-
Body: content,
86-
ContentEncoding: aws.String(encoding),
87-
StorageClass: sw.storageClass,
88-
ACL: sw.acl,
89-
})
85+
uploadInput := &s3.PutObjectInput{
86+
Bucket: aws.String(overrideBucket),
87+
Key: aws.String(sw.builder.Build(now, overridePrefix)),
88+
Body: content,
89+
StorageClass: sw.storageClass,
90+
ACL: sw.acl,
91+
}
92+
93+
// Only set ContentEncoding if we have a non-empty encoding value
94+
if encoding != "" {
95+
uploadInput.ContentEncoding = aws.String(encoding)
96+
}
9097

98+
_, err = sw.uploader.Upload(ctx, uploadInput)
9199
return err
92100
}
93101

exporter/awss3exporter/marshaler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type marshaler interface {
1919
MarshalLogs(ld plog.Logs) ([]byte, error)
2020
MarshalMetrics(md pmetric.Metrics) ([]byte, error)
2121
format() string
22+
compressed() bool
2223
}
2324

2425
var ErrUnknownMarshaler = errors.New("unknown marshaler")
@@ -34,6 +35,7 @@ func newMarshalerFromEncoding(encoding *component.ID, fileFormat string, host co
3435
marshaler.metricsMarshaler, _ = e.(pmetric.Marshaler)
3536
marshaler.tracesMarshaler, _ = e.(ptrace.Marshaler)
3637
marshaler.fileFormat = fileFormat
38+
marshaler.IsCompressed = false
3739
return marshaler, nil
3840
}
3941

@@ -45,19 +47,23 @@ func newMarshaler(mType MarshalerType, logger *zap.Logger) (marshaler, error) {
4547
marshaler.tracesMarshaler = &ptrace.ProtoMarshaler{}
4648
marshaler.metricsMarshaler = &pmetric.ProtoMarshaler{}
4749
marshaler.fileFormat = "binpb"
50+
marshaler.IsCompressed = false
4851
case OtlpJSON:
4952
marshaler.logsMarshaler = &plog.JSONMarshaler{}
5053
marshaler.tracesMarshaler = &ptrace.JSONMarshaler{}
5154
marshaler.metricsMarshaler = &pmetric.JSONMarshaler{}
5255
marshaler.fileFormat = "json"
56+
marshaler.IsCompressed = false
5357
case SumoIC:
5458
sumomarshaler := newSumoICMarshaler()
5559
marshaler.logsMarshaler = &sumomarshaler
56-
marshaler.fileFormat = "json.gz"
60+
marshaler.fileFormat = "json"
61+
marshaler.IsCompressed = true
5762
case Body:
5863
exportbodyMarshaler := newbodyMarshaler()
5964
marshaler.logsMarshaler = &exportbodyMarshaler
6065
marshaler.fileFormat = exportbodyMarshaler.format()
66+
marshaler.IsCompressed = false
6167
default:
6268
return nil, ErrUnknownMarshaler
6369
}

exporter/awss3exporter/marshaler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestMarshaler(t *testing.T) {
3131
m, err := newMarshaler("sumo_ic", zap.NewNop())
3232
assert.NoError(t, err)
3333
require.NotNil(t, m)
34-
assert.Equal(t, "json.gz", m.format())
34+
assert.Equal(t, "json", m.format())
3535
}
3636
{
3737
m, err := newMarshaler("unknown", zap.NewNop())

exporter/awss3exporter/s3_marshaler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type s3Marshaler struct {
1616
metricsMarshaler pmetric.Marshaler
1717
logger *zap.Logger
1818
fileFormat string
19+
IsCompressed bool
1920
}
2021

2122
func (marshaler *s3Marshaler) MarshalTraces(td ptrace.Traces) ([]byte, error) {
@@ -33,3 +34,7 @@ func (marshaler *s3Marshaler) MarshalMetrics(md pmetric.Metrics) ([]byte, error)
3334
func (marshaler *s3Marshaler) format() string {
3435
return marshaler.fileFormat
3536
}
37+
38+
func (marshaler *s3Marshaler) compressed() bool {
39+
return marshaler.IsCompressed
40+
}

exporter/awss3exporter/s3_writer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ func newUploadManager(
2424
conf *Config,
2525
metadata string,
2626
format string,
27+
isCompressed bool,
2728
) (upload.Manager, error) {
2829
configOpts := []func(*config.LoadOptions) error{}
2930

@@ -110,6 +111,7 @@ func newUploadManager(
110111
Metadata: metadata,
111112
Compression: conf.S3Uploader.Compression,
112113
UniqueKeyFunc: uniqueKeyFunc,
114+
IsCompressed: isCompressed,
113115
},
114116
s3.NewFromConfig(cfg, s3Opts...),
115117
s3types.StorageClass(conf.S3Uploader.StorageClass),

0 commit comments

Comments
 (0)