Skip to content

Commit f3c8174

Browse files
committed
Fix EventHubsConsumer updating the checkpoint on every event.
This is because EventHubsConsumer#processEvents is initially equal to zero and the condition checking the batch size in EventHubsConsumer#processCheckpoint will always be met.
1 parent e82f0d5 commit f3c8174

File tree

1 file changed

+14
-43
lines changed
  • components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs

1 file changed

+14
-43
lines changed

components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_SIZE;
3636
import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_TIMEOUT;
37-
import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.UNCOMPLETED;
3837

3938
public class EventHubsConsumer extends DefaultConsumer {
4039

@@ -170,21 +169,28 @@ private void processCommit(final Exchange exchange, final EventContext eventCont
170169
}
171170

172171
try {
173-
var completionCondition = processCheckpoint(exchange);
174-
if (completionCondition.equals(COMPLETED_BY_SIZE)) {
172+
var cnt = processedEvents.incrementAndGet();
173+
if (cnt == getConfiguration().getCheckpointBatchSize()) {
174+
processedEvents.set(0);
175+
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_SIZE);
176+
LOG.debug("eventhub consumer batch size of reached");
177+
if (lastTask != null) {
178+
lastTask.cancel();
179+
}
175180
eventContext.updateCheckpointAsync()
176181
.subscribe(unused -> LOG.debug("Processed one event..."),
177182
error -> LOG.debug("Error when updating Checkpoint: {}", error.getMessage()),
178183
() -> {
179-
processedEvents.set(0);
180184
LOG.debug("Checkpoint updated.");
181185
});
182-
183-
} else if (!completionCondition.equals(COMPLETED_BY_TIMEOUT)) {
184-
processedEvents.incrementAndGet();
186+
} else if (System.currentTimeMillis() >= lastTask.scheduledExecutionTime()) {
187+
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_TIMEOUT);
188+
LOG.debug("eventhub consumer batch timeout reached");
189+
} else {
190+
LOG.debug("neither eventhub consumer batch size of {}/{} nor batch timeout reached yet", cnt,
191+
getConfiguration().getCheckpointBatchSize());
185192
}
186193
// we assume that the timer task has done the update by its side
187-
188194
} catch (Exception ex) {
189195
getExceptionHandler().handleException("Error occurred during updating the checkpoint. This exception is ignored.",
190196
exchange, ex);
@@ -202,39 +208,4 @@ private void processRollback(Exchange exchange) {
202208
getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
203209
}
204210
}
205-
206-
/**
207-
* Checks either the batch size or the batch timeout is reached
208-
*
209-
* @param exchange the exchange
210-
* @return the completion condition (batch size or batch timeout) if one of them is reached, else the
211-
* 'uncompleted' state adds a header {@value EventHubsConstants#CHECKPOINT_UPDATED_BY} with the
212-
* completion condition (
213-
*/
214-
private String processCheckpoint(Exchange exchange) {
215-
// Check if the batch size is reached
216-
if (processedEvents.get() % getConfiguration().getCheckpointBatchSize() == 0) {
217-
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_SIZE);
218-
LOG.debug("eventhub consumer batch size of reached");
219-
// no need to run task if the batch size already did the checkpointing
220-
if (lastTask != null) {
221-
lastTask.cancel();
222-
}
223-
224-
return COMPLETED_BY_SIZE;
225-
} else {
226-
LOG.debug("eventhub consumer batch size of {}/{} not reached yet", processedEvents.get(),
227-
getConfiguration().getCheckpointBatchSize());
228-
}
229-
230-
// Check if the batch timeout is reached
231-
if (System.currentTimeMillis() >= lastTask.scheduledExecutionTime()) {
232-
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_TIMEOUT);
233-
LOG.debug("eventhub consumer batch timeout reached");
234-
235-
return COMPLETED_BY_TIMEOUT;
236-
}
237-
238-
return UNCOMPLETED;
239-
}
240211
}

0 commit comments

Comments
 (0)