Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,22 @@ def set_active_backends(cls, new_choices: tuple) -> None:
)
cls.choices = new_choices

@classmethod
def activate(cls, backend: str) -> None:
    """
    Activate a backend that was previously registered.

    This is a no-op if the backend is already active.

    Parameters
    ----------
    backend : str
        Name of the backend to add to the active choices. It must be a key
        of ``cls._BACKEND_TO_EXECUTION``.

    Raises
    ------
    ValueError
        Raises a ValueError if backend was not previously registered.
    """
    if backend not in cls._BACKEND_TO_EXECUTION:
        raise ValueError(f"Unknown backend '{backend}' is not registered.")
    # Honor the documented no-op: appending unconditionally would leave a
    # duplicate entry in ``cls.choices`` every time an already-active
    # backend is activated again.
    if backend in cls.choices:
        return
    cls.choices = (*cls.choices, backend)

@classmethod
def get_active_backends(cls) -> tuple[str, ...]:
"""
Expand Down Expand Up @@ -570,6 +586,10 @@ def get_execution_for_backend(cls, backend: str) -> Execution:
)
normalized_value = cls.normalize(backend)
if normalized_value not in cls.choices:
if normalized_value in cls._BACKEND_TO_EXECUTION:
raise ValueError(
f"Backend '{backend}' is not currently active. Activate it first with Backend.activate('{backend})'."
)
backend_choice_string = ", ".join(f"'{choice}'" for choice in cls.choices)
raise ValueError(
f"Unknown backend '{backend}'. Available backends are: "
Expand Down
3 changes: 2 additions & 1 deletion modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ def _transfer_threshold(cls) -> int:
return cls._TRANSFER_THRESHOLD

@disable_logging
def max_cost(self) -> int:
@classmethod
def max_cost(cls) -> int:
"""
Return the max cost allowed by this engine.

Expand Down
144 changes: 106 additions & 38 deletions modin/core/storage_formats/base/query_compiler_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from types import MappingProxyType
from typing import Any, Optional

from modin.config import Backend
from modin.core.storage_formats.base.query_compiler import (
BaseQueryCompiler,
QCCoercionCost,
Expand All @@ -31,6 +32,18 @@
from modin.logging.metrics import emit_metric


def all_switchable_backends():
    """
    Yield each active backend that automatic switching is allowed to target.

    The active backends are read from ``Backend.get_active_backends()``;
    "Ray", "Unidist" and "Dask" are filtered out.

    Yields
    ------
    str
        Name of an active backend that is not one of the excluded engines.
    """
    # Automatic switching to these engines is disabled for now, because
    # 1) _get_prepared_factory_for_backend() currently calls
    #    _initialize_engine(), which starts up the ray/dask/unidist
    #    processes
    # 2) we can't decide to switch to unidist in the middle of execution.
    excluded_engines = ("Ray", "Unidist", "Dask")
    for candidate in Backend.get_active_backends():
        if candidate in excluded_engines:
            continue
        yield candidate


class AggregatedBackendData:
"""
Contains information on Backends considered for computation.
Expand All @@ -42,11 +55,11 @@
query_compiler : QueryCompiler
"""

def __init__(self, backend: str, query_compiler: BaseQueryCompiler):
def __init__(self, backend: str, qc_cls: type[BaseQueryCompiler]):
self.backend = backend
self.qc_cls = type(query_compiler)
self.qc_cls = qc_cls
self.cost = 0
self.max_cost = query_compiler.max_cost()
self.max_cost = qc_cls.max_cost()


class BackendCostCalculator:
Expand All @@ -73,12 +86,25 @@
api_cls_name: Optional[str],
operation: str,
):
self._backend_data: dict[str, AggregatedBackendData] = {}
from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

self._backend_data: dict[str, AggregatedBackendData] = {
backend: AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)
for backend in all_switchable_backends()
}
self._qc_list: list[BaseQueryCompiler] = []
self._result_backend = None
self._api_cls_name = api_cls_name
self._op = operation
self._operation_arguments = operation_arguments
self._unswitchable_backends: set[str] = set()

def add_query_compiler(self, query_compiler: BaseQueryCompiler):
"""
Expand All @@ -88,15 +114,58 @@
----------
query_compiler : QueryCompiler
"""
from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

self._qc_list.append(query_compiler)
# If a QC's backend was not configured as active, we need to create an entry for it here.
backend = query_compiler.get_backend()
backend_data = AggregatedBackendData(backend, query_compiler)
self._backend_data[backend] = backend_data
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)

def calculate(self) -> str:
"""
Calculate which query compiler we should cast to.
Switching calculation is performed as follows:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to the documentation here.

- For every registered query compiler in qc_list, with backend `backend_from`, compute
`self_cost = qc_from.stay_cost(...)` and add it to the total cost for `backend_from`.
- For every valid target `backend_to`, compute `qc_from.move_to_cost(qc_cls_to, ...)`. If it
returns None, instead compute `qc_cls_to.move_to_me_cost(qc_from, ...)`. Add the result
to the cost for `backend_to`.
At a high level, the cost for choosing a particular backend is the sum of
(all stay costs for data already on that backend)
+ (cost of moving all other query compilers to this backend)
If the arguments contain no query compilers for a particular backend, then there are no stay
costs. In this scenario, we expect the move_to cost for this backend to outweigh the corresponding
stay costs for each query compiler's original backend.
We considered a few alternative algorithms for switching calculation:
1. Instead of considering all active backends, consider only backends found among input QCs.
This was used in the calculator's original implementation, as we figured transfer cost to
unrelated backends would outweigh any possible gains in computation speed. However, certain
pathological cases that significantly changed the size of input or output data (e.g. cross join)
would create situations where transferring data after the computation became prohibitively
expensive, so we chose to instead consider all active backends.
Additionally, the original implementation had a bug where stay_cost was only computed for the
_first_ query compiler of each backend, thus under-reporting the cost of computation for any
backend with multiple QCs present. In practice this very rarely affected the chosen result.
2. Compute stay/move costs only once for each backend pair, but force QCs to consider other
arguments when calculating.
This approach is the most robust and accurate for cases like cross join, where a product of
transfer costs between backends is more reflective of cost than size. This approach requires
more work in the query compiler, as each QC must be aware of when multiple QC arguments are
passed and adjust the cost computation accordingly. It is also unclear how often this would
make a meaningful difference compared to the summation approach.
Returns
-------
str
Expand All @@ -108,58 +177,56 @@
return self._qc_list[0].get_backend()
if len(self._qc_list) == 0:
raise ValueError("No query compilers registered")
qc_from_cls_costed = set()
# instance selection
# See docstring for explanation of switching decision algorithm.
for qc_from in self._qc_list:

# Add self cost for the current query compiler
if type(qc_from) not in qc_from_cls_costed:
self_cost = qc_from.stay_cost(
self._api_cls_name, self._op, self._operation_arguments
self_cost = qc_from.stay_cost(
self._api_cls_name, self._op, self._operation_arguments
)
backend_from = qc_from.get_backend()
if self_cost is not None:
self._add_cost_data(backend_from, self_cost)

for backend_to, agg_data_to in self._backend_data.items():
if backend_to == backend_from:
continue
qc_cls_to = agg_data_to.qc_cls
cost = qc_from.move_to_cost(
qc_cls_to,
self._api_cls_name,
self._op,
self._operation_arguments,
)
backend_from = qc_from.get_backend()
if self_cost is not None:
self._add_cost_data(backend_from, self_cost)
qc_from_cls_costed.add(type(qc_from))

qc_to_cls_costed = set()
for qc_to in self._qc_list:
qc_cls_to = type(qc_to)
if qc_cls_to not in qc_to_cls_costed:
qc_to_cls_costed.add(qc_cls_to)
backend_to = qc_to.get_backend()
cost = qc_from.move_to_cost(
qc_cls_to,
if cost is not None:
self._add_cost_data(backend_to, cost)
else:
# We have some information asymmetry in query compilers,
# qc_from does not know about qc_to types so we instead
# ask the same question but of qc_to.
cost = qc_cls_to.move_to_me_cost(
qc_from,
self._api_cls_name,
self._op,
self._operation_arguments,
)
if cost is not None:
self._add_cost_data(backend_to, cost)
else:
# We have some information asymmetry in query compilers,
# qc_from does not know about qc_to types so we instead
# ask the same question but of qc_to.
cost = qc_cls_to.move_to_me_cost(
qc_from,
self._api_cls_name,
self._op,
self._operation_arguments,
)
if cost is not None:
self._add_cost_data(backend_to, cost)
# If move_to_me_cost and move_to_cost both returned none, then we cannot switch
# to this backend.
self._unswitchable_backends.add(backend_to)

min_value = None
for k, v in self._backend_data.items():
if v.cost > v.max_cost:
if v.cost > v.max_cost or k in self._unswitchable_backends:
continue
if min_value is None or min_value > v.cost:
min_value = v.cost
self._result_backend = k

if len(self._backend_data) > 1:
get_logger().info(
f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}"
f"BackendCostCalculator results for {'pd' if self._api_cls_name else self._api_cls_name}.{self._op}: {self._calc_result_log(self._result_backend)}"
)
# Does not need to be secure, should not use system entropy
metrics_group = "%04x" % random.randrange(16**4)
Expand Down Expand Up @@ -230,4 +297,5 @@
return ",".join(
f"{'*'+k if k is selected_backend else k}:{v.cost}/{v.max_cost}"
for k, v in self._backend_data.items()
if k not in self._unswitchable_backends
)
10 changes: 2 additions & 8 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from modin.core.storage_formats.base.query_compiler_calculator import (
BackendCostCalculator,
all_switchable_backends,
)
from modin.error_message import ErrorMessage
from modin.logging import disable_logging, get_logger
Expand Down Expand Up @@ -796,14 +797,7 @@
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols",
data_max_shape[1],
)
for backend in Backend.get_active_backends():
if backend in ("Ray", "Unidist", "Dask"):
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
continue
for backend in all_switchable_backends():
if backend == starting_backend:
continue
move_to_class = FactoryDispatcher._get_prepared_factory_for_backend(
Expand Down
Loading
Loading