

@RaidenE1 (Contributor) commented Jul 31, 2025

Implements a timeout mechanism (using maxPollTimeMs) in the new Streams
protocol that waits for missing source topics to be created before
failing, instead of throwing an exception immediately.
Additionally, throws a TopologyException when a partition count mismatch
is detected.
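The waiting behavior described above can be sketched in plain Java. This is a minimal sketch under assumptions, not the PR's code: MissingSourceTopicWaiter, onStatusReported, and MissingSourceTopicSketchException are hypothetical names, and the sketch tracks the first-missing timestamp in a long field, like the firstMissingSourceTopicTime field under review in this thread.

```java
import java.util.Set;

// Hypothetical stand-in for Kafka Streams' MissingSourceTopicException.
class MissingSourceTopicSketchException extends RuntimeException {
    MissingSourceTopicSketchException(String message) {
        super(message);
    }
}

// Sketch of "wait up to maxPollTimeMs for missing source topics, then fail".
// Not the actual StreamThread code; names and signatures are assumptions.
class MissingSourceTopicWaiter {
    private final long maxPollTimeMs;
    // -1 means "no missing source topics are currently being reported".
    private long firstMissingSourceTopicTime = -1L;

    MissingSourceTopicWaiter(long maxPollTimeMs) {
        this.maxPollTimeMs = maxPollTimeMs;
    }

    // Called with the currently missing source topics and the current time.
    void onStatusReported(Set<String> missingSourceTopics, long nowMs) {
        if (missingSourceTopics.isEmpty()) {
            firstMissingSourceTopicTime = -1L; // topics exist: stop tracking
            return;
        }
        if (firstMissingSourceTopicTime < 0) {
            firstMissingSourceTopicTime = nowMs; // first report: start waiting
        }
        long waitedMs = nowMs - firstMissingSourceTopicTime;
        if (waitedMs > maxPollTimeMs) { // fail only once the wait exceeds the limit
            throw new MissingSourceTopicSketchException(
                "Missing source topics " + missingSourceTopics
                    + " were not created within " + maxPollTimeMs + " ms");
        }
        // Still within the timeout: return and let the caller retry on the next poll.
    }
}
```

Reporting an empty set clears the tracked timestamp, so a topic that goes missing again later gets a fresh maxPollTimeMs window.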

Reviewers: Lucas Brutschy, Alieh Saeedi, Matthias J. Sax

@github-actions bot added the triage (PRs from the community) and streams labels Jul 31, 2025
@lucasbru (Member) left a comment


Mostly looking good to me. I left a few comments to clean up/simplify the code.

@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();

// Missing source topic timeout tracking
private long firstMissingSourceTopicTime = -1L;
Member


Maybe it would make things slightly easier to read if we used
org.apache.kafka.common.utils.Timer for this?

Member


Also, can we rename this to a more generic topicsReadyTimer? I think we may want to reuse the timer to also time out when internal topics are not created in time.

// Missing source topic timeout tracking
Member


If you document a member, I'd use a Javadoc comment. But this comment doesn't add anything beyond the variable name, so maybe we can drop it altogether?

handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
} else {
// Reset timeout tracking when no missing source topics are reported
firstMissingSourceTopicTime = -1L;
Member


I think if you use org.apache.kafka.common.utils.Timer and call reset here, you don't need the inline comment.
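Taken together, the suggestions in this thread (use a Timer, name it topicsReadyTimer, reset it when nothing is missing, and reuse it for internal topics) could look roughly like the sketch below. It is hedged: DeadlineTimer is a hand-written stand-in with a Timer-like reset/isExpired shape, not org.apache.kafka.common.utils.Timer itself, and TopicsReadyCheck, its check method, and the IllegalStateException are hypothetical.

```java
import java.util.Set;
import java.util.function.LongSupplier;

// Hand-written stand-in shaped like a deadline timer (reset + isExpired).
// It is NOT org.apache.kafka.common.utils.Timer.
class DeadlineTimer {
    private final LongSupplier clock;
    private final long timeoutMs;
    private long deadlineMs;

    DeadlineTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
        reset();
    }

    // Restart the countdown from the current time.
    void reset() {
        deadlineMs = clock.getAsLong() + timeoutMs;
    }

    boolean isExpired() {
        return clock.getAsLong() >= deadlineMs;
    }
}

// Hypothetical check that times out on missing source OR internal topics.
class TopicsReadyCheck {
    private final DeadlineTimer topicsReadyTimer;

    TopicsReadyCheck(LongSupplier clock, long maxPollTimeMs) {
        this.topicsReadyTimer = new DeadlineTimer(clock, maxPollTimeMs);
    }

    // Returns true when all required topics exist, false while still waiting.
    // Assumes it is called on every heartbeat, so the timer is restarted
    // whenever a check finds every topic present.
    boolean check(Set<String> missingSourceTopics, Set<String> missingInternalTopics) {
        if (missingSourceTopics.isEmpty() && missingInternalTopics.isEmpty()) {
            topicsReadyTimer.reset(); // all topics present: restart the countdown
            return true;
        }
        if (topicsReadyTimer.isExpired()) {
            throw new IllegalStateException("Topics not ready in time; missing source="
                + missingSourceTopics + ", missing internal=" + missingInternalTopics);
        }
        return false; // still within maxPollTimeMs: retry on the next heartbeat
    }
}
```

Because the timer is only reset when a check finds every topic present, it measures time since the topics were last seen ready; with frequent heartbeats that is close to the time since a topic went missing.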

@github-actions bot removed the triage (PRs from the community) label Aug 2, 2025

// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);

Contributor

@aliehsaeedii Aug 5, 2025


Suggestion: first advance time by less than 5 minutes and check that the second call does not throw yet (and, if easy, check the log message); then, as the next step, advance time beyond 5 minutes as you did!
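The two-step test this suggestion describes can be sketched without the real StreamThread or MockTime. FakeClock, FakeThread, and SuggestedTestSteps are hypothetical stand-ins; FakeThread assumes a 300000 ms timeout (the max.poll.interval.ms default mentioned in the test comment) and a source topic that never appears.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the test's mockTime: time only moves on sleep().
class FakeClock {
    private long nowMs = 0L;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical stand-in for the thread under test. Every runOnce() sees the
// source topic as missing and fails once it has waited more than timeoutMs.
class FakeThread {
    private final FakeClock clock;
    private final long timeoutMs;
    private long firstMissingMs = -1L;

    FakeThread(FakeClock clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
    }

    void runOnce() {
        long now = clock.milliseconds();
        if (firstMissingMs < 0) {
            firstMissingMs = now;
        }
        if (now - firstMissingMs > timeoutMs) {
            throw new IllegalStateException("Source topic still missing after " + timeoutMs + " ms");
        }
    }
}

class SuggestedTestSteps {
    // Runs the suggested steps and records "ok" or "timeout" for each call.
    static List<String> run() {
        FakeClock clock = new FakeClock();
        FakeThread thread = new FakeThread(clock, 300_000L);
        List<String> outcomes = new ArrayList<>();
        outcomes.add(outcome(thread)); // first call starts the wait
        clock.sleep(150_000L);         // 2.5 min: still below the 5 min timeout
        outcomes.add(outcome(thread)); // second call must not throw yet
        clock.sleep(150_001L);         // 300001 ms total: beyond the timeout
        outcomes.add(outcome(thread)); // third call must time out
        return outcomes;
    }

    private static String outcome(FakeThread thread) {
        try {
            thread.runOnce();
            return "ok";
        } catch (IllegalStateException e) {
            return "timeout";
        }
    }
}
```

SuggestedTestSteps.run() yields [ok, ok, timeout]; in a real test each step would be its own assertion, for example assertThrows on the third call.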


// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();

Contributor

@aliehsaeedii Aug 5, 2025


The same suggestion applies here.

@aliehsaeedii (Contributor)

@RaidenE1 Thank you, the PR looks good to me now. I had a suggestion, but it’s not necessary to address since you’re already checking the condition in a later test.

log.error(errorMsg);

// Reset timer for next timeout cycle
topicsReadyTimer.updateAndReset(maxPollTimeMs);
Member


Why do we need to update the timer? We throw MissingSourceTopicException below, and this should shut down the thread?

log.error(errorMsg);
-throw new MissingSourceTopicException(errorMsg);
+throw new TopologyException(errorMsg);
Member


It seems this case is newly added, but we did not add a new test for it?
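A test for the new case could look roughly like the following plain-Java sketch. Everything here is hypothetical: TopicStatus is an invented enum (the protocol's actual status codes may differ), TopologyMismatchException stands in for Kafka Streams' TopologyException, and failing fast on a mismatch while waiting on missing topics is inferred from the PR description, not taken from the merged code.

```java
// Hypothetical status values for illustration; the real protocol's codes may differ.
enum TopicStatus { OK, MISSING_SOURCE_TOPICS, PARTITION_COUNT_MISMATCH }

// Stand-in for org.apache.kafka.streams.errors.TopologyException.
class TopologyMismatchException extends RuntimeException {
    TopologyMismatchException(String message) {
        super(message);
    }
}

class StatusHandler {
    // Returns true when topics are ready, false when the caller should keep waiting,
    // and throws when the topology cannot work with the existing topics.
    static boolean handle(TopicStatus status, String detail) {
        switch (status) {
            case OK:
                return true;
            case MISSING_SOURCE_TOPICS:
                return false; // topics may still be created; timeout handling omitted here
            case PARTITION_COUNT_MISMATCH:
                // Assumed rationale: waiting will not repair a mismatch, so fail fast.
                throw new TopologyMismatchException("Partition count mismatch: " + detail);
            default:
                throw new IllegalArgumentException("Unknown status: " + status);
        }
    }
}

class StatusHandlerCheck {
    // Plain-Java version of the requested test: the mismatch case must throw,
    // and the exception message should carry the detail.
    static boolean mismatchThrowsTopologyError() {
        try {
            StatusHandler.handle(TopicStatus.PARTITION_COUNT_MISMATCH, "topics a and b differ");
            return false;
        } catch (TopologyMismatchException e) {
            return e.getMessage().contains("topics a and b differ");
        }
    }
}
```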

@mjsax merged commit 03190e4 into apache:trunk Aug 6, 2025
20 of 24 checks passed