Skip to content

Commit 843c209

Browse files
committed
feat: implement MASQUE connect-udp
1 parent 531cb0b commit 843c209

File tree

16 files changed

+1056
-55
lines changed

16 files changed

+1056
-55
lines changed

echo.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import socket
2+
3+
HOST = '127.0.0.1' # Listen on all interfaces
4+
PORT = 12345 # Arbitrary non-privileged port
5+
6+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
7+
server_socket.bind((HOST, PORT))
8+
print(f"UDP Echo Server listening on {HOST}:{PORT}")
9+
while True:
10+
data, addr = server_socket.recvfrom(1024)
11+
print(f"Received from {addr}: {data.decode()}")
12+
server_socket.sendto(data, addr)

neqo-bin/src/client/http3.rs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,30 @@ use std::{
2323

2424
use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram, Header};
2525
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
26-
use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority};
26+
use neqo_http3::{
27+
ConnectUdpEvent, Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority,
28+
};
2729
use neqo_transport::{
28-
AppError, CloseReason, Connection, EmptyConnectionIdGenerator, Error as TransportError,
29-
OutputBatch, RandomConnectionIdGenerator, StreamId,
30+
AppError, CloseReason, Connection, DatagramTracking, EmptyConnectionIdGenerator,
31+
Error as TransportError, OutputBatch, RandomConnectionIdGenerator, StreamId,
3032
};
3133
use rustc_hash::FxHashMap as HashMap;
3234
use url::Url;
3335

34-
use super::{get_output_file, qlog_new, Args, CloseState, Res};
35-
use crate::{send_data::SendData, STREAM_IO_BUFFER_SIZE};
36+
use super::{qlog_new, Args, CloseState, Res};
37+
use crate::{client::get_output_file, send_data::SendData, STREAM_IO_BUFFER_SIZE};
3638

37-
pub struct Handler<'a> {
39+
pub struct Handler {
3840
#[expect(clippy::struct_field_names, reason = "This name is more descriptive.")]
39-
url_handler: UrlHandler<'a>,
41+
url_handler: UrlHandler,
4042
token: Option<ResumptionToken>,
4143
output_read_data: bool,
4244
read_buffer: Vec<u8>,
4345
}
4446

45-
impl<'a> Handler<'a> {
46-
pub(crate) fn new(url_queue: VecDeque<Url>, args: &'a Args) -> Self {
47+
impl Handler {
48+
pub(crate) fn new(url_queue: VecDeque<Url>, args: Args) -> Self {
49+
let output_read_data = args.output_read_data;
4750
let url_handler = UrlHandler {
4851
url_queue,
4952
handled_urls: Vec::new(),
@@ -55,7 +58,7 @@ impl<'a> Handler<'a> {
5558
Self {
5659
url_handler,
5760
token: None,
58-
output_read_data: args.output_read_data,
61+
output_read_data,
5962
read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
6063
}
6164
}
@@ -82,7 +85,10 @@ pub fn create_client(
8285
cid_generator,
8386
local_addr,
8487
remote_addr,
85-
args.shared.quic_parameters.get(args.shared.alpn.as_str()),
88+
args.shared
89+
.quic_parameters
90+
.get(args.shared.alpn.as_str())
91+
.datagram_size(1500),
8692
Instant::now(),
8793
)?;
8894
let ciphers = args.get_ciphers();
@@ -95,7 +101,9 @@ pub fn create_client(
95101
.max_table_size_encoder(args.shared.max_table_size_encoder)
96102
.max_table_size_decoder(args.shared.max_table_size_decoder)
97103
.max_blocked_streams(args.shared.max_blocked_streams)
98-
.max_concurrent_push_streams(args.max_concurrent_push_streams),
104+
.max_concurrent_push_streams(args.max_concurrent_push_streams)
105+
.http3_datagram(true)
106+
.connect(true),
99107
);
100108

101109
let qlog = qlog_new(args, hostname, client.connection_id())?;
@@ -140,6 +148,7 @@ impl super::Client for Http3Client {
140148
now: Instant,
141149
max_datagrams: NonZeroUsize,
142150
) -> OutputBatch {
151+
println!("==== client process_multiple_output ====");
143152
self.process_multiple_output(now, max_datagrams)
144153
}
145154

@@ -148,6 +157,7 @@ impl super::Client for Http3Client {
148157
dgrams: impl IntoIterator<Item = Datagram<&'a mut [u8]>>,
149158
now: Instant,
150159
) {
160+
println!("==== client process_multiple_input ====");
151161
self.process_multiple_input(dgrams, now);
152162
}
153163

@@ -167,7 +177,7 @@ impl super::Client for Http3Client {
167177
}
168178
}
169179

170-
impl Handler<'_> {
180+
impl Handler {
171181
fn reinit(&mut self) {
172182
for url in self.url_handler.handled_urls.drain(..) {
173183
self.url_handler.url_queue.push_front(url);
@@ -177,7 +187,7 @@ impl Handler<'_> {
177187
}
178188
}
179189

180-
impl super::Handler for Handler<'_> {
190+
impl super::Handler for Handler {
181191
type Client = Http3Client;
182192

183193
fn handle(&mut self, client: &mut Http3Client) -> Res<bool> {
@@ -258,6 +268,25 @@ impl super::Handler for Handler<'_> {
258268
self.url_handler.process_urls(client);
259269
}
260270
Http3ClientEvent::ResumptionToken(t) => self.token = Some(t),
271+
Http3ClientEvent::ConnectUdp(event) => match event {
272+
ConnectUdpEvent::Negotiated(_) => todo!(),
273+
ConnectUdpEvent::Session {
274+
stream_id,
275+
status: _,
276+
headers: _,
277+
} => {
278+
todo!();
279+
}
280+
ConnectUdpEvent::SessionClosed {
281+
stream_id: _,
282+
reason: _,
283+
headers: _,
284+
} => todo!(),
285+
ConnectUdpEvent::Datagram {
286+
session_id,
287+
datagram,
288+
} => todo!(),
289+
},
261290
_ => {
262291
qwarn!("Unhandled event {event:?}");
263292
}
@@ -363,15 +392,15 @@ impl StreamHandler for UploadStreamHandler {
363392
}
364393
}
365394

366-
struct UrlHandler<'a> {
395+
struct UrlHandler {
367396
url_queue: VecDeque<Url>,
368397
handled_urls: Vec<Url>,
369398
stream_handlers: HashMap<StreamId, Box<dyn StreamHandler>>,
370399
all_paths: Vec<PathBuf>,
371-
args: &'a Args,
400+
args: Args,
372401
}
373402

374-
impl UrlHandler<'_> {
403+
impl UrlHandler {
375404
fn stream_handler(&mut self, stream_id: StreamId) -> Option<&mut Box<dyn StreamHandler>> {
376405
self.stream_handlers.get_mut(&stream_id)
377406
}

neqo-bin/src/client/mod.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::SharedArgs;
3939

4040
mod http09;
4141
mod http3;
42+
mod proxy;
4243

4344
const BUFWRITER_BUFFER_SIZE: usize = 64 * 1024;
4445

@@ -109,6 +110,7 @@ type Res<T> = Result<T, Error>;
109110
clippy::struct_excessive_bools,
110111
reason = "Not a good use of that lint."
111112
)]
113+
#[derive(Clone)]
112114
pub struct Args {
113115
#[command(flatten)]
114116
shared: SharedArgs,
@@ -179,6 +181,9 @@ pub struct Args {
179181
#[arg(name = "cid-length", short = 'l', long, default_value = "0",
180182
value_parser = clap::value_parser!(u8).range(..=20))]
181183
cid_len: u8,
184+
185+
#[arg(name = "proxy", long)]
186+
proxy: Option<Url>,
182187
}
183188

184189
impl Args {
@@ -219,6 +224,7 @@ impl Args {
219224
upload_size,
220225
stats: false,
221226
cid_len: 0,
227+
proxy: None,
222228
}
223229
}
224230

@@ -386,6 +392,7 @@ trait Handler {
386392
fn take_token(&mut self) -> Option<ResumptionToken>;
387393
}
388394

395+
#[derive(Debug)]
389396
enum CloseState {
390397
NotClosing,
391398
Closing,
@@ -442,11 +449,11 @@ impl<'a, H: Handler> Runner<'a, H> {
442449
loop {
443450
let handler_done = self.handler.handle(&mut self.client)?;
444451
self.process_output().await?;
445-
if self.client.has_events() {
452+
if dbg!(self.client.has_events()) {
446453
continue;
447454
}
448455

449-
match (handler_done, self.client.is_closed()?) {
456+
match (dbg!(handler_done), dbg!(self.client.is_closed()?)) {
450457
// more work; or no more work, already closing connection
451458
(true, CloseState::Closing) | (false, _) => {}
452459
// no more work, closing connection
@@ -595,6 +602,61 @@ pub async fn client(mut args: Args) -> Res<()> {
595602

596603
init()?;
597604

605+
if let Some(proxy_url) = &args.proxy {
606+
let url = args.urls.pop().expect("TODO");
607+
let Origin::Tuple( _scheme, host, _port) = url.clone().origin() else {
608+
panic!();
609+
};
610+
let Origin::Tuple(_scheme, proxy_host, proxy_port) = proxy_url.origin() else {
611+
panic!();
612+
};
613+
let proxy_addr = format!("{proxy_host}:{proxy_port}").to_socket_addrs()?.find(|addr| {
614+
!matches!(
615+
(addr, args.ipv4_only, args.ipv6_only),
616+
(SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false)
617+
)
618+
});
619+
620+
let remote_addr = format!("{proxy_host}:{proxy_port}").to_socket_addrs()?.find(|addr| {
621+
!matches!(
622+
(addr, args.ipv4_only, args.ipv6_only),
623+
(SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false)
624+
)
625+
}).unwrap();
626+
let Some(proxy_addr) = proxy_addr else {
627+
qerror!("No compatible address found for: {proxy_host}");
628+
exit(1);
629+
};
630+
let mut socket = crate::udp::Socket::bind(local_addr_for(&proxy_addr, 0))?;
631+
let real_local = socket.local_addr().unwrap();
632+
qinfo!(
633+
"{} Client connecting: {real_local:?} -> {proxy_addr:?}",
634+
args.shared.alpn
635+
);
636+
637+
let hostname = format!("{host}");
638+
let proxy_hostname = format!("{proxy_host}");
639+
let client = http3::create_client(&args, real_local, remote_addr, &hostname, None)
640+
.expect("failed to create client");
641+
642+
let proxy = http3::create_client(&args, real_local, proxy_addr, &proxy_hostname, None)
643+
.expect("failed to create client");
644+
645+
let mut urls = VecDeque::new();
646+
urls.push_back(url.clone());
647+
let handler = http3::Handler::new(urls, args.clone());
648+
649+
let proxy = proxy::Proxy::new(client, handler, proxy, proxy_url.clone());
650+
651+
let proxy_handler = proxy::Handler::new();
652+
653+
Runner::new(real_local, &mut socket,proxy, proxy_handler, &args)
654+
.run()
655+
.await?;
656+
657+
return Ok(());
658+
}
659+
598660
for ((host, port), mut urls) in urls_by_origin(&args.urls) {
599661
if args.resume && urls.len() < 2 {
600662
qerror!("Resumption to {host} cannot work without at least 2 URLs");
@@ -634,7 +696,7 @@ pub async fn client(mut args: Args) -> Res<()> {
634696
let client = http3::create_client(&args, real_local, remote_addr, &hostname, token)
635697
.expect("failed to create client");
636698

637-
let handler = http3::Handler::new(to_request, &args);
699+
let handler = http3::Handler::new(to_request, args.clone());
638700

639701
Runner::new(real_local, &mut socket, client, handler, &args)
640702
.run()

0 commit comments

Comments
 (0)