Skip to content

Commit 53abb52

Browse files
AlvaroMarquesAndrade authored and ralphrass committed
Fix Validation Step (#302)
1 parent 46ba599 commit 53abb52

File tree

7 files changed

+8
-98
lines changed

7 files changed

+8
-98
lines changed

butterfree/load/writers/historical_feature_store_writer.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,7 @@ def write(
146146
"""
147147
dataframe = self._create_partitions(dataframe)
148148

149-
partition_df = self._apply_transformations(dataframe)
150-
151-
if self.debug_mode:
152-
dataframe = partition_df
153-
else:
154-
dataframe = self.check_schema(
155-
spark_client, partition_df, feature_set.name, self.database
156-
)
149+
dataframe = self._apply_transformations(dataframe)
157150

158151
if self.interval_mode:
159152
if self.debug_mode:

butterfree/load/writers/online_feature_store_writer.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from pyspark.sql.functions import col, row_number
88
from pyspark.sql.streaming import StreamingQuery
99

10-
from butterfree.clients import CassandraClient, SparkClient
10+
from butterfree.clients import SparkClient
1111
from butterfree.configs.db import AbstractWriteConfig, CassandraConfig
1212
from butterfree.constants.columns import TIMESTAMP_COLUMN
1313
from butterfree.hooks import Hook
@@ -182,22 +182,6 @@ def write(
182182
"""
183183
table_name = feature_set.entity if self.write_to_entity else feature_set.name
184184

185-
if not self.debug_mode:
186-
config = (
187-
self.db_config
188-
if self.db_config == CassandraConfig
189-
else CassandraConfig()
190-
)
191-
192-
cassandra_client = CassandraClient(
193-
host=[config.host],
194-
keyspace=config.keyspace,
195-
user=config.username,
196-
password=config.password,
197-
)
198-
199-
dataframe = self.check_schema(cassandra_client, dataframe, table_name)
200-
201185
if dataframe.isStreaming:
202186
dataframe = self._apply_transformations(dataframe)
203187
if self.debug_mode:

tests/integration/butterfree/load/test_sink.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
)
1010

1111

12-
def test_sink(input_dataframe, feature_set, mocker):
12+
def test_sink(input_dataframe, feature_set):
1313
# arrange
1414
client = SparkClient()
1515
client.conn.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
@@ -44,10 +44,6 @@ def test_sink(input_dataframe, feature_set, mocker):
4444
)
4545
online_writer = OnlineFeatureStoreWriter(db_config=online_config)
4646

47-
online_writer.check_schema_hook = mocker.stub("check_schema_hook")
48-
online_writer.check_schema_hook.run = mocker.stub("run")
49-
online_writer.check_schema_hook.run.return_value = feature_set_df
50-
5147
writers = [historical_writer, online_writer]
5248
sink = Sink(writers)
5349

tests/integration/butterfree/pipelines/test_feature_set_pipeline.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from pyspark.sql import DataFrame
55
from pyspark.sql import functions as F
66

7-
from butterfree.clients import SparkClient
87
from butterfree.configs import environment
98
from butterfree.configs.db import MetastoreConfig
109
from butterfree.constants import DataType
@@ -419,7 +418,6 @@ def test_feature_set_pipeline_with_dates(
419418
spark_session,
420419
fixed_windows_output_feature_set_date_dataframe,
421420
feature_set_pipeline,
422-
mocker,
423421
):
424422
# arrange
425423
table_reader_table = "b_table"
@@ -443,7 +441,6 @@ def test_feature_set_pipeline_with_execution_date(
443441
spark_session,
444442
fixed_windows_output_feature_set_date_dataframe,
445443
feature_set_pipeline,
446-
mocker,
447444
):
448445
# arrange
449446
table_reader_table = "b_table"
@@ -465,7 +462,7 @@ def test_feature_set_pipeline_with_execution_date(
465462
# assert
466463
assert_dataframe_equality(df, target_df)
467464

468-
def test_pipeline_with_hooks(self, spark_session, mocker):
465+
def test_pipeline_with_hooks(self, spark_session):
469466
# arrange
470467
hook1 = AddHook(value=1)
471468

tests/unit/butterfree/load/test_sink.py

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def test_flush_with_writers_list_empty(self):
120120
with pytest.raises(ValueError):
121121
Sink(writers=writer)
122122

123-
def test_flush_streaming_df(self, feature_set, mocker):
123+
def test_flush_streaming_df(self, feature_set):
124124
"""Testing the return of the streaming handlers by the sink."""
125125
# arrange
126126
spark_client = SparkClient()
@@ -141,14 +141,6 @@ def test_flush_streaming_df(self, feature_set, mocker):
141141
write_to_entity=True
142142
)
143143

144-
online_feature_store_writer_on_entity.check_schema_hook = mocker.stub(
145-
"check_schema_hook"
146-
)
147-
online_feature_store_writer_on_entity.check_schema_hook.run = mocker.stub("run")
148-
online_feature_store_writer_on_entity.check_schema_hook.run.return_value = (
149-
mocked_stream_df
150-
)
151-
152144
sink = Sink(
153145
writers=[
154146
online_feature_store_writer,
@@ -171,7 +163,7 @@ def test_flush_streaming_df(self, feature_set, mocker):
171163
assert isinstance(handler, StreamingQuery)
172164

173165
def test_flush_with_multiple_online_writers(
174-
self, feature_set, feature_set_dataframe, mocker
166+
self, feature_set, feature_set_dataframe
175167
):
176168
"""Testing the flow of writing to a feature-set table and to an entity table."""
177169
# arrange
@@ -187,14 +179,6 @@ def test_flush_with_multiple_online_writers(
187179
write_to_entity=True
188180
)
189181

190-
online_feature_store_writer_on_entity.check_schema_hook = mocker.stub(
191-
"check_schema_hook"
192-
)
193-
online_feature_store_writer_on_entity.check_schema_hook.run = mocker.stub("run")
194-
online_feature_store_writer_on_entity.check_schema_hook.run.return_value = (
195-
feature_set_dataframe
196-
)
197-
198182
sink = Sink(
199183
writers=[online_feature_store_writer, online_feature_store_writer_on_entity]
200184
)

tests/unit/butterfree/load/writers/test_historical_feature_store_writer.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ def test_write(
2323
spark_client.write_table = mocker.stub("write_table")
2424
writer = HistoricalFeatureStoreWriter()
2525

26-
schema_dataframe = writer._create_partitions(feature_set_dataframe)
27-
writer.check_schema_hook = mocker.stub("check_schema_hook")
28-
writer.check_schema_hook.run = mocker.stub("run")
29-
writer.check_schema_hook.run.return_value = schema_dataframe
30-
3126
# when
3227
writer.write(
3328
feature_set=feature_set,
@@ -107,7 +102,6 @@ def test_write_in_debug_mode(
107102
historical_feature_set_dataframe,
108103
feature_set,
109104
spark_session,
110-
mocker,
111105
):
112106
# given
113107
spark_client = SparkClient()
@@ -310,12 +304,6 @@ def test_write_with_transform(
310304

311305
writer = HistoricalFeatureStoreWriter().with_(json_transform)
312306

313-
schema_dataframe = writer._create_partitions(feature_set_dataframe)
314-
json_dataframe = writer._apply_transformations(schema_dataframe)
315-
writer.check_schema_hook = mocker.stub("check_schema_hook")
316-
writer.check_schema_hook.run = mocker.stub("run")
317-
writer.check_schema_hook.run.return_value = json_dataframe
318-
319307
# when
320308
writer.write(
321309
feature_set=feature_set,

tests/unit/butterfree/load/writers/test_online_feature_store_writer.py

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ def test_write(
6868
spark_client.write_dataframe = mocker.stub("write_dataframe")
6969
writer = OnlineFeatureStoreWriter(cassandra_config)
7070

71-
writer.check_schema_hook = mocker.stub("check_schema_hook")
72-
writer.check_schema_hook.run = mocker.stub("run")
73-
writer.check_schema_hook.run.return_value = feature_set_dataframe
74-
7571
# when
7672
writer.write(feature_set, feature_set_dataframe, spark_client)
7773

@@ -104,10 +100,6 @@ def test_write_in_debug_mode(
104100
spark_client = SparkClient()
105101
writer = OnlineFeatureStoreWriter(debug_mode=True)
106102

107-
writer.check_schema_hook = mocker.stub("check_schema_hook")
108-
writer.check_schema_hook.run = mocker.stub("run")
109-
writer.check_schema_hook.run.return_value = feature_set_dataframe
110-
111103
# when
112104
writer.write(
113105
feature_set=feature_set,
@@ -132,10 +124,6 @@ def test_write_in_debug_and_stream_mode(self, feature_set, spark_session):
132124

133125
writer = OnlineFeatureStoreWriter(debug_mode=True)
134126

135-
writer.check_schema_hook = mocker.stub("check_schema_hook")
136-
writer.check_schema_hook.run = mocker.stub("run")
137-
writer.check_schema_hook.run.return_value = mocked_stream_df
138-
139127
# act
140128
handler = writer.write(
141129
feature_set=feature_set,
@@ -151,7 +139,7 @@ def test_write_in_debug_and_stream_mode(self, feature_set, spark_session):
151139
assert isinstance(handler, StreamingQuery)
152140

153141
@pytest.mark.parametrize("has_checkpoint", [True, False])
154-
def test_write_stream(self, feature_set, has_checkpoint, monkeypatch, mocker):
142+
def test_write_stream(self, feature_set, has_checkpoint, monkeypatch):
155143
# arrange
156144
spark_client = SparkClient()
157145
spark_client.write_stream = Mock()
@@ -174,10 +162,6 @@ def test_write_stream(self, feature_set, has_checkpoint, monkeypatch, mocker):
174162
writer = OnlineFeatureStoreWriter(cassandra_config)
175163
writer.filter_latest = Mock()
176164

177-
writer.check_schema_hook = mocker.stub("check_schema_hook")
178-
writer.check_schema_hook.run = mocker.stub("run")
179-
writer.check_schema_hook.run.return_value = dataframe
180-
181165
# act
182166
stream_handler = writer.write(feature_set, dataframe, spark_client)
183167

@@ -201,7 +185,7 @@ def test_get_db_schema(self, cassandra_config, test_feature_set, expected_schema
201185

202186
assert schema == expected_schema
203187

204-
def test_write_stream_on_entity(self, feature_set, monkeypatch, mocker):
188+
def test_write_stream_on_entity(self, feature_set, monkeypatch):
205189
"""Test write method with stream dataframe and write_to_entity enabled.
206190
207191
The main purpose of this test is assert the correct setup of stream checkpoint
@@ -224,10 +208,6 @@ def test_write_stream_on_entity(self, feature_set, monkeypatch, mocker):
224208

225209
writer = OnlineFeatureStoreWriter(write_to_entity=True)
226210

227-
writer.check_schema_hook = mocker.stub("check_schema_hook")
228-
writer.check_schema_hook.run = mocker.stub("run")
229-
writer.check_schema_hook.run.return_value = dataframe
230-
231211
# act
232212
stream_handler = writer.write(feature_set, dataframe, spark_client)
233213

@@ -256,10 +236,6 @@ def test_write_with_transform(
256236
spark_client.write_dataframe = mocker.stub("write_dataframe")
257237
writer = OnlineFeatureStoreWriter(cassandra_config).with_(json_transform)
258238

259-
writer.check_schema_hook = mocker.stub("check_schema_hook")
260-
writer.check_schema_hook.run = mocker.stub("run")
261-
writer.check_schema_hook.run.return_value = feature_set_dataframe
262-
263239
# when
264240
writer.write(feature_set, feature_set_dataframe, spark_client)
265241

@@ -293,10 +269,6 @@ def test_write_with_kafka_config(
293269
kafka_config = KafkaConfig()
294270
writer = OnlineFeatureStoreWriter(kafka_config).with_(json_transform)
295271

296-
writer.check_schema_hook = mocker.stub("check_schema_hook")
297-
writer.check_schema_hook.run = mocker.stub("run")
298-
writer.check_schema_hook.run.return_value = feature_set_dataframe
299-
300272
# when
301273
writer.write(feature_set, feature_set_dataframe, spark_client)
302274

@@ -320,10 +292,6 @@ def test_write_with_custom_kafka_config(
320292
json_transform
321293
)
322294

323-
custom_writer.check_schema_hook = mocker.stub("check_schema_hook")
324-
custom_writer.check_schema_hook.run = mocker.stub("run")
325-
custom_writer.check_schema_hook.run.return_value = feature_set_dataframe
326-
327295
# when
328296
custom_writer.write(feature_set, feature_set_dataframe, spark_client)
329297

0 commit comments

Comments (0)