Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions butterfree/_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging


# Log line layout shared by every CLI logger: name, timestamp, level, message.
_LOG_FORMAT = "%(name)s:%(asctime)-15s:%(levelname)s:< %(message)s >"


def __logger(name: str) -> logging.Logger:
    """Configure root logging (format + INFO level) and return the named logger.

    ``logging.basicConfig`` is a no-op if handlers are already installed, so
    calling this more than once is harmless.
    """
    logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
    return logging.getLogger(name)


# Shared logger instance used by all butterfree CLI modules.
cli_logger = __logger("butterfree")
9 changes: 9 additions & 0 deletions butterfree/_cli/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import typer

from butterfree._cli import migrate

# Root CLI application; sub-commands are contributed by mounted sub-apps.
app = typer.Typer()
# Mount the migrate sub-app so its commands become part of the butterfree CLI.
app.add_typer(migrate.app)

# Allow invoking the CLI directly (e.g. `python -m butterfree._cli.main`)
# in addition to the console-script entry point declared in setup.py.
if __name__ == "__main__":
    app()
104 changes: 104 additions & 0 deletions butterfree/_cli/migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import importlib
import inspect
import pkgutil
import sys
from typing import Set

import setuptools
import typer

from butterfree._cli import cli_logger
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer()


def __find_modules(path: str) -> Set[str]:
    """Discover every python module (packages and plain modules) under ``path``.

    Uses ``setuptools.find_packages`` to locate packages, then
    ``pkgutil.iter_modules`` to collect the non-package modules inside each.

    :param path: filesystem path (full or relative) to scan.
    :return: set of dotted module names relative to ``path``.
    """
    modules = set()
    for pkg in setuptools.find_packages(path):
        modules.add(pkg)
        pkg_path = path + "/" + pkg.replace(".", "/")

        # pkgutil.iter_modules yields ModuleInfo named tuples on every
        # Python version this project supports (setup.py requires >=3.7).
        # The previous `sys.version_info.minor < 6` fallback checked only the
        # minor version (wrong for any future major) and was dead code here,
        # so it has been removed.
        for info in pkgutil.iter_modules([pkg_path]):
            if not info.ispkg:
                modules.add(pkg + "." + info.name)
    return modules


def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
    """Import all modules under ``path`` and instantiate every concrete
    ``FeatureSetPipeline`` subclass found in them.

    :param path: filesystem path (full or relative) to scan; must map to an
        importable dotted package (e.g. ``tests/mocks/entities``).
    :return: set of freshly-created pipeline instances (dry instances, built
        with no arguments).
    """
    cli_logger.info(f"Looking for python modules under {path}...")
    modules = __find_modules(path)
    if not modules:
        return set()

    # note: plain strings here — the previous f-strings had no placeholders
    cli_logger.info("Importing modules...")
    package = ".".join(path.strip("/").split("/"))
    imported = set(
        importlib.import_module(f".{name}", package=package) for name in modules
    )

    cli_logger.info("Scanning modules...")
    # map each imported module to its public attribute names
    content = {
        module: set(
            name for name in dir(module) if not name.startswith("__")
        )
        for module in imported
    }

    instances = set()
    for module, items in content.items():
        for item in items:
            value = getattr(module, item)

            # keep only concrete classes...
            if not inspect.isclass(value):
                continue
            if inspect.isabstract(value):
                continue

            # ...that inherit from FeatureSetPipeline,
            if not issubclass(value, FeatureSetPipeline):
                continue

            # excluding FeatureSetPipeline itself
            if value is FeatureSetPipeline:
                continue

            instances.add(value)

    cli_logger.info("Creating instances...")
    return set(value() for value in instances)


# Required positional CLI argument (the Ellipsis marks it as mandatory).
PATH = typer.Argument(
    ..., help="Full or relative path to where feature set pipelines are being defined.",
)


# NOTE(review): registered as the Typer *callback* rather than a command —
# confirm the intended invocation (`butterfree migrate PATH`) resolves as
# expected once this sub-app is mounted on the main app.
@app.callback()
def migrate(path: str = PATH) -> Set[FeatureSetPipeline]:
    """Scan and run database migrations for feature set pipelines defined under PATH.

    Butterfree will scan a given path for classes that inherit from its
    FeatureSetPipeline and create dry instances of it to extract schema and writer
    information. By doing this, Butterfree can compare all defined feature set schemas
    to their current state on each sink being used.

    All pipelines must be under python modules inside path, so we can dynamically
    import and instantiate them.
    """
    # TODO call the Migration actor with all feature set pipeline objects
    return __fs_objects(path)
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ mdutils>=1.2.2,<2.0
pandas>=0.24,<1.1
parameters-validation>=1.1.5,<2.0
pyspark==3.*
typer>=0.3,<0.4
setuptools>=41,<42
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@
install_requires=requirements,
extras_require={"h3": ["cmake==3.16.3", "h3==3.4.2"]},
python_requires=">=3.7, <4",
entry_points={"console_scripts": ["butterfree=butterfree._cli.main:app"]},
include_package_data=True,
)
Empty file added tests/mocks/__init__.py
Empty file.
Empty file.
3 changes: 3 additions & 0 deletions tests/mocks/entities/first/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .first_pipeline import FirstPipeline

__all__ = ["FirstPipeline"]
43 changes: 43 additions & 0 deletions tests/mocks/entities/first/first_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from butterfree.constants.data_type import DataType
from butterfree.extract import Source
from butterfree.extract.readers import TableReader
from butterfree.load import Sink
from butterfree.load.writers import (
HistoricalFeatureStoreWriter,
OnlineFeatureStoreWriter,
)
from butterfree.pipelines import FeatureSetPipeline
from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature


class FirstPipeline(FeatureSetPipeline):
    """Minimal mock pipeline used to exercise the CLI module-scanning logic."""

    def __init__(self):
        # Python-3 style super() — the explicit (cls, self) form was redundant.
        super().__init__(
            source=Source(
                readers=[TableReader(id="t", database="db", table="table")],
                # plain string: the previous f-string had no placeholders and
                # needed a noqa suppression
                query="select * from t",
            ),
            feature_set=FeatureSet(
                name="first",
                entity="entity",
                description="description",
                features=[
                    Feature(name="feature1", description="test", dtype=DataType.FLOAT),
                    Feature(
                        name="feature2",
                        description="another test",
                        dtype=DataType.STRING,
                    ),
                ],
                keys=[
                    KeyFeature(
                        name="id", description="identifier", dtype=DataType.BIGINT
                    )
                ],
                timestamp=TimestampFeature(),
            ),
            sink=Sink(
                writers=[HistoricalFeatureStoreWriter(), OnlineFeatureStoreWriter()]
            ),
        )
Empty file.
3 changes: 3 additions & 0 deletions tests/mocks/entities/second/deeper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .second_pipeline import SecondPipeline

__all__ = ["SecondPipeline"]
45 changes: 45 additions & 0 deletions tests/mocks/entities/second/deeper/second_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from butterfree.constants.data_type import DataType
from butterfree.extract import Source
from butterfree.extract.readers import TableReader
from butterfree.load import Sink
from butterfree.load.writers import (
HistoricalFeatureStoreWriter,
OnlineFeatureStoreWriter,
)
from butterfree.pipelines import FeatureSetPipeline
from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature


class SecondPipeline(FeatureSetPipeline):
    """Second mock pipeline (nested one level deeper) for CLI scanning tests."""

    def __init__(self):
        # Python-3 style super() — the explicit (cls, self) form was redundant.
        super().__init__(
            source=Source(
                readers=[TableReader(id="t", database="db", table="table")],
                # plain string: the previous f-string had no placeholders and
                # needed a noqa suppression
                query="select * from t",
            ),
            feature_set=FeatureSet(
                name="second",
                entity="entity",
                description="description",
                features=[
                    Feature(name="feature1", description="test", dtype=DataType.STRING),
                    Feature(
                        name="feature2",
                        description="another test",
                        dtype=DataType.FLOAT,
                    ),
                ],
                keys=[
                    KeyFeature(
                        name="id", description="identifier", dtype=DataType.BIGINT
                    )
                ],
                timestamp=TimestampFeature(),
            ),
            sink=Sink(
                writers=[HistoricalFeatureStoreWriter(), OnlineFeatureStoreWriter()]
            ),
        )
Empty file.
8 changes: 8 additions & 0 deletions tests/unit/butterfree/_cli/test_migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from butterfree._cli import migrate
from butterfree.pipelines import FeatureSetPipeline


def test_migrate_success():
    """Scanning the mock entities dir yields exactly the two mock pipelines."""
    pipelines = migrate.migrate("tests/mocks/entities/")

    names = sorted(pipeline.feature_set.name for pipeline in pipelines)
    assert names == ["first", "second"]
    for pipeline in pipelines:
        assert isinstance(pipeline, FeatureSetPipeline)