Skip to content

Commit 6ca95fe

Browse files
committed
feat: implement MASQUE connect-udp
1 parent 531cb0b commit 6ca95fe

File tree

14 files changed

+787
-81
lines changed

14 files changed

+787
-81
lines changed

echo.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import socket
2+
3+
HOST = '127.0.0.1' # Listen on all interfaces
4+
PORT = 12345 # Arbitrary non-privileged port
5+
6+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
7+
server_socket.bind((HOST, PORT))
8+
print(f"UDP Echo Server listening on {HOST}:{PORT}")
9+
while True:
10+
data, addr = server_socket.recvfrom(1024)
11+
print(f"Received from {addr}: {data.decode()}")
12+
server_socket.sendto(data, addr)

neqo-bin/src/client/http3.rs

Lines changed: 106 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ use std::{
2323

2424
use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram, Header};
2525
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
26-
use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority};
26+
use neqo_http3::{
27+
ConnectUdpEvent, Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority,
28+
};
2729
use neqo_transport::{
28-
AppError, CloseReason, Connection, EmptyConnectionIdGenerator, Error as TransportError,
29-
OutputBatch, RandomConnectionIdGenerator, StreamId,
30+
AppError, CloseReason, Connection, DatagramTracking, EmptyConnectionIdGenerator, Error as TransportError, OutputBatch, RandomConnectionIdGenerator, StreamId
3031
};
3132
use rustc_hash::FxHashMap as HashMap;
3233
use url::Url;
@@ -82,7 +83,7 @@ pub fn create_client(
8283
cid_generator,
8384
local_addr,
8485
remote_addr,
85-
args.shared.quic_parameters.get(args.shared.alpn.as_str()),
86+
args.shared.quic_parameters.get(args.shared.alpn.as_str()).datagram_size(1500),
8687
Instant::now(),
8788
)?;
8889
let ciphers = args.get_ciphers();
@@ -95,7 +96,9 @@ pub fn create_client(
9596
.max_table_size_encoder(args.shared.max_table_size_encoder)
9697
.max_table_size_decoder(args.shared.max_table_size_decoder)
9798
.max_blocked_streams(args.shared.max_blocked_streams)
98-
.max_concurrent_push_streams(args.max_concurrent_push_streams),
99+
.max_concurrent_push_streams(args.max_concurrent_push_streams)
100+
.http3_datagram(true)
101+
.connect(true),
99102
);
100103

101104
let qlog = qlog_new(args, hostname, client.connection_id())?;
@@ -258,6 +261,29 @@ impl super::Handler for Handler<'_> {
258261
self.url_handler.process_urls(client);
259262
}
260263
Http3ClientEvent::ResumptionToken(t) => self.token = Some(t),
264+
Http3ClientEvent::ConnectUdp(event) => match event {
265+
ConnectUdpEvent::Negotiated(_) => todo!(),
266+
ConnectUdpEvent::Session {
267+
stream_id,
268+
status,
269+
headers,
270+
} => {
271+
client.connect_udp_send_datagram(
272+
stream_id,
273+
b"Hello World!",
274+
DatagramTracking::None,
275+
).unwrap();
276+
}
277+
ConnectUdpEvent::SessionClosed {
278+
stream_id,
279+
reason,
280+
headers,
281+
} => todo!(),
282+
ConnectUdpEvent::Datagram {
283+
session_id,
284+
datagram,
285+
} => panic!("Received datagram on session {session_id}: {}", String::from_utf8_lossy(&datagram)),
286+
},
261287
_ => {
262288
qwarn!("Unhandled event {event:?}");
263289
}
@@ -363,6 +389,26 @@ impl StreamHandler for UploadStreamHandler {
363389
}
364390
}
365391

392+
struct ConnectUdpStreamHandler {
393+
stream_id: StreamId,
394+
}
395+
396+
impl StreamHandler for ConnectUdpStreamHandler {
397+
fn process_data_readable(
398+
&mut self,
399+
_stream_id: StreamId,
400+
_fin: bool,
401+
_data: &[u8],
402+
_output_read_data: bool,
403+
) -> Res<()> {
404+
todo!()
405+
}
406+
407+
fn process_data_writable(&mut self, _________client: &mut Http3Client, _stream_id: StreamId) {
408+
todo!()
409+
}
410+
}
411+
366412
struct UrlHandler<'a> {
367413
url_queue: VecDeque<Url>,
368414
handled_urls: Vec<Url>,
@@ -391,53 +437,65 @@ impl UrlHandler<'_> {
391437
}
392438

393439
fn next_url(&mut self, client: &mut Http3Client) -> bool {
440+
if !client.state().active() {
441+
return false;
442+
}
443+
394444
let url = self
395445
.url_queue
396446
.pop_front()
397447
.expect("download_next called with empty queue");
398-
match client.fetch(
399-
Instant::now(),
400-
&self.args.method,
401-
&url,
402-
&to_headers(&self.args.header),
403-
Priority::default(),
404-
) {
405-
Ok(client_stream_id) => {
406-
qdebug!("Successfully created stream id {client_stream_id} for {url}");
407-
408-
let handler: Box<dyn StreamHandler> = match self.args.method.as_str() {
409-
"GET" => {
410-
let out_file = get_output_file(
411-
&url,
412-
self.args.output_dir.as_ref(),
413-
&mut self.all_paths,
414-
);
415-
client.stream_close_send(client_stream_id).unwrap();
416-
Box::new(DownloadStreamHandler { out_file })
417-
}
418-
"POST" => Box::new(UploadStreamHandler {
419-
data: SendData::zeroes(self.args.upload_size),
420-
start: Instant::now(),
421-
}),
422-
_ => unimplemented!(),
423-
};
424-
425-
self.stream_handlers.insert(client_stream_id, handler);
426-
self.handled_urls.push(url);
427-
true
428-
}
429-
Err(
430-
Error::Transport(TransportError::StreamLimit)
431-
| Error::StreamLimit
432-
| Error::Unavailable,
433-
) => {
434-
self.url_queue.push_front(url);
435-
false
436-
}
437-
Err(e) => {
438-
panic!("Can't create stream {e}");
439-
}
440-
}
448+
dbg!(&url);
449+
let stream_id = client
450+
.connect_udp_create_session(Instant::now(), &url, &[])
451+
.unwrap();
452+
self.stream_handlers
453+
.insert(stream_id, Box::new(ConnectUdpStreamHandler { stream_id }));
454+
455+
true
456+
// match client.fetch(
457+
// Instant::now(),
458+
// &self.args.method,
459+
// &url,
460+
// &to_headers(&self.args.header),
461+
// Priority::default(),
462+
// ) {
463+
// Ok(client_stream_id) => {
464+
// qdebug!("Successfully created stream id {client_stream_id} for {url}");
465+
466+
// let handler: Box<dyn StreamHandler> = match self.args.method.as_str() {
467+
// "GET" => {
468+
// let out_file = get_output_file(
469+
// &url,
470+
// self.args.output_dir.as_ref(),
471+
// &mut self.all_paths,
472+
// );
473+
// client.stream_close_send(client_stream_id).unwrap();
474+
// Box::new(DownloadStreamHandler { out_file })
475+
// }
476+
// "POST" => Box::new(UploadStreamHandler {
477+
// data: SendData::zeroes(self.args.upload_size),
478+
// start: Instant::now(),
479+
// }),
480+
// _ => unimplemented!(),
481+
// };
482+
483+
// self.stream_handlers.insert(client_stream_id, handler);
484+
// self.handled_urls.push(url);
485+
// true
486+
// }
487+
// Err(
488+
// Error::Transport(TransportError::StreamLimit)
489+
// | Error::StreamLimit
490+
// | Error::Unavailable,
491+
// ) => {
492+
// self.url_queue.push_front(url);
493+
// false
494+
// }
495+
// Err(e) => {
496+
// panic!("Can't create stream {e}");
497+
// }
498+
// }
441499
}
442500

443501
fn done(&self) -> bool {

neqo-http3/src/client_events.rs

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,26 @@ pub enum WebTransportEvent {
4141
},
4242
}
4343

44+
// TODO: All needed?
45+
#[derive(Debug, PartialEq, Eq, Clone)]
46+
pub enum ConnectUdpEvent {
47+
Negotiated(bool),
48+
Session {
49+
stream_id: StreamId,
50+
status: u16,
51+
headers: Vec<Header>,
52+
},
53+
SessionClosed {
54+
stream_id: StreamId,
55+
reason: SessionCloseReason,
56+
headers: Option<Vec<Header>>,
57+
},
58+
Datagram {
59+
session_id: StreamId,
60+
datagram: Vec<u8>,
61+
},
62+
}
63+
4464
#[derive(Debug, PartialEq, Eq, Clone)]
4565
pub enum Http3ClientEvent {
4666
/// Response headers are received.
@@ -103,6 +123,8 @@ pub enum Http3ClientEvent {
103123
StateChange(Http3State),
104124
/// `WebTransport` events
105125
WebTransport(WebTransportEvent),
126+
/// `ConnectUdp` events
127+
ConnectUdp(ConnectUdpEvent),
106128
}
107129

108130
#[derive(Debug, Default, Clone)]
@@ -188,14 +210,21 @@ impl ExtendedConnectEvents for Http3ClientEvents {
188210
status: u16,
189211
headers: Vec<Header>,
190212
) {
191-
if connect_type == ExtendedConnectType::WebTransport {
192-
self.insert(Http3ClientEvent::WebTransport(WebTransportEvent::Session {
193-
stream_id,
194-
status,
195-
headers,
196-
}));
197-
} else {
198-
unreachable!("There is only ExtendedConnectType::WebTransport");
213+
match connect_type {
214+
ExtendedConnectType::WebTransport => {
215+
self.insert(Http3ClientEvent::WebTransport(WebTransportEvent::Session {
216+
stream_id,
217+
status,
218+
headers,
219+
}));
220+
}
221+
ExtendedConnectType::ConnectUdp => {
222+
self.insert(Http3ClientEvent::ConnectUdp(ConnectUdpEvent::Session {
223+
stream_id,
224+
status,
225+
headers,
226+
}));
227+
}
199228
}
200229
}
201230

@@ -206,16 +235,28 @@ impl ExtendedConnectEvents for Http3ClientEvents {
206235
reason: SessionCloseReason,
207236
headers: Option<Vec<Header>>,
208237
) {
209-
if connect_type == ExtendedConnectType::WebTransport {
210-
self.insert(Http3ClientEvent::WebTransport(
211-
WebTransportEvent::SessionClosed {
212-
stream_id,
213-
reason,
214-
headers,
215-
},
216-
));
217-
} else {
218-
unreachable!("There are no other types");
238+
match connect_type {
239+
ExtendedConnectType::WebTransport => {
240+
self.insert(Http3ClientEvent::WebTransport(
241+
WebTransportEvent::SessionClosed {
242+
stream_id,
243+
reason,
244+
headers,
245+
},
246+
));
247+
}
248+
ExtendedConnectType::ConnectUdp => {
249+
self.insert(Http3ClientEvent::ConnectUdp(
250+
ConnectUdpEvent::SessionClosed {
251+
stream_id,
252+
reason,
253+
headers,
254+
},
255+
));
256+
}
257+
_ => {
258+
unreachable!("There are no other types");
259+
}
219260
}
220261
}
221262

@@ -230,8 +271,16 @@ impl ExtendedConnectEvents for Http3ClientEvents {
230271
}
231272

232273
fn new_datagram(&self, session_id: StreamId, datagram: Vec<u8>) {
233-
self.insert(Http3ClientEvent::WebTransport(
234-
WebTransportEvent::Datagram {
274+
// TODO
275+
// self.insert(Http3ClientEvent::WebTransport(
276+
// WebTransportEvent::Datagram {
277+
// session_id,
278+
// datagram,
279+
// },
280+
// ));
281+
282+
self.insert(Http3ClientEvent::ConnectUdp(
283+
ConnectUdpEvent::Datagram{
235284
session_id,
236285
datagram,
237286
},
@@ -356,6 +405,7 @@ impl Http3ClientEvents {
356405
}
357406

358407
pub fn negotiation_done(&self, feature_type: HSettingType, succeeded: bool) {
408+
// TODO: Emit event for extended connect type?
359409
if feature_type == HSettingType::EnableWebTransport {
360410
self.insert(Http3ClientEvent::WebTransport(
361411
WebTransportEvent::Negotiated(succeeded),

neqo-http3/src/conn_params.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const QPACK_TABLE_SIZE_LIMIT: u64 = (1 << 30) - 1;
1414
const QPACK_MAX_BLOCKED_STREAMS_DEFAULT: u16 = 20;
1515
const MAX_PUSH_STREAM_DEFAULT: u64 = 0;
1616
const WEBTRANSPORT_DEFAULT: bool = false;
17+
const CONNECT_DEFAULT: bool = true;
1718
const HTTP3_DATAGRAM_DEFAULT: bool = false;
1819

1920
#[derive(Debug, Clone)]
@@ -22,6 +23,7 @@ pub struct Http3Parameters {
2223
qpack_settings: qpack::Settings,
2324
max_concurrent_push_streams: u64,
2425
webtransport: bool,
26+
connect: bool,
2527
http3_datagram: bool,
2628
}
2729

@@ -36,6 +38,7 @@ impl Default for Http3Parameters {
3638
},
3739
max_concurrent_push_streams: MAX_PUSH_STREAM_DEFAULT,
3840
webtransport: WEBTRANSPORT_DEFAULT,
41+
connect: CONNECT_DEFAULT,
3942
http3_datagram: HTTP3_DATAGRAM_DEFAULT,
4043
}
4144
}
@@ -118,11 +121,24 @@ impl Http3Parameters {
118121
self
119122
}
120123

124+
// TODO: Should this even be callable on the client side?
125+
#[must_use]
126+
pub const fn connect(mut self, connect: bool) -> Self {
127+
self.connect = connect;
128+
self
129+
}
130+
121131
#[must_use]
122132
pub const fn get_webtransport(&self) -> bool {
123133
self.webtransport
124134
}
125135

136+
#[must_use]
137+
pub const fn get_connect(&self) -> bool {
138+
assert!(self.connect);
139+
self.connect
140+
}
141+
126142
#[must_use]
127143
pub const fn http3_datagram(mut self, http3_datagram: bool) -> Self {
128144
self.http3_datagram = http3_datagram;

0 commit comments

Comments
 (0)