Commit 3d93a09

roelschr and moromimay authored
[MLOP-647] [MLOP-646] Apply migrations (#300)
* add apply migration method
* add test apply migration
* add migrate actor with tests
* mypy compliant
* fix test interaction with mocked object
* Rebase and some adjusts.

Co-authored-by: Mayara Moromisato <[email protected]>
1 parent e8fc0da commit 3d93a09

17 files changed (+179 lines, -72 lines)

butterfree/_cli/migrate.py

Lines changed: 26 additions & 2 deletions
@@ -8,6 +8,7 @@
 import typer
 
 from butterfree._cli import cli_logger
+from butterfree.migrations.database_migration import ALLOWED_DATABASE
 from butterfree.pipelines import FeatureSetPipeline
 
 app = typer.Typer()
@@ -88,6 +89,28 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
     )
 
 
+class Migrate:
+    """Execute migration operations in a Database based on pipeline Writer.
+
+    Attributes:
+        pipelines: list of Feature Set Pipelines to use to migration.
+    """
+
+    def __init__(self, pipelines: Set[FeatureSetPipeline]) -> None:
+        self.pipelines = pipelines
+
+    def _send_logs_to_s3(self) -> None:
+        """Send all migration logs to S3."""
+        pass
+
+    def run(self) -> None:
+        """Construct and apply the migrations."""
+        for pipeline in self.pipelines:
+            for writer in pipeline.sink.writers:
+                migration = ALLOWED_DATABASE[writer.db_config.database]
+                migration.apply_migration(pipeline.feature_set, writer)
+
+
 @app.callback()
 def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
     """Scan and run database migrations for feature set pipelines defined under PATH.
@@ -100,5 +123,6 @@ def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
     All pipelines must be under python modules inside path, so we can dynamically
     import and instantiate them.
     """
-    # TODO call the Migration actor with all feature set pipeline objects
-    return __fs_objects(path)
+    pipe_set = __fs_objects(path)
+    Migrate(pipe_set).run()
+    return pipe_set

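Note: the new Migrate actor can also be driven outside the CLI. A minimal sketch, assuming an already-built FeatureSetPipeline instance (my_pipeline is a hypothetical name) whose sink writers use one of the registered db_configs:

from butterfree._cli.migrate import Migrate

# `my_pipeline` is a hypothetical, already constructed FeatureSetPipeline;
# in the CLI flow this set comes from __fs_objects(path) instead.
pipelines = {my_pipeline}

# run() loops over every writer of every pipeline's sink, looks up the
# migration registered for writer.db_config.database in ALLOWED_DATABASE,
# and applies it to the pipeline's feature set.
Migrate(pipelines).run()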
butterfree/configs/db/abstract_config.py

Lines changed: 5 additions & 0 deletions
@@ -7,6 +7,11 @@
 class AbstractWriteConfig(ABC):
     """Abstract class for database write configurations with spark."""
 
+    @property
+    @abstractmethod
+    def database(self) -> str:
+        """Database name."""
+
     @property
     @abstractmethod
     def mode(self) -> Any:

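Since database is declared with @property and @abstractmethod, every concrete write config now has to expose a database name, and that name is what the migration registry is keyed on. A minimal sketch of a hypothetical custom config (only the two members visible in this hunk are implemented; any other abstract members the real AbstractWriteConfig declares would also need overriding before the class could be instantiated):

from typing import Any

from butterfree.configs.db import AbstractWriteConfig


class MyWriteConfig(AbstractWriteConfig):  # hypothetical example
    """Illustrative write config for a fictional backend."""

    @property
    def database(self) -> str:
        # Key used by Migrate to pick a migration from ALLOWED_DATABASE;
        # a name without a registry entry would raise a KeyError in run().
        return "mydb"

    @property
    def mode(self) -> Any:
        return "overwrite"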
butterfree/configs/db/cassandra_config.py

Lines changed: 5 additions & 0 deletions
@@ -50,6 +50,11 @@ def __init__(
         self.stream_output_mode = stream_output_mode
         self.stream_checkpoint_path = stream_checkpoint_path
 
+    @property
+    def database(self) -> str:
+        """Database name."""
+        return "cassandra"
+
     @property
     def username(self) -> Optional[str]:
         """Username used in connection to Cassandra DB."""

butterfree/configs/db/kafka_config.py

Lines changed: 5 additions & 0 deletions
@@ -41,6 +41,11 @@ def __init__(
         self.stream_output_mode = stream_output_mode
         self.stream_checkpoint_path = stream_checkpoint_path
 
+    @property
+    def database(self) -> str:
+        """Database name."""
+        return "kafka"
+
     @property
     def kafka_topic(self) -> Optional[str]:
         """Kafka topic name."""

butterfree/configs/db/metastore_config.py

Lines changed: 5 additions & 0 deletions
@@ -35,6 +35,11 @@ def __init__(
         self.format_ = format_
         self.file_system = file_system
 
+    @property
+    def database(self) -> str:
+        """Database name."""
+        return "metastore"
+
     @property
     def path(self) -> Optional[str]:
         """Bucket name."""

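All three shipped configs now answer database with a fixed string, and for Cassandra and the metastore those strings are exactly the keys of ALLOWED_DATABASE (Kafka has no migration registered yet). A small sketch of that contract, assuming the configs can be default-constructed in your environment, as the writers below do:

from butterfree.configs.db import CassandraConfig, MetastoreConfig
from butterfree.migrations.database_migration import ALLOWED_DATABASE

# Each config's database name doubles as the lookup key for its migration.
for config in (CassandraConfig(), MetastoreConfig()):
    print(config.database, type(ALLOWED_DATABASE[config.database]).__name__)
# cassandra CassandraMigration
# metastore MetastoreMigration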
butterfree/load/writers/historical_feature_store_writer.py

Lines changed: 5 additions & 4 deletions
@@ -1,7 +1,7 @@
 """Holds the Historical Feature Store writer class."""
 
 import os
-from typing import Any, Union
+from typing import Any
 
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.functions import dayofmonth, month, year
@@ -106,16 +106,17 @@ class HistoricalFeatureStoreWriter(Writer):
 
     def __init__(
         self,
-        db_config: Union[AbstractWriteConfig, MetastoreConfig] = None,
+        db_config: AbstractWriteConfig = None,
         database: str = None,
         num_partitions: int = None,
         validation_threshold: float = DEFAULT_VALIDATION_THRESHOLD,
         debug_mode: bool = False,
         interval_mode: bool = False,
         check_schema_hook: Hook = None,
     ):
-        super(HistoricalFeatureStoreWriter, self).__init__(debug_mode, interval_mode)
-        self.db_config = db_config or MetastoreConfig()
+        super(HistoricalFeatureStoreWriter, self).__init__(
+            db_config or MetastoreConfig(), debug_mode, interval_mode
+        )
         self.database = database or environment.get_variable(
             "FEATURE_STORE_HISTORICAL_DATABASE"
         )

butterfree/load/writers/online_feature_store_writer.py

Lines changed: 4 additions & 4 deletions
@@ -80,15 +80,15 @@ class OnlineFeatureStoreWriter(Writer):
 
     def __init__(
        self,
-        db_config: Union[AbstractWriteConfig, CassandraConfig] = None,
+        db_config: AbstractWriteConfig = None,
         debug_mode: bool = False,
         write_to_entity: bool = False,
         interval_mode: bool = False,
         check_schema_hook: Hook = None,
     ):
-        super(OnlineFeatureStoreWriter, self).__init__(debug_mode, interval_mode)
-        self.db_config = db_config or CassandraConfig()
-        self.write_to_entity = write_to_entity
+        super(OnlineFeatureStoreWriter, self).__init__(
+            db_config or CassandraConfig(), debug_mode, interval_mode, write_to_entity
+        )
         self.check_schema_hook = check_schema_hook
 
     @staticmethod

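Both store writers now hand their config straight to the Writer base class instead of assigning self.db_config themselves, and write_to_entity moves to the base as well. A short usage sketch, assuming default construction works in your environment; the explicit database name given to the historical writer is a hypothetical value used only to avoid the FEATURE_STORE_HISTORICAL_DATABASE lookup:

from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)

online = OnlineFeatureStoreWriter()
historical = HistoricalFeatureStoreWriter(database="my_database")  # hypothetical name

# db_config is now set by Writer.__init__, so Migrate can read
# writer.db_config.database uniformly for any writer type.
assert online.db_config.database == "cassandra"
assert historical.db_config.database == "metastore"
assert online.write_to_entity is False  # default now stored by the base class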
butterfree/load/writers/writer.py

Lines changed: 10 additions & 1 deletion
@@ -7,6 +7,7 @@
 from pyspark.sql.dataframe import DataFrame
 
 from butterfree.clients import SparkClient
+from butterfree.configs.db import AbstractWriteConfig
 from butterfree.hooks import HookableComponent
 from butterfree.transform import FeatureSet
 
@@ -19,11 +20,19 @@ class Writer(ABC, HookableComponent):
 
     """
 
-    def __init__(self, debug_mode: bool = False, interval_mode: bool = False) -> None:
+    def __init__(
+        self,
+        db_config: AbstractWriteConfig,
+        debug_mode: bool = False,
+        interval_mode: bool = False,
+        write_to_entity: bool = False,
+    ) -> None:
         super().__init__()
+        self.db_config = db_config
         self.transformations: List[Dict[str, Any]] = []
         self.debug_mode = debug_mode
         self.interval_mode = interval_mode
+        self.write_to_entity = write_to_entity
 
     def with_(
         self, transformer: Callable[..., DataFrame], *args: Any, **kwargs: Any

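With db_config promoted to the base class, a custom writer only has to pass its config up through super().__init__. A minimal sketch of a hypothetical subclass; the real Writer also declares abstract members (for example write and validate) that are omitted here, so this class is illustrative rather than instantiable:

from butterfree.configs.db import AbstractWriteConfig, MetastoreConfig
from butterfree.load.writers.writer import Writer


class MyCustomWriter(Writer):  # hypothetical example
    """Illustrative writer that reuses the new base-class wiring."""

    def __init__(
        self,
        db_config: AbstractWriteConfig = None,
        debug_mode: bool = False,
        interval_mode: bool = False,
    ):
        # The base class stores db_config (and write_to_entity), which is what
        # Migrate reads through writer.db_config.database.
        super().__init__(db_config or MetastoreConfig(), debug_mode, interval_mode)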
butterfree/migrations/__init__.py

Lines changed: 0 additions & 3 deletions
@@ -1,4 +1 @@
 """Holds available migrations."""
-from butterfree.migrations.migrate import Migrate
-
-__all__ = ["Migrate"]

butterfree/migrations/database_migration/__init__.py

Lines changed: 6 additions & 0 deletions
@@ -9,3 +9,9 @@
 )
 
 __all__ = ["CassandraMigration", "MetastoreMigration", "Diff"]
+
+
+ALLOWED_DATABASE = {
+    "cassandra": CassandraMigration(),
+    "metastore": MetastoreMigration(),
+}

0 commit comments
