Skip to content

Commit cf03e41

Browse files
steinerdaJoergSiebahn
authored andcommitted
fix(server-kafka): ignore commits without record changes
1 parent a143470 commit cf03e41

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

sda-commons-server-kafka/src/main/java/org/sdase/commons/server/kafka/consumer/strategies/synccommit/SyncCommitMLS.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ public SyncCommitMLS(
4444

4545
@Override
4646
public void processRecords(ConsumerRecords<K, V> records, KafkaConsumer<K, V> consumer) {
47-
super.processRecords(records, consumer);
48-
commitSync(consumer);
47+
// Only process and commit if there are records to process
48+
if (!records.isEmpty()) {
49+
super.processRecords(records, consumer);
50+
this.commitSync(consumer);
51+
}
4952
}
5053

5154
private void commitSync(KafkaConsumer<K, V> consumer) {

sda-commons-server-kafka/src/test/java/org/sdase/commons/server/kafka/consumer/SyncCommitStrategyTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import static org.mockito.Mockito.timeout;
77
import static org.mockito.Mockito.when;
88

9+
import java.util.Collections;
10+
import org.apache.kafka.clients.consumer.ConsumerRecords;
911
import org.apache.kafka.clients.consumer.KafkaConsumer;
1012
import org.apache.kafka.common.errors.TimeoutException;
1113
import org.junit.jupiter.api.BeforeEach;
@@ -83,4 +85,18 @@ void shouldStillThrowOtherExceptionsIfNoSyncCommitErrorHandlerConfigured() {
8385

8486
Mockito.verify(consumer, timeout(100).times(1)).commitSync();
8587
}
88+
89+
@Test
90+
void shouldNotCommitOnEmptyRecords() {
91+
// given
92+
SyncCommitMLS<String, String> strategy = new SyncCommitMLS<>(handler, errorHandler);
93+
strategy.init(null);
94+
ConsumerRecords<String, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
95+
96+
// when the strategy processes the empty records
97+
strategy.processRecords(emptyRecords, consumer);
98+
99+
// then commitSync() should never be called on the consumer
100+
Mockito.verify(consumer, Mockito.never()).commitSync();
101+
}
86102
}

0 commit comments

Comments
 (0)