Skip to content

Commit e27b8c0

Browse files
authored
fix: warehouse source empty jobs (#6204)
# Description - While inserting source jobs, in case of empty tables or jobs, return an error instead of silently succeeding it. ``` goroutine 62670 [running]: github.com/rudderlabs/rudder-go-kit/logger.(*logger).Fataln(0x4000805680, {0x480acd4, 0x27}, {0x40028db808, 0x7, 0x3?}) \t/go/pkg/mod/github.com/rudderlabs/[email protected]/logger/logger.go:381 +0x234 github.com/rudderlabs/rudder-server/utils/crash.(*panicLogger).Handler.func1.(*panicLogger).Notify.1.1() \t/rudder-server/utils/crash/logger.go:33 +0x57c sync.(*Once).doSlow(0x16?, 0x25?) \t/usr/local/go/src/sync/once.go:78 +0xf0 sync.(*Once).Do(0x4006826bd8?, 0x4fa9c?) \t/usr/local/go/src/sync/once.go:69 +0x24 github.com/rudderlabs/rudder-server/utils/crash.(*panicLogger).Handler.func1.(*panicLogger).Notify.1() \t/rudder-server/utils/crash/logger.go:32 +0x90 panic({0x43fe800?, 0x4083180b10?}) \t/usr/local/go/src/runtime/panic.go:792 +0x124 github.com/rudderlabs/rudder-server/warehouse/internal/repo.(*Source).Insert(0x400082b650, {0x509cf78, 0x4005449a10}, {0x7e4f280, 0x0, 0x0}) \t/rudder-server/warehouse/internal/repo/source.go:54 +0x2e8 github.com/rudderlabs/rudder-server/warehouse/source.(*Manager).InsertJobs(0x4000a9c6c0, {0x509cf78, 0x4005449a10}, {{0x4082e86da0, 0x1b}, {0x4082e86dc0, 0x1b}, {{0x0, 0xee01da36a, 0x0}}, ...}) \t/rudder-server/warehouse/source/source.go:112 +0x7d0 github.com/rudderlabs/rudder-server/warehouse/source.(*Manager).InsertJobHandler(0x4000a9c6c0, {0x5092510, 0x4083193308}, 0x40010b7680) \t/rudder-server/warehouse/source/http.go:37 +0x33c net/http.HandlerFunc.ServeHTTP(0x400082b9e0?, {0x5092510?, 0x4083193308?}, 0x0?) \t/usr/local/go/src/net/http/server.go:2294 +0x38 github.com/rudderlabs/rudder-server/warehouse/api.(*Api).addMasterEndpoints.func1.1.(*Api).logMiddleware.3({0x5092510, 0x4083193308}, 0x40010b7680) \t/rudder-server/warehouse/api/http.go:460 +0x58 net/http.HandlerFunc.ServeHTTP(0x4043b52c30?, {0x5092510?, 0x4083193308?}, 0x400e4ccba4?) \t/usr/local/go/src/net/http/server.go:2294 +0x38 github.com/go-chi/chi/v5.(*Mux).routeHTTP(0x400c4c0120, {0x5092510, 0x4083193308}, 0x40010b7680) \t/go/pkg/mod/github.com/go-chi/chi/[email protected]/mux.go:478 +0x280 net/http.Hand","stack":"goroutine 62670 [running]: runtime/debug.Stack() \t/usr/local/go/src/runtime/debug/stack.go:26 +0x64 github.com/rudderlabs/rudder-server/utils/crash.(*panicLogger).Handler.func1.(*panicLogger).Notify.1.1() \t/rudder-server/utils/crash/logger.go:34 +0x4c sync.(*Once).doSlow(0x16?, 0x25?) \t/usr/local/go/src/sync/once.go:78 +0xf0 sync.(*Once).Do(0x4006826bd8?, 0x4fa9c?) \t/usr/local/go/src/sync/once.go:69 +0x24 github.com/rudderlabs/rudder-server/utils/crash.(*panicLogger).Handler.func1.(*panicLogger).Notify.1() \t/rudder-server/utils/crash/logger.go:32 +0x90 panic({0x43fe800?, 0x4083180b10?}) \t/usr/local/go/src/runtime/panic.go:792 +0x124 github.com/rudderlabs/rudder-server/warehouse/internal/repo.(*Source).Insert(0x400082b650, {0x509cf78, 0x4005449a10}, {0x7e4f280, 0x0, 0x0}) \t/rudder-server/warehouse/internal/repo/source.go:54 +0x2e8 github.com/rudderlabs/rudder-server/warehouse/source.(*Manager).InsertJobs(0x4000a9c6c0, {0x509cf78, 0x4005449a10}, {{0x4082e86da0, 0x1b}, {0x4082e86dc0, 0x1b}, {{0x0, 0xee01da36a, 0x0}}, ...}) \t/rudder-server/warehouse/source/source.go:112 +0x7d0 github.com/rudderlabs/rudder-server/warehouse/source.(*Manager).InsertJobHandler(0x4000a9c6c0, {0x5092510, 0x4083193308}, 0x40010b7680) \t/rudder-server/warehouse/source/http.go:37 +0x33c net/http.HandlerFunc.ServeHTTP(0x400082b9e0?, {0x5092510?, 0x4083193308?}, 0x0?) \t/usr/local/go/src/net/http/server.go:2294 +0x38 github.com/rudderlabs/rudder-server/warehouse/api.(*Api).addMasterEndpoints.func1.1.(*Api).logMiddleware.3({0x5092510, 0x4083193308}, 0x40010b7680) runtime error: index out of range [0] with length 0 ``` ## Linear Ticket - Resolves WAR-993 ## Security - [ ] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 11e930c commit e27b8c0

File tree

3 files changed

+57
-20
lines changed

3 files changed

+57
-20
lines changed

warehouse/internal/repo/source.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ func NewSource(db *sqlmw.DB, opts ...Opt) *Source {
5050
}
5151

5252
func (s *Source) Insert(ctx context.Context, sourceJobs []model.SourceJob) ([]int64, error) {
53+
if len(sourceJobs) == 0 {
54+
return nil, errors.New("empty sourceJobs")
55+
}
5356
defer (*repo)(s).TimerStat("insert", stats.Tags{
5457
"sourceId": sourceJobs[0].SourceID,
5558
"destId": sourceJobs[0].DestinationID,

warehouse/source/http_test.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/ory/dockertest/v3"
16+
"github.com/samber/lo"
1617

1718
"github.com/rudderlabs/rudder-go-kit/stats"
1819

@@ -130,12 +131,15 @@ func TestManager_InsertJobHandler(t *testing.T) {
130131
uploadsRepo := repo.NewUploads(db, repo.WithNow(func() time.Time {
131132
return now
132133
}))
133-
tableUploadsRepo := repo.NewTableUploads(db, config.New(), repo.WithNow(func() time.Time {
134+
tuRepo := repo.NewTableUploads(db, config.New(), repo.WithNow(func() time.Time {
134135
return now
135136
}))
136137
stagingRepo := repo.NewStagingFiles(db, config.New(), repo.WithNow(func() time.Time {
137138
return now
138139
}))
140+
srcRepo := repo.NewSource(db, repo.WithNow(func() time.Time {
141+
return now
142+
}))
139143

140144
stagingFile := model.StagingFile{
141145
WorkspaceID: workspaceID,
@@ -175,7 +179,7 @@ func TestManager_InsertJobHandler(t *testing.T) {
175179
}})
176180
require.NoError(t, err)
177181

178-
err = tableUploadsRepo.Insert(ctx, uploadID, []string{
182+
err = tuRepo.Insert(ctx, uploadID, []string{
179183
"test_table_1",
180184
"test_table_2",
181185
"test_table_3",
@@ -212,6 +216,26 @@ func TestManager_InsertJobHandler(t *testing.T) {
212216
require.NoError(t, err)
213217
require.Equal(t, "invalid payload: source_id is required\n", string(b))
214218
})
219+
t.Run("empty tables", func(t *testing.T) {
220+
req := httptest.NewRequest(http.MethodPost, "/v1/warehouse/jobs", bytes.NewReader([]byte(`
221+
{
222+
"source_id": "test_invalid_source_id",
223+
"destination_id": "test_invalid_destination_id",
224+
"job_run_id": "test_invalid_job_run_id",
225+
"task_run_id": "test_invalid_task_run_id",
226+
"async_job_type": "deletebyjobrunid"
227+
}
228+
`)))
229+
resp := httptest.NewRecorder()
230+
231+
sourceManager := New(config.New(), logger.NOP, stats.NOP, db, &mockPublisher{})
232+
sourceManager.InsertJobHandler(resp, req)
233+
require.Equal(t, http.StatusInternalServerError, resp.Code)
234+
235+
b, err := io.ReadAll(resp.Body)
236+
require.NoError(t, err)
237+
require.Equal(t, "can't insert source jobs\n", string(b))
238+
})
215239
t.Run("success", func(t *testing.T) {
216240
req := httptest.NewRequest(http.MethodPost, "/v1/warehouse/jobs", bytes.NewReader([]byte(`
217241
{
@@ -233,9 +257,12 @@ func TestManager_InsertJobHandler(t *testing.T) {
233257
require.NoError(t, err)
234258
require.Nil(t, insertResponse.Err)
235259
require.Len(t, insertResponse.JobIds, 5)
236-
})
237-
t.Run("exclude tables", func(t *testing.T) {
238-
// discards, merge rules and mapping tables should be excluded
260+
261+
sourceJobs, err := srcRepo.GetToProcess(ctx, 100)
262+
require.NoError(t, err)
263+
require.ElementsMatch(t, []string{"test_table_1", "test_table_2", "test_table_3", "test_table_4", "test_table_5"}, lo.Map(sourceJobs, func(item model.SourceJob, _ int) string {
264+
return item.TableName
265+
}))
239266
})
240267
}
241268

warehouse/source/source.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,25 +82,32 @@ func (m *Manager) InsertJobs(ctx context.Context, payload insertJobRequest) ([]i
8282
return nil, fmt.Errorf("getting table uploads: %w", err)
8383
}
8484

85-
tableNames := lo.Map(tableUploads, func(item model.TableUpload, index int) string {
86-
return item.TableName
87-
})
88-
tableNames = lo.Filter(lo.Uniq(tableNames), func(tableName string, i int) bool {
89-
switch strings.ToLower(tableName) {
90-
case whutils.DiscardsTable, whutils.IdentityMappingsTable, whutils.IdentityMergeRulesTable:
91-
return false
92-
default:
93-
return true
94-
}
95-
})
85+
// There is no need to create source jobs for discards and identity resolution tables.
86+
// Source jobs are basically used for deleting old data in case of Google Sheets for full sync and discards and identity resolution tables are de
87+
tableNames := lo.Filter(
88+
lo.Uniq(lo.Map(tableUploads, func(item model.TableUpload, _ int) string {
89+
return item.TableName
90+
})),
91+
func(tableName string, i int) bool {
92+
switch strings.ToLower(tableName) {
93+
case whutils.DiscardsTable, whutils.IdentityMappingsTable, whutils.IdentityMergeRulesTable:
94+
return false
95+
default:
96+
return true
97+
}
98+
},
99+
)
100+
if len(tableNames) == 0 {
101+
return nil, fmt.Errorf("no tables found for source: %s, destination: %s, job run: %s, task run: %s", payload.SourceID, payload.DestinationID, payload.JobRunID, payload.TaskRunID)
102+
}
96103

97104
type metadata struct {
98105
JobRunID string `json:"job_run_id"`
99106
TaskRunID string `json:"task_run_id"`
100107
JobType string `json:"jobtype"`
101108
StartTime time.Time `json:"start_time"`
102109
}
103-
metadataJson, err := jsonrs.Marshal(metadata{
110+
metadataJSON, err := jsonrs.Marshal(metadata{
104111
JobRunID: payload.JobRunID,
105112
TaskRunID: payload.TaskRunID,
106113
StartTime: payload.StartTime.Time,
@@ -109,20 +116,20 @@ func (m *Manager) InsertJobs(ctx context.Context, payload insertJobRequest) ([]i
109116
if err != nil {
110117
return nil, fmt.Errorf("marshalling metadata: %w", err)
111118
}
112-
jobIds, err := m.sourceRepo.Insert(ctx, lo.Map(tableNames, func(tableName string, _ int) model.SourceJob {
119+
jobIDs, err := m.sourceRepo.Insert(ctx, lo.Map(tableNames, func(tableName string, _ int) model.SourceJob {
113120
return model.SourceJob{
114121
SourceID: payload.SourceID,
115122
DestinationID: payload.DestinationID,
116123
WorkspaceID: payload.WorkspaceID,
117124
TableName: tableName,
118125
JobType: jobType,
119-
Metadata: metadataJson,
126+
Metadata: metadataJSON,
120127
}
121128
}))
122129
if err != nil {
123130
return nil, fmt.Errorf("inserting source jobs: %w", err)
124131
}
125-
return jobIds, nil
132+
return jobIDs, nil
126133
}
127134

128135
func (m *Manager) Run(ctx context.Context) error {

0 commit comments

Comments
 (0)