Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/health-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ First, we need a new project. Create a new project with the following command:

[source, subs=attributes+]
----
mvn io.quarkus:quarkus-maven-plugin:0.11.0:create \
mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=microprofile-health \
-Dextensions="smallrye-health"
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
301 changes: 301 additions & 0 deletions docs/src/main/asciidoc/kafka-guide.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
include::./attributes.adoc[]
= {project-name} - Using Apache Kafka with Reactive Messaging

This guide demonstrates how your {project-name} application can utilize MicroProfile Reactive Messaging to interact with Apache Kafka.

== Prerequisites

To complete this guide, you need:

* less than 15 minutes
* an IDE
* JDK 1.8+ installed with `JAVA_HOME` configured appropriately
* Apache Maven 3.5.3+
* A running Kafka cluster, or Docker Compose to start a development cluster
* GraalVM installed if you want to run in native mode.

== Architecture

In this guide, we are going to generate (random) prices in one component.
These prices are written in a Kafka topic (`prices`).
A second component reads from the `prices` Kafka topic and apply some magic conversion to the price.
The result is sent to an in-memory stream consumed by a JAX-RS resource.
The data is sent to a browser using server-sent event.

image::kafka-guide-architecture.png[alt=Architecture]

== Solution

We recommend that you follow the instructions in the next sections and create the application step by step.
However, you can go right to the completed example.

Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive].

The solution is located in the `kafka-quickstart` {quickstarts-tree-url}/kafka-quickstart[directory].

== Creating the Maven Project

First, we need a new project. Create a new project with the following command:

[source, subs=attributes+]
----
mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="reactive-kafka,vert.x"
----

This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions. Eclipse Vert.x is also required as the Kafka connector relies on Vert.x.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't Vert.x be a dependency of the Kafka connector extension, then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some strange reason, it does not work when building the native image. It's a known bug, we need to have a look.


== Starting Kafka

Then, we need a Kafka cluster.
You can follow the instructions from the https://kafka.apache.org/quickstart[Apache Kafka web site] or create a `docker-compose.yaml` file with the following content:

[source, yaml]
----
version: '2'

services:

zookeeper:
image: wurstmeister/zookeeper:3.4.6
expose:
- "2181"

kafka:
image: wurstmeister/kafka:2.12-2.1.1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
----

Once created, run `docker-compose up`.

NOTE: This is a development cluster, do not use in production.

== The price generator

Create the `src/main/java/org/acme/quarkus/sample/PriceGenerator.java` file, with the following content:

[source, java]
----
package org.acme.quarkus.sample;

import io.reactivex.Flowable;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* A bean producing random prices every 5 seconds.
* The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
*/
@ApplicationScoped
public class PriceGenerator {

private Random random = new Random();

@Outgoing("generated-price") // <1>
public Flowable<Integer> generate() { // <2>
return Flowable.interval(5, TimeUnit.SECONDS)
.map(tick -> random.nextInt(100));
}

}
----
<1> Instruct Reactive Messaging to dispatch the items from returned stream to `generated-price`.
<2> The method returns a RX Java 2 _stream_ (`Flowable`) emitting a random _price_ every 5 seconds.

The method returns a _Reactive Streams_. The generated items are sent to the stream named `generated-price`.
This stream is mapped to Kafka using the `application.properties` file that we will create soon.

== The price converter

The price converter reads the prices from Kafka, and transforms them.
Creates the `src/main/java/org/acme/quarkus/sample/PriceConverter.java` file with the following content:

[source, java]
----
package org.acme.quarkus.sample;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "prices" Kafka topic and applying some conversion.
* The result is pushed to the "my-data-stream" stream which is an in-memory stream.
*/
@ApplicationScoped
public class PriceConverter {

private static final double CONVERSION_RATE = 0.88;

@Incoming("prices") // <1>
@Outgoing("my-data-stream") // <2>
@Broadcast // <3>
public double process(int priceInUsd) {
return priceInUsd * CONVERSION_RATE;
}

}
----
<1> Indicates that the method consumes the items from the `prices` topic
<2> Indicates that the objects returned by the method are sent to the `my-data-stream` stream
<3> Indicates that the item are dispatched to all _subscribers_

The `process` method is called for every Kafka _record_ from the `prices` topic (configured in the application configuration).
Every result is sent to the `my-data-stream` in-memory stream.

== The price resource

Finally, let's bind our stream to a Jax-RS resource.
Creates the `src/main/java/org/acme/quarkus/sample/PriceResource.java` file with the following content:

[source, java]
----
package org.acme.quarkus.sample;

import io.smallrye.reactive.messaging.annotations.Stream;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
* A simple resource retrieving the in-memory "my-data-stream" and sending the items to a server sent event.
*/
@Path("/prices")
public class PriceResource {

@Inject
@Stream("my-data-stream") Publisher<Double> prices; // <1>

@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS) // <2>
public Publisher<Double> stream() { // <3>
return prices;
}
}
----
<1> Injects the `my-data-stream` stream using the `@Stream` qualifier
<2> Indicates that the content is sent using `Server Sent Events`
<3> Returns the stream (_Reactive Stream_)

== Configuring the Kafka connector

We need to configure Kafka connector. This is done in the `application.properties` file.
The keys are structured as follows:

`smallrye.messaging.[sink|source].{stream-name}.property=value`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will really need to talk about if we want to achieve consistency with every configuration setting being quarkus.<something>. Maybe it doesn't make sense for MP components, I don't know, but I thought it was the convention.

(General comment, don't change anything)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still in discussion for the spec, so likely be changed soon. But yes, the question is whether we want quarkus.. The implementation would support this (you just need to add a bean in the extension).


The `stream-name` segment must match the value set in the `@Incoming` and `@Outgoing` annotation:
* `generated-price` -> sink in which we write the prices
* `prices` -> source in which we read the prices

[source]
----
# Configure the Kafka sink (we write to it)
smallrye.messaging.sink.generated-price.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.sink.generated-price.topic=prices
smallrye.messaging.sink.generated-price.bootstrap.servers=localhost:9092
smallrye.messaging.sink.generated-price.key.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
smallrye.messaging.sink.generated-price.acks=1

# Configure the Kafka source (we read from it)
smallrye.messaging.source.prices.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.prices.topic=prices
smallrye.messaging.source.prices.bootstrap.servers=localhost:9092
smallrye.messaging.source.prices.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
smallrye.messaging.source.prices.group.id=my-group-id
----

More details about this configuration is available on the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] section from the Kafka documentation.

NOTE: What about `my-data-stream`? This is an in-memory stream, not connected to a message broker.

== The HTML page

Final touch, the HTML page reading the converted prices using SSE.

Create the `src/main/resources/META-INF/resources/prices.html` file, with the following content:

[source, html]
----
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Prices</title>

<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

<h2>Last price</h2>
<div class="row">
<p class="col-md-12">The last price is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
</div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script>
var source = new EventSource("/prices/stream");
source.onmessage = function (event) {
document.getElementById("content").innerHTML = event.data;
};
</script>
</html>
----

Nothing spectacular here. On each received price, it updates the page.

== Get it running

If you followed the instructions, you should have Kafka running.
Then, you just need to run the application using:

[source, shell]
----
mvn compile quarkus:dev
----

Open `http://localhost:8080/prices.html` in your browser.

NOTE: If you started the Kafka broker with docker compose, stop it using `CTRL+C` followed by `docker-compose down`.

== Running Native

You can build the native executable with:

[source, shell]
----
mvn package -Pnative
----

== Going further

This guide has shown how you can interact with Kafka using Quarkus.
It utilizes MicroProfile Reactive Messaging to build data streaming applications.

If you want to go further check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging], the implementation used in Quarkus.