Commit 98c0878
feat(MLOP-2604): Create delta writer (#410)
## **Why?** 📖

Currently, Delta operations are not treated as a standard writer in Butterfree, making it difficult to handle Delta tables in the same way as other feature store writers. This PR introduces a **Delta Feature Store Writer**, enabling seamless integration of Delta operations into the existing pipeline.

## **What?** 🔧

This PR introduces:

- **DeltaConfig**: A configuration class for Delta table operations, allowing merge strategies and schema enforcement.
- **DeltaFeatureStoreWriter**: A new writer to handle Delta tables as a feature store, supporting deduplication, conditional updates, and schema evolution.
- **DeltaWriter**: Implements merge logic and optimization operations (such as `VACUUM` and `OPTIMIZE`) for Delta tables.

## **Type of Change**

- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update
- [ ] Release

## **How was everything tested?** 📏

- Created **unit tests** covering all functionalities.
- Mocked Spark interactions to ensure correct behavior without requiring a running Delta Lake environment.
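The `DeltaWriter` maintenance operations mentioned above are not among the files shown in this section of the diff. As a rough, illustrative sketch of what `VACUUM` and `OPTIMIZE` look like when issued through Spark SQL (the table name `feature_store.user_features` is a placeholder, not taken from this commit):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact many small files into fewer, larger ones to speed up reads.
spark.sql("OPTIMIZE feature_store.user_features")

# Delete files no longer referenced by the table and older than the
# retention threshold (Delta's default retention is 168 hours / 7 days).
spark.sql("VACUUM feature_store.user_features RETAIN 168 HOURS")
```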
1 parent 44bb0de commit 98c0878

File tree

7 files changed: +604 -129 lines changed

butterfree/configs/db/__init__.py

Lines changed: 8 additions & 1 deletion

@@ -2,7 +2,14 @@
 
 from butterfree.configs.db.abstract_config import AbstractWriteConfig
 from butterfree.configs.db.cassandra_config import CassandraConfig
+from butterfree.configs.db.delta import DeltaConfig
 from butterfree.configs.db.kafka_config import KafkaConfig
 from butterfree.configs.db.metastore_config import MetastoreConfig
 
-__all__ = ["AbstractWriteConfig", "CassandraConfig", "KafkaConfig", "MetastoreConfig"]
+__all__ = [
+    "AbstractWriteConfig",
+    "CassandraConfig",
+    "KafkaConfig",
+    "MetastoreConfig",
+    "DeltaConfig",
+]

butterfree/configs/db/delta.py

Lines changed: 131 additions & 0 deletions

@@ -0,0 +1,131 @@
"""Holds configurations for Delta Lake operations."""

from typing import Any, Dict, List, Optional

from butterfree.configs.db import AbstractWriteConfig


class DeltaConfig(AbstractWriteConfig):
    """Configuration for Delta Lake operations.

    Attributes:
        database: Target database name for the Delta table.
        table: Target table name for the Delta table.
        merge_on: List of columns to use as merge keys.
        when_not_matched_insert: Optional condition for insert operations.
        when_matched_update: Optional condition for update operations.
        when_matched_delete: Optional condition for delete operations.
    """

    def __init__(
        self,
        database: str,
        table: str,
        merge_on: List[str],
        when_not_matched_insert: Optional[str] = None,
        when_matched_update: Optional[str] = None,
        when_matched_delete: Optional[str] = None,
    ):
        self.database = database
        self.table = table
        self.merge_on = merge_on
        self.when_not_matched_insert = when_not_matched_insert
        self.when_matched_update = when_matched_update
        self.when_matched_delete = when_matched_delete

    @property
    def database(self) -> str:
        """Database name."""
        return self.__database

    @database.setter
    def database(self, value: str) -> None:
        if not value:
            raise ValueError("Config 'database' cannot be empty.")
        self.__database = value

    @property
    def table(self) -> str:
        """Table name."""
        return self.__table

    @table.setter
    def table(self, value: str) -> None:
        if not value:
            raise ValueError("Config 'table' cannot be empty.")
        self.__table = value

    @property
    def merge_on(self) -> List[str]:
        """List of columns to use as merge keys."""
        return self.__merge_on

    @merge_on.setter
    def merge_on(self, value: List[str]) -> None:
        if not value:
            raise ValueError("Config 'merge_on' cannot be empty.")
        self.__merge_on = value

    @property
    def mode(self) -> str:
        """Write mode for Spark."""
        return "overwrite"

    @property
    def format_(self) -> str:
        """Write format for Spark."""
        return "delta"

    @property
    def when_not_matched_insert(self) -> Optional[str]:
        """Condition for insert operations."""
        return self.__when_not_matched_insert

    @when_not_matched_insert.setter
    def when_not_matched_insert(self, value: Optional[str]) -> None:
        self.__when_not_matched_insert = value

    @property
    def when_matched_update(self) -> Optional[str]:
        """Condition for update operations."""
        return self.__when_matched_update

    @when_matched_update.setter
    def when_matched_update(self, value: Optional[str]) -> None:
        self.__when_matched_update = value

    @property
    def when_matched_delete(self) -> Optional[str]:
        """Condition for delete operations."""
        return self.__when_matched_delete

    @when_matched_delete.setter
    def when_matched_delete(self, value: Optional[str]) -> None:
        self.__when_matched_delete = value

    def get_options(self, key: str) -> Dict[str, Any]:
        """Get options for Delta Lake operations.

        Args:
            key: table name in Delta Lake.

        Returns:
            Configuration for Delta Lake operations.
        """
        return {
            "table": self.table,
            "database": self.database,
        }

    def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Get feature set schema to be translated.

        Delta Lake uses the same types as Spark SQL, so no translation is needed.

        Args:
            schema: Feature set schema in Spark format.

        Returns:
            The same schema, as Delta Lake uses Spark SQL types.
        """
        return schema
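As a quick, hypothetical usage sketch of the class above (the database and table names are placeholders, not values from this commit; an empty `database`, `table`, or `merge_on` raises `ValueError`):

```python
from butterfree.configs.db import DeltaConfig

config = DeltaConfig(
    database="feature_store",       # placeholder name
    table="user_features",          # placeholder name
    merge_on=["id", "timestamp"],
    when_matched_update="source.value > target.value",
)

print(config.get_options(config.table))  # {'table': 'user_features', 'database': 'feature_store'}
print(config.mode, config.format_)       # overwrite delta
```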
butterfree/load/writers/__init__.py

Lines changed: 7 additions & 1 deletion

@@ -1,9 +1,15 @@
 """Holds data loaders for historical and online feature store."""
 
+from butterfree.load.writers.delta_feature_store_writer import DeltaFeatureStoreWriter
 from butterfree.load.writers.delta_writer import DeltaWriter
 from butterfree.load.writers.historical_feature_store_writer import (
     HistoricalFeatureStoreWriter,
 )
 from butterfree.load.writers.online_feature_store_writer import OnlineFeatureStoreWriter
 
-__all__ = ["HistoricalFeatureStoreWriter", "OnlineFeatureStoreWriter", "DeltaWriter"]
+__all__ = [
+    "HistoricalFeatureStoreWriter",
+    "OnlineFeatureStoreWriter",
+    "DeltaWriter",
+    "DeltaFeatureStoreWriter",
+]
butterfree/load/writers/delta_feature_store_writer.py

Lines changed: 170 additions & 0 deletions

@@ -0,0 +1,170 @@
from typing import Any, Dict, List, Optional

from pyspark.sql.dataframe import DataFrame

from butterfree.clients import SparkClient
from butterfree.configs.db import DeltaConfig
from butterfree.load.writers.delta_writer import DeltaWriter
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet


class DeltaFeatureStoreWriter(Writer):
    """Enable writing feature sets into Delta tables with merge capabilities.

    Attributes:
        database: database name to use for the Delta table.
        table: table name to write the feature set to.
        merge_on: list of columns to use as merge keys.
        deduplicate: whether to deduplicate data before merging based on the
            feature set keys. Default is False.
        when_not_matched_insert: optional condition for insert operations.
            When provided, rows will only be inserted if this condition is true.
        when_matched_update: optional condition for update operations.
            When provided, rows will only be updated if this condition is true.
            Source columns can be referenced as source.<column_name> and target
            columns as target.<column_name>.
        when_matched_delete: optional condition for delete operations.
            When provided, rows will be deleted if this condition is true.
            Source and target columns can be referenced as in update conditions.

    Example:
        Simple example regarding DeltaFeatureStoreWriter class instantiation.
        We can instantiate this class with a basic merge configuration:

        >>> from butterfree.load.writers import DeltaFeatureStoreWriter
        >>> spark_client = SparkClient()
        >>> writer = DeltaFeatureStoreWriter(
        ...     database="feature_store",
        ...     table="user_features",
        ...     merge_on=["id", "timestamp"]
        ... )
        >>> writer.write(feature_set=feature_set,
        ...              dataframe=dataframe,
        ...              spark_client=spark_client)

        We can also enable deduplication based on the feature set keys:

        >>> writer = DeltaFeatureStoreWriter(
        ...     database="feature_store",
        ...     table="user_features",
        ...     merge_on=["id", "timestamp"],
        ...     deduplicate=True
        ... )

        For more control over the merge operation, we can add conditions:

        >>> writer = DeltaFeatureStoreWriter(
        ...     database="feature_store",
        ...     table="user_features",
        ...     merge_on=["id", "timestamp"],
        ...     when_matched_update="source.value > target.value",
        ...     when_not_matched_insert="source.value > 0"
        ... )

    The writer supports schema evolution by default and will automatically
    handle updates to the feature set schema.

    When writing with deduplication enabled, the writer will use the feature
    set's key columns and timestamp to ensure data quality by removing
    duplicates before merging.

    For optimal performance, it is recommended to:
    1. Choose appropriate merge keys
    2. Use conditions to filter unnecessary updates/inserts
    3. Enable deduplication only when needed
    """
    def __init__(
        self,
        database: str,
        table: str,
        merge_on: List[str],
        deduplicate: bool = False,
        when_not_matched_insert: Optional[str] = None,
        when_matched_update: Optional[str] = None,
        when_matched_delete: Optional[str] = None,
    ):
        self.config = DeltaConfig(
            database=database,
            table=table,
            merge_on=merge_on,
            when_not_matched_insert=when_not_matched_insert,
            when_matched_update=when_matched_update,
            when_matched_delete=when_matched_delete,
        )
        self.deduplicate = deduplicate
        self.row_count_validation = False
    def write(
        self,
        dataframe: DataFrame,
        spark_client: SparkClient,
        feature_set: FeatureSet,
    ) -> None:
        """Merges the input dataframe into a Delta table.

        Performs a Delta merge operation with the provided dataframe using the
        configured merge settings. When deduplication is enabled, uses the
        feature set's key columns to remove duplicates before merging.

        Args:
            dataframe: Spark dataframe with data to be merged.
            spark_client: Client with an active Spark connection.
            feature_set: Feature set instance containing schema and configuration.
                Used for deduplication when enabled.

        Example:
            >>> from butterfree.load.writers import DeltaFeatureStoreWriter
            >>> writer = DeltaFeatureStoreWriter(
            ...     database="feature_store",
            ...     table="user_features",
            ...     merge_on=["id", "timestamp"],
            ...     deduplicate=True
            ... )
            >>> writer.write(
            ...     dataframe=dataframe,
            ...     spark_client=spark_client,
            ...     feature_set=feature_set
            ... )
        """
        if self.deduplicate:
            # Remove duplicates on the feature set keys and timestamp, as
            # documented above, so the merge sees one row per key.
            dataframe = dataframe.dropDuplicates(
                feature_set.keys_columns + [feature_set.timestamp_column]
            )

        options = self.config.get_options(self.config.table)

        DeltaWriter().merge(
            client=spark_client,
            database=options["database"],
            table=options["table"],
            merge_on=self.config.merge_on,
            source_df=dataframe,
            when_not_matched_insert=self.config.when_not_matched_insert,
            when_matched_update=self.config.when_matched_update,
            when_matched_delete=self.config.when_matched_delete,
        )
    def validate(
        self,
        dataframe: DataFrame,
        spark_client: SparkClient,
        feature_set: FeatureSet,
    ) -> None:
        """Validates the dataframe written to the Delta table.

        In Delta tables, schema validation is handled by Delta's schema
        enforcement and evolution. No additional validation is needed.

        Args:
            dataframe: Spark dataframe to be validated.
            spark_client: Client for the Spark connection.
            feature_set: Feature set with the schema definition.
        """
        pass

    def check_schema(self, dataframe: DataFrame, schema: List[Dict[str, Any]]) -> None:
        """Checks if the dataframe schema matches the feature set schema.

        Schema validation in Delta tables is handled by Delta Lake's schema
        enforcement and evolution capabilities.

        Args:
            dataframe: Spark dataframe to be validated.
            schema: Schema definition from the feature set.
        """
        pass
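`DeltaWriter.merge` itself is not among the files shown in this section. As a minimal sketch of what the merge call above plausibly translates to with the delta-spark `DeltaTable` API (table name, merge keys, and conditions are illustrative, not the actual `DeltaWriter` implementation):

```python
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def merge_sketch(spark: SparkSession, source_df: DataFrame) -> None:
    """Merge source_df into a Delta table, keyed on illustrative merge keys."""
    merge_on = ["id", "timestamp"]  # placeholder merge keys
    on_clause = " AND ".join(f"source.{col} = target.{col}" for col in merge_on)

    target = DeltaTable.forName(spark, "feature_store.user_features")  # placeholder
    (
        target.alias("target")
        .merge(source_df.alias("source"), on_clause)
        # Update matched rows, optionally gated by an extra condition.
        .whenMatchedUpdateAll(condition="source.value > target.value")
        # Insert source rows that have no match in the target.
        .whenNotMatchedInsertAll()
        .execute()
    )
```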
