Skip to content

Commit e06b3b3

Browse files
committed
Fix case where the last streamed JSON object was not retrieved
This would happen when the JSON stream didn't end with a new line delimiter. Relates to: quarkiverse/quarkus-langchain4j#1594
1 parent fd74904 commit e06b3b3

File tree

2 files changed

+115
-18
lines changed
  • extensions/resteasy-reactive/rest-client/deployment/src/test/java/io/quarkus/rest/client/reactive
  • independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl

2 files changed

+115
-18
lines changed

extensions/resteasy-reactive/rest-client/deployment/src/test/java/io/quarkus/rest/client/reactive/MultiNdjsonTest.java

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55

66
import java.net.URI;
77
import java.util.Arrays;
8+
import java.util.Base64;
89
import java.util.List;
910
import java.util.Objects;
1011
import java.util.Random;
1112
import java.util.concurrent.CopyOnWriteArrayList;
1213
import java.util.concurrent.CountDownLatch;
1314
import java.util.concurrent.TimeUnit;
15+
import java.util.stream.Collectors;
16+
import java.util.stream.IntStream;
1417

15-
import jakarta.inject.Inject;
1618
import jakarta.ws.rs.GET;
1719
import jakarta.ws.rs.Path;
1820
import jakarta.ws.rs.Produces;
@@ -34,7 +36,6 @@
3436
import io.smallrye.mutiny.Multi;
3537
import io.vertx.core.Vertx;
3638
import io.vertx.ext.web.RoutingContext;
37-
import wiremock.org.apache.hc.client5.http.utils.Base64;
3839

3940
public class MultiNdjsonTest {
4041
@RegisterExtension
@@ -110,6 +111,34 @@ void shouldReadNdjsonFromSingleMessage() throws InterruptedException {
110111
assertThat(collected).hasSize(4).containsAll(expected);
111112
}
112113

114+
@Test
115+
void shouldReadNdjsonFromSingleMessageWithNoDelimiter() throws InterruptedException {
116+
var client = createClient(uri);
117+
var collected = new CopyOnWriteArrayList<Message>();
118+
var completionLatch = new CountDownLatch(1);
119+
client.readSingleMessageNoDelimiter().onCompletion().invoke(completionLatch::countDown)
120+
.subscribe().with(collected::add);
121+
122+
if (!completionLatch.await(5, TimeUnit.SECONDS)) {
123+
fail("Streaming did not complete in time");
124+
}
125+
assertThat(collected).singleElement().satisfies(m -> assertThat(m).isEqualTo(Message.of("foo", "bar")));
126+
}
127+
128+
@Test
129+
void shouldReadNdjsonFromMultipleMessagesWithNoEndingDelimiter() throws InterruptedException {
130+
var client = createClient(uri);
131+
var collected = new CopyOnWriteArrayList<Message>();
132+
var completionLatch = new CountDownLatch(1);
133+
client.readMultipleMessagesNoEndingDelimiter().onCompletion().invoke(completionLatch::countDown)
134+
.subscribe().with(collected::add);
135+
136+
if (!completionLatch.await(5, TimeUnit.SECONDS)) {
137+
fail("Streaming did not complete in time");
138+
}
139+
assertThat(collected).hasSize(100);
140+
}
141+
113142
@Test
114143
void shouldReadLargeNdjsonPojoAsMulti() throws InterruptedException {
115144
var client = createClient(uri);
@@ -151,6 +180,18 @@ public interface Client {
151180
@RestStreamElementType(MediaType.APPLICATION_JSON)
152181
Multi<Message> readPojoSingle();
153182

183+
@GET
184+
@Path("/single-message-no-delimiter")
185+
@Produces(RestMediaType.APPLICATION_NDJSON)
186+
@RestStreamElementType(MediaType.APPLICATION_JSON)
187+
Multi<Message> readSingleMessageNoDelimiter();
188+
189+
@GET
190+
@Path("multiple-messages-no-ending-delimiter")
191+
@Produces(RestMediaType.APPLICATION_NDJSON)
192+
@RestStreamElementType(MediaType.APPLICATION_JSON)
193+
Multi<Message> readMultipleMessagesNoEndingDelimiter();
194+
154195
@GET
155196
@Path("/large-pojo")
156197
@Produces(RestMediaType.APPLICATION_NDJSON)
@@ -172,8 +213,14 @@ Multi<Message> people(RoutingContext context) {
172213

173214
@Path("/stream")
174215
public static class StreamingResource {
175-
@Inject
176-
Vertx vertx;
216+
private final ObjectMapper mapper = new ObjectMapper();
217+
private final ObjectWriter messageWriter = mapper.writerFor(Message.class);
218+
219+
private final Vertx vertx;
220+
221+
public StreamingResource(Vertx vertx) {
222+
this.vertx = vertx;
223+
}
177224

178225
@GET
179226
@Path("/string")
@@ -212,19 +259,41 @@ public Multi<Message> readPojo() {
212259
@Produces(RestMediaType.APPLICATION_NDJSON)
213260
@RestStreamElementType(MediaType.APPLICATION_JSON)
214261
public String getPojosAsString() throws JsonProcessingException {
215-
ObjectMapper mapper = new ObjectMapper();
216262
StringBuilder result = new StringBuilder();
217-
ObjectWriter objectWriter = mapper.writerFor(Message.class);
218263
for (var msg : List.of(Message.of("zero", "0"),
219264
Message.of("one", "1"),
220265
Message.of("two", "2"),
221266
Message.of("three", "3"))) {
222-
result.append(objectWriter.writeValueAsString(msg));
267+
result.append(messageWriter.writeValueAsString(msg));
223268
result.append("\n");
224269
}
225270
return result.toString();
226271
}
227272

273+
@GET
274+
@Path("/single-message-no-delimiter")
275+
@Produces(RestMediaType.APPLICATION_NDJSON)
276+
@RestStreamElementType(MediaType.APPLICATION_JSON)
277+
public String singleMessageNoDelimiter() throws JsonProcessingException {
278+
return messageWriter.writeValueAsString(Message.of("foo", "bar"));
279+
}
280+
281+
@GET
282+
@Path("/multiple-messages-no-ending-delimiter")
283+
@Produces(RestMediaType.APPLICATION_NDJSON)
284+
@RestStreamElementType(MediaType.APPLICATION_JSON)
285+
public String multipleMessagesNoEndingDelimiter() throws JsonProcessingException {
286+
return IntStream.range(0, 100)
287+
.mapToObj(i -> Message.of("foo", "bar"))
288+
.map(m -> {
289+
try {
290+
return messageWriter.writeValueAsString(m);
291+
} catch (JsonProcessingException e) {
292+
throw new RuntimeException(e);
293+
}
294+
}).collect(Collectors.joining("\n"));
295+
}
296+
228297
@GET
229298
@Path("/large-pojo")
230299
@Produces(RestMediaType.APPLICATION_NDJSON)
@@ -235,7 +304,7 @@ public Multi<Message> readLargePojo() {
235304
byte[] bytes = new byte[4 * 1024];
236305
Random random = new Random();
237306
random.nextBytes(bytes);
238-
String value = Base64.encodeBase64String(bytes);
307+
String value = Base64.getEncoder().encodeToString(bytes);
239308
em.emit(Message.of("one", value));
240309
em.emit(Message.of("two", value));
241310
em.emit(Message.of("three", value));

independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020

2121
import io.smallrye.mutiny.Multi;
2222
import io.smallrye.mutiny.subscription.MultiEmitter;
23+
import io.vertx.core.AsyncResult;
2324
import io.vertx.core.Handler;
25+
import io.vertx.core.Promise;
2426
import io.vertx.core.buffer.Buffer;
2527
import io.vertx.core.http.HttpClientResponse;
2628
import io.vertx.core.net.impl.ConnectionBase;
2729
import io.vertx.core.parsetools.RecordParser;
30+
import io.vertx.core.parsetools.impl.RecordParserImpl;
2831

2932
public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {
3033

@@ -323,21 +326,31 @@ private <R> void registerForJsonStream(MultiRequest<? super R> multiRequest,
323326
GenericType<R> responseType,
324327
ResponseImpl response,
325328
HttpClientResponse vertxClientResponse) {
326-
RecordParser parser = RecordParser.newDelimited("\n");
327-
parser.handler(new Handler<Buffer>() {
329+
Buffer delimiter = RecordParserImpl.latin1StringToBytes("\n");
330+
RecordParser parser = RecordParser.newDelimited(delimiter);
331+
AtomicReference<Promise> finalDelimiterHandled = new AtomicReference<>();
332+
parser.handler(new Handler<>() {
328333
@Override
329334
public void handle(Buffer chunk) {
330335

331336
ByteArrayInputStream in = new ByteArrayInputStream(chunk.getBytes());
332337
try {
333-
R item = restClientRequestContext.readEntity(in,
334-
responseType,
335-
response.getMediaType(),
336-
restClientRequestContext.getMethodDeclaredAnnotationsSafe(),
337-
response.getMetadata());
338-
multiRequest.emit(item);
338+
if (chunk.length() > 0) {
339+
R item = restClientRequestContext.readEntity(in,
340+
responseType,
341+
response.getMediaType(),
342+
restClientRequestContext.getMethodDeclaredAnnotationsSafe(),
343+
response.getMetadata());
344+
multiRequest.emit(item);
345+
}
339346
} catch (IOException e) {
340347
multiRequest.fail(e);
348+
} finally {
349+
if (finalDelimiterHandled.get() != null) {
350+
// in this case we know that we have handled the last event, so we need to
351+
// signal completion so the Multi can be closed
352+
finalDelimiterHandled.get().complete();
353+
}
341354
}
342355
}
343356
});
@@ -348,10 +361,25 @@ public void handle(Buffer chunk) {
348361
multiRequest.fail(t);
349362
}
350363
});
351-
vertxClientResponse.endHandler(new Handler<Void>() {
364+
vertxClientResponse.endHandler(new Handler<>() {
352365
@Override
353366
public void handle(Void c) {
354-
multiRequest.complete();
367+
// Before closing the Multi, we need to make sure that the parser has emitted the last event.
368+
// Recall that the parser is delimited, which means that won't emit an event until the delimiter is reached
369+
// To force the parser to emit the last event we push a delimiter value and when we are sure that the Multi
370+
// has pushed it down the pipeline, only then do we close it
371+
Promise<Object> promise = Promise.promise();
372+
promise.future().onComplete(new Handler<>() {
373+
@Override
374+
public void handle(AsyncResult<Object> event) {
375+
multiRequest.complete();
376+
}
377+
});
378+
finalDelimiterHandled.set(promise);
379+
380+
// this needs to happen after the promise has been set up, otherwise, the parser's handler could complete
381+
// before the finalDelimiterHandled has been populated
382+
parser.handle(delimiter);
355383
}
356384
});
357385

0 commit comments

Comments
 (0)