@@ -26,10 +26,10 @@
 import com.logicalclocks.hsfs.OnDemandFeatureGroup;
 import com.logicalclocks.hsfs.Split;
 import com.logicalclocks.hsfs.StorageConnector;
-import com.logicalclocks.hsfs.StorageConnectorType;
 import com.logicalclocks.hsfs.TimeTravelFormat;
 import com.logicalclocks.hsfs.TrainingDataset;
 import com.logicalclocks.hsfs.metadata.OnDemandOptions;
+import com.logicalclocks.hsfs.metadata.Option;
 import com.logicalclocks.hsfs.util.Constants;
 import lombok.Getter;
 import org.apache.hadoop.fs.Path;
@@ -80,7 +80,7 @@ public Dataset<Row> sql(String query) {
   }

   public Dataset<Row> jdbc(String query, StorageConnector storageConnector) throws FeatureStoreException {
-    Map<String, String> readOptions = storageConnector.getSparkOptions();
+    Map<String, String> readOptions = storageConnector.getSparkOptionsInt();
     if (!Strings.isNullOrEmpty(query)) {
       readOptions.put("query", query);
     }
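Note: the map assembled here feeds Spark's JDBC source, where `query` is a standard reader option alongside `url`, `user`, and `password` (which are assumed to come back from `getSparkOptionsInt()`). A minimal sketch of how such a map is typically consumed — the actual body of `jdbc` is outside this hunk:

```java
// Sketch only: DataFrameReader.options(Map) applies the connector options
// as-is; "query" is honored by Spark's built-in JDBC source.
Dataset<Row> df = sparkSession.read()
    .format("jdbc")
    .options(readOptions)  // includes "query" when one was provided
    .load();
```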
@@ -124,42 +124,6 @@ public void registerHudiTemporaryTable(FeatureGroup featureGroup, String alias,
         leftFeaturegroupStartTimestamp, leftFeaturegroupEndTimestamp, readOptions);
   }

-  public void configureConnector(StorageConnector storageConnector) {
-    if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
-      configureS3Connector(storageConnector);
-    }
-  }
-
-  public static String sparkPath(String path) {
-    if (path.startsWith(Constants.S3_SCHEME)) {
-      return path.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME);
-    }
-    return path;
-  }
-
-  private void configureS3Connector(StorageConnector storageConnector) {
-    if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())
-        && Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
-      sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
-      sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
-    }
-    if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
-      sparkSession.conf().set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
-      sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
-      sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
-      sparkSession.conf().set(Constants.S3_SESSION_KEY_ENV, storageConnector.getSessionToken());
-    }
-    if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
-      sparkSession.conf().set(
-          "fs.s3a.server-side-encryption-algorithm",
-          storageConnector.getServerEncryptionAlgorithm()
-      );
-    }
-    if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionKey())) {
-      sparkSession.conf().set("fs.s3a.server-side-encryption.key", storageConnector.getServerEncryptionKey());
-    }
-  }
-
   /**
    * Setup Spark to write the data on the File System.
    *
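Note: nothing deleted here is lost. `sparkPath` moves unchanged to the bottom of the file, and `configureConnector`/`configureS3Connector` are superseded by `setupConnectorHadoopConf` (final hunk), which also sets each credential key independently instead of branching on the presence of a session token. The likely motivation (not stated in the diff) is where the `fs.s3a.*` keys end up: the S3A filesystem reads them from the Hadoop configuration, not from Spark's runtime conf. A hedged contrast, assuming `Constants.S3_ACCESS_KEY_ENV` resolves to the usual S3A key name and with `accessKey` as a placeholder:

```java
// Old approach: Spark runtime conf; arbitrary fs.s3a.* keys set here are not
// propagated to the Hadoop configuration of an already-running session.
sparkSession.conf().set("fs.s3a.access.key", accessKey);

// New approach: Hadoop configuration, which the S3A FileSystem reads directly.
sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", accessKey);
```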
@@ -171,9 +135,8 @@ private void configureS3Connector(StorageConnector storageConnector) {
   public void write(TrainingDataset trainingDataset, Dataset<Row> dataset,
                     Map<String, String> writeOptions, SaveMode saveMode) {

-    if (trainingDataset.getStorageConnector() != null) {
-      SparkEngine.getInstance().configureConnector(trainingDataset.getStorageConnector());
-    }
+    setupConnectorHadoopConf(trainingDataset.getStorageConnector());
+
     if (trainingDataset.getSplits() == null) {
       // Write a single dataset
       writeSingle(dataset, trainingDataset.getDataFormat(),
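Note: the null check moves inside the helper, collapsing the call site to one line. The guard that makes the unconditional call safe is visible in the final hunk:

```java
// Hedged illustration: safe even when a training dataset has no connector.
setupConnectorHadoopConf(null);  // returns immediately, Hadoop conf untouched
```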
@@ -296,10 +259,7 @@ private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat,
   // OnDemand Feature Group in TFRecords format. However Spark does not use an enum but a string.
   public Dataset<Row> read(StorageConnector storageConnector, String dataFormat,
                            Map<String, String> readOptions, String path) {
-
-    if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
-      configureS3Connector(storageConnector);
-    }
+    setupConnectorHadoopConf(storageConnector);

     return SparkEngine.getInstance().getSparkSession()
         .read()
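Note: `read` now delegates setup for any connector type instead of special-casing S3. A hedged usage sketch, with placeholder connector, format, and path values, assuming `Constants.S3_SCHEME`/`S3_SPARK_SCHEME` hold the `s3://`/`s3a://` prefixes their names suggest:

```java
// Illustrative only: s3Connector, "parquet", and the bucket path are made up.
Dataset<Row> df = engine.read(s3Connector, "parquet",
    new HashMap<>(), SparkEngine.sparkPath("s3://bucket/datasets/fg1"));
// sparkPath rewrites "s3://bucket/..." to "s3a://bucket/..." for Spark/S3A.
```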
@@ -322,7 +282,7 @@ public Dataset<Row> read(StorageConnector storageConnector, String dataFormat,
   public Map<String, String> getOnlineOptions(Map<String, String> providedWriteOptions,
                                               FeatureGroup featureGroup,
                                               StorageConnector storageConnector) throws FeatureStoreException {
-    Map<String, String> writeOptions = storageConnector.getSparkOptions();
+    Map<String, String> writeOptions = storageConnector.getSparkOptionsInt();
     writeOptions.put(Constants.JDBC_TABLE, utils.getFgName(featureGroup));

     // add user provided configuration
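Note: the hunk only shows the base map being built; per the trailing comment, user-supplied options are layered on top. A hedged sketch of that standard merge pattern (the actual merging lines fall outside this hunk):

```java
// Connector defaults first; caller-provided entries win on key collisions.
if (providedWriteOptions != null) {
  writeOptions.putAll(providedWriteOptions);
}
```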
@@ -400,4 +360,62 @@ public String profile(Dataset<Row> df, boolean correlation, boolean histogram) {
   public String profile(Dataset<Row> df) {
     return profile(df, null, true, true);
   }
+
+  public void setupConnectorHadoopConf(StorageConnector storageConnector) {
+    if (storageConnector == null) {
+      return;
+    }
+
+    switch (storageConnector.getStorageConnectorType()) {
+      case S3:
+        setupS3ConnectorHadoopConf(storageConnector);
+        break;
+      case ADLS:
+        setupAdlsConnectorHadoopConf(storageConnector);
+        break;
+      default:
+        // No-OP
+        break;
+    }
+  }
+
+  public static String sparkPath(String path) {
+    if (path.startsWith(Constants.S3_SCHEME)) {
+      return path.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME);
+    }
+    return path;
+  }
+
+  private void setupS3ConnectorHadoopConf(StorageConnector storageConnector) {
+    if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())) {
+      sparkSession.sparkContext().hadoopConfiguration()
+          .set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
+    }
+    if (!Strings.isNullOrEmpty(storageConnector.getSecretKey())) {
+      sparkSession.sparkContext().hadoopConfiguration()
+          .set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
+    }
+    if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
+      sparkSession.sparkContext().hadoopConfiguration().set(
+          "fs.s3a.server-side-encryption-algorithm",
+          storageConnector.getServerEncryptionAlgorithm()
+      );
+    }
+    if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionKey())) {
+      sparkSession.sparkContext().hadoopConfiguration()
+          .set("fs.s3a.server-side-encryption.key", storageConnector.getServerEncryptionKey());
+    }
+    if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
+      sparkSession.sparkContext().hadoopConfiguration()
+          .set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
+      sparkSession.sparkContext().hadoopConfiguration()
+          .set(Constants.S3_SESSION_KEY_ENV, storageConnector.getSessionToken());
+    }
+  }
+
+  private void setupAdlsConnectorHadoopConf(StorageConnector storageConnector) {
+    for (Option confOption : storageConnector.getSparkOptions()) {
+      sparkSession.sparkContext().hadoopConfiguration().set(confOption.getName(), confOption.getValue());
+    }
+  }
 }
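Note: this hunk also explains the `Option` import added at the top of the file — for ADLS the connector's Spark options arrive as name/value `Option` pairs rather than the flat map `getSparkOptionsInt` returns for JDBC. Taken together, the new code path lets callers hand any connector (or null) to `setupConnectorHadoopConf`, which dispatches on type and is a no-op for types needing no Hadoop settings. A hedged end-to-end sketch, with placeholder connector variables:

```java
SparkEngine engine = SparkEngine.getInstance();

// S3: credentials and encryption settings land in the Hadoop configuration.
engine.setupConnectorHadoopConf(s3Connector);

// ADLS: every name/value Option is copied into the Hadoop configuration.
engine.setupConnectorHadoopConf(adlsConnector);

// Anything else, including null, falls through without touching the conf.
engine.setupConnectorHadoopConf(null);
```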