Commit adb41bc

[#2472] feat(spark): Add an RPC method to obtain the uniffleId and delete the write stage for retry at the same time.
1 parent 1a577e7 commit adb41bc

53 files changed (+558, -1642 lines); only part of the diff is shown below.


client-spark/common/src/main/java/org/apache/ShuffleIdMappingManager.java

Lines changed: 57 additions & 2 deletions
@@ -54,6 +54,20 @@ public ShuffleIdMappingManager() {
     shuffleDeterminateMap = JavaUtils.newConcurrentMap();
   }
 
+  /**
+   * Record the determinate status of each shuffle.
+   *
+   * @param shuffleId
+   * @param isDeterminate
+   */
+  public void recordShuffleIdDeterminate(int shuffleId, boolean isDeterminate) {
+    shuffleDeterminateMap.put(shuffleId, isDeterminate);
+  }
+
+  public boolean getShuffleIdDeterminate(int shuffleId) {
+    return shuffleDeterminateMap.get(shuffleId);
+  }
+
   /**
    * Create the shuffleId of uniffle based on the ShuffleID of Spark. When registerShuffle is being
    * performed, the default number of attempts by our stage is 0.
@@ -74,7 +88,48 @@ public int createUniffleShuffleId(int shuffleId) {
         .get(0);
   }
 
-  public void recordShuffleIdDeterminate(int shuffleId, boolean isDeterminate) {
-    shuffleDeterminateMap.put(shuffleId, isDeterminate);
+  /**
+   * Generate a new shuffleId based on the Spark shuffleId and the Spark stage attempt number; it
+   * is generally called only when a stage retry is triggered.
+   *
+   * @param shuffleId
+   * @param stageAttemptNumber
+   * @return
+   */
+  public int createUniffleShuffleId(int shuffleId, int stageAttemptNumber) {
+    int generatorShuffleId = shuffleIdGenerator.incrementAndGet();
+    shuffleIdMapping.get(shuffleId).put(stageAttemptNumber, generatorShuffleId);
+    return generatorShuffleId;
+  }
+
+  /**
+   * Determine whether there is a record for the passed-in Spark shuffleId and Spark stage attempt
+   * number.
+   *
+   * @param shuffleId
+   * @param stageAttemptNumber
+   * @return
+   */
+  public boolean hasUniffleShuffleIdByStageNumber(int shuffleId, int stageAttemptNumber) {
+    return shuffleIdMapping.get(shuffleId).get(stageAttemptNumber) != null;
+  }
+
+  public int getUniffleShuffleIdByStageNumber(int shuffleId, int stageAttemptNumber) {
+    return shuffleIdMapping.get(shuffleId).get(stageAttemptNumber);
+  }
+
+  /**
+   * Based on the passed-in Spark shuffleId, obtain all stage attempt numbers, sort them in
+   * reverse order, and return the Uniffle shuffleId of the latest attempt.
+   *
+   * @param shuffleId
+   * @return
+   */
+  public int getUniffleShuffleIdByMaxStageNumber(int shuffleId) {
+    return shuffleIdMapping.get(shuffleId).entrySet().stream()
+        .sorted(Map.Entry.<Integer, Integer>comparingByKey().reversed())
+        .findFirst()
+        .get()
+        .getValue();
   }
 }
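
The additions above are the bookkeeping behind write-stage retry: each Spark shuffleId maps to a per-stage-attempt table of generated Uniffle shuffleIds, and the highest attempt number is treated as the current one. A minimal, self-contained sketch of the same scheme is shown below; the class name, field names, and the generator's initial value are assumptions for illustration, not the committed implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the remapping scheme shown above.
class ShuffleIdRemapSketch {
  // Spark shuffleId -> (stage attempt number -> uniffle shuffleId)
  private final Map<Integer, Map<Integer, Integer>> shuffleIdMapping = new ConcurrentHashMap<>();
  private final AtomicInteger shuffleIdGenerator = new AtomicInteger(-1); // initial value assumed

  // First registration: attempt 0 gets a freshly generated uniffle shuffleId.
  int register(int sparkShuffleId) {
    int generated = shuffleIdGenerator.incrementAndGet();
    shuffleIdMapping
        .computeIfAbsent(sparkShuffleId, k -> new ConcurrentHashMap<>())
        .put(0, generated);
    return generated;
  }

  // Stage retry: a new uniffle shuffleId is minted for the given attempt number.
  int registerRetry(int sparkShuffleId, int stageAttemptNumber) {
    int generated = shuffleIdGenerator.incrementAndGet();
    shuffleIdMapping.get(sparkShuffleId).put(stageAttemptNumber, generated);
    return generated;
  }

  // Latest attempt wins: pick the uniffle shuffleId of the highest attempt number.
  int latest(int sparkShuffleId) {
    return shuffleIdMapping.get(sparkShuffleId).entrySet().stream()
        .sorted(Map.Entry.<Integer, Integer>comparingByKey().reversed())
        .findFirst()
        .get()
        .getValue();
  }
}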

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

Lines changed: 0 additions & 16 deletions
@@ -46,22 +46,6 @@ public class RssSparkConfig {
           .withDeprecatedKeys(RssClientConfig.RSS_RESUBMIT_STAGE)
           .withDescription("Whether to enable the resubmit stage for fetch/write failure");
 
-  public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED =
-      ConfigOptions.key("rss.stageRetry.fetchFailureEnabled")
-          .booleanType()
-          .defaultValue(false)
-          .withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
-          .withDescription(
-              "If set to true, the stage retry mechanism will be enabled when a fetch failure occurs.");
-
-  public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED =
-      ConfigOptions.key("rss.stageRetry.writeFailureEnabled")
-          .booleanType()
-          .defaultValue(false)
-          .withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
-          .withDescription(
-              "If set to true, the stage retry mechanism will be enabled when a write failure occurs.");
-
   public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
       ConfigOptions.key("rss.blockId.selfManagementEnabled")
           .booleanType()
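
With the fetch-failure and write-failure options removed, both retry paths now fall back to the single RSS_RESUBMIT_STAGE_ENABLED switch. A hedged sketch of the consolidated check, mirroring the guard used in RssSparkShuffleUtils in this commit; the wrapper class name and the RssConf import path are assumptions.

import org.apache.spark.SparkConf;
import org.apache.uniffle.common.config.RssConf;

// Hedged sketch, not committed code: gate stage retry on the single remaining option.
public class StageRetryGateSketch {
  static boolean stageRetryEnabled(SparkConf sparkConf) {
    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    return rssConf.getBoolean(RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED)
        && RssSparkShuffleUtils.isStageResubmitSupported();
  }
}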

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java

Lines changed: 6 additions & 2 deletions
@@ -54,7 +54,7 @@
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.Constants;
 
-import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED;
 
 public class RssSparkShuffleUtils {
 
@@ -366,17 +366,21 @@ public static RssException reportRssFetchFailedException(
       SparkConf sparkConf,
       String appId,
       int shuffleId,
+      int uniffleShuffleId,
       int stageAttemptId,
+      int stageAttemptNumber,
       Set<Integer> failedPartitions) {
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
-    if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
+    if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_ENABLED)
         && RssSparkShuffleUtils.isStageResubmitSupported()) {
       for (int partitionId : failedPartitions) {
         RssReportShuffleFetchFailureRequest req =
             new RssReportShuffleFetchFailureRequest(
                 appId,
                 shuffleId,
+                uniffleShuffleId,
                 stageAttemptId,
+                stageAttemptNumber,
                 partitionId,
                 rssFetchFailedException.getMessage());
         RssReportShuffleFetchFailureResponse response =
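
The fetch-failure report now carries the Spark-side and Uniffle-side shuffle ids as well as both stage attempt identifiers. A hedged sketch of building the widened request, using only the constructor argument order visible in the hunk above; the wrapper class and method names are illustrative, and imports are assumed to follow the file above.

// Hedged sketch, not committed code.
class FetchFailureReportSketch {
  static RssReportShuffleFetchFailureRequest buildReport(
      String appId,
      int sparkShuffleId,
      int uniffleShuffleId,   // resolved via ShuffleIdMappingManager for the current attempt
      int stageAttemptId,
      int stageAttemptNumber,
      int partitionId,
      String errorMessage) {
    return new RssReportShuffleFetchFailureRequest(
        appId,
        sparkShuffleId,
        uniffleShuffleId,
        stageAttemptId,
        stageAttemptNumber,
        partitionId,
        errorMessage);
  }
}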

client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java

Lines changed: 4 additions & 4 deletions
@@ -21,26 +21,26 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 
 import org.apache.uniffle.common.util.JavaUtils;
 
 public class ShuffleHandleInfoManager implements Closeable {
-  private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
+  private Map<Integer, MutableShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
 
   public ShuffleHandleInfoManager() {
     this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
   }
 
-  public ShuffleHandleInfo get(int shuffleId) {
+  public MutableShuffleHandleInfo get(int shuffleId) {
     return shuffleIdToShuffleHandleInfo.get(shuffleId);
   }
 
   public void remove(int shuffleId) {
     shuffleIdToShuffleHandleInfo.remove(shuffleId);
   }
 
-  public void register(int shuffleId, ShuffleHandleInfo handle) {
+  public void register(int shuffleId, MutableShuffleHandleInfo handle) {
     shuffleIdToShuffleHandleInfo.put(shuffleId, handle);
   }
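
The manager is a thin concurrent map keyed by Spark shuffleId and now holds MutableShuffleHandleInfo directly, so callers always get the latest mutable view of a shuffle's handle. A hedged sketch exercising only the register/get signatures shown above; the wrapper class is illustrative and imports follow the file above.

// Hedged sketch, not committed code: exercises only the signatures shown above.
class ShuffleHandleTrackingSketch {
  static MutableShuffleHandleInfo track(
      ShuffleHandleInfoManager manager, int shuffleId, MutableShuffleHandleInfo handle) {
    manager.register(shuffleId, handle);   // remember the mutable handle for this shuffle
    return manager.get(shuffleId);         // later lookups return the same mutable view
  }
}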

client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java

Lines changed: 0 additions & 144 deletions
This file was deleted.

client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java

Lines changed: 14 additions & 0 deletions
@@ -48,8 +48,10 @@ private RssFetchFailedIterator(Builder builder, Iterator<Product2<K, C>> iterato
   public static class Builder {
     private String appId;
     private int shuffleId;
+    private int uniffleShuffleId;
     private int partitionId;
     private int stageAttemptId;
+    private int stageAttemptNumber;
     private Supplier<ShuffleManagerClient> managerClientSupplier;
 
     private Builder() {}
@@ -64,6 +66,11 @@ Builder shuffleId(int shuffleId) {
       return this;
     }
 
+    Builder uniffleShuffleId(int uniffleShuffleId) {
+      this.uniffleShuffleId = uniffleShuffleId;
+      return this;
+    }
+
     Builder partitionId(int partitionId) {
       this.partitionId = partitionId;
       return this;
@@ -74,6 +81,11 @@ Builder stageAttemptId(int stageAttemptId) {
       return this;
     }
 
+    Builder stageAttemptNumber(int stageAttemptNumber) {
+      this.stageAttemptNumber = stageAttemptNumber;
+      return this;
+    }
+
     Builder managerClientSupplier(Supplier<ShuffleManagerClient> managerClientSupplier) {
       this.managerClientSupplier = managerClientSupplier;
       return this;
@@ -95,7 +107,9 @@ private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
           new RssReportShuffleFetchFailureRequest(
               builder.appId,
               builder.shuffleId,
+              builder.uniffleShuffleId,
               builder.stageAttemptId,
+              builder.stageAttemptNumber,
               builder.partitionId,
               e.getMessage());
       RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
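
The builder threads the two new identifiers into the same fetch-failure report shown earlier. A hedged fragment chaining only the setters visible in this diff; `builder` stands for an already-obtained Builder instance, and the argument variable names are illustrative.

// Hedged fragment, not committed code: only setters visible in this diff are used.
builder
    .shuffleId(sparkShuffleId)                // Spark-side shuffle id
    .uniffleShuffleId(uniffleShuffleId)       // Uniffle-side id for the current stage attempt
    .stageAttemptId(stageAttemptId)
    .stageAttemptNumber(stageAttemptNumber)   // forwarded into RssReportShuffleFetchFailureRequest
    .partitionId(partitionId)
    .managerClientSupplier(managerClientSupplier);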

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java

Lines changed: 1 addition & 8 deletions
@@ -25,22 +25,19 @@
 public class AddBlockEvent {
 
   private String taskId;
-  private int stageAttemptNumber;
   private List<ShuffleBlockInfo> shuffleDataInfoList;
   private List<Runnable> processedCallbackChain;
   private WriteBufferManager bufferManager;
 
   public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList) {
-    this(taskId, 0, shuffleDataInfoList, null);
+    this(taskId, shuffleDataInfoList, null);
   }
 
   public AddBlockEvent(
       String taskId,
-      int stageAttemptNumber,
       List<ShuffleBlockInfo> shuffleDataInfoList,
       WriteBufferManager writeBufferManager) {
     this.taskId = taskId;
-    this.stageAttemptNumber = stageAttemptNumber;
     this.shuffleDataInfoList = shuffleDataInfoList;
     this.processedCallbackChain = new ArrayList<>();
     this.bufferManager = writeBufferManager;
@@ -55,10 +52,6 @@ public String getTaskId() {
     return taskId;
   }
 
-  public int getStageAttemptNumber() {
-    return stageAttemptNumber;
-  }
-
   public List<ShuffleBlockInfo> getShuffleDataInfoList() {
     return shuffleDataInfoList;
   }
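
With stageAttemptNumber removed from the event, both construction paths reduce to a taskId plus the block list; the three-argument constructor simply carries the WriteBufferManager through for the post-send callbacks. A hedged sketch using only the two constructors shown above; the wrapper class and method are illustrative, and imports follow the file above.

// Hedged sketch, not committed code.
class AddBlockEventSketch {
  static AddBlockEvent newEvent(
      String taskId, java.util.List<ShuffleBlockInfo> blocks, WriteBufferManager bufferManager) {
    return bufferManager == null
        ? new AddBlockEvent(taskId, blocks)              // delegates internally with a null buffer manager
        : new AddBlockEvent(taskId, blocks, bufferManager);
  }
}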

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java

Lines changed: 1 addition & 4 deletions
@@ -93,10 +93,7 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
     try {
       result =
           shuffleWriteClient.sendShuffleData(
-              rssAppId,
-              event.getStageAttemptNumber(),
-              shuffleBlockInfoList,
-              () -> !isValidTask(taskId));
+              rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId));
       putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
       putFailedBlockSendTracker(
           taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
