 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }
 
-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object
 
-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
         }
       }
     }
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
          |=======================
          |Spark SQL CLI command line: ${command.mkString(" ")}
          |
-         |Executed query ${next.get()} "${queries(next.get())}",
-         |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+         |Executed query $next "${queries(next)}",
+         |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
          |
          |${buffer.mkString("\n")}
          |===========================
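
A note on the synchronization change above: the AtomicInteger counter is replaced by a plain var next whose reads and writes all happen inside lock.synchronized, because ProcessLogger invokes captureOutput from the separate threads that drain the forked process's stdout and stderr. Below is a minimal, self-contained sketch of that pattern; it is not part of the commit, the object name, shell command, and expected strings are made up for illustration, and it assumes a Unix shell is available.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.sys.process.{Process, ProcessLogger}

// Hypothetical standalone demo of the lock-guarded counter pattern from captureOutput.
object LockedCaptureSketch extends App {
  val expectedAnswers = Seq("hello", "world")  // made-up expected output lines
  var next = 0
  val foundAllExpectedAnswers = Promise[Unit]()
  val lock = new Object

  // ProcessLogger calls this from the stdout thread and the stderr thread of the
  // forked process, so every read and update of `next` happens under the same lock.
  def capture(line: String): Unit = lock.synchronized {
    if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
      next += 1
      if (next == expectedAnswers.size) {
        foundAllExpectedAnswers.trySuccess(())
      }
    }
  }

  // The stderr line is noise that never matches; it only exercises the second logger thread.
  Process(Seq("sh", "-c", "echo noise 1>&2; echo hello; echo world"))
    .run(ProcessLogger(capture, capture))

  Await.result(foundAllExpectedAnswers.future, 10.seconds)
  println("Captured all expected answers")
}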