Skip to content

Commit fc02a7c

Browse files
khushijain21, mauri870, djaglowski
authored and committed
[fileconsumer] Add compression:auto to support ingesting mixed file types (open-telemetry#38510)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR supports the use case of reading from mixed file types. To support reading from both compressed and uncompressed files, configure ``` compression: auto ``` <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#37772 <!--Describe what testing was performed and which tests were added.--> #### Testing Start otelcollector-contrib with the following config file ``` receivers: filelog: include: [ "./test.log*" ] # Path to log file start_at: beginning # Read from the start of the file include_file_path: true include_file_name: true compression: auto exporters: debug/1: verbosity: detailed service: telemetry: logs: level: debug pipelines: logs: receivers: [filelog] exporters: [debug/1] ``` Have `test.log` and `test.log.gz` in your path. We have two readers - one for plain text and the other for gzipped files. --------- Co-authored-by: Mauri de Souza Meneguzzo <[email protected]> Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent fb8577b commit fc02a7c

File tree

4 files changed

+79
-17
lines changed

4 files changed

+79
-17
lines changed

.chloggen/add-compression-auto.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: fileconsumer
8+
9+
10+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
11+
note: Support "`compression:auto`" which automatically detects file compression type and ingests data. Currently, only gzip compressed files are auto detected. This allows a mix of compressed and uncompressed files to be ingested with the same filelogreceiver.
12+
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [37772]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"os"
11+
"path/filepath"
1112
"sync"
1213
"time"
1314

@@ -59,13 +60,19 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
5960
if err != nil {
6061
return nil, err
6162
}
63+
var filetype string
64+
if filepath.Ext(file.Name()) == gzipExtension {
65+
filetype = gzipExtension
66+
}
67+
6268
m := &Metadata{
6369
Fingerprint: fp,
6470
FileAttributes: attributes,
6571
TokenLenState: tokenlen.State{},
6672
FlushState: flush.State{
6773
LastDataChange: time.Now(),
6874
},
75+
FileType: filetype,
6976
}
7077
return f.NewReaderFromMetadata(file, m)
7178
}

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
2626
)
2727

28+
const gzipExtension = ".gz"
29+
2830
type Metadata struct {
2931
Fingerprint *fingerprint.Fingerprint
3032
Offset int64
@@ -33,6 +35,7 @@ type Metadata struct {
3335
HeaderFinalized bool
3436
FlushState flush.State
3537
TokenLenState tokenlen.State
38+
FileType string
3639
}
3740

3841
// Reader manages a single file
@@ -69,30 +72,30 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
6972

7073
switch r.compression {
7174
case "gzip":
72-
// We need to create a gzip reader each time ReadToEnd is called because the underlying
73-
// SectionReader can only read a fixed window (from previous offset to EOF).
74-
info, err := r.file.Stat()
75-
if err != nil {
76-
r.set.Logger.Error("failed to stat", zap.Error(err))
77-
return
78-
}
79-
currentEOF := info.Size()
80-
81-
// use a gzip Reader with an underlying SectionReader to pick up at the last
82-
// offset of a gzip compressed file
83-
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
75+
currentEOF, err := r.createGzipReader()
8476
if err != nil {
85-
if !errors.Is(err, io.EOF) {
86-
r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
87-
}
8877
return
8978
}
90-
r.reader = gzipReader
9179
// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
9280
// we need to set the offset to the end of the file.
9381
defer func() {
9482
r.Offset = currentEOF
9583
}()
84+
case "auto":
85+
// Identifying a filename by its extension may not always be correct. We could have a compressed file without the .gz extension
86+
if r.FileType == gzipExtension {
87+
currentEOF, err := r.createGzipReader()
88+
if err != nil {
89+
return
90+
}
91+
// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
92+
// we need to set the offset to the end of the file.
93+
defer func() {
94+
r.Offset = currentEOF
95+
}()
96+
} else {
97+
r.reader = r.file
98+
}
9699
default:
97100
r.reader = r.file
98101
}
@@ -117,6 +120,29 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
117120
r.readContents(ctx)
118121
}
119122

123+
// createGzipReader creates gzip reader and returns the file offset
124+
func (r *Reader) createGzipReader() (int64, error) {
125+
// We need to create a gzip reader each time ReadToEnd is called because the underlying
126+
// SectionReader can only read a fixed window (from previous offset to EOF).
127+
info, err := r.file.Stat()
128+
if err != nil {
129+
r.set.Logger.Error("failed to stat", zap.Error(err))
130+
return 0, err
131+
}
132+
currentEOF := info.Size()
133+
// use a gzip Reader with an underlying SectionReader to pick up at the last
134+
// offset of a gzip compressed file
135+
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
136+
if err != nil {
137+
if !errors.Is(err, io.EOF) {
138+
r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
139+
}
140+
return 0, err
141+
}
142+
r.reader = gzipReader
143+
return currentEOF, nil
144+
}
145+
120146
func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
121147
bufPtr := r.getBufPtrFromPool()
122148
defer r.bufPool.Put(bufPtr)

receiver/filelogreceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Tails and parses logs from files.
6363
| `ordering_criteria.sort_by.location` | | Relevant if `sort_type` is set to `timestamp`. Defines the location of the timestamp of the file. |
6464
| `ordering_criteria.sort_by.format` | | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. |
6565
| `ordering_criteria.sort_by.ascending` | | Sort direction |
66-
| `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are `` or `gzip` |
66+
| `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are ``, `gzip`, or `auto`. `auto` auto-detects the file compression type. Currently, gzip files are the only compressed files auto-detected, based on the ".gz" filename extension. The `auto` option is useful when ingesting a mix of compressed and uncompressed files with the same filelogreceiver. |
6767

6868
Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.
6969

0 commit comments

Comments
 (0)