This Flink job replicates Kafka topics between Kafka clusters.
The job can be submitted to Flink either through the command line or through the Flink UI. All configuration is provided through the job’s command-line arguments.
The following command launches a Flink job on a YARN cluster, reading from an unsecured Kafka cluster and writing to a secure cluster (using TLS connections and SASL/PLAIN authentication):
```
flink run \
  --detached \
  --class com.cloudera.flink.FlinkKafkaReplicator \
  --shutdownOnAttachedExit \
  --jobmanager yarn-cluster \
  --parallelism 1 \
  /tmp/flink-replicate-kafka-1.0.jar \
    --topics '^[^_].*' \
    --consumer.bootstrap.servers edge2ai-1.dim.local:9092 \
    --consumer.security.protocol PLAINTEXT \
    --consumer.group.id consuming \
    --producer.bootstrap.servers araujo-kafka-broker0.se-sandb.a465-9q4k.cloudera.site:9093,araujo-kafka-broker1.se-sandb.a465-9q4k.cloudera.site:9093,araujo-kafka-broker2.se-sandb.a465-9q4k.cloudera.site:9093 \
    --producer.security.protocol SASL_SSL \
    --producer.sasl.mechanism PLAIN \
    --producer.sasl.jaas.config 'org.apache.kafka.common.security.plain.PlainLoginModule required username=<<araujo>> password=<<Supersecret1!>>;' \
    --producer.ssl.truststore.location /opt/cloudera/security/truststore.jks \
    --create-topics
```

The following table lists the minimum set of arguments that must be specified:
| Argument | Description | 
|---|---|
| --topics <regex> | Regular expression matching the names of the topics to replicate. The expression is matched against every topic on the source Kafka cluster, including Kafka internal topics, so it’s up to the user to provide an expression that excludes internal topics. For example, the expression '^[^_].*' used in the command above matches every topic whose name does not start with an underscore, which excludes the Kafka internal topics. | 
| --consumer.bootstrap.servers <brokers> | Comma-separated list of brokers for the source Kafka cluster (where we’ll read from). Each broker must be specified in the host:port format. | 
| --consumer.group.id <group_id> | Consumer group ID to use when reading data from the source cluster | 
| --producer.bootstrap.servers <brokers> | Comma-separated list of brokers for the target Kafka cluster (where we’ll write to). Each broker must be specified in the host:port format. | 
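
For reference, a minimal invocation that specifies only the required arguments (and relies on the defaults for everything else) could look like the sketch below; the broker addresses and the group ID are placeholders and must be replaced with values for your environment:

```
flink run \
  --class com.cloudera.flink.FlinkKafkaReplicator \
  /tmp/flink-replicate-kafka-1.0.jar \
    --topics '^[^_].*' \
    --consumer.bootstrap.servers source-broker-1:9092,source-broker-2:9092 \
    --consumer.group.id replicator \
    --producer.bootstrap.servers target-broker-1:9092,target-broker-2:9092
```
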
The following arguments are optional:
| Argument | Description | Default | 
|---|---|---|
| --checkpoint-interval-ms <time_ms> | Checkpoint interval, in milliseconds | 1000 | 
| --create-partitions | If a topic being replicated already exists in the target cluster but with fewer partitions, the job increases the number of partitions of the target topic to match the source | If any topic on the target has fewer partitions than the same topic on the source, the job fails, unless --no-preserve-partitioning is specified | 
| --create-topics | If a topic being replicated does not exist in the target cluster, it is created with the same number of partitions and the same properties the topic has on the source | If any topic doesn’t exist on the target, the job fails | 
| --exactly-once | Uses transactional writes to the target cluster to achieve exactly-once semantics. | Writes to the target are non-transactional (at-least-once semantics) | 
| --latest-offset | When the job runs without resuming from a previous checkpoint/savepoint, it will start reading from the latest topic offsets | When the job runs without resuming from a previous checkpoint/savepoint, it will start reading from the earliest topic offsets | 
| --no-preserve-partitioning | The partition assignment of each message on the source topics will be ignored. Each message will be assigned a partition on the target cluster upon writing, according to the Kafka producer’s partitioning logic | The messages will be replicated to the same partition they belong to on the source. The default behaviour can only be used if all the topics being replicated have at least as many partitions on the target as on the source | 
| --sync-properties | Copy all the topic properties from the source to the replicated topics on the target | Properties are not replicated | 
| --transaction-timeout-ms <time_ms> | The maximum amount of time, in milliseconds, that the transaction coordinator on the target will wait for a transaction to be updated. This argument is ignored if --exactly-once is not specified | 870000 (14.5 minutes) |
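
Putting some of these options together, the sketch below shows a command that enables exactly-once replication and lets the job create and grow topics on the target as needed; the broker addresses and group ID are again placeholders:

```
flink run \
  --class com.cloudera.flink.FlinkKafkaReplicator \
  /tmp/flink-replicate-kafka-1.0.jar \
    --topics '^[^_].*' \
    --consumer.bootstrap.servers source-broker-1:9092 \
    --consumer.group.id replicator \
    --producer.bootstrap.servers target-broker-1:9092 \
    --create-topics \
    --create-partitions \
    --sync-properties \
    --exactly-once \
    --transaction-timeout-ms 870000
```

When using --exactly-once, keep the transaction timeout below the target brokers’ transaction.max.timeout.ms setting (15 minutes by default in Kafka), since brokers reject producers that request a larger timeout; the 870000 ms default presumably stays just under that limit for this reason.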