Skip to content

Commit 3178ca4

Browse files
roelschr and Mayara Moromisato
authored and committed
[MLOP-640] Create CLI with migrate command (#298)
1 parent 1196a53 commit 3178ca4

File tree

6 files changed

+28
-137
lines changed

6 files changed

+28
-137
lines changed

butterfree/_cli/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,10 @@
1+
import logging
2+
3+
4+
def __logger(name: str) -> logging.Logger:
5+
format_ = "%(name)s:%(asctime)-15s:%(levelname)s:< %(message)s >"
6+
logging.basicConfig(format=format_, level=logging.INFO)
7+
return logging.getLogger(name)
8+
9+
10+
cli_logger = __logger("butterfree")

butterfree/_cli/main.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
from butterfree._cli import migrate
44

55
app = typer.Typer()
6-
app.add_typer(migrate.app, name="migrate")
6+
app.add_typer(migrate.app)
77

88
if __name__ == "__main__":
99
app()

butterfree/_cli/migrate.py

Lines changed: 10 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -1,24 +1,16 @@
1-
import datetime
21
import importlib
32
import inspect
4-
import os
53
import pkgutil
64
import sys
75
from typing import Set
86

9-
import boto3
107
import setuptools
118
import typer
12-
from botocore.exceptions import ClientError
139

14-
from butterfree.configs import environment
15-
from butterfree.configs.logger import __logger
16-
from butterfree.migrations.database_migration import ALLOWED_DATABASE
10+
from butterfree._cli import cli_logger
1711
from butterfree.pipelines import FeatureSetPipeline
1812

19-
app = typer.Typer(help="Apply the automatic migrations in a database.")
20-
21-
logger = __logger("migrate", True)
13+
app = typer.Typer()
2214

2315

2416
def __find_modules(path: str) -> Set[str]:
@@ -40,19 +32,18 @@ def __find_modules(path: str) -> Set[str]:
4032

4133

4234
def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
43-
logger.info(f"Looking for python modules under {path}...")
35+
cli_logger.info(f"Looking for python modules under {path}...")
4436
modules = __find_modules(path)
4537
if not modules:
46-
logger.error(f"Path: {path} not found!")
4738
return set()
4839

49-
logger.info(f"Importing modules...")
40+
cli_logger.info(f"Importing modules...")
5041
package = ".".join(path.strip("/").split("/"))
5142
imported = set(
5243
importlib.import_module(f".{name}", package=package) for name in modules
5344
)
5445

55-
logger.info(f"Scanning modules...")
46+
cli_logger.info(f"Scanning modules...")
5647
content = {
5748
module: set(
5849
filter(
@@ -88,91 +79,17 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
8879

8980
instances.add(value)
9081

91-
logger.info("Creating instances...")
82+
cli_logger.info("Creating instances...")
9283
return set(value() for value in instances)
9384

9485

9586
PATH = typer.Argument(
9687
..., help="Full or relative path to where feature set pipelines are being defined.",
9788
)
9889

99-
GENERATE_LOGS = typer.Option(
100-
False, help="To generate the logs in local file 'logging.json'."
101-
)
102-
103-
DEBUG_MODE = typer.Option(
104-
False,
105-
help="To view the queries resulting from the migration, DON'T apply the migration.",
106-
)
107-
10890

109-
class Migrate:
110-
"""Execute migration operations in a Database based on pipeline Writer.
111-
112-
Attributes:
113-
pipelines: list of Feature Set Pipelines to use to migration.
114-
"""
115-
116-
def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
117-
self.pipelines = pipelines
118-
119-
def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
120-
"""Send all migration logs to S3."""
121-
file_name = "../logging.json"
122-
123-
if not file_local and os.path.exists(file_name):
124-
s3_client = boto3.client("s3")
125-
126-
timestamp = datetime.datetime.now()
127-
128-
if debug_mode:
129-
object_name = (
130-
f"logs/migrate-debug-mode/"
131-
f"{timestamp.strftime('%Y-%m-%d')}"
132-
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
133-
)
134-
else:
135-
object_name = (
136-
f"logs/migrate/"
137-
f"{timestamp.strftime('%Y-%m-%d')}"
138-
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
139-
)
140-
bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")
141-
142-
try:
143-
s3_client.upload_file(
144-
file_name,
145-
bucket,
146-
object_name,
147-
ExtraArgs={"ACL": "bucket-owner-full-control"},
148-
)
149-
except ClientError:
150-
raise
151-
152-
os.remove(file_name)
153-
elif os.path.exists(file_name):
154-
print("Logs written to ../logging.json")
155-
else:
156-
print("No logs were generated.")
157-
158-
def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
159-
"""Construct and apply the migrations."""
160-
for pipeline in self.pipelines:
161-
for writer in pipeline.sink.writers:
162-
db = writer.db_config.database
163-
if db == "cassandra":
164-
migration = ALLOWED_DATABASE[db]
165-
migration.apply_migration(pipeline.feature_set, writer, debug_mode)
166-
else:
167-
logger.warning(f"Butterfree not supporting {db} Migrations yet.")
168-
169-
self._send_logs_to_s3(generate_logs, debug_mode)
170-
171-
172-
@app.command("apply")
173-
def migrate(
174-
path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
175-
) -> Set[FeatureSetPipeline]:
91+
@app.callback()
92+
def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
17693
"""Scan and run database migrations for feature set pipelines defined under PATH.
17794
17895
Butterfree will scan a given path for classes that inherit from its
@@ -183,6 +100,5 @@ def migrate(
183100
All pipelines must be under python modules inside path, so we can dynamically
184101
import and instantiate them.
185102
"""
186-
pipe_set = __fs_objects(path)
187-
Migrate(pipe_set).run(generate_logs, debug_mode)
188-
return pipe_set
103+
# TODO call the Migration actor with all feature set pipeline objects
104+
return __fs_objects(path)

requirements.txt

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,4 @@ pandas>=0.24,<1.1
44
parameters-validation>=1.1.5,<2.0
55
pyspark==3.*
66
typer>=0.3,<0.4
7-
setuptools>=41,<42
8-
typing-extensions==3.7.4.3
9-
boto3==1.17.*
7+
setuptools>=41,<42

setup.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,4 +36,6 @@
3636
install_requires=requirements,
3737
extras_require={"h3": ["cmake==3.16.3", "h3==3.4.2"]},
3838
python_requires=">=3.7, <4",
39+
entry_points={"console_scripts": ["butterfree=butterfree._cli.main:app"]},
40+
include_package_data=True,
3941
)
Lines changed: 4 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -1,43 +1,8 @@
1-
from unittest.mock import call
2-
3-
from typer.testing import CliRunner
4-
51
from butterfree._cli import migrate
6-
from butterfree._cli.main import app
7-
from butterfree.migrations.database_migration import CassandraMigration
82
from butterfree.pipelines import FeatureSetPipeline
93

10-
runner = CliRunner()
11-
12-
13-
class TestMigrate:
14-
def test_migrate_success(self, mocker):
15-
mocker.patch.object(migrate.Migrate, "run")
16-
all_fs = migrate.migrate("tests/mocks/entities/")
17-
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
18-
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]
19-
20-
def test_migrate_run_methods(self, mocker):
21-
mocker.patch.object(CassandraMigration, "apply_migration")
22-
mocker.patch.object(migrate.Migrate, "_send_logs_to_s3")
23-
24-
all_fs = migrate.migrate("tests/mocks/entities/", False, False)
25-
26-
assert CassandraMigration.apply_migration.call_count == 2
27-
28-
cassandra_pairs = [
29-
call(pipe.feature_set, pipe.sink.writers[1], False) for pipe in all_fs
30-
]
31-
CassandraMigration.apply_migration.assert_has_calls(
32-
cassandra_pairs, any_order=True
33-
)
34-
migrate.Migrate._send_logs_to_s3.assert_called_once()
35-
36-
def test_app_cli(self):
37-
result = runner.invoke(app, "migrate")
38-
assert result.exit_code == 0
394

40-
def test_app_migrate(self, mocker):
41-
mocker.patch.object(migrate.Migrate, "run")
42-
result = runner.invoke(app, ["migrate", "apply", "tests/mocks/entities/"])
43-
assert result.exit_code == 0
5+
def test_migrate_success():
6+
all_fs = migrate.migrate("tests/mocks/entities/")
7+
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
8+
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]

0 commit comments

Comments
 (0)