
Commit 8077d86

[MLOP-639] Track logs in S3 (#306)
* Apply tracking logs and logging config.
* Adjustments in CLI and logging.conf.
* Some adjustments.
* Change version to generate a new dev package.
* Fix version.
* Apply style.
* Add a new assert in the migrate unit test.
1 parent 3dcd975 commit 8077d86

File tree

13 files changed: +181 −61 lines changed


butterfree/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -1 +1,8 @@
 """Module docstring example, following Google's docstring style."""
+import logging.config
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath("."))
+
+logging.config.fileConfig(fname="butterfree/logging.conf")
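
Loading the config at import time means any process that imports the package picks up the same logger wiring. A minimal sketch of that side effect, assuming the working directory is the repository root (the fname path is relative) and that the logs/ directory already exists, since FileHandler does not create it:

    import logging

    import butterfree  # fileConfig() runs here as an import side effect

    logging.getLogger("migrate").info("routed to logs/logging.json")
    logging.getLogger("somewhere.else").info("routed to stdout via the root logger")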

butterfree/_cli/__init__.py

Lines changed: 0 additions & 10 deletions
@@ -1,10 +0,0 @@
-import logging
-
-
-def __logger(name: str) -> logging.Logger:
-    format_ = "%(name)s:%(asctime)-15s:%(levelname)s:< %(message)s >"
-    logging.basicConfig(format=format_, level=logging.INFO)
-    return logging.getLogger(name)
-
-
-cli_logger = __logger("butterfree")

butterfree/_cli/main.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 from butterfree._cli import migrate

 app = typer.Typer()
-app.add_typer(migrate.app)
+app.add_typer(migrate.app, name="migrate")

 if __name__ == "__main__":
     app()
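
With the sub-app registered under an explicit name, migration commands are now addressed as `migrate apply ...` instead of through a bare callback (see the `@app.command("apply")` change in migrate.py below). A hedged sketch using typer's test runner; the pipelines path and the `butterfree` console-script name are assumptions:

    from typer.testing import CliRunner

    from butterfree._cli.main import app

    runner = CliRunner()
    # Roughly equivalent to: butterfree migrate apply examples/pipelines --generate-logs
    result = runner.invoke(app, ["migrate", "apply", "examples/pipelines", "--generate-logs"])
    print(result.output)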

butterfree/_cli/migrate.py

Lines changed: 43 additions & 13 deletions
@@ -1,17 +1,23 @@
 import importlib
 import inspect
+import logging
+import os
 import pkgutil
 import sys
 from typing import Set

 import setuptools
 import typer

-from butterfree._cli import cli_logger
+from butterfree.clients import SparkClient
+from butterfree.configs import environment
+from butterfree.extract.readers import FileReader
 from butterfree.migrations.database_migration import ALLOWED_DATABASE
 from butterfree.pipelines import FeatureSetPipeline

-app = typer.Typer()
+app = typer.Typer(help="Apply the automatic migrations in a database.")
+
+logger = logging.getLogger("migrate")


 def __find_modules(path: str) -> Set[str]:
@@ -33,18 +39,18 @@ def __find_modules(path: str) -> Set[str]:


 def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
-    cli_logger.info(f"Looking for python modules under {path}...")
+    logger.info(f"Looking for python modules under {path}...")
     modules = __find_modules(path)
     if not modules:
         return set()

-    cli_logger.info(f"Importing modules...")
+    logger.info(f"Importing modules...")
     package = ".".join(path.strip("/").split("/"))
     imported = set(
         importlib.import_module(f".{name}", package=package) for name in modules
     )

-    cli_logger.info(f"Scanning modules...")
+    logger.info(f"Scanning modules...")
     content = {
         module: set(
             filter(
@@ -80,14 +86,18 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:

             instances.add(value)

-    cli_logger.info("Creating instances...")
+    logger.info("Creating instances...")
     return set(value() for value in instances)


 PATH = typer.Argument(
     ..., help="Full or relative path to where feature set pipelines are being defined.",
 )

+GENERATE_LOGS = typer.Option(
+    False, help="To generate the logs in local file 'logging.json'."
+)
+

 class Migrate:
     """Execute migration operations in a Database based on pipeline Writer.
@@ -96,23 +106,43 @@ class Migrate:
         pipelines: list of Feature Set Pipelines to use to migration.
     """

-    def __init__(self, pipelines: Set[FeatureSetPipeline]) -> None:
+    def __init__(
+        self, pipelines: Set[FeatureSetPipeline], spark_client: SparkClient = None
+    ) -> None:
         self.pipelines = pipelines
+        self.spark_client = spark_client or SparkClient()

-    def _send_logs_to_s3(self) -> None:
+    def _send_logs_to_s3(self, file_local: bool) -> None:
         """Send all migration logs to S3."""
-        pass
+        file_reader = FileReader(id="name", path="logs/logging.json", format="json")
+        df = file_reader.consume(self.spark_client)
+
+        path = environment.get_variable("FEATURE_STORE_S3_BUCKET")

-    def run(self) -> None:
+        self.spark_client.write_dataframe(
+            dataframe=df,
+            format_="json",
+            mode="append",
+            **{"path": f"s3a://{path}/logging"},
+        )
+
+        if not file_local:
+            os.rmdir("logs/logging.json")
+
+    def run(self, generate_logs: bool) -> None:
         """Construct and apply the migrations."""
         for pipeline in self.pipelines:
             for writer in pipeline.sink.writers:
                 migration = ALLOWED_DATABASE[writer.db_config.database]
                 migration.apply_migration(pipeline.feature_set, writer)

+        self._send_logs_to_s3(generate_logs)
+

-@app.callback()
-def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
+@app.command("apply")
+def migrate(
+    path: str = PATH, generate_logs: bool = GENERATE_LOGS,
+) -> Set[FeatureSetPipeline]:
     """Scan and run database migrations for feature set pipelines defined under PATH.

     Butterfree will scan a given path for classes that inherit from its
@@ -124,5 +154,5 @@ def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
     import and instantiate them.
     """
     pipe_set = __fs_objects(path)
-    Migrate(pipe_set).run()
+    Migrate(pipe_set).run(generate_logs)
     return pipe_set
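
`Migrate.run` now finishes by shipping `logs/logging.json` to S3, and `--generate-logs` keeps the local file afterwards. A rough sketch of the flow under stated assumptions — a usable local Spark session, an existing `logs/logging.json` produced by the file handler, and an illustrative bucket name:

    import os

    from butterfree._cli.migrate import Migrate
    from butterfree.clients import SparkClient

    # Assumed bucket name; s3a:// credentials must already be configured
    # on the Spark session.
    os.environ["FEATURE_STORE_S3_BUCKET"] = "my-feature-store-bucket"

    # With an empty pipeline set nothing is migrated, but the collected log
    # file is still read back via FileReader and appended to
    # s3a://my-feature-store-bucket/logging as JSON.
    Migrate(pipelines=set(), spark_client=SparkClient()).run(generate_logs=True)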

butterfree/configs/db/metastore_config.py

Lines changed: 26 additions & 1 deletion
@@ -122,4 +122,29 @@ def get_path_with_partitions(self, key: str, dataframe: DataFrame) -> List:

     def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         """Translate feature set spark schema to the corresponding database."""
-        return schema
+        spark_sql_mapping = {
+            "TimestampType": "TIMESTAMP",
+            "BinaryType": "BINARY",
+            "BooleanType": "BOOLEAN",
+            "DateType": "DATE",
+            "DecimalType": "DECIMAL",
+            "DoubleType": "DOUBLE",
+            "FloatType": "FLOAT",
+            "IntegerType": "INT",
+            "LongType": "BIGINT",
+            "StringType": "STRING",
+            "ArrayType(LongType,true)": "ARRAY<BIGINT>",
+            "ArrayType(StringType,true)": "ARRAY<STRING>",
+            "ArrayType(FloatType,true)": "ARRAY<FLOAT>",
+        }
+        sql_schema = []
+        for features in schema:
+            sql_schema.append(
+                {
+                    "column_name": features["column_name"],
+                    "type": spark_sql_mapping[str(features["type"])],
+                    "primary_key": features["primary_key"],
+                }
+            )
+
+        return sql_schema
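
Per the mapping keys above, `str()` on a PySpark type yields names like `LongType` and `ArrayType(LongType,true)` under the pinned pyspark==3.*, so `translate` turns a feature set schema into Spark SQL column types. A small illustration (import path assumed; expected output shown in comments):

    from butterfree.configs.db import MetastoreConfig
    from pyspark.sql.types import ArrayType, LongType

    schema = [
        {"column_name": "id", "type": LongType(), "primary_key": True},
        {"column_name": "scores", "type": ArrayType(LongType()), "primary_key": False},
    ]
    print(MetastoreConfig().translate(schema))
    # [{'column_name': 'id', 'type': 'BIGINT', 'primary_key': True},
    #  {'column_name': 'scores', 'type': 'ARRAY<BIGINT>', 'primary_key': False}]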

butterfree/constants/data_type.py

Lines changed: 15 additions & 14 deletions
@@ -21,20 +21,21 @@
 class DataType(Enum):
     """Holds constants for data types within Butterfree."""

-    TIMESTAMP = (TimestampType(), "timestamp")
-    BINARY = (BinaryType(), "boolean")
-    BOOLEAN = (BooleanType(), "boolean")
-    DATE = (DateType(), "timestamp")
-    DECIMAL = (DecimalType(), "decimal")
-    DOUBLE = (DoubleType(), "double")
-    FLOAT = (FloatType(), "float")
-    INTEGER = (IntegerType(), "int")
-    BIGINT = (LongType(), "bigint")
-    STRING = (StringType(), "text")
-    ARRAY_BIGINT = (ArrayType(LongType()), "frozen<list<bigint>>")
-    ARRAY_STRING = (ArrayType(StringType()), "frozen<list<text>>")
-    ARRAY_FLOAT = (ArrayType(FloatType()), "frozen<list<float>>")
+    TIMESTAMP = (TimestampType(), "timestamp", "TIMESTAMP")
+    BINARY = (BinaryType(), "boolean", "BINARY")
+    BOOLEAN = (BooleanType(), "boolean", "BOOLEAN")
+    DATE = (DateType(), "timestamp", "DATE")
+    DECIMAL = (DecimalType(), "decimal", "DECIMAL")
+    DOUBLE = (DoubleType(), "double", "DOUBLE")
+    FLOAT = (FloatType(), "float", "FLOAT")
+    INTEGER = (IntegerType(), "int", "INT")
+    BIGINT = (LongType(), "bigint", "BIGINT")
+    STRING = (StringType(), "text", "STRING")
+    ARRAY_BIGINT = (ArrayType(LongType()), "frozen<list<bigint>>", "ARRAY<BIGINT>")
+    ARRAY_STRING = (ArrayType(StringType()), "frozen<list<text>>", "ARRAY<STRING>")
+    ARRAY_FLOAT = (ArrayType(FloatType()), "frozen<list<float>>", "ARRAY<FLOAT>")

-    def __init__(self, spark: PySparkDataType, cassandra: str) -> None:
+    def __init__(self, spark: PySparkDataType, cassandra: str, spark_sql: str) -> None:
         self.spark = spark
         self.cassandra = cassandra
+        self.spark_sql = spark_sql
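
Each enum member now carries a third tuple element, exposed as `spark_sql`, pairing with the Metastore translation above. A quick check of the values as defined in the diff:

    from butterfree.constants.data_type import DataType

    assert DataType.BIGINT.cassandra == "bigint"
    assert DataType.BIGINT.spark_sql == "BIGINT"
    assert DataType.ARRAY_FLOAT.spark_sql == "ARRAY<FLOAT>"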

butterfree/logging.conf

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+[loggers]
+keys=root,cli,migrate,database_migrate
+
+[handlers]
+keys=consoleHandler,file
+
+[formatters]
+keys=simpleFormatter,jsonFormatter
+
+[logger_root]
+level=DEBUG
+handlers=consoleHandler
+
+[logger_cli]
+level=DEBUG
+handlers=file
+qualname=cli
+propagate=0
+
+[logger_migrate]
+level=DEBUG
+handlers=file
+qualname=migrate
+propagate=0
+
+[logger_database_migrate]
+level=DEBUG
+handlers=file
+qualname=database_migrate
+propagate=0
+
+[handler_consoleHandler]
+class=StreamHandler
+level=DEBUG
+formatter=simpleFormatter
+args=(sys.stdout,)
+
+[handler_file]
+class=FileHandler
+level=DEBUG
+formatter=jsonFormatter
+args=('logs/logging.json', "a")
+
+[formatter_simpleFormatter]
+format=%(name)s:%(asctime)-15s:%(levelname)s:%(message)s
+datefmt=
+class=logging.Formatter
+
+[formatter_jsonFormatter]
+format={"name": "%(name)s", "timestamp": "%(asctime)-15s", "level": "%(levelname)s", "message": "%(message)s"}
+datefmt=
+class=logging.Formatter
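
The three named loggers (`cli`, `migrate`, `database_migrate`) write JSON lines to `logs/logging.json` without propagating to the root logger, while everything else goes to stdout. Note that `FileHandler` does not create directories, so `logs/` must exist before the config is loaded. A hedged sketch of the routing, assuming the repo root as working directory:

    import logging.config

    logging.config.fileConfig("butterfree/logging.conf")
    logging.getLogger("migrate").info("to logs/logging.json as a JSON line")
    logging.getLogger("other").info("to stdout via the root consoleHandler")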

butterfree/migrations/database_migration/database_migration.py

Lines changed: 6 additions & 4 deletions
@@ -9,6 +9,8 @@
 from butterfree.load.writers.writer import Writer
 from butterfree.transform import FeatureSet

+logger = logging.getLogger("database_migrate")
+

 @dataclass
 class Diff:
@@ -154,7 +156,7 @@ def _get_queries(
            )
            queries.append(alter_column_types_query)
        if alter_key_items:
-            logging.info("This operation is not supported by Spark.")
+            logger.info("This operation is not supported by Spark.")

        return queries

@@ -261,7 +263,7 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
            feature_set: the feature set.
            writer: the writer being used to load the feature set.
        """
-        logging.info(f"Migrating feature set: {feature_set.name}")
+        logger.info(f"Migrating feature set: {feature_set.name}")

        table_name = (
            feature_set.name if not writer.write_to_entity else feature_set.entity
@@ -275,7 +277,7 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
        )

        for q in queries:
-            logging.info(f"Applying {q}...")
+            logger.info(f"Applying this query: {q} ...")
            self._client.sql(q)

-        logging.info(f"Feature Set migration finished successfully.")
+        logger.info(f"Feature Set migration finished successfully.")

logs/logging.json

Whitespace-only changes.

requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -4,4 +4,5 @@ pandas>=0.24,<1.1
 parameters-validation>=1.1.5,<2.0
 pyspark==3.*
 typer>=0.3,<0.4
-setuptools>=41,<42
+setuptools>=41,<42
+typing-extensions==3.7.4.3
