Skip to content

Commit 93ca405

Browse files
committed
feat(nats): demultiplex a single client subscriber
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent 59c8e4b commit 93ca405

File tree

14 files changed

+380
-205
lines changed

14 files changed

+380
-205
lines changed

Cargo.lock

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ criterion = { version = "0.5", default-features = false }
121121
futures = { version = "0.3", default-features = false }
122122
heck = { version = "0.5", default-features = false }
123123
humantime = { version = "2.1", default-features = false }
124+
nuid = { version = "0.5", default-features = false }
124125
pin-project-lite = { version = "0.2", default-features = false }
125126
prettyplease = { version = "0.2.25", default-features = false }
126127
proc-macro2 = { version = "1", default-features = false }
@@ -163,7 +164,7 @@ wrpc-introspect = { version = "0.5", default-features = false, path = "./crates/
163164
wrpc-runtime-wasmtime = { version = "0.24.1", path = "./crates/runtime-wasmtime", default-features = false }
164165
wrpc-test = { path = "./crates/test", default-features = false }
165166
wrpc-transport = { version = "0.28.2", path = "./crates/transport", default-features = false }
166-
wrpc-transport-nats = { version = "0.26", path = "./crates/transport-nats", default-features = false }
167+
wrpc-transport-nats = { version = "0.27", path = "./crates/transport-nats", default-features = false }
167168
wrpc-transport-quic = { version = "0.3", path = "./crates/transport-quic", default-features = false }
168169
wrpc-wasi-keyvalue = { version = "0.1", path = "./crates/wasi-keyvalue", default-features = false }
169170
wrpc-wasi-keyvalue-mem = { version = "0.1", path = "./crates/wasi-keyvalue-mem", default-features = false }

benches/bench.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ fn bench_wasm_ping_direct(
204204
handler
205205
.ping(None::<async_nats::HeaderMap>)
206206
.await
207-
.expect("failed to call handler")
208-
})
207+
.expect("failed to call handler");
208+
});
209209
});
210210
Ok(())
211211
}
@@ -225,7 +225,7 @@ fn bench_wasm_greet_direct(
225225
.await
226226
.expect("failed to call handler");
227227
assert_eq!(greeting, "Hello, test");
228-
})
228+
});
229229
});
230230
Ok(())
231231
}
@@ -272,7 +272,7 @@ where
272272
.expect("failed to send message");
273273
assert!(status.is_none());
274274
assert_eq!(payload, expect);
275-
})
275+
});
276276
});
277277
stop_tx.send(()).expect("failed to stop server");
278278
rt.block_on(async { srv.await.context("server task panicked")? })?;
@@ -435,7 +435,9 @@ fn bench_wasm_ping_nats_wrpc(
435435
.block_on(WasmHandler::new(wasm))
436436
.context("failed to construct a Wasm handler")?;
437437
with_nats(&rt, |nats| {
438-
let wrpc = wrpc_transport_nats::Client::new(nats, "", None);
438+
let wrpc = rt
439+
.block_on(wrpc_transport_nats::Client::new(nats, "", None))
440+
.context("failed to construct client")?;
439441

440442
let invocations = rt
441443
.block_on(ping_bindings_wrpc::serve(&wrpc, handler))
@@ -468,7 +470,7 @@ fn bench_wasm_ping_nats_wrpc(
468470
ping_bindings_wrpc::wrpc_bench::bench::ping::ping(&wrpc, None)
469471
.await
470472
.expect("failed to call `ping`");
471-
})
473+
});
472474
});
473475
stop_tx.send(()).expect("failed to stop server");
474476
rt.block_on(async { srv.await.context("server task panicked")? })?;
@@ -485,7 +487,9 @@ fn bench_wasm_greet_nats_wrpc(
485487
.block_on(WasmHandler::new(wasm))
486488
.context("failed to construct a Wasm handler")?;
487489
with_nats(&rt, |nats| {
488-
let wrpc = wrpc_transport_nats::Client::new(nats, "", None);
490+
let wrpc = rt
491+
.block_on(wrpc_transport_nats::Client::new(nats, "", None))
492+
.context("failed to construct client")?;
489493

490494
let invocations = rt
491495
.block_on(greet_bindings_wrpc::serve(&wrpc, handler))
@@ -518,7 +522,7 @@ fn bench_wasm_greet_nats_wrpc(
518522
greet_bindings_wrpc::wrpc_bench::bench::greet::greet(&wrpc, None, "test")
519523
.await
520524
.expect("failed to call `greet`");
521-
})
525+
});
522526
});
523527
stop_tx.send(()).expect("failed to stop server");
524528
rt.block_on(async { srv.await.context("server task panicked")? })?;
@@ -529,7 +533,9 @@ fn bench_wasm_greet_nats_wrpc(
529533
fn bench_nats_wrpc_ping(g: &mut BenchmarkGroup<impl Measurement>) -> anyhow::Result<()> {
530534
let rt = tokio::runtime::Runtime::new().context("failed to build Tokio runtime")?;
531535
with_nats(&rt, |nats| {
532-
let wrpc = wrpc_transport_nats::Client::new(nats, "", None);
536+
let wrpc = rt
537+
.block_on(wrpc_transport_nats::Client::new(nats, "", None))
538+
.context("failed to construct client")?;
533539

534540
let invocations = rt
535541
.block_on(ping_bindings_wrpc::serve(&wrpc, NativeHandler))
@@ -562,7 +568,7 @@ fn bench_nats_wrpc_ping(g: &mut BenchmarkGroup<impl Measurement>) -> anyhow::Res
562568
ping_bindings_wrpc::wrpc_bench::bench::ping::ping(&wrpc, None)
563569
.await
564570
.expect("failed to call `ping`");
565-
})
571+
});
566572
});
567573
stop_tx.send(()).expect("failed to stop server");
568574
rt.block_on(async { srv.await.context("server task panicked")? })?;
@@ -573,7 +579,9 @@ fn bench_nats_wrpc_ping(g: &mut BenchmarkGroup<impl Measurement>) -> anyhow::Res
573579
fn bench_nats_wrpc_greet(g: &mut BenchmarkGroup<impl Measurement>) -> anyhow::Result<()> {
574580
let rt = tokio::runtime::Runtime::new().context("failed to build Tokio runtime")?;
575581
with_nats(&rt, |nats| {
576-
let wrpc = wrpc_transport_nats::Client::new(nats, "", None);
582+
let wrpc = rt
583+
.block_on(wrpc_transport_nats::Client::new(nats, "", None))
584+
.context("failed to construct client")?;
577585

578586
let invocations = rt
579587
.block_on(greet_bindings_wrpc::serve(&wrpc, NativeHandler))
@@ -607,7 +615,7 @@ fn bench_nats_wrpc_greet(g: &mut BenchmarkGroup<impl Measurement>) -> anyhow::Re
607615
.await
608616
.expect("failed to call `greet`");
609617
assert_eq!(greeting, "Hello, test");
610-
})
618+
});
611619
});
612620
stop_tx.send(()).expect("failed to stop server");
613621
rt.block_on(async { srv.await.context("server task panicked")? })?;

crates/transport-nats/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-nats"
3-
version = "0.26.0"
3+
version = "0.27.0"
44
description = "wRPC NATS transport"
55

66
authors.workspace = true
@@ -22,7 +22,9 @@ async-nats-0_36 = { package = "async-nats", version = "0.36", default-features =
2222
], optional = true }
2323
bytes = { workspace = true }
2424
futures = { workspace = true, features = ["async-await"] }
25+
nuid = { workspace = true }
2526
tokio = { workspace = true, features = ["io-util", "rt-multi-thread"] }
27+
tokio-stream = { workspace = true, features = ["sync"] }
2628
tokio-util = { workspace = true, features = ["codec", "io"] }
2729
tracing = { workspace = true, features = ["attributes"] }
2830
wasm-tokio = { workspace = true }

0 commit comments

Comments
 (0)