Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ private void handleRequestGlobalStatisticsEvent(int subtask, RequestGlobalStatis
if (globalStatistics != null) {
runInCoordinatorThread(
() -> {
if (event.signature() != null && event.signature() != globalStatistics.hashCode()) {
if (event.signature() != null && event.signature() == globalStatistics.hashCode()) {
Copy link
Contributor

@stevenzwu stevenzwu Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for catching the bug. I think your fix is correct.

Should we also fix the log message as following?

Skip responding to statistics request from subtask {}, as the operator task already holds the same global statistics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have change it now.

LOG.debug(
"Skip responding to statistics request from subtask {}, as hashCode matches or not included in the request",
"Skip responding to statistics request from subtask {}, as the operator task already holds the same global statistics",
subtask);
} else {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,59 @@ public void testRequestGlobalStatisticsEventHandling() throws Exception {
}
}

@Test
public void testMultipleRequestGlobalStatisticsEvents() throws Exception {
try (DataStatisticsCoordinator dataStatisticsCoordinator =
createCoordinator(StatisticsType.Map)) {
dataStatisticsCoordinator.start();
tasksReady(dataStatisticsCoordinator);

StatisticsEvent checkpoint1Subtask0DataStatisticEvent =
Fixtures.createStatisticsEvent(
StatisticsType.Map, Fixtures.TASK_STATISTICS_SERIALIZER, 1L, CHAR_KEYS.get("a"));
StatisticsEvent checkpoint1Subtask1DataStatisticEvent =
Fixtures.createStatisticsEvent(
StatisticsType.Map, Fixtures.TASK_STATISTICS_SERIALIZER, 1L, CHAR_KEYS.get("b"));

dataStatisticsCoordinator.handleEventFromOperator(
0, 0, checkpoint1Subtask0DataStatisticEvent);
dataStatisticsCoordinator.handleEventFromOperator(
1, 0, checkpoint1Subtask1DataStatisticEvent);

waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

// signature is null
dataStatisticsCoordinator.handleEventFromOperator(0, 0, new RequestGlobalStatisticsEvent());

// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent
Awaitility.await("wait for first statistics event")
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> receivingTasks.getSentEventsForSubtask(0).size() == 2);

// Simulate the scenario where a subtask send global statistics request with the same hash
// code. The coordinator would skip the response after comparing the request contained hash
// code with latest global statistics hash code.
int correctSignature = dataStatisticsCoordinator.globalStatistics().hashCode();
dataStatisticsCoordinator.handleEventFromOperator(
0, 0, new RequestGlobalStatisticsEvent(correctSignature));

waitForCoordinatorToProcessActions(dataStatisticsCoordinator);
// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(2);

// signature is different
dataStatisticsCoordinator.handleEventFromOperator(
0, 0, new RequestGlobalStatisticsEvent(correctSignature + 1));

// Checkpoint StatisticEvent + RequestGlobalStatisticsEvent + RequestGlobalStatisticsEvent
Awaitility.await("wait for second statistics event")
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> receivingTasks.getSentEventsForSubtask(0).size() == 3);
}
}

static void setAllTasksReady(
int subtasks,
DataStatisticsCoordinator dataStatisticsCoordinator,
Expand Down