@@ -22,27 +22,25 @@ import java.sql.Timestamp
 import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
-import org.apache.hadoop.hive.ql.session.SessionState
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
+
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 
 /**
@@ -52,7 +50,7 @@ private[thriftserver] object HiveShim {
   val version = "0.13.1"
 
   def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService: SparkSQLCLIService) = {
-    setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)// is this alright?
+    setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
   }
 
   def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
@@ -117,6 +115,39 @@ private[hive] class SparkExecuteStatementOperation(
     logDebug("CLOSING")
   }
 
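+  /**
+   * Converts the non-null value at `ordinal` of `from` into the Java type Hive's
+   * RowSet layer expects and appends it to `to`. Callers handle nulls before
+   * invoking this; complex types (array, struct, map) and binary values arrive
+   * here already rendered as strings.
+   */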
+  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
+    dataTypes(ordinal) match {
+      case StringType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case IntegerType =>
+        to += from.getInt(ordinal)
+      case BooleanType =>
+        to += from.getBoolean(ordinal)
+      case DoubleType =>
+        to += from.getDouble(ordinal)
+      case FloatType =>
+        to += from.getFloat(ordinal)
+      case DecimalType =>
+        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+      case LongType =>
+        to += from.getLong(ordinal)
+      case ByteType =>
+        to += from.getByte(ordinal)
+      case ShortType =>
+        to += from.getShort(ordinal)
+      case TimestampType =>
+        to += from.get(ordinal).asInstanceOf[Timestamp]
+      case BinaryType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: ArrayType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: StructType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: MapType =>
+        to += from.get(ordinal).asInstanceOf[String]
+    }
+  }
+
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -133,40 +164,14 @@ private[hive] class SparkExecuteStatementOperation(
       val row = ArrayBuffer[Any]()
       var curCol = 0
       while (curCol < sparkRow.length) {
-        dataTypes(curCol) match {
-          case StringType =>
-            row += sparkRow.get(curCol).asInstanceOf[String]
-          case IntegerType =>
-            row += sparkRow.getInt(curCol)
-          case BooleanType =>
-            row += sparkRow.getBoolean(curCol)
-          case DoubleType =>
-            row += sparkRow.getDouble(curCol)
-          case FloatType =>
-            row += sparkRow.getFloat(curCol)
-          case DecimalType =>
-            row += sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal
-          case LongType =>
-            row += sparkRow.getLong(curCol)
-          case ByteType =>
-            row += sparkRow.getByte(curCol)
-          case ShortType =>
-            row += sparkRow.getShort(curCol)
-          case TimestampType =>
-            row += sparkRow.get(curCol).asInstanceOf[Timestamp]
-          case BinaryType =>
-            row += sparkRow.get(curCol).asInstanceOf[String]
-          case _: ArrayType =>
-            row += sparkRow.get(curCol).asInstanceOf[String]
-          case _: StructType =>
-            row += sparkRow.get(curCol).asInstanceOf[String]
-          case _: MapType =>
-            row += sparkRow.get(curCol).asInstanceOf[String]
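+        // Nulls are handled once up front; per-type conversion is delegated to
+        // addNonNullColumnValue, which this change factors out of the loop body.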
+        if (sparkRow.isNullAt(curCol)) {
+          row += null
+        } else {
+          addNonNullColumnValue(sparkRow, row, curCol)
         }
         curCol += 1
       }
       reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
-      row.clear()
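+      // No explicit clear() needed: `row` is a fresh buffer for each result row.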
       curRow += 1
     }
     reultRowSet