11/*
2- * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
2+ * Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1919import java .time .Duration ;
2020import java .util .concurrent .CountDownLatch ;
2121import java .util .concurrent .TimeUnit ;
22- import java .util .concurrent .atomic .AtomicReference ;
2322
2423import org .openjdk .jcstress .annotations .Actor ;
2524import org .openjdk .jcstress .annotations .Arbiter ;
2928import org .openjdk .jcstress .annotations .State ;
3029import org .openjdk .jcstress .infra .results .IIZ_Result ;
3130import org .openjdk .jcstress .infra .results .Z_Result ;
31+ import reactor .core .Disposable ;
3232
3333public abstract class BasicSchedulersStressTest {
3434
@@ -43,11 +43,15 @@ private static boolean canScheduleTask(Scheduler scheduler) {
4343 if (scheduler .isDisposed ()) {
4444 return false ;
4545 }
46- scheduler .schedule (latch ::countDown );
46+ Disposable disposable = scheduler .schedule (latch ::countDown );
4747 boolean taskDone = false ;
4848 try {
49- taskDone = latch .await (100 , TimeUnit .MILLISECONDS );
50- } catch (InterruptedException ignored ) {
49+ taskDone = latch .await (1 , TimeUnit .SECONDS );
50+ } catch (InterruptedException ie ) {
51+ throw new RuntimeException (ie );
52+ }
53+ if (((SchedulerTask ) disposable ).future .isCancelled ()) {
54+ throw new RuntimeException ("Future cancelled " + disposable );
5155 }
5256 return taskDone ;
5357 }
@@ -78,7 +82,7 @@ public void arbiter(Z_Result r) {
7882 // At this stage, at least one actor called scheduler.start(),
7983 // so we should be able to execute a task.
8084 r .r1 = canScheduleTask (scheduler );
81- scheduler .dispose ( );
85+ scheduler .disposeGracefully (). block ( Duration . ofMillis ( 500 ) );
8286 }
8387 }
8488
@@ -88,7 +92,7 @@ public void arbiter(Z_Result r) {
8892 public static class ParallelSchedulerStartDisposeStressTest {
8993
9094 private final ParallelScheduler scheduler =
91- new ParallelScheduler (4 , Thread ::new );
95+ new ParallelScheduler (2 , Thread ::new );
9296
9397 {
9498 scheduler .init ();
@@ -109,7 +113,7 @@ public void arbiter(Z_Result r) {
109113 // At this stage, at least one actor called scheduler.start(),
110114 // so we should be able to execute a task.
111115 r .r1 = canScheduleTask (scheduler );
112- scheduler .dispose () ;
116+ scheduler .disposeGracefully (). block ( Duration . ofMillis ( 500 )); ;
113117 }
114118 }
115119
@@ -169,7 +173,7 @@ public static class ParallelSchedulerDisposeGracefullyStressTest {
169173
170174 private final CountDownLatch latch = new CountDownLatch (2 );
171175 private final ParallelScheduler scheduler =
172- new ParallelScheduler (10 , Thread ::new );
176+ new ParallelScheduler (2 , Thread ::new );
173177
174178 {
175179 scheduler .init ();
@@ -263,7 +267,7 @@ public void arbiter(IIZ_Result r) {
263267 public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest {
264268
265269 private final ParallelScheduler scheduler =
266- new ParallelScheduler (10 , Thread ::new );
270+ new ParallelScheduler (2 , Thread ::new );
267271
268272 {
269273 scheduler .init ();
0 commit comments