Skip to content

Commit d96b9f6

Browse files
authored
Support engine events and detect config watcher failed (#1107)
1 parent d9ffaea commit d96b9f6

File tree

17 files changed

+457
-37
lines changed

17 files changed

+457
-37
lines changed

runtime/engine/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,26 @@
8282
<artifactId>filesystem-http</artifactId>
8383
<scope>test</scope>
8484
</dependency>
85+
<dependency>
86+
<groupId>org.jboss.byteman</groupId>
87+
<artifactId>byteman</artifactId>
88+
<scope>test</scope>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.jboss.byteman</groupId>
92+
<artifactId>byteman-submit</artifactId>
93+
<scope>test</scope>
94+
</dependency>
95+
<dependency>
96+
<groupId>org.jboss.byteman</groupId>
97+
<artifactId>byteman-install</artifactId>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.jboss.byteman</groupId>
102+
<artifactId>byteman-bmunit</artifactId>
103+
<scope>test</scope>
104+
</dependency>
85105
<dependency>
86106
<groupId>org.jmock</groupId>
87107
<artifactId>jmock-junit4</artifactId>
@@ -231,6 +251,10 @@
231251
<groupId>org.apache.maven.plugins</groupId>
232252
<artifactId>maven-compiler-plugin</artifactId>
233253
</plugin>
254+
<plugin>
255+
<groupId>org.jboss.byteman</groupId>
256+
<artifactId>byteman-rulecheck-maven-plugin</artifactId>
257+
</plugin>
234258
<plugin>
235259
<groupId>org.apache.maven.plugins</groupId>
236260
<artifactId>maven-surefire-plugin</artifactId>

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.agrona.LangUtil.rethrowUnchecked;
2222

2323
import java.net.URL;
24+
import java.time.Clock;
2425
import java.util.ArrayList;
2526
import java.util.Collection;
2627
import java.util.List;
@@ -56,6 +57,7 @@
5657
import io.aklivity.zilla.runtime.engine.internal.Info;
5758
import io.aklivity.zilla.runtime.engine.internal.LabelManager;
5859
import io.aklivity.zilla.runtime.engine.internal.Tuning;
60+
import io.aklivity.zilla.runtime.engine.internal.event.EngineEventContext;
5961
import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout;
6062
import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager;
6163
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
@@ -68,6 +70,8 @@
6870

6971
public final class Engine implements Collector, AutoCloseable
7072
{
73+
public static final String NAME = "engine";
74+
7175
private final Collection<Binding> bindings;
7276
private final ExecutorService tasks;
7377
private final Tuning tuning;
@@ -82,6 +86,8 @@ public final class Engine implements Collector, AutoCloseable
8286
private final EngineConfiguration config;
8387
private final EngineManager manager;
8488

89+
private final EventsLayout eventsLayout;
90+
8591
Engine(
8692
EngineConfiguration config,
8793
Collection<Binding> bindings,
@@ -141,6 +147,11 @@ public final class Engine implements Collector, AutoCloseable
141147
}
142148
this.tuning = tuning;
143149

150+
this.eventsLayout = new EventsLayout.Builder()
151+
.path(config.directory().resolve("events"))
152+
.capacity(config.eventsBufferCapacity())
153+
.build();
154+
144155
List<EngineWorker> workers = new ArrayList<>(workerCount);
145156
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
146157
{
@@ -174,6 +185,8 @@ public final class Engine implements Collector, AutoCloseable
174185
final Map<String, Guard> guardsByType = guards.stream()
175186
.collect(Collectors.toMap(g -> g.name(), g -> g));
176187

188+
EngineEventContext events = new EngineEventContext(this);
189+
177190
EngineManager manager = new EngineManager(
178191
schemaTypes,
179192
bindingsByType::get,
@@ -186,6 +199,7 @@ public final class Engine implements Collector, AutoCloseable
186199
logger,
187200
context,
188201
config,
202+
events,
189203
extensions);
190204

191205
this.bindings = bindings;
@@ -273,6 +287,21 @@ public ContextImpl context()
273287
return context;
274288
}
275289

290+
public EventsLayout.EventAccessor createEventAccessor()
291+
{
292+
return eventsLayout.createEventAccessor();
293+
}
294+
295+
public MessageConsumer supplyEventWriter()
296+
{
297+
return this.eventsLayout::writeEvent;
298+
}
299+
300+
public Clock clock()
301+
{
302+
return Clock.systemUTC();
303+
}
304+
276305
public static EngineBuilder builder()
277306
{
278307
return new EngineBuilder();
@@ -441,20 +470,30 @@ public int supplyLabelId(
441470
return worker.supplyTypeId(label);
442471
}
443472

473+
public long supplyNamespacedId(
474+
String namespace,
475+
String name)
476+
{
477+
final int namespaceId = supplyLabelId(namespace);
478+
final int bindingId = supplyLabelId(name);
479+
return NamespacedId.id(namespaceId, bindingId);
480+
}
481+
444482
private final class EventReader implements MessageReader
445483
{
446484
private final EventsLayout.EventAccessor[] accessors;
447485
private final EventFW eventRO = new EventFW();
448-
private int minWorkerIndex;
486+
private int minAccessorIndex;
449487
private long minTimeStamp;
450488

451489
EventReader()
452490
{
453-
accessors = new EventsLayout.EventAccessor[workers.size()];
491+
accessors = new EventsLayout.EventAccessor[workers.size() + 1];
454492
for (int i = 0; i < workers.size(); i++)
455493
{
456494
accessors[i] = workers.get(i).createEventAccessor();
457495
}
496+
accessors[workers.size()] = createEventAccessor();
458497
}
459498

460499
@Override
@@ -467,26 +506,26 @@ public int read(
467506
while (!empty && messagesRead < messageCountLimit)
468507
{
469508
int eventCount = 0;
470-
minWorkerIndex = 0;
509+
minAccessorIndex = 0;
471510
minTimeStamp = Long.MAX_VALUE;
472-
for (int j = 0; j < workers.size(); j++)
511+
for (int j = 0; j < accessors.length; j++)
473512
{
474-
final int workerIndex = j;
475-
int eventPeeked = accessors[workerIndex].peekEvent((m, b, i, l) ->
513+
final int accessorIndex = j;
514+
int eventPeeked = accessors[accessorIndex].peekEvent((m, b, i, l) ->
476515
{
477516
eventRO.wrap(b, i, i + l);
478517
if (eventRO.timestamp() < minTimeStamp)
479518
{
480519
minTimeStamp = eventRO.timestamp();
481-
minWorkerIndex = workerIndex;
520+
minAccessorIndex = accessorIndex;
482521
}
483522
});
484523
eventCount += eventPeeked;
485524
}
486525
empty = eventCount == 0;
487526
if (!empty)
488527
{
489-
messagesRead += accessors[minWorkerIndex].readEvent(handler, 1);
528+
messagesRead += accessors[minAccessorIndex].readEvent(handler, 1);
490529
}
491530
}
492531
return messagesRead;

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class EngineConfiguration extends Configuration
4646

4747
public static final PropertyDef<URL> ENGINE_CONFIG_URL;
4848
public static final PropertyDef<URI> ENGINE_CONFIG_URI;
49+
public static final BooleanPropertyDef ENGINE_CONFIG_WATCH;
4950
public static final IntPropertyDef ENGINE_CONFIG_POLL_INTERVAL_SECONDS;
5051
public static final PropertyDef<String> ENGINE_NAME;
5152
public static final PropertyDef<String> ENGINE_DIRECTORY;
@@ -85,6 +86,7 @@ public class EngineConfiguration extends Configuration
8586
ENGINE_CONFIG_URL = config.property(URL.class, "config.url", EngineConfiguration::configURL, "file:zilla.yaml");
8687
ENGINE_CONFIG_URI = config.property(URI.class, "config.uri", EngineConfiguration::decodeConfigURI,
8788
EngineConfiguration::defaultConfigURI);
89+
ENGINE_CONFIG_WATCH = config.property("config.watch", true);
8890
ENGINE_CONFIG_POLL_INTERVAL_SECONDS = config.property("config.poll.interval.seconds", 60);
8991
ENGINE_NAME = config.property("name", EngineConfiguration::defaultName);
9092
ENGINE_DIRECTORY = config.property("directory", EngineConfiguration::defaultDirectory);
@@ -156,6 +158,11 @@ public URI configURI()
156158
return ENGINE_CONFIG_URI.get(this);
157159
}
158160

161+
public boolean configWatch()
162+
{
163+
return ENGINE_CONFIG_WATCH.get(this);
164+
}
165+
159166
public int configPollIntervalSeconds()
160167
{
161168
return ENGINE_CONFIG_POLL_INTERVAL_SECONDS.getAsInt(this);
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.internal.event;
17+
18+
import static io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventType.CONFIG_WATCHER_FAILED;
19+
20+
import java.nio.ByteBuffer;
21+
import java.time.Clock;
22+
23+
import org.agrona.MutableDirectBuffer;
24+
import org.agrona.concurrent.UnsafeBuffer;
25+
26+
import io.aklivity.zilla.runtime.engine.Engine;
27+
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
28+
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventExFW;
29+
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;
30+
31+
public final class EngineEventContext
32+
{
33+
private static final int EVENT_BUFFER_CAPACITY = 1024;
34+
35+
private final MutableDirectBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
36+
private final MutableDirectBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
37+
38+
private final EventFW.Builder eventRW = new EventFW.Builder();
39+
private final EngineEventExFW.Builder eventExRW = new EngineEventExFW.Builder();
40+
41+
private final long engineId;
42+
private final int engineTypeId;
43+
private final int configWatcherFailedEventId;
44+
private final MessageConsumer eventWriter;
45+
private final Clock clock;
46+
47+
public EngineEventContext(
48+
Engine engine)
49+
{
50+
this.engineId = engine.supplyNamespacedId(Engine.NAME, "events");
51+
this.engineTypeId = engine.supplyLabelId(Engine.NAME);
52+
this.configWatcherFailedEventId = engine.supplyLabelId("engine.config.watcher.failed");
53+
this.eventWriter = engine.supplyEventWriter();
54+
this.clock = engine.clock();
55+
}
56+
57+
public void configWatcherFailed(
58+
long traceId,
59+
String reason)
60+
{
61+
EngineEventExFW extension = eventExRW
62+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
63+
.configWatcherFailed(e -> e
64+
.typeId(CONFIG_WATCHER_FAILED.value())
65+
.reason(reason)
66+
)
67+
.build();
68+
69+
EventFW event = eventRW
70+
.wrap(eventBuffer, 0, eventBuffer.capacity())
71+
.id(configWatcherFailedEventId)
72+
.timestamp(clock.millis())
73+
.traceId(traceId)
74+
.namespacedId(engineId)
75+
.extension(extension.buffer(), extension.offset(), extension.limit())
76+
.build();
77+
78+
eventWriter.accept(engineTypeId, event.buffer(), event.offset(), event.limit());
79+
}
80+
81+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.engine.internal.event;
17+
18+
import org.agrona.DirectBuffer;
19+
20+
import io.aklivity.zilla.runtime.engine.Configuration;
21+
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;
22+
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineConfigWatcherFailedExFW;
23+
import io.aklivity.zilla.runtime.engine.internal.types.event.EngineEventExFW;
24+
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;
25+
26+
public final class EngineEventFormatter implements EventFormatterSpi
27+
{
28+
private static final String CONFIG_WATCHER_FAILED_FORMAT =
29+
"Dynamic config reloading is disabled.";
30+
private static final String CONFIG_WATCHER_FAILED_WITH_REASON_FORMAT =
31+
CONFIG_WATCHER_FAILED_FORMAT + " %s.";
32+
33+
private final EventFW eventRO = new EventFW();
34+
private final EngineEventExFW eventExRO = new EngineEventExFW();
35+
36+
EngineEventFormatter(
37+
Configuration config)
38+
{
39+
}
40+
41+
public String format(
42+
DirectBuffer buffer,
43+
int index,
44+
int length)
45+
{
46+
final EventFW event = eventRO.wrap(buffer, index, index + length);
47+
final EngineEventExFW extension = eventExRO
48+
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
49+
50+
String text = null;
51+
switch (extension.kind())
52+
{
53+
case CONFIG_WATCHER_FAILED:
54+
EngineConfigWatcherFailedExFW configWatcherFailed = extension.configWatcherFailed();
55+
String reason = configWatcherFailed.reason().asString();
56+
String format = reason != null
57+
? CONFIG_WATCHER_FAILED_WITH_REASON_FORMAT
58+
: CONFIG_WATCHER_FAILED_FORMAT;
59+
text = String.format(format, reason);
60+
break;
61+
}
62+
63+
return text;
64+
}
65+
}

0 commit comments

Comments
 (0)