|
| 1 | +use bytes::{Bytes, Buf}; |
1 | 2 | use camino::{Utf8Path, Utf8PathBuf};
|
2 | 3 | use thiserror::Error;
|
3 |
| -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; |
| 4 | +use tokio::io::{AsyncRead, AsyncWrite}; |
4 | 5 | #[cfg(unix)]
|
5 | 6 | use tokio_serial::SerialPort;
|
6 | 7 | use tokio_serial::SerialPortBuilderExt;
|
7 | 8 | use tokio_serial::SerialStream;
|
8 | 9 | use tokio_stream::{StreamExt, StreamMap};
|
9 | 10 | use tokio_util::io::ReaderStream;
|
10 |
| -use tracing::{error, info}; |
| 11 | +use tracing::{error, warn, info}; |
11 | 12 |
|
12 | 13 | use std::collections::HashMap;
|
13 | 14 | use std::fs;
|
| 15 | +use std::io::ErrorKind; |
| 16 | +use std::pin::Pin; |
| 17 | +use std::task::Poll::Ready; |
14 | 18 |
|
15 | 19 | #[cfg(unix)]
|
16 | 20 | use std::os::unix;
|
@@ -108,15 +112,41 @@ where
|
108 | 112 | let bytes = result.map_err(Error::Read)?;
|
109 | 113 | info!(?src_id, ?dst_ids, ?bytes, "read");
|
110 | 114 | for dst_id in dst_ids {
|
111 |
| - // This unwrap is OK as long as we validate all route IDs exist first |
112 |
| - // Route IDs are validated in Args::check_route_ids() |
113 |
| - let dst = sinks.get_mut(dst_id).unwrap(); |
114 |
| - let mut buf = bytes.clone(); |
115 |
| - dst.write_all_buf(&mut buf).await.map_err(Error::Write)?; |
116 |
| - info!(?dst_id, ?bytes, "wrote"); |
| 115 | + if let Some(dst) = sinks.get_mut(dst_id) { |
| 116 | + let mut buf = bytes.clone(); |
| 117 | + if let Err(e) = write_non_blocking(dst, &mut buf).await { |
| 118 | + if let Error::Write(io_err) = &e { |
| 119 | + if io_err.kind() == ErrorKind::WouldBlock { |
| 120 | + warn!(?dst_id, ?bytes, "discarded"); |
| 121 | + } else { |
| 122 | + error!(?dst_id, ?e, "write error"); |
| 123 | + } |
| 124 | + } |
| 125 | + } else { |
| 126 | + info!(?dst_id, ?bytes, "wrote"); |
| 127 | + } |
| 128 | + } |
117 | 129 | }
|
118 | 130 | }
|
119 | 131 | }
|
120 | 132 |
|
121 | 133 | Ok(())
|
122 | 134 | }
|
| 135 | + |
| 136 | +async fn write_non_blocking<W: AsyncWrite + Unpin>( |
| 137 | + dst: &mut W, |
| 138 | + buf: &mut Bytes, |
| 139 | +) -> Result<()> { |
| 140 | + let waker = futures::task::noop_waker(); |
| 141 | + let mut cx = futures::task::Context::from_waker(&waker); |
| 142 | + |
| 143 | + let pinned_dst = Pin::new(dst); |
| 144 | + match pinned_dst.poll_write(&mut cx, buf) { |
| 145 | + Ready(Ok(bytes_written)) => { |
| 146 | + buf.advance(bytes_written); |
| 147 | + Ok(()) |
| 148 | + } |
| 149 | + Ready(Err(e)) => Err(Error::Write(e)), |
| 150 | + _ => Err(Error::Write(std::io::Error::new(ErrorKind::WouldBlock, "Would block"))), |
| 151 | + } |
| 152 | +} |
0 commit comments