Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private void handleRequestGlobalStatisticsEvent(int subtask, RequestGlobalStatis
if (globalStatistics != null) {
runInCoordinatorThread(
() -> {
if (event.signature() != null && event.signature() != globalStatistics.hashCode()) {
if (event.signature() != null && event.signature() == globalStatistics.hashCode()) {
Copy link
Contributor

@stevenzwu stevenzwu Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for catching the bug. I think your fix is correct.

Should we also fix the log message as following?

Skip responding to statistics request from subtask {}, as the operator task already holds the same global statistics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have change it now.

LOG.debug(
"Skip responding to statistics request from subtask {}, as hashCode matches or not included in the request",
subtask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,57 @@ public void testRequestGlobalStatisticsEventHandling() throws Exception {
}
}

@Test
public void testMultipleRequestGlobalStatisticsEvents() throws Exception {
try (DataStatisticsCoordinator dataStatisticsCoordinator =
createCoordinator(StatisticsType.Map)) {
dataStatisticsCoordinator.start();
tasksReady(dataStatisticsCoordinator);

StatisticsEvent checkpoint1Subtask0DataStatisticEvent =
Fixtures.createStatisticsEvent(
StatisticsType.Map, Fixtures.TASK_STATISTICS_SERIALIZER, 1L, CHAR_KEYS.get("a"));
StatisticsEvent checkpoint1Subtask1DataStatisticEvent =
Fixtures.createStatisticsEvent(
StatisticsType.Map, Fixtures.TASK_STATISTICS_SERIALIZER, 1L, CHAR_KEYS.get("b"));

dataStatisticsCoordinator.handleEventFromOperator(
0, 0, checkpoint1Subtask0DataStatisticEvent);
dataStatisticsCoordinator.handleEventFromOperator(
1, 0, checkpoint1Subtask1DataStatisticEvent);

waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

// signature is null
dataStatisticsCoordinator.handleEventFromOperator(0, 0, new RequestGlobalStatisticsEvent());

// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent
Awaitility.await("wait for first statistics event")
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> receivingTasks.getSentEventsForSubtask(0).size() == 2);

// signature is right
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe the comment can be changed to following?

Simulate the scenario where a subtask send global statistics request with the same hash code. The coordinator would skip the response after comparing the request contained hash code with latest global statistics hash code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

int correctSignature = dataStatisticsCoordinator.globalStatistics().hashCode();
dataStatisticsCoordinator.handleEventFromOperator(
0, 0, new RequestGlobalStatisticsEvent(correctSignature));

Thread.sleep(200);
Copy link
Contributor

@stevenzwu stevenzwu Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we are waiting for 200 ms to confirm no response is sent in this case. We can probably replace the sleep with waitForCoordinatorToProcessActions. Then we can immediately assert the sent events count hasn't changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for point is out, I have change it .

// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(2);

// signature is different
dataStatisticsCoordinator.handleEventFromOperator(
0, 0, new RequestGlobalStatisticsEvent(correctSignature + 1));

// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent + RequestGlobalStatisticsEvent
Awaitility.await("wait for second statistics event")
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> receivingTasks.getSentEventsForSubtask(0).size() == 3);
}
}

static void setAllTasksReady(
int subtasks,
DataStatisticsCoordinator dataStatisticsCoordinator,
Expand Down