Skip to content

Commit 9bcca0e

Browse files
authored
feat(MLOP-1985): optional params (#347)
* feat: optional params
1 parent 97e44fa commit 9bcca0e

File tree

3 files changed

+39
-7
lines changed

3 files changed

+39
-7
lines changed

butterfree/extract/source.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,22 @@ class Source(HookableComponent):
4949
temporary views regarding each reader and, after, will run the
5050
desired query and return a dataframe.
5151
52+
The `eager_evaluation` param forces Spark to apply the currently
53+
mapped changes to the DataFrame. When this parameter is set to
54+
False, Spark follows its standard behaviour of lazy evaluation.
55+
Lazy evaluation can improve Spark's performance as it allows
56+
Spark to build the best version of the execution plan.
57+
5258
"""
5359

54-
def __init__(self, readers: List[Reader], query: str) -> None:
60+
def __init__(
61+
self, readers: List[Reader], query: str, eager_evaluation: bool = True,
62+
) -> None:
5563
super().__init__()
5664
self.enable_pre_hooks = False
5765
self.readers = readers
5866
self.query = query
67+
self.eager_evaluation = eager_evaluation
5968

6069
def construct(
6170
self, client: SparkClient, start_date: str = None, end_date: str = None
@@ -87,7 +96,7 @@ def construct(
8796

8897
dataframe = client.sql(self.query)
8998

90-
if not dataframe.isStreaming:
99+
if not dataframe.isStreaming and self.eager_evaluation:
91100
dataframe.cache().count()
92101

93102
post_hook_df = self.run_post_hooks(dataframe)

butterfree/transform/aggregated_feature_set.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,23 @@ def __init__(
197197
keys: List[KeyFeature],
198198
timestamp: TimestampFeature,
199199
features: List[Feature],
200+
deduplicate_rows: bool = True,
201+
eager_evaluation: bool = True,
200202
):
201203
self._windows: List[Any] = []
202204
self._pivot_column: Optional[str] = None
203205
self._pivot_values: Optional[List[Union[bool, float, int, str]]] = []
204206
self._distinct_subset: List[Any] = []
205207
self._distinct_keep: Optional[str] = None
206208
super(AggregatedFeatureSet, self).__init__(
207-
name, entity, description, keys, timestamp, features,
209+
name,
210+
entity,
211+
description,
212+
keys,
213+
timestamp,
214+
features,
215+
deduplicate_rows,
216+
eager_evaluation,
208217
)
209218

210219
@property
@@ -626,8 +635,10 @@ def construct(
626635
float("nan"), None
627636
)
628637
if not output_df.isStreaming:
629-
output_df = self._filter_duplicated_rows(output_df)
630-
output_df.cache().count()
638+
if self.deduplicate_rows:
639+
output_df = self._filter_duplicated_rows(output_df)
640+
if self.eager_evaluation:
641+
output_df.cache().count()
631642

632643
post_hook_df = self.run_post_hooks(output_df)
633644

butterfree/transform/feature_set.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ class FeatureSet(HookableComponent):
9797
values over key columns and timestamp column, we do this in order to reduce
9898
our dataframe (regarding the number of rows). A detailed explanation of this
9999
method can be found at filter_duplicated_rows docstring.
100+
101+
The `eager_evaluation` param forces Spark to apply the currently
102+
mapped changes to the DataFrame. When this parameter is set to
103+
False, Spark follows its standard behaviour of lazy evaluation.
104+
Lazy evaluation can improve Spark's performance as it allows
105+
Spark to build the best version of the execution plan.
100106
"""
101107

102108
def __init__(
@@ -107,6 +113,8 @@ def __init__(
107113
keys: List[KeyFeature],
108114
timestamp: TimestampFeature,
109115
features: List[Feature],
116+
deduplicate_rows: bool = True,
117+
eager_evaluation: bool = True,
110118
) -> None:
111119
super().__init__()
112120
self.name = name
@@ -116,6 +124,8 @@ def __init__(
116124
self.timestamp = timestamp
117125
self.features = features
118126
self.incremental_strategy = IncrementalStrategy(column=TIMESTAMP_COLUMN)
127+
self.deduplicate_rows = deduplicate_rows
128+
self.eager_evaluation = eager_evaluation
119129

120130
@property
121131
def name(self) -> str:
@@ -426,8 +436,10 @@ def construct(
426436
).select(*self.columns)
427437

428438
if not output_df.isStreaming:
429-
output_df = self._filter_duplicated_rows(output_df)
430-
output_df.cache().count()
439+
if self.deduplicate_rows:
440+
output_df = self._filter_duplicated_rows(output_df)
441+
if self.eager_evaluation:
442+
output_df.cache().count()
431443

432444
output_df = self.incremental_strategy.filter_with_incremental_strategy(
433445
dataframe=output_df, start_date=start_date, end_date=end_date

0 commit comments

Comments
 (0)