Skip to content

Commit 1a577e7

Browse files
committed
[#2469] feat(spark): Create a unique ShuffleId for Uniffle for stage retry
1 parent 057dcd2 commit 1a577e7

File tree

22 files changed

+153
-309
lines changed

22 files changed

+153
-309
lines changed

client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ public Thread newThread(Runnable r) {
285285
ShuffleDataDistributionType.NORMAL,
286286
RssMRConfig.toRssConf(conf)
287287
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
288-
0,
289288
remoteMergeEnable
290289
? MergeContext.newBuilder()
291290
.setKeyClass(conf.getMapOutputKeyClass().getName())

client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,6 @@ public void registerShuffle(
721721
RemoteStorageInfo remoteStorage,
722722
ShuffleDataDistributionType distributionType,
723723
int maxConcurrencyPerPartitionToWrite,
724-
int stageAttemptNumber,
725724
RssProtos.MergeContext mergeContext,
726725
Map<String, String> properties) {}
727726

@@ -781,9 +780,6 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
781780
int assignmentShuffleServerNumber,
782781
int estimateTaskConcurrency,
783782
Set<String> faultyServerIds,
784-
int stageId,
785-
int stageAttemptNumber,
786-
boolean reassign,
787783
long retryIntervalMs,
788784
int retryTimes) {
789785
return null;

client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,6 @@ public void registerShuffle(
507507
RemoteStorageInfo storageType,
508508
ShuffleDataDistributionType distributionType,
509509
int maxConcurrencyPerPartitionToWrite,
510-
int stageAttemptNumber,
511510
RssProtos.MergeContext mergeContext,
512511
Map<String, String> properties) {}
513512

@@ -548,9 +547,6 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
548547
int assignmentShuffleServerNumber,
549548
int estimateTaskConcurrency,
550549
Set<String> faultyServerIds,
551-
int stageId,
552-
int stageAttemptNumber,
553-
boolean reassign,
554550
long retryIntervalMs,
555551
int retryTimes) {
556552
return null;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache;
19+
20+
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import org.apache.uniffle.common.util.JavaUtils;
24+
25+
/**
26+
* Maintains the mapping from a Spark shuffleId plus a stage attempt number to a Uniffle
27+
* shuffleId. Uniffle shuffleIds are drawn from one ever-increasing counter. For example:
28+
*
29+
* <p>| spark shuffleId | stage attempt number | uniffle shuffleId |
30+
*
31+
* <p>| 0 | 0 | 0 |
32+
*
33+
* <p>| 0 | 1 | 1 |
34+
*
35+
* <p>| 0 | 2 | 2 |
36+
*
37+
* <p>| 0 | 3 | 3 |
38+
*
39+
* <p>| 1 | 0 | 4 |
40+
*
41+
* <p>...
42+
*/
43+
public class ShuffleIdMappingManager {
44+
// Counter for Uniffle shuffleIds; it starts at -1 so that the first generated id is 0.
45+
private AtomicInteger shuffleIdGenerator;
46+
// Spark shuffleId -> stage attempt number -> Uniffle shuffleId.
47+
private Map<Integer, Map<Integer, Integer>> shuffleIdMapping;
48+
// shuffleId -> the isDeterminate flag last passed to recordShuffleIdDeterminate.
49+
private Map<Integer, Boolean> shuffleDeterminateMap;
50+
51+
public ShuffleIdMappingManager() {
52+
shuffleIdGenerator = new AtomicInteger(-1);
53+
shuffleIdMapping = JavaUtils.newConcurrentMap();
54+
shuffleDeterminateMap = JavaUtils.newConcurrentMap();
55+
}
56+
57+
/**
58+
* Returns the Uniffle shuffleId for stage attempt 0 of the given Spark shuffleId, generating a
59+
* new id on the first call for that shuffleId and returning the same id on later calls.
60+
*
61+
* @param shuffleId the Spark shuffleId
62+
* @return the Uniffle shuffleId stored under stage attempt 0 for {@code shuffleId}
63+
*/
64+
public int createUniffleShuffleId(int shuffleId) {
65+
return shuffleIdMapping
66+
.computeIfAbsent(
67+
shuffleId,
68+
id -> {
69+
Map<Integer, Integer> stageNumberToNewShuffleIdMap = JavaUtils.newConcurrentMap();
70+
int newShuffleId = shuffleIdGenerator.incrementAndGet();
71+
stageNumberToNewShuffleIdMap.computeIfAbsent(0, stageNumber -> newShuffleId);
72+
return stageNumberToNewShuffleIdMap;
73+
})
74+
.get(0);
75+
}
76+
77+
public void recordShuffleIdDeterminate(int shuffleId, boolean isDeterminate) {
78+
shuffleDeterminateMap.put(shuffleId, isDeterminate);
79+
}
80+
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 10 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.common.annotations.VisibleForTesting;
4444
import com.google.common.collect.Maps;
4545
import com.google.common.collect.Sets;
46+
import org.apache.ShuffleIdMappingManager;
4647
import org.apache.commons.collections4.CollectionUtils;
4748
import org.apache.commons.lang3.StringUtils;
4849
import org.apache.hadoop.conf.Configuration;
@@ -153,6 +154,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
153154
protected ShuffleHandleInfoManager shuffleHandleInfoManager;
154155

155156
protected RssStageResubmitManager rssStageResubmitManager;
157+
protected ShuffleIdMappingManager shuffleIdMappingManager;
156158
protected int partitionReassignMaxServerNum;
157159
protected boolean blockIdSelfManagedEnabled;
158160
protected boolean partitionReassignEnabled;
@@ -340,6 +342,7 @@ public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
340342
rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
341343
this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
342344
this.rssStageResubmitManager = new RssStageResubmitManager();
345+
this.shuffleIdMappingManager = new ShuffleIdMappingManager();
343346
}
344347

345348
@VisibleForTesting
@@ -390,6 +393,7 @@ protected RssShuffleManagerBase(
390393
rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
391394
this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
392395
this.rssStageResubmitManager = new RssStageResubmitManager();
396+
this.shuffleIdMappingManager = new ShuffleIdMappingManager();
393397
}
394398

395399
public BlockIdManager getBlockIdManager() {
@@ -979,10 +983,7 @@ public boolean reassignOnStageResubmit(
979983
1,
980984
requiredShuffleServerNumber,
981985
estimateTaskConcurrency,
982-
rssStageResubmitManager.getServerIdBlackList(),
983-
stageAttemptId,
984-
stageAttemptNumber,
985-
false);
986+
rssStageResubmitManager.getServerIdBlackList());
986987
MutableShuffleHandleInfo shuffleHandleInfo =
987988
new MutableShuffleHandleInfo(
988989
shuffleId, partitionToServers, getRemoteStorageInfo(), partitionSplitMode);
@@ -1283,10 +1284,7 @@ private Set<ShuffleServerInfo> reassignServerForTask(
12831284
.collect(Collectors.toSet());
12841285
replacementsRef.set(replacements);
12851286
return createShuffleAssignmentsInfo(replacements, partitionIds);
1286-
},
1287-
stageId,
1288-
stageAttemptNumber,
1289-
reassign);
1287+
});
12901288
return replacementsRef.get();
12911289
}
12921290

@@ -1297,10 +1295,7 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
12971295
int assignmentShuffleServerNumber,
12981296
int estimateTaskConcurrency,
12991297
Set<String> faultyServerIds,
1300-
Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> reassignmentHandler,
1301-
int stageId,
1302-
int stageAttemptNumber,
1303-
boolean reassign) {
1298+
Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> reassignmentHandler) {
13041299
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
13051300
ClientUtils.validateClientType(clientType);
13061301
assignmentTags.add(clientType);
@@ -1318,9 +1313,6 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
13181313
assignmentShuffleServerNumber,
13191314
estimateTaskConcurrency,
13201315
faultyServerIds,
1321-
stageId,
1322-
stageAttemptNumber,
1323-
reassign,
13241316
retryInterval,
13251317
retryTimes);
13261318
LOG.info("Finished reassign");
@@ -1341,10 +1333,7 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
13411333
int partitionNumPerRange,
13421334
int assignmentShuffleServerNumber,
13431335
int estimateTaskConcurrency,
1344-
Set<String> faultyServerIds,
1345-
int stageId,
1346-
int stageAttemptNumber,
1347-
boolean reassign) {
1336+
Set<String> faultyServerIds) {
13481337
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
13491338
ClientUtils.validateClientType(clientType);
13501339
assignmentTags.add(clientType);
@@ -1367,17 +1356,10 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
13671356
assignmentShuffleServerNumber,
13681357
estimateTaskConcurrency,
13691358
faultyServerIds,
1370-
stageId,
1371-
stageAttemptNumber,
1372-
reassign,
13731359
0,
13741360
0);
13751361
registerShuffleServers(
1376-
appId,
1377-
shuffleId,
1378-
response.getServerToPartitionRanges(),
1379-
getRemoteStorageInfo(),
1380-
stageAttemptNumber);
1362+
appId, shuffleId, response.getServerToPartitionRanges(), getRemoteStorageInfo());
13811363
return response.getPartitionToServers();
13821364
},
13831365
retryInterval,
@@ -1387,32 +1369,11 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
13871369
}
13881370
}
13891371

1390-
protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
1391-
int shuffleId,
1392-
int partitionNum,
1393-
int partitionNumPerRange,
1394-
int assignmentShuffleServerNumber,
1395-
int estimateTaskConcurrency,
1396-
Set<String> faultyServerIds,
1397-
int stageAttemptNumber) {
1398-
return requestShuffleAssignment(
1399-
shuffleId,
1400-
partitionNum,
1401-
partitionNumPerRange,
1402-
assignmentShuffleServerNumber,
1403-
estimateTaskConcurrency,
1404-
faultyServerIds,
1405-
-1,
1406-
stageAttemptNumber,
1407-
false);
1408-
}
1409-
14101372
protected void registerShuffleServers(
14111373
String appId,
14121374
int shuffleId,
14131375
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges,
1414-
RemoteStorageInfo remoteStorage,
1415-
int stageAttemptNumber) {
1376+
RemoteStorageInfo remoteStorage) {
14161377
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
14171378
return;
14181379
}
@@ -1430,47 +1391,13 @@ protected void registerShuffleServers(
14301391
remoteStorage,
14311392
ShuffleDataDistributionType.NORMAL,
14321393
maxConcurrencyPerPartitionToWrite,
1433-
stageAttemptNumber,
14341394
null,
14351395
sparkConfMap);
14361396
});
14371397
LOG.info(
14381398
"Finish register shuffleId {} with {} ms", shuffleId, (System.currentTimeMillis() - start));
14391399
}
14401400

1441-
@VisibleForTesting
1442-
protected void registerShuffleServers(
1443-
String appId,
1444-
int shuffleId,
1445-
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges,
1446-
RemoteStorageInfo remoteStorage) {
1447-
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
1448-
return;
1449-
}
1450-
LOG.info("Start to register shuffleId[{}]", shuffleId);
1451-
long start = System.currentTimeMillis();
1452-
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
1453-
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
1454-
serverToPartitionRanges.entrySet();
1455-
entries.stream()
1456-
.forEach(
1457-
entry -> {
1458-
shuffleWriteClient.registerShuffle(
1459-
entry.getKey(),
1460-
appId,
1461-
shuffleId,
1462-
entry.getValue(),
1463-
remoteStorage,
1464-
dataDistributionType,
1465-
maxConcurrencyPerPartitionToWrite,
1466-
sparkConfMap);
1467-
});
1468-
LOG.info(
1469-
"Finish register shuffleId[{}] with {} ms",
1470-
shuffleId,
1471-
(System.currentTimeMillis() - start));
1472-
}
1473-
14741401
protected RemoteStorageInfo getRemoteStorageInfo() {
14751402
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
14761403
RemoteStorageInfo defaultRemoteStorage =

client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.spark.TaskContext;
3737
import org.apache.spark.broadcast.Broadcast;
3838
import org.apache.spark.executor.ShuffleWriteMetrics;
39+
import org.apache.spark.rdd.DeterministicLevel;
3940
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
4041
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
4142
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -113,6 +114,13 @@ public <K, V, C> ShuffleHandle registerShuffle(
113114
LOG.info("Generate application id used in rss: " + appId);
114115
}
115116

117+
// If stage retry is enabled, record whether this shuffle's output is deterministic.
118+
if (rssStageRetryEnabled) {
119+
shuffleIdMappingManager.recordShuffleIdDeterminate(
120+
shuffleId,
121+
dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());
122+
}
123+
116124
if (dependency.partitioner().numPartitions() == 0) {
117125
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
118126
shuffleIdToNumMapTasks.computeIfAbsent(
@@ -147,15 +155,22 @@ public <K, V, C> ShuffleHandle registerShuffle(
147155
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
148156
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
149157

158+
// If stage retry is enabled, use a generated Uniffle shuffleId instead of the Spark shuffleId.
159+
Integer uniffleShuffleId;
160+
if (rssStageRetryEnabled) {
161+
uniffleShuffleId = shuffleIdMappingManager.createUniffleShuffleId(shuffleId);
162+
} else {
163+
uniffleShuffleId = shuffleId;
164+
}
165+
150166
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
151167
requestShuffleAssignment(
152-
shuffleId,
168+
uniffleShuffleId,
153169
dependency.partitioner().numPartitions(),
154170
1,
155171
requiredShuffleServerNumber,
156172
estimateTaskConcurrency,
157-
rssStageResubmitManager.getServerIdBlackList(),
158-
0);
173+
rssStageResubmitManager.getServerIdBlackList());
159174

160175
startHeartbeat();
161176

@@ -385,7 +400,7 @@ private ShuffleServerInfo assignShuffleServer(int shuffleId, String faultyShuffl
385400
Set<String> faultyServerIds = Sets.newHashSet(faultyShuffleServerId);
386401
faultyServerIds.addAll(rssStageResubmitManager.getServerIdBlackList());
387402
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
388-
requestShuffleAssignment(shuffleId, 1, 1, 1, 1, faultyServerIds, 0);
403+
requestShuffleAssignment(shuffleId, 1, 1, 1, 1, faultyServerIds);
389404
if (partitionToServers.get(0) != null && partitionToServers.get(0).size() == 1) {
390405
return partitionToServers.get(0).get(0);
391406
}

0 commit comments

Comments
 (0)