Skip to content

Commit b02586a

Browse files
committed
feat(wit-bindgen-rust): add async support
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent dddfacf commit b02586a

File tree

14 files changed

+753
-311
lines changed

14 files changed

+753
-311
lines changed

crates/introspect/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,14 @@ pub fn async_paths_tyid(resolve: &Resolve, id: TypeId) -> (BTreeSet<VecDeque<Opt
163163
TypeDefKind::Future(ty) => {
164164
let mut paths = BTreeSet::default();
165165
if let Some(ty) = ty {
166-
let (nested, _) = async_paths_ty(resolve, ty);
167-
for path in nested {
166+
let (nested, fut) = async_paths_ty(resolve, ty);
167+
for mut path in nested {
168+
path.push_front(Some(0));
168169
paths.insert(path);
169170
}
171+
if fut {
172+
paths.insert(vec![Some(0)].into());
173+
}
170174
}
171175
(paths, true)
172176
}

crates/transport-nats/src/lib.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use futures::{Stream, StreamExt};
1818
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1919
use tokio::sync::oneshot;
2020
use tokio::try_join;
21-
use tracing::{instrument, trace, warn};
21+
use tracing::{debug, instrument, trace, warn};
2222
use wrpc_transport::Index as _;
2323

2424
pub const PROTOCOL: &str = "wrpc.0.0.1";
@@ -314,11 +314,15 @@ pub struct Reader {
314314
impl wrpc_transport::Index<Self> for Reader {
315315
#[instrument(level = "trace", skip(self))]
316316
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
317+
trace!("locking index tree");
317318
let mut nested = self
318319
.nested
319320
.lock()
320321
.map_err(|err| anyhow!(err.to_string()).context("failed to lock map"))?;
321-
let incoming = nested.take(path).context("unknown subscription")?;
322+
trace!("taking index subscription");
323+
let incoming = nested
324+
.take(path)
325+
.with_context(|| format!("unknown subscription for path `{path:?}`"))?;
322326
Ok(Self {
323327
buffer: Bytes::default(),
324328
incoming,
@@ -391,7 +395,7 @@ impl SubjectWriter {
391395
impl wrpc_transport::Index<Self> for SubjectWriter {
392396
#[instrument(level = "trace", skip(self))]
393397
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
394-
let tx: Subject = index_path(self.tx.as_str(), path).into();
398+
let tx = Subject::from(index_path(self.tx.as_str(), path));
395399
let publisher = self.nats.publish_sink(tx.clone());
396400
Ok(Self {
397401
nats: Arc::clone(&self.nats),
@@ -402,7 +406,7 @@ impl wrpc_transport::Index<Self> for SubjectWriter {
402406
}
403407

404408
impl AsyncWrite for SubjectWriter {
405-
#[instrument(level = "trace", skip_all, ret, fields(subject = ?self.tx, buf = format!("{buf:02x?}")))]
409+
#[instrument(level = "trace", skip_all, ret, fields(subject = self.tx.as_str(), buf = format!("{buf:02x?}")))]
406410
fn poll_write(
407411
mut self: Pin<&mut Self>,
408412
cx: &mut Context<'_>,
@@ -436,15 +440,15 @@ impl AsyncWrite for SubjectWriter {
436440
}
437441
}
438442

439-
#[instrument(level = "trace", skip_all, ret, fields(subject = ?self.tx))]
443+
#[instrument(level = "trace", skip_all, ret, fields(subject = self.tx.as_str()))]
440444
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
441445
trace!("flushing");
442446
self.publisher
443447
.poll_flush_unpin(cx)
444448
.map_err(|_| std::io::ErrorKind::BrokenPipe.into())
445449
}
446450

447-
#[instrument(level = "trace", skip_all, ret, fields(subject = ?self.tx))]
451+
#[instrument(level = "trace", skip_all, ret, fields(subject = self.tx.as_str()))]
448452
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
449453
trace!("writing empty buffer to shut down stream");
450454
ready!(self.as_mut().poll_write(cx, &[]))?;
@@ -822,16 +826,17 @@ impl wrpc_transport::Invoke for Client {
822826
type Incoming = Reader;
823827

824828
#[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
825-
async fn invoke(
829+
async fn invoke<P: AsRef<[Option<usize>]> + Send + Sync>(
826830
&self,
827831
cx: Self::Context,
828832
instance: &str,
829833
func: &str,
830834
mut params: Bytes,
831-
paths: &[impl AsRef<[Option<usize>]> + Send + Sync],
835+
paths: impl AsRef<[P]> + Send,
832836
) -> anyhow::Result<(Self::Outgoing, Self::Incoming)> {
833837
let rx = Subject::from(self.nats.new_inbox());
834838
let result_rx = Subject::from(result_subject(&rx));
839+
let paths = paths.as_ref();
835840
let (result_rx, handshake_rx, nested) = try_join!(
836841
async {
837842
self.nats
@@ -930,15 +935,20 @@ async fn serve_connection(
930935
let tx = tx.context("peer did not specify a reply subject")?;
931936
let rx = nats.new_inbox();
932937
let param_rx = Subject::from(param_subject(&rx));
933-
trace!("subscribing on subjects");
934938
let (param_rx, nested) = try_join!(
935939
async {
940+
trace!(
941+
subject = param_rx.as_str(),
942+
"subscribing on parameter subject"
943+
);
936944
nats.subscribe(param_rx.clone())
937945
.await
938946
.context("failed to subscribe on parameter subject")
939947
},
940948
try_join_all(paths.iter().map(|path| async {
941-
nats.subscribe(Subject::from(subscribe_path(&param_rx, path.as_ref())))
949+
let subject = subscribe_path(&param_rx, path.as_ref());
950+
trace!(?subject, "subscribing on nested parameter subject");
951+
nats.subscribe(Subject::from(subject))
942952
.await
943953
.context("failed to subscribe on nested parameter subject")
944954
}))
@@ -974,20 +984,22 @@ impl wrpc_transport::Serve for Client {
974984
type Incoming = Reader;
975985

976986
#[instrument(level = "trace", skip(self, paths))]
977-
async fn serve<P: AsRef<[Option<usize>]> + Send + Sync + 'static>(
987+
async fn serve(
978988
&self,
979989
instance: &str,
980990
func: &str,
981-
paths: impl Into<Arc<[P]>> + Send + Sync + 'static,
991+
paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
982992
) -> anyhow::Result<
983993
impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>> + 'static,
984994
> {
985995
let subject = invocation_subject(&self.prefix, instance, func);
986996
let sub = if let Some(group) = &self.queue_group {
997+
debug!(subject, ?group, "queue-subscribing on invocation subject");
987998
self.nats
988999
.queue_subscribe(subject, group.to_string())
9891000
.await?
9901001
} else {
1002+
debug!(subject, "subscribing on invocation subject");
9911003
self.nats.subscribe(subject).await?
9921004
};
9931005
let paths = paths.into();

crates/transport-quic/src/lib.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ impl wrpc_transport::Index<Self> for Incoming {
404404
let mut lock = index.lock().map_err(|err| {
405405
std::io::Error::new(std::io::ErrorKind::Other, err.to_string())
406406
})?;
407+
trace!("taking index subscription");
407408
let rx = lock.take_rx(&path).ok_or_else(|| {
408409
std::io::Error::new(
409410
std::io::ErrorKind::NotFound,
@@ -674,14 +675,17 @@ impl wrpc_transport::Invoke for Client {
674675
type Incoming = Incoming;
675676

676677
#[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))]
677-
async fn invoke(
678+
async fn invoke<P>(
678679
&self,
679680
cx: Self::Context,
680681
instance: &str,
681682
func: &str,
682683
params: Bytes,
683-
paths: &[impl AsRef<[Option<usize>]> + Send + Sync],
684-
) -> anyhow::Result<(Self::Outgoing, Self::Incoming)> {
684+
paths: impl AsRef<[P]> + Send,
685+
) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
686+
where
687+
P: AsRef<[Option<usize>]> + Send + Sync,
688+
{
685689
let san = san(instance, func);
686690
trace!(?san, "establishing connection");
687691
let conn = self
@@ -695,7 +699,7 @@ impl wrpc_transport::Invoke for Client {
695699
.open_bi()
696700
.await
697701
.context("failed to open parameter stream")?;
698-
let index = Arc::new(std::sync::Mutex::new(paths.iter().collect()));
702+
let index = Arc::new(std::sync::Mutex::new(paths.as_ref().iter().collect()));
699703
let mut io = JoinSet::new();
700704
io.spawn(demux_connection(Arc::clone(&index), conn.clone()).in_current_span());
701705
let io = Arc::new(io);
@@ -765,11 +769,11 @@ impl wrpc_transport::Serve for Server {
765769
type Incoming = Incoming;
766770

767771
#[instrument(level = "trace", skip(self, paths))]
768-
async fn serve<P: AsRef<[Option<usize>]> + Send + Sync + 'static>(
772+
async fn serve(
769773
&self,
770774
instance: &str,
771775
func: &str,
772-
paths: impl Into<Arc<[P]>> + Send + Sync + 'static,
776+
paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
773777
) -> anyhow::Result<
774778
impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>> + 'static,
775779
> {

crates/transport-quic/tests/loopback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async fn loopback() -> anyhow::Result<()> {
6666
let clt = Client::new(clt_ep, (Ipv4Addr::LOCALHOST, srv_addr.port()));
6767
let srv = Server::default();
6868
let invocations = srv
69-
.serve("foo", "bar", [[Some(42), Some(0)]])
69+
.serve("foo", "bar", [Box::from([Some(42), Some(0)])])
7070
.await
7171
.context("failed to serve `foo.bar`")?;
7272
let mut invocations = pin!(invocations);

crates/transport/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,19 @@ license.workspace = true
1010
repository.workspace = true
1111

1212
[features]
13-
default = ["frame"]
13+
default = ["frame", "fs", "net", "io-std"]
1414
frame = []
15+
fs = ["tokio/fs"]
16+
net = ["tokio/net"]
17+
io-std = ["tokio/io-std"]
1518

1619
[dependencies]
1720
anyhow = { workspace = true, features = ["std"] }
1821
bytes = { workspace = true }
1922
futures = { workspace = true, features = ["std"] }
2023
tokio = { workspace = true }
2124
tokio-stream = { workspace = true }
22-
tokio-util = { workspace = true, features = ["codec"] }
25+
tokio-util = { workspace = true, features = ["codec", "io"] }
2326
tracing = { workspace = true, features = ["attributes"] }
2427
wasm-tokio = { workspace = true, features = ["tracing"] }
2528

0 commit comments

Comments
 (0)