
Commit 35dd929

Add Delta support (#370)

* feat: delta

1 parent 12d5e98, commit 35dd929

35 files changed: +560 / -54 lines changed

.github/workflows/publish.yml

Lines changed: 1 addition & 3 deletions
@@ -4,11 +4,9 @@ on:
     paths:
       - 'setup.py'
 
-
 jobs:
   Pipeline:
     if: github.ref == 'refs/heads/master'
-
     runs-on: ubuntu-latest
 
     steps:
@@ -19,7 +17,7 @@ jobs:
 
       - uses: actions/setup-java@v4
         with:
-          java-version: '11'
+          java-version: '17'
           distribution: microsoft
 
       - uses: vemonet/setup-spark@v1

.github/workflows/skip_lint.yml

Lines changed: 0 additions & 17 deletions
This file was deleted.

.github/workflows/staging.yml

Lines changed: 1 addition & 2 deletions
@@ -7,7 +7,6 @@ on:
 jobs:
   Pipeline:
     if: github.ref == 'refs/heads/staging'
-
     runs-on: ubuntu-latest
 
     steps:
@@ -18,7 +17,7 @@ jobs:
 
       - uses: actions/setup-java@v4
         with:
-          java-version: '11'
+          java-version: '17'
           distribution: microsoft
 
       - uses: vemonet/setup-spark@v1

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ jobs:
 
       - uses: actions/setup-java@v4
         with:
-          java-version: '11'
+          java-version: '17'
           distribution: microsoft
 
       - uses: vemonet/setup-spark@v1

CHANGELOG.md

Lines changed: 22 additions & 0 deletions
@@ -5,6 +5,28 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
 
 ## [Unreleased]
 
+## [1.3.5](https://github.com/quintoandar/butterfree/releases/tag/1.3.5)
+* Auto create feature sets ([#368](https://github.com/quintoandar/butterfree/pull/368))
+
+## [1.3.4](https://github.com/quintoandar/butterfree/releases/tag/1.3.4)
+* Fix Cassandra Config and tests ([#366](https://github.com/quintoandar/butterfree/pull/366))
+
+## [1.3.3](https://github.com/quintoandar/butterfree/releases/tag/1.3.3)
+* Fix Cassandra Config and Numpy version ([#364](https://github.com/quintoandar/butterfree/pull/364))
+
+## [1.3.2](https://github.com/quintoandar/butterfree/releases/tag/1.3.2)
+* Fix publish script ([#362](https://github.com/quintoandar/butterfree/pull/362))
+
+## [1.3.2](https://github.com/quintoandar/butterfree/releases/tag/1.3.2)
+* Fix publish script ([#360](https://github.com/quintoandar/butterfree/pull/362))
+
+## [1.3.1](https://github.com/quintoandar/butterfree/releases/tag/1.3.1)
+* Timestamp NTZ available ([#360](https://github.com/quintoandar/butterfree/pull/360))
+
+## [1.3.0](https://github.com/quintoandar/butterfree/releases/tag/1.3.0)
+* Bump versions ([#355](https://github.com/quintoandar/butterfree/pull/355))
+* Sphinx version ([#356](https://github.com/quintoandar/butterfree/pull/356))
+
 ## [1.2.4](https://github.com/quintoandar/butterfree/releases/tag/1.2.4)
 * Auto create feature sets ([#351](https://github.com/quintoandar/butterfree/pull/351))

Makefile

Lines changed: 2 additions & 1 deletion
@@ -36,7 +36,7 @@ minimum-requirements:
 
 .PHONY: requirements
 ## install all requirements
-requirements: requirements-test requirements-lint dev-requirements minimum-requirements
+requirements: minimum-requirements dev-requirements requirements-test requirements-lint
 
 .PHONY: ci-install
 ci-install:
@@ -146,6 +146,7 @@ package-name:
 .PHONY: package
 ## build butterfree package wheel
 package:
+	@PYTHONPATH=. pip3 install wheel
 	@PYTHONPATH=. python -m setup sdist bdist_wheel
 
 .PHONY: update-docs

butterfree/clients/spark_client.py

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ def conn(self) -> SparkSession:
         """
         if not self._session:
             self._session = SparkSession.builder.getOrCreate()
+
         return self._session
 
     def read(
butterfree/load/writers/__init__.py

Lines changed: 2 additions & 1 deletion

@@ -1,8 +1,9 @@
 """Holds data loaders for historical and online feature store."""
 
+from butterfree.load.writers.delta_writer import DeltaWriter
 from butterfree.load.writers.historical_feature_store_writer import (
     HistoricalFeatureStoreWriter,
 )
 from butterfree.load.writers.online_feature_store_writer import OnlineFeatureStoreWriter
 
-__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter"]
+__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter", "DeltaWriter"]
butterfree/load/writers/delta_writer.py

Lines changed: 162 additions & 0 deletions (new file)

from delta.tables import DeltaTable
from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.logger import __logger

logger = __logger("delta_writer", True)


class DeltaWriter:
    """Control operations on Delta Tables.

    Responsible for merging and optimizing.
    """

    @staticmethod
    def _get_full_table_name(table, database):
        if database:
            return "{}.{}".format(database, table)
        else:
            return table

    @staticmethod
    def _convert_to_delta(client: SparkClient, table: str):
        logger.info(f"Converting {table} to Delta...")
        client.conn.sql(f"CONVERT TO DELTA {table}")
        logger.info("Conversion done.")

    @staticmethod
    def merge(
        client: SparkClient,
        database: str,
        table: str,
        merge_on: list,
        source_df: DataFrame,
        when_not_matched_insert_condition: str = None,
        when_matched_update_condition: str = None,
        when_matched_delete_condition: str = None,
    ):
        """
        Merge a source dataframe to a Delta table.

        By default, it will update when matched, and insert when
        not matched (simple upsert).

        You can change this behavior by setting:
        - when_not_matched_insert_condition: it will only insert
          when this specified condition is true
        - when_matched_update_condition: it will only update when this
          specified condition is true. You can refer to the columns
          in the source dataframe as source.<column_name>, and the columns
          in the target table as target.<column_name>.
        - when_matched_delete_condition: it will add an operation to delete,
          but only if this condition is true. Again, source and
          target dataframe columns can be referred to respectively as
          source.<column_name> and target.<column_name>
        """
        try:
            full_table_name = DeltaWriter._get_full_table_name(table, database)

            table_exists = client.conn.catalog.tableExists(full_table_name)

            if table_exists:
                pd_df = client.conn.sql(
                    f"DESCRIBE TABLE EXTENDED {full_table_name}"
                ).toPandas()
                provider = (
                    pd_df.reset_index()
                    .groupby(["col_name"])["data_type"]
                    .aggregate("first")
                    .Provider
                )
                table_is_delta = provider.lower() == "delta"

                if not table_is_delta:
                    DeltaWriter()._convert_to_delta(client, full_table_name)

            # For schema evolution
            client.conn.conf.set(
                "spark.databricks.delta.schema.autoMerge.enabled", "true"
            )

            target_table = DeltaTable.forName(client.conn, full_table_name)
            join_condition = " AND ".join(
                [f"source.{col} = target.{col}" for col in merge_on]
            )
            merge_builder = target_table.alias("target").merge(
                source_df.alias("source"), join_condition
            )
            if when_matched_delete_condition:
                merge_builder = merge_builder.whenMatchedDelete(
                    condition=when_matched_delete_condition
                )

            merge_builder.whenMatchedUpdateAll(
                condition=when_matched_update_condition
            ).whenNotMatchedInsertAll(
                condition=when_not_matched_insert_condition
            ).execute()
        except Exception as e:
            logger.error(f"Merge operation on {full_table_name} failed: {e}")

    @staticmethod
    def vacuum(table: str, retention_hours: int, client: SparkClient):
        """Vacuum a Delta table.

        Vacuum removes unused files (files not managed by Delta + files
        that are not in the latest state).
        After vacuum it's impossible to time travel to versions
        older than the `retention` time.
        Default retention is 7 days. Lower retentions will be warned,
        unless spark.databricks.delta.retentionDurationCheck.enabled
        is set to false.
        https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
        """
        command = f"VACUUM {table} RETAIN {retention_hours} HOURS"
        logger.info(f"Running vacuum with command {command}")
        client.conn.sql(command)
        logger.info(f"Vacuum successful for table {table}")

    @staticmethod
    def optimize(
        client: SparkClient,
        table: str = None,
        z_order: list = None,
        date_column: str = "timestamp",
        from_date: str = None,
        auto_compact: bool = False,
        optimize_write: bool = False,
    ):
        """Optimize a Delta table.

        For auto-compaction and optimize write, DBR >= 14.3 LTS
        and Delta >= 3.1.0 are MANDATORY.
        For z-ordering, DBR >= 13.3 LTS and Delta >= 2.0.0 are MANDATORY.
        Auto-compaction (recommended) reduces the small file problem
        (overhead due to lots of metadata).
        Z-order by columns that are commonly used in query
        predicates and have a high cardinality.
        https://docs.delta.io/latest/optimizations-oss.html
        """
        if auto_compact:
            # Session-level config is set through the underlying SparkSession.
            client.conn.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

        if optimize_write:
            client.conn.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

        if table:
            command = f"OPTIMIZE {table}"

            if from_date:
                # Leading space so the clause is not glued to the table name.
                command += f" WHERE {date_column} >= {from_date}"

            if z_order:
                command += f" ZORDER BY {','.join(z_order)}"

            logger.info(f"Running optimize with command {command}...")
            client.conn.sql(command)
            logger.info(f"Optimize successful for table {table}.")
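Since DeltaWriter is exported from butterfree.load.writers, its merge, vacuum and optimize operations can also be called directly on top of a SparkClient session. A minimal usage sketch, assuming a hypothetical feature_store.events Delta table keyed by an id column (the database, table and column names are illustrative, not from this commit, and the session must have Delta Lake available):

from butterfree.clients import SparkClient
from butterfree.load.writers import DeltaWriter

spark_client = SparkClient()

# Illustrative source data; in practice this is the feature set dataframe.
source_df = spark_client.conn.createDataFrame(
    [(1, "2024-01-01", 0.5)], ["id", "timestamp", "feature_value"]
)

# Upsert: rows whose "id" already exists in the target are updated,
# remaining rows are inserted.
DeltaWriter.merge(
    client=spark_client,
    database="feature_store",
    table="events",
    merge_on=["id"],
    source_df=source_df,
)

# Housekeeping: keep 7 days (168 hours) of history and z-order by the merge key.
DeltaWriter.vacuum(table="feature_store.events", retention_hours=168, client=spark_client)
DeltaWriter.optimize(client=spark_client, table="feature_store.events", z_order=["id"])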

butterfree/load/writers/historical_feature_store_writer.py

Lines changed: 20 additions & 7 deletions
@@ -14,6 +14,7 @@
 from butterfree.dataframe_service import repartition_df
 from butterfree.hooks import Hook
 from butterfree.hooks.schema_compatibility import SparkTableSchemaCompatibilityHook
+from butterfree.load.writers.delta_writer import DeltaWriter
 from butterfree.load.writers.writer import Writer
 from butterfree.transform import FeatureSet
 
@@ -114,13 +115,15 @@ def __init__(
         interval_mode: bool = False,
         check_schema_hook: Optional[Hook] = None,
         row_count_validation: bool = True,
+        merge_on: list = None,
     ):
         super(HistoricalFeatureStoreWriter, self).__init__(
             db_config or MetastoreConfig(),
             debug_mode,
             interval_mode,
             False,
             row_count_validation,
+            merge_on,
         )
         self.database = database or environment.get_variable(
             "FEATURE_STORE_HISTORICAL_DATABASE"
@@ -141,6 +144,7 @@ def write(
             feature_set: object processed with feature set information.
             dataframe: spark dataframe containing data from a feature set.
             spark_client: client for spark connections with external services.
+            merge_on: when filled, the write is performed as an upsert on a Delta table.
 
         If the debug_mode is set to True, a temporary table with a name in the format:
         historical_feature_store__{feature_set.name} will be created instead of writing
@@ -174,13 +178,22 @@ def write(
 
         s3_key = os.path.join("historical", feature_set.entity, feature_set.name)
 
-        spark_client.write_table(
-            dataframe=dataframe,
-            database=self.database,
-            table_name=feature_set.name,
-            partition_by=self.PARTITION_BY,
-            **self.db_config.get_options(s3_key),
-        )
+        if self.merge_on:
+            DeltaWriter.merge(
+                client=spark_client,
+                database=self.database,
+                table=feature_set.name,
+                merge_on=self.merge_on,
+                source_df=dataframe,
+            )
+        else:
+            spark_client.write_table(
+                dataframe=dataframe,
+                database=self.database,
+                table_name=feature_set.name,
+                partition_by=self.PARTITION_BY,
+                **self.db_config.get_options(s3_key),
+            )
 
     def _assert_validation_count(
         self, table_name: str, written_count: int, dataframe_count: int
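Downstream, the only change needed to switch the historical writer from its default table write to a Delta upsert is the new merge_on argument. A minimal sketch, assuming feature_set, dataframe and spark_client already exist from a regular butterfree pipeline and that the key names are illustrative:

from butterfree.load.writers import HistoricalFeatureStoreWriter

# With merge_on set, write() routes through DeltaWriter.merge() (upsert)
# instead of spark_client.write_table(); without it, behavior is unchanged.
writer = HistoricalFeatureStoreWriter(merge_on=["id", "timestamp"])
writer.write(
    feature_set=feature_set,    # existing FeatureSet from the pipeline
    dataframe=dataframe,        # transformed output dataframe
    spark_client=spark_client,  # SparkClient used by the pipeline
)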
