Skip to content

Commit 401805f

Browse files
committed
fix(transport): discard async data if receiver is closed
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent f129f83 commit 401805f

File tree

1 file changed

+27
-40
lines changed

1 file changed

+27
-40
lines changed

crates/transport/src/value.rs

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ use bytes::{Buf as _, BufMut as _, Bytes, BytesMut};
1212
use futures::stream::{self, FuturesUnordered};
1313
use futures::{Stream, StreamExt as _, TryStreamExt as _};
1414
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
15+
use tokio::select;
1516
use tokio::sync::{mpsc, oneshot};
1617
use tokio::task::JoinSet;
17-
use tokio::{select, try_join};
1818
use tokio_stream::wrappers::ReceiverStream;
1919
use tokio_util::codec::{Encoder as _, FramedRead};
2020
use tokio_util::io::StreamReader;
21-
use tracing::{error, instrument, trace, Instrument as _, Span};
21+
use tracing::{debug, error, instrument, trace, Instrument as _, Span};
2222
use wasm_tokio::cm::{
2323
BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
2424
ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
@@ -1617,30 +1617,20 @@ where
16171617
return Err(std::io::ErrorKind::UnexpectedEof.into());
16181618
};
16191619
let item = item?;
1620-
try_join!(
1621-
async {
1622-
tx.send(item).map_err(|_| {
1623-
std::io::Error::new(
1624-
std::io::ErrorKind::BrokenPipe,
1625-
"future receiver closed",
1626-
)
1627-
})
1628-
},
1629-
async {
1630-
if let Some(rx) = dec.decoder_mut().take_deferred() {
1631-
let buf = mem::take(dec.read_buffer_mut());
1632-
let mut r = dec.into_inner();
1633-
if !r.buffer.is_empty() {
1634-
r.buffer.unsplit(buf);
1635-
} else {
1636-
r.buffer = buf;
1637-
}
1638-
rx(r, Vec::default()).await
1639-
} else {
1640-
Ok(())
1641-
}
1620+
if tx.send(item).is_err() {
1621+
debug!("future receiver closed, discard data");
1622+
return Ok(());
1623+
}
1624+
if let Some(rx) = dec.decoder_mut().take_deferred() {
1625+
let buf = mem::take(dec.read_buffer_mut());
1626+
let mut r = dec.into_inner();
1627+
if !r.buffer.is_empty() {
1628+
r.buffer.unsplit(buf);
1629+
} else {
1630+
r.buffer = buf;
16421631
}
1643-
)?;
1632+
rx(r, Vec::default()).await?;
1633+
}
16441634
Ok(())
16451635
}
16461636
.instrument(span),
@@ -2042,9 +2032,10 @@ where
20422032
)
20432033
})?;
20442034
trace!(i, end, "received stream chunk");
2045-
tx.send(chunk).await.map_err(|_| {
2046-
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "stream receiver closed")
2047-
})?;
2035+
if tx.send(chunk).await.is_err() {
2036+
debug!("stream receiver closed, discard data");
2037+
return Ok(())
2038+
}
20482039
for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
20492040
if let Some(deferred) = deferred {
20502041
let r = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
@@ -2162,12 +2153,10 @@ where
21622153
return Ok(());
21632154
}
21642155
trace!(?chunk, "received pending byte stream chunk");
2165-
tx.send(chunk).await.map_err(|_| {
2166-
std::io::Error::new(
2167-
std::io::ErrorKind::BrokenPipe,
2168-
"stream receiver closed",
2169-
)
2170-
})?;
2156+
if tx.send(chunk).await.is_err() {
2157+
debug!("stream receiver closed, discard data");
2158+
return Ok(());
2159+
}
21712160
}
21722161
Ok(())
21732162
}
@@ -2240,12 +2229,10 @@ where
22402229
return Ok(());
22412230
}
22422231
trace!(?chunk, "received byte stream chunk");
2243-
tx.send(std::io::Result::Ok(chunk)).await.map_err(|_| {
2244-
std::io::Error::new(
2245-
std::io::ErrorKind::BrokenPipe,
2246-
"stream receiver closed",
2247-
)
2248-
})?;
2232+
if tx.send(std::io::Result::Ok(chunk)).await.is_err() {
2233+
debug!("stream receiver closed, discard data");
2234+
return Ok(());
2235+
}
22492236
}
22502237
Ok(())
22512238
})

0 commit comments

Comments
 (0)