Skip to content

Commit 0d2f4bc

Browse files
FEAT-#7549: Emit metrics on auto-switch and casting behavior (#7550)
In both the caster and the calculator we emit some new metrics to track the switching behavior. This will be used by a metrics handler to implement debugging tools. Co-authored-by: Jonathan Shi <[email protected]>
1 parent 5bc025e commit 0d2f4bc

File tree

3 files changed

+108
-1
lines changed

3 files changed

+108
-1
lines changed

modin/core/storage_formats/base/query_compiler_calculator.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
"""
2121

2222
import logging
23+
import random
2324
from types import MappingProxyType
2425
from typing import Any, Optional
2526

2627
from modin.core.storage_formats.base.query_compiler import (
2728
BaseQueryCompiler,
2829
QCCoercionCost,
2930
)
31+
from modin.logging.metrics import emit_metric
3032

3133

3234
class AggregatedBackendData:
@@ -102,6 +104,8 @@ def calculate(self) -> str:
102104
"""
103105
if self._result_backend is not None:
104106
return self._result_backend
107+
if len(self._qc_list) == 1:
108+
return self._qc_list[0].get_backend()
105109
if len(self._qc_list) == 0:
106110
raise ValueError("No query compilers registered")
107111

@@ -146,12 +150,32 @@ def calculate(self) -> str:
146150
logging.info(
147151
f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}"
148152
)
153+
# Does not need to be secure, should not use system entropy
154+
metrics_group = "%04x" % random.randrange(16**4)
155+
for qc in self._qc_list:
156+
max_shape = qc._max_shape()
157+
backend = qc.get_backend()
158+
emit_metric(
159+
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows",
160+
max_shape[0],
161+
)
162+
emit_metric(
163+
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols",
164+
max_shape[1],
165+
)
166+
for k, v in self._backend_data.items():
167+
emit_metric(
168+
f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost
169+
)
170+
emit_metric(
171+
f"hybrid.merge.decision.{self._result_backend}.group.{metrics_group}",
172+
1,
173+
)
149174

150175
if self._result_backend is None:
151176
raise ValueError(
152177
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{','.join(self._backend_data.keys())}]"
153178
)
154-
155179
return self._result_backend
156180

157181
def _add_cost_data(self, backend, cost):

modin/core/storage_formats/pandas/query_compiler_caster.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import functools
2323
import inspect
2424
import logging
25+
import random
2526
from abc import ABC, abstractmethod
2627
from collections import defaultdict, namedtuple
2728
from types import FunctionType, MappingProxyType, MethodType
@@ -42,6 +43,7 @@
4243
)
4344
from modin.error_message import ErrorMessage
4445
from modin.logging import disable_logging
46+
from modin.logging.metrics import emit_metric
4547
from modin.utils import sentinel
4648

4749
Fn = TypeVar("Fn", bound=Any)
@@ -724,6 +726,8 @@ def _get_backend_for_auto_switch(
724726
# backend.
725727
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
726728

729+
# Does not need to be secure, should not use system entropy
730+
metrics_group = "%04x" % random.randrange(16**4)
727731
starting_backend = input_qc.get_backend()
728732

729733
min_move_stay_delta = None
@@ -734,6 +738,23 @@ def _get_backend_for_auto_switch(
734738
operation=function_name,
735739
arguments=arguments,
736740
)
741+
data_max_shape = input_qc._max_shape()
742+
emit_metric(
743+
f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}",
744+
1,
745+
)
746+
emit_metric(
747+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost",
748+
stay_cost,
749+
)
750+
emit_metric(
751+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows",
752+
data_max_shape[0],
753+
)
754+
emit_metric(
755+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols",
756+
data_max_shape[1],
757+
)
737758
for backend in Backend.get_active_backends():
738759
if backend in ("Ray", "Unidist", "Dask"):
739760
# Disable automatically switching to these engines for now, because
@@ -778,16 +799,32 @@ def _get_backend_for_auto_switch(
778799
):
779800
min_move_stay_delta = move_stay_delta
780801
best_backend = backend
802+
emit_metric(
803+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost",
804+
move_to_cost,
805+
)
806+
emit_metric(
807+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost",
808+
other_execute_cost,
809+
)
810+
emit_metric(
811+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta",
812+
move_stay_delta,
813+
)
814+
781815
logging.info(
782816
f"After {class_of_wrapped_fn} function {function_name}, "
783817
+ f"considered moving to backend {backend} with "
784818
+ f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) "
785819
+ f", stay_cost {stay_cost}, and move-stay delta "
786820
+ f"{move_stay_delta}"
787821
)
822+
788823
if best_backend == starting_backend:
824+
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0)
789825
logging.info(f"Chose not to switch backends after operation {function_name}")
790826
else:
827+
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 1)
791828
logging.info(f"Chose to move to backend {best_backend}")
792829
return best_backend
793830

modin/tests/pandas/native_df_interoperability/test_compiler_caster.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
register_function_for_post_op_switch,
4545
register_function_for_pre_op_switch,
4646
)
47+
from modin.logging.metrics import add_metric_handler, clear_metric_handler
4748
from modin.pandas.api.extensions import register_pd_accessor
4849
from modin.tests.pandas.utils import create_test_dfs, df_equals, eval_general
4950

@@ -1308,3 +1309,48 @@ class AQC(NativeQueryCompiler):
13081309

13091310
assert qc._engine_max_size() == oldmax
13101311
assert qc._transfer_threshold() == oldthresh
1312+
1313+
1314+
def test_cast_metrics(pico_df, cluster_df):
1315+
try:
1316+
count = 0
1317+
1318+
def test_handler(metric: str, value) -> None:
1319+
nonlocal count
1320+
if metric.startswith("modin.hybrid.merge"):
1321+
count += 1
1322+
1323+
add_metric_handler(test_handler)
1324+
df3 = pd.concat([pico_df, cluster_df], axis=1)
1325+
assert df3.get_backend() == "Cluster" # result should be on cluster
1326+
assert count == 7
1327+
finally:
1328+
clear_metric_handler(test_handler)
1329+
1330+
1331+
def test_switch_metrics(pico_df, cluster_df):
1332+
with backend_test_context(
1333+
test_backend="Big_Data_Cloud",
1334+
choices=("Big_Data_Cloud", "Small_Data_Local"),
1335+
):
1336+
try:
1337+
count = 0
1338+
1339+
def test_handler(metric: str, value) -> None:
1340+
nonlocal count
1341+
if metric.startswith("modin.hybrid.auto"):
1342+
count += 1
1343+
1344+
add_metric_handler(test_handler)
1345+
1346+
register_function_for_pre_op_switch(
1347+
class_name="DataFrame",
1348+
backend="Big_Data_Cloud",
1349+
method="describe",
1350+
)
1351+
df = pd.DataFrame([1] * 10)
1352+
assert df.get_backend() == "Big_Data_Cloud"
1353+
df.describe()
1354+
assert count == 8
1355+
finally:
1356+
clear_metric_handler(test_handler)

0 commit comments

Comments
 (0)