|
19 | 19 | from typing import Any, List, Optional
|
20 | 20 |
|
21 | 21 | import click
|
| 22 | +import pandas as pd |
22 | 23 | import yaml
|
23 | 24 | from bigtree import Node
|
24 | 25 | from colorama import Fore, Style
|
@@ -537,6 +538,175 @@ def feature_view_list(ctx: click.Context, tags: list[str]):
|
537 | 538 | print(tabulate(table, headers=["NAME", "ENTITIES", "TYPE"], tablefmt="plain"))
|
538 | 539 |
|
539 | 540 |
|
| 541 | +@cli.group(name="features") |
| 542 | +def features_cmd(): |
| 543 | + """ |
| 544 | + Access features |
| 545 | + """ |
| 546 | + pass |
| 547 | + |
| 548 | + |
| 549 | +@features_cmd.command(name="list") |
| 550 | +@click.option( |
| 551 | + "--output", |
| 552 | + type=click.Choice(["table", "json"], case_sensitive=False), |
| 553 | + default="table", |
| 554 | + show_default=True, |
| 555 | + help="Output format", |
| 556 | +) |
| 557 | +@click.pass_context |
| 558 | +def features_list(ctx: click.Context, output: str): |
| 559 | + """ |
| 560 | + List all features |
| 561 | + """ |
| 562 | + store = create_feature_store(ctx) |
| 563 | + feature_views = [ |
| 564 | + *store.list_batch_feature_views(), |
| 565 | + *store.list_on_demand_feature_views(), |
| 566 | + *store.list_stream_feature_views(), |
| 567 | + ] |
| 568 | + feature_list = [] |
| 569 | + for fv in feature_views: |
| 570 | + for feature in fv.features: |
| 571 | + feature_list.append([feature.name, fv.name, str(feature.dtype)]) |
| 572 | + |
| 573 | + if output == "json": |
| 574 | + json_output = [ |
| 575 | + {"feature_name": fn, "feature_view": fv, "dtype": dt} |
| 576 | + for fv, fn, dt in feature_list |
| 577 | + ] |
| 578 | + click.echo(json.dumps(json_output, indent=4)) |
| 579 | + else: |
| 580 | + from tabulate import tabulate |
| 581 | + |
| 582 | + click.echo( |
| 583 | + tabulate( |
| 584 | + feature_list, |
| 585 | + headers=["Feature", "Feature View", "Data Type"], |
| 586 | + tablefmt="plain", |
| 587 | + ) |
| 588 | + ) |
| 589 | + |
| 590 | + |
| 591 | +@features_cmd.command("describe") |
| 592 | +@click.argument("feature_name", type=str) |
| 593 | +@click.pass_context |
| 594 | +def describe_feature(ctx: click.Context, feature_name: str): |
| 595 | + """ |
| 596 | + Describe a specific feature by name |
| 597 | + """ |
| 598 | + store = create_feature_store(ctx) |
| 599 | + feature_views = [ |
| 600 | + *store.list_batch_feature_views(), |
| 601 | + *store.list_on_demand_feature_views(), |
| 602 | + *store.list_stream_feature_views(), |
| 603 | + ] |
| 604 | + |
| 605 | + feature_details = [] |
| 606 | + for fv in feature_views: |
| 607 | + for feature in fv.features: |
| 608 | + if feature.name == feature_name: |
| 609 | + feature_details.append( |
| 610 | + { |
| 611 | + "Feature Name": feature.name, |
| 612 | + "Feature View": fv.name, |
| 613 | + "Data Type": str(feature.dtype), |
| 614 | + "Description": getattr(feature, "description", "N/A"), |
| 615 | + "Online Store": getattr(fv, "online", "N/A"), |
| 616 | + "Source": json.loads(str(getattr(fv, "batch_source", "N/A"))), |
| 617 | + } |
| 618 | + ) |
| 619 | + if not feature_details: |
| 620 | + click.echo(f"Feature '{feature_name}' not found in any feature view.") |
| 621 | + return |
| 622 | + |
| 623 | + click.echo(json.dumps(feature_details, indent=4)) |
| 624 | + |
| 625 | + |
| 626 | +@cli.command("get-online-features") |
| 627 | +@click.option( |
| 628 | + "--entities", |
| 629 | + "-e", |
| 630 | + type=str, |
| 631 | + multiple=True, |
| 632 | + required=True, |
| 633 | + help="Entity key-value pairs (e.g., driver_id=1001)", |
| 634 | +) |
| 635 | +@click.option( |
| 636 | + "--features", |
| 637 | + "-f", |
| 638 | + multiple=True, |
| 639 | + required=True, |
| 640 | + help="Features to retrieve. (e.g.,feature-view:feature-name) ex: driver_hourly_stats:conv_rate", |
| 641 | +) |
| 642 | +@click.pass_context |
| 643 | +def get_online_features(ctx: click.Context, entities: List[str], features: List[str]): |
| 644 | + """ |
| 645 | + Fetch online feature values for a given entity ID |
| 646 | + """ |
| 647 | + store = create_feature_store(ctx) |
| 648 | + entity_dict: dict[str, List[str]] = {} |
| 649 | + for entity in entities: |
| 650 | + try: |
| 651 | + key, value = entity.split("=") |
| 652 | + if key not in entity_dict: |
| 653 | + entity_dict[key] = [] |
| 654 | + entity_dict[key].append(value) |
| 655 | + except ValueError: |
| 656 | + click.echo(f"Invalid entity format: {entity}. Use key=value format.") |
| 657 | + return |
| 658 | + entity_rows = [ |
| 659 | + dict(zip(entity_dict.keys(), values)) for values in zip(*entity_dict.values()) |
| 660 | + ] |
| 661 | + feature_vector = store.get_online_features( |
| 662 | + features=list(features), |
| 663 | + entity_rows=entity_rows, |
| 664 | + ).to_dict() |
| 665 | + |
| 666 | + click.echo(json.dumps(feature_vector, indent=4)) |
| 667 | + |
| 668 | + |
| 669 | +@cli.command(name="get-historical-features") |
| 670 | +@click.option( |
| 671 | + "--dataframe", |
| 672 | + "-d", |
| 673 | + type=str, |
| 674 | + required=True, |
| 675 | + help='JSON string containing entities and timestamps. Example: \'[{"event_timestamp": "2025-03-29T12:00:00", "driver_id": 1001}]\'', |
| 676 | +) |
| 677 | +@click.option( |
| 678 | + "--features", |
| 679 | + "-f", |
| 680 | + multiple=True, |
| 681 | + required=True, |
| 682 | + help="Features to retrieve. feature-view:feature-name ex: driver_hourly_stats:conv_rate", |
| 683 | +) |
| 684 | +@click.pass_context |
| 685 | +def get_historical_features(ctx: click.Context, dataframe: str, features: List[str]): |
| 686 | + """ |
| 687 | + Fetch historical feature values for a given entity ID |
| 688 | + """ |
| 689 | + store = create_feature_store(ctx) |
| 690 | + try: |
| 691 | + entity_list = json.loads(dataframe) |
| 692 | + if not isinstance(entity_list, list): |
| 693 | + raise ValueError("Entities must be a list of dictionaries.") |
| 694 | + |
| 695 | + entity_df = pd.DataFrame(entity_list) |
| 696 | + entity_df["event_timestamp"] = pd.to_datetime(entity_df["event_timestamp"]) |
| 697 | + |
| 698 | + except Exception as e: |
| 699 | + click.echo(f"Error parsing entities JSON: {e}", err=True) |
| 700 | + return |
| 701 | + |
| 702 | + feature_vector = store.get_historical_features( |
| 703 | + entity_df=entity_df, |
| 704 | + features=list(features), |
| 705 | + ).to_df() |
| 706 | + |
| 707 | + click.echo(feature_vector.to_json(orient="records", indent=4)) |
| 708 | + |
| 709 | + |
540 | 710 | @cli.group(name="on-demand-feature-views")
|
541 | 711 | def on_demand_feature_views_cmd():
|
542 | 712 | """
|
|
0 commit comments