Skip to content

Commit 4b82788

Browse files
jerqixianjingfeng
authored andcommitted
Revert "[apache#886] fix(mr): MR Client may lost data or throw exception when rss.storage.type without MEMORY. (apache#887)"
This reverts commit 4423b43.
1 parent 926c314 commit 4b82788

File tree

2 files changed

+7
-110
lines changed

2 files changed

+7
-110
lines changed

client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,12 @@ public void waitSendFinished() {
260260
sendBuffersToServers();
261261
}
262262
long start = System.currentTimeMillis();
263+
long commitDuration = 0;
264+
if (!isMemoryShuffleEnabled) {
265+
long s = System.currentTimeMillis();
266+
sendCommit();
267+
commitDuration = System.currentTimeMillis() - s;
268+
}
263269
while (true) {
264270
// if failed when send data to shuffle server, mark task as failed
265271
if (failedBlockIds.size() > 0) {
@@ -285,12 +291,6 @@ public void waitSendFinished() {
285291
throw new RssException(errorMsg);
286292
}
287293
}
288-
long commitDuration = 0;
289-
if (!isMemoryShuffleEnabled) {
290-
long s = System.currentTimeMillis();
291-
sendCommit();
292-
commitDuration = System.currentTimeMillis() - s;
293-
}
294294

295295
start = System.currentTimeMillis();
296296
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, 0,

client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java

Lines changed: 1 addition & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717

1818
package org.apache.hadoop.mapred;
1919

20-
import java.util.ArrayList;
2120
import java.util.List;
2221
import java.util.Map;
2322
import java.util.Random;
2423
import java.util.Set;
2524
import java.util.function.Supplier;
26-
import java.util.stream.Collectors;
2725

2826
import com.google.common.collect.Sets;
2927
import org.apache.hadoop.conf.Configuration;
@@ -263,91 +261,9 @@ public void testWriteNormal() throws Exception {
263261
assertTrue(manager.getWaitSendBuffers().isEmpty());
264262
}
265263

266-
@Test
267-
public void testCommitBlocksWhenMemoryShuffleDisabled() throws Exception {
268-
JobConf jobConf = new JobConf(new Configuration());
269-
SerializationFactory serializationFactory = new SerializationFactory(jobConf);
270-
MockShuffleWriteClient client = new MockShuffleWriteClient();
271-
client.setMode(3);
272-
Map<Integer, List<ShuffleServerInfo>> partitionToServers = JavaUtils.newConcurrentMap();
273-
Set<Long> successBlocks = Sets.newConcurrentHashSet();
274-
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
275-
Counters.Counter mapOutputByteCounter = new Counters.Counter();
276-
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
277-
SortWriteBufferManager<BytesWritable, BytesWritable> manager;
278-
manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
279-
10240,
280-
1L,
281-
10,
282-
serializationFactory.getSerializer(BytesWritable.class),
283-
serializationFactory.getSerializer(BytesWritable.class),
284-
WritableComparator.get(BytesWritable.class),
285-
0.9,
286-
"test",
287-
client,
288-
500,
289-
5 * 1000,
290-
partitionToServers,
291-
successBlocks,
292-
failedBlocks,
293-
mapOutputByteCounter,
294-
mapOutputRecordCounter,
295-
1,
296-
100,
297-
1,
298-
false,
299-
5,
300-
0.2f,
301-
1024000L,
302-
new RssConf());
303-
Random random = new Random();
304-
for (int i = 0; i < 1000; i++) {
305-
byte[] key = new byte[20];
306-
byte[] value = new byte[1024];
307-
random.nextBytes(key);
308-
random.nextBytes(value);
309-
int partitionId = random.nextInt(50);
310-
manager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
311-
}
312-
manager.waitSendFinished();
313-
assertTrue(manager.getWaitSendBuffers().isEmpty());
314-
// When MEMOEY storage type is disable, all blocks should flush.
315-
assertEquals(client.mockedShuffleServer.getFinishBlockSize(), client.mockedShuffleServer.getFlushBlockSize());
316-
}
317-
318-
class MockShuffleServer {
319-
320-
// All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
321-
private List<ShuffleBlockInfo> cachedBlockInfos = new ArrayList<>();
322-
private List<ShuffleBlockInfo> flushBlockInfos = new ArrayList<>();
323-
private List<Long> finishedBlockInfos = new ArrayList<>();
324-
325-
public synchronized void finishShuffle() {
326-
flushBlockInfos.addAll(cachedBlockInfos);
327-
}
328-
329-
public synchronized void addCachedBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
330-
cachedBlockInfos.addAll(shuffleBlockInfoList);
331-
}
332-
333-
public synchronized void addFinishedBlockInfos(List<Long> shuffleBlockInfoList) {
334-
finishedBlockInfos.addAll(shuffleBlockInfoList);
335-
}
336-
337-
public synchronized int getFlushBlockSize() {
338-
return flushBlockInfos.size();
339-
}
340-
341-
public synchronized int getFinishBlockSize() {
342-
return finishedBlockInfos.size();
343-
}
344-
}
345-
346264
class MockShuffleWriteClient implements ShuffleWriteClient {
347265

348266
int mode = 0;
349-
MockShuffleServer mockedShuffleServer = new MockShuffleServer();
350-
int committedMaps = 0;
351267

352268
public void setMode(int mode) {
353269
this.mode = mode;
@@ -361,15 +277,6 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
361277
} else if (mode == 1) {
362278
return new SendShuffleDataResult(Sets.newHashSet(2L), Sets.newHashSet(1L));
363279
} else {
364-
if (mode == 3) {
365-
try {
366-
Thread.sleep(10);
367-
mockedShuffleServer.addCachedBlockInfos(shuffleBlockInfoList);
368-
} catch (InterruptedException e) {
369-
Thread.currentThread().interrupt();
370-
throw new RssException(e);
371-
}
372-
}
373280
Set<Long> successBlockIds = Sets.newHashSet();
374281
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
375282
successBlockIds.add(blockInfo.getBlockId());
@@ -401,13 +308,6 @@ public void registerShuffle(
401308

402309
@Override
403310
public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
404-
if (mode == 3) {
405-
committedMaps++;
406-
if (committedMaps >= numMaps) {
407-
mockedShuffleServer.finishShuffle();
408-
}
409-
return true;
410-
}
411311
return false;
412312
}
413313

@@ -429,10 +329,7 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {
429329
@Override
430330
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
431331
int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
432-
if (mode == 3) {
433-
mockedShuffleServer.addFinishedBlockInfos(
434-
partitionToBlockIds.values().stream().flatMap(it -> it.stream()).collect(Collectors.toList()));
435-
}
332+
436333
}
437334

438335
@Override

0 commit comments

Comments
 (0)