-
-
Notifications
You must be signed in to change notification settings - Fork 675
Closed
Description
I started working on a caching dispatcher + handler. Thought I'd post it here in case anyone is interested in working on something like that.
We only needed to cache redirects so I kind of stopped there.
import stream from 'node:stream'
import { LRUCache } from 'lru-cache'
import cacheControlParser from 'cache-control-parser'
class CacheHandler {
constructor({ key, handler, store }) {
this.entry = null
this.key = key
this.handler = handler
this.store = store
this.abort = null
this.resume = null
}
onConnect(abort) {
this.abort = abort
return this.handler.onConnect(abort)
}
onHeaders(statusCode, rawHeaders, resume, statusMessage) {
this.resume = resume
// TODO (fix): Check if content-length fits in cache...
let cacheControl
for (let n = 0; n < rawHeaders.length; n += 2) {
if (
rawHeaders[n].length === 'cache-control'.length &&
rawHeaders[n].toString().toLowerCase() === 'cache-control'
) {
cacheControl = cacheControlParser.parse(rawHeaders[n + 1].toString())
break
}
}
if (
cacheControl &&
cacheControl.public &&
!cacheControl.private &&
!cacheControl['no-store'] &&
// TODO (fix): Support all cache control directives...
// !opts.headers['no-transform'] &&
!cacheControl['no-cache'] &&
!cacheControl['must-understand'] &&
!cacheControl['must-revalidate'] &&
!cacheControl['proxy-revalidate']
) {
const maxAge = cacheControl['s-max-age'] ?? cacheControl['max-age']
const ttl = cacheControl.immutable
? 31556952 // 1 year
: Number(maxAge)
if (ttl > 0) {
this.entry = this.store.create(this.key, ttl * 1e3)
this.entry.statusCode = statusCode
this.entry.statusMessage = statusMessage
this.entry.rawHeaders = rawHeaders
this.entry.on('drain', resume)
this.entry.on('error', this.abort)
}
}
return this.handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
}
onData(chunk) {
let ret = true
if (this.entry) {
this.entry.size += chunk.bodyLength
if (this.entry.size > this.store.maxEntrySize) {
this.entry.destroy()
this.entry = null
} else {
ret = this.entry.write(chunk)
}
}
return this.handler.onData(chunk) !== false && ret !== false
}
onComplete(rawTrailers) {
if (this.entry) {
this.entry.rawTrailers = rawTrailers
this.entry.end()
}
return this.handler.onComplete(rawTrailers)
}
onError(err) {
if (this.entry) {
this.entry.destroy(err)
}
return this.handler.onError(err)
}
}
// TODO (fix): Filsystem backed cache...
class CacheStore {
constructor({ maxSize, maxEntrySize }) {
this.maxSize = maxSize
this.maxEntrySize = maxEntrySize
this.cache = new LRUCache({
maxSize,
sizeCalculation: (value) => value.body.byteLength,
})
}
create (key, ttl) {
const entry = Object.assign(new stream.PassThrough(), {
statusCode: null,
statusMessage: null,
rawHeaders: null,
rawTrailers: null,
size: 0,
}).on('finish', () => {
this.cache.set(key, entry, ttl)
})
return entry
}
get(key, callback) {
callback(null, this.cache.get(key))
}
}
export class CacheDispatcher {
constructor(dispatcher, { maxSize = 0, maxEntrySize = maxSize / 10 }) {
this.dispatcher = dispatcher
this.store = new CacheStore({ maxSize, maxEntrySize })
}
dispatch(opts, handler) {
if (opts.headers?.['cache-control'] || opts.headers?.authorization) {
// TODO (fix): Support all cache control directives...
// const cacheControl = cacheControlParser.parse(opts.headers['cache-control'])
// cacheControl['no-cache']
// cacheControl['no-store']
// cacheControl['max-age']
// cacheControl['max-stale']
// cacheControl['min-fresh']
// cacheControl['no-transform']
// cacheControl['only-if-cached']
this.dispatcher.dispatch(opts, handler)
return
}
// TODO (fix): Support all methods?
if (opts.method !== 'GET' && opts.method !== 'HEAD') {
this.dispatcher.dispatch(opts, handler)
return
}
// TODO (fix): Support body?
opts.body.resume()
// TODO (fix): How to generate key?
const key = `${opts.method}:${opts.path}`
this.store.get(key, (err, value) => {
if (err) {
// TODO (fix): How to handle cache errors?
this.handler.onError(err)
} else if (value) {
const { statusCode, statusMessage, rawHeaders, rawTrailers } = value
const ac = new AbortController()
const signal = ac.signal
let _resume = null
const resume = () => {
_resume?.(null)
_resume = null
}
const abort = () => {
ac.abort()
resume()
}
try {
handler.onConnect(abort)
signal.throwIfAborted()
handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
signal.throwIfAborted()
stream.pipeline(
value,
new stream.Writable({
signal,
write(chunk, encoding, callback) {
try {
if (handler.onData(chunk) === false) {
_resume = callback
} else {
callback(null)
}
} catch (err) {
callback(err)
}
},
}),
(err) => {
if (err) {
handler.onError(err)
} else {
handler.onComplete(rawTrailers)
}
}
)
} catch (err) {
handler.onError(err)
}
} else {
this.dispatcher.dispatch(opts, new CacheHandler({ handler, store: this.store, key }))
}
})
}
}metcoder95, Ethan-Arrowood, styfle, ci010 and trivikr
Metadata
Metadata
Assignees
Labels
No labels