Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class VertxOutputStream extends OutputStream {
private final VertxJavaIoContext context;
private final HttpServerRequest request;
private final AppendBuffer appendBuffer;
private final HttpServerResponse response;

private boolean committed;
private boolean closed;
Expand All @@ -44,7 +45,8 @@ public VertxOutputStream(VertxJavaIoContext context) {
this.appendBuffer = AppendBuffer.withMinChunks(
context.getMinChunkSize(),
context.getOutputBufferCapacity());
request.response().exceptionHandler(new Handler<Throwable>() {
response = request.response();
response.exceptionHandler(new Handler<>() {
@Override
public void handle(Throwable event) {
throwable = event;
Expand All @@ -58,10 +60,10 @@ public void handle(Throwable event) {
}
});
Handler<Void> handler = new DrainHandler(this);
request.response().drainHandler(handler);
request.response().closeHandler(handler);
response.drainHandler(handler);
response.closeHandler(handler);

context.getRoutingContext().addEndHandler(new Handler<AsyncResult<Void>>() {
context.getRoutingContext().addEndHandler(new Handler<>() {
@Override
public void handle(AsyncResult<Void> event) {
synchronized (request.connection()) {
Expand All @@ -79,7 +81,7 @@ private Buffer createBuffer(ByteBuf data) {

private void write(ByteBuf data, boolean last) throws IOException {
if (last && data == null) {
request.response().end((Handler<AsyncResult<Void>>) null);
response.end((Handler<AsyncResult<Void>>) null);
return;
}
//do all this in the same lock
Expand All @@ -102,11 +104,11 @@ private void write(ByteBuf data, boolean last) throws IOException {
data.release();
} else {
if (last) {
if (!request.response().ended()) { // can happen when an exception occurs during JSON serialization with Jackson
request.response().end(createBuffer(data), null);
if (!response.ended()) { // can happen when an exception occurs during JSON serialization with Jackson
response.end(createBuffer(data), null);
}
} else {
request.response().write(createBuffer(data), null);
response.write(createBuffer(data), null);
}
}
} catch (Exception e) {
Expand All @@ -127,11 +129,11 @@ private boolean awaitWriteable() throws IOException {
return false;
}
assert Thread.holdsLock(request.connection());
while (request.response().writeQueueFull()) {
while (response.writeQueueFull()) {
if (throwable != null) {
throw new IOException(throwable);
}
if (request.response().closed()) {
if (response.closed()) {
throw new IOException("Connection has been closed");
}
// registerDrainHandler();
Expand Down Expand Up @@ -196,7 +198,6 @@ private void writeBlocking(ByteBuf buffer, boolean finished) throws IOException
private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException {
if (!committed) {
committed = true;
final HttpServerResponse response = request.response();
if (finished) {
if (!response.headWritten()) {
if (buffer == null) {
Expand Down Expand Up @@ -249,30 +250,25 @@ public void close() throws IOException {
}
}

private static class DrainHandler implements Handler<Void> {
private final VertxOutputStream out;

public DrainHandler(VertxOutputStream out) {
this.out = out;
}
private record DrainHandler(VertxOutputStream out) implements Handler<Void> {

@Override
public void handle(Void event) {
synchronized (out.request.connection()) {
if (out.waitingForDrain) {
out.request.connection().notifyAll();
}
if (out.overflow != null) {
if (out.overflow.size() > 0) {
if (out.closed) {
out.request.response().end(Buffer.buffer(out.overflow.toByteArray()), null);
} else {
out.request.response().write(Buffer.buffer(out.overflow.toByteArray()), null);
public void handle(Void event) {
synchronized (out.request.connection()) {
if (out.waitingForDrain) {
out.request.connection().notifyAll();
}
if (out.overflow != null) {
if (out.overflow.size() > 0) {
if (out.closed) {
out.response.end(Buffer.buffer(out.overflow.toByteArray()), null);
} else {
out.response.write(Buffer.buffer(out.overflow.toByteArray()), null);
}
out.overflow.reset();
}
out.overflow.reset();
}
}
}
}
}
}
Loading