Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,15 @@ public <C> BindingConfigBuilder<C> injectMqttKafkaRoutes(
final AsyncapiChannelView channel = AsyncapiChannelView.of(mqttAsyncapi.channels, whenOperation.channel);
final MqttKafkaConditionKind kind = whenOperation.action.equals(ASYNCAPI_SEND_ACTION_NAME) ?
MqttKafkaConditionKind.PUBLISH : MqttKafkaConditionKind.SUBSCRIBE;
final String topic = channel.address().replaceAll("\\{[^}]+\\}", "+");
String topic = channel.address();

routeBuilder
.when(MqttKafkaConditionConfig::builder)
.topic(topic)
.kind(kind)
.build()
.with(MqttKafkaWithConfig::builder)
.messages(messages)
.messages(messages.replaceAll("\\{([^{}]*)\\}", "\\${params.$1}"))
.build()
.exit(qname);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

public class MqttKafkaRouteConfig
{
private final Optional<MqttKafkaWithResolver> with;

public final MqttKafkaWithResolver with;
private final List<MqttKafkaConditionMatcher> when;
private final LongPredicate authorized;

public final long id;
public final long order;

public final String16FW messages;
public final String16FW retained;

public MqttKafkaRouteConfig(
Expand All @@ -45,12 +45,13 @@ public MqttKafkaRouteConfig(
this.order = route.order;
this.with = Optional.ofNullable(route.with)
.map(MqttKafkaWithConfig.class::cast)
.map(c -> new MqttKafkaWithResolver(options, c));
this.messages = with.isPresent() ? with.get().messages() : options.topics.messages;
.map(c -> new MqttKafkaWithResolver(options, c))
.orElse(new MqttKafkaWithResolver(options, null));
this.retained = options.topics.retained;
this.when = route.when.stream()
.map(MqttKafkaConditionConfig.class::cast)
.map(MqttKafkaConditionMatcher::new)
.peek(m -> m.observe(with::onConditionMatched))
.collect(toList());
this.authorized = route.authorized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.agrona.LangUtil;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.engine.Configuration;

public class MqttKafkaConfiguration extends Configuration
Expand All @@ -39,6 +40,7 @@ public class MqttKafkaConfiguration extends Configuration
public static final IntPropertyDef WILL_STREAM_RECONNECT_DELAY;
public static final BooleanPropertyDef BOOTSTRAP_AVAILABLE;
public static final IntPropertyDef BOOTSTRAP_STREAM_RECONNECT_DELAY;
public static final IntPropertyDef PUBLISH_QOS_MAX;

static
{
Expand All @@ -57,6 +59,7 @@ public class MqttKafkaConfiguration extends Configuration
WILL_STREAM_RECONNECT_DELAY = config.property("will.stream.reconnect", 2);
BOOTSTRAP_AVAILABLE = config.property("bootstrap.available", true);
BOOTSTRAP_STREAM_RECONNECT_DELAY = config.property("bootstrap.stream.reconnect", 2);
PUBLISH_QOS_MAX = config.property("publish.qos.max", 2);
MQTT_KAFKA_CONFIG = config;
}

Expand Down Expand Up @@ -116,6 +119,11 @@ public int bootstrapStreamReconnectDelay()
return BOOTSTRAP_STREAM_RECONNECT_DELAY.getAsInt(this);
}

public MqttQoS publishQosMax()
{
return MqttQoS.valueOf(PUBLISH_QOS_MAX.getAsInt(this));
}


private static StringSupplier decodeStringSupplier(
String fullyQualifiedMethodName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream.MqttKafkaSessionFactory;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttTopicFilterFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
Expand Down Expand Up @@ -95,6 +96,12 @@ public List<MqttKafkaRouteConfig> resolveAll(
.collect(Collectors.toList());
}

public MqttQoS publishQosMax()
{
return routes.stream().noneMatch(r -> r.with != null && r.with.containsParams()) ?
MqttQoS.EXACTLY_ONCE : MqttQoS.AT_LEAST_ONCE;
}

public String16FW messagesTopic()
{
return options.topics.messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -24,34 +25,45 @@

public class MqttKafkaConditionMatcher
{
private final List<Matcher> matchers;
private final Matcher matcher;
public final MqttKafkaConditionKind kind;
private Consumer<MqttKafkaConditionMatcher> observer;

public MqttKafkaConditionMatcher(
MqttKafkaConditionConfig condition)
{
this.matchers = asTopicMatchers(condition.topics);
this.matcher = asTopicMatcher(condition.topics);
this.kind = condition.kind;
}

public boolean matches(
String topic)
{
boolean match = false;
if (matchers != null)
return this.matcher == null ||
this.matcher.reset(topic).matches() && observeMatched();
}

public String parameter(
String name)
{
return matcher.group(name);
}

public void observe(
Consumer<MqttKafkaConditionMatcher> observer)
{
this.observer = observer;
}

private boolean observeMatched()
{
if (observer != null)
{
for (Matcher matcher : matchers)
{
if (matcher.reset(topic).matches())
{
match = true;
break;
}
}
observer.accept(this);
}
return match;
}

return true;
}

private static List<Matcher> asTopicMatchers(
List<String> wildcards)
Expand All @@ -62,11 +74,37 @@ private static List<Matcher> asTopicMatchers(
String patternBegin = wildcard.startsWith("/") ? "(" : "^(?!\\/)(";
String fixedPattern = patternBegin + asRegexPattern(wildcard, 0, true) + ")?\\/?\\#?";
String nonFixedPattern = patternBegin + asRegexPattern(wildcard, 0, false) + ")?\\/?\\#";
fixedPattern = fixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>.+)");
nonFixedPattern = nonFixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "");
matchers.add(Pattern.compile(nonFixedPattern + "|" + fixedPattern).matcher(""));
}
return matchers;
}

private static Matcher asTopicMatcher(
List<String> wildcards)
{
StringBuilder combinedRegex = new StringBuilder();

for (String wildcard : wildcards)
{
String patternBegin = wildcard.startsWith("/") ? "(" : "^(?!\\/)(";
String fixedPattern = patternBegin + asRegexPattern(wildcard, 0, true) + ")?\\/?\\#?";
String nonFixedPattern = patternBegin + asRegexPattern(wildcard, 0, false) + ")?\\/?\\#";
fixedPattern = fixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "(?<$1>.+)");
nonFixedPattern = nonFixedPattern.replaceAll("\\{([a-zA-Z_]+)\\}", "");

combinedRegex.append(nonFixedPattern).append("|").append(fixedPattern).append("|");
}

if (combinedRegex.length() > 0)
{
combinedRegex.deleteCharAt(combinedRegex.length() - 1);
}

return Pattern.compile(combinedRegex.toString()).matcher("");
}

private static String asRegexPattern(
String wildcard,
int level,
Expand All @@ -93,6 +131,7 @@ private static String asRegexPattern(
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("\\{[^}]+\\}", "[^/]*")
.replace("#", ".*");
pattern = (level > 0) ? "(\\/\\+|\\/" + currentPart + ")" : "(\\+|" + currentPart + ")";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,62 @@
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config;

import java.util.function.Function;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;

public class MqttKafkaWithResolver
{
private final String16FW messages;
private static final Pattern PARAMS_PATTERN = Pattern.compile("\\$\\{params\\.([a-zA-Z_]+)\\}");

private final Matcher paramsMatcher;
private final MqttKafkaWithConfig with;
private final MqttKafkaOptionsConfig options;

private Function<MatchResult, String> replacer = r -> null;

public MqttKafkaWithResolver(
MqttKafkaOptionsConfig options,
MqttKafkaWithConfig with)
{
this.messages = with.messages == null ? options.topics.messages : new String16FW(with.messages);
this.paramsMatcher = PARAMS_PATTERN.matcher("");
this.with = with;
this.options = options;
}

public void onConditionMatched(
MqttKafkaConditionMatcher condition)
{
this.replacer = r -> condition.parameter(r.group(1));
}

public String16FW messages()
public boolean containsParams()
{
return messages;
return with != null && paramsMatcher.reset(with.messages).find();
}

public String16FW resolveMessages()
{
String topic = null;
if (with != null)
{
topic = with.messages;
Matcher topicMatcher = paramsMatcher.reset(topic);
StringBuilder result = new StringBuilder();
while (topicMatcher.find())
{
String replacement = replacer.apply(paramsMatcher.toMatchResult());
topicMatcher.appendReplacement(result, replacement);
}
topicMatcher.appendTail(result);

topic = result.toString();
}
return topic == null ? options.topics.messages : new String16FW(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,17 @@ public MessageConsumer newStream(
binding.resolve(authorization, mqttPublishBeginEx.topic().asString()) : null;
MessageConsumer newStream = null;

if (resolved != null)
final int qos = mqttPublishBeginEx.qos();
final int qosMax = binding.publishQosMax().value();

if (mqttPublishBeginEx.qos() > qosMax)
{
return null;
}
else if (resolved != null)
{
final long resolvedId = resolved.id;
final String16FW messagesTopic = resolved.messages;
final int qos = mqttPublishBeginEx.qos();
final String16FW messagesTopic = resolved.with.resolveMessages();
final MqttPublishProxy proxy = new MqttPublishProxy(mqtt, originId, routedId, initialId, resolvedId, affinity,
binding, messagesTopic, binding.retainedTopic(), qos, binding.clients);
newStream = proxy::onMqttMessage;
Expand Down
Loading