Skip to content

Commit 7e9f1b5

Browse files
authored
chore: add event delivery metrics for async destinations (#5930)
# Description Add event delivery metrics for async destinations ## Linear Ticket https://linear.app/rudderstack/issue/OBS-844/add-missing-event-delivery-time-metrics-for-async-destinations Resolves OBS-844 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 2e3b796 commit 7e9f1b5

File tree

2 files changed

+320
-0
lines changed

2 files changed

+320
-0
lines changed

router/batchrouter/handle_observability.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ import (
66
"time"
77

88
"github.com/rudderlabs/rudder-go-kit/jsonrs"
9+
"github.com/rudderlabs/rudder-go-kit/logger"
910
"github.com/rudderlabs/rudder-server/warehouse/router"
1011

12+
"github.com/tidwall/gjson"
1113
"github.com/tidwall/sjson"
1214

1315
"github.com/rudderlabs/rudder-go-kit/stats"
1416

17+
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
1518
"github.com/rudderlabs/rudder-server/jobsdb"
1619
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
1720
"github.com/rudderlabs/rudder-server/services/diagnostics"
@@ -103,6 +106,9 @@ func (brt *Handle) recordAsyncDestinationDeliveryStatus(sourceID, destinationID
103106
errorResp = []byte(`{"success":"OK"}`)
104107
}
105108

109+
// Emit event_delivery_time metric for successful async destination deliveries
110+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
111+
106112
// Payload and AttemptNum don't make sense in recording batch router delivery status,
107113
// So they are set to default values.
108114
payload, err := sjson.SetBytes([]byte(`{}`), "success", fmt.Sprint(successCount)+" events")
@@ -128,6 +134,60 @@ func (brt *Handle) recordAsyncDestinationDeliveryStatus(sourceID, destinationID
128134
brt.debugger.RecordEventDeliveryStatus(destinationID, &deliveryStatus)
129135
}
130136

137+
// emitAsyncEventDeliveryTimeMetrics emits event_delivery_time metrics for successful async destination deliveries
138+
func (brt *Handle) emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID string, statusList []*jobsdb.JobStatusT) {
139+
// Get the async destination struct to access original job parameters
140+
asyncDestStruct := brt.asyncDestinationStruct[destinationID]
141+
if asyncDestStruct == nil {
142+
brt.logger.Errorn("Async destination struct not found for destinationID: %s", obskit.DestinationID(destinationID))
143+
return
144+
}
145+
146+
// Process each successful job status to emit event_delivery_time metric
147+
for _, status := range statusList {
148+
if status.JobState != jobsdb.Succeeded.State {
149+
continue // Only emit metrics for successful deliveries
150+
}
151+
152+
// Get original job parameters for this job
153+
originalParams, exists := asyncDestStruct.OriginalJobParameters[status.JobID]
154+
if !exists {
155+
brt.logger.Debugn("Original job parameters not found for jobID: %d", logger.NewIntField("jobID", status.JobID))
156+
continue
157+
}
158+
159+
// Extract receivedAt from original job parameters
160+
receivedAtStr := gjson.GetBytes(originalParams, "received_at").String()
161+
if receivedAtStr == "" {
162+
brt.logger.Debugn("ReceivedAt not found in job parameters for jobID: %d", logger.NewIntField("jobID", status.JobID))
163+
continue
164+
}
165+
166+
// Parse receivedAt time
167+
receivedTime, err := time.Parse(misc.RFC3339Milli, receivedAtStr)
168+
if err != nil {
169+
brt.logger.Debugn("Failed to parse receivedAt time for jobID: %d, receivedAt: %s, error: %v", logger.NewIntField("jobID", status.JobID), logger.NewStringField("receivedAt", receivedAtStr), obskit.Error(err))
170+
continue
171+
}
172+
173+
// Extract source category from original job parameters
174+
sourceCategory := gjson.GetBytes(originalParams, "source_category").String()
175+
176+
// Create and emit the event_delivery_time metric
177+
eventDeliveryTimeStat := stats.Default.NewTaggedStat("event_delivery_time", stats.TimerType, map[string]string{
178+
"module": "batch_router",
179+
"destType": brt.destType,
180+
"workspaceId": status.WorkspaceId,
181+
"sourceId": sourceID,
182+
"destID": destinationID,
183+
"sourceCategory": sourceCategory,
184+
})
185+
186+
// Send the timing metric (time from received to delivered)
187+
eventDeliveryTimeStat.SendTiming(time.Since(receivedTime))
188+
}
189+
}
190+
131191
func (brt *Handle) recordDeliveryStatus(batchDestination Connection, output UploadResult, isWarehouse bool) {
132192
var (
133193
errorCode string
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package batchrouter
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/rudderlabs/rudder-go-kit/jsonrs"
11+
12+
"github.com/rudderlabs/rudder-go-kit/logger"
13+
"github.com/rudderlabs/rudder-go-kit/stats"
14+
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
15+
"github.com/rudderlabs/rudder-server/jobsdb"
16+
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
17+
"github.com/rudderlabs/rudder-server/utils/misc"
18+
)
19+
20+
func TestEmitAsyncEventDeliveryTimeMetrics_NoAsyncDestinationStruct(t *testing.T) {
21+
brt := &Handle{
22+
logger: logger.NOP,
23+
destType: "MARKETO_BULK_UPLOAD",
24+
asyncDestinationStruct: make(map[string]*common.AsyncDestinationStruct),
25+
}
26+
destinationID := "dest-123"
27+
sourceID := "source-456"
28+
workspaceID := "workspace-789"
29+
jobID := int64(12345)
30+
31+
statusList := []*jobsdb.JobStatusT{
32+
{
33+
JobID: jobID,
34+
JobState: jobsdb.Succeeded.State,
35+
WorkspaceId: workspaceID,
36+
},
37+
}
38+
require.NotPanics(t, func() {
39+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
40+
})
41+
}
42+
43+
func TestEmitAsyncEventDeliveryTimeMetrics(t *testing.T) {
44+
// Set up the batch router handle
45+
brt := &Handle{
46+
logger: logger.NOP,
47+
destType: "MARKETO_BULK_UPLOAD",
48+
asyncDestinationStruct: make(map[string]*common.AsyncDestinationStruct),
49+
}
50+
51+
// Create test data
52+
destinationID := "dest-123"
53+
sourceID := "source-456"
54+
workspaceID := "workspace-789"
55+
jobID := int64(12345)
56+
57+
// Create a receivedAt time (2 hours ago)
58+
receivedAt := time.Now().Add(-2 * time.Hour)
59+
receivedAtStr := receivedAt.Format(misc.RFC3339Milli)
60+
61+
// Create job parameters with receivedAt
62+
jobParams := map[string]interface{}{
63+
"source_id": sourceID,
64+
"destination_id": destinationID,
65+
"received_at": receivedAtStr,
66+
"source_category": "cloud",
67+
"workspace_id": workspaceID,
68+
}
69+
jobParamsBytes, _ := jsonrs.Marshal(jobParams)
70+
71+
// Create destination config
72+
destination := &backendconfig.DestinationT{
73+
ID: destinationID,
74+
Name: "Test Marketo Destination",
75+
DestinationDefinition: backendconfig.DestinationDefinitionT{
76+
Name: "MARKETO_BULK_UPLOAD",
77+
},
78+
}
79+
80+
// Set up async destination struct
81+
brt.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
82+
Destination: destination,
83+
OriginalJobParameters: map[int64]json.RawMessage{
84+
jobID: jobParamsBytes,
85+
},
86+
}
87+
88+
// Create job status list with successful job
89+
statusList := []*jobsdb.JobStatusT{
90+
{
91+
JobID: jobID,
92+
JobState: jobsdb.Succeeded.State,
93+
WorkspaceId: workspaceID,
94+
},
95+
}
96+
97+
// Use NOP stats to avoid actual metric emission during test
98+
originalStats := stats.Default
99+
stats.Default = stats.NOP
100+
defer func() {
101+
stats.Default = originalStats
102+
}()
103+
104+
// Call the function under test - should not panic
105+
require.NotPanics(t, func() {
106+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
107+
})
108+
}
109+
110+
func TestEmitAsyncEventDeliveryTimeMetrics_NoSuccessfulJobs(t *testing.T) {
111+
brt := &Handle{
112+
logger: logger.NOP,
113+
destType: "MARKETO_BULK_UPLOAD",
114+
asyncDestinationStruct: make(map[string]*common.AsyncDestinationStruct),
115+
}
116+
117+
destinationID := "dest-123"
118+
sourceID := "source-456"
119+
workspaceID := "workspace-789"
120+
121+
// Create destination
122+
destination := &backendconfig.DestinationT{
123+
ID: destinationID,
124+
Name: "Test Marketo Destination",
125+
}
126+
127+
brt.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
128+
Destination: destination,
129+
OriginalJobParameters: make(map[int64]json.RawMessage),
130+
}
131+
132+
// Create job status list with only failed jobs
133+
statusList := []*jobsdb.JobStatusT{
134+
{
135+
JobID: 12345,
136+
JobState: jobsdb.Failed.State,
137+
WorkspaceId: workspaceID,
138+
},
139+
}
140+
141+
// Use NOP stats to avoid actual metric emission during test
142+
originalStats := stats.Default
143+
stats.Default = stats.NOP
144+
defer func() {
145+
stats.Default = originalStats
146+
}()
147+
148+
// Call the function under test - should not panic and should not emit metrics for failed jobs
149+
require.NotPanics(t, func() {
150+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
151+
})
152+
}
153+
154+
func TestEmitAsyncEventDeliveryTimeMetrics_MissingReceivedAt(t *testing.T) {
155+
brt := &Handle{
156+
logger: logger.NOP,
157+
destType: "MARKETO_BULK_UPLOAD",
158+
asyncDestinationStruct: make(map[string]*common.AsyncDestinationStruct),
159+
}
160+
161+
destinationID := "dest-123"
162+
sourceID := "source-456"
163+
workspaceID := "workspace-789"
164+
jobID := int64(12345)
165+
166+
// Create job parameters WITHOUT receivedAt
167+
jobParams := map[string]interface{}{
168+
"source_id": sourceID,
169+
"destination_id": destinationID,
170+
// "received_at" is missing
171+
"source_category": "cloud",
172+
}
173+
jobParamsBytes, _ := jsonrs.Marshal(jobParams)
174+
175+
destination := &backendconfig.DestinationT{
176+
ID: destinationID,
177+
Name: "Test Marketo Destination",
178+
}
179+
180+
brt.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
181+
Destination: destination,
182+
OriginalJobParameters: map[int64]json.RawMessage{
183+
jobID: jobParamsBytes,
184+
},
185+
}
186+
187+
statusList := []*jobsdb.JobStatusT{
188+
{
189+
JobID: jobID,
190+
JobState: jobsdb.Succeeded.State,
191+
WorkspaceId: workspaceID,
192+
},
193+
}
194+
195+
// Use NOP stats to avoid actual metric emission during test
196+
originalStats := stats.Default
197+
stats.Default = stats.NOP
198+
defer func() {
199+
stats.Default = originalStats
200+
}()
201+
202+
// Call the function under test - should not panic when receivedAt is missing
203+
require.NotPanics(t, func() {
204+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
205+
})
206+
}
207+
208+
func TestEmitAsyncEventDeliveryTimeMetrics_InvalidReceivedAt(t *testing.T) {
209+
brt := &Handle{
210+
logger: logger.NOP,
211+
destType: "MARKETO_BULK_UPLOAD",
212+
asyncDestinationStruct: make(map[string]*common.AsyncDestinationStruct),
213+
}
214+
215+
destinationID := "dest-123"
216+
sourceID := "source-456"
217+
workspaceID := "workspace-789"
218+
jobID := int64(12345)
219+
220+
// Create job parameters with invalid receivedAt format
221+
jobParams := map[string]interface{}{
222+
"source_id": sourceID,
223+
"destination_id": destinationID,
224+
"received_at": "invalid-time-format",
225+
"source_category": "cloud",
226+
}
227+
jobParamsBytes, _ := jsonrs.Marshal(jobParams)
228+
229+
destination := &backendconfig.DestinationT{
230+
ID: destinationID,
231+
Name: "Test Marketo Destination",
232+
}
233+
234+
brt.asyncDestinationStruct[destinationID] = &common.AsyncDestinationStruct{
235+
Destination: destination,
236+
OriginalJobParameters: map[int64]json.RawMessage{
237+
jobID: jobParamsBytes,
238+
},
239+
}
240+
241+
statusList := []*jobsdb.JobStatusT{
242+
{
243+
JobID: jobID,
244+
JobState: jobsdb.Succeeded.State,
245+
WorkspaceId: workspaceID,
246+
},
247+
}
248+
249+
// Use NOP stats to avoid actual metric emission during test
250+
originalStats := stats.Default
251+
stats.Default = stats.NOP
252+
defer func() {
253+
stats.Default = originalStats
254+
}()
255+
256+
// Call the function under test - should not panic when receivedAt format is invalid
257+
require.NotPanics(t, func() {
258+
brt.emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID, statusList)
259+
})
260+
}

0 commit comments

Comments
 (0)