Skip to content

Commit febcb2a

Browse files
committed
Some adjusts.
1 parent 5c448f2 commit febcb2a

File tree

4 files changed

+48
-35
lines changed

4 files changed

+48
-35
lines changed

butterfree/migrations/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""Holds available migrations."""
2-
from butterfree.migrations.migrator import Migrator
2+
from butterfree.migrations.migrate import Migrate
33

4-
__all__ = ["Migrator"]
4+
__all__ = ["Migrate"]

butterfree/migrations/database_migration/database_migration.py

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Migration entity."""
2-
2+
import logging
33
from abc import ABC, abstractmethod
4+
from itertools import filterfalse
45
from typing import Any, Dict, List, Optional
56

67
from butterfree.transform import FeatureSet
@@ -45,26 +46,17 @@ def _validate_schema(
4546
if not db_schema:
4647
return fs_schema
4748

48-
for feature in fs_schema:
49+
diff_list = list(filterfalse(lambda x: x in db_schema, fs_schema))
50+
51+
for feature in diff_list:
4952
matching_features = [
5053
x for x in db_schema if x["column_name"] == feature["column_name"]
5154
]
5255

53-
if not matching_features:
54-
mismatches.append(feature)
55-
continue
56-
57-
if feature["type"] == matching_features[0]["type"]:
58-
continue
56+
if matching_features:
57+
raise ValueError(f"The {feature['column_name']} can't be changed.")
5958

60-
mismatches.append(
61-
{
62-
"column_name": feature["column_name"],
63-
"type": feature["type"],
64-
"primary_key": feature["primary_key"],
65-
"old_type": matching_features[0]["type"],
66-
}
67-
)
59+
mismatches.append(feature)
6860

6961
return None if mismatches == [] else mismatches
7062

@@ -76,12 +68,7 @@ def _get_schema(
7668
Returns:
7769
Schema object.
7870
"""
79-
try:
80-
db_schema = db_client.get_schema(table_name)
81-
except RuntimeError:
82-
db_schema = None
83-
84-
return db_schema
71+
pass
8572

8673
def run(self, feature_set: FeatureSet) -> None:
8774
"""Runs the migrations.
@@ -90,4 +77,4 @@ def run(self, feature_set: FeatureSet) -> None:
9077
feature_set: the feature set.
9178
9279
"""
93-
self._apply_migration(feature_set)
80+
pass

butterfree/migrations/migrator.py renamed to butterfree/migrations/migrate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from butterfree.transform import FeatureSet
77

88

9-
class Migrator:
9+
class Migrate:
1010
"""Execute migration operations in a Database based on pipeline Writer.
1111
1212
Attributes:
@@ -37,5 +37,5 @@ def migration(self) -> None:
3737
self._parse_feature_set_pipeline(pipeline) for pipeline in self.pipelines
3838
]
3939

40-
for migrate, fs in migration_list:
41-
migrate.run(fs)
40+
for migration, fs in migration_list:
41+
migration.run(fs)

tests/unit/butterfree/migrations/database_migration/test_cassandra_migration.py renamed to tests/unit/butterfree/migrations/database_migration/test_database_migration.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import pytest
12
from pyspark.sql.types import DoubleType, FloatType, LongType, TimestampType
23

34
from butterfree.migrations.database_migration import CassandraMigration
45

56

6-
class TestCassandraMigration:
7+
class TestDatabaseMigration:
78
def test_validate_schema(self, mocker):
89
fs_schema = [
910
{"column_name": "id", "type": LongType(), "primary_key": True},
@@ -34,11 +35,6 @@ def test_validate_schema_diff(self, mocker):
3435
fs_schema = [
3536
{"column_name": "id", "type": LongType(), "primary_key": True},
3637
{"column_name": "timestamp", "type": TimestampType(), "primary_key": False},
37-
{
38-
"column_name": "feature1__avg_over_1_week_rolling_windows",
39-
"type": FloatType(),
40-
"primary_key": False,
41-
},
4238
{"column_name": "new_feature", "type": FloatType(), "primary_key": False},
4339
]
4440

@@ -61,15 +57,45 @@ def test_validate_schema_diff(self, mocker):
6157
m._client = mocker.stub("client")
6258
schema = m._validate_schema(fs_schema, db_schema)
6359
assert schema == [
60+
{"column_name": "new_feature", "type": FloatType(), "primary_key": False},
61+
]
62+
63+
def test_validate_schema_diff_invalid(self, mocker):
64+
fs_schema = [
65+
{"column_name": "id", "type": LongType(), "primary_key": True},
66+
{"column_name": "timestamp", "type": TimestampType(), "primary_key": False},
6467
{
6568
"column_name": "feature1__avg_over_1_week_rolling_windows",
6669
"type": FloatType(),
6770
"primary_key": False,
68-
"old_type": DoubleType(),
6971
},
7072
{"column_name": "new_feature", "type": FloatType(), "primary_key": False},
7173
]
7274

75+
db_schema = [
76+
{"column_name": "id", "type": LongType(), "primary_key": True},
77+
{"column_name": "timestamp", "type": TimestampType(), "primary_key": False},
78+
{
79+
"column_name": "feature1__avg_over_1_week_rolling_windows",
80+
"type": DoubleType(),
81+
"primary_key": False,
82+
},
83+
{
84+
"column_name": "feature1__avg_over_2_days_rolling_windows",
85+
"type": DoubleType(),
86+
"primary_key": False,
87+
},
88+
]
89+
90+
with pytest.raises(
91+
ValueError,
92+
match="The feature1__avg_over_1_week_rolling_windows can't be changed.",
93+
):
94+
m = CassandraMigration()
95+
m._client = mocker.stub("client")
96+
97+
m._validate_schema(fs_schema, db_schema)
98+
7399
def test_validate_schema_without_db(self, mocker):
74100
fs_schema = [
75101
{"column_name": "id", "type": LongType(), "primary_key": True},

0 commit comments

Comments
 (0)