Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions butterfree/_cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
False, help="To generate the logs in local file 'logging.json'."
)

DEBUG_MODE = typer.Option(
False,
help="To view the queries resulting from the migration, DON'T apply the migration.",
)


class Migrate:
"""Execute migration operations in a Database based on pipeline Writer.
Expand All @@ -112,19 +117,27 @@ class Migrate:
def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
self.pipelines = pipelines

def _send_logs_to_s3(self, file_local: bool) -> None:
def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
"""Send all migration logs to S3."""
file_name = "../logging.json"

if not file_local and os.path.exists(file_name):
s3_client = boto3.client("s3")

timestamp = datetime.datetime.now()
object_name = (
f"logs/migrate/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)

if debug_mode:
object_name = (
f"logs/migrate-debug-mode/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)
else:
object_name = (
f"logs/migrate/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)
bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")

try:
Expand All @@ -143,23 +156,23 @@ def _send_logs_to_s3(self, file_local: bool) -> None:
json_data = json.load(json_f)
print(json_data)

def run(self, generate_logs: bool = False) -> None:
def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
"""Construct and apply the migrations."""
for pipeline in self.pipelines:
for writer in pipeline.sink.writers:
db = writer.db_config.database
if db == "cassandra":
migration = ALLOWED_DATABASE[db]
migration.apply_migration(pipeline.feature_set, writer)
migration.apply_migration(pipeline.feature_set, writer, debug_mode)
else:
logger.warning(f"Butterfree not supporting {db} Migrations yet.")

self._send_logs_to_s3(generate_logs)
self._send_logs_to_s3(generate_logs, debug_mode)


@app.command("apply")
def migrate(
path: str = PATH, generate_logs: bool = GENERATE_LOGS,
path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
) -> Set[FeatureSetPipeline]:
"""Scan and run database migrations for feature set pipelines defined under PATH.

Expand All @@ -172,5 +185,5 @@ def migrate(
import and instantiate them.
"""
pipe_set = __fs_objects(path)
Migrate(pipe_set).run(generate_logs)
Migrate(pipe_set).run(generate_logs, debug_mode)
return pipe_set
21 changes: 16 additions & 5 deletions butterfree/migrations/database_migration/database_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,15 @@ def _get_schema(
db_schema = []
return db_schema

def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
def apply_migration(
self, feature_set: FeatureSet, writer: Writer, debug_mode: bool = False
) -> None:
"""Apply the migration in the respective database.

Args:
feature_set: the feature set.
writer: the writer being used to load the feature set.
debug_mode: if active, it brings up the queries generated.
"""
logger.info(f"Migrating feature set: {feature_set.name}")

Expand All @@ -280,8 +283,16 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
fs_schema, table_name, db_schema, writer.write_to_entity
)

for q in queries:
logger.info(f"Applying this query: {q} ...")
self._client.sql(q)
if debug_mode:
print(
"#### DEBUG MODE ###\n"
f"Feature set: {feature_set.name}\n"
"Queries:\n"
f"{queries}"
)
else:
for q in queries:
logger.info(f"Applying this query: {q} ...")
self._client.sql(q)

logger.info(f"Feature Set migration finished successfully.")
logger.info(f"Feature Set migration finished successfully.")
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev16"
__version__ = "1.2.0.dev17"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
Expand Down