Skip to content

Commit b6f24ff

Browse files
committed
Poll for H2 capacity before sending H2 body.
This change stops data being buffered while the remote peer lacks the capacity to receive so that the back pressure is propagated. All h2 body write functions are async now.
1 parent f697eee commit b6f24ff

File tree

7 files changed

+53
-30
lines changed

7 files changed

+53
-30
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
81c56d987be33640906fb081facd6af46dc23721
1+
c57df409e909c786e37f46868146e711025a2b34

pingora-core/src/protocols/http/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl HttpSession {
6262
h1.write_body(&data).await?;
6363
Ok(())
6464
}
65-
HttpSession::H2(h2) => h2.write_request_body(data, end),
65+
HttpSession::H2(h2) => h2.write_request_body(data, end).await,
6666
}
6767
}
6868

pingora-core/src/protocols/http/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Session {
143143
s.write_body(&data).await?;
144144
Ok(())
145145
}
146-
Self::H2(s) => s.write_body(data, end),
146+
Self::H2(s) => s.write_body(data, end).await,
147147
}
148148
}
149149

@@ -175,7 +175,7 @@ impl Session {
175175
pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
176176
match self {
177177
Self::H1(s) => s.response_duplex_vec(tasks).await,
178-
Self::H2(s) => s.response_duplex_vec(tasks),
178+
Self::H2(s) => s.response_duplex_vec(tasks).await,
179179
}
180180
}
181181

pingora-core/src/protocols/http/v2/client.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl Http2Session {
122122
}
123123

124124
/// Write a request body chunk
125-
pub fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
125+
pub async fn write_request_body(&mut self, data: Bytes, end: bool) -> Result<()> {
126126
if self.ended {
127127
warn!("Try to write request body after end of stream, dropping the extra data");
128128
return Ok(());
@@ -133,7 +133,9 @@ impl Http2Session {
133133
.as_mut()
134134
.expect("Try to write request body before sending request header");
135135

136-
write_body(body_writer, data, end).map_err(|e| self.handle_err(e))?;
136+
super::write_body(body_writer, data, end)
137+
.await
138+
.map_err(|e| self.handle_err(e))?;
137139
self.ended = self.ended || end;
138140
Ok(())
139141
}
@@ -404,15 +406,6 @@ impl Http2Session {
404406
}
405407
}
406408

407-
/// A helper function to write the request body
408-
pub fn write_body(send_body: &mut SendStream<Bytes>, data: Bytes, end: bool) -> Result<()> {
409-
let data_len = data.len();
410-
send_body.reserve_capacity(data_len);
411-
send_body
412-
.send_data(data, end)
413-
.or_err(WriteError, "while writing h2 request body")
414-
}
415-
416409
/* helper functions */
417410

418411
/* Types of errors during h2 header read

pingora-core/src/protocols/http/v2/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,33 @@
1414

1515
//! HTTP/2 implementation
1616
17+
use crate::{Error, ErrorType::*, OrErr, Result};
18+
use bytes::Bytes;
19+
use h2::SendStream;
20+
1721
pub mod client;
1822
pub mod server;
23+
24+
/// A helper function to write the body of h2 streams.
25+
pub async fn write_body(writer: &mut SendStream<Bytes>, data: Bytes, end: bool) -> Result<()> {
26+
let mut remaining = data;
27+
// Note: this loop allows to send the data frame even if data is empty in order to signal
28+
// END_OF_STREAM to the peer.
29+
loop {
30+
writer.reserve_capacity(remaining.len());
31+
match std::future::poll_fn(|cx| writer.poll_capacity(cx)).await {
32+
None => return Error::e_explain(H2Error, "cannot reserve capacity"),
33+
Some(ready) => {
34+
let n = ready.or_err(H2Error, "while waiting for capacity")?;
35+
let remaining_size = remaining.len();
36+
let data_to_send = remaining.split_to(std::cmp::min(remaining_size, n));
37+
writer
38+
.send_data(data_to_send, remaining.is_empty() && end)
39+
.or_err(WriteError, "while writing h2 request body")?;
40+
if remaining.is_empty() {
41+
return Ok(());
42+
}
43+
}
44+
}
45+
}
46+
}

pingora-core/src/protocols/http/v2/server.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ impl HttpSession {
232232
}
233233

234234
/// Write response body to the client. See [Self::write_response_header] for how to use `end`.
235-
pub fn write_body(&mut self, data: Bytes, end: bool) -> Result<()> {
235+
pub async fn write_body(&mut self, data: Bytes, end: bool) -> Result<()> {
236236
if self.ended {
237237
// NOTE: in h1, we also track to see if content-length matches the data
238238
// We have not tracked that in h2
@@ -246,11 +246,9 @@ impl HttpSession {
246246
));
247247
};
248248
let data_len = data.len();
249-
writer.reserve_capacity(data_len);
250-
writer.send_data(data, end).or_err(
251-
ErrorType::WriteError,
252-
"while writing h2 response body to downstream",
253-
)?;
249+
super::write_body(writer, data, end)
250+
.await
251+
.map_err(|e| e.into_down())?;
254252
self.body_sent += data_len;
255253
self.ended = self.ended || end;
256254
Ok(())
@@ -308,7 +306,7 @@ impl HttpSession {
308306
Ok(())
309307
}
310308

311-
pub fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
309+
pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
312310
let mut end_stream = false;
313311
for task in tasks.into_iter() {
314312
end_stream = match task {
@@ -320,7 +318,7 @@ impl HttpSession {
320318
HttpTask::Body(data, end) => match data {
321319
Some(d) => {
322320
if !d.is_empty() {
323-
self.write_body(d, end).map_err(|e| e.into_down())?;
321+
self.write_body(d, end).await.map_err(|e| e.into_down())?;
324322
}
325323
end
326324
}
@@ -567,7 +565,7 @@ mod test {
567565
}
568566

569567
// end: false here to verify finish() closes the stream nicely
570-
http.write_body(server_body.into(), false).unwrap();
568+
http.write_body(server_body.into(), false).await.unwrap();
571569
assert_eq!(http.body_bytes_sent(), 16);
572570

573571
http.write_trailers(trailers).unwrap();
@@ -639,7 +637,7 @@ mod test {
639637
.write_response_header(response_header.clone(), false)
640638
.is_ok());
641639

642-
http.write_body(server_body.into(), false).unwrap();
640+
http.write_body(server_body.into(), false).await.unwrap();
643641
assert_eq!(http.body_bytes_sent(), 16);
644642

645643
// 3. Waiting for the reset from the client
@@ -711,7 +709,7 @@ mod test {
711709
.write_response_header(response_header.clone(), false)
712710
.is_ok());
713711

714-
http.write_body(server_body.into(), false).unwrap();
712+
http.write_body(server_body.into(), false).await.unwrap();
715713
assert_eq!(http.body_bytes_sent(), 16);
716714

717715
// 3. Waiting for the client to close stream.

pingora-proxy/src/proxy_h2.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use super::*;
1616
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
1717
use crate::proxy_common::*;
1818
use http::{header::CONTENT_LENGTH, Method, StatusCode};
19-
use pingora_core::protocols::http::v2::client::{write_body, Http2Session};
19+
use pingora_core::protocols::http::v2::{client::Http2Session, write_body};
2020

2121
// add scheme and authority as required by h2 lib
2222
fn update_h2_scheme_authority(
@@ -150,7 +150,7 @@ impl<SV> HttpProxy<SV> {
150150

151151
if !send_end_stream && body_empty {
152152
// send END_STREAM on empty DATA frame
153-
match client_session.write_request_body(Bytes::new(), true) {
153+
match client_session.write_request_body(Bytes::new(), true).await {
154154
Ok(()) => debug!("sent empty DATA frame to h2"),
155155
Err(e) => {
156156
return (false, Some(e.into_up()));
@@ -547,11 +547,15 @@ impl<SV> HttpProxy<SV> {
547547

548548
if let Some(data) = data {
549549
debug!("Write {} bytes body to h2 upstream", data.len());
550-
write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
550+
write_body(client_body, data, end_of_body)
551+
.await
552+
.map_err(|e| e.into_up())?;
551553
} else {
552554
debug!("Read downstream body done");
553555
/* send a standalone END_STREAM flag */
554-
write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
556+
write_body(client_body, Bytes::new(), true)
557+
.await
558+
.map_err(|e| e.into_up())?;
555559
}
556560

557561
Ok(end_of_body)

0 commit comments

Comments
 (0)