Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions runtime/engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,26 @@
<artifactId>filesystem-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-submit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-install</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-bmunit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
Expand Down Expand Up @@ -231,6 +251,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman-rulecheck-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.agrona.LangUtil.rethrowUnchecked;

import java.net.URL;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -56,6 +57,7 @@
import io.aklivity.zilla.runtime.engine.internal.Info;
import io.aklivity.zilla.runtime.engine.internal.LabelManager;
import io.aklivity.zilla.runtime.engine.internal.Tuning;
import io.aklivity.zilla.runtime.engine.internal.event.EngineEventContext;
import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
Expand All @@ -68,6 +70,8 @@

public final class Engine implements Collector, AutoCloseable
{
public static final String NAME = "engine";

private final Collection<Binding> bindings;
private final ExecutorService tasks;
private final Tuning tuning;
Expand All @@ -82,6 +86,8 @@ public final class Engine implements Collector, AutoCloseable
private final EngineConfiguration config;
private final EngineManager manager;

private final EventsLayout eventsLayout;

Engine(
EngineConfiguration config,
Collection<Binding> bindings,
Expand Down Expand Up @@ -141,6 +147,11 @@ public final class Engine implements Collector, AutoCloseable
}
this.tuning = tuning;

this.eventsLayout = new EventsLayout.Builder()
.path(config.directory().resolve("events"))
.capacity(config.eventsBufferCapacity())
.build();

List<EngineWorker> workers = new ArrayList<>(workerCount);
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
Expand Down Expand Up @@ -174,6 +185,8 @@ public final class Engine implements Collector, AutoCloseable
final Map<String, Guard> guardsByType = guards.stream()
.collect(Collectors.toMap(g -> g.name(), g -> g));

EngineEventContext events = new EngineEventContext(this);

EngineManager manager = new EngineManager(
schemaTypes,
bindingsByType::get,
Expand All @@ -186,6 +199,7 @@ public final class Engine implements Collector, AutoCloseable
logger,
context,
config,
events,
extensions);

this.bindings = bindings;
Expand Down Expand Up @@ -273,6 +287,21 @@ public ContextImpl context()
return context;
}

public EventsLayout.EventAccessor createEventAccessor()
{
return eventsLayout.createEventAccessor();
}

public MessageConsumer supplyEventWriter()
{
return this.eventsLayout::writeEvent;
}

public Clock clock()
{
return Clock.systemUTC();
}

public static EngineBuilder builder()
{
return new EngineBuilder();
Expand Down Expand Up @@ -441,20 +470,30 @@ public int supplyLabelId(
return worker.supplyTypeId(label);
}

public long supplyNamespacedId(
String namespace,
String name)
{
final int namespaceId = supplyLabelId(namespace);
final int bindingId = supplyLabelId(name);
return NamespacedId.id(namespaceId, bindingId);
}

private final class EventReader implements MessageReader
{
private final EventsLayout.EventAccessor[] accessors;
private final EventFW eventRO = new EventFW();
private int minWorkerIndex;
private int minAccessorIndex;
private long minTimeStamp;

EventReader()
{
accessors = new EventsLayout.EventAccessor[workers.size()];
accessors = new EventsLayout.EventAccessor[workers.size() + 1];
for (int i = 0; i < workers.size(); i++)
{
accessors[i] = workers.get(i).createEventAccessor();
}
accessors[workers.size()] = createEventAccessor();
}

@Override
Expand All @@ -467,26 +506,26 @@ public int read(
while (!empty && messagesRead < messageCountLimit)
{
int eventCount = 0;
minWorkerIndex = 0;
minAccessorIndex = 0;
minTimeStamp = Long.MAX_VALUE;
for (int j = 0; j < workers.size(); j++)
for (int j = 0; j < accessors.length; j++)
{
final int workerIndex = j;
int eventPeeked = accessors[workerIndex].peekEvent((m, b, i, l) ->
final int accessorIndex = j;
int eventPeeked = accessors[accessorIndex].peekEvent((m, b, i, l) ->
{
eventRO.wrap(b, i, i + l);
if (eventRO.timestamp() < minTimeStamp)
{
minTimeStamp = eventRO.timestamp();
minWorkerIndex = workerIndex;
minAccessorIndex = accessorIndex;
}
});
eventCount += eventPeeked;
}
empty = eventCount == 0;
if (!empty)
{
messagesRead += accessors[minWorkerIndex].readEvent(handler, 1);
messagesRead += accessors[minAccessorIndex].readEvent(handler, 1);
}
}
return messagesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class EngineConfiguration extends Configuration

public static final PropertyDef<URL> ENGINE_CONFIG_URL;
public static final PropertyDef<URI> ENGINE_CONFIG_URI;
public static final BooleanPropertyDef ENGINE_CONFIG_WATCH;
public static final IntPropertyDef ENGINE_CONFIG_POLL_INTERVAL_SECONDS;
public static final PropertyDef<String> ENGINE_NAME;
public static final PropertyDef<String> ENGINE_DIRECTORY;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class EngineConfiguration extends Configuration
ENGINE_CONFIG_URL = config.property(URL.class, "config.url", EngineConfiguration::configURL, "file:zilla.yaml");
ENGINE_CONFIG_URI = config.property(URI.class, "config.uri", EngineConfiguration::decodeConfigURI,
EngineConfiguration::defaultConfigURI);
ENGINE_CONFIG_WATCH = config.property("config.watch", true);
ENGINE_CONFIG_POLL_INTERVAL_SECONDS = config.property("config.poll.interval.seconds", 60);
ENGINE_NAME = config.property("name", EngineConfiguration::defaultName);
ENGINE_DIRECTORY = config.property("directory", EngineConfiguration::defaultDirectory);
Expand Down Expand Up @@ -156,6 +158,11 @@ public URI configURI()
return ENGINE_CONFIG_URI.get(this);
}

public boolean configWatch()
{
return ENGINE_CONFIG_WATCH.get(this);
}

public int configPollIntervalSeconds()
{
return ENGINE_CONFIG_POLL_INTERVAL_SECONDS.getAsInt(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.internal.event;

import static io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventType.CONFIG_WATCHER_FAILED;

import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.engine.Engine;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventExFW;
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;

public final class EngineEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final MutableDirectBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final MutableDirectBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));

private final EventFW.Builder eventRW = new EventFW.Builder();
private final EngineEventExFW.Builder eventExRW = new EngineEventExFW.Builder();

private final long engineId;
private final int engineTypeId;
private final int configWatcherFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public EngineEventContext(
Engine engine)
{
this.engineId = engine.supplyNamespacedId(Engine.NAME, "events");
this.engineTypeId = engine.supplyLabelId(Engine.NAME);
this.configWatcherFailedEventId = engine.supplyLabelId("engine.config.watcher.failed");
this.eventWriter = engine.supplyEventWriter();
this.clock = engine.clock();
}

public void configWatcherFailed(
long traceId,
String reason)
{
EngineEventExFW extension = eventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.configWatcherFailed(e -> e
.typeId(CONFIG_WATCHER_FAILED.value())
.reason(reason)
)
.build();

EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(configWatcherFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(engineId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();

eventWriter.accept(engineTypeId, event.buffer(), event.offset(), event.limit());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.internal.event;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineConfigWatcherFailedExFW;
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventExFW;
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;

public final class EngineEventFormatter implements EventFormatterSpi
{
private static final String CONFIG_WATCHER_FAILED_FORMAT =
"Dynamic config reloading is disabled.";
private static final String CONFIG_WATCHER_FAILED_WITH_REASON_FORMAT =
CONFIG_WATCHER_FAILED_FORMAT + " %s.";

private final EventFW eventRO = new EventFW();
private final EngineEventExFW eventExRO = new EngineEventExFW();

EngineEventFormatter(
Configuration config)
{
}

public String format(
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final EngineEventExFW extension = eventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());

String text = null;
switch (extension.kind())
{
case CONFIG_WATCHER_FAILED:
EngineConfigWatcherFailedExFW configWatcherFailed = extension.configWatcherFailed();
String reason = configWatcherFailed.reason().asString();
String format = reason != null
? CONFIG_WATCHER_FAILED_WITH_REASON_FORMAT
: CONFIG_WATCHER_FAILED_FORMAT;
text = String.format(format, reason);
break;
}

return text;
}
}
Loading