Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cli/tsc/dts/lib.deno.ns.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5157,6 +5157,18 @@ declare namespace Deno {

/** Sets `SO_REUSEPORT` on POSIX systems. */
reusePort?: boolean;

/** Maximum number of pending connections in the listen queue.
*
* This parameter controls how many incoming connections can be queued by the
* operating system while waiting for the application to accept them. If more
* connections arrive when the queue is full, they will be refused.
*
* The kernel may adjust this value (e.g., rounding up to the next power of 2
* plus 1). Different operating systems have different maximum limits.
*
* @default {511} */
tcpBacklog?: number;
}

/**
Expand Down
12 changes: 12 additions & 0 deletions cli/tsc/dts/lib.deno_net.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,18 @@ declare namespace Deno {
*
* @default {"0.0.0.0"} */
hostname?: string;

/** Maximum number of pending connections in the listen queue.
*
* This parameter controls how many incoming connections can be queued by the
* operating system while waiting for the application to accept them. If more
* connections arrive when the queue is full, they will be refused.
*
* The kernel may adjust this value (e.g., rounding up to the next power of 2
* plus 1). Different operating systems have different maximum limits.
*
* @default {511} */
tcpBacklog?: number;
}

/** @category Network */
Expand Down
1 change: 1 addition & 0 deletions ext/http/00_serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ function serveInner(options, handler) {
port: options.port ?? 8000,
reusePort: options.reusePort ?? false,
loadBalanced: options[kLoadBalanced] ?? false,
backlog: options.backlog,
};

if (options.certFile || options.keyFile) {
Expand Down
1 change: 1 addition & 0 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ function listen(args) {
},
args.reusePort,
args.loadBalanced ?? false,
args.tcpBacklog ?? 511,
);
addr.transport = "tcp";
return new Listener(rid, addr, "tcp");
Expand Down
3 changes: 2 additions & 1 deletion ext/net/02_tls.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ function listenTls({
transport = "tcp",
alpnProtocols = undefined,
reusePort = false,
tcpBacklog = 511,
}) {
if (transport !== "tcp") {
throw new TypeError(`Unsupported transport: '${transport}'`);
Expand All @@ -176,7 +177,7 @@ function listenTls({
const keyPair = loadTlsKeyPair("Deno.listenTls", arguments[0]);
const { 0: rid, 1: localAddr } = op_net_listen_tls(
{ hostname, port },
{ alpnProtocols, reusePort },
{ alpnProtocols, reusePort, tcpBacklog },
keyPair,
);
return new TlsListener(rid, localAddr);
Expand Down
7 changes: 4 additions & 3 deletions ext/net/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ pub fn op_net_listen_tcp<NP>(
#[serde] addr: IpAddr,
reuse_port: bool,
load_balanced: bool,
tcp_backlog: i32,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i32? What happens when this is negative?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For whatever reason, the libc function itself takes a signed int https://man7.org/linux/man-pages/man2/listen.2.html. As far as I can tell, negative values usually just act like you passed 0.

) -> Result<(ResourceId, IpAddr), NetError>
where
NP: NetPermissions + 'static,
Expand All @@ -589,9 +590,9 @@ where
.ok_or_else(|| NetError::NoResolvedAddress)?;

let listener = if load_balanced {
TcpListener::bind_load_balanced(addr)
TcpListener::bind_load_balanced(addr, tcp_backlog)
} else {
TcpListener::bind_direct(addr, reuse_port)
TcpListener::bind_direct(addr, reuse_port, tcp_backlog)
}?;
let local_addr = listener.local_addr()?;
let listener_resource = NetworkListenerResource::new(listener);
Expand Down Expand Up @@ -1483,7 +1484,7 @@ mod tests {
let sockets = Arc::new(Mutex::new(vec![]));
let clone_addr = addr.clone();
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
let listener = TcpListener::bind_direct(addr, false).unwrap();
let listener = TcpListener::bind_direct(addr, false, 511).unwrap();
let accept_fut = listener.accept().boxed_local();
let store_fut = async move {
let socket = accept_fut.await.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions ext/net/ops_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ pub struct ListenTlsArgs {
reuse_port: bool,
#[serde(default)]
load_balanced: bool,
tcp_backlog: i32,
}

#[op2(stack_trace)]
Expand Down Expand Up @@ -543,9 +544,9 @@ where
.ok_or(NetError::NoResolvedAddress)?;

let tcp_listener = if args.load_balanced {
TcpListener::bind_load_balanced(bind_addr)
TcpListener::bind_load_balanced(bind_addr, args.tcp_backlog)
} else {
TcpListener::bind_direct(bind_addr, args.reuse_port)
TcpListener::bind_direct(bind_addr, args.reuse_port, args.tcp_backlog)
}?;
let local_addr = tcp_listener.local_addr()?;
let alpn = args
Expand Down
23 changes: 14 additions & 9 deletions ext/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pub struct TcpConnection {

impl TcpConnection {
/// Boot a load-balanced TCP connection
pub fn start(key: SocketAddr) -> std::io::Result<Self> {
let listener = bind_socket_and_listen(key, false)?;
pub fn start(key: SocketAddr, backlog: i32) -> std::io::Result<Self> {
let listener = bind_socket_and_listen(key, false, backlog)?;
let sock = listener.into();

Ok(Self { sock, key })
Expand Down Expand Up @@ -78,11 +78,12 @@ impl TcpListener {
pub fn bind(
socket_addr: SocketAddr,
reuse_port: bool,
backlog: i32,
) -> std::io::Result<Self> {
if REUSE_PORT_LOAD_BALANCES && reuse_port {
Self::bind_load_balanced(socket_addr)
Self::bind_load_balanced(socket_addr, backlog)
} else {
Self::bind_direct(socket_addr, reuse_port)
Self::bind_direct(socket_addr, reuse_port, backlog)
}
}

Expand All @@ -91,17 +92,21 @@ impl TcpListener {
pub fn bind_direct(
socket_addr: SocketAddr,
reuse_port: bool,
backlog: i32,
) -> std::io::Result<Self> {
// We ignore `reuse_port` on platforms other than Linux to match the existing behaviour.
let listener = bind_socket_and_listen(socket_addr, reuse_port)?;
let listener = bind_socket_and_listen(socket_addr, reuse_port, backlog)?;
Ok(Self {
listener: Some(tokio::net::TcpListener::from_std(listener)?),
conn: None,
})
}

/// Bind to the port in a load-balanced manner.
pub fn bind_load_balanced(socket_addr: SocketAddr) -> std::io::Result<Self> {
pub fn bind_load_balanced(
socket_addr: SocketAddr,
backlog: i32,
) -> std::io::Result<Self> {
let tcp = &mut CONNS.get_or_init(Default::default).lock().unwrap().tcp;
if let Some(conn) = tcp.get(&socket_addr) {
let listener = Some(conn.listener()?);
Expand All @@ -110,7 +115,7 @@ impl TcpListener {
conn: Some(conn.clone()),
});
}
let conn = Arc::new(TcpConnection::start(socket_addr)?);
let conn = Arc::new(TcpConnection::start(socket_addr, backlog)?);
let listener = Some(conn.listener()?);
tcp.insert(socket_addr, conn.clone());
Ok(Self {
Expand Down Expand Up @@ -151,6 +156,7 @@ impl Drop for TcpListener {
fn bind_socket_and_listen(
socket_addr: SocketAddr,
reuse_port: bool,
backlog: i32,
) -> Result<std::net::TcpListener, std::io::Error> {
let socket = if socket_addr.is_ipv4() {
socket2::Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?
Expand All @@ -170,8 +176,7 @@ fn bind_socket_and_listen(
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&socket_addr.into())?;
// Kernel will round it up to the next power of 2 + 1.
socket.listen(511)?;
socket.listen(backlog)?;
let listener = socket.into();
Ok(listener)
}