Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public AsyncapiBindingConfig(
this.overrideRouteId = overrideRouteId;
this.options = AsyncapiOptionsConfig.class.cast(binding.options);
this.routes = binding.routes.stream().map(r -> new AsyncapiRouteConfig(r, options::resolveApiId)).collect(toList());
this.compositeResolvedIds = binding.composites.stream()
this.compositeResolvedIds = binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") ||
Expand All @@ -79,7 +79,7 @@ public AsyncapiBindingConfig(
IDENTITY_FINISH
));
this.composites = new Int2ObjectHashMap<>();
binding.composites.stream()
binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") || b.type.equals("kafka") && b.kind == CACHE_CLIENT ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ public void shouldAddCompositeBinding()
Assert.assertEquals(0, config.composites.size());
BindingConfig newConfig = adapter.adapt(config);
Assert.assertEquals(1, newConfig.composites.size());
Assert.assertEquals(2, newConfig.composites.get(0).bindings.size());
Assert.assertEquals(2, newConfig.composites.get("example:asyncapi0/mqtt").bindings.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public OpenapiAsyncapiBindingConfig(
.map(r -> new OpenapiAsyncapiRouteConfig(r, options::resolveOpenapiApiId))
.collect(toList());

this.resolvedIds = binding.composites.stream()
this.resolvedIds = binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("http-kafka"))
Expand All @@ -63,7 +63,7 @@ public OpenapiAsyncapiBindingConfig(
IDENTITY_FINISH
));

this.httpKafkaOrigins = binding.composites.stream()
this.httpKafkaOrigins = binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("http-kafka"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public OpenapiBindingConfig(

this.routes = binding.routes.stream().map(OpenapiRouteConfig::new).collect(toList());

this.resolvedIds = binding.composites.stream()
this.resolvedIds = binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("http"))
Expand All @@ -91,7 +91,7 @@ public OpenapiBindingConfig(
IDENTITY_FINISH
));

this.httpOrigins = binding.composites.stream()
this.httpOrigins = binding.composites.values().stream()
.map(c -> c.bindings)
.flatMap(List::stream)
.filter(b -> b.type.equals("http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.AgentRunner;

import io.aklivity.zilla.runtime.engine.binding.Binding;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
Expand Down Expand Up @@ -85,7 +83,6 @@ public final class Engine implements Collector, AutoCloseable
{
private final Collection<Binding> bindings;
private final ExecutorService tasks;
private final Collection<AgentRunner> runners;
private final Tuning tuning;
private final List<EngineExtSpi> extensions;
private final ContextImpl context;
Expand All @@ -98,6 +95,7 @@ public final class Engine implements Collector, AutoCloseable
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;

private Future<Void> watcherTaskRef;

Engine(
Expand Down Expand Up @@ -160,12 +158,12 @@ public final class Engine implements Collector, AutoCloseable
this.tuning = tuning;

List<EngineWorker> workers = new ArrayList<>(workerCount);
for (int coreIndex = 0; coreIndex < workerCount; coreIndex++)
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,
guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader,
eventFormatterFactory, coreIndex, readonly);
eventFormatterFactory, workerIndex, readonly);
workers.add(worker);
}
this.workers = workers;
Expand Down Expand Up @@ -223,14 +221,10 @@ else if ("http".equals(protocol) || "https".equals(protocol))
throw new UnsupportedOperationException();
}

List<AgentRunner> runners = new ArrayList<>(workers.size());
workers.forEach(d -> runners.add(d.runner()));

this.bindings = bindings;
this.tasks = tasks;
this.extensions = extensions;
this.context = context;
this.runners = runners;
this.readonly = readonly;
}

Expand All @@ -246,10 +240,11 @@ public <T> T binding(

public void start() throws Exception
{
for (AgentRunner runner : runners)
for (EngineWorker worker : workers)
{
AgentRunner.startOnThread(runner, Thread::new);
worker.doStart();
}

watcherTaskRef = watcherTask.submit();
if (!readonly)
{
Expand All @@ -271,11 +266,11 @@ public void close() throws Exception
watcherTask.close();
watcherTaskRef.get();

for (AgentRunner runner : runners)
for (EngineWorker worker : workers)
{
try
{
CloseHelper.close(runner);
worker.doClose();
}
catch (Throwable ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@
import static java.util.function.Function.identity;

import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.function.UnaryOperator;

public class BindingConfig
{
public transient long id;
public transient long entryId;
public transient ToLongFunction<String> resolveId;
public transient Function<String, String> readURL;

public transient long vaultId;
public transient String qvault;

public transient long[] metricIds;

public transient UnaryOperator<NamespaceConfig> attach;
public transient Consumer<NamespaceConfig> detach;

public final String namespace;
public final String name;
public final String qname;
Expand All @@ -44,7 +51,7 @@ public class BindingConfig
public final List<CatalogedConfig> catalogs;
public final List<RouteConfig> routes;
public final TelemetryRefConfig telemetryRef;
public final List<NamespaceConfig> composites;
public final ConcurrentMap<String, NamespaceConfig> composites;

public static BindingConfigBuilder<BindingConfig> builder()
{
Expand All @@ -71,7 +78,7 @@ public static BindingConfigBuilder<BindingConfig> builder(
.catalogs(binding.catalogs)
.routes(binding.routes)
.telemetry(binding.telemetryRef)
.composites(binding.composites);
.composites(binding.composites.values());
}

BindingConfig(
Expand All @@ -85,7 +92,7 @@ public static BindingConfigBuilder<BindingConfig> builder(
List<CatalogedConfig> catalogs,
List<RouteConfig> routes,
TelemetryRefConfig telemetryRef,
List<NamespaceConfig> namespaces)
ConcurrentMap<String, NamespaceConfig> composites)
{
this.namespace = requireNonNull(namespace);
this.name = requireNonNull(name);
Expand All @@ -98,6 +105,6 @@ public static BindingConfigBuilder<BindingConfig> builder(
this.routes = routes;
this.catalogs = catalogs;
this.telemetryRef = telemetryRef;
this.composites = namespaces;
this.composites = composites;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

import static java.util.Collections.emptyList;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public final class BindingConfigBuilder<T> extends ConfigBuilder<T, BindingConfigBuilder<T>>
Expand Down Expand Up @@ -198,7 +201,7 @@ public BindingConfigBuilder<T> composite(
}

public BindingConfigBuilder<T> composites(
List<NamespaceConfig> composites)
Collection<NamespaceConfig> composites)
{
composites.forEach(this::composite);
return this;
Expand All @@ -225,6 +228,14 @@ public T build()
Optional.ofNullable(catalogs).orElse(CATALOGS_DEFAULT),
Optional.ofNullable(routes).orElse(ROUTES_DEFAULT),
telemetryRef,
Optional.ofNullable(composites).orElse(COMPOSITES_DEFAULT)));
asConcurrentMap(Optional.ofNullable(composites).orElse(COMPOSITES_DEFAULT))));
}

private static ConcurrentMap<String, NamespaceConfig> asConcurrentMap(
List<NamespaceConfig> namespaces)
{
ConcurrentMap<String, NamespaceConfig> composites = new ConcurrentHashMap<>();
namespaces.forEach(n -> composites.put(n.name, n));
return composites;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.function.Function.identity;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class NamespaceConfig
Expand All @@ -33,6 +34,8 @@ public class NamespaceConfig
public final List<VaultConfig> vaults;
public final List<CatalogConfig> catalogs;

public final AtomicInteger refs;

public static NamespaceConfigBuilder<NamespaceConfig> builder()
{
return new NamespaceConfigBuilder<>(identity());
Expand All @@ -52,5 +55,6 @@ public static NamespaceConfigBuilder<NamespaceConfig> builder()
this.guards = requireNonNull(guards);
this.vaults = requireNonNull(vaults);
this.catalogs = requireNonNull(catalogs);
this.refs = new AtomicInteger();
}
}
Loading