Skip to content

Commit d96f005

Browse files
Add sse, ws, fs extension parsing to dump command (#660)
1 parent 0218bfc commit d96f005

File tree

14 files changed

+1437
-101
lines changed

14 files changed

+1437
-101
lines changed

incubator/command-dump/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,24 @@
6767
<version>${project.version}</version>
6868
<scope>test</scope>
6969
</dependency>
70+
<dependency>
71+
<groupId>io.aklivity.zilla</groupId>
72+
<artifactId>binding-sse.spec</artifactId>
73+
<version>${project.version}</version>
74+
<scope>test</scope>
75+
</dependency>
76+
<dependency>
77+
<groupId>io.aklivity.zilla</groupId>
78+
<artifactId>binding-ws.spec</artifactId>
79+
<version>${project.version}</version>
80+
<scope>test</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>io.aklivity.zilla</groupId>
84+
<artifactId>binding-filesystem.spec</artifactId>
85+
<version>${project.version}</version>
86+
<scope>test</scope>
87+
</dependency>
7088
<dependency>
7189
<groupId>org.junit.jupiter</groupId>
7290
<artifactId>junit-jupiter-engine</artifactId>

incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -299,51 +299,58 @@ public void run()
299299
.forEach(filtered::add);
300300
final LongPredicate filter = filtered.isEmpty() ? b -> true : filtered::contains;
301301

302-
try (Stream<Path> files = Files.walk(directory, 3);
303-
WritableByteChannel writer = Files.newByteChannel(output, CREATE, WRITE, TRUNCATE_EXISTING))
304-
{
305-
final RingBufferSpy[] streamBuffers = files
306-
.filter(this::isStreamsFile)
307-
.sorted()
308-
.peek(this::onDiscovered)
309-
.map(this::createStreamBuffer)
310-
.collect(Collectors.toList())
311-
.toArray(RingBufferSpy[]::new);
312-
final int streamBufferCount = streamBuffers.length;
313-
314-
final IdleStrategy idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
315-
final BindingsLayoutReader bindings = BindingsLayoutReader.builder().directory(directory).build();
316-
final DumpHandler[] dumpHandlers = new DumpHandler[streamBufferCount];
317-
for (int i = 0; i < streamBufferCount; i++)
302+
if (output != null)
303+
{
304+
try (Stream<Path> files = Files.walk(directory, 3);
305+
WritableByteChannel writer = Files.newByteChannel(output, CREATE, WRITE, TRUNCATE_EXISTING))
318306
{
319-
dumpHandlers[i] = new DumpHandler(i, filter, labels::lookupLabel, bindings.bindings()::get, writer);
320-
}
307+
final RingBufferSpy[] streamBuffers = files
308+
.filter(this::isStreamsFile)
309+
.sorted()
310+
.peek(this::onDiscovered)
311+
.map(this::createStreamBuffer)
312+
.collect(Collectors.toList())
313+
.toArray(RingBufferSpy[]::new);
314+
final int streamBufferCount = streamBuffers.length;
315+
316+
final IdleStrategy idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
317+
final BindingsLayoutReader bindings = BindingsLayoutReader.builder().directory(directory).build();
318+
final DumpHandler[] dumpHandlers = new DumpHandler[streamBufferCount];
319+
for (int i = 0; i < streamBufferCount; i++)
320+
{
321+
dumpHandlers[i] = new DumpHandler(i, filter, labels::lookupLabel, bindings.bindings()::get, writer);
322+
}
321323

322-
final MutableDirectBuffer buffer = writeBuffer;
323-
encodePcapGlobal(buffer);
324-
writePcapOutput(writer, buffer, 0, PCAP_GLOBAL_SIZE);
324+
final MutableDirectBuffer buffer = writeBuffer;
325+
encodePcapGlobal(buffer);
326+
writePcapOutput(writer, buffer, 0, PCAP_GLOBAL_SIZE);
325327

326-
final int exitWorkCount = continuous ? -1 : 0;
327-
int workCount;
328-
do
328+
final int exitWorkCount = continuous ? -1 : 0;
329+
int workCount;
330+
do
331+
{
332+
workCount = 0;
333+
for (int i = 0; i < streamBufferCount; i++)
334+
{
335+
final RingBufferSpy streamBuffer = streamBuffers[i];
336+
MessagePredicate spyHandler = dumpHandlers[i]::handleFrame;
337+
workCount += streamBuffer.spy(spyHandler, 1);
338+
}
339+
idleStrategy.idle(workCount);
340+
} while (workCount != exitWorkCount);
341+
}
342+
catch (Exception ex)
329343
{
330-
workCount = 0;
331-
for (int i = 0; i < streamBufferCount; i++)
344+
if (exceptions)
332345
{
333-
final RingBufferSpy streamBuffer = streamBuffers[i];
334-
MessagePredicate spyHandler = dumpHandlers[i]::handleFrame;
335-
workCount += streamBuffer.spy(spyHandler, 1);
346+
ex.printStackTrace();
336347
}
337-
idleStrategy.idle(workCount);
338-
} while (workCount != exitWorkCount);
348+
rethrowUnchecked(ex);
349+
}
339350
}
340-
catch (Exception ex)
351+
else if (verbose)
341352
{
342-
if (exceptions)
343-
{
344-
ex.printStackTrace();
345-
}
346-
rethrowUnchecked(ex);
353+
System.out.println("Output file not specified, exiting now.");
347354
}
348355
}
349356

0 commit comments

Comments
 (0)