Skip to content

Commit 102b74e

Browse files
sfc-gh-jkew authored and sfc-gh-joshi committed
Sync with upstream metrics emit in modin-project/modin#7550
1 parent b18c4fe commit 102b74e

File tree

3 files changed: +39 additions, -88 deletions

setup.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@
4444
*PANDAS_REQUIREMENTS,
4545
### For private customer releases
4646
#"modin @ git+https://github.com/sfc-gh-joshi/modin.git@joshi/hybrid-do-not-release", # TODO point at main
47-
"modin @ git+https://github.com/modin-project/modin.git@hybrid-client-alpha",
47+
"modin @ git+https://github.com/modin-project/modin.git@main",
4848
"tqdm",
4949
]
5050
DEVELOPMENT_REQUIREMENTS = [

src/snowflake/snowpark/modin/plugin/_internal/telemetry.py

Lines changed: 28 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -622,10 +622,8 @@ def snowpark_pandas_api_watcher(api_name: str, _time: Union[int, float]) -> None
622622

623623

624624
@cached(cache={})
625-
def get_user_source_location(mode, group) -> str:
626-
625+
def get_user_source_location(group) -> str:
627626
import inspect
628-
629627
stack = inspect.stack()
630628
frame_before_snowpandas = None
631629
location = "<unknown>"
@@ -641,95 +639,42 @@ def get_user_source_location(mode, group) -> str:
641639
and frame_before_snowpandas.code_context is not None
642640
):
643641
location = frame_before_snowpandas.code_context[0].replace("\n", "")
644-
return {"mode": mode, "group": group, "location": location}
642+
return { 'group': group, 'source': location }
645643

646644

647645
def get_hybrid_switch_log():
648646
global hybrid_switch_log
649647
return hybrid_switch_log.copy()
650648

651649

652-
def add_to_hybrid_switch_log(metrics: dict):
650+
def hybrid_metrics_watcher(metric_name: str, metric_value: Union[int, float]) -> None:
653651
global hybrid_switch_log
654-
try:
655-
mode = metrics["mode"]
656-
source = get_user_source_location(mode, metrics["group"])["location"]
657-
if len(source) > 40:
658-
source = source[0:17] + "..." + source[-20:-1] + source[-1]
659-
hybrid_switch_log = native_pd.concat(
660-
[
661-
hybrid_switch_log,
662-
native_pd.DataFrame(
663-
{
664-
"source": [source],
665-
"mode": [metrics["mode"]],
666-
"group": [metrics["group"]],
667-
"metric": [metrics["metric"]],
668-
"submetric": [metrics["submetric"] or None],
669-
"value": [metrics["value"]],
670-
"from": [metrics["from"] if "from" in metrics else None],
671-
"to": [metrics["to"] if "to" in metrics else None],
672-
}
673-
),
674-
]
675-
)
676-
except Exception as e:
677-
print(f"Exception: {type(e).__name__} - {e}")
678-
679-
680-
def hybrid_metrics_watcher(metric_name: str, value: Union[int, float]) -> None:
652+
mode = None
681653
if metric_name.startswith("modin.hybrid.auto"):
682-
tokens = metric_name.split(".")
683-
from_engine = None
684-
to_engine = None
685-
metric = None
686-
group = None
687-
submetric = None
688-
if len(tokens) >= 9:
689-
from_engine = tokens[4]
690-
to_engine = tokens[6]
691-
metric = tokens[7]
692-
if len(tokens) == 9:
693-
group = tokens[8]
694-
if len(tokens) == 10:
695-
submetric = tokens[8]
696-
group = tokens[9]
697-
add_to_hybrid_switch_log(
698-
{
699-
"mode": "single",
700-
"from": from_engine,
701-
"to": to_engine,
702-
"metric": metric,
703-
"submetric": submetric,
704-
"group": group,
705-
"value": value,
706-
}
707-
)
708-
if metric_name.startswith("modin.hybrid.cast"):
709-
tokens = metric_name.split(".")
710-
to_engine = None
711-
metric = None
712-
submetric = None
713-
group = None
714-
if len(tokens) == 7 and tokens[3] == "to" and tokens[5] == "cost":
715-
to_engine = tokens[4]
716-
group = tokens[6]
717-
metric = "cost"
718-
if len(tokens) == 6 and tokens[3] == "decision":
719-
submetric = tokens[4]
720-
group = tokens[5]
721-
metric = "decision"
722-
add_to_hybrid_switch_log(
723-
{
724-
"mode": "merge",
725-
"to": to_engine,
726-
"metric": metric,
727-
"submetric": submetric,
728-
"group": group,
729-
"value": value,
730-
}
731-
)
732-
654+
mode = "auto"
655+
elif metric_name.startswith("modin.hybrid.merge"):
656+
mode = "merge"
657+
else:
658+
return
659+
tokens = metric_name.split(".")[3:]
660+
entry = {'mode': mode}
661+
while len(tokens) >= 2:
662+
key = tokens.pop(0)
663+
if key == 'api':
664+
value = tokens.pop(0) + '.' + tokens.pop(0)
665+
else:
666+
value = tokens.pop(0)
667+
entry[key] = value
668+
669+
if len(tokens) == 1:
670+
key = tokens.pop(0)
671+
entry[key] = metric_value
672+
673+
source = get_user_source_location(entry['group'])
674+
entry['source'] = source['source']
675+
new_row = native_pd.DataFrame(entry, index=[0])
676+
hybrid_switch_log = native_pd.concat([hybrid_switch_log, new_row])
677+
733678

734679
def connect_modin_telemetry() -> None:
735680
MetricsMode.enable()

src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -39,10 +39,16 @@ def _snowpark_pandas_obj_check(obj: Union[DataFrame, Series]):
3939

4040
@register_pd_accessor("explain")
4141
def explain(last=20) -> native_pd.DataFrame:
42-
stats = get_hybrid_switch_log().tail(last)
43-
stats = stats.drop_duplicates().fillna(value=' ')
44-
stats2 = stats.reset_index()
45-
return stats2.set_index([ 'source', 'mode', 'from', 'to', 'metric']).drop(columns=['index', 'group'])
42+
stats = get_hybrid_switch_log()
43+
stats = stats.reset_index(drop=True).sort_index().reset_index()
44+
stats['decision'] = stats.groupby(['group']).bfill().ffill()['decision']
45+
stats['api'] = stats.groupby(['group']).bfill().ffill()['api']
46+
stats = stats.groupby('group', sort=False).apply(include_groups=False,func=lambda x: x.melt(ignore_index=False, id_vars=['source', 'api', 'decision', 'candidate', 'index', 'mode'], var_name='metric', value_vars=['stay_cost', 'move_to_cost', 'other_execute_cost', 'delta', 'rows', 'cols'])).dropna()
47+
stats = stats.set_index(['source', 'decision', 'api', 'index'])
48+
stats = stats.sort_index(level='index')
49+
stats['value'] = stats['value'].astype(int)
50+
stats.reset_index().drop(columns='index').set_index(['source', 'decision', 'api'])
51+
return stats.tail(last)
4652

4753
@register_pd_accessor("read_snowflake")
4854
def read_snowflake(

0 commit comments

Comments
 (0)