Skip to content

Commit a28c078

Browse files
committed
doc: add wrpc-transport docs
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent cca4dd3 commit a28c078

File tree

13 files changed

+122
-8
lines changed

13 files changed

+122
-8
lines changed

SPEC.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
wRPC is a transport-agnostic protocol designed for asynchronous transmit of WIT function call invocations over network and other means of communication.
44

5-
wRPC follows client-server model, where peers may *serve* function (servers) and method calls invoked by the other peers (clients).
5+
wRPC follows client-server model, where peers (servers) may *serve* function and method calls invoked by the other peers (clients).
66

77
wRPC relies on [component model value definition encoding] for data encoding on the wire.
88

crates/transport/src/frame/codec.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use wasm_tokio::{Leb128DecoderU32, Leb128DecoderU64, Leb128Encoder};
66

77
use super::{Frame, FrameRef};
88

9+
/// [Frame] decoder
910
pub struct Decoder {
1011
path: Option<Vec<usize>>,
1112
path_cap: usize,
@@ -15,6 +16,7 @@ pub struct Decoder {
1516
}
1617

1718
impl Decoder {
19+
/// Construct a new [Frame] decoder
1820
#[must_use]
1921
pub fn new(max_depth: u32, max_size: u64) -> Self {
2022
Self {
@@ -125,6 +127,7 @@ impl tokio_util::codec::Decoder for Decoder {
125127
}
126128
}
127129

130+
/// [Frame] encoder
128131
pub struct Encoder;
129132

130133
impl tokio_util::codec::Encoder<FrameRef<'_>> for Encoder {

crates/transport/src/frame/conn/accept.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use core::ops::{Deref, DerefMut};
33

44
use tokio::io::{AsyncRead, AsyncWrite};
55

6+
/// Accepts connections on a transport
67
pub trait Accept {
78
/// Transport-specific invocation context
89
type Context: Send + Sync + 'static;
@@ -13,11 +14,13 @@ pub trait Accept {
1314
/// Incoming byte stream
1415
type Incoming: AsyncRead + Send + Sync + Unpin + 'static;
1516

17+
/// Accept a connection returning a pair of streams and connection context
1618
fn accept(
1719
&self,
1820
) -> impl Future<Output = std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>;
1921
}
2022

23+
/// Wrapper returned by [`AcceptExt::map_context`]
2124
pub struct AcceptMapContext<T, F> {
2225
inner: T,
2326
f: F,
@@ -37,7 +40,9 @@ impl<T, F> DerefMut for AcceptMapContext<T, F> {
3740
}
3841
}
3942

43+
/// Extension trait for [Accept]
4044
pub trait AcceptExt: Accept + Sized {
45+
/// Maps [`Self::Context`](Accept::Context) to a type `T` using `F`
4146
fn map_context<T, F: Fn(Self::Context) -> T>(self, f: F) -> AcceptMapContext<Self, F> {
4247
AcceptMapContext { inner: self, f }
4348
}

crates/transport/src/frame/conn/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes};
1515
use crate::frame::conn::{egress, ingress, Incoming, Outgoing};
1616
use crate::frame::PROTOCOL;
1717

18+
/// Invoke function `func` on instance `instance`
1819
#[instrument(level = "trace", skip_all)]
1920
pub async fn invoke<P, I, O>(
2021
mut tx: O,

crates/transport/src/frame/conn/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ pub use accept::*;
2828
pub use client::*;
2929
pub use server::*;
3030

31+
/// Index trie containing async stream subscriptions
3132
#[derive(Default)]
32-
pub enum IndexTrie {
33+
enum IndexTrie {
3334
#[default]
3435
Empty,
3536
Leaf {
@@ -123,6 +124,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTrie {
123124
}
124125

125126
impl IndexTrie {
127+
/// Takes the receiver
126128
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
127129
fn take_rx(&mut self, path: &[usize]) -> Option<mpsc::Receiver<std::io::Result<Bytes>>> {
128130
let Some((i, path)) = path.split_first() else {
@@ -157,6 +159,7 @@ impl IndexTrie {
157159
}
158160
}
159161

162+
/// Gets a sender
160163
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
161164
fn get_tx(&mut self, path: &[usize]) -> Option<mpsc::Sender<std::io::Result<Bytes>>> {
162165
let Some((i, path)) = path.split_first() else {
@@ -269,6 +272,7 @@ impl IndexTrie {
269272
}
270273

271274
pin_project! {
275+
/// Incoming framed stream
272276
#[project = IncomingProj]
273277
pub struct Incoming {
274278
#[pin]
@@ -332,6 +336,7 @@ impl AsyncRead for Incoming {
332336
}
333337

334338
pin_project! {
339+
/// Outgoing framed stream
335340
#[project = OutgoingProj]
336341
pub struct Outgoing {
337342
#[pin]

crates/transport/src/frame/conn/server.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::frame::conn::{egress, ingress, Accept};
1919
use crate::frame::{Incoming, Outgoing};
2020
use crate::Serve;
2121

22+
/// wRPC server for framed transports
2223
pub struct Server<C, I, O>(Mutex<HashMap<String, HashMap<String, mpsc::Sender<(C, I, O)>>>>);
2324

2425
impl<C, I, O> Default for Server<C, I, O> {
@@ -27,10 +28,20 @@ impl<C, I, O> Default for Server<C, I, O> {
2728
}
2829
}
2930

31+
/// Error returned by [`Server::accept`]
3032
pub enum AcceptError<C, I, O> {
33+
/// I/O error
3134
IO(std::io::Error),
35+
/// Protocol version is not supported
3236
UnsupportedVersion(u8),
33-
UnhandledFunction { instance: String, name: String },
37+
/// Function was not handled
38+
UnhandledFunction {
39+
/// Instance
40+
instance: String,
41+
/// Function name
42+
name: String,
43+
},
44+
/// Message sending failed
3445
Send(mpsc::error::SendError<(C, I, O)>),
3546
}
3647

crates/transport/src/frame/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! wRPC transport stream framing
2+
13
use std::sync::Arc;
24

35
use bytes::Bytes;
@@ -19,14 +21,18 @@ pub const PROTOCOL: u8 = 0;
1921
/// Owned wRPC frame
2022
#[derive(Clone, Debug, Eq, PartialEq)]
2123
pub struct Frame {
24+
/// Frame path
2225
pub path: Arc<[usize]>,
26+
/// Frame data
2327
pub data: Bytes,
2428
}
2529

2630
/// wRPC frame reference
2731
#[derive(Clone, Debug, Eq, PartialEq)]
2832
pub struct FrameRef<'a> {
33+
/// Frame path
2934
pub path: &'a [usize],
35+
/// Frame data
3036
pub data: &'a [u8],
3137
}
3238

crates/transport/src/frame/tcp.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! TCP transport
2+
13
use core::net::SocketAddr;
24

35
use anyhow::{bail, Context as _};
@@ -9,8 +11,13 @@ use tracing::instrument;
911
use crate::frame::{invoke, Accept, Incoming, Outgoing};
1012
use crate::Invoke;
1113

14+
/// [Invoke] implementation in terms of a single [TcpStream]
15+
///
16+
/// [`Invoke::invoke`] can only be called once on [Invocation],
17+
/// repeated calls with return an error
1218
pub struct Invocation(std::sync::Mutex<Option<TcpStream>>);
1319

20+
/// [Invoke] implementation of a TCP transport
1421
#[derive(Clone, Debug)]
1522
pub struct Client<T>(T);
1623

crates/transport/src/frame/unix.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! Unix domain socket transport
2+
13
use std::path::{Path, PathBuf};
24

35
use anyhow::{bail, Context as _};
@@ -9,8 +11,13 @@ use tracing::instrument;
911
use crate::frame::{invoke, Accept, Incoming, Outgoing};
1012
use crate::Invoke;
1113

14+
/// [Invoke] implementation in terms of a single [UnixStream]
15+
///
16+
/// [`Invoke::invoke`] can only be called once on [Invocation],
17+
/// repeated calls with return an error
1218
pub struct Invocation(std::sync::Mutex<Option<UnixStream>>);
1319

20+
/// [Invoke] implementation of a Unix domain socket transport
1421
#[derive(Clone, Debug)]
1522
pub struct Client<T>(T);
1623

crates/transport/src/invoke.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! wRPC transport client handle
2+
13
use core::future::Future;
24
use core::mem;
35
use core::pin::pin;
@@ -76,9 +78,12 @@ pub trait Invoke: Send + Sync {
7678
P: AsRef<[Option<usize>]> + Send + Sync;
7779
}
7880

81+
/// Wrapper struct returned by [InvokeExt::timeout]
7982
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
8083
pub struct Timeout<'a, T: ?Sized> {
84+
/// Inner [Invoke]
8185
pub inner: &'a T,
86+
/// Invocation timeout
8287
pub timeout: Duration,
8388
}
8489

@@ -108,9 +113,12 @@ impl<T: Invoke> Invoke for Timeout<'_, T> {
108113
}
109114
}
110115

116+
/// Wrapper struct returned by [InvokeExt::timeout_owned]
111117
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
112118
pub struct TimeoutOwned<T> {
119+
/// Inner [Invoke]
113120
pub inner: T,
121+
/// Invocation timeout
114122
pub timeout: Duration,
115123
}
116124

@@ -138,6 +146,7 @@ impl<T: Invoke> Invoke for TimeoutOwned<T> {
138146
}
139147
}
140148

149+
/// Extension trait for [Invoke]
141150
pub trait InvokeExt: Invoke {
142151
/// Invoke function `func` on instance `instance` using typed `Params` and `Results`
143152
#[instrument(level = "trace", skip(self, cx, params, paths))]

0 commit comments

Comments
 (0)