
Commit 7c41d13

scwf authored and marmbrus committed
[SPARK-3826][SQL]enable hive-thriftserver to support hive-0.13.1
In #2241 hive-thriftserver is not enabled. This patch enables hive-thriftserver to support hive-0.13.1 by using a shim layer, following the approach of #2241:

1. A light shim layer (code in sql/hive-thriftserver/hive-version) for each hive version, to handle API compatibility.
2. New pom profiles "hive-default" and "hive-versions" (copied from #2241) to activate the different hive versions.
3. SBT commands for each version:
   hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
   hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly
4. Since hive-thriftserver depends on the hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver.

Author: wangfei <[email protected]>
Author: scwf <[email protected]>

Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:

f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
1 parent adb6415 commit 7c41d13
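
For illustration, a minimal sketch of the per-version shim pattern this patch relies on. The identifiers HiveShim.getCommandProcessor and HiveThriftServerShim come from the diff below; the object body, version constant, and exact file location are assumptions, not the real shim sources.

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

// Hypothetical hive-0.12.0 flavour of the shim, compiled only when the build
// picks up the version-specific source directory (see the
// build-helper-maven-plugin change in sql/hive-thriftserver/pom.xml below).
// Hive 0.12's CommandProcessorFactory.get takes a single command string, so
// the shim exposes an Array[String]-based signature that also fits Hive 0.13,
// letting SparkSQLCLIDriver call HiveShim.getCommandProcessor unchanged.
private[hive] object HiveShim {
  val version = "0.12.0"

  def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
    CommandProcessorFactory.get(cmd(0), conf)
}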

12 files changed (+571 lines, -230 lines)

assembly/pom.xml

Lines changed: 0 additions & 6 deletions
@@ -201,12 +201,6 @@
           <artifactId>spark-hive_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
-      </dependencies>
-    </profile>
-    <profile>
-      <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
-      <id>hive-0.12.0</id>
-      <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>

dev/run-tests

Lines changed: 10 additions & 3 deletions
@@ -142,17 +142,24 @@ CURRENT_BLOCK=$BLOCK_BUILD
   # We always build with Hive because the PySpark Spark SQL tests need it.
   BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"

-  echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"

   # NOTE: echo "q" is needed because sbt on encountering a build file with failure
   #+ (either resolution or compilation) prompts the user for input either q, r, etc
   #+ to quit or retry. This echo is there to make it not block.
-  # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
+  # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
   #+ single argument!
   # QUESTION: Why doesn't 'yes "q"' work?
   # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
+  # First build with 0.12 to ensure patches do not break the hive 12 build
+  echo "[info] Compile with hive 0.12"
   echo -e "q\n" \
-    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \
+    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \
+    | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
+  # Then build with default version(0.13.1) because tests are based on this version
+  echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive"
+  echo -e "q\n" \
+    | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \
     | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 }

pom.xml

Lines changed: 23 additions & 6 deletions
@@ -129,7 +129,7 @@
     <flume.version>1.4.0</flume.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <!-- Version used in Maven Hive dependency -->
-    <hive.version>0.13.1</hive.version>
+    <hive.version>0.13.1a</hive.version>
     <!-- Version used for internal directory structure -->
     <hive.version.short>0.13.1</hive.version.short>
     <derby.version>10.10.1.1</derby.version>

@@ -240,6 +240,18 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <!-- This is temporarily included to fix issues with Hive 0.13 -->
+      <id>spark-staging-hive13</id>
+      <name>Spring Staging Repository Hive 13</name>
+      <url>https://oss.sonatype.org/content/repositories/orgspark-project-1089/</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
   </repositories>
   <pluginRepositories>
     <pluginRepository>

@@ -908,9 +920,9 @@
              by Spark SQL for code generation. -->
          <compilerPlugins>
            <compilerPlugin>
-            <groupId>org.scalamacros</groupId>
-            <artifactId>paradise_${scala.version}</artifactId>
-            <version>${scala.macros.version}</version>
+            <groupId>org.scalamacros</groupId>
+            <artifactId>paradise_${scala.version}</artifactId>
+            <version>${scala.macros.version}</version>
            </compilerPlugin>
          </compilerPlugins>
        </configuration>

@@ -1314,14 +1326,19 @@
       </dependencies>
     </profile>
     <profile>
-      <id>hive-0.12.0</id>
+      <id>hive</id>
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
       <modules>
         <module>sql/hive-thriftserver</module>
       </modules>
+    </profile>
+    <profile>
+      <id>hive-0.12.0</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
       <properties>
         <hive.version>0.12.0-protobuf-2.5</hive.version>
         <hive.version.short>0.12.0</hive.version.short>

python/pyspark/sql.py

Lines changed: 0 additions & 27 deletions
@@ -1400,33 +1400,6 @@ def hql(self, hqlQuery):

 class LocalHiveContext(HiveContext):

-    """Starts up an instance of hive where metadata is stored locally.
-
-    An in-process metadata data is created with data stored in ./metadata.
-    Warehouse data is stored in in ./warehouse.
-
-    >>> import os
-    >>> hiveCtx = LocalHiveContext(sc)
-    >>> try:
-    ...     supress = hiveCtx.sql("DROP TABLE src")
-    ... except Exception:
-    ...     pass
-    >>> kv1 = os.path.join(os.environ["SPARK_HOME"],
-    ...        'examples/src/main/resources/kv1.txt')
-    >>> supress = hiveCtx.sql(
-    ...     "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    >>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
-    ...        % kv1)
-    >>> results = hiveCtx.sql("FROM src SELECT value"
-    ...     ).map(lambda r: int(r.value.split('_')[1]))
-    >>> num = results.count()
-    >>> reduce_sum = results.reduce(lambda x, y: x + y)
-    >>> num
-    500
-    >>> reduce_sum
-    130091
-    """
-
     def __init__(self, sparkContext, sqlContext=None):
         HiveContext.__init__(self, sparkContext, sqlContext)
         warnings.warn("LocalHiveContext is deprecated. "

sql/hive-thriftserver/pom.xml

Lines changed: 18 additions & 0 deletions
@@ -70,6 +70,24 @@
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest-maven-plugin</artifactId>
           </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-default-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>v${hive.version.short}/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-deploy-plugin</artifactId>

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala

Lines changed: 4 additions & 14 deletions
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}

-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
-  extends Driver with Logging {
+private[hive] abstract class AbstractSparkSQLDriver(
+    val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {

-  private var tableSchema: Schema = _
-  private var hiveResponse: Seq[String] = _
+  private[hive] var tableSchema: Schema = _
+  private[hive] var hiveResponse: Seq[String] = _

   override def init(): Unit = {
   }

@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo

   override def getSchema: Schema = tableSchema

-  override def getResults(res: JArrayList[String]): Boolean = {
-    if (hiveResponse == null) {
-      false
-    } else {
-      res.addAll(hiveResponse)
-      hiveResponse = null
-      true
-    }
-  }
-
   override def destroy() {
     super.destroy()
     hiveResponse = null

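The getResults override is dropped from the shared abstract driver so that each version-specific SparkSQLDriver can declare it against its own Hive Driver signature. Below is a rough sketch of how the hive-0.12.0 flavour might reuse the deleted body; the file location, constructor shape, and the claim about the 0.12 signature are assumptions, not code from this commit.

package org.apache.spark.sql.hive.thriftserver

import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

import org.apache.spark.sql.hive.HiveContext

// Hypothetical v0.12.0 SparkSQLDriver: assuming Hive 0.12's Driver.getResults
// expects a java.util.ArrayList[String], the override deleted from
// AbstractSparkSQLDriver reappears here with the 0.12-specific signature.
private[hive] class SparkSQLDriver(_context: HiveContext = SparkSQLEnv.hiveContext)
  extends AbstractSparkSQLDriver(_context) {

  override def getResults(res: JArrayList[String]): Boolean = {
    if (hiveResponse == null) {
      false
    } else {
      // hiveResponse is a Scala Seq[String]; JavaConversions bridges it to the
      // java.util.Collection that addAll expects.
      res.addAll(hiveResponse)
      hiveResponse = null
      true
    }
  }
}
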
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 4 additions & 2 deletions
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.thrift.transport.TSocket

 import org.apache.spark.Logging
+import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim

 private[hive] object SparkSQLCLIDriver {
   private var prompt = "spark-sql"

@@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver {
       }
     }

-    if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+    if (!sessionState.isRemoteMode) {
       // Hadoop-20 and above - we need to augment classpath using hiveconf
       // components.
       // See also: code in ExecDriver.java

@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       } else {
         var ret = 0
         val hconf = conf.asInstanceOf[HiveConf]
-        val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+        val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)

         if (proc != null) {
           if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala

Lines changed: 11 additions & 8 deletions
@@ -24,6 +24,7 @@ import java.util.{List => JList}
 import javax.security.auth.login.LoginException

 import org.apache.commons.logging.Log
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hive.service.Service.STATE

@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
     val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
     setSuperField(this, "sessionManager", sparkSqlSessionManager)
     addService(sparkSqlSessionManager)
+    var sparkServiceUGI: UserGroupInformation = null

-    try {
-      HiveAuthFactory.loginFromKeytab(hiveConf)
-      val serverUserName = ShimLoader.getHadoopShims
-        .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
-      setSuperField(this, "serverUserName", serverUserName)
-    } catch {
-      case e @ (_: IOException | _: LoginException) =>
-        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+    if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+      try {
+        HiveAuthFactory.loginFromKeytab(hiveConf)
+        sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+        HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+      } catch {
+        case e @ (_: IOException | _: LoginException) =>
+          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+      }
     }

     initCompositeService(hiveConf)
