Skip to content

Commit 2da9c43

Browse files
authored
Update state when batch dropped (#1789)
1 parent c815bfc commit 2da9c43

File tree

5 files changed

+369
-8
lines changed

5 files changed

+369
-8
lines changed

plugins/outputs/cloudwatchlogs/internal/pusher/batch.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ type logEventBatch struct {
9898
minT, maxT time.Time
9999
// Callbacks to execute when batch is successfully sent.
100100
doneCallbacks []func()
101-
batchers map[string]*state.RangeQueueBatcher
101+
// Callbacks specifically for updating state
102+
stateCallbacks []func()
103+
batchers map[string]*state.RangeQueueBatcher
102104
}
103105

104106
func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch {
@@ -151,7 +153,7 @@ func (b *logEventBatch) handleLogEventState(s *logEventState) {
151153
batcher, ok := b.batchers[queueID]
152154
if !ok {
153155
batcher = state.NewRangeQueueBatcher(s.queue)
154-
b.addDoneCallback(batcher.Done)
156+
b.addStateCallback(batcher.Done)
155157
b.batchers[queueID] = batcher
156158
}
157159
batcher.Merge(s.r)
@@ -164,14 +166,36 @@ func (b *logEventBatch) addDoneCallback(callback func()) {
164166
}
165167
}
166168

167-
// done runs all registered callbacks.
169+
// addStateCallback appends the callback to the batch's state callbacks list; a nil callback is ignored.
170+
// State callbacks are specifically for updating the state file and are run by
171+
// updateState, both on success (via done) and when the sender drops the batch.
172+
func (b *logEventBatch) addStateCallback(callback func()) {
173+
if callback != nil {
174+
b.stateCallbacks = append(b.stateCallbacks, callback)
175+
}
176+
}
177+
178+
// done runs the state callbacks first (via updateState) and then the done callbacks, each list in reverse registration order.
168179
func (b *logEventBatch) done() {
180+
b.updateState()
181+
169182
for i := len(b.doneCallbacks) - 1; i >= 0; i-- {
170183
done := b.doneCallbacks[i]
171184
done()
172185
}
173186
}
174187

188+
// updateState runs only the state callbacks, in reverse registration order,
189+
// without executing the done callbacks. It is called by done and whenever the
190+
// sender drops a batch (non-retryable error, retries exhausted, or stop requested)
191+
// so that the same batch is not reprocessed after restart.
192+
func (b *logEventBatch) updateState() {
193+
for i := len(b.stateCallbacks) - 1; i >= 0; i-- {
194+
callback := b.stateCallbacks[i]
195+
callback()
196+
}
197+
}
198+
175199
// build creates a cloudwatchlogs.PutLogEventsInput from the batch. The log events in the batch must be in
176200
// chronological order by their timestamp.
177201
func (b *logEventBatch) build() *cloudwatchlogs.PutLogEventsInput {

plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,137 @@ func TestLogEvent(t *testing.T) {
5151
}
5252

5353
func TestLogEventBatch(t *testing.T) {
54+
t.Run("UpdateStateOnly", func(t *testing.T) {
55+
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
56+
57+
successCallbackCalled := false
58+
successCallback := func() {
59+
successCallbackCalled = true
60+
}
61+
62+
stateCallbackCalled := false
63+
stateCallback := func() {
64+
stateCallbackCalled = true
65+
}
66+
67+
batch.addDoneCallback(successCallback)
68+
batch.addStateCallback(stateCallback)
69+
70+
batch.updateState()
71+
72+
assert.False(t, successCallbackCalled, "Success callback should not have been called")
73+
assert.True(t, stateCallbackCalled, "State callback should have been called")
74+
})
75+
76+
t.Run("UpdateStateOnly_WithMultipleCallbacks", func(t *testing.T) {
77+
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
78+
79+
successCallbacksCalled := make([]bool, 3)
80+
successCallbacks := []func(){
81+
func() { successCallbacksCalled[0] = true },
82+
func() { successCallbacksCalled[1] = true },
83+
func() { successCallbacksCalled[2] = true },
84+
}
85+
86+
stateCallbacksCalled := make([]bool, 3)
87+
stateCallbacks := []func(){
88+
func() { stateCallbacksCalled[0] = true },
89+
func() { stateCallbacksCalled[1] = true },
90+
func() { stateCallbacksCalled[2] = true },
91+
}
92+
93+
for _, cb := range successCallbacks {
94+
batch.addDoneCallback(cb)
95+
}
96+
for _, cb := range stateCallbacks {
97+
batch.addStateCallback(cb)
98+
}
99+
100+
batch.updateState()
101+
102+
// Verify none of the success callbacks were called
103+
for i, called := range successCallbacksCalled {
104+
assert.False(t, called, "Success callback %d should not have been called", i)
105+
}
106+
107+
// Verify all state callbacks were called
108+
for i, called := range stateCallbacksCalled {
109+
assert.True(t, called, "State callback %d should have been called", i)
110+
}
111+
})
112+
113+
t.Run("UpdateStateOnly_WithRangeQueueBatcher", func(t *testing.T) {
114+
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
115+
116+
mrq1 := &mockRangeQueue{}
117+
mrq1.On("ID").Return("test1")
118+
mrq1.On("Enqueue", state.NewRange(10, 20)).Once()
119+
120+
mrq2 := &mockRangeQueue{}
121+
mrq2.On("ID").Return("test2")
122+
mrq2.On("Enqueue", state.NewRange(30, 40)).Once()
123+
124+
event1 := newStatefulLogEvent(time.Now(), "Test1", nil, &logEventState{
125+
r: state.NewRange(10, 20),
126+
queue: mrq1,
127+
})
128+
event2 := newStatefulLogEvent(time.Now(), "Test2", nil, &logEventState{
129+
r: state.NewRange(30, 40),
130+
queue: mrq2,
131+
})
132+
133+
successCallbackCalled := false
134+
batch.addDoneCallback(func() {
135+
successCallbackCalled = true
136+
})
137+
138+
batch.append(event1)
139+
batch.append(event2)
140+
141+
batch.updateState()
142+
143+
mrq1.AssertExpectations(t)
144+
mrq2.AssertExpectations(t)
145+
146+
assert.False(t, successCallbackCalled, "Success callback should not have been called")
147+
})
148+
149+
t.Run("UpdateStateWithDone", func(t *testing.T) {
150+
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
151+
152+
mrq1 := &mockRangeQueue{}
153+
mrq1.On("ID").Return("test1")
154+
mrq1.On("Enqueue", state.NewRange(10, 20)).Once()
155+
156+
mrq2 := &mockRangeQueue{}
157+
mrq2.On("ID").Return("test2")
158+
mrq2.On("Enqueue", state.NewRange(30, 40)).Once()
159+
160+
event1 := newStatefulLogEvent(time.Now(), "Test1", nil, &logEventState{
161+
r: state.NewRange(10, 20),
162+
queue: mrq1,
163+
})
164+
event2 := newStatefulLogEvent(time.Now(), "Test2", nil, &logEventState{
165+
r: state.NewRange(30, 40),
166+
queue: mrq2,
167+
})
168+
169+
stateCallbackCalled := false
170+
batch.addStateCallback(func() {
171+
stateCallbackCalled = true
172+
})
173+
174+
batch.append(event1)
175+
batch.append(event2)
176+
177+
batch.done()
178+
179+
mrq1.AssertExpectations(t)
180+
mrq2.AssertExpectations(t)
181+
182+
assert.True(t, stateCallbackCalled, "State callback should have been called")
183+
})
184+
54185
t.Run("Append", func(t *testing.T) {
55186
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
56187

plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ import (
1515
"github.com/aws/aws-sdk-go/aws"
1616
"github.com/aws/aws-sdk-go/aws/awserr"
1717
"github.com/influxdata/telegraf"
18+
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/mock"
1820
"github.com/stretchr/testify/require"
1921

22+
"github.com/aws/amazon-cloudwatch-agent/internal/state"
2023
"github.com/aws/amazon-cloudwatch-agent/logs"
2124
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
2225
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
@@ -66,6 +69,23 @@ func (s *stubLogsService) DescribeLogGroups(in *cloudwatchlogs.DescribeLogGroups
6669
return nil, nil
6770
}
6871

72+
// mockSender is a testify mock (it embeds mock.Mock) that the queue tests assign to queue.sender.
type mockSender struct {
73+
mock.Mock
74+
}
75+
76+
// Send records the invocation, with the batch as its argument, on the embedded mock.
func (m *mockSender) Send(batch *logEventBatch) {
77+
m.Called(batch)
78+
}
79+
80+
// SetRetryDuration records the invocation, with d as its argument, on the embedded mock; d is not stored.
func (m *mockSender) SetRetryDuration(d time.Duration) {
81+
m.Called(d)
82+
}
83+
84+
// RetryDuration returns the first return value configured on the mock for this call.
// The unchecked type assertion panics if that value is not a time.Duration.
func (m *mockSender) RetryDuration() time.Duration {
85+
args := m.Called()
86+
return args.Get(0).(time.Duration)
87+
}
88+
6989
func TestAddSingleEvent_WithAccountId(t *testing.T) {
7090
t.Parallel()
7191
var wg sync.WaitGroup
@@ -685,3 +705,98 @@ func testPreparationWithLogger(
685705
)
686706
return stop, q.(*queue)
687707
}
708+
709+
// TestQueueCallbackRegistration checks which callback lists are populated on the batch that
// queue.send hands to the sender: only done callbacks for a plain event, and both done and
// state callbacks (plus a per-queue batcher) for a stateful event.
func TestQueueCallbackRegistration(t *testing.T) {
710+
// A plain (stateless) event must leave batch.stateCallbacks empty.
t.Run("RegistersCallbacks", func(t *testing.T) {
711+
var wg sync.WaitGroup
712+
var s stubLogsService
713+
var called bool
714+
715+
// Stub PutLogEvents to record that it was invoked (asserted at the end of the subtest).
716+
s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
717+
called = true
718+
return &cloudwatchlogs.PutLogEventsOutput{}, nil
719+
}
720+
721+
mockSender := &mockSender{}
722+
mockSender.On("Send", mock.AnythingOfType("*pusher.logEventBatch")).Run(func(args mock.Arguments) {
723+
batch := args.Get(0).(*logEventBatch)
724+
725+
assert.NotEmpty(t, batch.doneCallbacks, "Regular callbacks should be registered")
726+
assert.Empty(t, batch.stateCallbacks, "State callbacks should not be registered")
727+
728+
s.PutLogEvents(batch.build())
729+
}).Return()
730+
731+
logger := testutil.NewNopLogger()
732+
stop := make(chan struct{})
733+
q := &queue{
734+
target: Target{"G", "S", util.StandardLogGroupClass, -1},
735+
logger: logger,
736+
converter: newConverter(logger, Target{"G", "S", util.StandardLogGroupClass, -1}),
737+
batch: newLogEventBatch(Target{"G", "S", util.StandardLogGroupClass, -1}, nil),
738+
sender: mockSender,
739+
eventsCh: make(chan logs.LogEvent, 100),
740+
flushCh: make(chan struct{}),
741+
resetTimerCh: make(chan struct{}),
742+
flushTimer: time.NewTimer(10 * time.Millisecond),
743+
stop: stop,
744+
startNonBlockCh: make(chan struct{}),
745+
wg: &wg,
746+
}
747+
q.flushTimeout.Store(10 * time.Millisecond)
748+
749+
q.batch.append(newLogEvent(time.Now(), "test message", nil))
750+
q.send()
751+
752+
mockSender.AssertExpectations(t)
753+
assert.True(t, called, "PutLogEvents should have been called")
754+
})
755+
756+
// A stateful event (range 10-20 on the mock queue "test-queue") must yield state callbacks and a batcher for that queue.
t.Run("RegistersStateCallbacksForStatefulEvents", func(t *testing.T) {
757+
var wg sync.WaitGroup
758+
759+
mrq := &mockRangeQueue{}
760+
mrq.On("ID").Return("test-queue")
761+
mrq.On("Enqueue", mock.Anything).Return()
762+
763+
mockSender := &mockSender{}
764+
mockSender.On("Send", mock.AnythingOfType("*pusher.logEventBatch")).Run(func(args mock.Arguments) {
765+
batch := args.Get(0).(*logEventBatch)
766+
767+
assert.NotEmpty(t, batch.doneCallbacks, "Regular callbacks should be registered")
768+
assert.NotEmpty(t, batch.stateCallbacks, "State callbacks should be registered")
769+
770+
batcher, ok := batch.batchers["test-queue"]
771+
assert.True(t, ok, "Batch should have a batcher for our queue")
772+
assert.NotNil(t, batcher, "Batcher should not be nil")
773+
}).Return()
774+
775+
logger := testutil.NewNopLogger()
776+
stop := make(chan struct{})
777+
q := &queue{
778+
target: Target{"G", "S", util.StandardLogGroupClass, -1},
779+
logger: logger,
780+
converter: newConverter(logger, Target{"G", "S", util.StandardLogGroupClass, -1}),
781+
batch: newLogEventBatch(Target{"G", "S", util.StandardLogGroupClass, -1}, nil),
782+
sender: mockSender,
783+
eventsCh: make(chan logs.LogEvent, 100),
784+
flushCh: make(chan struct{}),
785+
resetTimerCh: make(chan struct{}),
786+
flushTimer: time.NewTimer(10 * time.Millisecond),
787+
stop: stop,
788+
startNonBlockCh: make(chan struct{}),
789+
wg: &wg,
790+
}
791+
q.flushTimeout.Store(10 * time.Millisecond)
792+
793+
event := newStubStatefulLogEvent("test message", time.Now(), state.NewRange(10, 20), mrq)
794+
795+
convertedEvent := q.converter.convert(event)
796+
q.batch.append(convertedEvent)
797+
798+
q.send()
799+
800+
mockSender.AssertExpectations(t)
801+
})
802+
}

plugins/outputs/cloudwatchlogs/internal/pusher/sender.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func (s *sender) Send(batch *logEventBatch) {
8787
var awsErr awserr.Error
8888
if !errors.As(err, &awsErr) {
8989
s.logger.Errorf("Non aws error received when sending logs to %v/%v: %v. CloudWatch agent will not retry and logs will be missing!", batch.Group, batch.Stream, err)
90+
batch.updateState()
9091
return
9192
}
9293

@@ -99,6 +100,7 @@ func (s *sender) Send(batch *logEventBatch) {
99100
case *cloudwatchlogs.InvalidParameterException,
100101
*cloudwatchlogs.DataAlreadyAcceptedException:
101102
s.logger.Errorf("%v, will not retry the request", e)
103+
batch.updateState()
102104
return
103105
default:
104106
s.logger.Errorf("Aws error received when sending logs to %v/%v: %v", batch.Group, batch.Stream, awsErr)
@@ -116,6 +118,7 @@ func (s *sender) Send(batch *logEventBatch) {
116118

117119
if time.Since(startTime)+wait > s.RetryDuration() {
118120
s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
121+
batch.updateState()
119122
return
120123
}
121124

@@ -124,6 +127,7 @@ func (s *sender) Send(batch *logEventBatch) {
124127
select {
125128
case <-s.stop:
126129
s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
130+
batch.updateState()
127131
return
128132
case <-time.After(wait):
129133
}

0 commit comments

Comments
 (0)