
Commit dcf07d2

feat: Add comprehensive Pinot Proxy and gRPC support to pinot-spark-3-connector
🔧 Pinot Proxy Support:
- Add proxy.enabled configuration option (default: false)
- Implement HTTP proxy forwarding with FORWARD_HOST and FORWARD_PORT headers
- Support proxy routing for all controller and broker API requests
- Enable proxy-based secure cluster access where proxy is the only exposed endpoint

🚀 Comprehensive gRPC Configuration:
- Add grpc.port configuration (default: 8090)
- Add grpc.max-inbound-message-size configuration (default: 128MB)
- Add grpc.use-plain-text configuration (default: true)
- Support grpc.tls.keystore-type, grpc.tls.keystore-path, grpc.tls.keystore-password
- Support grpc.tls.truststore-type, grpc.tls.truststore-path, grpc.tls.truststore-password
- Add grpc.tls.ssl-provider configuration (default: JDK)
- Add grpc.proxy-uri for gRPC proxy endpoint configuration

🔒 gRPC Proxy Integration:
- Implement gRPC proxy support with FORWARD_HOST and FORWARD_PORT metadata
- Create comprehensive GrpcUtils for channel management and proxy headers
- Support secure gRPC communication through proxy infrastructure
- Enable TLS/SSL configuration for gRPC connections

🏗️ Architecture Updates:
- Update PinotDataSourceReadOptions with all new proxy and gRPC fields
- Enhance PinotClusterClient with proxy-aware API methods
- Add HttpUtils.sendGetRequestWithProxyHeaders() for proxy HTTP requests
- Update PinotServerDataFetcher with gRPC proxy configuration support
- Modify all Spark DataSource V2 components to pass proxy parameters

🧪 Comprehensive Testing:
- Add 8 new test cases for proxy and gRPC configuration parsing
- Create GrpcUtilsTest for gRPC channel creation and proxy metadata
- Update existing tests to include new configuration parameters
- Achieve 39/39 passing tests with full backward compatibility

📚 Enhanced Documentation:
- Add comprehensive Pinot Proxy Support section with examples
- Add detailed gRPC Configuration section with TLS examples
- Include Security Best Practices for production deployments
- Provide proxy + gRPC + HTTPS + authentication integration examples

🎯 Production Features:
- Full backward compatibility - existing code works unchanged
- Based on Trino PR #13015 reference implementation
- Supports secure production deployments with proxy-only access
- Comprehensive error handling and validation
- Performance optimizations for gRPC connections

All tests passing (39/39) with complete feature parity to Trino's implementation.
1 parent 6d1fd97 commit dcf07d2

File tree

22 files changed (+1123 / -62 lines)


pinot-connectors/pinot-spark-3-connector/README.md

Lines changed: 166 additions & 3 deletions
@@ -60,9 +60,44 @@ val data = spark.read
 data.show(100)
 ```
 
-## HTTPS Configuration
+## Security Configuration
 
-The connector supports HTTPS/TLS connections to Pinot clusters. To enable HTTPS, use the following configuration options:
+You can secure both HTTP and gRPC using a unified switch or explicit flags.
+
+- Unified: set `secureMode=true` to enable HTTPS and gRPC TLS together (recommended)
+- Explicit: set `useHttps` for REST and `grpc.use-plain-text=false` for gRPC
+
+### Quick examples
+
+```scala
+// Unified secure mode (enables HTTPS + gRPC TLS by default)
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("secureMode", "true")
+  .load()
+
+// Explicit HTTPS only (gRPC remains plaintext by default)
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("useHttps", "true")
+  .load()
+
+// Explicit gRPC TLS only (REST remains HTTP by default)
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("grpc.use-plain-text", "false")
+  .load()
+```
+
+### HTTPS Configuration
+
+When HTTPS is enabled (either via `secureMode=true` or `useHttps=true`), you can configure keystore/truststore as needed:
 
 ```scala
 val data = spark.read
@@ -81,7 +116,8 @@ val data = spark.read
 
 | Option | Description | Required | Default |
 |--------|-------------|----------|---------|
-| `useHttps` | Enable HTTPS connections | No | `false` |
+| `secureMode` | Unified switch to enable HTTPS and gRPC TLS | No | `false` |
+| `useHttps` | Enable HTTPS connections (overrides `secureMode` for REST) | No | `false` |
 | `keystorePath` | Path to client keystore file (JKS format) | No | None |
 | `keystorePassword` | Password for the keystore | No | None |
 | `truststorePath` | Path to truststore file (JKS format) | No | None |
@@ -130,6 +166,107 @@ val data = spark.read
 
 **Note:** If only `authToken` is provided without `authHeader`, the connector will automatically use `Authorization: Bearer <token>`.
 
+## Pinot Proxy Support
+
+The connector supports Pinot Proxy for secure cluster access where the proxy is the only exposed endpoint. When proxy is enabled, all HTTP requests to controllers/brokers and gRPC requests to servers are routed through the proxy.
+
+### Proxy Configuration Examples
+
+```scala
+// Basic proxy configuration
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("controller", "pinot-proxy:8080") // Proxy endpoint
+  .option("proxy.enabled", "true")
+  .load()
+
+// Proxy with authentication
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("controller", "pinot-proxy:8080")
+  .option("proxy.enabled", "true")
+  .option("authToken", "my-proxy-token")
+  .load()
+
+// Proxy with gRPC configuration
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("controller", "pinot-proxy:8080")
+  .option("proxy.enabled", "true")
+  .option("grpc.proxy-uri", "pinot-proxy:8094") // gRPC proxy endpoint
+  .load()
+```
+
+### Proxy Configuration Options
+
+| Option | Description | Required | Default |
+|--------|-------------|----------|---------|
+| `proxy.enabled` | Use Pinot Proxy for controller and broker requests | No | `false` |
+
+**Note:** When proxy is enabled, the connector adds `FORWARD_HOST` and `FORWARD_PORT` headers to route requests to the actual Pinot services.
+
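As an aside, the forwarding mechanism described in the note above can be pictured with a minimal, hypothetical sketch written against plain Apache HttpClient. The proxy address, controller host/port, and endpoint path are illustrative, and this is not the connector's internal `HttpUtils` API:

```scala
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients

// The request is sent to the proxy; the headers tell the proxy which
// Pinot service (here, a controller) should actually receive the call.
val client = HttpClients.createDefault()
val request = new HttpGet("http://pinot-proxy:8080/tables/airlineStats/schema")
request.addHeader("FORWARD_HOST", "pinot-controller")
request.addHeader("FORWARD_PORT", "9000")

val response = client.execute(request)
try {
  println(response.getStatusLine)
} finally {
  response.close()
  client.close()
}
```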
+## gRPC Configuration
+
+The connector supports comprehensive gRPC configuration for secure and optimized communication with Pinot servers.
+
+### gRPC Configuration Examples
+
+```scala
+// Basic gRPC configuration
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("grpc.port", "8091")
+  .option("grpc.max-inbound-message-size", "256000000") // 256MB
+  .load()
+
+// gRPC with TLS (explicit)
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("grpc.use-plain-text", "false")
+  .option("grpc.tls.keystore-path", "/path/to/grpc-keystore.jks")
+  .option("grpc.tls.keystore-password", "keystore-password")
+  .option("grpc.tls.truststore-path", "/path/to/grpc-truststore.jks")
+  .option("grpc.tls.truststore-password", "truststore-password")
+  .load()
+
+// gRPC with proxy
+val data = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .option("proxy.enabled", "true")
+  .option("grpc.proxy-uri", "pinot-proxy:8094")
+  .load()
+```
+
+### gRPC Configuration Options
+
+| Option | Description | Required | Default |
+|--------|-------------|----------|---------|
+| `grpc.port` | Pinot gRPC port | No | `8090` |
+| `grpc.max-inbound-message-size` | Maximum inbound message size (bytes) for the gRPC client | No | `128MB` |
+| `grpc.use-plain-text` | Use plain text for gRPC communication (overrides `secureMode` for gRPC) | No | `true` |
+| `grpc.tls.keystore-type` | TLS keystore type for gRPC connection | No | `JKS` |
+| `grpc.tls.keystore-path` | TLS keystore file location for gRPC connection | No | None |
+| `grpc.tls.keystore-password` | TLS keystore password | No | None |
+| `grpc.tls.truststore-type` | TLS truststore type for gRPC connection | No | `JKS` |
+| `grpc.tls.truststore-path` | TLS truststore file location for gRPC connection | No | None |
+| `grpc.tls.truststore-password` | TLS truststore password | No | None |
+| `grpc.tls.ssl-provider` | SSL provider | No | `JDK` |
+| `grpc.proxy-uri` | Pinot Rest Proxy gRPC endpoint URI | No | None |
+
+**Note:** When using gRPC with proxy, the connector automatically adds `FORWARD_HOST` and `FORWARD_PORT` metadata headers for proper request routing.
+
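Similarly, here is a rough sketch of how `FORWARD_HOST`/`FORWARD_PORT` metadata can be attached to a gRPC channel using plain grpc-java (assuming a reasonably recent version that provides `MetadataUtils.newAttachHeadersInterceptor`). The host names and sizes are illustrative, and this is not the connector's internal `GrpcUtils`:

```scala
import io.grpc.{ClientInterceptors, ManagedChannelBuilder, Metadata}
import io.grpc.stub.MetadataUtils

// Channel to the gRPC proxy endpoint (grpc.proxy-uri), plaintext for brevity.
val channel = ManagedChannelBuilder
  .forAddress("pinot-proxy", 8094)
  .usePlaintext()
  .maxInboundMessageSize(128 * 1024 * 1024) // grpc.max-inbound-message-size
  .build()

// Metadata telling the proxy which Pinot server should receive the call.
val headers = new Metadata()
headers.put(Metadata.Key.of("FORWARD_HOST", Metadata.ASCII_STRING_MARSHALLER), "pinot-server-0")
headers.put(Metadata.Key.of("FORWARD_PORT", Metadata.ASCII_STRING_MARSHALLER), "8090")

// Every call made through this channel now carries the forward headers.
val proxiedChannel =
  ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(headers))
```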
 ## Examples
 
 There are more examples included in `src/test/scala/.../ExampleSparkPinotConnectorTest.scala`.
@@ -152,6 +289,32 @@ Spark-Pinot connector uses Spark `DatasourceV2 API`. Please check the Databricks
 
 https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang
 
+## Security Best Practices
+
+### Production HTTPS Configuration
+- Always use HTTPS in production environments
+- Store certificates in secure locations with appropriate file permissions
+- Use proper certificate validation with a valid truststore
+- Rotate certificates regularly
+
+### Production Authentication
+- Use service accounts with minimal required permissions
+- Store authentication tokens securely (environment variables, secret management systems); see the sketch after this section
+- Implement token rotation policies
+- Monitor authentication failures
+
+### Production gRPC Configuration
+- Enable TLS for gRPC communication in production
+- Use certificate-based authentication when possible
+- Configure appropriate message size limits based on your data
+- Use connection pooling for high-throughput scenarios
+
+### Production Proxy Configuration
+- Ensure proxy endpoints are properly secured
+- Monitor proxy health and performance
+- Implement proper request routing and load balancing
+- Use authentication for proxy access
+
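To make the token-handling advice concrete, here is a small hypothetical sketch that pulls the token from an environment variable instead of hard-coding it. The `PINOT_AUTH_TOKEN` name is made up for this example, and a SparkSession named `spark` is assumed to be in scope:

```scala
// Hypothetical: read the token from the environment so it never lands in code or job configs.
val authToken = sys.env.getOrElse("PINOT_AUTH_TOKEN",
  throw new IllegalStateException("PINOT_AUTH_TOKEN is not set"))

val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true")
  .option("authToken", authToken)
  .load()
```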
 ## Future Works
 - Add integration tests for read operation
 - Add write support (pinot segment write logic will be changed in later versions of pinot)

pinot-connectors/pinot-spark-3-connector/documentation/read_model.md

Lines changed: 5 additions & 2 deletions
@@ -132,11 +132,14 @@ val df = spark.read
 |-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------- |-------------------------------------------------------|
 | table | Pinot table name without table type | Yes | - |
 | tableType | Pinot table type(`realtime`, `offline` or `hybrid`) | Yes | - |
-| controller | Pinot controller url and port. Input should be `url:port` format without schema. Connector does not support `https` schema for now. | No | localhost:9000 |
-| broker | Pinot broker url and port. Input should be `url:port` format without schema. If not specified, connector will find broker instances of table automatically. Connector does not support `https` schema for now | No | Fetch broker instances of table from Pinot Controller |
+| controller | Pinot controller `host:port` (scheme inferred from `useHttps`/`secureMode`) | No | localhost:9000 |
+| broker | Pinot broker `host:port` (scheme inferred from `useHttps`/`secureMode`) | No | Fetch broker instances of table from Pinot Controller |
 | usePushDownFilters | Push filters to pinot servers or not. If true, data exchange between pinot server and spark will be minimized. | No | true |
 | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
 | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
 | useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
 | queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
 | failOnInvalidSegments | Fail the read operation if response metadata indicates invalid segments | No | false |
+| secureMode | Unified switch to enable HTTPS and gRPC TLS (explicit `useHttps`/`grpc.use-plain-text` take precedence) | No | false |
+| useHttps | Enable HTTPS for REST calls (overrides `secureMode` for REST) | No | false |
+| grpc.use-plain-text | Use plaintext for gRPC (overrides `secureMode` for gRPC) | No | true |
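As a quick illustration of the precedence described in the table (a sketch assuming a SparkSession named `spark`), an explicit gRPC flag wins over the unified switch, so the read below uses HTTPS for REST but keeps gRPC on plaintext:

```scala
val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")
  .option("tableType", "offline")
  .option("secureMode", "true")           // unified switch: HTTPS + gRPC TLS
  .option("grpc.use-plain-text", "true")  // explicit flag takes precedence for gRPC
  .load()
```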
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+# Pinot Spark 3 Connector Example: Reading from Pinot via Proxy with Auth Token
+
+This example demonstrates how to use the Pinot Spark 3 Connector to read data from a Pinot cluster via a proxy with authentication token support.
+
+## Prerequisites
+
+- Apache Spark 3.x installed and `spark-shell` available in your PATH.
+
+- Setup the PINOT_HOME env variable:
+```
+export PINOT_HOME=/path/to/pinot
+```
+
+- The Pinot Spark 3 Connector shaded JAR built and available at (a build sketch follows this list):
+```
+$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar
+```
+
+- Example Scala script located at:
+```
+$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala
+```
+
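If the shaded JAR has not been built yet, one way to produce it is a standard Maven module build from the repository root (a sketch; adjust flags to your environment):

```bash
cd "$PINOT_HOME"
# Build only the Spark 3 connector module (plus the modules it depends on), skipping tests.
mvn -pl pinot-connectors/pinot-spark-3-connector -am package -DskipTests
```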
+## How to Run
+
+Launch the example in `spark-shell` with the following command:
+
+```bash
+spark-shell --master 'local[*]' --name read-pinot --jars "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/target/pinot-spark-3-connector-*-shaded.jar" < "$PINOT_HOME/pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala"
+```
pinot-connectors/pinot-spark-3-connector/examples/read_pinot_from_proxy_with_auth_token.scala

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder().appName("read-pinot-airlineStats").master("local[*]").getOrCreate()
+
+val df = spark.read.
+  format("org.apache.pinot.connector.spark.v3.datasource.PinotDataSource").
+  option("table", "myTable").
+  option("tableType", "offline").
+  option("controller", "pinot-controller").
+  option("broker", "pinot-broker").
+  option("secureMode", "true").
+  option("authToken", "st-xxxxxxx").
+  option("proxy.enabled", "true").
+  option("grpc.proxy-uri", "grpc-pinot-proxy").
+  option("useGrpcServer", "true").
+  load()
+
+println("Schema:")
+df.printSchema()
+
+println("Sample rows:")
+df.show(10, truncate = false)
+
+println(s"Total rows: ${df.count()}")
+
+spark.stop()

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/DataExtractor.scala

Lines changed: 17 additions & 0 deletions
@@ -54,10 +54,12 @@ private[pinot] object DataExtractor {
       case FieldSpec.DataType.LONG => LongType
       case FieldSpec.DataType.FLOAT => FloatType
       case FieldSpec.DataType.DOUBLE => DoubleType
+      case FieldSpec.DataType.BIG_DECIMAL => DecimalType(38, 18)
       case FieldSpec.DataType.STRING => StringType
       case FieldSpec.DataType.BYTES => ArrayType(ByteType)
       case FieldSpec.DataType.TIMESTAMP => LongType
       case FieldSpec.DataType.BOOLEAN => BooleanType
+      case FieldSpec.DataType.JSON => StringType
       case _ =>
         throw PinotException(s"Unsupported pinot data type '$dataType")
     }
@@ -116,6 +118,21 @@ private[pinot] object DataExtractor {
         dataTable.getFloat(rowIndex, colIndex)
       case ColumnDataType.DOUBLE =>
         dataTable.getDouble(rowIndex, colIndex)
+      case ColumnDataType.BIG_DECIMAL =>
+        val bd = dataTable.getBigDecimal(rowIndex, colIndex)
+        if (bd == null) null
+        else {
+          // Derive precision/scale from actual value; clamp to Spark's max precision
+          val precision = math.min(DecimalType.MAX_PRECISION, bd.precision())
+          val scale = math.min(DecimalType.MAX_SCALE, math.max(0, bd.scale()))
+          val scaled =
+            if (bd.scale() == scale) bd
+            else bd.setScale(scale, java.math.RoundingMode.HALF_UP)
+          Decimal(scaled, precision, scale)
+        }
+      case ColumnDataType.JSON =>
+        // Use underlying string
+        UTF8String.fromString(dataTable.getString(rowIndex, colIndex))
       case ColumnDataType.TIMESTAMP =>
         dataTable.getLong(rowIndex, colIndex)
       case ColumnDataType.BOOLEAN =>
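For intuition, a small standalone sketch of the clamping performed above: a value whose scale exceeds Spark's `DecimalType.MAX_SCALE` (38) is rounded half-up before being wrapped in a Spark `Decimal`. The literal is made up for illustration and this is not connector code:

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}
import org.apache.spark.sql.types.{Decimal, DecimalType}

// A value with 40 fractional digits: precision 40, scale 40.
val bd = new JBigDecimal("0." + "1" * 40)

val precision = math.min(DecimalType.MAX_PRECISION, bd.precision())      // clamped to 38
val scale     = math.min(DecimalType.MAX_SCALE, math.max(0, bd.scale())) // clamped to 38
val scaled    = if (bd.scale() == scale) bd else bd.setScale(scale, RoundingMode.HALF_UP)

val sparkDecimal = Decimal(scaled, precision, scale) // fits DecimalType(38, 38)
```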

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataSource.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ class PinotDataSource extends TableProvider with DataSourceRegister {
     val controller = readParameters.controller
 
     val pinotTableSchema =
-      PinotClusterClient.getTableSchema(controller, tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
+      PinotClusterClient.getTableSchema(controller, tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
     DataExtractor.pinotSchemaToSparkSchema(pinotTableSchema)
   }

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScan.scala

Lines changed: 2 additions & 2 deletions
@@ -54,13 +54,13 @@ class PinotScan(
   override def toBatch: Batch = this
 
   override def planInputPartitions(): Array[InputPartition] = {
-    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, query, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
+    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, query, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
 
     val instanceInfo : Map[String, InstanceInfo] = Map()
     val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
       instanceInfo.getOrElseUpdate(
         instance,
-        PinotClusterClient.getInstanceInfo(readParameters.controller, instance, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
+        PinotClusterClient.getInstanceInfo(readParameters.controller, instance, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
       )
     }

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ class PinotScanBuilder(readParameters: PinotDataSourceReadOptions)
     if (readParameters.tableType.isDefined) {
       None
     } else {
-      PinotClusterClient.getTimeBoundaryInfo(readParameters.broker, readParameters.tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken)
+      PinotClusterClient.getTimeBoundaryInfo(readParameters.broker, readParameters.tableName, readParameters.useHttps, readParameters.authHeader, readParameters.authToken, readParameters.proxyEnabled)
     }
 
     val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters)

pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/SparkToPinotTypeTranslator.scala

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ object SparkToPinotTypeTranslator {
       case _: LongType => FieldSpec.DataType.LONG
       case _: FloatType => FieldSpec.DataType.FLOAT
      case _: DoubleType => FieldSpec.DataType.DOUBLE
+      case _: DecimalType => FieldSpec.DataType.BIG_DECIMAL
       case _: BooleanType => FieldSpec.DataType.BOOLEAN
       case _: BinaryType => FieldSpec.DataType.BYTES
       case _: TimestampType => FieldSpec.DataType.LONG
