@@ -9,10 +9,13 @@ import * as path from 'path';
9
9
import { ConfigService } from '@nestjs/config' ;
10
10
import * as fs from 'fs' ;
11
11
import { promises as fsPromises } from 'fs' ;
12
+ import { CACHE_DIR } from './constants' ;
13
+ import { FileRemoval } from './cleanup/removalUtils' ;
14
+ import * as kill from 'tree-kill' ;
12
15
13
16
export interface Job {
14
17
id : string ;
15
- status : 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' ;
18
+ status : 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal' ;
16
19
progress : number ;
17
20
outputPath : string ;
18
21
inputUrl : string ;
@@ -32,21 +35,23 @@ export class AppService {
32
35
private jobQueue : string [ ] = [ ] ;
33
36
private maxConcurrentJobs : number ;
34
37
private cacheDir : string ;
38
+ private immediateRemoval : boolean ;
35
39
36
40
constructor (
37
41
private logger : Logger ,
38
42
private configService : ConfigService ,
43
+ private readonly fileRemoval : FileRemoval
44
+
39
45
) {
40
- this . cacheDir = path . join ( process . cwd ( ) , 'cache' ) ;
46
+ this . cacheDir = CACHE_DIR ;
41
47
this . maxConcurrentJobs = this . configService . get < number > (
42
48
'MAX_CONCURRENT_JOBS' ,
43
49
1 ,
44
50
) ;
45
-
46
- // Ensure the cache directory exists
47
- if ( ! fs . existsSync ( this . cacheDir ) ) {
48
- fs . mkdirSync ( this . cacheDir , { recursive : true } ) ;
49
- }
51
+ this . immediateRemoval = this . configService . get < boolean > (
52
+ 'REMOVE_FILE_AFTER_RIGHT_DOWNLOAD' ,
53
+ true ,
54
+ ) ;
50
55
}
51
56
52
57
async downloadAndCombine (
@@ -92,7 +97,7 @@ export class AppService {
92
97
if ( ! deviceId ) {
93
98
return this . activeJobs ;
94
99
}
95
- return this . activeJobs . filter ( ( job ) => job . deviceId === deviceId ) ;
100
+ return this . activeJobs . filter ( ( job ) => job . deviceId === deviceId && job . status !== 'ready-for-removal' ) ;
96
101
}
97
102
98
103
async deleteCache ( ) : Promise < { message : string } > {
@@ -110,23 +115,74 @@ export class AppService {
110
115
}
111
116
}
112
117
118
+ removeJob ( jobId : string ) : void {
119
+ this . activeJobs = this . activeJobs . filter ( job => job . id !== jobId ) ;
120
+ this . logger . log ( `Job ${ jobId } removed.` ) ;
121
+ }
122
+
113
123
cancelJob ( jobId : string ) : boolean {
114
- const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
124
+ this . completeJob ( jobId ) ;
125
+ const job = this . activeJobs . find ( job => job . id === jobId ) ;
115
126
const process = this . ffmpegProcesses . get ( jobId ) ;
127
+
128
+ const finalizeJobRemoval = ( ) => {
129
+ if ( job ) {
130
+ this . jobQueue = this . jobQueue . filter ( id => id !== jobId ) ;
131
+
132
+ if ( this . immediateRemoval === true || job . progress < 100 ) {
133
+ this . fileRemoval . cleanupReadyForRemovalJobs ( [ job ] ) ;
134
+ this . activeJobs = this . activeJobs . filter ( activeJob => activeJob . id !== jobId ) ;
135
+ this . logger . log ( `Job ${ jobId } removed` ) ;
136
+ }
137
+ else {
138
+ this . logger . log ( 'Immediate removal is not allowed, cleanup service will take care in due time' )
139
+ }
140
+ }
141
+ this . checkQueue ( ) ;
142
+ } ;
143
+
116
144
if ( process ) {
117
- process . kill ( 'SIGKILL' ) ;
145
+ try {
146
+ this . logger . log ( `Attempting to kill process tree for PID ${ process . pid } ` ) ;
147
+ new Promise < void > ( ( resolve , reject ) => {
148
+ kill ( process . pid , 'SIGINT' , ( err ) => {
149
+ if ( err ) {
150
+ this . logger . error ( `Failed to kill process tree for PID ${ process . pid } : ${ err . message } ` ) ;
151
+ reject ( err ) ;
152
+ } else {
153
+ this . logger . log ( `Successfully killed process tree for PID ${ process . pid } ` ) ;
154
+ resolve ( ) ;
155
+ finalizeJobRemoval ( )
156
+ }
157
+ } ) ;
158
+ } ) ;
159
+ } catch ( err ) {
160
+ this . logger . error ( `Error terminating process for job ${ jobId } : ${ err . message } ` ) ;
161
+ }
118
162
this . ffmpegProcesses . delete ( jobId ) ;
163
+ return true ;
164
+ } else {
165
+ finalizeJobRemoval ( ) ;
166
+ return true ;
119
167
}
120
-
168
+ }
169
+
170
+ completeJob ( jobId : string ) :void {
171
+ const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
121
172
if ( job ) {
122
- this . jobQueue = this . jobQueue . filter ( ( id ) => id !== jobId ) ;
123
- this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
173
+ job . status = 'ready-for-removal' ;
174
+ job . timestamp = new Date ( )
175
+ this . logger . log ( `Job ${ jobId } marked as completed and ready for removal.` ) ;
176
+ } else {
177
+ this . logger . warn ( `Job ${ jobId } not found. Cannot mark as completed.` ) ;
124
178
}
179
+ }
125
180
126
- this . checkQueue ( ) ;
127
-
128
- this . logger . log ( `Job ${ jobId } canceled` ) ;
129
- return true ;
181
+ cleanupJob ( jobId : string ) : void {
182
+ const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
183
+ this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
184
+ this . ffmpegProcesses . delete ( jobId ) ;
185
+ this . videoDurations . delete ( jobId ) ;
130
186
}
131
187
132
188
getTranscodedFilePath ( jobId : string ) : string | null {
@@ -137,12 +193,6 @@ export class AppService {
137
193
return null ;
138
194
}
139
195
140
- cleanupJob ( jobId : string ) : void {
141
- this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
142
- this . ffmpegProcesses . delete ( jobId ) ;
143
- this . videoDurations . delete ( jobId ) ;
144
- }
145
-
146
196
getMaxConcurrentJobs ( ) : number {
147
197
return this . maxConcurrentJobs ;
148
198
}
@@ -219,24 +269,29 @@ export class AppService {
219
269
}
220
270
221
271
private checkQueue ( ) {
222
- const runningJobs = Array . from ( this . activeJobs . values ( ) ) . filter (
223
- ( job ) => job . status === 'optimizing' ,
224
- ) . length ;
272
+ let runningJobs = this . activeJobs . filter ( ( job ) => job . status === 'optimizing' )
273
+ . length ;
225
274
226
275
while ( runningJobs < this . maxConcurrentJobs && this . jobQueue . length > 0 ) {
227
276
const nextJobId = this . jobQueue . shift ( ) ;
228
277
if ( nextJobId ) {
229
278
this . startJob ( nextJobId ) ;
279
+ runningJobs ++ ; // Now we track the newly started job
230
280
}
231
281
}
232
282
}
233
283
284
+
234
285
private startJob ( jobId : string ) {
235
286
const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
236
287
if ( job ) {
237
288
job . status = 'optimizing' ;
238
289
const ffmpegArgs = this . getFfmpegArgs ( job . inputUrl , job . outputPath ) ;
239
- this . startFFmpegProcess ( jobId , ffmpegArgs ) ;
290
+ this . startFFmpegProcess ( jobId , ffmpegArgs )
291
+ . finally ( ( ) => {
292
+ // This runs after the returned Promise resolves or rejects.
293
+ this . checkQueue ( ) ;
294
+ } ) ;
240
295
this . logger . log ( `Started job ${ jobId } ` ) ;
241
296
}
242
297
}
@@ -263,20 +318,19 @@ export class AppService {
263
318
await this . getVideoDuration ( ffmpegArgs [ 1 ] , jobId ) ;
264
319
265
320
return new Promise ( ( resolve , reject ) => {
266
- const ffmpegProcess = spawn ( 'ffmpeg' , ffmpegArgs ) ;
321
+ const ffmpegProcess = spawn ( 'ffmpeg' , ffmpegArgs , { stdio : [ 'pipe' , 'pipe' , 'pipe' ] } ) ;
267
322
this . ffmpegProcesses . set ( jobId , ffmpegProcess ) ;
268
323
269
324
ffmpegProcess . stderr . on ( 'data' , ( data ) => {
270
325
this . updateProgress ( jobId , data . toString ( ) ) ;
271
326
} ) ;
272
-
327
+
273
328
ffmpegProcess . on ( 'close' , async ( code ) => {
274
329
this . ffmpegProcesses . delete ( jobId ) ;
275
330
this . videoDurations . delete ( jobId ) ;
276
331
277
332
const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
278
333
if ( ! job ) {
279
- // Job was cancelled and removed, just resolve
280
334
resolve ( ) ;
281
335
return ;
282
336
}
@@ -320,12 +374,10 @@ export class AppService {
320
374
if ( job ) {
321
375
job . status = 'failed' ;
322
376
}
323
- } finally {
324
- // Check queue after job completion or failure
325
- this . checkQueue ( ) ;
326
377
}
327
378
}
328
379
380
+
329
381
private async getVideoDuration (
330
382
inputUrl : string ,
331
383
jobId : string ,
0 commit comments