@@ -1,17 +1,18 @@
+import datetime
 import importlib
 import inspect
 import os
 import pkgutil
 import sys
 from typing import Set
 
+import boto3
 import setuptools
 import typer
+from botocore.exceptions import ClientError
 
-from butterfree.clients import SparkClient
 from butterfree.configs import environment
 from butterfree.configs.logger import __logger
-from butterfree.extract.readers import FileReader
 from butterfree.migrations.database_migration import ALLOWED_DATABASE
 from butterfree.pipelines import FeatureSetPipeline
 
@@ -106,30 +107,34 @@ class Migrate:
         pipelines: list of Feature Set Pipelines to use in the migration.
     """
 
-    def __init__(
-        self, pipelines: Set[FeatureSetPipeline], spark_client: SparkClient = None
-    ) -> None:
+    def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
         self.pipelines = pipelines
-        self.spark_client = spark_client or SparkClient()
 
     def _send_logs_to_s3(self, file_local: bool) -> None:
         """Send all migration logs to S3."""
-        log_path = "../logging.json"
-
-        file_reader = FileReader(id="name", path=log_path, format="json")
-        df = file_reader.consume(self.spark_client)
-
-        path = environment.get_variable("FEATURE_STORE_S3_BUCKET")
-
-        self.spark_client.write_dataframe(
-            dataframe=df,
-            format_="json",
-            mode="append",
-            **{"path": f"s3a://{path}/logging"},
+        s3_client = boto3.client("s3")
+
+        file_name = "../logging.json"
+        timestamp = datetime.datetime.now()
+        object_name = (
+            f"logs/migrate/"
+            f"{timestamp.strftime('%Y-%m-%d')}"
+            f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
         )
+        bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")
+
+        try:
+            s3_client.upload_file(
+                file_name,
+                bucket,
+                object_name,
+                ExtraArgs={"ACL": "bucket-owner-full-control"},
+            )
+        except ClientError:
+            raise
 
-        if not file_local and os.path.exists(log_path):
-            os.remove(log_path)
+        if not file_local and os.path.exists(file_name):
+            os.remove(file_name)
 
     def run(self, generate_logs: bool = False) -> None:
         """Construct and apply the migrations."""
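
The upload key now encodes the run's date and time. As a sanity check, here is a minimal stdlib-only sketch (no AWS access; the fixed timestamp is a hypothetical stand-in for `datetime.datetime.now()`) that reproduces the object key built in `_send_logs_to_s3`:

```python
import datetime

# Rebuild the S3 object key exactly as _send_logs_to_s3 does, using a
# fixed example moment instead of datetime.datetime.now().
timestamp = datetime.datetime(2021, 4, 13, 14, 5, 9)  # hypothetical run time
object_name = (
    f"logs/migrate/"
    f"{timestamp.strftime('%Y-%m-%d')}"
    f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)
print(object_name)  # logs/migrate/2021-04-13/logging-14:05:09.json
```

Each day of migrations therefore shares one `logs/migrate/<date>/` prefix, with one timestamped object per run.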
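Because `_send_logs_to_s3` now constructs its own boto3 client, it can be exercised without AWS credentials by patching `boto3.client`. A test sketch under stated assumptions: pytest provides `monkeypatch`, `Migrate` lives at the `butterfree._cli.migrate` module path, and `environment.get_variable` resolves `FEATURE_STORE_S3_BUCKET` from the process environment (none of this is confirmed by the diff itself):

```python
from unittest import mock

from butterfree._cli import migrate as migrate_module  # assumed module path


def test_send_logs_to_s3_uploads(monkeypatch):
    # Assumption: environment.get_variable falls back to process env vars.
    monkeypatch.setenv("FEATURE_STORE_S3_BUCKET", "my-bucket")  # hypothetical bucket

    with mock.patch("boto3.client") as client_factory:
        migrate_module.Migrate(pipelines=set())._send_logs_to_s3(file_local=True)

    # file_local=True skips deleting ../logging.json, and the client is
    # mocked, so no file or network access actually happens here.
    client_factory.return_value.upload_file.assert_called_once()
```

The bare `except ClientError: raise` in the diff keeps the original stack trace while making explicit that upload failures are not swallowed; a later change could log the error before re-raising.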