
Conversation

@yashmayya yashmayya commented Dec 4, 2025

  • This is an important bugfix for the multi-stage query engine. Currently, for certain kinds of join queries, we can see incorrect results intermittently.
  • For instance, if the two inputs to a join are both grouping aggregates whose group keys are the same as the join keys, the MSE query planner infers that it can do a "pre-partitioned" send, where workers are connected 1:1 between the stages (when there is no stage parallelism), instead of doing a full partitioned shuffle (see PinotRelDistributionTraitRule, RelToPlanNodeConverter, MailboxAssignmentVisitor).
  • An important assumption for the above to work correctly is that the same sender worker ID corresponds to the same partition / hash bucket of the grouping keys (which are also the join keys in the downstream stage). However, this is currently a fragile assumption, because worker assignment relies on non-deterministic HashMap iteration order in various places in WorkerManager.
  • The fix here is simple: we ensure that the mailbox list used for data routing in HashExchange is sorted by the receiver worker ID. Note that the v2 physical planner already does the same thing here.
  • With this fix, it's now guaranteed that the same partitions / hash buckets will land on workers with the same IDs on all branches of a query plan (when the number of workers is the same), making pre-partitioned sends deterministically correct (see the sketch after this list). Note that the underlying physical servers can still differ, and that's okay here, since we're only doing a pre-partitioned send and not assuming full colocation.
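To illustrate the failure mode, here's a minimal standalone sketch (the worker IDs and hash value are made up, and this is not actual Pinot code):

import java.util.List;

public class HashRoutingSketch {
  public static void main(String[] args) {
    // Two sender stages route the same row (hash = 7) to a receiver stage with 3 workers.
    // Stage A happened to build its mailbox list as [w0, w1, w2]; stage B as [w2, w0, w1],
    // e.g. due to non-deterministic HashMap iteration order during worker assignment.
    List<Integer> stageAMailboxOrder = List.of(0, 1, 2);
    List<Integer> stageBMailboxOrder = List.of(2, 0, 1);

    int hash = 7;
    int index = hash % 3; // modulo routing picks list index 1 in both stages...
    System.out.println("Stage A sends to worker " + stageAMailboxOrder.get(index)); // worker 1
    System.out.println("Stage B sends to worker " + stageBMailboxOrder.get(index)); // worker 0 (!)

    // Sorting both mailbox lists by worker ID makes the index -> worker mapping identical
    // across stages, so the same hash bucket always lands on the same worker ID.
  }
}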

@yashmayya yashmayya added the bugfix and multi-stage (Related to the multi-stage query engine) labels on Dec 4, 2025

codecov-commenter commented Dec 4, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 63.25%. Comparing base (894665c) to head (df3fb09).
⚠️ Report is 12 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17323      +/-   ##
============================================
- Coverage     63.26%   63.25%   -0.01%     
- Complexity     1433     1474      +41     
============================================
  Files          3133     3135       +2     
  Lines        186168   186477     +309     
  Branches      28416    28495      +79     
============================================
+ Hits         117774   117958     +184     
- Misses        59326    59413      +87     
- Partials       9068     9106      +38     
Flag                  Coverage Δ
custom-integration1   100.00% <ø> (ø)
integration           100.00% <ø> (ø)
integration1          100.00% <ø> (ø)
integration2          0.00% <ø> (ø)
java-11               63.21% <100.00%> (+0.02%) ⬆️
java-21               63.22% <100.00%> (-0.02%) ⬇️
temurin               63.25% <100.00%> (-0.01%) ⬇️
unittests             63.25% <100.00%> (-0.01%) ⬇️
unittests1            55.67% <100.00%> (+<0.01%) ⬆️
unittests2            33.90% <0.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.


@yashmayya yashmayya commented Dec 5, 2025

Backward compatibility regression tests are failing with an interesting error; I'll need to take a closer look. Also, weirdly enough, it's failing against master and 1.4.0 but passing with 1.3.0.

Edit: the issue is that we're sorting at execution time instead of plan time, so different sender stages could send partitions to different workers in the same receiver stage when the servers are on different versions. Moving the sort to plan time in the broker should fix it.

@yashmayya yashmayya force-pushed the mse-deterministic-hash-exchange-routing branch from 1b61486 to b8f8073 on December 5, 2025 at 02:07
@gortiz gortiz left a comment
Can we add an integration test with an example?

// receiving worker.
// Without this sorting, different stages could route the same hash value to different workers, resulting in
// incorrect join/union/intersect results for pre-partitioned sends (where a full partition shuffle is skipped).
mailboxInfoList.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
Contributor

Are we sure the worker IDs list is going to have exactly one element? Shouldn't we sort by all the elements in case the first element is equal?

Contributor Author

It can be more than one if there's stage parallelism, but in that case the worker IDs per physical server should be contiguous, IIUC:

for (QueryServerInstance serverInstance : candidateServers) {
  for (int i = 0; i < stageParallelism; i++) {
    workerIdToServerInstanceMap.put(workerId++, serverInstance);
  }
}

The v2 planner also does a similar sort:

// IMP: Return mailbox info sorted by workerIds. This is because SendingMailbox are created in this order, and
// record assignment for hash exchange follows modulo arithmetic. e.g. if we have sending mailbox in order:
// [worker-1, worker-0], then records with modulo 0 hash would end up in worker-1.
// Note that the workerIds list will be >1 in length only when there's a parallelism change. It's important to
// also know that MailboxSendOperator will iterate over this List<MailboxInfo> in order, and within each iteration
// iterate over all the workerIds of that MailboxInfo. The result List<SendingMailbox> is used for modulo
// arithmetic for any partitioning exchange strategy.
result.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
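For what it's worth, if the contiguity assumption ever stopped holding, a defensive sort could compare the full worker ID lists lexicographically. A hypothetical sketch (not part of this patch; the class and constant names are made up):

import java.util.Comparator;
import java.util.List;

class WorkerIdListOrdering {
  // Lexicographic comparison of worker ID lists: compare element by element,
  // then fall back to list length when one list is a prefix of the other.
  static final Comparator<List<Integer>> BY_ALL_WORKER_IDS = (a, b) -> {
    int n = Math.min(a.size(), b.size());
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a.get(i), b.get(i));
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.size(), b.size());
  };
}

// Hypothetical usage, assuming MailboxInfo#getWorkerIds as quoted above:
// mailboxInfoList.sort(Comparator.comparing(MailboxInfo::getWorkerIds, WorkerIdListOrdering.BY_ALL_WORKER_IDS));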

@Jackie-Jiang Jackie-Jiang left a comment

Good catch!
Let's also document the contract for this visitor as java-doc for this class.
Trying to understand where the code breaks: without this fix, mailboxInfoList won't have a deterministic order of servers for each worker ID. Where do we access it based on the index of the server?

// receiving worker.
// Without this sorting, different stages could route the same hash value to different workers, resulting in
// incorrect join/union/intersect results for pre-partitioned sends (where a full partition shuffle is skipped).
mailboxInfoList.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
Contributor
Is it easier to follow if we change serverToWorkerIdsMap to a LinkedHashMap? Essentially we want to preserve the worker ID order in the generated server list.
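(For context, a minimal standalone sketch of the ordering behavior this suggestion relies on; the server names are made up:)

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderSketch {
  public static void main(String[] args) {
    // LinkedHashMap iterates in insertion order, so populating it in worker ID order
    // preserves that order when a server list is later derived from it.
    Map<String, Integer> linked = new LinkedHashMap<>();
    linked.put("server-b", 0);
    linked.put("server-a", 1);
    linked.put("server-c", 2);
    System.out.println(linked.keySet()); // [server-b, server-a, server-c]

    // A plain HashMap makes no such guarantee; iteration order depends on hashing.
    Map<String, Integer> hashed = new HashMap<>(linked);
    System.out.println(hashed.keySet()); // unspecified order
  }
}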

@yashmayya
Contributor Author

> Without this fix, mailboxInfoList won't have deterministic order of servers for each worker id. Where do we access it based on the index of the server?

It's eventually used here through the MailboxSendOperator:

protected void route(List<SendingMailbox> destinations, MseBlock.Data block) {
  int numMailboxes = destinations.size();
  if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) {
    sendBlock(destinations.get(0), block);
    return;
  }
  List<Object[]>[] mailboxIdToRowsMap = new List[numMailboxes];
  for (int i = 0; i < numMailboxes; i++) {
    mailboxIdToRowsMap[i] = new ArrayList<>();
  }
  RowHeapDataBlock rowHeapBlock = block.asRowHeap();
  List<Object[]> rows = rowHeapBlock.getRows();
  for (Object[] row : rows) {
    int mailboxId = _keySelector.computeHash(row) % numMailboxes;
    mailboxIdToRowsMap[mailboxId].add(row);
  }
  AggregationFunction[] aggFunctions = rowHeapBlock.getAggFunctions();
  for (int i = 0; i < numMailboxes; i++) {
    if (!mailboxIdToRowsMap[i].isEmpty()) {
      sendBlock(destinations.get(i),
          new RowHeapDataBlock(mailboxIdToRowsMap[i], block.getDataSchema(), aggFunctions));
    }
  }
}
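For example, with numMailboxes = 3 and destinations ordered [worker-0, worker-1, worker-2], a row whose computeHash(row) is 7 always lands in mailboxIdToRowsMap[1] and is sent to worker-1. If another sender stage happened to build its destinations as [worker-1, worker-2, worker-0], the same row would go to worker-2 there, which is exactly the mismatch the sort prevents.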

The List<SendingMailbox> passed to route() is itself created in mailbox-info order here:

List<MailboxInfo> mailboxInfos =
    context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
List<RoutingInfo> routingInfos =
    MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId,
        mailboxInfos);
List<SendingMailbox> sendingMailboxes = routingInfos.stream()
    .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs, statMap))
    .collect(Collectors.toList());
statMap.merge(StatKey.FAN_OUT, sendingMailboxes.size());
return BlockExchange.getExchange(sendingMailboxes, distributionType, node.getKeys(), splitter,
    node.getHashFunction());

> Is it easier to follow if we change serverToWorkerIdsMap to a LinkedHashMap? Essentially we want to preserve the worker id order in the generated server list

Hm no, I believe the issue is more that there's no guarantee that, across different branches of the same query plan tree, we'll have the same partition <-> workerId mapping, because the worker / mailbox order depends on the iteration order of various maps (enabled server instances, routing table, etc.) in WorkerManager. The particular issue that this patch aims to fix is when the inputs to a join are determined to be "pre-partitioned" based on something like a grouping aggregation using the same keys as the join keys. In this case, we need to ensure that the join inputs have the same partitions / hash buckets on the same workers, because the distribution then relies on a fixed upstream worker -> downstream worker mapping:

// 1-to-1 mapping
for (int workerId = 0; workerId < numSenders; workerId++) {
  QueryServerInstance senderServer = senderServerMap.get(workerId);
  QueryServerInstance receiverServer = receiverServerMap.get(workerId);
  List<Integer> workerIds = List.of(workerId);
  MailboxInfos senderMailboxInfos;
  MailboxInfos receiverMailboxInfos;
  if (senderServer.equals(receiverServer)) {
    senderMailboxInfos = new SharedMailboxInfos(
        new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
    receiverMailboxInfos = senderMailboxInfos;
  } else {
    senderMailboxInfos = new MailboxInfos(
        new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
    receiverMailboxInfos = new MailboxInfos(
        new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds));
  }
  senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
      .put(receiverStageId, receiverMailboxInfos);
  receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
      .put(senderStageId, senderMailboxInfos);
}

IMO, the cleanest solution is to guarantee that the workerIds themselves are ordered here so that the partition assignment is deterministic (this also happens to be the solution chosen by the v2 physical planner).
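Concretely: if both join inputs hash-aggregate on the join key across three workers, hash bucket k must end up on worker k in both input stages; the 1-to-1 pre-partitioned send above then pairs bucket k on the sender side with bucket k on the receiver side, regardless of which physical server each worker ID maps to.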

@Jackie-Jiang Jackie-Jiang left a comment
Sync'ed offline. LinkedHashMap and the current explicit sorting achieve the same goal: a deterministic order of servers based on worker order. Both of them work.

The root cause of this issue is the non-deterministic mapping from worker ID to server, which should be fixed separately. Let's add a TODO for that.

@yashmayya
Contributor Author

> Sync'ed offline. LinkedHashMap and the current explicit sorting achieve the same goal: a deterministic order of servers based on worker order. Both of them work.

Yup that's right, thanks for clarifying -- I had misunderstood your earlier point.

@yashmayya yashmayya merged commit 1ce48fb into apache:master Dec 8, 2025
17 of 18 checks passed