Skip to content

Commit be4a023

Browse files
Noah-Kennedydrcaramelsyrup
authored andcommitted
add tweak_new_upstream_tcp_connection hook
Add a new hook which is invoked on a TCP socket before it is connected.
1 parent 42e11c4 commit be4a023

File tree

6 files changed

+72
-23
lines changed

6 files changed

+72
-23
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
9df2f3f6e6b919a632b08af4584a1c1a3bcee0fd
1+
5c08613f7ab19914dc8a45a37590a06a325127bb

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ tokio = "1"
2727
async-trait = "0.1.42"
2828
httparse = "1"
2929
bytes = "1.0"
30+
derivative = "2.2.0"
3031
http = "1.0.0"
3132
log = "0.4"
3233
h2 = ">=0.4.6"

pingora-core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ bytes = { workspace = true }
3535
http = { workspace = true }
3636
log = { workspace = true }
3737
h2 = { workspace = true }
38+
derivative.workspace = true
3839
clap = { version = "3.2.25", features = ["derive"] }
3940
once_cell = { workspace = true }
4041
serde = { version = "1.0", features = ["derive"] }
@@ -88,8 +89,8 @@ jemallocator = "0.5"
8889

8990
[features]
9091
default = []
91-
openssl = ["pingora-openssl", "openssl_derived",]
92-
boringssl = ["pingora-boringssl", "openssl_derived",]
92+
openssl = ["pingora-openssl", "openssl_derived"]
93+
boringssl = ["pingora-boringssl", "openssl_derived"]
9394
rustls = ["pingora-rustls", "any_tls", "dep:x509-parser", "ouroboros"]
9495
patched_http1 = ["pingora-http/patched_http1"]
9596
openssl_derived = ["any_tls"]

pingora-core/src/connectors/l4.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use async_trait::async_trait;
16-
use log::debug;
17-
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
18-
use rand::seq::SliceRandom;
19-
use std::net::SocketAddr as InetSocketAddr;
20-
#[cfg(unix)]
21-
use std::os::unix::io::AsRawFd;
22-
#[cfg(windows)]
23-
use std::os::windows::io::AsRawSocket;
24-
2515
#[cfg(unix)]
2616
use crate::protocols::l4::ext::connect_uds;
2717
use crate::protocols::l4::ext::{
@@ -31,6 +21,15 @@ use crate::protocols::l4::socket::SocketAddr;
3121
use crate::protocols::l4::stream::Stream;
3222
use crate::protocols::{GetSocketDigest, SocketDigest};
3323
use crate::upstreams::peer::Peer;
24+
use async_trait::async_trait;
25+
use log::debug;
26+
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
27+
use rand::seq::SliceRandom;
28+
use std::net::SocketAddr as InetSocketAddr;
29+
#[cfg(unix)]
30+
use std::os::unix::io::AsRawFd;
31+
#[cfg(windows)]
32+
use std::os::windows::io::AsRawSocket;
3433

3534
/// The interface to establish a L4 connection
3635
#[async_trait]
@@ -123,6 +122,14 @@ where
123122
debug!("Setting dscp");
124123
set_dscp(raw, dscp)?;
125124
}
125+
126+
if let Some(tweak_hook) = peer
127+
.get_peer_options()
128+
.and_then(|o| o.upstream_tcp_sock_tweak_hook.clone())
129+
{
130+
tweak_hook(socket)?;
131+
}
132+
126133
Ok(())
127134
});
128135
let conn_res = match peer.connection_timeout() {
@@ -302,6 +309,8 @@ mod tests {
302309
use crate::upstreams::peer::{BasicPeer, HttpPeer, Proxy};
303310
use std::collections::BTreeMap;
304311
use std::path::PathBuf;
312+
use std::sync::atomic::{AtomicBool, Ordering};
313+
use std::sync::Arc;
305314
use tokio::io::AsyncWriteExt;
306315
#[cfg(unix)]
307316
use tokio::net::UnixListener;
@@ -358,6 +367,26 @@ mod tests {
358367
assert_eq!(new_session.unwrap_err().etype(), &ConnectTimedout)
359368
}
360369

370+
#[tokio::test]
371+
async fn test_tweak_hook() {
372+
const INIT_FLAG: bool = false;
373+
374+
let flag = Arc::new(AtomicBool::new(INIT_FLAG));
375+
376+
let mut peer = BasicPeer::new("1.1.1.1:80");
377+
378+
let move_flag = Arc::clone(&flag);
379+
380+
peer.options.upstream_tcp_sock_tweak_hook = Some(Arc::new(move |_| {
381+
move_flag.fetch_xor(true, Ordering::SeqCst);
382+
Ok(())
383+
}));
384+
385+
connect(&peer, None).await.unwrap();
386+
387+
assert_eq!(!INIT_FLAG, flag.load(Ordering::SeqCst));
388+
}
389+
361390
#[tokio::test]
362391
async fn test_custom_connect() {
363392
#[derive(Debug)]

pingora-core/src/upstreams/peer.rs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,15 @@
1414

1515
//! Defines where to connect to and how to connect to a remote server
1616
17+
use crate::connectors::{l4::BindTo, L4Connect};
18+
use crate::protocols::l4::socket::SocketAddr;
19+
use crate::protocols::tls::CaType;
20+
#[cfg(unix)]
21+
use crate::protocols::ConnFdReusable;
22+
use crate::protocols::TcpKeepalive;
23+
use crate::utils::tls::{get_organization_unit, CertKey};
1724
use ahash::AHasher;
25+
use derivative::Derivative;
1826
use pingora_error::{
1927
ErrorType::{InternalError, SocketError},
2028
OrErr, Result,
@@ -30,14 +38,7 @@ use std::os::windows::io::AsRawSocket;
3038
use std::path::{Path, PathBuf};
3139
use std::sync::Arc;
3240
use std::time::Duration;
33-
34-
use crate::connectors::{l4::BindTo, L4Connect};
35-
use crate::protocols::l4::socket::SocketAddr;
36-
use crate::protocols::tls::CaType;
37-
#[cfg(unix)]
38-
use crate::protocols::ConnFdReusable;
39-
use crate::protocols::TcpKeepalive;
40-
use crate::utils::tls::{get_organization_unit, CertKey};
41+
use tokio::net::TcpSocket;
4142

4243
pub use crate::protocols::tls::ALPN;
4344

@@ -203,6 +204,17 @@ pub trait Peer: Display + Clone {
203204
fn get_tracer(&self) -> Option<Tracer> {
204205
None
205206
}
207+
208+
/// Returns a hook that should be run before an upstream TCP connection is connected.
209+
///
210+
/// This hook can be used to set additional socket options.
211+
fn upstream_tcp_sock_tweak_hook(
212+
&self,
213+
) -> Option<&Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>> {
214+
self.get_peer_options()?
215+
.upstream_tcp_sock_tweak_hook
216+
.as_ref()
217+
}
206218
}
207219

208220
/// A simple TCP or TLS peer without many complicated settings.
@@ -303,7 +315,9 @@ impl Scheme {
303315
/// The preferences to connect to a remote server
304316
///
305317
/// See [`Peer`] for the meaning of the fields
306-
#[derive(Clone, Debug)]
318+
#[non_exhaustive]
319+
#[derive(Clone, Derivative)]
320+
#[derivative(Debug)]
307321
pub struct PeerOptions {
308322
pub bind_to: Option<BindTo>,
309323
pub connection_timeout: Option<Duration>,
@@ -335,6 +349,9 @@ pub struct PeerOptions {
335349
pub tracer: Option<Tracer>,
336350
// A custom L4 connector to use to establish new L4 connections
337351
pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
352+
#[derivative(Debug = "ignore")]
353+
pub upstream_tcp_sock_tweak_hook:
354+
Option<Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>>,
338355
}
339356

340357
impl PeerOptions {
@@ -363,6 +380,7 @@ impl PeerOptions {
363380
tcp_fast_open: false,
364381
tracer: None,
365382
custom_l4: None,
383+
upstream_tcp_sock_tweak_hook: None,
366384
}
367385
}
368386

pingora-load-balancing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ tokio = { workspace = true }
3030
futures = "0"
3131
log = { workspace = true }
3232
http = { workspace = true }
33-
derivative = "2.2.0"
33+
derivative.workspace = true
3434

3535
[dev-dependencies]
3636

0 commit comments

Comments
 (0)