Skip to content

Commit a8b3a00

Browse files
authored
feat: introduce router transformer metrics for cost attribution (#5511)
1 parent 7fcdc47 commit a8b3a00

File tree

3 files changed

+234
-19
lines changed

3 files changed

+234
-19
lines changed

processor/transformer/transformer.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/rudderlabs/rudder-go-kit/logger"
2525
"github.com/rudderlabs/rudder-go-kit/stats"
2626

27+
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
2728
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
2829
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
2930
"github.com/rudderlabs/rudder-server/processor/integrations"
@@ -158,6 +159,20 @@ func (t transformerMetricLabels) ToStatsTag() stats.Tags {
158159
return tags
159160
}
160161

162+
// ToLoggerFields converts the metric labels to a slice of logger.Fields
163+
func (t transformerMetricLabels) ToLoggerFields() []logger.Field {
164+
return []logger.Field{
165+
logger.NewStringField("endpoint", t.Endpoint),
166+
logger.NewStringField("stage", t.Stage),
167+
obskit.DestinationType(t.DestinationType),
168+
obskit.SourceType(t.SourceType),
169+
obskit.WorkspaceID(t.WorkspaceID),
170+
obskit.DestinationID(t.DestinationID),
171+
obskit.SourceID(t.SourceID),
172+
logger.NewStringField("transformationId", t.TransformationID),
173+
}
174+
}
175+
161176
func isJobTerminated(status int) bool {
162177
if status == http.StatusTooManyRequests || status == http.StatusRequestTimeout {
163178
return false
@@ -395,7 +410,8 @@ func (trans *handle) transform(
395410

396411
trackWg.Add(1)
397412
go func() {
398-
trackLongRunningTransformation(ctx, labels.Stage, trans.config.timeoutDuration, trans.logger.With(labels.ToStatsTag()))
413+
l := trans.logger.Withn(labels.ToLoggerFields()...)
414+
trackLongRunningTransformation(ctx, labels.Stage, trans.config.timeoutDuration, l)
399415
trackWg.Done()
400416
}()
401417

router/transformer/transformer.go

Lines changed: 96 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"net/http"
13+
"net/url"
1314
"os"
1415
"strconv"
1516
"strings"
@@ -61,6 +62,8 @@ type handle struct {
6162
transformRequestTimerStat stats.Measurement
6263
logger logger.Logger
6364

65+
stats stats.Stats
66+
6467
// clientOAuthV2 is the HTTP client for router transformation requests using OAuth V2.
6568
clientOAuthV2 *http.Client
6669
// proxyClientOAuthV2 is the mockable HTTP client for transformer proxy requests using OAuth V2.
@@ -127,6 +130,34 @@ func NewTransformer(destinationTimeout, transformTimeout time.Duration, backendC
127130

128131
var loggerOverride logger.Logger
129132

133+
// Add transformerMetricLabels struct and methods
134+
type transformerMetricLabels struct {
135+
Endpoint string // hostname of the service
136+
DestinationType string // BQ, etc.
137+
SourceType string // webhook
138+
Stage string // processor, router, gateway
139+
WorkspaceID string // workspace identifier
140+
SourceID string // source identifier
141+
DestinationID string // destination identifier
142+
}
143+
144+
// ToStatsTag converts transformerMetricLabels to stats.Tags
145+
func (t transformerMetricLabels) ToStatsTag() stats.Tags {
146+
tags := stats.Tags{
147+
"endpoint": t.Endpoint,
148+
"destinationType": t.DestinationType,
149+
"sourceType": t.SourceType,
150+
"stage": t.Stage,
151+
"workspaceId": t.WorkspaceID,
152+
"destinationId": t.DestinationID,
153+
"sourceId": t.SourceID,
154+
155+
// Legacy tags: to be removed
156+
"destType": t.DestinationType,
157+
}
158+
return tags
159+
}
160+
130161
// Transform transforms router jobs to destination jobs
131162
func (trans *handle) Transform(transformType string, transformMessage *types.TransformMessageT) []types.DestinationJobT {
132163
var destinationJobs types.DestinationJobs
@@ -138,24 +169,37 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
138169
trans.logger.Errorf("problematic input for marshalling: %#v", transformMessage)
139170
panic(err)
140171
}
141-
trans.logger.Debugf("[Router Transformer] :: input payload : %s", string(rawJSON))
142-
143-
retryCount := 0
144-
var resp *http.Response
145-
var respData []byte
146-
// We should rarely have error communicating with our JS
147-
reqFailed := false
148172

149173
var url string
150174
if transformType == BATCH {
151175
url = getBatchURL()
152176
} else if transformType == ROUTER_TRANSFORM {
153177
url = getRouterTransformURL()
154178
} else {
155-
// Unexpected transformType returning empty
156179
return []types.DestinationJobT{}
157180
}
158181

182+
// Create metric labels
183+
labels := transformerMetricLabels{
184+
Endpoint: getEndpointFromURL(url),
185+
Stage: "router",
186+
DestinationType: transformMessage.Data[0].Destination.DestinationDefinition.Name,
187+
SourceType: transformMessage.Data[0].JobMetadata.SourceCategory,
188+
WorkspaceID: transformMessage.Data[0].JobMetadata.WorkspaceID,
189+
SourceID: transformMessage.Data[0].JobMetadata.SourceID,
190+
DestinationID: transformMessage.Data[0].Destination.ID,
191+
}
192+
193+
// Record request metrics
194+
trans.stats.NewTaggedStat("transformer_client_request_total_events", stats.CountType, labels.ToStatsTag()).Count(len(transformMessage.Data))
195+
trans.stats.NewTaggedStat("transformer_client_request_total_bytes", stats.CountType, labels.ToStatsTag()).Count(len(rawJSON))
196+
197+
retryCount := 0
198+
var resp *http.Response
199+
var respData []byte
200+
// We should rarely have error communicating with our JS
201+
reqFailed := false
202+
159203
for {
160204
s := time.Now()
161205
req, err := http.NewRequest("POST", url, bytes.NewBuffer(rawJSON))
@@ -183,14 +227,19 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
183227
resp, err = trans.client.Do(req)
184228
}
185229

230+
duration := time.Since(s)
231+
trans.stats.NewTaggedStat("transformer_client_total_durations_seconds", stats.CountType, labels.ToStatsTag()).Count(int(duration.Seconds()))
232+
186233
if err == nil {
187234
// If no err returned by client.Post, reading body.
188235
// If reading body fails, retrying.
189236
respData, err = io.ReadAll(resp.Body)
237+
trans.stats.NewTaggedStat("transformer_client_response_total_bytes", stats.CountType, labels.ToStatsTag()).Count(len(respData))
238+
190239
}
191240

192241
if err != nil {
193-
trans.transformRequestTimerStat.SendTiming(time.Since(s))
242+
trans.transformRequestTimerStat.SendTiming(duration)
194243
reqFailed = true
195244
trans.logger.Errorn(
196245
"JS HTTP connection error",
@@ -215,7 +264,7 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
215264
)
216265
}
217266

218-
trans.transformRequestTimerStat.SendTiming(time.Since(s))
267+
trans.transformRequestTimerStat.SendTiming(duration)
219268
break
220269
}
221270

@@ -254,6 +303,9 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
254303
err = jsonfast.Unmarshal(rawResp, &destinationJobs)
255304
}
256305

306+
// Record response events metric
307+
trans.stats.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels.ToStatsTag()).Count(len(destinationJobs))
308+
257309
// Validate the response received from the transformer
258310
in := transformMessage.JobIDs()
259311
var out []int64
@@ -359,11 +411,6 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
359411
routerJobDontBatchDirectives[m.JobID] = m.DontBatch
360412
}
361413

362-
stats.Default.NewTaggedStat("transformer_proxy.delivery_request", stats.CountType, stats.Tags{
363-
"destType": proxyReqParams.DestName,
364-
"workspaceId": proxyReqParams.ResponseData.Metadata[0].WorkspaceID,
365-
"destinationId": proxyReqParams.ResponseData.Metadata[0].DestinationID,
366-
}).Increment()
367414
trans.logger.Debugf(`[TransformerProxy] (Dest-%[1]v) Proxy Request starts - %[1]v`, proxyReqParams.DestName)
368415

369416
payload, err := proxyReqParams.Adapter.getPayload(proxyReqParams)
@@ -377,6 +424,7 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
377424
DontBatchDirectives: routerJobDontBatchDirectives,
378425
}
379426
}
427+
380428
proxyURL, err := proxyReqParams.Adapter.getProxyURL(proxyReqParams.DestName)
381429
if err != nil {
382430
return ProxyRequestResponse{
@@ -389,13 +437,32 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
389437
}
390438
}
391439

440+
// Create metric labels
441+
labels := transformerMetricLabels{
442+
Endpoint: getEndpointFromURL(proxyURL),
443+
Stage: "router_proxy",
444+
DestinationType: proxyReqParams.DestName,
445+
WorkspaceID: proxyReqParams.ResponseData.Metadata[0].WorkspaceID,
446+
DestinationID: proxyReqParams.ResponseData.Metadata[0].DestinationID,
447+
}.ToStatsTag()
448+
449+
// Record request metrics
450+
trans.stats.NewTaggedStat("transformer_proxy.delivery_request", stats.CountType, labels).Increment()
451+
trans.stats.NewTaggedStat("transformer_client_request_total_bytes", stats.CountType, labels).Count(len(payload))
452+
trans.stats.NewTaggedStat("transformer_client_request_total_events", stats.CountType, labels).Count(len(proxyReqParams.ResponseData.Metadata))
453+
392454
rdlTime := time.Now()
393455
httpPrxResp := trans.doProxyRequest(ctx, proxyURL, proxyReqParams, payload)
394456
respData, respCode, requestError := httpPrxResp.respData, httpPrxResp.statusCode, httpPrxResp.err
395457

396-
reqSuccessStr := strconv.FormatBool(requestError == nil)
397-
stats.Default.NewTaggedStat("transformer_proxy.request_latency", stats.TimerType, stats.Tags{"requestSuccess": reqSuccessStr, "destType": proxyReqParams.DestName}).SendTiming(time.Since(rdlTime))
398-
stats.Default.NewTaggedStat("transformer_proxy.request_result", stats.CountType, stats.Tags{"requestSuccess": reqSuccessStr, "destType": proxyReqParams.DestName}).Increment()
458+
duration := time.Since(rdlTime)
459+
460+
trans.stats.NewTaggedStat("transformer_client_total_durations_seconds", stats.CountType, labels).Count(int(duration.Seconds()))
461+
trans.stats.NewTaggedStat("transformer_client_response_total_bytes", stats.CountType, labels).Count(len(respData))
462+
463+
labelsWithSuccess := lo.Assign(labels, stats.Tags{"requestSuccess": strconv.FormatBool(requestError == nil)})
464+
trans.stats.NewTaggedStat("transformer_proxy.request_latency", stats.TimerType, labelsWithSuccess).SendTiming(duration)
465+
trans.stats.NewTaggedStat("transformer_proxy.request_result", stats.CountType, labelsWithSuccess).Increment()
399466

400467
if requestError != nil {
401468
return ProxyRequestResponse{
@@ -471,6 +538,8 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
471538
respData = []byte(transportResponse.InterceptorResponse.Response)
472539
}
473540

541+
trans.stats.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels).Count(len(transResp.routerJobResponseCodes))
542+
474543
return ProxyRequestResponse{
475544
ProxyRequestStatusCode: respCode,
476545
ProxyRequestResponseBody: string(respData),
@@ -532,6 +601,7 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
532601
trans.proxyClient = transformerclient.NewClient(transformerClientConfig)
533602
// This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2
534603
trans.proxyClientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs)
604+
trans.stats = stats.Default
535605
trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType)
536606
}
537607

@@ -677,3 +747,11 @@ func GetAuthErrorCategoryFromTransformProxyResponse(respData []byte) (string, er
677747
}
678748
return transformedJobs.AuthErrorCategory, nil
679749
}
750+
751+
// Helper function to get endpoint from URL
752+
func getEndpointFromURL(urlStr string) string {
753+
if parsedURL, err := url.Parse(urlStr); err == nil {
754+
return parsedURL.Host
755+
}
756+
return ""
757+
}

router/transformer/transformer_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020

2121
"github.com/rudderlabs/rudder-go-kit/config"
2222
"github.com/rudderlabs/rudder-go-kit/logger"
23+
"github.com/rudderlabs/rudder-go-kit/stats"
24+
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
2325
"github.com/rudderlabs/rudder-go-kit/stats/mock_stats"
2426

2527
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
@@ -1981,3 +1983,122 @@ func normalizeErrors(transformerResponse []types.DestinationJobT, prefix string)
19811983
}
19821984
}
19831985
}
1986+
1987+
func TestTransformerMetrics(t *testing.T) {
1988+
initMocks(t)
1989+
config.Reset()
1990+
loggerOverride = logger.NOP
1991+
1992+
statsStore, err := memstats.New()
1993+
require.NoError(t, err)
1994+
1995+
// Setup test server
1996+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1997+
w.Header().Add(apiVersionHeader, strconv.Itoa(utilTypes.SupportedTransformerApiVersion))
1998+
response := []types.DestinationJobT{
1999+
{JobMetadataArray: []types.JobMetadataT{{JobID: 1}}, StatusCode: http.StatusOK},
2000+
{JobMetadataArray: []types.JobMetadataT{{JobID: 2}}, StatusCode: http.StatusOK},
2001+
}
2002+
b, err := json.Marshal(response)
2003+
require.NoError(t, err)
2004+
2005+
// For BATCH transform type, don't wrap in output field
2006+
_, err = w.Write(b)
2007+
require.NoError(t, err)
2008+
}))
2009+
defer srv.Close()
2010+
2011+
isOAuthV2EnabledLoader := config.SingleValueLoader(false)
2012+
expTimeDiff := config.SingleValueLoader(1 * time.Minute)
2013+
2014+
t.Setenv("DEST_TRANSFORM_URL", srv.URL)
2015+
// Create transformer with stats store
2016+
tr := &handle{
2017+
stats: statsStore,
2018+
logger: logger.NOP,
2019+
client: srv.Client(),
2020+
proxyClient: srv.Client(),
2021+
tr: &http.Transport{},
2022+
oAuthV2EnabledLoader: isOAuthV2EnabledLoader,
2023+
expirationTimeDiff: expTimeDiff,
2024+
transformRequestTimerStat: statsStore.NewStat("router.transformer_request_time", stats.TimerType), // Add this line
2025+
}
2026+
2027+
// Create test message
2028+
transformMessage := &types.TransformMessageT{
2029+
DestType: "test_destination", // Add this line
2030+
Data: []types.RouterJobT{
2031+
{
2032+
JobMetadata: types.JobMetadataT{
2033+
JobID: 1,
2034+
WorkspaceID: "workspace_1",
2035+
SourceID: "source_1",
2036+
},
2037+
Destination: backendconfig.DestinationT{
2038+
ID: "destination_1",
2039+
DestinationDefinition: backendconfig.DestinationDefinitionT{
2040+
Name: "test_destination",
2041+
},
2042+
},
2043+
},
2044+
{
2045+
JobMetadata: types.JobMetadataT{
2046+
JobID: 2,
2047+
WorkspaceID: "workspace_1",
2048+
SourceID: "source_1",
2049+
},
2050+
Destination: backendconfig.DestinationT{
2051+
ID: "destination_1",
2052+
DestinationDefinition: backendconfig.DestinationDefinitionT{
2053+
Name: "test_destination",
2054+
},
2055+
},
2056+
},
2057+
},
2058+
}
2059+
transformMessage.Data[0].JobMetadata.SourceCategory = "webhook"
2060+
transformMessage.Data[1].JobMetadata.SourceCategory = "webhook"
2061+
2062+
// Expected tags for metrics
2063+
expectedTags := stats.Tags{
2064+
"endpoint": getEndpointFromURL(srv.URL),
2065+
"stage": "router",
2066+
"destinationType": "test_destination",
2067+
"sourceType": "webhook",
2068+
"workspaceId": "workspace_1",
2069+
"destinationId": "destination_1",
2070+
"sourceId": "source_1",
2071+
"destType": "test_destination", // Legacy tag
2072+
}
2073+
2074+
// Perform transformation
2075+
response := tr.Transform(BATCH, transformMessage)
2076+
2077+
// Verify response
2078+
require.NotNil(t, response)
2079+
require.Len(t, response, 2)
2080+
require.Equal(t, http.StatusOK, response[0].StatusCode)
2081+
require.Equal(t, http.StatusOK, response[1].StatusCode)
2082+
2083+
// Verify metrics
2084+
metricsToCheck := []string{
2085+
"transformer_client_request_total_bytes",
2086+
"transformer_client_response_total_bytes",
2087+
"transformer_client_request_total_events",
2088+
"transformer_client_response_total_events",
2089+
"transformer_client_total_durations_seconds",
2090+
}
2091+
2092+
for _, metricName := range metricsToCheck {
2093+
measurements := statsStore.GetByName(metricName)
2094+
require.NotEmpty(t, measurements, "metric %s should not be empty", metricName)
2095+
require.Equal(t, expectedTags, measurements[0].Tags, "metric %s tags mismatch", metricName)
2096+
2097+
// Verify counts for event metrics
2098+
if metricName == "transformer_client_request_total_events" ||
2099+
metricName == "transformer_client_response_total_events" {
2100+
require.Equal(t, float64(2), measurements[0].Value,
2101+
"metric %s should have count of 2 events", metricName)
2102+
}
2103+
}
2104+
}

0 commit comments

Comments
 (0)