Skip to content

Commit 51606a0

Browse files
committed
feat: Add comprehensive Pinot Proxy and gRPC support to pinot-spark-3-connector
🔧 Pinot Proxy Support: - Add proxy.enabled configuration option (default: false) - Implement HTTP proxy forwarding with FORWARD_HOST and FORWARD_PORT headers - Support proxy routing for all controller and broker API requests - Enable proxy-based secure cluster access where proxy is the only exposed endpoint 🚀 Comprehensive gRPC Configuration: - Add grpc.port configuration (default: 8090) - Add grpc.max-inbound-message-size configuration (default: 128MB) - Add grpc.use-plain-text configuration (default: true) - Support grpc.tls.keystore-type, grpc.tls.keystore-path, grpc.tls.keystore-password - Support grpc.tls.truststore-type, grpc.tls.truststore-path, grpc.tls.truststore-password - Add grpc.tls.ssl-provider configuration (default: JDK) - Add grpc.proxy-uri for gRPC proxy endpoint configuration 🔒 gRPC Proxy Integration: - Implement gRPC proxy support with FORWARD_HOST and FORWARD_PORT metadata - Create comprehensive GrpcUtils for channel management and proxy headers - Support secure gRPC communication through proxy infrastructure - Enable TLS/SSL configuration for gRPC connections 🏗️ Architecture Updates: - Update PinotDataSourceReadOptions with all new proxy and gRPC fields - Enhance PinotClusterClient with proxy-aware API methods - Add HttpUtils.sendGetRequestWithProxyHeaders() for proxy HTTP requests - Update PinotServerDataFetcher with gRPC proxy configuration support - Modify all Spark DataSource V2 components to pass proxy parameters 🧪 Comprehensive Testing: - Add 8 new test cases for proxy and gRPC configuration parsing - Create GrpcUtilsTest for gRPC channel creation and proxy metadata - Update existing tests to include new configuration parameters - Achieve 39/39 passing tests with full backward compatibility 📚 Enhanced Documentation: - Add comprehensive Pinot Proxy Support section with examples - Add detailed gRPC Configuration section with TLS examples - Include Security Best Practices for production deployments - Provide proxy + gRPC + HTTPS + authentication integration examples 🎯 Production Features: - Full backward compatibility - existing code works unchanged - Based on Trino PR #13015 reference implementation - Supports secure production deployments with proxy-only access - Comprehensive error handling and validation - Performance optimizations for gRPC connections All tests passing (39/39) with complete feature parity to Trino's implementation.
1 parent 6d1fd97 commit 51606a0

File tree

20 files changed

+1017
-45
lines changed

20 files changed

+1017
-45
lines changed

pinot-connectors/pinot-spark-3-connector/README.md

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,107 @@ val data = spark.read
130130

131131
**Note:** If only `authToken` is provided without `authHeader`, the connector will automatically use `Authorization: Bearer <token>`.
132132

133+
## Pinot Proxy Support
134+
135+
The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.
136+
137+
### Proxy Configuration Examples
138+
139+
```scala
140+
// Basic proxy configuration
141+
val data = spark.read
142+
.format("pinot")
143+
.option("table", "airlineStats")
144+
.option("tableType", "offline")
145+
.option("controller", "pinot-proxy:8080") // Proxy endpoint
146+
.option("proxy.enabled", "true")
147+
.load()
148+
149+
// Proxy with authentication
150+
val data = spark.read
151+
.format("pinot")
152+
.option("table", "airlineStats")
153+
.option("tableType", "offline")
154+
.option("controller", "pinot-proxy:8080")
155+
.option("proxy.enabled", "true")
156+
.option("authToken", "my-proxy-token")
157+
.load()
158+
159+
// Proxy with gRPC configuration
160+
val data = spark.read
161+
.format("pinot")
162+
.option("table", "airlineStats")
163+
.option("tableType", "offline")
164+
.option("controller", "pinot-proxy:8080")
165+
.option("proxy.enabled", "true")
166+
.option("grpc.proxy-uri", "pinot-proxy:8094") // gRPC proxy endpoint
167+
.load()
168+
```
169+
170+
### Proxy Configuration Options
171+
172+
| Option | Description | Required | Default |
173+
|--------|-------------|----------|----------|
174+
| `proxy.enabled` | Use Pinot Proxy for controller and broker requests | No | `false` |
175+
176+
**Note:** When proxy is enabled, the connector adds `FORWARD_HOST` and `FORWARD_PORT` headers to route requests to the actual Pinot services.
177+
178+
## gRPC Configuration
179+
180+
The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.
181+
182+
### gRPC Configuration Examples
183+
184+
```scala
185+
// Basic gRPC configuration
186+
val data = spark.read
187+
.format("pinot")
188+
.option("table", "airlineStats")
189+
.option("tableType", "offline")
190+
.option("grpc.port", "8091")
191+
.option("grpc.max-inbound-message-size", "256000000") // 256MB
192+
.load()
193+
194+
// gRPC with TLS
195+
val data = spark.read
196+
.format("pinot")
197+
.option("table", "airlineStats")
198+
.option("tableType", "offline")
199+
.option("grpc.use-plain-text", "false")
200+
.option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
201+
.option("grpc.tls.keystore-password", "keystore-password")
202+
.option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
203+
.option("grpc.tls.truststore-password", "truststore-password")
204+
.load()
205+
206+
// gRPC with proxy
207+
val data = spark.read
208+
.format("pinot")
209+
.option("table", "airlineStats")
210+
.option("tableType", "offline")
211+
.option("proxy.enabled", "true")
212+
.option("grpc.proxy-uri", "pinot-proxy:8094")
213+
.load()
214+
```
215+
216+
### gRPC Configuration Options
217+
218+
| Option | Description | Required | Default |
219+
|--------|-------------|----------|----------|
220+
| `grpc.port` | Pinot gRPC port | No | `8090` |
221+
| `grpc.max-inbound-message-size` | Max inbound message bytes when init gRPC client | No | `128MB` |
222+
| `grpc.use-plain-text` | Use plain text for gRPC communication | No | `true` |
223+
| `grpc.tls.keystore-type` | TLS keystore type for gRPC connection | No | `JKS` |
224+
| `grpc.tls.keystore-path` | TLS keystore file location for gRPC connection | No | None |
225+
| `grpc.tls.keystore-password` | TLS keystore password | No | None |
226+
| `grpc.tls.truststore-type` | TLS truststore type for gRPC connection | No | `JKS` |
227+
| `grpc.tls.truststore-path` | TLS truststore file location for gRPC connection | No | None |
228+
| `grpc.tls.truststore-password` | TLS truststore password | No | None |
229+
| `grpc.tls.ssl-provider` | SSL provider | No | `JDK` |
230+
| `grpc.proxy-uri` | Pinot Rest Proxy gRPC endpoint URI | No | None |
231+
232+
**Note:** When using gRPC with proxy, the connector automatically adds `FORWARD_HOST` and `FORWARD_PORT` metadata headers for proper request routing.
233+
133234
## Examples
134235

135236
There are more examples included in `src/test/scala/.../ExampleSparkPinotConnectorTest.scala`.
@@ -152,6 +253,32 @@ Spark-Pinot connector uses Spark `DatasourceV2 API`. Please check the Databricks
152253

153254
https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang
154255

256+
## Security Best Practices
257+
258+
### Production HTTPS Configuration
259+
- Always use HTTPS in production environments
260+
- Store certificates in secure locations with appropriate file permissions
261+
- Use proper certificate validation with valid truststore
262+
- Rotate certificates regularly
263+
264+
### Production Authentication
265+
- Use service accounts with minimal required permissions
266+
- Store authentication tokens securely (environment variables, secret management systems)
267+
- Implement token rotation policies
268+
- Monitor authentication failures
269+
270+
### Production gRPC Configuration
271+
- Enable TLS for gRPC communication in production
272+
- Use certificate-based authentication when possible
273+
- Configure appropriate message size limits based on your data
274+
- Use connection pooling for high-throughput scenarios
275+
276+
### Production Proxy Configuration
277+
- Ensure proxy endpoints are properly secured
278+
- Monitor proxy health and performance
279+
- Implement proper request routing and load balancing
280+
- Use authentication for proxy access
281+
155282
## Future Works
156283
- Add integration tests for read operation
157284
- Add write support(pinot segment write logic will be changed in later versions of pinot)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<!--
2+
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
20+
-->
21+
# Pinot Spark 3 Connector Example: Reading from Pinot via Proxy with Auth Token
22+
23+
This example demonstrates how to use the Pinot Spark 3 Connector to read data from a Pinot cluster via a proxy with authentication token support.
24+
25+
## Prerequisites
26+
27+
- Apache Spark 3.x installed and `spark-shell` available in your PATH.
28+
29+
- Setup PINOT_HOME env variable:
30+
```
31+
export PINOT_HOME=/path/to/pinot
32+
```
33+
34+
- The Pinot Spark 3 Connector shaded JAR built and available at:
35+
```
36+
$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar
37+
```
38+
39+
- Example Scala script located at:
40+
```
41+
$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala
42+
```
43+
44+
## How to Run
45+
46+
Launch the example in `spark-shell` with the following command:
47+
48+
```bash
49+
spark-shell --master 'local[*]' --name read-pinot --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"
50+
```
51+
52+
53+
54+
55+
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
import org.apache.spark.sql.SparkSession
20+
21+
val spark = SparkSession.builder().appName("read-pinot-airlineStats").master("local[*]").getOrCreate()
22+
23+
val df = spark.read.
24+
format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
25+
option("table", "myTable").
26+
option("tableType", "offline").
27+
option("controller", "pinot-controller").
28+
option("broker", "pinot-broker").
29+
option("useHttps", "true").
30+
option("authToken", "st-xxxxxxx").
31+
option("proxy.enabled", "true").
32+
option("grpc.proxy-uri", "grpc-pinot-proxy").
33+
option("useGrpcServer", "true").
34+
option("grpc.use-plain-text", "false").
35+
load()
36+
37+
println("Schema:")
38+
df.printSchema()
39+
40+
println("Sample rows:")
41+
df.show(10, truncate = false)
42+
43+
println(s"Total rows: ${df.count()}")
44+
45+
spark.stop()

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/DataExtractor.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ private[pinot] object DataExtractor {
5454
case FieldSpec.DataType.LONG => LongType
5555
case FieldSpec.DataType.FLOAT => FloatType
5656
case FieldSpec.DataType.DOUBLE => DoubleType
57+
case FieldSpec.DataType.BIG_DECIMAL => DecimalType(38, 18)
5758
case FieldSpec.DataType.STRING => StringType
5859
case FieldSpec.DataType.BYTES => ArrayType(ByteType)
5960
case FieldSpec.DataType.TIMESTAMP => LongType
6061
case FieldSpec.DataType.BOOLEAN => BooleanType
62+
case FieldSpec.DataType.JSON => StringType
6163
case _ =>
6264
throw PinotException(s"Unsupported pinot data type '$dataType")
6365
}
@@ -116,6 +118,13 @@ private[pinot] object DataExtractor {
116118
dataTable.getFloat(rowIndex, colIndex)
117119
case ColumnDataType.DOUBLE =>
118120
dataTable.getDouble(rowIndex, colIndex)
121+
case ColumnDataType.BIG_DECIMAL =>
122+
val bd = dataTable.getBigDecimal(rowIndex, colIndex)
123+
// Map to Spark Decimal(38,18) by rescaling if needed; null-safe
124+
if (bd == null) null else Decimal(bd.setScale(18, java.math.RoundingMode.HALF_UP), 38, 18)
125+
case ColumnDataType.JSON =>
126+
// Use underlying string
127+
UTF8String.fromString(dataTable.getString(rowIndex, colIndex))
119128
case ColumnDataType.TIMESTAMP =>
120129
dataTable.getLong(rowIndex, colIndex)
121130
case ColumnDataType.BOOLEAN =>

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class PinotDataSource extends TableProvider with DataSourceRegister {
4040
val controller = readParameters.controller
4141

4242
val pinotTableSchema =
43-
PinotClusterClient.getTableSchema(controller, tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
43+
PinotClusterClient.getTableSchema(controller, tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
4444
DataExtractor.pinotSchemaToSparkSchema(pinotTableSchema)
4545
}
4646

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScan.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ class PinotScan(
5454
override def toBatch: Batch = this
5555

5656
override def planInputPartitions(): Array[InputPartition] = {
57-
val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, query, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
57+
val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, query, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
5858

5959
val instanceInfo : Map[String, InstanceInfo] = Map()
6060
val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
6161
instanceInfo.getOrElseUpdate(
6262
instance,
63-
PinotClusterClient.getInstanceInfo(readParameters.controller, instance, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
63+
PinotClusterClient.getInstanceInfo(readParameters.controller, instance, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
6464
)
6565
}
6666

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class PinotScanBuilder(readParameters: PinotDataSourceReadOptions)
4545
if (readParameters.tableType.isDefined) {
4646
None
4747
} else {
48-
PinotClusterClient.getTimeBoundaryInfo(readParameters.broker, readParameters.tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
48+
PinotClusterClient.getTimeBoundaryInfo(readParameters.broker, readParameters.tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
4949
}
5050

5151
val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters)

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ object SparkToPinotTypeTranslator {
5858
case _: LongType => FieldSpec.DataType.LONG
5959
case _: FloatType => FieldSpec.DataType.FLOAT
6060
case _: DoubleType => FieldSpec.DataType.DOUBLE
61+
case _: DecimalType => FieldSpec.DataType.BIG_DECIMAL
6162
case _: BooleanType => FieldSpec.DataType.BOOLEAN
6263
case _: BinaryType => FieldSpec.DataType.BYTES
6364
case _: TimestampType => FieldSpec.DataType.LONG

pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/DataExtractorTest.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.unsafe.types.UTF8String
3131
import org.roaringbitmap.RoaringBitmap
3232

3333
import scala.io.Source
34+
import java.math.{BigDecimal => JBigDecimal, RoundingMode}
3435

3536
/**
3637
* Test pinot/spark conversions like schema, data table etc.
@@ -201,6 +202,18 @@ class DataExtractorTest extends BaseTest {
201202
resultSchema.fields should contain theSameElementsAs sparkSchema.fields
202203
}
203204

205+
test("Pinot schema to Spark schema should map BIG_DECIMAL and JSON correctly") {
206+
val pinotSchema = new Schema.SchemaBuilder()
207+
.addSingleValueDimension("decCol", org.apache.pinot.spi.data.FieldSpec.DataType.BIG_DECIMAL)
208+
.addSingleValueDimension("jsonCol", org.apache.pinot.spi.data.FieldSpec.DataType.JSON)
209+
.build()
210+
211+
val sparkSchema = DataExtractor.pinotSchemaToSparkSchema(pinotSchema)
212+
213+
sparkSchema.apply("decCol").dataType shouldEqual DecimalType(38, 18)
214+
sparkSchema.apply("jsonCol").dataType shouldEqual StringType
215+
}
216+
204217
test("Should fail if configured when metadata indicates invalid segments") {
205218
val columnNames = Array(
206219
"strCol"
@@ -234,4 +247,38 @@ class DataExtractorTest extends BaseTest {
234247
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
235248
result.getString(0) shouldEqual "strValue"
236249
}
250+
251+
test("DataExtractor should handle BIG_DECIMAL and JSON column types") {
252+
val columnNames = Array(
253+
"decCol",
254+
"jsonCol"
255+
)
256+
val columnTypes = Array(
257+
ColumnDataType.BIG_DECIMAL,
258+
ColumnDataType.JSON
259+
)
260+
val dataSchema = new DataSchema(columnNames, columnTypes)
261+
262+
val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
263+
dataTableBuilder.startRow()
264+
val inputDecimal = new JBigDecimal("123.456789")
265+
dataTableBuilder.setColumn(0, inputDecimal)
266+
val jsonString = "{" + "\"a\":1," + "\"b\":\"x\"}" // {"a":1,"b":"x"}
267+
dataTableBuilder.setColumn(1, jsonString)
268+
dataTableBuilder.finishRow()
269+
val dataTable = dataTableBuilder.build()
270+
271+
val sparkSchema = StructType(
272+
Seq(
273+
StructField("decCol", DecimalType(38, 18)),
274+
StructField("jsonCol", StringType)
275+
)
276+
)
277+
278+
val result = DataExtractor.pinotDataTableToInternalRows(dataTable, sparkSchema, failOnInvalidSegments = false).head
279+
280+
val expectedScaled = inputDecimal.setScale(18, RoundingMode.HALF_UP)
281+
result.getDecimal(0, 38, 18).toJavaBigDecimal shouldEqual expectedScaled
282+
result.getString(1) shouldEqual jsonString
283+
}
237284
}

pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslatorTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
3232
(LongType, FieldSpec.DataType.LONG),
3333
(FloatType, FieldSpec.DataType.FLOAT),
3434
(DoubleType, FieldSpec.DataType.DOUBLE),
35+
(DecimalType(38, 18), FieldSpec.DataType.BIG_DECIMAL),
3536
(BooleanType, FieldSpec.DataType.BOOLEAN),
3637
(BinaryType, FieldSpec.DataType.BYTES),
3738
(TimestampType, FieldSpec.DataType.LONG),
@@ -67,6 +68,7 @@ class SparkToPinotTypeTranslatorTest extends AnyFunSuite {
6768
(ArrayType(LongType), FieldSpec.DataType.LONG),
6869
(ArrayType(FloatType), FieldSpec.DataType.FLOAT),
6970
(ArrayType(DoubleType), FieldSpec.DataType.DOUBLE),
71+
(ArrayType(DecimalType(38, 18)), FieldSpec.DataType.BIG_DECIMAL),
7072
(ArrayType(BooleanType), FieldSpec.DataType.BOOLEAN),
7173
(ArrayType(BinaryType), FieldSpec.DataType.BYTES),
7274
(ArrayType(TimestampType), FieldSpec.DataType.LONG),

0 commit comments

Comments
 (0)