Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
if: success()
with:
path: ~/docker-cache/docker.tar
key: docker-${{ runner.os }}-zilla-develop-SNAPSHOT
key: zilla-develop-SNAPSHOT-${{ github.run_id }}

- name: Conditional Artifact Upload
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -83,7 +83,6 @@ jobs:
^\.assets$
^\.git$
^\.vscode$
^\asyncapi.sse.proxy$
^\asyncapi.sse.kafka.proxy$

testing:
Expand All @@ -106,7 +105,7 @@ jobs:
uses: actions/cache@v4
with:
path: ~/docker-cache/docker.tar
key: docker-${{ runner.os }}-zilla-develop-SNAPSHOT
key: zilla-develop-SNAPSHOT-${{ github.run_id }}

- name: Load cached Docker image
run: |
Expand Down
15 changes: 5 additions & 10 deletions examples/asyncapi.sse.proxy/.github/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ EXIT=0
PORT="7114"
INPUT_GOOD='{ "name": "event name", "data": { "id": 1, "name": "Hello World!" } }'
INPUT_BAD='{ "name": "event name", "data": { "id": -1, "name": "Hello bad World!" } }'
EXPECTED='data:{ "id": 1, "name": "Hello World!" }'
EXPECTED='data:{"id":1,"name":"Hello World!"}'
echo \# Testing asyncapi.sse.proxy/
echo PORT="$PORT"
echo INPUT_GOOD="$INPUT_GOOD"
Expand All @@ -19,13 +19,13 @@ echo
# send request to zilla
timeout 3s curl -N --http2 -H "Accept:text/event-stream" "http://localhost:$PORT/events/1" | tee .testoutput &

# push response to kafka with kafkacat
echo "$INPUT_GOOD" | nc -c localhost 7001
echo "$INPUT_BAD" | nc -c localhost 7001
# push events to sse server
echo "$INPUT_GOOD" | nc -w 1 localhost 7001
echo "$INPUT_BAD" | nc -w 1 localhost 7001

# fetch the output of zilla request; try 5 times
for i in $(seq 0 2); do
sleep $i
sleep 5
OUTPUT=$(cat .testoutput | grep "^data:")
if [ -n "$OUTPUT" ]; then
break
Expand All @@ -44,9 +44,4 @@ else
EXIT=1
fi

# TODO remove once fixed
echo '❌ curl: (52) Empty reply from server. Tested on main. and does not work with described instructions'
echo 'Refer: https://github.com/aklivity/zilla/issues/1417'
EXIT=1

exit $EXIT
1 change: 0 additions & 1 deletion examples/asyncapi.sse.proxy/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ services:
ZILLA_INCUBATOR_ENABLED: "true"
volumes:
- ./etc:/etc/zilla
- ./sse-asyncapi.yaml:/etc/zilla/specs/sse-asyncapi.yaml
command: start -v -e

sse-server:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ info:
servers:
plain:
host: localhost:7114
protocol: sse
protocol: http
defaultContentType: application/json

channels:
Expand All @@ -27,6 +27,9 @@ operations:
action: receive
channel:
$ref: "#/channels/showEventById"
bindings:
x-zilla-sse:
method: "GET"

components:
schemas:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttTopicConfigBuilder;
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttWithConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseConditionConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfigBuilder;
import io.aklivity.zilla.runtime.binding.sse.config.SsePathConfigBuilder;
import io.aklivity.zilla.runtime.binding.sse.config.SseWithConfig;
import io.aklivity.zilla.runtime.binding.tcp.config.TcpConditionConfig;
import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
Expand Down Expand Up @@ -457,11 +460,52 @@ private <C> NamespaceConfigBuilder<C> injectSseServer(
.name("sse_server0")
.type("sse")
.kind(SERVER)
.options(SseOptionsConfig::builder)
.inject(this::injectSseRequests)
.build()
.inject(this::injectSseRoutes)
.inject(this::injectMetrics)
.build();
}

private <C> SseOptionsConfigBuilder<C> injectSseRequests(
SseOptionsConfigBuilder<C> options)
{
Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.operations.values().stream())
.filter(AsyncapiOperationView::hasBindingsSse)
.filter(AsyncapiOperationView::hasMessagesOrParameters)
.forEach(operation ->
{
options
.request()
.path(operation.channel.address.replaceAll(REGEX_ADDRESS_PARAMETER, "*"))
.inject(request -> injectSseContent(request, operation))
.build();
});

return options;
}

private <C> SsePathConfigBuilder<C> injectSseContent(
SsePathConfigBuilder<C> request,
AsyncapiOperationView operation)
{
if (operation.channel.hasMessages())
{
request
.content(JsonModelConfig::builder)
.catalog()
.name("catalog0")
.inject(cataloged -> injectHttpContentSchemas(cataloged, operation))
.build()
.build();
}

return request;
}

private <C>BindingConfigBuilder<C> injectSseRoutes(
BindingConfigBuilder<C> binding)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ write zilla:begin.ext ${asyncapi:beginEx()
.build()}
connected

read "Hello, world"
read '{ "id": 1, "name": "Hello World!" }'

read "Hello, again"
read '{ "id": 1, "name": "Hello World!" }'
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ read zilla:begin.ext ${asyncapi:matchBeginEx()
.build()}
connected

write "Hello, world"
write '{ "id": 1, "name": "Hello World!" }'

write "Hello, again"
write '{ "id": 1, "name": "Hello World!" }'
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ write close
read http:status "200" "OK"
read http:header "content-type" "text/event-stream"

read "data:Hello, world\n"
read 'data:{ "id": 1, "name": "Hello World!" }\n'
"\n"

read "data:Hello, again\n"
read 'data:{ "id": 1, "name": "Hello World!" }\n'
"\n"
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ write http:status "200" "OK"
write http:header "content-type" "text/event-stream"
write http:header "transfer-encoding" "chunked"

write "data:Hello, world\n"
write 'data:{ "id": 1, "name": "Hello World!" }\n'
"\n"
write flush

write "data:Hello, again\n"
write 'data:{ "id": 1, "name": "Hello World!" }\n'
"\n"
write flush
Loading