Skip to content

Commit f5fb6cc

Browse files
authored
fix the version string checks in KafkaIO (#35703)
* fix: use Strings.isNullOrEmpty for version string checks in KafkaIO * fix: replace Strings.isNullOrEmpty with null and empty checks for version string validation in KafkaIO
1 parent e80ffd4 commit f5fb6cc

File tree

1 file changed

+2
-0
lines changed
  • sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka

1 file changed

+2
-0
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
18591859
.as(StreamingOptions.class)
18601860
.getUpdateCompatibilityVersion();
18611861
if (requestedVersionString != null
1862+
&& !requestedVersionString.isEmpty()
18621863
&& TransformUpgrader.compareVersions(requestedVersionString, "2.66.0") < 0) {
18631864
// Use discouraged Impulse for backwards compatibility with previous released versions.
18641865
output =
@@ -2831,6 +2832,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
28312832
.as(StreamingOptions.class)
28322833
.getUpdateCompatibilityVersion();
28332834
if (requestedVersionString != null
2835+
&& !requestedVersionString.isEmpty()
28342836
&& TransformUpgrader.compareVersions(requestedVersionString, "2.60.0") < 0) {
28352837
// Redistribute is not allowed with commits prior to 2.59.0, since there is a Reshuffle
28362838
// prior to the redistribute. The reshuffle will occur before commits are offsetted and

0 commit comments

Comments
 (0)