Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 35 additions & 56 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ wrpc-cli = { workspace = true }
[workspace.dependencies]
anyhow = { version = "1", default-features = false }
async-nats = { package = "async-nats-wrpc", version = "0.35.1", default-features = false }
async-stream = { version = "0.3", default-features = false }
bitflags = { version = "2", default-features = false }
bytes = { version = "1", default-features = false }
clap = { version = "4", default-features = false }
Expand Down
4 changes: 0 additions & 4 deletions examples/rust/echo-stream-nats-client/wit/deps.lock

This file was deleted.

1 change: 0 additions & 1 deletion examples/rust/echo-stream-nats-client/wit/deps.toml

This file was deleted.

This file was deleted.

5 changes: 0 additions & 5 deletions examples/rust/echo-stream-nats-client/wit/world.wit

This file was deleted.

4 changes: 0 additions & 4 deletions examples/rust/echo-stream-nats-server/wit/deps.lock

This file was deleted.

1 change: 0 additions & 1 deletion examples/rust/echo-stream-nats-server/wit/deps.toml

This file was deleted.

This file was deleted.

5 changes: 0 additions & 5 deletions examples/rust/echo-stream-nats-server/wit/world.wit

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "echo-stream-nats-client"
name = "streams-nats-client"
version = "0.1.0"

authors.workspace = true
Expand All @@ -11,7 +11,7 @@ repository.workspace = true
[dependencies]
anyhow = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
Expand All @@ -23,6 +23,7 @@ clap = { workspace = true, features = [
] }
futures = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use core::time::Duration;

use anyhow::Context as _;
use async_stream::stream;
use bytes::Bytes;
use clap::Parser;
use futures::StreamExt as _;
use futures::{stream, StreamExt as _};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::{time, try_join};
use tokio_stream::wrappers::IntervalStream;
use tracing::debug;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::util::SubscriberInitExt as _;
use url::Url;

mod bindings {
wit_bindgen_wrpc::generate!({
with: {
"wrpc-examples:echo-stream/handler": generate
"wrpc-examples:streams/handler": generate
}
});
}

use bindings::wrpc_examples::echo_stream::handler::{echo, Req};
use bindings::wrpc_examples::streams::handler::{echo, Req};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand All @@ -27,7 +29,7 @@ struct Args {
#[arg(short, long, default_value = "nats://127.0.0.1:4222")]
nats: Url,

/// Prefixes to invoke `wrpc-examples:echo-stream/handler.echo` on
/// Prefixes to invoke `wrpc-examples:streams/handler.echo` on
#[arg(default_value = "rust")]
prefixes: Vec<String>,
}
Expand All @@ -48,34 +50,49 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to connect to NATS.io")?;
for prefix in prefixes {
let input_stream = Box::pin(stream! {
for i in 1..=10 {
yield vec![i];
sleep(Duration::from_secs(1)).await;
}
});
let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);

// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);

let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None);
let (mut output_stream, res) = echo(
&wrpc,
None,
Req {
input: input_stream,
let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
)
.await
.context("failed to invoke `wrpc-examples.hello/handler.hello`")?;
let task = tokio::spawn(async move {
match res {
Some(fut) => Some(fut.await),
None => None,
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
});
while let Some(item) = output_stream.next().await {
eprintln!("got {item:?}");
}
if let Some(res) = task.await? {
res?;
}
)?;
}
Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions examples/rust/streams-nats-client/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[streams]
path = "../../../wit/streams"
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
1 change: 1 addition & 0 deletions examples/rust/streams-nats-client/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streams = "../../../wit/streams"
17 changes: 17 additions & 0 deletions examples/rust/streams-nats-client/wit/deps/streams/streams.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wrpc-examples:streams;

interface handler {
record req {
numbers: stream<u64>,
bytes: stream<u8>,
}
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
}

world client {
import handler;
}

world server {
export handler;
}
5 changes: 5 additions & 0 deletions examples/rust/streams-nats-client/wit/world.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package wrpc-examples:streams-rust-client;

world client {
include wrpc-examples:streams/client;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "echo-stream-nats-server"
name = "streams-nats-server"
version = "0.1.0"

authors.workspace = true
Expand All @@ -11,7 +11,7 @@ repository.workspace = true
[dependencies]
anyhow = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
Expand Down
Loading
Loading