Skip to content

Commit 9afc39c

Browse files
authored
[MLOP-640] Create CLI with migrate command (#298)
1 parent aeb7999 commit 9afc39c

File tree

14 files changed

+229
-0
lines changed

14 files changed

+229
-0
lines changed

butterfree/_cli/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import logging
2+
3+
4+
def __logger(name: str) -> logging.Logger:
5+
format_ = "%(name)s:%(asctime)-15s:%(levelname)s:< %(message)s >"
6+
logging.basicConfig(format=format_, level=logging.INFO)
7+
return logging.getLogger(name)
8+
9+
10+
cli_logger = __logger("butterfree")

butterfree/_cli/main.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import typer
2+
3+
from butterfree._cli import migrate
4+
5+
app = typer.Typer()
6+
app.add_typer(migrate.app)
7+
8+
if __name__ == "__main__":
9+
app()

butterfree/_cli/migrate.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import importlib
2+
import inspect
3+
import pkgutil
4+
import sys
5+
from typing import Set
6+
7+
import setuptools
8+
import typer
9+
10+
from butterfree._cli import cli_logger
11+
from butterfree.pipelines import FeatureSetPipeline
12+
13+
app = typer.Typer()
14+
15+
16+
def __find_modules(path: str) -> Set[str]:
17+
modules = set()
18+
for pkg in setuptools.find_packages(path):
19+
modules.add(pkg)
20+
pkg_path = path + "/" + pkg.replace(".", "/")
21+
22+
# different usage for older python3 versions
23+
if sys.version_info.minor < 6:
24+
for _, name, is_pkg in pkgutil.iter_modules([pkg_path]):
25+
if not is_pkg:
26+
modules.add(pkg + "." + name)
27+
else:
28+
for info in pkgutil.iter_modules([pkg_path]):
29+
if not info.ispkg:
30+
modules.add(pkg + "." + info.name)
31+
return modules
32+
33+
34+
def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
35+
cli_logger.info(f"Looking for python modules under {path}...")
36+
modules = __find_modules(path)
37+
if not modules:
38+
return set()
39+
40+
cli_logger.info(f"Importing modules...")
41+
package = ".".join(path.strip("/").split("/"))
42+
imported = set(
43+
importlib.import_module(f".{name}", package=package) for name in modules
44+
)
45+
46+
cli_logger.info(f"Scanning modules...")
47+
content = {
48+
module: set(
49+
filter(
50+
lambda x: not x.startswith("__"), # filter "__any__" attributes
51+
set(item for item in dir(module)),
52+
)
53+
)
54+
for module in imported
55+
}
56+
57+
instances = set()
58+
for module, items in content.items():
59+
for item in items:
60+
value = getattr(module, item)
61+
if not value:
62+
continue
63+
64+
# filtering non-classes
65+
if not inspect.isclass(value):
66+
continue
67+
68+
# filtering abstractions
69+
if inspect.isabstract(value):
70+
continue
71+
72+
# filtering classes that doesn't inherit from FeatureSetPipeline
73+
if not issubclass(value, FeatureSetPipeline):
74+
continue
75+
76+
# filtering FeatureSetPipeline itself
77+
if value == FeatureSetPipeline:
78+
continue
79+
80+
instances.add(value)
81+
82+
cli_logger.info("Creating instances...")
83+
return set(value() for value in instances)
84+
85+
86+
PATH = typer.Argument(
87+
..., help="Full or relative path to where feature set pipelines are being defined.",
88+
)
89+
90+
91+
@app.callback()
92+
def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
93+
"""Scan and run database migrations for feature set pipelines defined under PATH.
94+
95+
Butterfree will scan a given path for classes that inherit from its
96+
FeatureSetPipeline and create dry instances of it to extract schema and writer
97+
information. By doing this, Butterfree can compare all defined feature set schemas
98+
to their current state on each sink being used.
99+
100+
All pipelines must be under python modules inside path, so we can dynamically
101+
import and instantiate them.
102+
"""
103+
# TODO call the Migration actor with all feature set pipeline objects
104+
return __fs_objects(path)

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ mdutils>=1.2.2,<2.0
33
pandas>=0.24,<1.1
44
parameters-validation>=1.1.5,<2.0
55
pyspark==3.*
6+
typer>=0.3,<0.4
7+
setuptools>=41,<42

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@
3636
install_requires=requirements,
3737
extras_require={"h3": ["cmake==3.16.3", "h3==3.4.2"]},
3838
python_requires=">=3.7, <4",
39+
entry_points={"console_scripts": ["butterfree=butterfree._cli.main:app"]},
40+
include_package_data=True,
3941
)

tests/mocks/__init__.py

Whitespace-only changes.

tests/mocks/entities/__init__.py

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .first_pipeline import FirstPipeline
2+
3+
__all__ = ["FirstPipeline"]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from butterfree.constants.data_type import DataType
2+
from butterfree.extract import Source
3+
from butterfree.extract.readers import TableReader
4+
from butterfree.load import Sink
5+
from butterfree.load.writers import (
6+
HistoricalFeatureStoreWriter,
7+
OnlineFeatureStoreWriter,
8+
)
9+
from butterfree.pipelines import FeatureSetPipeline
10+
from butterfree.transform import FeatureSet
11+
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
12+
13+
14+
class FirstPipeline(FeatureSetPipeline):
15+
def __init__(self):
16+
super(FirstPipeline, self).__init__(
17+
source=Source(
18+
readers=[TableReader(id="t", database="db", table="table",)],
19+
query=f"select * from t", # noqa
20+
),
21+
feature_set=FeatureSet(
22+
name="first",
23+
entity="entity",
24+
description="description",
25+
features=[
26+
Feature(name="feature1", description="test", dtype=DataType.FLOAT,),
27+
Feature(
28+
name="feature2",
29+
description="another test",
30+
dtype=DataType.STRING,
31+
),
32+
],
33+
keys=[
34+
KeyFeature(
35+
name="id", description="identifier", dtype=DataType.BIGINT,
36+
)
37+
],
38+
timestamp=TimestampFeature(),
39+
),
40+
sink=Sink(
41+
writers=[HistoricalFeatureStoreWriter(), OnlineFeatureStoreWriter()]
42+
),
43+
)

tests/mocks/entities/second/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)