Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@

import java.net.URI;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
Expand All @@ -34,7 +36,6 @@
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;
import wiremock.org.apache.hc.client5.http.utils.Base64;

public class MultiNdjsonTest {
@RegisterExtension
Expand Down Expand Up @@ -110,6 +111,34 @@ void shouldReadNdjsonFromSingleMessage() throws InterruptedException {
assertThat(collected).hasSize(4).containsAll(expected);
}

@Test
void shouldReadNdjsonFromSingleMessageWithNoDelimiter() throws InterruptedException {
var client = createClient(uri);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readSingleMessageNoDelimiter().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
assertThat(collected).singleElement().satisfies(m -> assertThat(m).isEqualTo(Message.of("foo", "bar")));
}

@Test
void shouldReadNdjsonFromMultipleMessagesWithNoEndingDelimiter() throws InterruptedException {
var client = createClient(uri);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readMultipleMessagesNoEndingDelimiter().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
assertThat(collected).hasSize(100);
}

@Test
void shouldReadLargeNdjsonPojoAsMulti() throws InterruptedException {
var client = createClient(uri);
Expand Down Expand Up @@ -151,6 +180,18 @@ public interface Client {
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readPojoSingle();

@GET
@Path("/single-message-no-delimiter")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readSingleMessageNoDelimiter();

@GET
@Path("multiple-messages-no-ending-delimiter")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readMultipleMessagesNoEndingDelimiter();

@GET
@Path("/large-pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
Expand All @@ -172,8 +213,14 @@ Multi<Message> people(RoutingContext context) {

@Path("/stream")
public static class StreamingResource {
@Inject
Vertx vertx;
private final ObjectMapper mapper = new ObjectMapper();
private final ObjectWriter messageWriter = mapper.writerFor(Message.class);

private final Vertx vertx;

public StreamingResource(Vertx vertx) {
this.vertx = vertx;
}

@GET
@Path("/string")
Expand Down Expand Up @@ -212,19 +259,41 @@ public Multi<Message> readPojo() {
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public String getPojosAsString() throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
StringBuilder result = new StringBuilder();
ObjectWriter objectWriter = mapper.writerFor(Message.class);
for (var msg : List.of(Message.of("zero", "0"),
Message.of("one", "1"),
Message.of("two", "2"),
Message.of("three", "3"))) {
result.append(objectWriter.writeValueAsString(msg));
result.append(messageWriter.writeValueAsString(msg));
result.append("\n");
}
return result.toString();
}

@GET
@Path("/single-message-no-delimiter")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public String singleMessageNoDelimiter() throws JsonProcessingException {
return messageWriter.writeValueAsString(Message.of("foo", "bar"));
}

@GET
@Path("/multiple-messages-no-ending-delimiter")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public String multipleMessagesNoEndingDelimiter() throws JsonProcessingException {
return IntStream.range(0, 100)
.mapToObj(i -> Message.of("foo", "bar"))
.map(m -> {
try {
return messageWriter.writeValueAsString(m);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.joining("\n"));
}

@GET
@Path("/large-pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
Expand All @@ -235,7 +304,7 @@ public Multi<Message> readLargePojo() {
byte[] bytes = new byte[4 * 1024];
Random random = new Random();
random.nextBytes(bytes);
String value = Base64.encodeBase64String(bytes);
String value = Base64.getEncoder().encodeToString(bytes);
em.emit(Message.of("one", value));
em.emit(Message.of("two", value));
em.emit(Message.of("three", value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.parsetools.impl.RecordParserImpl;

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

Expand Down Expand Up @@ -323,21 +326,31 @@ private <R> void registerForJsonStream(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
ResponseImpl response,
HttpClientResponse vertxClientResponse) {
RecordParser parser = RecordParser.newDelimited("\n");
parser.handler(new Handler<Buffer>() {
Buffer delimiter = RecordParserImpl.latin1StringToBytes("\n");
RecordParser parser = RecordParser.newDelimited(delimiter);
AtomicReference<Promise> finalDelimiterHandled = new AtomicReference<>();
parser.handler(new Handler<>() {
@Override
public void handle(Buffer chunk) {

ByteArrayInputStream in = new ByteArrayInputStream(chunk.getBytes());
try {
R item = restClientRequestContext.readEntity(in,
responseType,
response.getMediaType(),
restClientRequestContext.getMethodDeclaredAnnotationsSafe(),
response.getMetadata());
multiRequest.emit(item);
if (chunk.length() > 0) {
R item = restClientRequestContext.readEntity(in,
responseType,
response.getMediaType(),
restClientRequestContext.getMethodDeclaredAnnotationsSafe(),
response.getMetadata());
multiRequest.emit(item);
}
} catch (IOException e) {
multiRequest.fail(e);
} finally {
if (finalDelimiterHandled.get() != null) {
// in this case we know that we have handled the last event, so we need to
// signal completion so the Multi can be closed
finalDelimiterHandled.get().complete();
}
}
}
});
Expand All @@ -348,10 +361,25 @@ public void handle(Buffer chunk) {
multiRequest.fail(t);
}
});
vertxClientResponse.endHandler(new Handler<Void>() {
vertxClientResponse.endHandler(new Handler<>() {
@Override
public void handle(Void c) {
multiRequest.complete();
// Before closing the Multi, we need to make sure that the parser has emitted the last event.
// Recall that the parser is delimited, which means that won't emit an event until the delimiter is reached
// To force the parser to emit the last event we push a delimiter value and when we are sure that the Multi
// has pushed it down the pipeline, only then do we close it
Promise<Object> promise = Promise.promise();
promise.future().onComplete(new Handler<>() {
@Override
public void handle(AsyncResult<Object> event) {
multiRequest.complete();
}
});
finalDelimiterHandled.set(promise);

// this needs to happen after the promise has been set up, otherwise, the parser's handler could complete
// before the finalDelimiterHandled has been populated
parser.handle(delimiter);
}
});

Expand Down
Loading