Skip to content

Commit aa7c2f1

Browse files
andrewhavckjohnhurt
authored andcommitted
Flush already received data if upstream write errors
1 parent 5b36a95 commit aa7c2f1

File tree

5 files changed

+70
-10
lines changed

5 files changed

+70
-10
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
fea06c09da4b7504dddd3273d88e14339455628c
1+
da81db0dcc6bb8aad2d01f0b6449accbeaef7013

pingora-proxy/src/proxy_h1.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ impl<SV> HttpProxy<SV> {
154154
{
155155
let mut request_done = false;
156156
let mut response_done = false;
157+
let mut send_error = None;
157158

158159
/* duplex mode, wait for either to complete */
159160
while !request_done || !response_done {
@@ -178,18 +179,28 @@ impl<SV> HttpProxy<SV> {
178179
Err(e) => {
179180
// Push the error to downstream and then quit
180181
// Don't care if send fails: downstream already gone
181-
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
182+
let _ = tx.send(HttpTask::Failed(send_error.unwrap_or(e).into_up())).await;
182183
// Downstream should consume all remaining data and handle the error
183184
return Ok(())
184185
}
185186
}
186187
},
187188

188189
body = rx.recv(), if !request_done => {
189-
request_done = send_body_to1(client_session, body).await?;
190-
// An upgraded request is terminated when either side is done
191-
if request_done && client_session.is_upgrade_req() {
192-
response_done = true;
190+
match send_body_to1(client_session, body).await {
191+
Ok(send_done) => {
192+
request_done = send_done;
193+
// An upgraded request is terminated when either side is done
194+
if request_done && client_session.is_upgrade_req() {
195+
response_done = true;
196+
}
197+
},
198+
Err(e) => {
199+
debug!("send error, draining read buf: {e}");
200+
request_done = true;
201+
send_error = Some(e);
202+
continue
203+
}
193204
}
194205
},
195206

pingora-proxy/src/proxy_h2.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,18 @@ impl<SV> HttpProxy<SV> {
275275
}
276276
};
277277
let is_body_done = session.is_body_done();
278-
let request_done =
279-
self.send_body_to2(session, body, is_body_done, client_body, ctx)
280-
.await?;
281-
downstream_state.maybe_finished(request_done);
278+
match self.send_body_to2(session, body, is_body_done, client_body, ctx).await {
279+
Ok(request_done) => {
280+
downstream_state.maybe_finished(request_done);
281+
},
282+
Err(e) => {
283+
// mark request done, attempt to drain receive
284+
warn!("Upstream h2 body send error: {e}");
285+
// upstream is what actually errored but we don't want to continue
286+
// polling the downstream body
287+
downstream_state.to_errored();
288+
}
289+
};
282290
},
283291

284292
task = rx.recv(), if !response_state.upstream_done() => {

pingora-proxy/tests/test_upstream.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,36 @@ async fn test_connection_die() {
6464
assert!(body.is_err());
6565
}
6666

67+
#[tokio::test]
68+
async fn test_upload_connection_die() {
69+
init();
70+
let client = reqwest::Client::new();
71+
let res = client
72+
.post("http://127.0.0.1:6147/upload_connection_die/")
73+
.body("b".repeat(15 * 1024 * 1024)) // 15 MB upload
74+
.timeout(Duration::from_secs(5))
75+
.send()
76+
.await
77+
.unwrap();
78+
// should get 200 status before connection dies
79+
assert_eq!(res.status(), StatusCode::OK);
80+
let _ = res.text().await;
81+
82+
// try h2
83+
let client = reqwest::Client::new();
84+
let res = client
85+
.post("http://127.0.0.1:6147/upload_connection_die/")
86+
.body("b".repeat(15 * 1024 * 1024)) // 15 MB upload
87+
.timeout(Duration::from_secs(5))
88+
.header("x-h2", "true")
89+
.send()
90+
.await
91+
.unwrap();
92+
// should get 200 status before connection dies
93+
assert_eq!(res.status(), StatusCode::OK);
94+
let _ = res.text().await;
95+
}
96+
6797
#[tokio::test]
6898
async fn test_upload() {
6999
init();

pingora-proxy/tests/utils/conf/origin/conf/nginx.conf

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,17 @@ http {
299299
}
300300
}
301301

302+
location /upload_connection_die/ {
303+
content_by_lua_block {
304+
ngx.status = ngx.HTTP_OK
305+
ngx.print("")
306+
ngx.flush(true)
307+
308+
time.sleep(1)
309+
ngx.exit(444)
310+
}
311+
}
312+
302313
location /download/ {
303314
content_by_lua_block {
304315
ngx.req.read_body()

0 commit comments

Comments
 (0)