Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ def activate(cls, backend: str) -> None:

This is a no-op if the backend is already active.

Parameters
----------
backend : str
Backend to activate.

Raises
------
ValueError
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def move_to_cost(
api_cls_name: Optional[str],
operation: str,
arguments: MappingProxyType[str, Any],
) -> int:
) -> Optional[int]:
"""
Return the coercion costs of this qc to other_qc type.

Expand All @@ -353,7 +353,7 @@ def move_to_cost(

Returns
-------
int
Optional[int]
Cost of migrating the data from this qc to the other_qc or
None if the cost cannot be determined.
"""
Expand Down
122 changes: 69 additions & 53 deletions modin/core/storage_formats/base/query_compiler_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,25 @@
from modin.logging.metrics import emit_metric


def all_switchable_backends():
yield from filter(
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
lambda backend: backend not in ("Ray", "Unidist", "Dask"),
Backend.get_active_backends(),
def all_switchable_backends() -> list[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this cannot be part of envvars

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make this configurable, but I just refactored this out from a function in the QC caster:

for backend in Backend.get_active_backends():
if backend in ("Ray", "Unidist", "Dask"):
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
continue

"""
Return a list of all currently active backends that are candidates for switching.

Returns
-------
list
A list of valid backends.
"""
return list(
filter(
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
lambda backend: backend not in ("Ray", "Unidist", "Dask"),
Backend.get_active_backends(),
)
)


Expand All @@ -52,7 +62,8 @@ class AggregatedBackendData:
----------
backend : str
String representing the backend name.
query_compiler : QueryCompiler
qc_cls : type[QueryCompiler]
The query compiler sub-class for this backend.
"""

def __init__(self, backend: str, qc_cls: type[BaseQueryCompiler]):
Expand All @@ -78,56 +89,54 @@ class BackendCostCalculator:
api_cls_name : str or None
Representing the class name of the function being called.
operation : str
Representing the operation being performed.
query_compilers : list of query compiler arguments
preop_switch : bool
True if the operation is a pre-operation switch point.
"""

def __init__(
self,
*,
operation_arguments: MappingProxyType[str, Any],
api_cls_name: Optional[str],
operation: str,
query_compilers: list[BaseQueryCompiler],
preop_switch: bool,
):
from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

self._backend_data: dict[str, AggregatedBackendData] = {
backend: AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)
for backend in all_switchable_backends()
}
self._qc_list: list[BaseQueryCompiler] = []
self._result_backend = None
self._api_cls_name = api_cls_name
self._op = operation
self._operation_arguments = operation_arguments
self._unswitchable_backends: set[str] = set()

def add_query_compiler(self, query_compiler: BaseQueryCompiler):
"""
Add a query compiler to be considered for casting.

Parameters
----------
query_compiler : QueryCompiler
"""
from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

self._qc_list.append(query_compiler)
# If a QC's backend was not configured as active, we need to create an entry for it here.
backend = query_compiler.get_backend()
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)
self._backend_data = {}
self._qc_list = query_compilers[:]
for query_compiler in query_compilers:
# If a QC's backend was not configured as active, we need to create an entry for it here.
backend = query_compiler.get_backend()
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)
if preop_switch:
# Initialize backend data for any backends not found among query compiler arguments.
# Because we default to the first query compiler's backend if no cost information is available,
# this initialization must occur after iterating over query compiler arguments to ensure
# correct ordering in dictionary arguments.
for backend in all_switchable_backends():
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)

def calculate(self) -> str:
"""
Expand All @@ -143,18 +152,25 @@ def calculate(self) -> str:
(all stay costs for data already on that backend)
+ (cost of moving all other query compilers to this backend)

If the operation is a registered pre-operation switch point, then the list of target backends
is ALL active backends. Otherwise, only backends found among the arguments are considered.
Post-operation switch points are not yet supported.

If the arguments contain no query compilers for a particular backend, then there are no stay
costs. In this scenario, we expect the move_to cost for this backend to outweigh the corresponding
stay costs for each query compiler's original backend.

If no argument QCs have cost information for each other (that is, move_to_cost and move_to_me_cost
return None), then we attempt to move all data to the backend of the first QC.

We considered a few alternative algorithms for switching calculation:

1. Instead of considering all active backends, consider only backends found among input QCs.
This was used in the calculator's original implementation, as we figured transfer cost to
unrelated backends would outweigh any possible gains in computation speed. However, certain
pathological cases that significantly changed the size of input or output data (e.g. cross join)
would create situations where transferring data after the computation became prohibitively
expensive, so we chose to instead. --------------------
expensive, so we chose to allow switching to unrelated backends.
Additionally, the original implementation had a bug where stay_cost was only computed for the
_first_ query compiler of each backend, thus under-reporting the cost of computation for any
backend with multiple QCs present. In practice this very rarely affected the chosen result.
Expand All @@ -170,6 +186,11 @@ def calculate(self) -> str:
-------
str
A string representing a backend.

Raises
------
ValueError
Raises ValueError when the reported transfer cost for every backend exceeds its maximum cost.
"""
if self._result_backend is not None:
return self._result_backend
Expand Down Expand Up @@ -211,22 +232,18 @@ def calculate(self) -> str:
)
if cost is not None:
self._add_cost_data(backend_to, cost)
else:
# If move_to_me_cost and move_to_cost both returned none, then we cannot switch
# to this backend.
self._unswitchable_backends.add(backend_to)

min_value = None
for k, v in self._backend_data.items():
if v.cost > v.max_cost or k in self._unswitchable_backends:
if v.cost > v.max_cost:
continue
if min_value is None or min_value > v.cost:
min_value = v.cost
self._result_backend = k

if len(self._backend_data) > 1:
get_logger().info(
f"BackendCostCalculator results for {'pd' if self._api_cls_name else self._api_cls_name}.{self._op}: {self._calc_result_log(self._result_backend)}"
f"BackendCostCalculator results for {'pd' if self._api_cls_name is None else self._api_cls_name}.{self._op}: {self._calc_result_log(self._result_backend)}"
)
# Does not need to be secure, should not use system entropy
metrics_group = "%04x" % random.randrange(16**4)
Expand All @@ -252,7 +269,7 @@ def calculate(self) -> str:

if self._result_backend is None:
raise ValueError(
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{','.join(self._backend_data.keys())}]"
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{', '.join(self._backend_data.keys())}]"
)
return self._result_backend

Expand Down Expand Up @@ -294,8 +311,7 @@ def _calc_result_log(self, selected_backend: str) -> str:
str
String representation of calculator state.
"""
return ",".join(
return ", ".join(
f"{'*'+k if k is selected_backend else k}:{v.cost}/{v.max_cost}"
for k, v in self._backend_data.items()
if k not in self._unswitchable_backends
)
14 changes: 11 additions & 3 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,15 +1094,23 @@ def register_query_compilers(arg):
arguments=args_dict,
)
else:
preop_switch = (
name
in _CLASS_AND_BACKEND_TO_PRE_OP_SWITCH_METHODS[
BackendAndClassName(
backend=input_backend,
class_name=class_of_wrapped_fn,
)
]
)
calculator: BackendCostCalculator = BackendCostCalculator(
operation_arguments=args_dict,
api_cls_name=class_of_wrapped_fn,
operation=name,
query_compilers=input_query_compilers,
preop_switch=preop_switch,
)

for qc in input_query_compilers:
calculator.add_query_compiler(qc)

if pin_target_backend is None:
result_backend = calculator.calculate()
else:
Expand Down
Loading
Loading