11const { Request, Response } = require ( 'minipass-fetch' )
22const Minipass = require ( 'minipass' )
3- const MinipassCollect = require ( 'minipass-collect' )
43const MinipassFlush = require ( 'minipass-flush' )
5- const MinipassPipeline = require ( 'minipass-pipeline' )
64const cacache = require ( 'cacache' )
75const url = require ( 'url' )
86
7+ const CachingMinipassPipeline = require ( '../pipeline.js' )
98const CachePolicy = require ( './policy.js' )
109const cacheKey = require ( './key.js' )
1110const remote = require ( '../remote.js' )
1211
/**
 * Report whether `prop` is an own (non-inherited) property of `obj`.
 *
 * The check goes through `Object.prototype.hasOwnProperty.call` instead of
 * invoking `obj.hasOwnProperty` directly, so it still gives the right answer
 * for objects that shadow that method or that have no prototype at all.
 *
 * @param {object} obj - object to inspect
 * @param {string|symbol} prop - property key to look for
 * @returns {boolean} true only when `prop` is defined directly on `obj`
 * @throws {TypeError} when `obj` is null or undefined
 */
const hasOwnProperty = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop)
1413
15- // maximum amount of data we will buffer into memory
16- // if we'll exceed this, we switch to streaming
17- const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB
18-
1914// allow list for request headers that will be written to the cache index
2015// note: we will also store any request headers
2116// that are named in a response's vary header
@@ -256,13 +251,12 @@ class CacheEntry {
256251 }
257252
258253 const size = this . response . headers . get ( 'content-length' )
259- const fitsInMemory = ! ! size && Number ( size ) < MAX_MEM_SIZE
260- const shouldBuffer = this . options . memoize !== false && fitsInMemory
261254 const cacheOpts = {
262255 algorithms : this . options . algorithms ,
263256 metadata : getMetadata ( this . request , this . response , this . options ) ,
264257 size,
265- memoize : fitsInMemory && this . options . memoize ,
258+ integrity : this . options . integrity ,
259+ integrityEmitter : this . response . body . hasIntegrityEmitter && this . response . body ,
266260 }
267261
268262 let body = null
@@ -275,52 +269,31 @@ class CacheEntry {
275269 cacheWriteReject = reject
276270 } )
277271
278- body = new MinipassPipeline ( new MinipassFlush ( {
272+ body = new CachingMinipassPipeline ( { events : [ 'integrity' , 'size' ] } , new MinipassFlush ( {
279273 flush ( ) {
280274 return cacheWritePromise
281275 } ,
282276 } ) )
283-
284- let abortStream , onResume
285- if ( shouldBuffer ) {
286- // if the result fits in memory, use a collect stream to gather
287- // the response and write it to cacache while also passing it through
288- // to the user
289- onResume = ( ) => {
290- const collector = new MinipassCollect . PassThrough ( )
291- abortStream = collector
292- collector . on ( 'collect' , ( data ) => {
293- // TODO if the cache write fails, log a warning but return the response anyway
294- cacache . put ( this . options . cachePath , this . key , data , cacheOpts )
295- . then ( cacheWriteResolve , cacheWriteReject )
296- } )
297- body . unshift ( collector )
298- body . unshift ( this . response . body )
299- }
300- } else {
301- // if it does not fit in memory, create a tee stream and use
302- // that to pipe to both the cache and the user simultaneously
303- onResume = ( ) => {
304- const tee = new Minipass ( )
305- const cacheStream = cacache . put . stream ( this . options . cachePath , this . key , cacheOpts )
306- abortStream = cacheStream
307- tee . pipe ( cacheStream )
308- // TODO if the cache write fails, log a warning but return the response anyway
309- cacheStream . promise ( ) . then ( cacheWriteResolve , cacheWriteReject )
310- body . unshift ( tee )
311- body . unshift ( this . response . body )
312- }
277+ // this is always true since if we aren't reusing the one from the remote fetch, we
278+ // are using the one from cacache
279+ body . hasIntegrityEmitter = true
280+
281+ const onResume = ( ) => {
282+ const tee = new Minipass ( )
283+ const cacheStream = cacache . put . stream ( this . options . cachePath , this . key , cacheOpts )
284+ // re-emit the integrity and size events on our new response body so they can be reused
285+ cacheStream . on ( 'integrity' , i => body . emit ( 'integrity' , i ) )
286+ cacheStream . on ( 'size' , s => body . emit ( 'size' , s ) )
287+ // stick a flag on here so downstream users will know if they can expect integrity events
288+ tee . pipe ( cacheStream )
289+ // TODO if the cache write fails, log a warning but return the response anyway
290+ cacheStream . promise ( ) . then ( cacheWriteResolve , cacheWriteReject )
291+ body . unshift ( tee )
292+ body . unshift ( this . response . body )
313293 }
314294
315295 body . once ( 'resume' , onResume )
316296 body . once ( 'end' , ( ) => body . removeListener ( 'resume' , onResume ) )
317- this . response . body . on ( 'error' , ( err ) => {
318- // the abortStream will either be a MinipassCollect if we buffer
319- // or a cacache write stream, either way be sure to listen for
320- // errors from the actual response and avoid writing data that we
321- // know to be invalid to the cache
322- abortStream . destroy ( err )
323- } )
324297 } else {
325298 await cacache . index . insert ( this . options . cachePath , this . key , null , cacheOpts )
326299 }
@@ -331,7 +304,7 @@ class CacheEntry {
331304 // the header anyway
332305 this . response . headers . set ( 'x-local-cache' , encodeURIComponent ( this . options . cachePath ) )
333306 this . response . headers . set ( 'x-local-cache-key' , encodeURIComponent ( this . key ) )
334- this . response . headers . set ( 'x-local-cache-mode' , shouldBuffer ? 'buffer' : 'stream' )
307+ this . response . headers . set ( 'x-local-cache-mode' , 'stream' )
335308 this . response . headers . set ( 'x-local-cache-status' , status )
336309 this . response . headers . set ( 'x-local-cache-time' , new Date ( ) . toISOString ( ) )
337310 const newResponse = new Response ( body , {
@@ -346,9 +319,6 @@ class CacheEntry {
346319 // use the cached data to create a response and return it
347320 async respond ( method , options , status ) {
348321 let response
349- const size = Number ( this . response . headers . get ( 'content-length' ) )
350- const fitsInMemory = ! ! size && size < MAX_MEM_SIZE
351- const shouldBuffer = this . options . memoize !== false && fitsInMemory
352322 if ( method === 'HEAD' || [ 301 , 308 ] . includes ( this . response . status ) ) {
353323 // if the request is a HEAD, or the response is a redirect,
354324 // then the metadata in the entry already includes everything
@@ -358,66 +328,44 @@ class CacheEntry {
358328 // we're responding with a full cached response, so create a body
359329 // that reads from cacache and attach it to a new Response
360330 const body = new Minipass ( )
361- const removeOnResume = ( ) => body . removeListener ( 'resume' , onResume )
362- let onResume
363- if ( shouldBuffer ) {
364- onResume = async ( ) => {
365- removeOnResume ( )
366- try {
367- const content = await cacache . get . byDigest (
331+ const headers = { ...this . policy . responseHeaders ( ) }
332+ const onResume = ( ) => {
333+ const cacheStream = cacache . get . stream . byDigest (
334+ this . options . cachePath , this . entry . integrity , { memoize : this . options . memoize }
335+ )
336+ cacheStream . on ( 'error' , async ( err ) => {
337+ cacheStream . pause ( )
338+ if ( err . code === 'EINTEGRITY' ) {
339+ await cacache . rm . content (
368340 this . options . cachePath , this . entry . integrity , { memoize : this . options . memoize }
369341 )
370- body . end ( content )
371- } catch ( err ) {
372- if ( err . code === 'EINTEGRITY' ) {
373- await cacache . rm . content (
374- this . options . cachePath , this . entry . integrity , { memoize : this . options . memoize }
375- )
376- }
377- if ( err . code === 'ENOENT' || err . code === 'EINTEGRITY' ) {
378- await CacheEntry . invalidate ( this . request , this . options )
379- }
380- body . emit ( 'error' , err )
381342 }
382- }
383- } else {
384- onResume = ( ) => {
385- const cacheStream = cacache . get . stream . byDigest (
386- this . options . cachePath , this . entry . integrity , { memoize : this . options . memoize }
387- )
388- cacheStream . on ( 'error' , async ( err ) => {
389- cacheStream . pause ( )
390- if ( err . code === 'EINTEGRITY' ) {
391- await cacache . rm . content (
392- this . options . cachePath , this . entry . integrity , { memoize : this . options . memoize }
393- )
394- }
395- if ( err . code === 'ENOENT' || err . code === 'EINTEGRITY' ) {
396- await CacheEntry . invalidate ( this . request , this . options )
397- }
398- body . emit ( 'error' , err )
399- cacheStream . resume ( )
400- } )
401- cacheStream . pipe ( body )
402- }
343+ if ( err . code === 'ENOENT' || err . code === 'EINTEGRITY' ) {
344+ await CacheEntry . invalidate ( this . request , this . options )
345+ }
346+ body . emit ( 'error' , err )
347+ cacheStream . resume ( )
348+ } )
349+ // emit the integrity and size events based on our metadata so we're consistent
350+ body . emit ( 'integrity' , this . entry . integrity )
351+ body . emit ( 'size' , Number ( headers [ 'content-length' ] ) )
352+ cacheStream . pipe ( body )
403353 }
404354
405355 body . once ( 'resume' , onResume )
406- body . once ( 'end' , removeOnResume )
356+ body . once ( 'end' , ( ) => body . removeListener ( 'resume' , onResume ) )
407357 response = new Response ( body , {
408358 url : this . entry . metadata . url ,
409359 counter : options . counter ,
410360 status : 200 ,
411- headers : {
412- ...this . policy . responseHeaders ( ) ,
413- } ,
361+ headers,
414362 } )
415363 }
416364
417365 response . headers . set ( 'x-local-cache' , encodeURIComponent ( this . options . cachePath ) )
418366 response . headers . set ( 'x-local-cache-hash' , encodeURIComponent ( this . entry . integrity ) )
419367 response . headers . set ( 'x-local-cache-key' , encodeURIComponent ( this . key ) )
420- response . headers . set ( 'x-local-cache-mode' , shouldBuffer ? 'buffer' : 'stream' )
368+ response . headers . set ( 'x-local-cache-mode' , 'stream' )
421369 response . headers . set ( 'x-local-cache-status' , status )
422370 response . headers . set ( 'x-local-cache-time' , new Date ( this . entry . metadata . time ) . toUTCString ( ) )
423371 return response
0 commit comments