@@ -235,7 +235,7 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
235235 * @throws StatusException if the write to the server failed
236236 */
237237 @ ExperimentalApi ("https://github.com/grpc/grpc-java/issues/10918" )
238- public static <ReqT , RespT > BlockingClientCall <? , RespT > blockingV2ServerStreamingCall (
238+ public static <ReqT , RespT > BlockingClientCall <ReqT , RespT > blockingV2ServerStreamingCall (
239239 Channel channel , MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , ReqT req )
240240 throws InterruptedException , StatusException {
241241 BlockingClientCall <ReqT , RespT > call =
@@ -436,7 +436,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
436436 abstract void onStart ();
437437 }
438438
439- private static class CallToStreamObserverAdapter <ReqT >
439+ private static final class CallToStreamObserverAdapter <ReqT >
440440 extends ClientCallStreamObserver <ReqT > {
441441 private boolean frozen ;
442442 private final ClientCall <ReqT , ?> call ;
@@ -787,7 +787,7 @@ void onStart() {
787787 }
788788
789789 @ SuppressWarnings ("serial" )
790- static final class ThreadlessExecutor extends ConcurrentLinkedQueue <Runnable >
790+ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue <Runnable >
791791 implements Executor {
792792 private static final Logger log = Logger .getLogger (ThreadlessExecutor .class .getName ());
793793
@@ -804,12 +804,14 @@ static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
804804 * Must only be called by one thread at a time.
805805 */
806806 public void waitAndDrain () throws InterruptedException {
807+ throwIfInterrupted ();
807808 Runnable runnable = poll ();
808809 if (runnable == null ) {
809810 waiter = Thread .currentThread ();
810811 try {
811812 while ((runnable = poll ()) == null ) {
812813 LockSupport .park (this );
814+ throwIfInterrupted ();
813815 }
814816 } finally {
815817 waiter = null ;
@@ -820,6 +822,12 @@ public void waitAndDrain() throws InterruptedException {
820822 } while ((runnable = poll ()) != null );
821823 }
822824
825+ private static void throwIfInterrupted () throws InterruptedException {
826+ if (Thread .interrupted ()) {
827+ throw new InterruptedException ();
828+ }
829+ }
830+
823831 /**
824832 * Called after final call to {@link #waitAndDrain()}, from same thread.
825833 */
@@ -857,7 +865,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru
857865 private static final Logger log =
858866 Logger .getLogger (ThreadSafeThreadlessExecutor .class .getName ());
859867
860- private Lock waiterLock = new ReentrantLock ();
868+ private final Lock waiterLock = new ReentrantLock ();
861869 private final Condition waiterCondition = waiterLock .newCondition ();
862870
863871 // Non private to avoid synthetic class
0 commit comments