Skip to content

KAFKA-17019: Producer TimeoutException should include root cause #20159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ public void initTransactions(boolean keepPreparedTxn) {
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
() -> new KafkaException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
producerMetrics.recordInit(time.nanoseconds() - now);
transactionManager.maybeUpdateTransactionV2Enabled(true);
}
Expand Down Expand Up @@ -760,7 +761,10 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
long start = time.nanoseconds();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
() -> new KafkaException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or "
+ "transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn "
+ "response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
Expand Down Expand Up @@ -845,7 +849,9 @@ public void commitTransaction() throws ProducerFencedException {
long commitStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
() -> new KafkaException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator "
+ "within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
}

Expand Down Expand Up @@ -880,7 +886,10 @@ public void abortTransaction() throws ProducerFencedException {
long abortStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
() -> new KafkaException("AbortTransaction timed out – could not complete EndTxn(abort) "
+ "with the transaction coordinator within max.block.ms "
+ "(coordinator unavailable/rebalancing, network lag, or ACL denial)."));
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}

Expand Down Expand Up @@ -1225,7 +1234,9 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
if (metadata.getError(topic) != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to refactor it like this:

Errors error = metadata.getError(topic);
if (error != null) {
    cause = error.exception();
} else if (ex.getCause() != null) {
    cause = ex.getCause();
} else {
    cause = new KafkaException(METADATA_TIMEOUT_MSG);
}

throw new TimeoutException(errorMessage, cause);

Also, since ex.getCause() != null can never be true based on the current awaitUpdate implementation, do we need to check for it?

throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
throw new TimeoutException(errorMessage);
if (ex.getCause() != null)
throw new TimeoutException(errorMessage, ex.getCause());
throw new TimeoutException(errorMessage, new KafkaException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
Expand All @@ -1234,7 +1245,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
throw new TimeoutException(errorMessage);
throw new TimeoutException(errorMessage, new KafkaException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
}
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public Uuid clientInstanceId(Duration timeout) {
if (injectTimeoutExceptionCounter > 0) {
--injectTimeoutExceptionCounter;
}
throw new TimeoutException();
throw new TimeoutException(new KafkaException("TimeoutExceptions are successfully injected for test."));
}

return clientInstanceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ private long sendProducerData(long now) {
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
KafkaException potentialCause = new KafkaException(
"The broker might be unavailable or responding slowly, or the CPU might be busy.");
failBatch(expiredBatch, new TimeoutException(errorMessage, potentialCause), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package org.apache.kafka.clients.producer.internals;


import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class TransactionalRequestResult {
private final CountDownLatch latch;
Expand All @@ -48,15 +50,19 @@ public void done() {
}

public void await() {
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

}

/**
 * Blocks until this transactional request completes, or the given timeout
 * elapses.
 *
 * <p>Delegates to the three-argument overload, supplying a generic
 * {@code KafkaException("Unknown reason.")} as the potential cause to attach
 * to the {@code TimeoutException}, since no call-site-specific cause is
 * known here. Callers that can describe why a timeout might occur should
 * use the {@code Supplier}-accepting overload directly.
 *
 * @param timeout the maximum time to wait
 * @param unit    the time unit of {@code timeout}
 */
public void await(long timeout, TimeUnit unit) {
this.await(timeout, unit, () -> new KafkaException("Unknown reason."));
}

public void await(long timeout, TimeUnit unit, Supplier<KafkaException> potentialCauseException) {
try {
boolean success = latch.await(timeout, unit);
if (!success) {
throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
"ms while awaiting " + operation);
"ms while awaiting " + operation, potentialCauseException.get());
}

isAcked = true;
Expand Down