Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,69 @@ final class TopologyParserContext {
final Mermaid mermaid = new Mermaid();

void addSubTopology(String subTopology) {
subTopologies.add(subTopology);
graphviz.addSubTopology(subTopology);
mermaid.addSubTopology(subTopology);
final var sanitizedSubTopology = sanitize(subTopology);
subTopologies.add(sanitizedSubTopology);
graphviz.addSubTopology(sanitizedSubTopology);
mermaid.addSubTopology(sanitizedSubTopology);
}

void addSink(String sink, String topic) {
sinks.add(topic);
currentNode = sink;
graphviz.addSink(sink, topic);
mermaid.addSink(sink, topic);
final var sanitizedTopic = sanitize(topic);
sinks.add(sanitizedTopic);
final var sanitizedSink = sanitize(sink);
currentNode = sanitize(sanitizedSink);
graphviz.addSink(sanitizedSink, sanitizedTopic);
mermaid.addSink(sanitizedSink, sanitizedTopic);
}

void addSources(String source, String[] topics) {
currentNode = source;
currentNode = sanitize(source);
Arrays.stream(topics)
.map(String::trim).filter(topic -> !topic.isEmpty())
.forEachOrdered(topic -> {
sources.add(topic);
graphviz.addSource(source, topic);
mermaid.addSource(source, topic);
final var sanitizedTopic = sanitize(topic);
sources.add(sanitizedTopic);
graphviz.addSource(currentNode, sanitizedTopic);
mermaid.addSource(currentNode, sanitizedTopic);
});
}

void addRegexSource(String source, String regex) {
currentNode = source;
final var cleanRegex = regex.trim();
if (!cleanRegex.isEmpty()) {
sources.add(cleanRegex);
graphviz.addRegexSource(source, cleanRegex);
mermaid.addRegexSource(source, cleanRegex);
currentNode = sanitize(source);
final var sanitizedRegex = sanitize(regex);
if (!sanitizedRegex.isEmpty()) {
sources.add(sanitizedRegex);
graphviz.addRegexSource(currentNode, sanitizedRegex);
mermaid.addRegexSource(currentNode, sanitizedRegex);
}
}

void addStores(String[] stores, String processor, boolean join) {
currentNode = processor;
currentNode = sanitize(processor);
Arrays.stream(stores)
.map(String::trim).filter(store -> !store.isEmpty())
.forEachOrdered(store -> {
this.stores.add(store);
graphviz.addStore(store, currentNode, join);
mermaid.addStore(store, currentNode, join);
final var sanitizedStore = sanitize(store);
this.stores.add(sanitizedStore);
graphviz.addStore(sanitizedStore, currentNode, join);
mermaid.addStore(sanitizedStore, currentNode, join);
});
}

void addTargets(String[] targets) {
Arrays.stream(targets)
.map(String::trim).filter(target -> !("none".equals(target) || target.isEmpty()))
.forEachOrdered(target -> {
graphviz.addTarget(target, currentNode);
mermaid.addTarget(target, currentNode);
final var sanitizedTarget = sanitize(target);
graphviz.addTarget(sanitizedTarget, currentNode);
mermaid.addTarget(sanitizedTarget, currentNode);
});
}

private static String sanitize(String name) {
return name != null ? name.trim().replaceAll("\"", "") : null;
}

static final class Graphviz {
String currentGraph = "";
final List<String> nodes = new ArrayList<>();
Expand Down Expand Up @@ -138,7 +148,7 @@ private void addStore(String store, String node, boolean join) {
}

private static String toId(String name) {
return name.replaceAll("-", "_");
return '\"' + name + '\"';
}

private static String toLabel(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void shouldParsingStayConstant() {
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000001\n"
+ "Sub-topology: 1\n"
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature-values])\n"
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature.values])\n"
+ " --> KSTREAM-LEFTJOIN-0000000004\n"
+ " Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [])\n"
+ " --> KSTREAM-AGGREGATE-0000000005\n"
Expand All @@ -43,62 +43,62 @@ public void shouldParsingStayConstant() {

assertEquals(expectedDescribe, actual.getString("describe"));
assertEquals("[0, 1, 2]", actual.getString("subTopologies"));
assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources"));
assertEquals("[notification\\..+, temperature.values, weather-stations]", actual.getString("sources"));
assertEquals("[temperatures-aggregated]", actual.getString("sinks"));
assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores"));
assertEquals("digraph {\n"
+ " fontname=Helvetica; fontsize=10;\n"
+ " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n"
+ " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
+ " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
+ " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+ " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
+ " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
+ " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
+ " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
+ " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
+ " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
+ " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
+ " \"weather-stations\" [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
+ " \"KSTREAM-SOURCE-0000000001\" [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
+ " \"KTABLE-SOURCE-0000000002\" [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
+ " \"weather-stations-STATE-STORE-0000000000\" [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+ " \"temperature.values\" [label=\"temperature.values\" shape=invhouse margin=\"0,0\"];\n"
+ " \"KSTREAM-SOURCE-0000000003\" [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\" [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
+ " \"weather-stations-store\" [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
+ " \"KTABLE-TOSTREAM-0000000006\" [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
+ " \"KSTREAM-SINK-0000000007\" [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
+ " \"temperatures-aggregated\" [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
+ " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
+ " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
+ " \"KSTREAM-SOURCE-0000000008\" [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
+ " \"KSTREAM-FOREACH-0000000009\" [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
+ " subgraph cluster0 {\n"
+ " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000001;\n"
+ " KTABLE_SOURCE_0000000002;\n"
+ " \"KSTREAM-SOURCE-0000000001\";\n"
+ " \"KTABLE-SOURCE-0000000002\";\n"
+ " }\n"
+ " subgraph cluster1 {\n"
+ " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_AGGREGATE_0000000005;\n"
+ " KTABLE_TOSTREAM_0000000006;\n"
+ " KSTREAM_SINK_0000000007;\n"
+ " \"KSTREAM-SOURCE-0000000003\";\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\";\n"
+ " \"KTABLE-TOSTREAM-0000000006\";\n"
+ " \"KSTREAM-SINK-0000000007\";\n"
+ " }\n"
+ " subgraph cluster2 {\n"
+ " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_FOREACH_0000000009;\n"
+ " \"KSTREAM-SOURCE-0000000008\";\n"
+ " \"KSTREAM-FOREACH-0000000009\";\n"
+ " }\n"
+ " weather_stations -> KSTREAM_SOURCE_0000000001;\n"
+ " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n"
+ " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n"
+ " temperature_values -> KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n"
+ " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n"
+ " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n"
+ " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n"
+ " \"weather-stations\" -> \"KSTREAM-SOURCE-0000000001\";\n"
+ " \"KSTREAM-SOURCE-0000000001\" -> \"KTABLE-SOURCE-0000000002\";\n"
+ " \"KTABLE-SOURCE-0000000002\" -> \"weather-stations-STATE-STORE-0000000000\";\n"
+ " \"temperature.values\" -> \"KSTREAM-SOURCE-0000000003\";\n"
+ " \"KSTREAM-SOURCE-0000000003\" -> \"KSTREAM-LEFTJOIN-0000000004\";\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\" -> \"KSTREAM-AGGREGATE-0000000005\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"weather-stations-store\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"KTABLE-TOSTREAM-0000000006\";\n"
+ " \"KTABLE-TOSTREAM-0000000006\" -> \"KSTREAM-SINK-0000000007\";\n"
+ " \"KSTREAM-SINK-0000000007\" -> \"temperatures-aggregated\";\n"
+ " REGEX_12 -> \"KSTREAM-SOURCE-0000000008\";\n"
+ " \"KSTREAM-SOURCE-0000000008\" -> \"KSTREAM-FOREACH-0000000009\";\n"
+ "}", actual.getString("graphviz"));
assertEquals("graph TD\n"
+ " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-<br>SOURCE-<br>0000000001)\n"
+ " KTABLE-SOURCE-0000000002[KTABLE-<br>SOURCE-<br>0000000002] --> weather-stations-STATE-STORE-0000000000(weather-<br>stations-<br>STATE-<br>STORE-<br>0000000000)\n"
+ " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
+ " temperature.values[temperature.values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
+ " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> weather-stations-store(weather-<br>stations-<br>store)\n"
+ " KSTREAM-SINK-0000000007[KSTREAM-<br>SINK-<br>0000000007] --> temperatures-aggregated(temperatures-aggregated)\n"
+ " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-<br>SOURCE-<br>0000000008)\n"
Expand Down
Loading