Skip to content

Commit 90539fb

Browse files
committed
Revert "[#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (#887)"
This reverts commit 4423b43.
1 parent 4423b43 commit 90539fb

File tree

2 files changed

+7
-110
lines changed

2 files changed

+7
-110
lines changed

client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,12 @@ public void waitSendFinished() {
262262
sendBuffersToServers();
263263
}
264264
long start = System.currentTimeMillis();
265+
long commitDuration = 0;
266+
if (!isMemoryShuffleEnabled) {
267+
long s = System.currentTimeMillis();
268+
sendCommit();
269+
commitDuration = System.currentTimeMillis() - s;
270+
}
265271
while (true) {
266272
// if failed when send data to shuffle server, mark task as failed
267273
if (failedBlockIds.size() > 0) {
@@ -287,12 +293,6 @@ public void waitSendFinished() {
287293
throw new RssException(errorMsg);
288294
}
289295
}
290-
long commitDuration = 0;
291-
if (!isMemoryShuffleEnabled) {
292-
long s = System.currentTimeMillis();
293-
sendCommit();
294-
commitDuration = System.currentTimeMillis() - s;
295-
}
296296

297297
start = System.currentTimeMillis();
298298
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,

client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java

Lines changed: 1 addition & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717

1818
package org.apache.hadoop.mapred;
1919

20-
import java.util.ArrayList;
2120
import java.util.List;
2221
import java.util.Map;
2322
import java.util.Random;
2423
import java.util.Set;
2524
import java.util.function.Supplier;
26-
import java.util.stream.Collectors;
2725

2826
import com.google.common.collect.Maps;
2927
import com.google.common.collect.Sets;
@@ -260,91 +258,9 @@ public void testWriteNormal() throws Exception {
260258
assertTrue(manager.getWaitSendBuffers().isEmpty());
261259
}
262260

263-
@Test
264-
public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
265-
JobConf jobConf = new JobConf(new Configuration());
266-
SerializationFactory serializationFactory = new SerializationFactory(jobConf);
267-
MockShuffleWriteClient client = new MockShuffleWriteClient();
268-
client.setMode(3);
269-
Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
270-
Set<Long> successBlocks = Sets.newConcurrentHashSet();
271-
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
272-
Counters.Counter mapOutputByteCounter = new Counters.Counter();
273-
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
274-
SortWriteBufferManager<BytesWritable, BytesWritable> manager;
275-
manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
276-
10240,
277-
1L,
278-
10,
279-
serializationFactory.getSerializer(BytesWritable.class),
280-
serializationFactory.getSerializer(BytesWritable.class),
281-
WritableComparator.get(BytesWritable.class),
282-
0.9,
283-
"test",
284-
client,
285-
500,
286-
5 * 1000,
287-
partitionToServers,
288-
successBlocks,
289-
failedBlocks,
290-
mapOutputByteCounter,
291-
mapOutputRecordCounter,
292-
1,
293-
100,
294-
1,
295-
false,
296-
5,
297-
0.2f,
298-
1024000L,
299-
new RssConf());
300-
Random random = new Random();
301-
for (int i = 0; i < 1000; i++) {
302-
byte[] key = new byte[20];
303-
byte[] value = new byte[1024];
304-
random.nextBytes(key);
305-
random.nextBytes(value);
306-
int partitionId = random.nextInt(50);
307-
manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
308-
}
309-
manager.waitSendFinished();
310-
assertTrue(manager.getWaitSendBuffers().isEmpty());
311-
// When MEMOEY storage type is disable, all blocks should flush.
312-
assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
313-
}
314-
315-
class MockShuffleServer {
316-
317-
// All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
318-
private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
319-
private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
320-
private List<Long> finishedBlockInfos = new ArrayList<>();
321-
322-
public synchronized void finishShuffle() {
323-
flushBlockInfos.addAll(cachedBlockInfos);
324-
}
325-
326-
public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
327-
cachedBlockInfos.addAll(shuffleBlockInfoList);
328-
}
329-
330-
public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
331-
finishedBlockInfos.addAll(shuffleBlockInfoList);
332-
}
333-
334-
public synchronized int getFlushBlockSize() {
335-
return flushBlockInfos.size();
336-
}
337-
338-
public synchronized int getFinishBlockSize() {
339-
return finishedBlockInfos.size();
340-
}
341-
}
342-
343261
class MockShuffleWriteClient implements ShuffleWriteClient {
344262

345263
int mode = 0;
346-
MockShuffleServer mockedShuffleServer = new MockShuffleServer();
347-
int committedMaps = 0;
348264

349265
public void setMode(int mode) {
350266
this.mode = mode;
@@ -358,15 +274,6 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
358274
} else if (mode == 1) {
359275
return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
360276
} else {
361-
if (mode == 3) {
362-
try {
363-
Thread.sleep(10);
364-
mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
365-
} catch (InterruptedException e) {
366-
Thread.currentThread().interrupt();
367-
throw new RssException(e);
368-
}
369-
}
370277
Set<Long> successBlockIds = Sets.newHashSet();
371278
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
372279
successBlockIds.add(blockInfo.getBlockId());
@@ -397,13 +304,6 @@ public void registerShuffle(
397304

398305
@Override
399306
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
400-
if (mode == 3) {
401-
committedMaps++;
402-
if (committedMaps >= numMaps) {
403-
mockedShuffleServer.finishShuffle();
404-
}
405-
return true;
406-
}
407307
return false;
408308
}
409309

@@ -425,10 +325,7 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {
425325
@Override
426326
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
427327
int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
428-
if (mode == 3) {
429-
mockedShuffleServer.addFinishedBlockInfos(
430-
partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
431-
}
328+
432329
}
433330

434331
@Override

0 commit comments

Comments
 (0)