
Commit e8fc0da: Fix Validation Step (#302)
Parent: 5fe4c40

9 files changed (+12, -144 lines)

butterfree/configs/db/metastore_config.py

Lines changed: 1 addition & 1 deletion
@@ -117,4 +117,4 @@ def get_path_with_partitions(self, key: str, dataframe: DataFrame) -> List:
 
     def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         """Translate feature set spark schema to the corresponding database."""
-        pass
+        return schema
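
With this change, `translate` stops being an unimplemented stub: the Metastore config simply hands the feature set's Spark-derived schema back unchanged. A minimal sketch of the new behavior (the schema entries and the default-constructed `MetastoreConfig` are illustrative assumptions):

    from butterfree.configs.db import MetastoreConfig

    # Hypothetical schema entries; in practice they come from the feature set.
    schema = [
        {"column_name": "id", "type": "bigint"},
        {"column_name": "feature", "type": "double"},
    ]

    config = MetastoreConfig()  # assuming defaults suffice for this call
    assert config.translate(schema) == schema  # identity passthrough now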

butterfree/load/writers/historical_feature_store_writer.py

Lines changed: 1 addition & 8 deletions
@@ -140,14 +140,7 @@ def write(
         """
         dataframe = self._create_partitions(dataframe)
 
-        partition_df = self._apply_transformations(dataframe)
-
-        if self.debug_mode:
-            dataframe = partition_df
-        else:
-            dataframe = self.check_schema(
-                spark_client, partition_df, feature_set.name, self.database
-            )
+        dataframe = self._apply_transformations(dataframe)
 
         if self.interval_mode:
             if self.debug_mode:
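
The net effect on `write` is that the partitioned dataframe no longer detours through `check_schema`: it is partitioned, transformed, and handed straight to the save logic. A sketch of the simplified call path (argument names follow the diffs above; the exact `write` signature is an assumption):

    from butterfree.clients import SparkClient
    from butterfree.load.writers import HistoricalFeatureStoreWriter

    spark_client = SparkClient()
    writer = HistoricalFeatureStoreWriter()  # or interval_mode=True

    # No check_schema_hook stubbing is needed anymore: write() does not validate.
    writer.write(
        feature_set=feature_set,           # a FeatureSet assumed to be in scope
        dataframe=feature_set_dataframe,   # its Spark DataFrame, also assumed
        spark_client=spark_client,
    )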

butterfree/load/writers/online_feature_store_writer.py

Lines changed: 1 addition & 17 deletions
@@ -7,7 +7,7 @@
 from pyspark.sql.functions import col, row_number
 from pyspark.sql.streaming import StreamingQuery
 
-from butterfree.clients import CassandraClient, SparkClient
+from butterfree.clients import SparkClient
 from butterfree.configs.db import AbstractWriteConfig, CassandraConfig
 from butterfree.constants.columns import TIMESTAMP_COLUMN
 from butterfree.hooks import Hook
@@ -180,22 +180,6 @@ def write(
         """
         table_name = feature_set.entity if self.write_to_entity else feature_set.name
 
-        if not self.debug_mode:
-            config = (
-                self.db_config
-                if self.db_config == CassandraConfig
-                else CassandraConfig()
-            )
-
-            cassandra_client = CassandraClient(
-                host=[config.host],
-                keyspace=config.keyspace,
-                user=config.username,
-                password=config.password,
-            )
-
-            dataframe = self.check_schema(cassandra_client, dataframe, table_name)
-
         if dataframe.isStreaming:
             dataframe = self._apply_transformations(dataframe)
             if self.debug_mode:
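
Previously the online writer opened a `CassandraClient` connection on every non-debug `write` just to run the schema check; that whole block is gone, along with the now-unused import. A minimal sketch of constructing the writer after the change (`CassandraConfig()` with defaults mirrors the fallback the removed block used):

    from butterfree.configs.db import CassandraConfig
    from butterfree.load.writers import OnlineFeatureStoreWriter

    # No CassandraClient is created up front anymore; write() goes straight
    # to the transformation and streaming/batch branches.
    writer = OnlineFeatureStoreWriter(db_config=CassandraConfig())
    entity_writer = OnlineFeatureStoreWriter(write_to_entity=True)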

setup.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from setuptools import find_packages, setup
 
 __package_name__ = "butterfree"
-__version__ = "1.2.0.dev2"
+__version__ = "1.2.0.dev3"
 __repository_url__ = "https://github.com/quintoandar/butterfree"
 
 with open("requirements.txt") as f:

tests/integration/butterfree/load/test_sink.py

Lines changed: 1 addition & 10 deletions
@@ -9,7 +9,7 @@
 )
 
 
-def test_sink(input_dataframe, feature_set, mocker):
+def test_sink(input_dataframe, feature_set):
     # arrange
     client = SparkClient()
     client.conn.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
@@ -34,11 +34,6 @@ def test_sink(input_dataframe, feature_set, mocker):
         db_config=s3config, interval_mode=True
     )
 
-    schema_dataframe = historical_writer._create_partitions(feature_set_df)
-    historical_writer.check_schema_hook = mocker.stub("check_schema_hook")
-    historical_writer.check_schema_hook.run = mocker.stub("run")
-    historical_writer.check_schema_hook.run.return_value = schema_dataframe
-
     # setup online writer
     # TODO: Change for CassandraConfig when Cassandra for test is ready
     online_config = Mock()
@@ -49,10 +44,6 @@ def test_sink(input_dataframe, feature_set, mocker):
     )
     online_writer = OnlineFeatureStoreWriter(db_config=online_config)
 
-    online_writer.check_schema_hook = mocker.stub("check_schema_hook")
-    online_writer.check_schema_hook.run = mocker.stub("run")
-    online_writer.check_schema_hook.run.return_value = feature_set_df
-
     writers = [historical_writer, online_writer]
     sink = Sink(writers)
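
With the validation step removed, the integration test only has to assemble the writers and flush through the `Sink`; the `mocker` fixture and every `check_schema_hook` stub disappear. A sketch of the arrangement that remains (the `flush` argument names are an assumption for illustration):

    # writers configured earlier in the test
    writers = [historical_writer, online_writer]
    sink = Sink(writers)

    # act: flush runs with no check_schema_hook stubs in place
    sink.flush(dataframe=feature_set_df, feature_set=feature_set, spark_client=client)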

tests/integration/butterfree/pipelines/test_feature_set_pipeline.py

Lines changed: 2 additions & 20 deletions
@@ -4,7 +4,6 @@
 from pyspark.sql import DataFrame
 from pyspark.sql import functions as F
 
-from butterfree.clients import SparkClient
 from butterfree.configs import environment
 from butterfree.configs.db import MetastoreConfig
 from butterfree.constants import DataType
@@ -75,11 +74,7 @@ def create_ymd(dataframe):
 
 class TestFeatureSetPipeline:
     def test_feature_set_pipeline(
-        self,
-        mocked_df,
-        spark_session,
-        fixed_windows_output_feature_set_dataframe,
-        mocker,
+        self, mocked_df, spark_session, fixed_windows_output_feature_set_dataframe,
     ):
         # arrange
         table_reader_id = "a_source"
@@ -93,11 +88,6 @@ def test_feature_set_pipeline(
             table_reader_table=table_reader_table,
         )
 
-        spark_client = SparkClient()
-        spark_client.conn.conf.set(
-            "spark.sql.sources.partitionOverwriteMode", "dynamic"
-        )
-
         dbconfig = Mock()
         dbconfig.mode = "overwrite"
         dbconfig.format_ = "parquet"
@@ -107,12 +97,6 @@ def test_feature_set_pipeline(
 
         historical_writer = HistoricalFeatureStoreWriter(db_config=dbconfig)
 
-        historical_writer.check_schema_hook = mocker.stub("check_schema_hook")
-        historical_writer.check_schema_hook.run = mocker.stub("run")
-        historical_writer.check_schema_hook.run.return_value = (
-            fixed_windows_output_feature_set_dataframe
-        )
-
         # act
         test_pipeline = FeatureSetPipeline(
             source=Source(
@@ -187,7 +171,6 @@ def test_feature_set_pipeline_with_dates(
         spark_session,
         fixed_windows_output_feature_set_date_dataframe,
         feature_set_pipeline,
-        mocker,
     ):
         # arrange
         table_reader_table = "b_table"
@@ -211,7 +194,6 @@ def test_feature_set_pipeline_with_execution_date(
         spark_session,
         fixed_windows_output_feature_set_date_dataframe,
         feature_set_pipeline,
-        mocker,
     ):
         # arrange
         table_reader_table = "b_table"
@@ -233,7 +215,7 @@ def test_feature_set_pipeline_with_execution_date(
         # assert
         assert_dataframe_equality(df, target_df)
 
-    def test_pipeline_with_hooks(self, spark_session, mocker):
+    def test_pipeline_with_hooks(self, spark_session):
         # arrange
         hook1 = AddHook(value=1)
 

tests/unit/butterfree/load/test_sink.py

Lines changed: 2 additions & 30 deletions
@@ -120,7 +120,7 @@ def test_flush_with_writers_list_empty(self):
         with pytest.raises(ValueError):
             Sink(writers=writer)
 
-    def test_flush_streaming_df(self, feature_set, mocker):
+    def test_flush_streaming_df(self, feature_set):
         """Testing the return of the streaming handlers by the sink."""
         # arrange
         spark_client = SparkClient()
@@ -137,24 +137,10 @@ def test_flush_streaming_df(self, feature_set, mocker):
 
         online_feature_store_writer = OnlineFeatureStoreWriter()
 
-        online_feature_store_writer.check_schema_hook = mocker.stub("check_schema_hook")
-        online_feature_store_writer.check_schema_hook.run = mocker.stub("run")
-        online_feature_store_writer.check_schema_hook.run.return_value = (
-            mocked_stream_df
-        )
-
         online_feature_store_writer_on_entity = OnlineFeatureStoreWriter(
             write_to_entity=True
         )
 
-        online_feature_store_writer_on_entity.check_schema_hook = mocker.stub(
-            "check_schema_hook"
-        )
-        online_feature_store_writer_on_entity.check_schema_hook.run = mocker.stub("run")
-        online_feature_store_writer_on_entity.check_schema_hook.run.return_value = (
-            mocked_stream_df
-        )
-
         sink = Sink(
             writers=[
                 online_feature_store_writer,
@@ -177,7 +163,7 @@ def test_flush_streaming_df(self, feature_set, mocker):
             assert isinstance(handler, StreamingQuery)
 
     def test_flush_with_multiple_online_writers(
-        self, feature_set, feature_set_dataframe, mocker
+        self, feature_set, feature_set_dataframe
     ):
         """Testing the flow of writing to a feature-set table and to an entity table."""
         # arrange
@@ -189,24 +175,10 @@ def test_flush_with_multiple_online_writers(
 
         online_feature_store_writer = OnlineFeatureStoreWriter()
 
-        online_feature_store_writer.check_schema_hook = mocker.stub("check_schema_hook")
-        online_feature_store_writer.check_schema_hook.run = mocker.stub("run")
-        online_feature_store_writer.check_schema_hook.run.return_value = (
-            feature_set_dataframe
-        )
-
         online_feature_store_writer_on_entity = OnlineFeatureStoreWriter(
             write_to_entity=True
         )
 
-        online_feature_store_writer_on_entity.check_schema_hook = mocker.stub(
-            "check_schema_hook"
-        )
-        online_feature_store_writer_on_entity.check_schema_hook.run = mocker.stub("run")
-        online_feature_store_writer_on_entity.check_schema_hook.run.return_value = (
-            feature_set_dataframe
-        )
-
         sink = Sink(
             writers=[online_feature_store_writer, online_feature_store_writer_on_entity]
         )

tests/unit/butterfree/load/writers/test_historical_feature_store_writer.py

Lines changed: 0 additions & 22 deletions
@@ -23,11 +23,6 @@ def test_write(
         spark_client.write_table = mocker.stub("write_table")
         writer = HistoricalFeatureStoreWriter()
 
-        schema_dataframe = writer._create_partitions(feature_set_dataframe)
-        writer.check_schema_hook = mocker.stub("check_schema_hook")
-        writer.check_schema_hook.run = mocker.stub("run")
-        writer.check_schema_hook.run.return_value = schema_dataframe
-
         # when
         writer.write(
             feature_set=feature_set,
@@ -62,11 +57,6 @@ def test_write_interval_mode(
         )
         writer = HistoricalFeatureStoreWriter(interval_mode=True)
 
-        schema_dataframe = writer._create_partitions(feature_set_dataframe)
-        writer.check_schema_hook = mocker.stub("check_schema_hook")
-        writer.check_schema_hook.run = mocker.stub("run")
-        writer.check_schema_hook.run.return_value = schema_dataframe
-
         # when
         writer.write(
             feature_set=feature_set,
@@ -104,11 +94,6 @@ def test_write_interval_mode_invalid_partition_mode(
 
         writer = HistoricalFeatureStoreWriter(interval_mode=True)
 
-        schema_dataframe = writer._create_partitions(feature_set_dataframe)
-        writer.check_schema_hook = mocker.stub("check_schema_hook")
-        writer.check_schema_hook.run = mocker.stub("run")
-        writer.check_schema_hook.run.return_value = schema_dataframe
-
         # when
         with pytest.raises(RuntimeError):
             _ = writer.write(
@@ -123,7 +108,6 @@ def test_write_in_debug_mode(
         historical_feature_set_dataframe,
         feature_set,
         spark_session,
-        mocker,
     ):
         # given
         spark_client = SparkClient()
@@ -321,12 +305,6 @@ def test_write_with_transform(
 
         writer = HistoricalFeatureStoreWriter().with_(json_transform)
 
-        schema_dataframe = writer._create_partitions(feature_set_dataframe)
-        json_dataframe = writer._apply_transformations(schema_dataframe)
-        writer.check_schema_hook = mocker.stub("check_schema_hook")
-        writer.check_schema_hook.run = mocker.stub("run")
-        writer.check_schema_hook.run.return_value = json_dataframe
-
         # when
         writer.write(
             feature_set=feature_set,
