Skip to content

Commit 911ef5e

Browse files
Fix parameter check in Kafka cluster examples (#10361)
1 parent e450b07 commit 911ef5e

File tree

4 files changed

+23
-15
lines changed

4 files changed

+23
-15
lines changed

examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ public class ApacheKafkaContainerCluster implements Startable {
2525
private final Collection<KafkaContainer> brokers;
2626

2727
public ApacheKafkaContainerCluster(String version, int brokersNum, int internalTopicsRf) {
28-
if (brokersNum < 0) {
28+
if (brokersNum <= 0) {
2929
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
3030
}
31-
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
31+
if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
3232
throw new IllegalArgumentException(
33-
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
33+
"internalTopicsRf '" +
34+
internalTopicsRf +
35+
"' must be less than or equal to brokersNum and greater than 0"
3436
);
3537
}
3638

@@ -99,6 +101,6 @@ public void start() {
99101

100102
@Override
101103
public void stop() {
102-
this.brokers.stream().parallel().forEach(GenericContainer::stop);
104+
this.brokers.parallelStream().forEach(GenericContainer::stop);
103105
}
104106
}

examples/kafka-cluster/src/test/java/com/example/kafkacluster/ConfluentKafkaContainerCluster.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ public class ConfluentKafkaContainerCluster implements Startable {
2525
private final Collection<ConfluentKafkaContainer> brokers;
2626

2727
public ConfluentKafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
28-
if (brokersNum < 0) {
28+
if (brokersNum <= 0) {
2929
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
3030
}
31-
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
31+
if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
3232
throw new IllegalArgumentException(
33-
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
33+
"internalTopicsRf '" +
34+
internalTopicsRf +
35+
"' must be less than or equal to brokersNum and greater than 0"
3436
);
3537
}
3638

@@ -100,6 +102,6 @@ public void start() {
100102

101103
@Override
102104
public void stop() {
103-
this.brokers.stream().parallel().forEach(GenericContainer::stop);
105+
this.brokers.parallelStream().forEach(GenericContainer::stop);
104106
}
105107
}

examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ public class KafkaContainerCluster implements Startable {
3131
private final Collection<KafkaContainer> brokers;
3232

3333
public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
34-
if (brokersNum < 0) {
34+
if (brokersNum <= 0) {
3535
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
3636
}
37-
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
37+
if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
3838
throw new IllegalArgumentException(
39-
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
39+
"internalTopicsRf '" +
40+
internalTopicsRf +
41+
"' must be less than or equal to brokersNum and greater than 0"
4042
);
4143
}
4244

examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ public class KafkaContainerKraftCluster implements Startable {
2525
private final Collection<KafkaContainer> brokers;
2626

2727
public KafkaContainerKraftCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
28-
if (brokersNum < 0) {
28+
if (brokersNum <= 0) {
2929
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
3030
}
31-
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
31+
if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
3232
throw new IllegalArgumentException(
33-
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
33+
"internalTopicsRf '" +
34+
internalTopicsRf +
35+
"' must be less than or equal to brokersNum and greater than 0"
3436
);
3537
}
3638

@@ -101,6 +103,6 @@ public void start() {
101103

102104
@Override
103105
public void stop() {
104-
this.brokers.stream().parallel().forEach(GenericContainer::stop);
106+
this.brokers.parallelStream().forEach(GenericContainer::stop);
105107
}
106108
}

0 commit comments

Comments
 (0)