Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ public final class AmqpServerFactory implements AmqpStreamFactory
private final AmqpMapFW<AmqpValueFW, AmqpValueFW> annotationsRO = new AmqpMapFW<>(new AmqpValueFW(), new AmqpValueFW());
private final OctetsFW deliveryTagRO = new OctetsFW();
private final AmqpMessagePropertiesFW amqpPropertiesRO = new AmqpMessagePropertiesFW();
private final AmqpMapFW<AmqpValueFW, AmqpValueFW> applicationPropertyRO =
new AmqpMapFW<>(new AmqpValueFW(), new AmqpValueFW());
private final AmqpApplicationPropertiesFW<AmqpSimpleTypeFW> applicationPropertiesRO =
new AmqpApplicationPropertiesFW<>(new AmqpStringFW(), new AmqpSimpleTypeFW());
private final AmqpMapFW<AmqpValueFW, AmqpValueFW> footerRO = new AmqpMapFW<>(new AmqpValueFW(), new AmqpValueFW());
Expand All @@ -275,7 +273,6 @@ public final class AmqpServerFactory implements AmqpStreamFactory
private final AmqpCloseFW.Builder amqpCloseRW = new AmqpCloseFW.Builder();
private final AmqpErrorListFW.Builder amqpErrorListRW = new AmqpErrorListFW.Builder();
private final AmqpStringFW.Builder amqpStringRW = new AmqpStringFW.Builder();
private final AmqpSimpleTypeFW.Builder amqpValueRW = new AmqpSimpleTypeFW.Builder();
private final AmqpSymbolFW.Builder amqpSymbolRW = new AmqpSymbolFW.Builder();
private final AmqpSourceListFW.Builder amqpSourceListRW = new AmqpSourceListFW.Builder();
private final AmqpTargetListFW.Builder amqpTargetListRW = new AmqpTargetListFW.Builder();
Expand Down Expand Up @@ -4503,6 +4500,8 @@ private int encodeSectionData(
messageFragmentRW.put(buffer, progress, Integer.BYTES);
progress += Integer.BYTES;
break;
default:
break;
}
this.sectionEncoder = this::encodeSectionDataBytes;
}
Expand Down Expand Up @@ -4553,6 +4552,8 @@ private int encodeSectionSequence(
messageFragmentRW.put(buffer, progress, Integer.BYTES);
progress += Integer.BYTES;
break;
default:
break;
}
this.sectionEncoder = this::encodeSectionSequenceBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public final class PgsqlClientFactory implements PgsqlStreamFactory
private final Int2ObjectHashMap<PgsqlClientDecoder> decodersByType;

{
Int2ObjectHashMap<PgsqlClientDecoder> decodersByType = new Int2ObjectHashMap();
Int2ObjectHashMap<PgsqlClientDecoder> decodersByType = new Int2ObjectHashMap<>();
decodersByType.put(MESSAGE_TYPE_AUTH, decodePgsqlAuth);
decodersByType.put(MESSAGE_TYPE_BACKEND_KEY, decodePgsqlBackendKey);
decodersByType.put(MESSAGE_TYPE_PARAMETER_STATUS, decodePgsqlParameterStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public final class PgsqlServerFactory implements PgsqlStreamFactory

private final Int2ObjectHashMap<PgsqlServerDecoder> decodersByType;
{
Int2ObjectHashMap<PgsqlServerDecoder> decodersByType = new Int2ObjectHashMap();
Int2ObjectHashMap<PgsqlServerDecoder> decodersByType = new Int2ObjectHashMap<>();
decodersByType.put(MESSAGE_TYPE_QUERY, decodePgsqlQuery);
decodersByType.put(MESSAGE_TYPE_TERMINATE, decodePgsqlTermination);
this.decodersByType = decodersByType;
Expand Down
2 changes: 1 addition & 1 deletion incubator/command-log/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ under the License.

This project includes:
agrona under The Apache License, Version 2.0
Apache Commons CLI under Apache-2.0
Apache Commons CLI under Apache License, Version 2.0
ICU4J under Unicode/ICU License
Jackson-annotations under The Apache Software License, Version 2.0
Jackson-core under The Apache Software License, Version 2.0
Expand Down
1 change: 0 additions & 1 deletion incubator/command-log/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public ConditionConfig adaptFromJson(
textValue = ((JsonString) v).getString();
base64Value = encoder64.encodeToString(textValue.getBytes());
break;
default:
break;
}

GrpcKafkaMetadataValueConfig metadataValue = new GrpcKafkaMetadataValueConfig(new String16FW(textValue),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public OctetsFW correlationId()

if (digest != null && correlationId != null)
{
octetsRW.reset();
octetsRW.rewrap();
newCorrelationId = octetsRW.put(correlationId).put(dashOctets).put(toHex(digest).getBytes()).build();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public MessageConsumer newStream(
final OctetsFW extension = begin.extension();
final HttpBeginExFW httpBeginEx = extension.get(httpBeginExRO::tryWrap);

@SuppressWarnings("resource")
MessageConsumer newStream = (t, b, i, l) -> {};

if (!isGrpcRequestMethod(httpBeginEx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -65,22 +64,6 @@ private boolean observeMatched()
return true;
}

private static List<Matcher> asTopicMatchers(
List<String> wildcards)
{
final List<Matcher> matchers = new ArrayList<>();
for (String wildcard : wildcards)
{
String patternBegin = wildcard.startsWith("/") ? "(" : "^(?!\\/)(";
String fixedPattern = patternBegin + asRegexPattern(wildcard, 0, true) + ")?\\/?\\#?";
String nonFixedPattern = patternBegin + asRegexPattern(wildcard, 0, false) + ")?\\/?\\#";
fixedPattern = fixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>.+)");
nonFixedPattern = nonFixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "");
matchers.add(Pattern.compile(nonFixedPattern + "|" + fixedPattern).matcher(""));
}
return matchers;
}

private static Matcher asTopicMatcher(
List<String> wildcards)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,6 @@ private int decodePublish(

int progress = offset;

decode:
if (length >= client.decodeablePacketBytes)
{
int reasonCode = SUCCESS;
Expand Down Expand Up @@ -1051,7 +1050,7 @@ private int decodePublishPayload(
if (canPublish && subscriber.debitorIndex != NO_DEBITOR_INDEX && reserved != 0)
{
final int minimum = reserved; // TODO: fragmentation
reserved = subscriber.debitor.claim(subscriber.debitorIndex, subscriber.replyId, minimum, reserved);
reserved = subscriber.debitor.claim(traceId, subscriber.debitorIndex, subscriber.replyId, minimum, reserved, 0);
}

if (canPublish && (reserved != 0 || payloadSize == 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,6 @@ private int decodePublishPayload(
int progress = offset;
int reasonCode = SUCCESS;

decode:
if (length >= 0)
{
MqttServer.MqttPublishStream publisher = server.publishes.get(server.decodePublisherKey);
Expand All @@ -1512,7 +1511,7 @@ private int decodePublishPayload(
if (canPublish && publisher.debitorIndex != NO_DEBITOR_INDEX && publishablePayloadSize != 0)
{
valueClaimed =
publisher.debitor.claim(publisher.debitorIndex, publisher.initialId, minimum, maximum);
publisher.debitor.claim(traceId, publisher.debitorIndex, publisher.initialId, minimum, maximum, 0);
}

if (canPublish && (valueClaimed != 0 || payload.sizeof() == 0))
Expand Down Expand Up @@ -3080,79 +3079,78 @@ private int onDecodeConnectWillPayload(
int limit)
{
int progress = offset;
decode:
{
final int willFlags = decodeWillFlags(connectFlags);
final int willQos = decodeWillQos(connectFlags);
final boolean willFlagSet = isSetWillFlag(connectFlags);

if (willFlagSet && MqttState.initialOpened(session.state))
{
int publishedWillSize = 0;
if (willPayloadDeferred == 0)
final int willFlags = decodeWillFlags(connectFlags);
final int willQos = decodeWillQos(connectFlags);
final boolean willFlagSet = isSetWillFlag(connectFlags);

if (willFlagSet && MqttState.initialOpened(session.state))
{
int publishedWillSize = 0;
if (willPayloadDeferred == 0)
{
final MqttWillMessageFW.Builder willMessageBuilder =
mqttWillMessageRW.wrap(willMessageBuffer, 0, willMessageBuffer.capacity())
.topic(mqttConnectPayloadRO.willTopic)
.delay(mqttConnectPayloadRO.willDelay)
.qos(willQos)
.flags(willFlags)
.expiryInterval(mqttConnectPayloadRO.expiryInterval)
.contentType(mqttConnectPayloadRO.contentType)
.format(f -> f.set(mqttConnectPayloadRO.payloadFormat))
.responseTopic(mqttConnectPayloadRO.responseTopic)
.correlation(c -> c.bytes(mqttConnectPayloadRO.correlationData))
.payloadSize(mqttConnectPayloadRO.payloadSize);

if (version == 5)
{
final MqttWillMessageFW.Builder willMessageBuilder =
mqttWillMessageRW.wrap(willMessageBuffer, 0, willMessageBuffer.capacity())
.topic(mqttConnectPayloadRO.willTopic)
.delay(mqttConnectPayloadRO.willDelay)
.qos(willQos)
.flags(willFlags)
.expiryInterval(mqttConnectPayloadRO.expiryInterval)
.contentType(mqttConnectPayloadRO.contentType)
.format(f -> f.set(mqttConnectPayloadRO.payloadFormat))
.responseTopic(mqttConnectPayloadRO.responseTopic)
.correlation(c -> c.bytes(mqttConnectPayloadRO.correlationData))
.payloadSize(mqttConnectPayloadRO.payloadSize);

if (version == 5)
{
final Array32FW<MqttUserPropertyFW> userProperties = willUserPropertiesRW.build();
userProperties.forEach(
c -> willMessageBuilder.propertiesItem(p -> p.key(c.key()).value(c.value())));
}

final MqttWillMessageFW will = willMessageBuilder.build();
final int headerSize = willMessageBuilder.sizeof();
int payloadSize = Math.min(limit - offset, session.initialBudget() - headerSize);
final Array32FW<MqttUserPropertyFW> userProperties = willUserPropertiesRW.build();
userProperties.forEach(
c -> willMessageBuilder.propertiesItem(p -> p.key(c.key()).value(c.value())));
}

final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + payloadSize);
final MqttWillMessageFW will = willMessageBuilder.build();
final int headerSize = willMessageBuilder.sizeof();
int payloadSize = Math.min(limit - offset, session.initialBudget() - headerSize);

willMessageBuffer.putBytes(will.limit(), payload.buffer(), payload.offset(), payload.limit());
final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + payloadSize);

int flags = willPayloadBytes + headerSize > session.initialBudget() ? FLAG_INIT : FLAG_INIT | FLAG_FIN;
int deferred = Math.max(willPayloadBytes + headerSize - session.initialBudget(), 0);
willPayloadDeferred = deferred;
willMessageBuffer.putBytes(will.limit(), payload.buffer(), payload.offset(), payload.limit());

final MqttDataExFW.Builder sessionDataExBuilder =
mqttSessionDataExRW.wrap(sessionExtBuffer, 0, sessionExtBuffer.capacity())
.typeId(mqttTypeId)
.session(s -> s.deferred(deferred).kind(k -> k.set(MqttSessionDataKind.WILL)));
int flags = willPayloadBytes + headerSize > session.initialBudget() ? FLAG_INIT : FLAG_INIT | FLAG_FIN;
int deferred = Math.max(willPayloadBytes + headerSize - session.initialBudget(), 0);
willPayloadDeferred = deferred;

publishedWillSize = session.doSessionData(traceId, flags,
willMessageBuffer, 0, headerSize + payload.sizeof(), headerSize, sessionDataExBuilder.build());
final MqttDataExFW.Builder sessionDataExBuilder =
mqttSessionDataExRW.wrap(sessionExtBuffer, 0, sessionExtBuffer.capacity())
.typeId(mqttTypeId)
.session(s -> s.deferred(deferred).kind(k -> k.set(MqttSessionDataKind.WILL)));

if (publishedWillSize < headerSize)
{
willPayloadDeferred = 0;
}
publishedWillSize = session.doSessionData(traceId, flags,
willMessageBuffer, 0, headerSize + payload.sizeof(), headerSize, sessionDataExBuilder.build());

willPayloadBytes -= payloadSize;
progress += payloadSize;
}
else
if (publishedWillSize < headerSize)
{
final OctetsFW payload = payloadRO.wrap(buffer, offset, limit);
assert willPayloadDeferred >= 0;
int flags = willPayloadDeferred - payload.sizeof() > 0 ? FLAG_CONT : FLAG_FIN;

publishedWillSize = session.doSessionData(traceId, flags,
payload.buffer(), offset, limit, 0, EMPTY_OCTETS);
willPayloadDeferred -= publishedWillSize;
willPayloadBytes -= publishedWillSize;
progress += publishedWillSize;
willPayloadDeferred = 0;
}

willPayloadBytes -= payloadSize;
progress += payloadSize;
}
else
{
final OctetsFW payload = payloadRO.wrap(buffer, offset, limit);
assert willPayloadDeferred >= 0;
int flags = willPayloadDeferred - payload.sizeof() > 0 ? FLAG_CONT : FLAG_FIN;

publishedWillSize = session.doSessionData(traceId, flags,
payload.buffer(), offset, limit, 0, EMPTY_OCTETS);
willPayloadDeferred -= publishedWillSize;
willPayloadBytes -= publishedWillSize;
progress += publishedWillSize;
}
}

return progress;
}

Expand Down
7 changes: 0 additions & 7 deletions runtime/binding-openapi-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,30 @@
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-openapi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-asyncapi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-http-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-mqtt-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-mqtt</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
*/
package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;
Expand Down
6 changes: 0 additions & 6 deletions runtime/binding-openapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,26 @@
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-tcp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-tls</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>catalog-inline</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>model-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>model-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Loading