|
22 | 22 | import java.io.PipedOutputStream; |
23 | 23 | import java.io.Reader; |
24 | 24 | import java.nio.charset.StandardCharsets; |
| 25 | +import java.time.Instant; |
| 26 | +import java.time.temporal.ChronoUnit; |
25 | 27 | import java.util.HashMap; |
26 | 28 | import java.util.Map; |
27 | 29 | import okhttp3.WebSocket; |
@@ -210,6 +212,13 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) { |
210 | 212 | } |
211 | 213 |
|
212 | 214 | private class WebSocketOutputStream extends OutputStream { |
| 215 | + |
| 216 | + private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024; |
| 217 | + |
| 218 | + private static final int MAX_WAIT_MILLIS = 10000; |
| 219 | + |
| 220 | + private static final int WAIT_MILLIS = 10; |
| 221 | + |
213 | 222 | private final byte stream; |
214 | 223 |
|
215 | 224 | public WebSocketOutputStream(int stream) { |
@@ -266,10 +275,28 @@ public void write(byte[] b, int offset, int length) throws IOException { |
266 | 275 | int bufferSize = Math.min(remaining, 15 * 1024 * 1024); |
267 | 276 | byte[] buffer = new byte[bufferSize + 1]; |
268 | 277 | buffer[0] = stream; |
| 278 | + |
269 | 279 | System.arraycopy(b, offset + bytesWritten, buffer, 1, bufferSize); |
270 | | - if (!WebSocketStreamHandler.this.socket.send(ByteString.of(buffer))) { |
271 | | - throw new IOException("WebSocket has closed."); |
| 280 | + ByteString byteString = ByteString.of(buffer); |
| 281 | + |
| 282 | + final Instant start = Instant.now(); |
| 283 | + synchronized (WebSocketOutputStream.this) { |
| 284 | + while (WebSocketStreamHandler.this.socket.queueSize() + byteString.size() > MAX_QUEUE_SIZE |
| 285 | + && Instant.now().isBefore(start.plus(MAX_WAIT_MILLIS, ChronoUnit.MILLIS))) { |
| 286 | + try { |
| 287 | + wait(WAIT_MILLIS); |
| 288 | + } catch (InterruptedException e) { |
| 289 | + throw new IOException("Error waiting web socket queue", e); |
| 290 | + } |
| 291 | + } |
| 292 | + |
| 293 | + if (!WebSocketStreamHandler.this.socket.send(byteString)) { |
| 294 | + throw new IOException("WebSocket has closed."); |
| 295 | + } |
| 296 | + |
| 297 | + notifyAll(); |
272 | 298 | } |
| 299 | + |
273 | 300 | bytesWritten += bufferSize; |
274 | 301 | remaining -= bufferSize; |
275 | 302 | } |
|
0 commit comments