@@ -34,9 +34,10 @@ def conn(self) -> SparkSession:
34
34
def read(
    self,
    format: str,
    path: Optional[Union[str, List[str]]] = None,
    schema: Optional[StructType] = None,
    stream: bool = False,
    **options: Any,
) -> DataFrame:
    """Use the SparkSession.read interface to load data into a dataframe.

    Args:
        format: string with the format to be used by the DataframeReader.
        path: optional string or a list of strings for file-system backed
            sources; when None, the source location must be supplied
            through ``options`` (e.g. connector-style sources).
        schema: an optional pyspark.sql.types.StructType for the input schema.
        stream: flag to indicate if data must be read in stream mode.
        **options: options to setup the DataframeReader.

    Returns:
        Dataframe with the loaded data.

    Raises:
        ValueError: if ``format`` is not a string, or if ``path`` is
            provided but is neither a string nor a list.
    """
    if not isinstance(format, str):
        raise ValueError("format needs to be a string with the desired read format")
    # path is declared Optional with a None default: only type-check it when
    # it was actually provided, otherwise calls that rely on the default
    # (e.g. read("kafka", **opts)) would always raise.
    if path is not None and not isinstance(path, (str, list)):
        raise ValueError("path needs to be a string or a list of string")

    df_reader: Union[DataStreamReader, DataFrameReader] = (
        self.conn.readStream if stream else self.conn.read
    )

    df_reader = df_reader.schema(schema) if schema else df_reader

    return df_reader.format(format).load(path, **options)  # type: ignore
66
70
67
71
def read_table (self , table : str , database : str = None ) -> DataFrame :
68
72
"""Use the SparkSession.read interface to read a metastore table.
@@ -223,3 +227,47 @@ def create_temporary_view(self, dataframe: DataFrame, name: str) -> Any:
223
227
if not dataframe .isStreaming :
224
228
return dataframe .createOrReplaceTempView (name )
225
229
return dataframe .writeStream .format ("memory" ).queryName (name ).start ()
230
def add_table_partitions(
    self, partitions: List[Dict[str, Any]], table: str, database: Optional[str] = None
) -> None:
    """Add partitions to an existing table.

    Builds a single ``ALTER TABLE ... ADD IF NOT EXISTS`` statement with one
    ``PARTITION ( ... )`` clause per entry and runs it through the session.

    Args:
        partitions: partitions to add to the table.
            It's expected a list of partition dicts to add to the table.
            Example: `[{"year": 2020, "month": 8, "day": 14}, ...]`
        table: table to add the partitions.
        database: name of the database where the table is saved.

    Raises:
        ValueError: if any partition key is not a string, or any partition
            value is neither a string nor an int.
    """
    for partition_dict in partitions:
        if not all(
            isinstance(key, str) and isinstance(value, (str, int))
            for key, value in partition_dict.items()
        ):
            raise ValueError(
                "Partition keys must be column names "
                "and values must be string or int."
            )

    if not partitions:
        # Nothing to add; issuing "ALTER TABLE ... ADD IF NOT EXISTS" with no
        # PARTITION clause would be invalid SQL, so treat this as a no-op.
        return

    database_expr = f"`{database}`." if database else ""
    # NOTE(review): keys and values are interpolated directly into the SQL
    # text; string values are quoted but NOT escaped, so callers must not
    # pass untrusted partition values here.
    key_values_expr = [
        ", ".join(
            f"{key} = '{value}'" if isinstance(value, str) else f"{key} = {value}"
            for key, value in partition.items()
        )
        for partition in partitions
    ]
    partitions_expr = " ".join(f"PARTITION ( {expr} )" for expr in key_values_expr)
    command = (
        f"ALTER TABLE {database_expr}`{table}` ADD IF NOT EXISTS {partitions_expr}"
    )

    self.conn.sql(command)