Skip to content

Commit c82fb6b

Browse files
Allow modules to receive HttpTask::Done, flush response compression
This change adds a `response_done_filter` hook for downstream HTTP modules to handle the HttpTask::Done type, which may be sent in lieu of a body + end flag, for example in certain caching request paths. In the `ResponseCompression` module this could manifest as un-flushed body bytes. This change duplicates the `response_filter` logic currently used for upstream response compression.
1 parent 5dc2501 commit c82fb6b

File tree

8 files changed

+155
-6
lines changed

8 files changed

+155
-6
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
f95698e4cc0825eab85a3c515914eb7c24c7ea83
1+
0cc5dbe2652b7f8ebe8ac981b3360c2f0966eeee

pingora-core/src/modules/http/compression.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ impl HttpModule for ResponseCompression {
7272
}
7373
Ok(())
7474
}
75+
76+
fn response_done_filter(&mut self) -> Result<Option<Bytes>> {
77+
if !self.0.is_enabled() {
78+
return Ok(None);
79+
}
80+
// Flush or finish any remaining encoded bytes upon HTTP response completion
81+
// (if it was not already ended in the body filter).
82+
Ok(self.0.response_body_filter(None, true))
83+
}
7584
}
7685

7786
/// The builder for HTTP response compression module

pingora-core/src/modules/http/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ pub trait HttpModule {
7272
Ok(None)
7373
}
7474

75+
fn response_done_filter(&mut self) -> Result<Option<Bytes>> {
76+
Ok(None)
77+
}
78+
7579
fn as_any(&self) -> &dyn Any;
7680
fn as_any_mut(&mut self) -> &mut dyn Any;
7781
}
@@ -258,6 +262,24 @@ impl HttpModuleCtx {
258262
}
259263
Ok(encoded)
260264
}
265+
266+
/// Run the `response_done_filter` for all the modules according to their orders.
267+
///
268+
/// This filter may be invoked in certain response paths to signal end of response
269+
/// if not already done so via trailers or body (with end flag set).
270+
///
271+
/// Returns an `Option<Bytes>` which can be used to write additional response body
272+
/// bytes. Note, if multiple modules attempt to write body bytes, only the last one
273+
/// will be used.
274+
pub fn response_done_filter(&mut self) -> Result<Option<Bytes>> {
275+
let mut encoded = None;
276+
for filter in self.module_ctx.iter_mut() {
277+
if let Some(buf) = filter.response_done_filter()? {
278+
encoded = Some(buf);
279+
}
280+
}
281+
Ok(encoded)
282+
}
261283
}
262284

263285
#[cfg(test)]

pingora-core/src/protocols/http/compression/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use super::HttpTask;
2020

2121
use bytes::Bytes;
22-
use log::warn;
22+
use log::{debug, warn};
2323
use pingora_error::{ErrorType, Result};
2424
use pingora_http::{RequestHeader, ResponseHeader};
2525
use std::time::Duration;
@@ -248,6 +248,7 @@ impl ResponseCompressionCtx {
248248
}
249249

250250
let action = decide_action(resp, accept_encoding);
251+
debug!("compression action: {action:?}");
251252
let (encoder, preserve_etag) = match action {
252253
Action::Noop => (None, false),
253254
Action::Compress(algorithm) => {
@@ -271,9 +272,10 @@ impl ResponseCompressionCtx {
271272
}
272273
}
273274

274-
/// Stream the response body chunks into this ctx. The return value will be the compressed data
275+
/// Stream the response body chunks into this ctx. The return value will be the compressed
276+
/// data.
275277
///
276-
/// Return None if the compressed is not enabled
278+
/// Return None if compression is not enabled.
277279
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
278280
match &mut self.0 {
279281
CtxInner::HeaderPhase { .. } => panic!("Wrong phase: HeaderPhase"),

pingora-proxy/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,20 @@ impl Session {
431431
*task = HttpTask::Body(Some(buf), true);
432432
}
433433
}
434-
_ => { /* Done or Failed */ }
434+
HttpTask::Done => {
435+
// `Done` can be sent in certain response paths to mark end
436+
// of response if not already done via trailers or body with
437+
// end flag set.
438+
// If the filter returns body bytes on Done,
439+
// write them into the response.
440+
//
441+
// Note, this will not work if end of stream has already
442+
// been seen or we've written content-length bytes.
443+
if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
444+
*task = HttpTask::Body(Some(buf), true);
445+
}
446+
}
447+
_ => { /* Failed */ }
435448
}
436449
}
437450
self.downstream_session.response_duplex_vec(tasks).await

pingora-proxy/tests/test_upstream.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,53 @@ mod test_cache {
342342
assert_eq!(res.text().await.unwrap(), "no if headers detected\n");
343343
}
344344

345+
#[tokio::test]
346+
async fn test_cache_downstream_compression() {
347+
init();
348+
349+
// disable reqwest gzip support to check compression headers and body
350+
// otherwise reqwest will decompress and strip the headers
351+
let client = reqwest::ClientBuilder::new().gzip(false).build().unwrap();
352+
let res = client
353+
.get("http://127.0.0.1:6148/unique/test_cache_downstream_compression/no_compression")
354+
.header("x-downstream-compression", "1")
355+
.header("accept-encoding", "gzip")
356+
.send()
357+
.await
358+
.unwrap();
359+
assert_eq!(res.status(), StatusCode::OK);
360+
let headers = res.headers();
361+
assert_eq!(headers["Content-Encoding"], "gzip");
362+
assert_eq!(headers["x-cache-status"], "miss");
363+
let body = res.bytes().await.unwrap();
364+
assert!(body.len() < 32);
365+
}
366+
367+
#[tokio::test]
368+
async fn test_cache_downstream_decompression() {
369+
init();
370+
371+
// disable reqwest gzip support to check compression headers and body
372+
// otherwise reqwest will decompress and strip the headers
373+
let client = reqwest::ClientBuilder::new().gzip(false).build().unwrap();
374+
let res = client
375+
.get("http://127.0.0.1:6148/unique/test_cache_downstream_decompression/gzip/index.html")
376+
.header("x-downstream-decompression", "1")
377+
.header("x-upstream-accept-encoding", "gzip")
378+
.send()
379+
.await
380+
.unwrap();
381+
382+
assert_eq!(res.status(), StatusCode::OK);
383+
let headers = res.headers();
384+
// upstream should have received gzip, should decompress for downstream
385+
assert_eq!(headers["received-accept-encoding"], "gzip");
386+
assert!(headers.get("Content-Encoding").is_none());
387+
assert_eq!(headers["x-cache-status"], "miss");
388+
let body = res.bytes().await.unwrap();
389+
assert_eq!(body, "Hello World!\n");
390+
}
391+
345392
#[tokio::test]
346393
async fn test_network_error_mid_response() {
347394
init();

pingora-proxy/tests/utils/conf/origin/conf/nginx.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ http {
420420
}
421421
}
422422

423+
location /gzip {
424+
alias ./html;
425+
gzip on;
426+
gzip_min_length 0;
427+
gzip_types *;
428+
add_header received-accept-encoding $http_accept_encoding;
429+
}
430+
423431
location /sleep {
424432
rewrite_by_lua_block {
425433
local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1

pingora-proxy/tests/utils/server_utils.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use super::cert;
1717
use async_trait::async_trait;
1818
use clap::Parser;
19-
use http::header::VARY;
19+
use http::header::{ACCEPT_ENCODING, VARY};
2020
use http::HeaderValue;
2121
use once_cell::sync::Lazy;
2222
use pingora_cache::cache_control::CacheControl;
@@ -350,6 +350,38 @@ impl ProxyHttp for ExampleProxyCache {
350350
}
351351
}
352352

353+
async fn early_request_filter(
354+
&self,
355+
session: &mut Session,
356+
_ctx: &mut Self::CTX,
357+
) -> Result<()> {
358+
if session
359+
.req_header()
360+
.headers
361+
.get("x-downstream-compression")
362+
.is_some()
363+
{
364+
session
365+
.downstream_modules_ctx
366+
.get_mut::<ResponseCompression>()
367+
.unwrap()
368+
.adjust_level(6);
369+
}
370+
if session
371+
.req_header()
372+
.headers
373+
.get("x-downstream-decompression")
374+
.is_some()
375+
{
376+
session
377+
.downstream_modules_ctx
378+
.get_mut::<ResponseCompression>()
379+
.unwrap()
380+
.adjust_decompression(true);
381+
}
382+
Ok(())
383+
}
384+
353385
async fn upstream_peer(
354386
&self,
355387
session: &mut Session,
@@ -474,6 +506,22 @@ impl ProxyHttp for ExampleProxyCache {
474506
key.finalize()
475507
}
476508

509+
async fn upstream_request_filter(
510+
&self,
511+
session: &mut Session,
512+
upstream_request: &mut RequestHeader,
513+
_ctx: &mut Self::CTX,
514+
) -> Result<()> {
515+
if let Some(up_accept_encoding) = session
516+
.req_header()
517+
.headers
518+
.get("x-upstream-accept-encoding")
519+
{
520+
upstream_request.insert_header(&ACCEPT_ENCODING, up_accept_encoding)?;
521+
}
522+
Ok(())
523+
}
524+
477525
fn response_cache_filter(
478526
&self,
479527
_session: &Session,

0 commit comments

Comments
 (0)