Skip to content

Commit c600968

Browse files
authored
chore: stop collecting errors in proc_error jobsdb (#6232)
# Description We are no longer collecting errors in the `proc_error` jobsdb, as we are planning to remove error collection for debugging purposes in the near future. What was happening until now: - rudder-server was collecting failed jobs from webhook, processor steps, router, and batchrouter, and was storing these failed jobs in the `proc_error` JobsDB. - periodically, the `stash` processor component was creating backup files from the `proc_error` JobsDB and uploading them to `rudder-proc-err-logs` in object storage. In the past, we relied on `proc_error` jobs for replays, but that’s no longer the case. At this point, it feels like we’re spending unnecessary resources collecting and storing all this information that no one actually needs, on the pretext that it is useful for debugging purposes… As a first step, we are stopping the collection and storage of failed jobs in `proc_error`. In a follow-up step, we will remove all relevant `proc_error` code altogether. ## Linear Ticket resolves PIPE-2295 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent ffc0d7a commit c600968

23 files changed

+120
-399
lines changed

app/apphandlers/embeddedAppHandler.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -200,27 +200,27 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
200200
defer batchRouterDB.Close()
201201

202202
// We need two errorDBs, one in read & one in write mode to support separate gateway to store failures
203-
errDBForRead := jobsdb.NewForRead(
203+
errorDBForRead := jobsdb.NewForRead(
204204
"proc_error",
205205
jobsdb.WithClearDB(options.ClearDB),
206206
jobsdb.WithDSLimit(a.config.procErrorDSLimit),
207207
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
208208
jobsdb.WithStats(statsFactory),
209209
jobsdb.WithDBHandle(dbPool),
210210
)
211-
defer errDBForRead.Close()
212-
errDBForWrite := jobsdb.NewForWrite(
211+
defer errorDBForRead.Close()
212+
errorDBForWrite := jobsdb.NewForWrite(
213213
"proc_error",
214214
jobsdb.WithClearDB(options.ClearDB),
215215
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
216216
jobsdb.WithStats(statsFactory),
217217
jobsdb.WithDBHandle(dbPool),
218218
)
219-
defer errDBForWrite.Close()
220-
if err = errDBForWrite.Start(); err != nil {
221-
return fmt.Errorf("could not start errDBForWrite: %w", err)
219+
defer errorDBForWrite.Close()
220+
if err = errorDBForWrite.Start(); err != nil {
221+
return fmt.Errorf("could not start errorDBForWrite: %w", err)
222222
}
223-
defer errDBForWrite.Stop()
223+
defer errorDBForWrite.Stop()
224224

225225
schemaDB := jobsdb.NewForReadWrite(
226226
"esch",
@@ -281,8 +281,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
281281
gwDBForProcessor,
282282
routerDB,
283283
batchRouterDB,
284-
errDBForRead,
285-
errDBForWrite,
284+
errorDBForRead,
285+
errorDBForWrite,
286286
schemaDB,
287287
archivalDB,
288288
reporting,
@@ -310,7 +310,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
310310
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
311311
routerDB,
312312
),
313-
ProcErrorDB: errDBForWrite,
313+
ProcErrorDB: errorDBForWrite,
314314
TransientSources: transientSources,
315315
RsourcesService: rsourcesService,
316316
TransformerFeaturesService: transformerFeaturesService,
@@ -326,7 +326,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
326326
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
327327
batchRouterDB,
328328
),
329-
ProcErrorDB: errDBForWrite,
329+
ProcErrorDB: errorDBForWrite,
330330
TransientSources: transientSources,
331331
RsourcesService: rsourcesService,
332332
Debugger: destinationHandle,
@@ -340,7 +340,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
340340
GatewayDB: gwDBForProcessor,
341341
RouterDB: routerDB,
342342
BatchRouterDB: batchRouterDB,
343-
ErrorDB: errDBForRead,
343+
ErrorDB: errorDBForRead,
344344
EventSchemaDB: schemaDB,
345345
ArchivalDB: archivalDB,
346346
Processor: proc,
@@ -373,7 +373,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
373373
streamMsgValidator := stream.NewMessageValidator()
374374
gw := gateway.Handle{}
375375
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
376-
gatewayDB, errDBForWrite, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
376+
gatewayDB, errorDBForWrite, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
377377
streamMsgValidator, gateway.WithInternalHttpHandlers(
378378
map[string]http.Handler{
379379
"/drain": drainConfigManager.DrainConfigHttpHandler(),

app/apphandlers/gatewayAppHandler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,19 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
9090
}
9191
defer gatewayDB.Stop()
9292

93-
errDB := jobsdb.NewForWrite(
93+
errorDB := jobsdb.NewForWrite(
9494
"proc_error",
9595
jobsdb.WithClearDB(options.ClearDB),
9696
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
9797
jobsdb.WithStats(statsFactory),
9898
jobsdb.WithDBHandle(dbPool),
9999
)
100-
defer errDB.Close()
100+
defer errorDB.Close()
101101

102-
if err := errDB.Start(); err != nil {
103-
return fmt.Errorf("could not start errDB: %w", err)
102+
if err := errorDB.Start(); err != nil {
103+
return fmt.Errorf("could not start errorDB: %w", err)
104104
}
105-
defer errDB.Stop()
105+
defer errorDB.Stop()
106106

107107
g, ctx := errgroup.WithContext(ctx)
108108

@@ -145,7 +145,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
145145
}
146146
streamMsgValidator := stream.NewMessageValidator()
147147
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
148-
gatewayDB, errDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
148+
gatewayDB, errorDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
149149
streamMsgValidator, gateway.WithInternalHttpHandlers(
150150
map[string]http.Handler{
151151
"/drain": drainConfigHttpHandler,

app/apphandlers/processorAppHandler.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,27 +186,27 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
186186
jobsdb.WithDBHandle(dbPool),
187187
)
188188
defer batchRouterDB.Close()
189-
errDBForRead := jobsdb.NewForRead(
189+
errorDBForRead := jobsdb.NewForRead(
190190
"proc_error",
191191
jobsdb.WithClearDB(options.ClearDB),
192192
jobsdb.WithDSLimit(a.config.procErrorDSLimit),
193193
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
194194
jobsdb.WithStats(statsFactory),
195195
jobsdb.WithDBHandle(dbPool),
196196
)
197-
defer errDBForRead.Close()
198-
errDBForWrite := jobsdb.NewForWrite(
197+
defer errorDBForRead.Close()
198+
errorDBForWrite := jobsdb.NewForWrite(
199199
"proc_error",
200200
jobsdb.WithClearDB(options.ClearDB),
201201
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
202202
jobsdb.WithStats(statsFactory),
203203
jobsdb.WithDBHandle(dbPool),
204204
)
205-
errDBForWrite.Close()
206-
if err = errDBForWrite.Start(); err != nil {
207-
return fmt.Errorf("could not start errDBForWrite: %w", err)
205+
errorDBForWrite.Close()
206+
if err = errorDBForWrite.Start(); err != nil {
207+
return fmt.Errorf("could not start errorDBForWrite: %w", err)
208208
}
209-
defer errDBForWrite.Stop()
209+
defer errorDBForWrite.Stop()
210210
schemaDB := jobsdb.NewForReadWrite(
211211
"esch",
212212
jobsdb.WithClearDB(options.ClearDB),
@@ -277,8 +277,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
277277
gwDBForProcessor,
278278
routerDB,
279279
batchRouterDB,
280-
errDBForRead,
281-
errDBForWrite,
280+
errorDBForRead,
281+
errorDBForWrite,
282282
schemaDB,
283283
archivalDB,
284284
reporting,
@@ -306,7 +306,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
306306
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
307307
routerDB,
308308
),
309-
ProcErrorDB: errDBForWrite,
309+
ProcErrorDB: errorDBForWrite,
310310
TransientSources: transientSources,
311311
RsourcesService: rsourcesService,
312312
TransformerFeaturesService: transformerFeaturesService,
@@ -322,7 +322,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
322322
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
323323
batchRouterDB,
324324
),
325-
ProcErrorDB: errDBForWrite,
325+
ProcErrorDB: errorDBForWrite,
326326
TransientSources: transientSources,
327327
RsourcesService: rsourcesService,
328328
Debugger: destinationHandle,
@@ -337,7 +337,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
337337
GatewayDB: gwDBForProcessor,
338338
RouterDB: routerDB,
339339
BatchRouterDB: batchRouterDB,
340-
ErrorDB: errDBForRead,
340+
ErrorDB: errorDBForRead,
341341
SchemaForwarder: schemaForwarder,
342342
EventSchemaDB: schemaDB,
343343
ArchivalDB: archivalDB,

app/cluster/integration_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ func TestDynamicClusterManager(t *testing.T) {
114114

115115
archDB := jobsdb.NewForReadWrite("archival", jobsdb.WithStats(stats.NOP))
116116
defer archDB.TearDown()
117-
readErrDB := jobsdb.NewForRead("proc_error", jobsdb.WithStats(stats.NOP))
118-
defer readErrDB.TearDown()
119-
writeErrDB := jobsdb.NewForWrite("proc_error", jobsdb.WithStats(stats.NOP))
120-
require.NoError(t, writeErrDB.Start())
121-
defer writeErrDB.TearDown()
117+
readErrorDB := jobsdb.NewForRead("proc_error", jobsdb.WithStats(stats.NOP))
118+
defer readErrorDB.TearDown()
119+
writeErrorDB := jobsdb.NewForWrite("proc_error", jobsdb.WithStats(stats.NOP))
120+
require.NoError(t, writeErrorDB.Start())
121+
defer writeErrorDB.TearDown()
122122

123123
clearDb := false
124124
ctx := context.Background()
@@ -131,8 +131,8 @@ func TestDynamicClusterManager(t *testing.T) {
131131
gwDB,
132132
rtDB,
133133
brtDB,
134-
readErrDB,
135-
writeErrDB,
134+
readErrorDB,
135+
writeErrorDB,
136136
eschDB,
137137
archDB,
138138
&reporting.NOOP{},
@@ -155,7 +155,7 @@ func TestDynamicClusterManager(t *testing.T) {
155155
Reporting: &reporting.NOOP{},
156156
BackendConfig: mockBackendConfig,
157157
RouterDB: rtDB,
158-
ProcErrorDB: readErrDB,
158+
ProcErrorDB: readErrorDB,
159159
TransientSources: transientsource.NewEmptyService(),
160160
RsourcesService: mockRsourcesService,
161161
TransformerFeaturesService: transformer.NewNoOpService(),
@@ -165,7 +165,7 @@ func TestDynamicClusterManager(t *testing.T) {
165165
Reporting: &reporting.NOOP{},
166166
BackendConfig: mockBackendConfig,
167167
RouterDB: brtDB,
168-
ProcErrorDB: readErrDB,
168+
ProcErrorDB: readErrorDB,
169169
TransientSources: transientsource.NewEmptyService(),
170170
RsourcesService: mockRsourcesService,
171171
}
@@ -192,7 +192,7 @@ func TestDynamicClusterManager(t *testing.T) {
192192
GatewayDB: gwDB,
193193
RouterDB: rtDB,
194194
BatchRouterDB: brtDB,
195-
ErrorDB: readErrDB,
195+
ErrorDB: readErrorDB,
196196
EventSchemaDB: eschDB,
197197
ArchivalDB: archDB,
198198
SchemaForwarder: schemaForwarder,

gateway/gateway_test.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"encoding/base64"
7-
"encoding/json"
87
"errors"
98
"fmt"
109
"io"
@@ -49,7 +48,6 @@ import (
4948
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
5049
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
5150
"github.com/rudderlabs/rudder-server/gateway/response"
52-
webhookModel "github.com/rudderlabs/rudder-server/gateway/webhook/model"
5351
"github.com/rudderlabs/rudder-server/jobsdb"
5452
mocksApp "github.com/rudderlabs/rudder-server/mocks/app"
5553
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
@@ -1663,59 +1661,6 @@ var _ = Describe("Gateway", func() {
16631661
err := gateway.Shutdown()
16641662
Expect(err).To(BeNil())
16651663
})
1666-
1667-
It("should save failures to error db", func() {
1668-
c.mockErrJobsDB.
1669-
EXPECT().Store(
1670-
gomock.Any(),
1671-
gomock.Any(),
1672-
).
1673-
DoAndReturn(
1674-
func(
1675-
ctx context.Context,
1676-
jobs []*jobsdb.JobT,
1677-
) error {
1678-
for idx, job := range jobs {
1679-
Expect(misc.IsValidUUID(job.UUID.String())).To(BeTrue())
1680-
Expect(job.CustomVal).To(Equal("WEBHOOK"))
1681-
1682-
var paramsMap, expectedParamsMap map[string]interface{}
1683-
var expectedStr []byte
1684-
1685-
switch idx {
1686-
case 0:
1687-
Expect(job.EventPayload).To(Equal(json.RawMessage(`{"a1": "b1"}`)))
1688-
expectedStr = []byte(fmt.Sprintf(`{"source_id": "%v", "stage": "webhook", "source_type": "cio", "reason": "err1"}`, SourceIDEnabled))
1689-
case 1:
1690-
Expect(job.EventPayload).To(Equal(json.RawMessage(`{"a2": "b2"}`)))
1691-
expectedStr = []byte(fmt.Sprintf(`{"source_id": "%v", "stage": "webhook", "source_type": "af", "reason": "err2"}`, SourceIDEnabled))
1692-
}
1693-
1694-
_ = jsonrs.Unmarshal(job.Parameters, &paramsMap)
1695-
_ = jsonrs.Unmarshal(expectedStr, &expectedParamsMap)
1696-
equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
1697-
Expect(equals).To(BeTrue())
1698-
}
1699-
return nil
1700-
}).
1701-
Times(1)
1702-
1703-
reqs := make([]*webhookModel.FailedWebhookPayload, 2)
1704-
reqs[0] = &webhookModel.FailedWebhookPayload{
1705-
RequestContext: rCtxEnabled,
1706-
Payload: []byte(`{"a1": "b1"}`),
1707-
SourceType: "cio",
1708-
Reason: "err1",
1709-
}
1710-
reqs[1] = &webhookModel.FailedWebhookPayload{
1711-
RequestContext: rCtxEnabled,
1712-
Payload: []byte(`{"a2": "b2"}`),
1713-
SourceType: "af",
1714-
Reason: "err2",
1715-
}
1716-
err := gateway.SaveWebhookFailures(reqs)
1717-
Expect(err).To(BeNil())
1718-
})
17191664
})
17201665

17211666
Context("Internal Batch", func() {

gateway/handle.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type Handle struct {
6262
application app.App
6363
backendConfig backendconfig.BackendConfig
6464
jobsDB jobsdb.JobsDB
65-
errDB jobsdb.JobsDB
65+
errorDB jobsdb.JobsDB
6666
rateLimiter throttler.Throttler
6767
versionHandler func(w http.ResponseWriter, r *http.Request)
6868
rsourcesService rsources.JobService
@@ -132,6 +132,7 @@ type Handle struct {
132132
enableInternalBatchEnrichment config.ValueLoader[bool]
133133
enableEventBlocking config.ValueLoader[bool]
134134
webhookV2HandlerEnabled bool
135+
errorDBEnabled config.ValueLoader[bool]
135136
}
136137

137138
// additional internal http handlers

gateway/handle_lifecycle.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ This function will block until backend config is initially received.
7070
func (gw *Handle) Setup(
7171
ctx context.Context,
7272
config *config.Config, logger logger.Logger, stat stats.Stats,
73-
application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errDB jobsdb.JobsDB,
73+
application app.App, backendConfig backendconfig.BackendConfig, jobsDB, errorDB jobsdb.JobsDB,
7474
rateLimiter throttler.Throttler, versionHandler func(w http.ResponseWriter, r *http.Request),
7575
rsourcesService rsources.JobService, transformerFeaturesService transformer.FeaturesService,
7676
sourcehandle sourcedebugger.SourceDebugger, streamMsgValidator func(message *stream.Message) error,
@@ -83,7 +83,7 @@ func (gw *Handle) Setup(
8383
gw.application = application
8484
gw.backendConfig = backendConfig
8585
gw.jobsDB = jobsDB
86-
gw.errDB = errDB
86+
gw.errorDB = errorDB
8787
gw.rateLimiter = rateLimiter
8888
gw.versionHandler = versionHandler
8989
gw.rsourcesService = rsourcesService
@@ -129,6 +129,7 @@ func (gw *Handle) Setup(
129129
gw.conf.enableInternalBatchEnrichment = config.GetReloadableBoolVar(true, "gateway.enableBatchEnrichment")
130130
// enable webhook v2 handler. disabled by default
131131
gw.conf.webhookV2HandlerEnabled = config.GetBoolVar(false, "Gateway.webhookV2HandlerEnabled")
132+
gw.conf.errorDBEnabled = config.GetReloadableBoolVar(false, "ErrorDB.enabled")
132133
// enable event blocking. false by default
133134
gw.conf.enableEventBlocking = config.GetReloadableBoolVar(false, "enableEventBlocking")
134135
// Registering stats

gateway/handle_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func createTestGateway(t *testing.T, enableEventBlocking bool, eventBlockingSett
9292
enableInternalBatchEnrichment config.ValueLoader[bool]
9393
enableEventBlocking config.ValueLoader[bool]
9494
webhookV2HandlerEnabled bool
95+
errorDBEnabled config.ValueLoader[bool]
9596
}{
9697
enableEventBlocking: config.SingleValueLoader(enableEventBlocking),
9798
enableInternalBatchValidator: config.SingleValueLoader(false),

gateway/handle_webhook.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ func (gw *Handle) ProcessTransformedWebhookRequest(w *http.ResponseWriter, r *ht
3030

3131
// SaveWebhookFailures saves errors to the error db
3232
func (gw *Handle) SaveWebhookFailures(reqs []*model.FailedWebhookPayload) error {
33+
if !gw.conf.errorDBEnabled.Load() {
34+
return nil
35+
}
36+
3337
jobs := make([]*jobsdb.JobT, 0, len(reqs))
3438
for _, req := range reqs {
3539
params := map[string]any{
@@ -63,5 +67,5 @@ func (gw *Handle) SaveWebhookFailures(reqs []*model.FailedWebhookPayload) error
6367

6468
ctx, cancel := context.WithTimeout(context.Background(), gw.conf.WriteTimeout)
6569
defer cancel()
66-
return gw.errDB.Store(ctx, jobs)
70+
return gw.errorDB.Store(ctx, jobs)
6771
}

0 commit comments

Comments
 (0)