1717
1818package org .apache .hadoop .mapred ;
1919
20+ import java .util .ArrayList ;
2021import java .util .List ;
2122import java .util .Map ;
2223import java .util .Random ;
2324import java .util .Set ;
2425import java .util .function .Supplier ;
26+ import java .util .stream .Collectors ;
2527
2628import com .google .common .collect .Sets ;
2729import org .apache .hadoop .conf .Configuration ;
@@ -261,9 +263,91 @@ public void testWriteNormal() throws Exception {
261263 assertTrue (manager .getWaitSendBuffers ().isEmpty ());
262264 }
263265
266+ @ Test
267+ public void testCommitBlocksWhenMemoryShuffleDisabled () throws Exception {
268+ JobConf jobConf = new JobConf (new Configuration ());
269+ SerializationFactory serializationFactory = new SerializationFactory (jobConf );
270+ MockShuffleWriteClient client = new MockShuffleWriteClient ();
271+ client .setMode (3 );
272+ Map <Integer , List <ShuffleServerInfo >> partitionToServers = JavaUtils .newConcurrentMap ();
273+ Set <Long > successBlocks = Sets .newConcurrentHashSet ();
274+ Set <Long > failedBlocks = Sets .newConcurrentHashSet ();
275+ Counters .Counter mapOutputByteCounter = new Counters .Counter ();
276+ Counters .Counter mapOutputRecordCounter = new Counters .Counter ();
277+ SortWriteBufferManager <BytesWritable , BytesWritable > manager ;
278+ manager = new SortWriteBufferManager <BytesWritable , BytesWritable >(
279+ 10240 ,
280+ 1L ,
281+ 10 ,
282+ serializationFactory .getSerializer (BytesWritable .class ),
283+ serializationFactory .getSerializer (BytesWritable .class ),
284+ WritableComparator .get (BytesWritable .class ),
285+ 0.9 ,
286+ "test" ,
287+ client ,
288+ 500 ,
289+ 5 * 1000 ,
290+ partitionToServers ,
291+ successBlocks ,
292+ failedBlocks ,
293+ mapOutputByteCounter ,
294+ mapOutputRecordCounter ,
295+ 1 ,
296+ 100 ,
297+ 1 ,
298+ false ,
299+ 5 ,
300+ 0.2f ,
301+ 1024000L ,
302+ new RssConf ());
303+ Random random = new Random ();
304+ for (int i = 0 ; i < 1000 ; i ++) {
305+ byte [] key = new byte [20 ];
306+ byte [] value = new byte [1024 ];
307+ random .nextBytes (key );
308+ random .nextBytes (value );
309+ int partitionId = random .nextInt (50 );
310+ manager .addRecord (partitionId , new BytesWritable (key ), new BytesWritable (value ));
311+ }
312+ manager .waitSendFinished ();
313+ assertTrue (manager .getWaitSendBuffers ().isEmpty ());
314+ // When MEMOEY storage type is disable, all blocks should flush.
315+ assertEquals (client .mockedShuffleServer .getFinishBlockSize (), client .mockedShuffleServer .getFlushBlockSize ());
316+ }
317+
318+ class MockShuffleServer {
319+
320+ // All methods of MockShuffle are thread safe, because send-thread may do something in concurrent way.
321+ private List <ShuffleBlockInfo > cachedBlockInfos = new ArrayList <>();
322+ private List <ShuffleBlockInfo > flushBlockInfos = new ArrayList <>();
323+ private List <Long > finishedBlockInfos = new ArrayList <>();
324+
325+ public synchronized void finishShuffle () {
326+ flushBlockInfos .addAll (cachedBlockInfos );
327+ }
328+
329+ public synchronized void addCachedBlockInfos (List <ShuffleBlockInfo > shuffleBlockInfoList ) {
330+ cachedBlockInfos .addAll (shuffleBlockInfoList );
331+ }
332+
333+ public synchronized void addFinishedBlockInfos (List <Long > shuffleBlockInfoList ) {
334+ finishedBlockInfos .addAll (shuffleBlockInfoList );
335+ }
336+
337+ public synchronized int getFlushBlockSize () {
338+ return flushBlockInfos .size ();
339+ }
340+
341+ public synchronized int getFinishBlockSize () {
342+ return finishedBlockInfos .size ();
343+ }
344+ }
345+
264346 class MockShuffleWriteClient implements ShuffleWriteClient {
265347
266348 int mode = 0 ;
349+ MockShuffleServer mockedShuffleServer = new MockShuffleServer ();
350+ int committedMaps = 0 ;
267351
268352 public void setMode (int mode ) {
269353 this .mode = mode ;
@@ -277,6 +361,15 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
277361 } else if (mode == 1 ) {
278362 return new SendShuffleDataResult (Sets .newHashSet (2L ), Sets .newHashSet (1L ));
279363 } else {
364+ if (mode == 3 ) {
365+ try {
366+ Thread .sleep (10 );
367+ mockedShuffleServer .addCachedBlockInfos (shuffleBlockInfoList );
368+ } catch (InterruptedException e ) {
369+ Thread .currentThread ().interrupt ();
370+ throw new RssException (e );
371+ }
372+ }
280373 Set <Long > successBlockIds = Sets .newHashSet ();
281374 for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList ) {
282375 successBlockIds .add (blockInfo .getBlockId ());
@@ -308,6 +401,13 @@ public void registerShuffle(
308401
309402 @ Override
310403 public boolean sendCommit (Set <ShuffleServerInfo > shuffleServerInfoSet , String appId , int shuffleId , int numMaps ) {
404+ if (mode == 3 ) {
405+ committedMaps ++;
406+ if (committedMaps >= numMaps ) {
407+ mockedShuffleServer .finishShuffle ();
408+ }
409+ return true ;
410+ }
311411 return false ;
312412 }
313413
@@ -329,7 +429,10 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {
329429 @ Override
330430 public void reportShuffleResult (Map <Integer , List <ShuffleServerInfo >> partitionToServers , String appId ,
331431 int shuffleId , long taskAttemptId , Map <Integer , List <Long >> partitionToBlockIds , int bitmapNum ) {
332-
432+ if (mode == 3 ) {
433+ mockedShuffleServer .addFinishedBlockInfos (
434+ partitionToBlockIds .values ().stream ().flatMap (it -> it .stream ()).collect (Collectors .toList ()));
435+ }
333436 }
334437
335438 @ Override
0 commit comments