Skip to content

Commit 0fa4118

Browse files
aahmed-sebsideup
authored andcommitted
Add Apache Pulsar Support (#713)
* Add Apache Pulsar Support * Reafctor container class * Change property configuration * Remove unused imports
1 parent 2fdf6e7 commit 0fa4118

File tree

5 files changed

+113
-0
lines changed

5 files changed

+113
-0
lines changed

core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public String getKafkaImage() {
5858
return (String) properties.getOrDefault("kafka.container.image", "confluentinc/cp-kafka");
5959
}
6060

61+
public String getPulsarImage() {
62+
return (String) properties.getOrDefault("pulsar.container.image", "apachepulsar/pulsar");
63+
}
64+
6165
public boolean isDisableChecks() {
6266
return Boolean.parseBoolean((String) properties.getOrDefault("checks.disable", "false"));
6367
}

modules/pulsar/build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
description = "Testcontainers :: Pulsar"
2+
3+
dependencies {
4+
compile project(':testcontainers')
5+
6+
testCompile group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.0.0-rc1-incubating'
7+
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.10.0'
8+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.testcontainers.containers;
2+
3+
import org.testcontainers.containers.wait.strategy.Wait;
4+
import org.testcontainers.utility.TestcontainersConfiguration;
5+
6+
/**
7+
* This container wraps Apache pulsar running in stanalone mode
8+
*/
9+
public class PulsarContainer extends GenericContainer<PulsarContainer> {
10+
11+
public static final int PULSAR_PORT = 6850;
12+
13+
public PulsarContainer() {
14+
this("2.0.0-rc1-incubating");
15+
}
16+
17+
public PulsarContainer(String pulsarVersion) {
18+
super(TestcontainersConfiguration.getInstance().getPulsarImage() + ":" + pulsarVersion);
19+
withExposedPorts(PULSAR_PORT);
20+
withCommand("/bin/bash", "-c", "" +
21+
"servicePort=6850 webServicePort=8280 webServicePortTls=8643 bin/apply-config-from-env.py conf/proxy.conf && " +
22+
"bin/pulsar standalone & " +
23+
"bin/pulsar proxy --zookeeper-servers localhost:2181 --global-zookeeper-servers localhost:2181"
24+
);
25+
26+
waitingFor(Wait.forLogMessage(".*messaging service is ready.*\\s", 1));
27+
}
28+
29+
public String getPulsarBrokerUrl() {
30+
return String.format("pulsar://%s:%s", this.getContainerIpAddress(), this.getFirstMappedPort());
31+
}
32+
33+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.testcontainers.containers;
2+
3+
import org.apache.pulsar.client.api.Consumer;
4+
import org.apache.pulsar.client.api.Message;
5+
import org.apache.pulsar.client.api.Producer;
6+
import org.apache.pulsar.client.api.PulsarClient;
7+
import org.junit.Test;
8+
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import static org.assertj.core.api.Assertions.assertThat;
13+
14+
public class PulsarContainerTest {
15+
16+
public static final String TEST_TOPIC = "test_topic";
17+
18+
@Test
19+
public void testUsage() throws Exception {
20+
try (PulsarContainer pulsar = new PulsarContainer()) {
21+
pulsar.start();
22+
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
23+
}
24+
}
25+
26+
protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
27+
28+
try (
29+
PulsarClient client = PulsarClient.builder()
30+
.serviceUrl(pulsarBrokerUrl)
31+
.build();
32+
Consumer consumer = client.newConsumer()
33+
.topic(TEST_TOPIC)
34+
.subscriptionName("test-subs")
35+
.subscribe();
36+
Producer<byte[]> producer = client.newProducer()
37+
.topic(TEST_TOPIC)
38+
.create()
39+
) {
40+
41+
producer.send("test containers".getBytes());
42+
CompletableFuture<Message> future = consumer.receiveAsync();
43+
Message message = future.get(5, TimeUnit.SECONDS);
44+
45+
assertThat(new String(message.getData()))
46+
.isEqualTo("test containers");
47+
}
48+
}
49+
50+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<!-- encoders are assigned the type
5+
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
6+
<encoder>
7+
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
8+
</encoder>
9+
</appender>
10+
11+
<root level="INFO">
12+
<appender-ref ref="STDOUT"/>
13+
</root>
14+
15+
<logger name="org.testcontainers" level="DEBUG"/>
16+
<logger name="org.testcontainers.shaded" level="WARN"/>
17+
18+
</configuration>

0 commit comments

Comments
 (0)