
KAFKA-17019: Producer TimeoutException should include root cause #20159

Open: wants to merge 5 commits into trunk (changes shown from 3 commits)
@@ -51,6 +51,7 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -671,7 +672,8 @@ public void initTransactions(boolean keepPreparedTxn) {
         long now = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
+            () -> new PotentialCauseException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
         producerMetrics.recordInit(time.nanoseconds() - now);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
@@ -760,7 +762,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         long start = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
+            () -> new PotentialCauseException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
         producerMetrics.recordSendOffsets(time.nanoseconds() - start);
     }
 }
@@ -845,7 +848,8 @@ public void commitTransaction() throws ProducerFencedException {
         long commitStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
+            () -> new PotentialCauseException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
         producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
     }

@@ -880,7 +884,8 @@ public void abortTransaction() throws ProducerFencedException {
         long abortStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
+            () -> new PotentialCauseException("AbortTransaction timed out – could not complete EndTxn(abort) with the transaction coordinator within max.block.ms (coordinator unavailable/rebalancing, network lag, or ACL denial)."));
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }

@@ -1225,7 +1230,9 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
         if (metadata.getError(topic) != null) {
Collaborator:

Would it be better to refactor it like this:

Errors error = metadata.getError(topic);
Throwable cause;
if (error != null) {
    cause = error.exception();
} else if (ex.getCause() != null) {
    cause = ex.getCause();
} else {
    cause = new KafkaException(METADATA_TIMEOUT_MSG);
}

throw new TimeoutException(errorMessage, cause);

Also, since ex.getCause() != null can never be true based on the current awaitUpdate implementation, do we need to check for it?

             throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
         }
-        throw new TimeoutException(errorMessage);
+        if (ex.getCause() != null)
+            throw new TimeoutException(errorMessage, ex.getCause());
+        throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
Member:

It seems PotentialCauseException does not offer more useful information than the error message, right? Perhaps we could enrich the error message instead of introducing a new nonspecific exception

Contributor Author @chickenchickenlove, Jul 14, 2025:

@chia7712 thanks for your comment. 🙇‍♂️
I agree with your opinion, especially that 'we could enrich the error message instead of introducing a new nonspecific exception'.
However, introducing a new nonspecific exception to solve this issue has both pros and cons.

Let me explain more.

KAFKA-17019 requires that every org.apache.kafka.common.errors.TimeoutException include its root cause as a nested exception.

It’s hard to pinpoint the root cause in every situation where a TimeoutException occurs:

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);

In this code, both expiredInflightBatches and expiredBatches can cause a TimeoutException.
However, the TimeoutException is thrown purely as a result of measuring elapsed time.
Therefore, expiredInflightBatches and expiredBatches carry no information about the root cause.

If expiredInflightBatches encounters a problem, the possible scenarios include:

  1. Normal case: no errors, the operation simply succeeds.
  2. Network issue: an error might occur, but only after the batch has already expired.
  3. Slightly slow network (limited bandwidth, and so on): no errors, just slow responses.
  4. Busy CPU for various reasons: no errors, the code is simply invoked late.

There are many scenarios in which a TimeoutException can occur, even from this simple analysis.
However, only scenario 2 can actually surface an exception (connection closed, failure to establish a connection, ...).

After spending a considerable amount of time reviewing the code and analyzing the TimeoutExceptions thrown by the Producer, I concluded that it's difficult to extract the root cause at every point where a TimeoutException is created.

Idea + Pros and Cons

Developers usually have a good understanding of what kind of error might occur in a given context.
Therefore, in cases where it's difficult to catch the actual root cause, it's possible to include an expected exception as the root cause instead.

Here’s a summary of the pros and cons (compared to simply enhancing the error message):

  • pros: at call sites (for example, Kafka Streams, Kafka Connect, or producer internals) where a TimeoutException is expected, the root cause can be used to handle different cases conditionally.
    • For example, you could create NetworkPotentialCauseException and CPUBusyPotentialCauseException as subclasses of PotentialCauseException, and branch on the root cause: branch A if it is a NetworkPotentialCauseException, branch B if it is a CPUBusyPotentialCauseException.
  • cons: higher instance-creation cost compared to a plain String message.
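A minimal, self-contained sketch of the branching idea. Note that NetworkPotentialCauseException and CPUBusyPotentialCauseException are hypothetical subclasses from this discussion only (the PR adds just PotentialCauseException), and local stand-in classes replace Kafka's real TimeoutException/PotentialCauseException so the snippet runs on its own:

```java
// Self-contained sketch: local stand-ins mirror the proposed hierarchy.
// The subclasses below are hypothetical and do not exist in the PR.
public class CauseBranchingSketch {
    // Stand-in for org.apache.kafka.common.errors.PotentialCauseException.
    static class PotentialCauseException extends RuntimeException {
        PotentialCauseException(String message) { super(message); }
    }
    // Hypothetical subclasses from the discussion above.
    static class NetworkPotentialCauseException extends PotentialCauseException {
        NetworkPotentialCauseException(String message) { super(message); }
    }
    static class CPUBusyPotentialCauseException extends PotentialCauseException {
        CPUBusyPotentialCauseException(String message) { super(message); }
    }
    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    // A caller (e.g. Kafka Streams or Connect) could branch on the attached cause.
    static String handle(TimeoutException e) {
        Throwable cause = e.getCause();
        if (cause instanceof NetworkPotentialCauseException)
            return "branch A: back off and retry";   // network issues often recover
        if (cause instanceof CPUBusyPotentialCauseException)
            return "branch B: shed load";            // client-side saturation
        return "default: rethrow to the caller";
    }

    public static void main(String[] args) {
        TimeoutException e = new TimeoutException(
                "Expiring 3 record(s): 120000 ms has passed since batch creation",
                new NetworkPotentialCauseException("broker unreachable"));
        System.out.println(handle(e)); // branch A: back off and retry
    }
}
```

The point of the sketch is only the `instanceof` dispatch on `getCause()`; the actual exception names and branch actions are placeholders.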

I spent quite a bit of time thinking through the direction of this PR.
What are your thoughts on it?
Please let me know. 🙇‍♂️

Member:

thanks for your explanation. I agree that specific exception types could help developers catch the actual root cause. My question is: do we really need PotentialCauseException to be the base exception for NetworkPotentialCauseException and CPUBusyPotentialCauseException? Perhaps KafkaException is good enough.

Another exception design uses TimeoutException as a parent class, similar to e032a36. The benefit is that it simplifies the code, since developers wouldn't need to distinguish the root cause of a TimeoutException from the root cause of an ExecutionException 😄

thanks for bringing up this great discussion. I'd like to see the Kafka exception hierarchy become more developer-friendly.
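The alternative design of subclassing TimeoutException itself can be sketched as follows. MetadataTimeoutException is a hypothetical name chosen for illustration (it is not necessarily what commit e032a36 adds), and local stand-ins replace Kafka's real classes so the snippet is runnable:

```java
// Self-contained sketch of subclassing TimeoutException directly, so callers can
// catch the specific type instead of unwrapping getCause().
// MetadataTimeoutException is hypothetical, used only to illustrate the shape.
public class TimeoutSubtypeSketch {
    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }
    static class MetadataTimeoutException extends TimeoutException {
        MetadataTimeoutException(String message) { super(message); }
    }

    static String classify(Runnable r) {
        try {
            r.run();
            return "ok";
        } catch (MetadataTimeoutException e) {
            return "metadata timeout";   // specific handling, no cause unwrapping
        } catch (TimeoutException e) {
            return "generic timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> {
            throw new MetadataTimeoutException("Topic metadata not available within max.block.ms");
        })); // metadata timeout
    }
}
```

The design trade-off: the catch clauses stay flat and readable, at the cost of committing to one concrete subtype per throw site.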

Contributor Author @chickenchickenlove, Jul 15, 2025:


@chia7712
Thank you for considering my proposal and for sharing your valuable thoughts.
As you mentioned, I also think it’s a good idea to reuse an existing Exception class.

Personally, I think using KafkaException would be a better approach.
Since there can be multiple potential causes for a TimeoutException in certain code paths, it might be difficult to pinpoint the exact cause. In such cases, it could be unclear which subclass of TimeoutException should be used. (e032a36)

So, my suggestion is as follows:

  • Include a KafkaException as the root cause of the TimeoutException, and describe the possible scenario in the error message of the KafkaException.
  • Let the detailed error information be available via the root cause of the TimeoutException.
  • Keep the current message format of the TimeoutException itself (which currently only includes the elapsed time before it expired).

I’m thinking of revising the PR in this direction.

This way, I believe we can preserve the current semantics of TimeoutException while still conveying helpful contextual information (such as a potential cause) when necessary.
In the future, if there's a need to branch logic based on a more specific cause of the timeout—even if no actual exception was thrown—then the developer could define a concrete PotentialCauseException class as needed.
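The proposed shape can be sketched as follows. Local stand-ins replace Kafka's real KafkaException/TimeoutException so the snippet is self-contained, and the message texts are illustrative only:

```java
// Self-contained sketch of the proposal: the TimeoutException keeps its current
// elapsed-time message, and a KafkaException describing the likely scenario is
// attached as the root cause. Stand-in classes, illustrative messages.
public class KafkaCauseSketch {
    // Stand-in for org.apache.kafka.common.KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class TimeoutException extends KafkaException {
        TimeoutException(String message, Throwable cause) {
            super(message);
            initCause(cause);
        }
    }

    // Hypothetical helper showing how a call site like initTransactions() could
    // build the exception under this proposal.
    static TimeoutException initTransactionsTimeout(long maxBlockTimeMs) {
        return new TimeoutException(
                "Timeout expired after " + maxBlockTimeMs + "ms while awaiting InitProducerId",
                new KafkaException("Possible causes: transaction coordinator unavailable, "
                        + "network lag, or ACL denial within max.block.ms."));
    }

    public static void main(String[] args) {
        TimeoutException e = initTransactionsTimeout(60000L);
        // The top-level message keeps the current semantics...
        System.out.println(e.getMessage());
        // ...while the scenario description is available via the root cause.
        System.out.println(e.getCause().getMessage());
    }
}
```

This keeps existing callers that match on the TimeoutException message unaffected, while tooling and logs gain the contextual cause.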

Also, if this direction sounds reasonable, I think it wouldn’t require a KIP change.

What do you think?
Please share your opinion 🙇‍♂️

Contributor:

The proposal of reusing KafkaException sounds good. +1 to it.

Member:

Sounds good to me also. There's no need to create a KIP for this approach and it gives just as much information to the user.

}
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
@@ -1234,7 +1241,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
         if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
             throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
         }
-        throw new TimeoutException(errorMessage);
+        throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
}
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -424,7 +425,7 @@ public Uuid clientInstanceId(Duration timeout) {
         if (injectTimeoutExceptionCounter > 0) {
             --injectTimeoutExceptionCounter;
         }
-        throw new TimeoutException();
+        throw new TimeoutException(new PotentialCauseException("TimeoutExceptions are successfully injected for test."));
     }

return clientInstanceId;
@@ -34,6 +34,7 @@
 import org.apache.kafka.common.errors.FencedLeaderEpochException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -415,7 +416,7 @@ private long sendProducerData(long now) {
         for (ProducerBatch expiredBatch : expiredBatches) {
             String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                 + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
-            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
+            failBatch(expiredBatch, new TimeoutException(new PotentialCauseException(errorMessage)), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
@@ -18,10 +18,12 @@


 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.TimeoutException;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;

public final class TransactionalRequestResult {
private final CountDownLatch latch;
@@ -48,15 +50,19 @@ public void done() {
     }
 
     public void await() {
-        this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new PotentialCauseException("Unknown reason."));
     }
 
     public void await(long timeout, TimeUnit unit) {
+        this.await(timeout, unit, () -> new PotentialCauseException("Unknown reason."));
+    }
+
+    public void await(long timeout, TimeUnit unit, Supplier<PotentialCauseException> potentialCauseException) {
         try {
             boolean success = latch.await(timeout, unit);
             if (!success) {
                 throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
-                        "ms while awaiting " + operation);
+                        "ms while awaiting " + operation, potentialCauseException.get());
}

isAcked = true;
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Indicates potential reason that can cause {@link TimeoutException}.
*/
public class PotentialCauseException extends RetriableException {

private static final long serialVersionUID = 1L;

public PotentialCauseException(String message) {
super(message);
}

}