Skip to content

Commit 0218bfc

Browse files
authored
Handle large message in grpc (#649)
1 parent bd9b4ea commit 0218bfc

File tree

75 files changed

+1974
-664
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+1974
-664
lines changed

incubator/command-log/src/main/java/io/aklivity/zilla/runtime/command/log/internal/LoggableStream.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@
9292
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaGroupBeginExFW;
9393
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaGroupFlushExFW;
9494
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedBeginExFW;
95+
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedConsumerFlushExFW;
9596
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedDataExFW;
9697
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedFetchDataExFW;
98+
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedFetchFlushExFW;
9799
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedFlushExFW;
98100
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedProduceDataExFW;
99101
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMetaBeginExFW;
@@ -1291,19 +1293,48 @@ private void onKafkaMergedFlushEx(
12911293
long timestamp,
12921294
KafkaMergedFlushExFW merged)
12931295
{
1294-
final ArrayFW<KafkaOffsetFW> progress = merged.fetch().progress();
1295-
final Array32FW<KafkaFilterFW> filters = merged.fetch().filters();
1296+
switch (merged.kind())
1297+
{
1298+
case KafkaFlushExFW.KIND_FETCH:
1299+
onKafkaMergedFetchFlushEx(offset, timestamp, merged.fetch());
1300+
break;
1301+
case KafkaFlushExFW.KIND_CONSUMER:
1302+
onKafkaMergedConsumerFlushEx(offset, timestamp, merged.consumer());
1303+
break;
1304+
}
1305+
}
1306+
1307+
private void onKafkaMergedFetchFlushEx(
1308+
int offset,
1309+
long timestamp,
1310+
KafkaMergedFetchFlushExFW fetch)
1311+
{
1312+
final ArrayFW<KafkaOffsetFW> progress = fetch.progress();
1313+
final Array32FW<KafkaFilterFW> filters = fetch.filters();
12961314

1297-
out.printf(verboseFormat, index, offset, timestamp, "[merged]");
1315+
out.printf(verboseFormat, index, offset, timestamp, "[merged] [fetch]");
12981316
progress.forEach(p -> out.printf(verboseFormat, index, offset, timestamp,
1299-
format("%d: %d %d %d",
1300-
p.partitionId(),
1301-
p.partitionOffset(),
1302-
p.stableOffset(),
1303-
p.latestOffset())));
1317+
format("%d: %d %d %d",
1318+
p.partitionId(),
1319+
p.partitionOffset(),
1320+
p.stableOffset(),
1321+
p.latestOffset())));
13041322
filters.forEach(f -> f.conditions().forEach(c -> out.printf(verboseFormat, index, offset, timestamp, asString(c))));
13051323
}
13061324

1325+
private void onKafkaMergedConsumerFlushEx(
1326+
int offset,
1327+
long timestamp,
1328+
KafkaMergedConsumerFlushExFW consumer)
1329+
{
1330+
final KafkaOffsetFW progress = consumer.progress();
1331+
final long correlationId = consumer.correlationId();
1332+
1333+
out.printf(verboseFormat, index, offset, timestamp,
1334+
format("[merged] [consumer] %d %d %d ",
1335+
progress.partitionId(), progress.partitionOffset(), correlationId));
1336+
}
1337+
13071338
private void onKafkaGroupFlushEx(
13081339
int offset,
13091340
long timestamp,

runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
7373
private static final String16FW HEADER_VALUE_GRPC_OK = new String16FW("0");
7474
private static final String16FW HEADER_VALUE_GRPC_ABORTED = new String16FW("10");
7575
private static final String16FW HEADER_VALUE_GRPC_INTERNAL_ERROR = new String16FW("13");
76-
private final String16FW.Builder string16RW =
77-
new String16FW.Builder().wrap(new UnsafeBuffer(new byte[256], 0, 256), 0, 256);
7876

7977
private final Varuint32FW.Builder lenRW =
8078
new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8);;
@@ -86,7 +84,6 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
8684
private final EndFW endRO = new EndFW();
8785
private final AbortFW abortRO = new AbortFW();
8886

89-
9087
private final String16FW.Builder statusRW = new
9188
String16FW.Builder().wrap(new UnsafeBuffer(new byte[256], 0, 256), 0, 256);
9289

@@ -105,13 +102,10 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
105102
private final ExtensionFW extensionRO = new ExtensionFW();
106103
private final GrpcBeginExFW grpcBeginExRO = new GrpcBeginExFW();
107104
private final GrpcDataExFW grpcDataExRO = new GrpcDataExFW();
108-
private final GrpcResetExFW resetExRO = new GrpcResetExFW();
109-
private final GrpcAbortExFW abortExRO = new GrpcAbortExFW();
110105

111106
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
112107
private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();
113108

114-
private final GrpcBeginExFW.Builder grpcBeginExRW = new GrpcBeginExFW.Builder();
115109
private final GrpcDataExFW.Builder grpcDataExRW = new GrpcDataExFW.Builder();
116110
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
117111
private final GrpcAbortExFW.Builder grpcAbortExRW = new GrpcAbortExFW.Builder();
@@ -300,7 +294,7 @@ protected void onKafkaData(
300294
int reserved,
301295
int flags,
302296
OctetsFW payload,
303-
OctetsFW extension)
297+
KafkaDataExFW kafkaDataEx)
304298
{
305299
}
306300

@@ -1300,7 +1294,7 @@ protected void onKafkaData(
13001294
int reserved,
13011295
int flags,
13021296
OctetsFW payload,
1303-
OctetsFW extension)
1297+
KafkaDataExFW kafkaDataEx)
13041298
{
13051299
if (GrpcKafkaState.replyClosing(state))
13061300
{
@@ -1312,10 +1306,6 @@ protected void onKafkaData(
13121306
{
13131307
if (payload == null)
13141308
{
1315-
final ExtensionFW dataEx = extension.get(extensionRO::tryWrap);
1316-
final KafkaDataExFW kafkaDataEx =
1317-
dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null;
1318-
13191309
KafkaHeaderFW grpcStatus = kafkaDataEx.merged().fetch().headers()
13201310
.matchFirst(h -> HEADER_NAME_ZILLA_GRPC_STATUS.value().equals(h.name().value()));
13211311

@@ -1336,7 +1326,13 @@ protected void onKafkaData(
13361326
}
13371327
else if (GrpcKafkaState.replyOpening(state))
13381328
{
1339-
doGrpcData(traceId, authorization, budgetId, reserved, flags, payload);
1329+
int deferred = 0;
1330+
if (kafkaDataEx != null)
1331+
{
1332+
deferred = kafkaDataEx.merged().fetch().deferred();
1333+
}
1334+
1335+
doGrpcData(traceId, authorization, budgetId, reserved, deferred, flags, payload);
13401336
}
13411337
}
13421338
}
@@ -1397,11 +1393,18 @@ private void doGrpcData(
13971393
long authorization,
13981394
long budgetId,
13991395
int reserved,
1396+
int deferred,
14001397
int flags,
14011398
OctetsFW payload)
14021399
{
1400+
GrpcDataExFW dataEx = grpcDataExRW
1401+
.wrap(extBuffer, 0, extBuffer.capacity())
1402+
.typeId(grpcTypeId)
1403+
.deferred(deferred)
1404+
.build();
1405+
14031406
doData(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
1404-
traceId, authorization, budgetId, flags, reserved, payload, emptyRO);
1407+
traceId, authorization, budgetId, flags, reserved, payload, dataEx);
14051408

14061409
replySeq += reserved;
14071410

@@ -1952,7 +1955,11 @@ private void onKafkaData(
19521955
final OctetsFW payload = data.payload();
19531956
final OctetsFW extension = data.extension();
19541957

1955-
delegate.onKafkaData(traceId, authorization, budgetId, reserved, flags, payload, extension);
1958+
final ExtensionFW dataEx = extension.get(extensionRO::tryWrap);
1959+
final KafkaDataExFW kafkaDataEx =
1960+
dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null;
1961+
1962+
delegate.onKafkaData(traceId, authorization, budgetId, reserved, flags, payload, kafkaDataEx);
19561963
}
19571964
}
19581965

runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ public void shouldExchangeMessageWithUnaryRpc() throws Exception
5858
k3po.finish();
5959
}
6060

61+
@Test
62+
@Configuration("produce.proxy.rpc.yaml")
63+
@Specification({
64+
"${grpc}/unary.rpc.message.value.100k/client",
65+
"${kafka}/unary.rpc.message.value.100k/server"})
66+
public void shouldExchange100kMessageWithUnaryRpc() throws Exception
67+
{
68+
k3po.finish();
69+
}
70+
6171
@Test
6272
@Configuration("produce.proxy.rpc.yaml")
6373
@Specification({

runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.BeginFW;
3838
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.DataFW;
3939
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.EndFW;
40-
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.FlushFW;
4140
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcAbortExFW;
4241
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcBeginExFW;
4342
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcDataExFW;
@@ -87,15 +86,13 @@ public class GrpcClientFactory implements GrpcStreamFactory
8786
private final DataFW dataRO = new DataFW();
8887
private final EndFW endRO = new EndFW();
8988
private final AbortFW abortRO = new AbortFW();
90-
private final FlushFW flushRO = new FlushFW();
9189
private final WindowFW windowRO = new WindowFW();
9290
private final ResetFW resetRO = new ResetFW();
9391

9492
private final BeginFW.Builder beginRW = new BeginFW.Builder();
9593
private final DataFW.Builder dataRW = new DataFW.Builder();
9694
private final EndFW.Builder endRW = new EndFW.Builder();
9795
private final AbortFW.Builder abortRW = new AbortFW.Builder();
98-
private final FlushFW.Builder flushRW = new FlushFW.Builder();
9996
private final WindowFW.Builder windowRW = new WindowFW.Builder();
10097
private final ResetFW.Builder resetRW = new ResetFW.Builder();
10198
private final OctetsFW.Builder octetsRW = new OctetsFW.Builder();
@@ -106,8 +103,6 @@ public class GrpcClientFactory implements GrpcStreamFactory
106103
private final HttpEndExFW endExRO = new HttpEndExFW();
107104
private final GrpcMessageFW grpcMessageRO = new GrpcMessageFW();
108105
private final HttpBeginExFW.Builder httpBeginExRW = new HttpBeginExFW.Builder();
109-
private final HttpEndExFW.Builder httpEndExRW = new HttpEndExFW.Builder();
110-
private final GrpcBeginExFW.Builder grpcBeginExRW = new GrpcBeginExFW.Builder();
111106
private final GrpcDataExFW.Builder grpcDataExRW = new GrpcDataExFW.Builder();
112107
private final GrpcAbortExFW.Builder grpcAbortExRW = new GrpcAbortExFW.Builder();
113108
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
@@ -343,13 +338,24 @@ private void onAppData(
343338
assert acknowledge <= sequence;
344339
assert sequence >= initialSeq;
345340

346-
initialSeq = sequence;
341+
initialSeq = sequence + reserved;
347342

348343
assert initialAck <= initialSeq;
349344

350-
final GrpcDataExFW grpcDataEx = extension.get(grpcDataExRO::tryWrap);
351-
final int deferred = grpcDataEx != null ? grpcDataEx.deferred() : 0;
352-
delegate.doNetData(traceId, authorization, budgetId, reserved, deferred, flags, payload);
345+
if (initialSeq > initialAck + initialMax)
346+
{
347+
delegate.doNetAbort(traceId, authorization);
348+
delegate.doNetReset(traceId, authorization);
349+
350+
doAppReset(traceId, authorization);
351+
doAppAbort(traceId, authorization, EMPTY_OCTETS);
352+
}
353+
else
354+
{
355+
final GrpcDataExFW grpcDataEx = extension.get(grpcDataExRO::tryWrap);
356+
final int deferred = grpcDataEx != null ? grpcDataEx.deferred() : 0;
357+
delegate.doNetData(traceId, authorization, budgetId, reserved, deferred, flags, payload);
358+
}
353359
}
354360

355361
private void onAppEnd(
@@ -539,6 +545,8 @@ private void doAppWindow(
539545

540546
doWindow(application, originId, routedId, initialId, initialSeq, this.initialAck, this.initialMax,
541547
traceId, authorization, budgetId, padding);
548+
549+
assert initialSeq <= initialAck + initialMax;
542550
}
543551

544552
private void doAppReset(
@@ -813,12 +821,11 @@ private void onNetData(
813821
messageDeferred = messageLength - payloadSize;
814822

815823
Flyweight dataEx = messageDeferred > 0 ?
816-
grpcDataExRW.wrap(writeBuffer, DataFW.FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
824+
grpcDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
817825
.typeId(grpcTypeId)
818826
.deferred(messageDeferred)
819827
.build() : EMPTY_OCTETS;
820828

821-
822829
int flags = messageDeferred > 0 ? DATA_FLAG_INIT : DATA_FLAG_INIT | DATA_FLAG_FIN;
823830
delegate.doAppData(traceId, authorization, budgetId, reserved, flags,
824831
buffer, offset + GRPC_MESSAGE_PADDING, payloadSize, dataEx);
@@ -918,7 +925,7 @@ private void onNetWindow(
918925
initialMax = maximum;
919926
state = GrpcState.openInitial(state);
920927

921-
assert initialAck <= initialMax;
928+
assert initialAck <= initialSeq;
922929

923930
delegate.doAppWindow(traceId, authorization, budgetId, padding + GRPC_MESSAGE_PADDING,
924931
initialAck, initialMax);

runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcServerFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ private void onNetData(
532532
messageDeferred = messageLength - payloadSize;
533533

534534
Flyweight dataEx = messageDeferred > 0 ?
535-
grpcDataExRW.wrap(writeBuffer, DataFW.FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
535+
grpcDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
536536
.typeId(grpcTypeId)
537537
.deferred(messageDeferred)
538538
.build() : EMPTY_OCTETS;
@@ -674,8 +674,6 @@ private void doNetBegin(
674674
int replyMax)
675675
{
676676
this.replySeq = replySeq;
677-
this.replyAck = replyAck;
678-
this.replyMax = replyMax;
679677

680678
doBegin(network, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization,
681679
affinity, hs -> hs.item(h -> h.name(HEADER_NAME_STATUS).value(HEADER_VALUE_STATUS_200))

runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/client/UnaryRpcIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,15 @@ public void shouldAbortResponseMissingGrpcStatus() throws Exception
145145
{
146146
k3po.finish();
147147
}
148+
149+
@Test
150+
@Configuration("client.when.yaml")
151+
@Specification({
152+
"${app}/message.exchange.100k/client",
153+
"${net}/message.exchange.100k/server"
154+
})
155+
public void shouldExchange100kMessage() throws Exception
156+
{
157+
k3po.finish();
158+
}
148159
}

runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/server/UnaryRpcIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,16 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception
145145
k3po.finish();
146146
}
147147

148+
@Test
149+
@Configuration("server.when.yaml")
150+
@Specification({
151+
"${net}/message.exchange.100k/client",
152+
"${app}/message.exchange.100k/server"
153+
})
154+
public void shouldExchange100kMessage() throws Exception
155+
{
156+
k3po.finish();
157+
}
158+
159+
148160
}

0 commit comments

Comments
 (0)