Flink: Fix hash code comparison for requesting global statistics in DataStatisticsCoordinator #13827
Hi @stevenzwu @pvary, could you please help review this PR and verify whether it is appropriate? Thank you very much!
@@ -277,7 +277,7 @@ private void handleRequestGlobalStatisticsEvent(int subtask, RequestGlobalStatis
       if (globalStatistics != null) {
         runInCoordinatorThread(
             () -> {
-              if (event.signature() != null && event.signature() != globalStatistics.hashCode()) {
+              if (event.signature() != null && event.signature() == globalStatistics.hashCode()) {
thx for catching the bug. I think your fix is correct.
Should we also fix the log message as follows?
Skip responding to statistics request from subtask {}, as the operator task already holds the same global statistics
Yes, I have changed it now.
just some minor comments
dataStatisticsCoordinator.handleEventFromOperator(
    0, 0, new RequestGlobalStatisticsEvent(correctSignature));

Thread.sleep(200);
I know we are waiting for 200 ms to confirm no response is sent in this case. We can probably replace the sleep with waitForCoordinatorToProcessActions. Then we can immediately assert the sent events count hasn't changed.
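To illustrate why a "wait for queued actions" helper can stand in for a fixed sleep, here is a minimal sketch. It assumes the coordinator processes actions on a single thread in FIFO order; the class, the waitForQueuedActions method, and the sentEvents counter are hypothetical stand-ins, not the actual test helper or Iceberg API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainExecutorSketch {
  // Hypothetical helper: enqueue a marker task and block until it runs.
  // On a single-threaded FIFO executor, every earlier task has finished by then.
  static void waitForQueuedActions(ExecutorService singleThreadExecutor) throws Exception {
    CompletableFuture<Void> marker = new CompletableFuture<>();
    singleThreadExecutor.execute(() -> marker.complete(null));
    marker.get(10, TimeUnit.SECONDS);
  }

  // Returns the number of "sent events" after one responding action and one skipped action.
  static int runDemo() throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger sentEvents = new AtomicInteger(); // stand-in for the sent events count
    try {
      // First action simulates a request that gets a response.
      executor.execute(sentEvents::incrementAndGet);
      waitForQueuedActions(executor);
      int before = sentEvents.get();

      // Second action simulates a request with a matching signature: nothing is sent.
      executor.execute(() -> { });
      waitForQueuedActions(executor);

      // No sleep needed: the count can be checked immediately.
      if (sentEvents.get() != before) {
        throw new IllegalStateException("unexpected response was sent");
      }
      return sentEvents.get();
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Compared with a fixed sleep, this waits only as long as the queued work actually takes, so the assertion does not depend on timing.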
Thanks for pointing it out, I have changed it.
    .atMost(Duration.ofSeconds(10))
    .until(() -> receivingTasks.getSentEventsForSubtask(0).size() == 2);

// signature is right
nit: maybe the comment can be changed to the following?
Simulate the scenario where a subtask sends a global statistics request with the same hash code. The coordinator would skip the response after comparing the hash code contained in the request with the hash code of the latest global statistics.
OK
thanks @Guosmilesmile for catching and fixing the bug
In DataStatisticsCoordinator, when handling the RequestGlobalStatisticsEvent, the coordinator should skip responding to the subtask if the event's signature matches the hashCode of the current globalStatistics. The current implementation is incorrect; this PR fixes that behavior.
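The intended decision can be sketched in isolation as follows. This is a minimal illustration, assuming the signature is a nullable Integer and using a plain List as a stand-in for the global statistics object; the class and method names are hypothetical and not part of the Iceberg code.

```java
import java.util.List;

public class SignatureCheckSketch {
  // Hypothetical helper: true when the coordinator should skip the response,
  // i.e. the request carries a signature equal to the current statistics' hash code.
  static boolean shouldSkipResponse(Integer requestSignature, Object globalStatistics) {
    // Integer == int unboxes the Integer, so this is a numeric comparison.
    return requestSignature != null && requestSignature == globalStatistics.hashCode();
  }

  public static void main(String[] args) {
    List<String> stats = List.of("a", "b"); // stand-in for global statistics
    System.out.println(shouldSkipResponse(stats.hashCode(), stats));     // same signature: skip
    System.out.println(shouldSkipResponse(stats.hashCode() + 1, stats)); // stale signature: respond
    System.out.println(shouldSkipResponse(null, stats));                 // no signature: respond
  }
}
```

With the original != comparison the first two cases would be inverted: a subtask that already holds the latest statistics would get a redundant response, and one with stale statistics would get none.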