from typing import Optional

from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)


class DeltaWriter:
    """Control operations on Delta Tables.

    Responsible for merging and optimizing.
    """

    @staticmethod
    def _get_full_table_name(table, database):
        if database:
            return f"{database}.{table}"
        else:
            return table

    @staticmethod
    def _convert_to_delta(client: SparkClient, table: str):
        logger.info(f"Converting {table} to Delta...")
        client.conn.sql(f"CONVERT TO DELTA {table}")
        logger.info("Conversion done.")

    @staticmethod
    def merge(
        client: SparkClient,
        database: str,
        table: str,
        merge_on: list,
        source_df: DataFrame,
        when_not_matched_insert_condition: Optional[str] = None,
        when_matched_update_condition: Optional[str] = None,
        when_matched_delete_condition: Optional[str] = None,
    ):
        """Merge a source dataframe into a Delta table.

        By default, it updates all columns when matched and inserts
        all columns when not matched (a simple upsert).

        You can change this behavior by setting:
        - when_not_matched_insert_condition: rows are only inserted
          when this condition is true.
        - when_matched_update_condition: rows are only updated when
          this condition is true. Columns of the source dataframe can
          be referenced as source.<column_name>, and columns of the
          target table as target.<column_name>.
        - when_matched_delete_condition: adds a delete operation that
          is applied only when this condition is true. Again, source
          and target columns can be referenced as source.<column_name>
          and target.<column_name>.
        """
        try:
            full_table_name = DeltaWriter._get_full_table_name(table, database)

            table_exists = client.conn.catalog.tableExists(full_table_name)

            if table_exists:
                # Inspect the table metadata to find its storage provider
                # and convert the table to Delta if it is not Delta yet.
                pd_df = client.conn.sql(
                    f"DESCRIBE TABLE EXTENDED {full_table_name}"
                ).toPandas()
                provider = (
                    pd_df.reset_index()
                    .groupby(["col_name"])["data_type"]
                    .aggregate("first")
                    .Provider
                )
                table_is_delta = provider.lower() == "delta"

                if not table_is_delta:
                    DeltaWriter._convert_to_delta(client, full_table_name)

            # For schema evolution
            client.conn.conf.set(
                "spark.databricks.delta.schema.autoMerge.enabled", "true"
            )

            target_table = DeltaTable.forName(client.conn, full_table_name)
            join_condition = " AND ".join(
                [f"source.{col} = target.{col}" for col in merge_on]
            )
            merge_builder = target_table.alias("target").merge(
                source_df.alias("source"), join_condition
            )
            if when_matched_delete_condition:
                merge_builder = merge_builder.whenMatchedDelete(
                    condition=when_matched_delete_condition
                )

            merge_builder.whenMatchedUpdateAll(
                condition=when_matched_update_condition
            ).whenNotMatchedInsertAll(
                condition=when_not_matched_insert_condition
            ).execute()
        except Exception as e:
            logger.error(f"Merge operation on {full_table_name} failed: {e}")

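    # Illustrative merge usage, kept as a commented sketch so the class body
    # stays import-safe. The client, dataframe and names below are
    # hypothetical, not part of this module:
    #
    #   DeltaWriter.merge(
    #       client=SparkClient(),
    #       database="feature_store",
    #       table="user_features",
    #       merge_on=["id"],
    #       source_df=new_features_df,
    #       when_matched_update_condition="source.ts > target.ts",
    #   )
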
    @staticmethod
    def vacuum(table: str, retention_hours: int, client: SparkClient):
        """Vacuum a Delta table.

        Vacuum removes unused files: files not managed by Delta plus
        files that are no longer part of the latest table state.
        After a vacuum it is impossible to time travel to versions
        older than the retention period.
        The default retention is 7 days. Lower retention values trigger
        a safety check; set
        spark.databricks.delta.retentionDurationCheck.enabled
        to false to allow them.
        https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
        """
        command = f"VACUUM {table} RETAIN {retention_hours} HOURS"
        logger.info(f"Running vacuum with command {command}")
        client.conn.sql(command)
        logger.info(f"Vacuum successful for table {table}")

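    # Illustrative vacuum usage (hypothetical table name); 168 hours keeps
    # 7 days of history, matching the Delta default retention:
    #
    #   DeltaWriter.vacuum(
    #       table="feature_store.user_features",
    #       retention_hours=168,
    #       client=SparkClient(),
    #   )
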
    @staticmethod
    def optimize(
        client: SparkClient,
        table: Optional[str] = None,
        z_order: Optional[list] = None,
        date_column: str = "timestamp",
        from_date: Optional[str] = None,
        auto_compact: bool = False,
        optimize_write: bool = False,
    ):
        """Optimize a Delta table.

        For auto-compaction and optimized writes, DBR >= 14.3 LTS
        and Delta >= 3.1.0 are MANDATORY.
        For z-ordering, DBR >= 13.3 LTS and Delta >= 2.0.0 are MANDATORY.
        Auto-compaction (recommended) reduces the small-file problem
        (overhead due to lots of metadata).
        Z-order by columns that are commonly used in query predicates
        and have a high cardinality.
        https://docs.delta.io/latest/optimizations-oss.html
        """
        if auto_compact:
            # Enable Delta auto-compaction for this Spark session.
            client.conn.conf.set(
                "spark.databricks.delta.autoCompact.enabled", "true"
            )

        if optimize_write:
            # Enable Delta optimized writes for this Spark session.
            client.conn.conf.set(
                "spark.databricks.delta.optimizeWrite.enabled", "true"
            )

        if table:
            command = f"OPTIMIZE {table}"

            if from_date:
                command += f" WHERE {date_column} >= {from_date}"

            if z_order:
                command += f" ZORDER BY {','.join(z_order)}"

            logger.info(f"Running optimize with command {command}...")
            client.conn.sql(command)
            logger.info(f"Optimize successful for table {table}.")
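

if __name__ == "__main__":
    # Minimal sketch of an OPTIMIZE run, assuming an active Spark session
    # with Delta configured. The table name below is hypothetical and must
    # already exist as a Delta table.
    spark_client = SparkClient()

    DeltaWriter.optimize(
        client=spark_client,
        table="feature_store.user_features",
        z_order=["id"],
        auto_compact=True,
        optimize_write=True,
    )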