Skip to content

Commit acd1b05

Browse files
authored
feat: add reqwest::retry policies (#2763)
This adds retry policies, somewhat kind of like redirect policies. This includes a new `ClientBuilder::retry()` method, and a `reqwest::retry` module to build a retry policy. ## Example ```rust let retries = reqwest::retry::for_host("api.github.com") .classify_fn(|req_rep| { if req_rep.status() == Some(SERVICE_UNAVAILABLE) { req_rep.retryable() } else { req_rep.success() } }) ``` Closes #316 Closes #2577
1 parent 54b6022 commit acd1b05

File tree

9 files changed

+708
-265
lines changed

9 files changed

+708
-265
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ h2 = { version = "0.4", optional = true }
134134
log = "0.4.17"
135135
percent-encoding = "2.3"
136136
tokio = { version = "1.0", default-features = false, features = ["net", "time"] }
137-
tower = { version = "0.5.2", default-features = false, features = ["timeout", "util"] }
137+
tower = { version = "0.5.2", default-features = false, features = ["retry", "timeout", "util"] }
138138
tower-http = { version = "0.6.5", default-features = false, features = ["follow-redirect"] }
139139
pin-project-lite = "0.2.11"
140140

src/async_impl/body.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,6 @@ impl Body {
154154
}
155155
}
156156

157-
pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
158-
let reuse = match self.inner {
159-
Inner::Reusable(ref chunk) => Some(chunk.clone()),
160-
Inner::Streaming { .. } => None,
161-
};
162-
163-
(reuse, self)
164-
}
165-
166157
pub(crate) fn try_clone(&self) -> Option<Body> {
167158
match self.inner {
168159
Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),

src/async_impl/client.rs

Lines changed: 69 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#[cfg(any(feature = "native-tls", feature = "__rustls",))]
22
use std::any::Any;
3-
#[cfg(feature = "http2")]
4-
use std::error::Error;
53
use std::future::Future;
64
use std::net::IpAddr;
75
use std::pin::Pin;
@@ -45,7 +43,6 @@ use crate::Certificate;
4543
use crate::Identity;
4644
use crate::{IntoUrl, Method, Proxy, Url};
4745

48-
use bytes::Bytes;
4946
use http::header::{
5047
Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, PROXY_AUTHORIZATION, RANGE, USER_AGENT,
5148
};
@@ -176,6 +173,7 @@ struct Config {
176173
proxies: Vec<ProxyMatcher>,
177174
auto_sys_proxy: bool,
178175
redirect_policy: redirect::Policy,
176+
retry_policy: crate::retry::Builder,
179177
referer: bool,
180178
read_timeout: Option<Duration>,
181179
timeout: Option<Duration>,
@@ -300,6 +298,7 @@ impl ClientBuilder {
300298
proxies: Vec::new(),
301299
auto_sys_proxy: true,
302300
redirect_policy: redirect::Policy::default(),
301+
retry_policy: crate::retry::Builder::default(),
303302
referer: true,
304303
read_timeout: None,
305304
timeout: None,
@@ -999,14 +998,18 @@ impl ClientBuilder {
999998
hyper: hyper_client,
1000999
};
10011000

1002-
let policy = {
1001+
let redirect_policy = {
10031002
let mut p = TowerRedirectPolicy::new(config.redirect_policy);
10041003
p.with_referer(config.referer)
10051004
.with_https_only(config.https_only);
10061005
p
10071006
};
10081007

1009-
let hyper = FollowRedirect::with_policy(hyper_service, policy.clone());
1008+
let retry_policy = config.retry_policy.into_policy();
1009+
1010+
let retries = tower::retry::Retry::new(retry_policy.clone(), hyper_service);
1011+
1012+
let hyper = FollowRedirect::with_policy(retries, redirect_policy.clone());
10101013

10111014
Ok(Client {
10121015
inner: Arc::new(ClientRef {
@@ -1026,7 +1029,8 @@ impl ClientBuilder {
10261029
config.pool_idle_timeout,
10271030
config.cookie_store,
10281031
);
1029-
Some(FollowRedirect::with_policy(h3_service, policy))
1032+
let retries = tower::retry::Retry::new(retry_policy, h3_service);
1033+
Some(FollowRedirect::with_policy(retries, redirect_policy))
10301034
}
10311035
None => None,
10321036
},
@@ -1337,6 +1341,17 @@ impl ClientBuilder {
13371341
self
13381342
}
13391343

1344+
// Retry options
1345+
1346+
/// Set a request retry policy.
1347+
///
1348+
/// Default behavior is to retry protocol NACKs.
1349+
// XXX: accept an `impl retry::IntoPolicy` instead?
1350+
pub fn retry(mut self, policy: crate::retry::Builder) -> ClientBuilder {
1351+
self.config.retry_policy = policy;
1352+
self
1353+
}
1354+
13401355
// Proxy options
13411356

13421357
/// Add a `Proxy` to the list of proxies the `Client` will use.
@@ -2505,13 +2520,7 @@ impl Client {
25052520
_ => return Pending::new_err(error::url_invalid_uri(url)),
25062521
};
25072522

2508-
let (reusable, body) = match body {
2509-
Some(body) => {
2510-
let (reusable, body) = body.try_reuse();
2511-
(Some(reusable), body)
2512-
}
2513-
None => (None, Body::empty()),
2514-
};
2523+
let body = body.unwrap_or_else(Body::empty);
25152524

25162525
self.proxy_auth(&uri, &mut headers);
25172526
self.proxy_custom_headers(&uri, &mut headers);
@@ -2556,9 +2565,6 @@ impl Client {
25562565
method,
25572566
url,
25582567
headers,
2559-
body: reusable,
2560-
2561-
retry_count: 0,
25622568

25632569
client: self.inner.clone(),
25642570

@@ -2792,14 +2798,18 @@ impl Config {
27922798
}
27932799
}
27942800

2801+
type LayeredService<T> =
2802+
FollowRedirect<tower::retry::Retry<crate::retry::Policy, T>, TowerRedirectPolicy>;
2803+
type LayeredFuture<T> = <LayeredService<T> as Service<http::Request<Body>>>::Future;
2804+
27952805
struct ClientRef {
27962806
accepts: Accepts,
27972807
#[cfg(feature = "cookies")]
27982808
cookie_store: Option<Arc<dyn cookie::CookieStore>>,
27992809
headers: HeaderMap,
2800-
hyper: FollowRedirect<HyperService, TowerRedirectPolicy>,
2810+
hyper: LayeredService<HyperService>,
28012811
#[cfg(feature = "http3")]
2802-
h3_client: Option<FollowRedirect<H3Client, TowerRedirectPolicy>>,
2812+
h3_client: Option<LayeredService<H3Client>>,
28032813
referer: bool,
28042814
request_timeout: RequestConfig<RequestTimeout>,
28052815
read_timeout: Option<Duration>,
@@ -2863,9 +2873,6 @@ pin_project! {
28632873
method: Method,
28642874
url: Url,
28652875
headers: HeaderMap,
2866-
body: Option<Option<Bytes>>,
2867-
2868-
retry_count: usize,
28692876

28702877
client: Arc<ClientRef>,
28712878

@@ -2880,9 +2887,9 @@ pin_project! {
28802887
}
28812888

28822889
enum ResponseFuture {
2883-
Default(tower_http::follow_redirect::ResponseFuture<HyperService, Body, TowerRedirectPolicy>),
2890+
Default(LayeredFuture<HyperService>),
28842891
#[cfg(feature = "http3")]
2885-
H3(tower_http::follow_redirect::ResponseFuture<H3Client, Body, TowerRedirectPolicy>),
2892+
H3(LayeredFuture<H3Client>),
28862893
}
28872894

28882895
impl PendingRequest {
@@ -2897,103 +2904,6 @@ impl PendingRequest {
28972904
fn read_timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
28982905
self.project().read_timeout_fut
28992906
}
2900-
2901-
#[cfg(any(feature = "http2", feature = "http3"))]
2902-
fn retry_error(mut self: Pin<&mut Self>, err: &(dyn std::error::Error + 'static)) -> bool {
2903-
use log::trace;
2904-
2905-
if !is_retryable_error(err) {
2906-
return false;
2907-
}
2908-
2909-
trace!("can retry {err:?}");
2910-
2911-
let body = match self.body {
2912-
Some(Some(ref body)) => Body::reusable(body.clone()),
2913-
Some(None) => {
2914-
log::debug!("error was retryable, but body not reusable");
2915-
return false;
2916-
}
2917-
None => Body::empty(),
2918-
};
2919-
2920-
if self.retry_count >= 2 {
2921-
trace!("retry count too high");
2922-
return false;
2923-
}
2924-
self.retry_count += 1;
2925-
2926-
// If it parsed once, it should parse again
2927-
let uri = try_uri(&self.url).expect("URL was already validated as URI");
2928-
2929-
*self.as_mut().in_flight().get_mut() = match *self.as_mut().in_flight().as_ref() {
2930-
#[cfg(feature = "http3")]
2931-
ResponseFuture::H3(_) => {
2932-
let mut req = hyper::Request::builder()
2933-
.method(self.method.clone())
2934-
.uri(uri)
2935-
.body(body)
2936-
.expect("valid request parts");
2937-
*req.headers_mut() = self.headers.clone();
2938-
let mut h3 = self
2939-
.client
2940-
.h3_client
2941-
.as_ref()
2942-
.expect("H3 client must exists, otherwise we can't have a h3 request here")
2943-
.clone();
2944-
ResponseFuture::H3(h3.call(req))
2945-
}
2946-
_ => {
2947-
let mut req = hyper::Request::builder()
2948-
.method(self.method.clone())
2949-
.uri(uri)
2950-
.body(body)
2951-
.expect("valid request parts");
2952-
*req.headers_mut() = self.headers.clone();
2953-
let mut hyper = self.client.hyper.clone();
2954-
ResponseFuture::Default(hyper.call(req))
2955-
}
2956-
};
2957-
2958-
true
2959-
}
2960-
}
2961-
2962-
#[cfg(any(feature = "http2", feature = "http3"))]
2963-
fn is_retryable_error(err: &(dyn std::error::Error + 'static)) -> bool {
2964-
// pop the legacy::Error
2965-
let err = if let Some(err) = err.source() {
2966-
err
2967-
} else {
2968-
return false;
2969-
};
2970-
2971-
#[cfg(feature = "http3")]
2972-
if let Some(cause) = err.source() {
2973-
if let Some(err) = cause.downcast_ref::<h3::error::ConnectionError>() {
2974-
log::debug!("determining if HTTP/3 error {err} can be retried");
2975-
// TODO: Does h3 provide an API for checking the error?
2976-
return err.to_string().as_str() == "timeout";
2977-
}
2978-
}
2979-
2980-
#[cfg(feature = "http2")]
2981-
if let Some(cause) = err.source() {
2982-
if let Some(err) = cause.downcast_ref::<h2::Error>() {
2983-
// They sent us a graceful shutdown, try with a new connection!
2984-
if err.is_go_away() && err.is_remote() && err.reason() == Some(h2::Reason::NO_ERROR) {
2985-
return true;
2986-
}
2987-
2988-
// REFUSED_STREAM was sent from the server, which is safe to retry.
2989-
// https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
2990-
if err.is_reset() && err.is_remote() && err.reason() == Some(h2::Reason::REFUSED_STREAM)
2991-
{
2992-
return true;
2993-
}
2994-
}
2995-
}
2996-
false
29972907
}
29982908

29992909
impl Pending {
@@ -3042,66 +2952,49 @@ impl Future for PendingRequest {
30422952
}
30432953
}
30442954

3045-
loop {
3046-
let res = match self.as_mut().in_flight().get_mut() {
3047-
ResponseFuture::Default(r) => match ready!(Pin::new(r).poll(cx)) {
3048-
Err(e) => {
3049-
#[cfg(feature = "http2")]
3050-
if e.is_request() {
3051-
if let Some(e) = e.source() {
3052-
if self.as_mut().retry_error(e) {
3053-
continue;
3054-
}
3055-
}
3056-
}
3057-
3058-
return Poll::Ready(Err(e.if_no_url(|| self.url.clone())));
3059-
}
3060-
Ok(res) => res.map(super::body::boxed),
3061-
},
3062-
#[cfg(feature = "http3")]
3063-
ResponseFuture::H3(r) => match ready!(Pin::new(r).poll(cx)) {
3064-
Err(e) => {
3065-
if self.as_mut().retry_error(&e) {
3066-
continue;
3067-
}
3068-
return Poll::Ready(Err(
3069-
crate::error::request(e).with_url(self.url.clone())
3070-
));
3071-
}
3072-
Ok(res) => res,
3073-
},
3074-
};
3075-
3076-
#[cfg(feature = "cookies")]
3077-
{
3078-
if let Some(ref cookie_store) = self.client.cookie_store {
3079-
let mut cookies =
3080-
cookie::extract_response_cookie_headers(res.headers()).peekable();
3081-
if cookies.peek().is_some() {
3082-
cookie_store.set_cookies(&mut cookies, &self.url);
3083-
}
2955+
let res = match self.as_mut().in_flight().get_mut() {
2956+
ResponseFuture::Default(r) => match ready!(Pin::new(r).poll(cx)) {
2957+
Err(e) => {
2958+
return Poll::Ready(Err(e.if_no_url(|| self.url.clone())));
30842959
}
3085-
}
3086-
if let Some(url) = &res
3087-
.extensions()
3088-
.get::<tower_http::follow_redirect::RequestUri>()
3089-
{
3090-
self.url = match Url::parse(&url.0.to_string()) {
3091-
Ok(url) => url,
3092-
Err(e) => return Poll::Ready(Err(crate::error::decode(e))),
2960+
Ok(res) => res.map(super::body::boxed),
2961+
},
2962+
#[cfg(feature = "http3")]
2963+
ResponseFuture::H3(r) => match ready!(Pin::new(r).poll(cx)) {
2964+
Err(e) => {
2965+
return Poll::Ready(Err(crate::error::request(e).with_url(self.url.clone())));
30932966
}
3094-
};
2967+
Ok(res) => res,
2968+
},
2969+
};
30952970

3096-
let res = Response::new(
3097-
res,
3098-
self.url.clone(),
3099-
self.client.accepts,
3100-
self.total_timeout.take(),
3101-
self.read_timeout,
3102-
);
3103-
return Poll::Ready(Ok(res));
2971+
#[cfg(feature = "cookies")]
2972+
{
2973+
if let Some(ref cookie_store) = self.client.cookie_store {
2974+
let mut cookies = cookie::extract_response_cookie_headers(res.headers()).peekable();
2975+
if cookies.peek().is_some() {
2976+
cookie_store.set_cookies(&mut cookies, &self.url);
2977+
}
2978+
}
31042979
}
2980+
if let Some(url) = &res
2981+
.extensions()
2982+
.get::<tower_http::follow_redirect::RequestUri>()
2983+
{
2984+
self.url = match Url::parse(&url.0.to_string()) {
2985+
Ok(url) => url,
2986+
Err(e) => return Poll::Ready(Err(crate::error::decode(e))),
2987+
}
2988+
};
2989+
2990+
let res = Response::new(
2991+
res,
2992+
self.url.clone(),
2993+
self.client.accepts,
2994+
self.total_timeout.take(),
2995+
self.read_timeout,
2996+
);
2997+
Poll::Ready(Ok(res))
31052998
}
31062999
}
31073000

src/blocking/client.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,13 @@ impl ClientBuilder {
342342
self.with_inner(move |inner| inner.redirect(policy))
343343
}
344344

345+
/// Set a request retry policy.
346+
///
347+
/// Default behavior is to retry protocol NACKs.
348+
pub fn retry(self, policy: crate::retry::Builder) -> ClientBuilder {
349+
self.with_inner(move |inner| inner.retry(policy))
350+
}
351+
345352
/// Enable or disable automatic setting of the `Referer` header.
346353
///
347354
/// Default is `true`.

0 commit comments

Comments
 (0)