Skip to content

Commit ab2b1ca

Browse files
committed
Client done?
1 parent 079c781 commit ab2b1ca

File tree

18 files changed

+315
-69
lines changed

18 files changed

+315
-69
lines changed

incubator/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/config/kafka/asyncapi.yaml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,28 @@ servers:
2626
protocol: kafka
2727

2828
operations:
29-
onUserSignedUp:
29+
onSensorData:
3030
action: receive
3131
channel:
32-
$ref: '#/channels/userSignedUp'
32+
$ref: '#/channels/sensorData'
3333

3434
channels:
35-
userSignedUp:
36-
description: This channel contains a message per each user who signs up in our application.
37-
address: user_signedup
35+
sensorData:
36+
description: This channel contains a message for sensors.
37+
address: sensors
3838
messages:
39-
userSignedUp:
40-
$ref: '#/components/messages/userSignedUp'
39+
sensorData:
40+
$ref: '#/components/messages/sensorData'
4141

4242
components:
4343
messages:
44-
userSignedUp:
44+
sensorData:
4545
payload:
4646
type: object
4747
properties:
48-
userId:
48+
sensorId:
4949
type: integer
50-
description: This property describes the id of the user
51-
userEmail:
50+
description: This property describes the id of the sensor
51+
message:
5252
type: string
53-
description: This property describes the email of the user
53+
description: This property describes message of the sensor
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
property deltaMillis 0L
18+
property newTimestamp ${kafka:timestamp() + deltaMillis}
19+
20+
connect "zilla://streams/asyncapi0"
21+
option zilla:window 8192
22+
option zilla:transmission "half-duplex"
23+
24+
write zilla:begin.ext ${asyncapi:beginEx()
25+
.typeId(zilla:id("asyncapi"))
26+
.extension(kafka:beginEx()
27+
.typeId(zilla:id("kafka"))
28+
.merged()
29+
.capabilities("PRODUCE_ONLY")
30+
.topic("test")
31+
.ackMode("LEADER_ONLY")
32+
.build()
33+
.build())
34+
.build()}
35+
36+
connected
37+
38+
write zilla:data.ext ${kafka:dataEx()
39+
.typeId(zilla:id("kafka"))
40+
.merged()
41+
.produce()
42+
.timestamp(newTimestamp)
43+
.partition(0, 1)
44+
.build()
45+
.build()}
46+
write "Hello, world"
47+
write flush
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
accept "zilla://streams/asyncapi0"
18+
option zilla:window 8192
19+
option zilla:transmission "duplex"
20+
21+
accepted
22+
23+
read zilla:begin.ext ${asyncapi:matchBeginEx()
24+
.typeId(zilla:id("asyncapi"))
25+
.extension(kafka:beginEx()
26+
.typeId(zilla:id("kafka"))
27+
.merged()
28+
.capabilities("PRODUCE_ONLY")
29+
.topic("test")
30+
.ackMode("LEADER_ONLY")
31+
.build()
32+
.build())
33+
.build()}
34+
35+
connected
36+
37+
read zilla:data.ext ${kafka:matchDataEx()
38+
.typeId(zilla:id("kafka"))
39+
.merged()
40+
.produce()
41+
.partition(0, 1)
42+
.build()
43+
.build()}
44+
read "Hello, world"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
property deltaMillis 0L
18+
property newTimestamp ${kafka:timestamp() + deltaMillis}
19+
20+
connect "zilla://streams/composite0"
21+
option zilla:window 8192
22+
option zilla:transmission "half-duplex"
23+
option zilla:ephemeral "test:composite0/kafka"
24+
25+
write zilla:begin.ext ${kafka:beginEx()
26+
.typeId(zilla:id("kafka"))
27+
.merged()
28+
.capabilities("PRODUCE_ONLY")
29+
.topic("test")
30+
.ackMode("LEADER_ONLY")
31+
.build()
32+
.build()}
33+
34+
connected
35+
36+
write zilla:data.ext ${kafka:dataEx()
37+
.typeId(zilla:id("kafka"))
38+
.merged()
39+
.produce()
40+
.timestamp(newTimestamp)
41+
.partition(0, 1)
42+
.build()
43+
.build()}
44+
write "Hello, world"
45+
write flush
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#
2+
# Copyright 2021-2023 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
property serverAddress "zilla://streams/composite0"
18+
19+
accept ${serverAddress}
20+
option zilla:window 16
21+
option zilla:transmission "half-duplex"
22+
23+
accepted
24+
25+
read zilla:begin.ext ${kafka:beginEx()
26+
.typeId(zilla:id("kafka"))
27+
.merged()
28+
.capabilities("PRODUCE_ONLY")
29+
.topic("test")
30+
.ackMode("LEADER_ONLY")
31+
.build()
32+
.build()}
33+
34+
connected
35+
36+
read zilla:data.ext ${kafka:matchDataEx()
37+
.typeId(zilla:id("kafka"))
38+
.merged()
39+
.produce()
40+
.partition(0, 1)
41+
.build()
42+
.build()}
43+
read "Hello, world"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* License for the specific language governing permissions and limitations
1414
* under the License.
1515
*/
16-
package io.aklivity.zilla.specs.binding.asyncapi.streams.mqtt;
16+
package io.aklivity.zilla.specs.binding.asyncapi.streams;
1717

1818
import static java.util.concurrent.TimeUnit.SECONDS;
1919
import static org.junit.rules.RuleChain.outerRule;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.specs.binding.asyncapi.streams;
17+
18+
import static java.util.concurrent.TimeUnit.SECONDS;
19+
import static org.junit.rules.RuleChain.outerRule;
20+
21+
import org.junit.Rule;
22+
import org.junit.Test;
23+
import org.junit.rules.DisableOnDebug;
24+
import org.junit.rules.TestRule;
25+
import org.junit.rules.Timeout;
26+
import org.kaazing.k3po.junit.annotation.Specification;
27+
import org.kaazing.k3po.junit.rules.K3poRule;
28+
29+
public class KafkaIT
30+
{
31+
private final K3poRule k3po = new K3poRule()
32+
.addScriptRoot("kafka", "io/aklivity/zilla/specs/binding/asyncapi/streams/kafka");
33+
34+
private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS));
35+
36+
@Rule
37+
public final TestRule chain = outerRule(k3po).around(timeout);
38+
39+
40+
@Test
41+
@Specification({
42+
"${kafka}/produce.message/client",
43+
"${kafka}/produce.message/server"
44+
})
45+
public void shouldProduceMessage() throws Exception
46+
{
47+
k3po.finish();
48+
}
49+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* License for the specific language governing permissions and limitations
1414
* under the License.
1515
*/
16-
package io.aklivity.zilla.specs.binding.asyncapi.streams.mqtt;
16+
package io.aklivity.zilla.specs.binding.asyncapi.streams;
1717

1818
import static java.util.concurrent.TimeUnit.SECONDS;
1919
import static org.junit.rules.RuleChain.outerRule;

incubator/binding-asyncapi.spec/src/test/java/io/aklivity/zilla/specs/binding/asyncapi/streams/asyncapi/AsyncapiIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,14 @@ public void shouldCreateItem() throws Exception
5555
{
5656
k3po.finish();
5757
}
58+
59+
@Test
60+
@Specification({
61+
"${asyncapi}/produce.message/client",
62+
"${asyncapi}/produce.message/server"
63+
})
64+
public void shouldProduceMessage() throws Exception
65+
{
66+
k3po.finish();
67+
}
5868
}

incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected AsyncapiProtocol resolveProtocol(
3939
String protocol,
4040
AsyncapiOptionsConfig options)
4141
{
42-
Pattern pattern = Pattern.compile("(http|mqtt)");
42+
Pattern pattern = Pattern.compile("(http|mqtt|kafka)");
4343
Matcher matcher = pattern.matcher(protocol);
4444
AsyncapiProtocol asyncapiProtocol = null;
4545
if (matcher.find())

0 commit comments

Comments
 (0)