Skip to content

Commit 8be54bf

Browse files
chore: remove enableV2NotifierJob flag support (#6196)
1 parent c600968 commit 8be54bf

29 files changed

+273
-677
lines changed

integration_test/warehouse/warehouse_test.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,20 +93,14 @@ func TestMain(m *testing.M) {
9393
func TestUploads(t *testing.T) {
9494
t.Run("tracks loading", func(t *testing.T) {
9595
testCases := []struct {
96-
batchStagingFiles bool
97-
maxSizeInMB string
96+
maxSizeInMB string
9897
}{
99-
{batchStagingFiles: false},
100-
{batchStagingFiles: true, maxSizeInMB: "100"},
101-
{batchStagingFiles: true, maxSizeInMB: "0.00005"}, // Very low maxSizeInMB to ensure that staging files are not batched
98+
{maxSizeInMB: "100"},
99+
{maxSizeInMB: "0.00005"}, // Very low maxSizeInMB to ensure that staging files are not batched
102100
}
103101
for _, tc := range testCases {
104-
t.Run(fmt.Sprintf("batchStagingFiles: %t, maxSizeInMB: %s", tc.batchStagingFiles, tc.maxSizeInMB), func(t *testing.T) {
105-
if tc.batchStagingFiles {
106-
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
107-
108-
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.maxSizeInMB"), tc.maxSizeInMB)
109-
}
102+
t.Run(fmt.Sprintf("maxSizeInMB: %s", tc.maxSizeInMB), func(t *testing.T) {
103+
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.maxSizeInMB"), tc.maxSizeInMB)
110104
db, minioResource, whClient := setupServer(t, false, nil, nil)
111105

112106
var (

services/notifier/notifier.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ const (
3535
type JobType string
3636

3737
const (
38-
JobTypeUpload JobType = "upload"
3938
JobTypeUploadV2 JobType = "upload_v2"
4039
JobTypeAsync JobType = "async_job"
4140
)

services/notifier/notifier_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestNotifier(t *testing.T) {
4040
json.RawMessage(`{"id":"4"}`),
4141
json.RawMessage(`{"id":"5"}`),
4242
},
43-
JobType: notifier.JobTypeUpload,
43+
JobType: notifier.JobTypeUploadV2,
4444
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
4545
Priority: 50,
4646
}
@@ -125,7 +125,7 @@ func TestNotifier(t *testing.T) {
125125
json.RawMessage(`{"id":"4"}`),
126126
json.RawMessage(`{"id":"5"}`),
127127
},
128-
JobType: notifier.JobTypeUpload,
128+
JobType: notifier.JobTypeUploadV2,
129129
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
130130
Priority: 50,
131131
}
@@ -268,7 +268,7 @@ func TestNotifier(t *testing.T) {
268268

269269
publishRequest := &notifier.PublishRequest{
270270
Payloads: payloads,
271-
JobType: notifier.JobTypeUpload,
271+
JobType: notifier.JobTypeUploadV2,
272272
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
273273
Priority: 50,
274274
}
@@ -353,7 +353,7 @@ func TestNotifier(t *testing.T) {
353353

354354
publishRequest := &notifier.PublishRequest{
355355
Payloads: payloads,
356-
JobType: notifier.JobTypeUpload,
356+
JobType: notifier.JobTypeUploadV2,
357357
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
358358
Priority: 50,
359359
}
@@ -437,7 +437,7 @@ func TestNotifier(t *testing.T) {
437437

438438
publishRequest := &notifier.PublishRequest{
439439
Payloads: payloads,
440-
JobType: notifier.JobTypeUpload,
440+
JobType: notifier.JobTypeUploadV2,
441441
UploadSchema: json.RawMessage(`{"UploadSchema": "1"}`),
442442
Priority: 50,
443443
}

services/notifier/repo.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,13 @@ func scanJob(scan scanFn, job *Job) error {
300300
}
301301
if jobTypeRaw.Valid {
302302
switch jobTypeRaw.String {
303-
case string(JobTypeUpload), string(JobTypeAsync), string(JobTypeUploadV2):
303+
case string(JobTypeAsync), string(JobTypeUploadV2):
304304
job.Type = JobType(jobTypeRaw.String)
305305
default:
306306
return fmt.Errorf("scanning: unknown job type: %s", jobTypeRaw.String)
307307
}
308308
} else {
309-
job.Type = JobTypeUpload
309+
job.Type = JobTypeUploadV2
310310
}
311311
if errorRaw.Valid {
312312
job.Error = errors.New(errorRaw.String)

services/notifier/repo_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestRepo(t *testing.T) {
9797
json.RawMessage(`{"id":"4"}`),
9898
json.RawMessage(`{"id":"5"}`),
9999
},
100-
JobType: JobTypeUpload,
100+
JobType: JobTypeUploadV2,
101101
UploadSchema: json.RawMessage(`{"UploadSchema":"1"}`),
102102
Priority: 50,
103103
}
@@ -232,7 +232,7 @@ func TestRepo(t *testing.T) {
232232
Payloads: []json.RawMessage{
233233
json.RawMessage(`{"id":"11"}`),
234234
},
235-
JobType: JobTypeUpload,
235+
JobType: JobTypeUploadV2,
236236
Priority: 75,
237237
}
238238

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Drop staging_file_id column from wh_load_files table
2+
ALTER TABLE wh_load_files DROP COLUMN IF EXISTS staging_file_id;

warehouse/archive/archiver.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro
394394
hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetadata)
395395

396396
// delete load file records
397-
if err := a.deleteLoadFileRecords(ctx, txn, stagingFileIDs, u.uploadID, hasUsedRudderStorage); err != nil {
397+
if err := a.deleteLoadFileRecords(ctx, txn, u.uploadID, hasUsedRudderStorage); err != nil {
398398
a.log.Errorn("[Archiver]: Error while deleting load file records for upload",
399399
obskit.UploadID(u.uploadID),
400400
obskit.Error(err),
@@ -490,20 +490,18 @@ func (a *Archiver) getStagingFilesData(
490490
func (a *Archiver) deleteLoadFileRecords(
491491
ctx context.Context,
492492
txn *sqlmw.Tx,
493-
stagingFileIDs []int64,
494493
uploadID int64,
495494
hasUsedRudderStorage bool,
496495
) error {
497496
stmt := fmt.Sprintf(`
498497
DELETE FROM %s
499-
WHERE staging_file_id = ANY($1)
500-
OR upload_id = $2
498+
WHERE upload_id = $1
501499
RETURNING location;`,
502500
pq.QuoteIdentifier(warehouseutils.WarehouseLoadFilesTable),
503501
)
504-
loadLocationRows, err := txn.QueryContext(ctx, stmt, pq.Array(stagingFileIDs), uploadID)
502+
loadLocationRows, err := txn.QueryContext(ctx, stmt, uploadID)
505503
if err != nil {
506-
return fmt.Errorf("cannot delete load files with staging_file_id = %+v: %w", stagingFileIDs, err)
504+
return fmt.Errorf("cannot delete load files with upload_id = %+v: %w", uploadID, err)
507505
}
508506

509507
defer func() { _ = loadLocationRows.Close() }()

warehouse/archive/testdata/dump.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,32 +64,32 @@ VALUES
6464
NOW(), '{}', 4
6565
);
6666
INSERT INTO wh_load_files (
67-
id, staging_file_id, upload_id, location, source_id,
67+
id, upload_id, location, source_id,
6868
destination_id, destination_type,
6969
table_name, total_events, created_at,
7070
metadata
7171
)
7272
VALUES
7373
(
74-
1, 1, NULL, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
74+
1, 1, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
7575
'test-sourceID', 'test-destinationID',
7676
'POSTGRES', 'test-table', 1, NOW(),
7777
'{}'
7878
),
7979
(
80-
2, 2, 2, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
80+
2, 2, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
8181
'test-sourceID', 'test-destinationID',
8282
'POSTGRES', 'test-table', 1, NOW(),
8383
'{}'
8484
),
8585
(
86-
3, NULL, 3,'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
86+
3, 3,'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
8787
'test-sourceID', 'test-destinationID',
8888
'POSTGRES', 'test-table', 1, NOW(),
8989
'{}'
9090
),
9191
(
92-
4, NULL, 4,'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
92+
4, 4,'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz',
9393
'test-sourceID', 'test-destinationID',
9494
'POSTGRES', 'test-table', 1, NOW(),
9595
'{}'

warehouse/integrations/datalake/datalake_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func TestIntegration(t *testing.T) {
8888
destType string
8989
conf map[string]interface{}
9090
schemaTTLInMinutes int
91-
batchStagingFiles bool
9291
prerequisite func(t testing.TB, ctx context.Context)
9392
configOverride map[string]any
9493
verifySchema func(*testing.T, filemanager.FileManager, string)
@@ -111,7 +110,6 @@ func TestIntegration(t *testing.T) {
111110
"syncFrequency": "30",
112111
},
113112
schemaTTLInMinutes: 0,
114-
batchStagingFiles: true,
115113
prerequisite: func(t testing.TB, ctx context.Context) {
116114
t.Helper()
117115
createMinioBucket(t, ctx, minioEndpoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)
@@ -364,9 +362,6 @@ func TestIntegration(t *testing.T) {
364362
t.Setenv("STORAGE_EMULATOR_HOST", fmt.Sprintf("localhost:%d", c.Port("gcs", 4443)))
365363
t.Setenv("RSERVER_WORKLOAD_IDENTITY_TYPE", "GKE")
366364
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.schemaTTLInMinutes"), strconv.Itoa(tc.schemaTTLInMinutes))
367-
if tc.batchStagingFiles {
368-
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
369-
}
370365

371366
whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)
372367

0 commit comments

Comments
 (0)