4 changes: 4 additions & 0 deletions plugins/inputs/logfile/constants/constants.go
@@ -4,6 +4,10 @@
package constants

const (
// DefaultReaderBufferSize is the default buffer size for file readers (256KB)
// This is much smaller than DefaultMaxEventSize to reduce memory usage
DefaultReaderBufferSize = 256 * 1024

// DefaultMaxEventSize is the default maximum size for log events (1MB)
DefaultMaxEventSize = 1024 * 1024

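A rough back-of-the-envelope sketch (not part of the PR) of why the smaller default matters: the tailer keeps one bufio.Reader, and therefore one buffer, per tailed file, so the per-file buffer size is paid once per open file. The file count below is an assumed example value.

package main

import "fmt"

func main() {
	const (
		defaultReaderBufferSize = 256 * 1024  // 256KB, as in this PR
		defaultMaxEventSize     = 1024 * 1024 // 1MB, as in this PR
		tailedFiles             = 1000        // assumed workload, for illustration only
	)
	saved := tailedFiles * (defaultMaxEventSize - defaultReaderBufferSize)
	fmt.Printf("approx. buffer memory saved: %d MiB\n", saved/(1024*1024))
	// Prints: approx. buffer memory saved: 750 MiB
}
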
54 changes: 43 additions & 11 deletions plugins/inputs/logfile/tail/tail.go
@@ -73,8 +73,9 @@ type Tail struct {
Lines chan *Line
Config

-file   *os.File
-reader *bufio.Reader
+file           *os.File
+reader         *bufio.Reader
+useLargeBuffer bool

watcher watch.FileWatcher
changes *watch.FileChanges
@@ -544,14 +545,52 @@ func (tail *Tail) waitForChanges() error {

func (tail *Tail) openReader() {
tail.lk.Lock()
-if tail.MaxLineSize > 0 {
+if tail.useLargeBuffer {
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize)
} else {
-tail.reader = bufio.NewReader(tail.file)
+tail.reader = bufio.NewReaderSize(tail.file, constants.DefaultReaderBufferSize)
}
tail.lk.Unlock()
}
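
For context, a minimal standalone sketch (assumed names, not the agent's code) of the two-tier sizing that openReader now implements: every reader starts at the 256KB default, and only a tailer that has already seen an oversized line reopens at MaxLineSize.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

const defaultReaderBufferSize = 256 * 1024 // mirrors constants.DefaultReaderBufferSize

// newReader mirrors openReader's choice: the large buffer is used only after an upgrade.
func newReader(src *strings.Reader, maxLineSize int, useLargeBuffer bool) *bufio.Reader {
	if useLargeBuffer {
		return bufio.NewReaderSize(src, maxLineSize)
	}
	return bufio.NewReaderSize(src, defaultReaderBufferSize)
}

func main() {
	src := strings.NewReader("hello\n")
	fmt.Println(newReader(src, 1024*1024, false).Size()) // 262144
	fmt.Println(newReader(src, 1024*1024, true).Size())  // 1048576
}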

func (tail *Tail) readSlice(delim byte) ([]byte, error) {
// First try: normal ReadSlice
word, err := tail.reader.ReadSlice(delim)
if err != bufio.ErrBufferFull {
tail.curOffset += int64(len(word))
return word, err // fast path: no allocation
}

// Check if buffer already upgraded
if tail.reader.Size() == tail.MaxLineSize {
tail.curOffset += int64(len(word))
return word, bufio.ErrBufferFull
}

// Buffer was too small → allocate ONCE
buf := append([]byte(nil), word...)

// Copy any unread buffered data
unread := tail.reader.Buffered()
if unread > 0 {
peek, _ := tail.reader.Peek(unread)
buf = append(buf, peek...)
tail.reader.Discard(unread)
}

// Switch to a bigger buffer
tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize)
// If the tail is re-opened, we don't want to resize the buffer again;
// the reader should just re-open with the larger buffer.
tail.useLargeBuffer = true

Contributor: We don't ever set this back to false right? So it'll only take 1 occurrence to use the larger buffer forever in the tailer?

Contributor: Good catch, that will stay true. I'll create a ticket to adjust that.

word, err = tail.reader.ReadSlice(delim)
buf = append(buf, word...)
tail.curOffset += int64(len(buf))

return buf, err
}
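
A standalone sketch (in-memory input and toy sizes are assumptions) of the upgrade path above: ReadSlice on a too-small reader returns bufio.ErrBufferFull together with the partial line, the partial bytes are copied once, and reading continues on a larger reader over the same source. The Buffered/Peek/Discard step mirrors the defensive copy above; with the standard library's bufio, ReadSlice has already consumed its buffer when it reports ErrBufferFull, so Buffered() is typically zero at that point.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	line := strings.Repeat("x", 40) + "\n"
	src := strings.NewReader(line)

	small := bufio.NewReaderSize(src, 16) // deliberately too small (16 is bufio's minimum)
	word, err := small.ReadSlice('\n')
	fmt.Println(len(word), err) // 16 bufio: buffer full

	// Salvage the partial line, plus anything still buffered (defensive).
	buf := append([]byte(nil), word...)
	if n := small.Buffered(); n > 0 {
		peek, _ := small.Peek(n)
		buf = append(buf, peek...)
		small.Discard(n)
	}

	// Continue on a larger reader over the same underlying source.
	big := bufio.NewReaderSize(src, 64)
	rest, err := big.ReadSlice('\n')
	buf = append(buf, rest...)
	fmt.Println(len(buf), err) // 41 <nil>
}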

func (tail *Tail) seekEnd() error {
return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
}
@@ -618,13 +657,6 @@ func (tail *Tail) Cleanup() {
watch.Cleanup(tail.Filename)
}

-// A wrapper of bufio ReadSlice
-func (tail *Tail) readSlice(delim byte) (line []byte, err error) {
-line, err = tail.reader.ReadSlice(delim)
-tail.curOffset += int64(len(line))
-return
-}

// A wrapper of bufio ReadByte
func (tail *Tail) readByte() (b byte, err error) {
b, err = tail.reader.ReadByte()
251 changes: 250 additions & 1 deletion plugins/inputs/logfile/tail/tail_test.go
@@ -15,6 +15,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/constants"
)

const linesWrittenToFile int = 10
@@ -289,7 +291,7 @@ func TestTail_1MBWithExplicitMaxLineSize(t *testing.T) {
tail, err := TailFile(filename, Config{
Follow: false,
MustExist: true,
-MaxLineSize: 1024 * 1024, // Explicitly set 1MB buffer
+MaxLineSize: constants.DefaultMaxEventSize, // Explicitly set 1MB buffer
})
require.NoError(t, err)
defer tail.Stop()
@@ -401,3 +403,250 @@ func TestConcurrentLinePoolAccess(t *testing.T) {

wg.Wait()
}

// TestDynamicBufferSmallLines tests that small lines use the default small buffer
func TestDynamicBufferSmallLines(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "small_lines.log")

// Create file with small lines (well within 256KB default buffer)
smallLine := strings.Repeat("a", 1024) // 1KB line
content := smallLine + "\n" + smallLine + "\n"
err := os.WriteFile(filename, []byte(content), 0600)
require.NoError(t, err)

tail, err := TailFile(filename, Config{
Follow: false,
MustExist: true,
MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
})
require.NoError(t, err)
defer tail.Stop()

// Read first line to ensure reader is initialized
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, smallLine, line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for first line")
}

// Now check buffer size after reader is initialized
assert.Equal(t, constants.DefaultReaderBufferSize, tail.reader.Size(), "Should use default 256KB buffer for small lines")
assert.False(t, tail.useLargeBuffer, "Should not be using large buffer for small lines")

// Read second line
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, smallLine, line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for second line")
}

// Verify buffer is still small after reading small lines
assert.Equal(t, constants.DefaultReaderBufferSize, tail.reader.Size(), "Should still use 256KB buffer for small lines")
assert.False(t, tail.useLargeBuffer, "Should still not be using large buffer")
}

// TestDynamicBufferLargeLineUpgrade tests buffer upgrade when encountering large lines
func TestDynamicBufferLargeLineUpgrade(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "large_line.log")

// Create file with multiple large lines to test buffer upgrade
largeLine := strings.Repeat("b", 300*1024) // 300KB line
nearlyOneMBLine := strings.Repeat("x", constants.DefaultMaxEventSize-1024) // Nearly 1MB line (1MB - 1KB)
content := largeLine + "\nafter large line\n" + nearlyOneMBLine + "\nafter nearly 1MB line\n"
err := os.WriteFile(filename, []byte(content), 0600)
require.NoError(t, err)

tail, err := TailFile(filename, Config{
Follow: false,
MustExist: true,
MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
})
require.NoError(t, err)
defer tail.Stop()

// Read first large line (300KB) - should trigger buffer upgrade
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, largeLine, line.Text)
assert.Equal(t, 300*1024, len(line.Text), "First line should be 300KB")
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for large line")
}

// Verify buffer was upgraded after reading large line
assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should upgrade to 1MB buffer after large line")
assert.True(t, tail.useLargeBuffer, "Should be using large buffer after upgrade")

// Read line after large line
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, "after large line", line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for line after large")
}

// Read nearly 1MB line - should work with already upgraded buffer
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, nearlyOneMBLine, line.Text)
assert.Equal(t, constants.DefaultMaxEventSize-1024, len(line.Text), "Line should be nearly 1MB")
tail.ReleaseLine(line)
case <-time.After(2 * time.Second): // Longer timeout for very large line
t.Fatal("Timeout waiting for nearly 1MB line")
}

// Read final line
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, "after nearly 1MB line", line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for final line")
}

// Verify buffer remains large throughout
assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should maintain 1MB buffer")
assert.True(t, tail.useLargeBuffer, "Should continue using large buffer")
}

// TestDynamicBufferPersistentUpgrade tests that buffer upgrade persists across file reopens
func TestDynamicBufferPersistentUpgrade(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "persistent_test.log")

// Create file with large line to trigger upgrade
largeLine := strings.Repeat("c", 300*1024) // 300KB line
content := largeLine + "\n"
err := os.WriteFile(filename, []byte(content), 0600)
require.NoError(t, err)

tail, err := TailFile(filename, Config{
Follow: true,
ReOpen: true,
MustExist: true,
MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
})
require.NoError(t, err)
defer tail.Stop()

// Read large line to trigger upgrade
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, largeLine, line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for large line")
}

// Verify buffer was upgraded
assert.True(t, tail.useLargeBuffer, "Should be using large buffer after upgrade")

// Force a reopen by simulating file recreation
err = tail.Reopen(false)
require.NoError(t, err)

// Verify buffer upgrade persisted across reopen
assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should use 1MB buffer after reopen")
assert.True(t, tail.useLargeBuffer, "Should maintain large buffer flag after reopen")
}

// TestDynamicBufferMaxLineSizeLimit tests behavior when line exceeds MaxLineSize
func TestDynamicBufferMaxLineSizeLimit(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "max_size_test.log")

maxLineSize := 512 * 1024 // 512KB max
// Create line larger than MaxLineSize
hugeLine := strings.Repeat("d", maxLineSize+1024) // 513KB line
content := hugeLine + "\n"
err := os.WriteFile(filename, []byte(content), 0600)
require.NoError(t, err)

tail, err := TailFile(filename, Config{
Follow: false,
MustExist: true,
MaxLineSize: maxLineSize,
})
require.NoError(t, err)
defer tail.Stop()

// Read the line - should be truncated at buffer boundary
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
// Line should be truncated to buffer size
assert.Equal(t, maxLineSize, len(line.Text))
assert.Equal(t, strings.Repeat("d", maxLineSize), line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for truncated line")
}

// Verify buffer was upgraded to MaxLineSize
assert.Equal(t, maxLineSize, tail.reader.Size(), "Should upgrade to MaxLineSize buffer")
assert.True(t, tail.useLargeBuffer, "Should be using large buffer")
}

// TestDynamicBufferMultipleUpgrades tests that buffer doesn't upgrade multiple times
func TestDynamicBufferMultipleUpgrades(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "multiple_upgrades.log")

// Create multiple large lines
largeLine1 := strings.Repeat("e", 300*1024) // 300KB
largeLine2 := strings.Repeat("f", 400*1024) // 400KB
content := largeLine1 + "\n" + largeLine2 + "\n"
err := os.WriteFile(filename, []byte(content), 0600)
require.NoError(t, err)

tail, err := TailFile(filename, Config{
Follow: false,
MustExist: true,
MaxLineSize: constants.DefaultMaxEventSize, // 1MB max
})
require.NoError(t, err)
defer tail.Stop()

// Read first large line
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, largeLine1, line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for first large line")
}

// Verify buffer was upgraded
assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should upgrade to 1MB after first large line")
assert.True(t, tail.useLargeBuffer, "Should be using large buffer")

// Read second large line
select {
case line := <-tail.Lines:
assert.NoError(t, line.Err)
assert.Equal(t, largeLine2, line.Text)
tail.ReleaseLine(line)
case <-time.After(time.Second):
t.Fatal("Timeout waiting for second large line")
}

// Verify buffer size didn't change (no double upgrade)
assert.Equal(t, constants.DefaultMaxEventSize, tail.reader.Size(), "Should maintain 1MB buffer")
assert.True(t, tail.useLargeBuffer, "Should continue using large buffer")
}