The Vert.x Kafka library allows asynchronous publishing and receiving of messages on Kafka topics through the Vert.x event bus.
#### To use this library you must have Kafka and ZooKeeper up and running. Follow the instructions in the Kafka quick start guide.
This is a multi-threaded worker library that consumes Kafka messages and then rebroadcasts them on an address on the Vert.x event bus.
Add a dependency to vertx-kafka:

    <dependency>
        <groupId>com.cyngn.vertx</groupId>
        <artifactId>vertx-kafka</artifactId>
        <version>3.3.0-SNAPSHOT</version>
    </dependency>

| vertx-kafka | vert.x | kafka |
|---|---|---|
| 3.3.0-SNAPSHOT | 3.3.0-SNAPSHOT | 0.9.0 |
| 0.4.1 | 3.1.0 | 0.9.0 |
Listening for messages coming from a Kafka broker.

    {
        "zookeeper.connect" : "<host1:2181,host2:2181...>",
        "group.id" : "<kafkaConsumerGroupId>",
        "bootstrap.servers" : "<host1:9092,host2:9092...>",
        "backoff.increment.ms" : "<backTimeInMilli>",
        "autooffset.reset" : "<kafkaAutoOffset>",
        "topics" : ["<topic1>", "<topic2>"],
        "eventbus.address" : "<default kafka.message.consumer>",
        "consumer.poll.interval.ms" : <default 100 ms>
    }

For example:

    {
        "zookeeper.connect" : "localhost:2181",
        "group.id" : "testGroup",
        "bootstrap.servers" : "localhost:9092",
        "backoff.increment.ms" : "100",
        "autooffset.reset" : "smallest",
        "topics" : ["testTopic"],
        "eventbus.address" : "kafka.to.vertx.bridge",
        "consumer.poll.interval.ms" : 1000
    }

Field breakdown:
- `zookeeper.connect`: a ZooKeeper connection string of the form `hostname1:port1,hostname2:port2,hostname3:port3/chroot/path`, used with your Kafka cluster
- `group.id`: the Kafka consumer group name that will be consuming the related topics
- `bootstrap.servers`: the list of initial Kafka hosts to connect to
- `backoff.increment.ms`: the backoff interval, in milliseconds, for contacting the broker when there are no messages
- `autooffset.reset`: how to reset the offset
- `topics`: the Kafka topics to listen to
- `eventbus.address`: the Vert.x address to publish messages onto when they are received from Kafka
- `consumer.poll.interval.ms`: how often to try to consume messages
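
The `zookeeper.connect` format described above (a comma-separated host list with an optional trailing chroot path) can be illustrated with a small parser. This is only a sketch using the JDK: `ZkConnectString` and its `parse` method are hypothetical names made up for illustration and are not part of vertx-kafka, which passes the string through to Kafka as-is.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of vertx-kafka: splits a ZooKeeper connection string. */
final class ZkConnectString {
    final List<String> hosts;  // each entry is "hostname:port"
    final String chroot;       // "" when the string has no chroot path

    private ZkConnectString(List<String> hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    static ZkConnectString parse(String connect) {
        // The optional chroot path starts at the first '/' and applies to every host.
        int slash = connect.indexOf('/');
        String hostPart = slash < 0 ? connect : connect.substring(0, slash);
        String chroot = slash < 0 ? "" : connect.substring(slash);

        List<String> hosts = new ArrayList<>();
        for (String entry : hostPart.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                throw new IllegalArgumentException("empty host entry in: " + connect);
            }
            hosts.add(host);
        }
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("no hosts in: " + connect);
        }
        return new ZkConnectString(hosts, chroot);
    }
}
```

A check like this can catch a malformed connection string before the consumer verticle is deployed, rather than at connection time.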
For a deeper look at kafka configuration parameters check this page out.
You should only need one consumer per application.

    vertx = Vertx.vertx();

    // sample config
    JsonObject consumerConfig = new JsonObject();
    consumerConfig.put(ConfigConstants.GROUP_ID, "testGroup");
    List<String> topics = new ArrayList<>();
    topics.add("testTopic");
    consumerConfig.put("topics", new JsonArray(topics));

    deployKafka(consumerConfig);

    public void deployKafka(JsonObject config) {
        // use your vert.x reference to deploy the consumer verticle
        vertx.deployVerticle(MessageConsumer.class.getName(),
            new DeploymentOptions().setConfig(config),
            deploy -> {
                if (deploy.failed()) {
                    System.err.println(String.format("Failed to start kafka consumer verticle, ex: %s", deploy.cause()));
                    vertx.close();
                    return;
                }
                System.out.println("kafka consumer verticle started");
            }
        );
    }

    vertx.eventBus().consumer(MessageConsumer.EVENTBUS_DEFAULT_ADDRESS,
        message -> {
            System.out.println(String.format("got message: %s", message.body()));
            // message handling code
            KafkaEvent event = new KafkaEvent(message.body());
        });

You can listen on the address kafka.producer.error for errors from the kafka producer.
Send a message to a Kafka cluster on a predefined topic.

    {
        "serializer.class" : "<the default encoder>",
        "key.serializer" : "<the key encoder>",
        "value.serializer" : "<the value encoder>",
        "bootstrap.servers" : "<host1:9092,host2:9092>",
        "default_topic" : "<default kafka topic to send to>",
        "eventbus.address" : "<the event bus topic where you send messages to send to kafka>",
        "max.block.ms" : <defaults to 60000>
    }

For example:

    {
        "serializer.class" : "org.apache.kafka.common.serialization.StringSerializer",
        "bootstrap.servers" : "localhost:9092",
        "default_topic" : "testTopic"
    }

Field breakdown:

- `serializer.class`: the serializer class for messages
- `key.serializer`: the serializer class for keys; defaults to the `serializer.class` if not set
- `value.serializer`: the serializer class for values; defaults to the `serializer.class` if not set
- `bootstrap.servers`: the socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
- `default_topic`: the default topic in Kafka to send to
- `eventbus.address`: the address to listen to on the event bus; defaults to `kafka.message.publisher`
- `max.block.ms`: how long, in milliseconds, the sender waits for metadata before timing out
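
The fallback rules in the producer field descriptions can be sketched in plain Java. This is an illustration of the documented behavior only, not the library's actual code: `ProducerConfigDefaults` and `resolve` are hypothetical names, and the default values are the ones stated in this document (`kafka.message.publisher` and 60000 ms).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of vertx-kafka: applies the documented producer defaults. */
final class ProducerConfigDefaults {
    static final String DEFAULT_ADDRESS = "kafka.message.publisher";
    static final long DEFAULT_MAX_BLOCK_MS = 60000L;

    static Map<String, Object> resolve(Map<String, Object> config) {
        // Copy first so the caller's map is left untouched.
        Map<String, Object> resolved = new HashMap<>(config);

        // key.serializer and value.serializer fall back to serializer.class when unset.
        Object fallback = config.get("serializer.class");
        if (fallback != null) {
            resolved.putIfAbsent("key.serializer", fallback);
            resolved.putIfAbsent("value.serializer", fallback);
        }

        // Defaults as stated in the field descriptions.
        resolved.putIfAbsent("eventbus.address", DEFAULT_ADDRESS);
        resolved.putIfAbsent("max.block.ms", DEFAULT_MAX_BLOCK_MS);
        return resolved;
    }
}
```

With the example configuration above, which sets only `serializer.class`, both the key and the value serializer would resolve to `org.apache.kafka.common.serialization.StringSerializer`.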
For a deeper look at kafka configuration parameters check this page out.
You should only need one producer per application.

    vertx = Vertx.vertx();

    // sample config
    JsonObject producerConfig = new JsonObject();
    producerConfig.put("bootstrap.servers", "localhost:9092");
    producerConfig.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put("default_topic", "testTopic");

    deployKafka(producerConfig);

    public void deployKafka(JsonObject config) {
        // use your vert.x reference to deploy the producer verticle
        vertx.deployVerticle(MessageProducer.class.getName(),
            new DeploymentOptions().setConfig(config),
            deploy -> {
                if (deploy.failed()) {
                    System.err.println(String.format("Failed to start kafka producer verticle, ex: %s", deploy.cause()));
                    vertx.close();
                    return;
                }
                System.out.println("kafka producer verticle started");
            });
    }

    KafkaPublisher publisher = new KafkaPublisher(vertx.eventBus());

    // send to the default topic
    publisher.send("a test message on a default topic");
    // send to a specific topic
    publisher.send("SomeSpecialTopic", "a test message on a specific topic");
    // send to a specific topic with custom key
    publisher.send("SomeSpecialTopic", "aUserId", "a test message on a specific topic with a custom key");
    // send to a specific topic and partition
    publisher.send("SomeSpecialTopic", "", 5, "a test message on a specific topic and partition");

You can listen on the address kafka.producer.error for errors from the kafka producer.
- cd [yourKafkaInstallDir]
- bin/zookeeper-server-start.sh config/zookeeper.properties
- bin/kafka-server-start.sh config/server.properties
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 8 --topic [yourTestTopic]
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic [yourTestTopic]
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic [yourTestTopic]