Skip to content

Commit 8760449

Browse files
moromimay (Mayara Moromisato)
authored and committed
[MLOP-702] Debug mode for Automate Migration (#322)
* Create flag debug-mode. * Fix tests. * Fix migrate test.
1 parent b2dc28a commit 8760449

File tree

4 files changed

+44
-20
lines changed

4 files changed

+44
-20
lines changed

butterfree/_cli/migrate.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
101101
False, help="To generate the logs in local file 'logging.json'."
102102
)
103103

104+
DEBUG_MODE = typer.Option(
105+
False,
106+
help="To view the queries resulting from the migration, DON'T apply the migration.",
107+
)
108+
104109

105110
class Migrate:
106111
"""Execute migration operations in a Database based on pipeline Writer.
@@ -112,19 +117,27 @@ class Migrate:
112117
def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
113118
self.pipelines = pipelines
114119

115-
def _send_logs_to_s3(self, file_local: bool) -> None:
120+
def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
116121
"""Send all migration logs to S3."""
117122
file_name = "../logging.json"
118123

119124
if not file_local and os.path.exists(file_name):
120125
s3_client = boto3.client("s3")
121126

122127
timestamp = datetime.datetime.now()
123-
object_name = (
124-
f"logs/migrate/"
125-
f"{timestamp.strftime('%Y-%m-%d')}"
126-
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
127-
)
128+
129+
if debug_mode:
130+
object_name = (
131+
f"logs/migrate-debug-mode/"
132+
f"{timestamp.strftime('%Y-%m-%d')}"
133+
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
134+
)
135+
else:
136+
object_name = (
137+
f"logs/migrate/"
138+
f"{timestamp.strftime('%Y-%m-%d')}"
139+
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
140+
)
128141
bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")
129142

130143
try:
@@ -143,23 +156,23 @@ def _send_logs_to_s3(self, file_local: bool) -> None:
143156
json_data = json.load(json_f)
144157
print(json_data)
145158

146-
def run(self, generate_logs: bool = False) -> None:
159+
def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
147160
"""Construct and apply the migrations."""
148161
for pipeline in self.pipelines:
149162
for writer in pipeline.sink.writers:
150163
db = writer.db_config.database
151164
if db == "cassandra":
152165
migration = ALLOWED_DATABASE[db]
153-
migration.apply_migration(pipeline.feature_set, writer)
166+
migration.apply_migration(pipeline.feature_set, writer, debug_mode)
154167
else:
155168
logger.warning(f"Butterfree not supporting {db} Migrations yet.")
156169

157-
self._send_logs_to_s3(generate_logs)
170+
self._send_logs_to_s3(generate_logs, debug_mode)
158171

159172

160173
@app.command("apply")
161174
def migrate(
162-
path: str = PATH, generate_logs: bool = GENERATE_LOGS,
175+
path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
163176
) -> Set[FeatureSetPipeline]:
164177
"""Scan and run database migrations for feature set pipelines defined under PATH.
165178
@@ -172,5 +185,5 @@ def migrate(
172185
import and instantiate them.
173186
"""
174187
pipe_set = __fs_objects(path)
175-
Migrate(pipe_set).run(generate_logs)
188+
Migrate(pipe_set).run(generate_logs, debug_mode)
176189
return pipe_set

butterfree/migrations/database_migration/database_migration.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,15 @@ def _get_schema(
260260
db_schema = []
261261
return db_schema
262262

263-
def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
263+
def apply_migration(
264+
self, feature_set: FeatureSet, writer: Writer, debug_mode: bool
265+
) -> None:
264266
"""Apply the migration in the respective database.
265267
266268
Args:
267269
feature_set: the feature set.
268270
writer: the writer being used to load the feature set.
271+
debug_mode: if active, it brings up the queries generated.
269272
"""
270273
logger.info(f"Migrating feature set: {feature_set.name}")
271274

@@ -280,8 +283,16 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
280283
fs_schema, table_name, db_schema, writer.write_to_entity
281284
)
282285

283-
for q in queries:
284-
logger.info(f"Applying this query: {q} ...")
285-
self._client.sql(q)
286+
if debug_mode:
287+
print(
288+
"#### DEBUG MODE ###\n"
289+
f"Feature set: {feature_set.name}\n"
290+
"Queries:\n"
291+
f"{queries}"
292+
)
293+
else:
294+
for q in queries:
295+
logger.info(f"Applying this query: {q} ...")
296+
self._client.sql(q)
286297

287-
logger.info(f"Feature Set migration finished successfully.")
298+
logger.info(f"Feature Set migration finished successfully.")

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from setuptools import find_packages, setup
22

33
__package_name__ = "butterfree"
4-
__version__ = "1.2.0.dev16"
4+
__version__ = "1.2.0.dev17"
55
__repository_url__ = "https://github.com/quintoandar/butterfree"
66

77
with open("requirements.txt") as f:

tests/unit/butterfree/_cli/test_migrate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@ def test_migrate_success(self, mocker):
1717
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
1818
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]
1919

20-
def test_migrate_all_pairs(self, mocker):
20+
def test_migrate_run_methods(self, mocker):
2121
mocker.patch.object(CassandraMigration, "apply_migration")
2222
mocker.patch.object(migrate.Migrate, "_send_logs_to_s3")
2323

24-
all_fs = migrate.migrate("tests/mocks/entities/")
24+
all_fs = migrate.migrate("tests/mocks/entities/", False, False)
2525

2626
assert CassandraMigration.apply_migration.call_count == 2
2727

2828
cassandra_pairs = [
29-
call(pipe.feature_set, pipe.sink.writers[1]) for pipe in all_fs
29+
call(pipe.feature_set, pipe.sink.writers[1], False) for pipe in all_fs
3030
]
3131
CassandraMigration.apply_migration.assert_has_calls(
3232
cassandra_pairs, any_order=True

0 commit comments

Comments
 (0)