Skip to content

Commit 4d12ff7

Browse files
committed
chore(examples): replace async-stream usage by explicit streams
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent cffa853 commit 4d12ff7

File tree

5 files changed

+23
-43
lines changed

5 files changed

+23
-43
lines changed

Cargo.lock

Lines changed: 1 addition & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ wrpc-cli = { workspace = true }
9696
[workspace.dependencies]
9797
anyhow = { version = "1", default-features = false }
9898
async-nats = { package = "async-nats-wrpc", version = "0.35.1", default-features = false }
99-
async-stream = { version = "0.3", default-features = false }
10099
bitflags = { version = "2", default-features = false }
101100
bytes = { version = "1", default-features = false }
102101
clap = { version = "4", default-features = false }

examples/rust/streams-nats-client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ repository.workspace = true
1111
[dependencies]
1212
anyhow = { workspace = true }
1313
async-nats = { workspace = true }
14-
async-stream = { workspace = true }
1514
bytes = { workspace = true }
1615
clap = { workspace = true, features = [
1716
"color",
@@ -24,6 +23,7 @@ clap = { workspace = true, features = [
2423
] }
2524
futures = { workspace = true }
2625
tokio = { workspace = true, features = ["rt-multi-thread"] }
26+
tokio-stream = { workspace = true, features = ["time"] }
2727
tracing = { workspace = true }
2828
tracing-subscriber = { workspace = true, features = [
2929
"ansi",

examples/rust/streams-nats-client/src/main.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use core::time::Duration;
22

33
use anyhow::Context as _;
4-
use async_stream::stream;
54
use bytes::Bytes;
65
use clap::Parser;
7-
use futures::StreamExt as _;
8-
use tokio::time::sleep;
9-
use tokio::{sync::mpsc, try_join};
6+
use futures::{stream, StreamExt as _};
7+
use tokio::sync::mpsc;
8+
use tokio::{time, try_join};
9+
use tokio_stream::wrappers::IntervalStream;
1010
use tracing::debug;
1111
use tracing_subscriber::layer::SubscriberExt as _;
1212
use tracing_subscriber::util::SubscriberInitExt as _;
@@ -50,18 +50,23 @@ async fn main() -> anyhow::Result<()> {
5050
.await
5151
.context("failed to connect to NATS.io")?;
5252
for prefix in prefixes {
53-
let numbers = Box::pin(stream! {
54-
for i in 1..=10 {
55-
yield vec![i];
56-
sleep(Duration::from_secs(1)).await;
57-
}
58-
});
59-
let bytes = Box::pin(stream! {
60-
for i in 1..=10 {
61-
yield Bytes::from(i.to_string());
62-
sleep(Duration::from_secs(1)).await;
63-
}
64-
});
53+
let numbers = Box::pin(
54+
stream::iter(1..)
55+
.take(10)
56+
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
57+
.map(|(i, _)| i)
58+
.ready_chunks(10),
59+
);
60+
61+
// `stream<u8>` items are chunked using [`Bytes`]
62+
let bytes = Box::pin(
63+
stream::iter(b"foo bar baz")
64+
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
65+
.map(|(i, _)| *i)
66+
.ready_chunks(10)
67+
.map(Bytes::from),
68+
);
69+
6570
let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None);
6671
let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes })
6772
.await

examples/rust/streams-nats-server/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ repository.workspace = true
1111
[dependencies]
1212
anyhow = { workspace = true }
1313
async-nats = { workspace = true }
14-
async-stream = { workspace = true }
1514
bytes = { workspace = true }
1615
clap = { workspace = true, features = [
1716
"color",

0 commit comments

Comments
 (0)