Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AsyncReadMode,
AutoSwitchBackend,
Backend,
BackendJoinConsiderAllBackends,
BackendMergeCastInPlace,
BenchmarkMode,
CIAWSAccessKeyID,
Expand Down Expand Up @@ -79,6 +80,7 @@
"GpuCount",
"Memory",
"Backend",
"BackendJoinConsiderAllBackends",
"BackendMergeCastInPlace",
"Execution",
"AutoSwitchBackend",
Expand Down
49 changes: 49 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,27 @@ def set_active_backends(cls, new_choices: tuple) -> None:
)
cls.choices = new_choices

@classmethod
def activate(cls, backend: str) -> None:
"""
Activate a backend that was previously registered.

This is a no-op if the backend is already active.

Parameters
----------
backend : str
Backend to activate.

Raises
------
ValueError
Raises a ValueError if backend was not previously registered.
"""
if backend not in cls._BACKEND_TO_EXECUTION:
raise ValueError(f"Unknown backend '{backend}' is not registered.")
cls.choices = (*cls.choices, backend)

@classmethod
def get_active_backends(cls) -> tuple[str, ...]:
"""
Expand Down Expand Up @@ -570,6 +591,10 @@ def get_execution_for_backend(cls, backend: str) -> Execution:
)
normalized_value = cls.normalize(backend)
if normalized_value not in cls.choices:
if normalized_value in cls._BACKEND_TO_EXECUTION:
raise ValueError(
f"Backend '{backend}' is not currently active. Activate it first with Backend.activate('{backend})'."
)
backend_choice_string = ", ".join(f"'{choice}'" for choice in cls.choices)
raise ValueError(
f"Unknown backend '{backend}'. Available backends are: "
Expand Down Expand Up @@ -1409,6 +1434,30 @@ def disable(cls) -> None:
cls.put(False)


class BackendJoinConsiderAllBackends(EnvironmentVariable, type=bool):
"""
Whether to consider all active backends when performing a pre-operation switch for join operations.

Only used when AutoSwitchBackend is active.
By default, only backends already present in the arguments of a join operation are considered when
switching backends. Enabling this flag will allow join operations that are registered
as pre-op switches to consider backends other than those directly present in the arguments.
"""

varname = "MODIN_BACKEND_JOIN_CONSIDER_ALL_BACKENDS"
default = True

@classmethod
def enable(cls) -> None:
"""Enable casting in place when performing a merge operation betwen two different compilers."""
cls.put(True)

@classmethod
def disable(cls) -> None:
"""Disable casting in place when performing a merge operation betwen two different compilers."""
cls.put(False)


class DynamicPartitioning(EnvironmentVariable, type=bool):
"""
Set to true to use Modin's dynamic-partitioning implementation where possible.
Expand Down
7 changes: 4 additions & 3 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def move_to_cost(
api_cls_name: Optional[str],
operation: str,
arguments: MappingProxyType[str, Any],
) -> int:
) -> Optional[int]:
"""
Return the coercion costs of this qc to other_qc type.

Expand All @@ -353,7 +353,7 @@ def move_to_cost(

Returns
-------
int
Optional[int]
Cost of migrating the data from this qc to the other_qc or
None if the cost cannot be determined.
"""
Expand Down Expand Up @@ -516,7 +516,8 @@ def _transfer_threshold(cls) -> int:
return cls._TRANSFER_THRESHOLD

@disable_logging
def max_cost(self) -> int:
@classmethod
def max_cost(cls) -> int:
"""
Return the max cost allowed by this engine.

Expand Down
188 changes: 136 additions & 52 deletions modin/core/storage_formats/base/query_compiler_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from types import MappingProxyType
from typing import Any, Optional

from modin.config import Backend, BackendJoinConsiderAllBackends
from modin.core.storage_formats.base.query_compiler import (
BaseQueryCompiler,
QCCoercionCost,
Expand All @@ -31,6 +32,28 @@
from modin.logging.metrics import emit_metric


def all_switchable_backends() -> list[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this cannot be part of envvars

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make this configurable, but I just refactored this out from a function in the QC caster:

for backend in Backend.get_active_backends():
if backend in ("Ray", "Unidist", "Dask"):
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
continue

"""
Return a list of all currently active backends that are candidates for switching.

Returns
-------
list
A list of valid backends.
"""
return list(
filter(
# Disable automatically switching to these engines for now, because
# 1) _get_prepared_factory_for_backend() currently calls
# _initialize_engine(), which starts up the ray/dask/unidist
# processes
# 2) we can't decide to switch to unidist in the middle of execution.
lambda backend: backend not in ("Ray", "Unidist", "Dask"),
Backend.get_active_backends(),
)
)


class AggregatedBackendData:
"""
Contains information on Backends considered for computation.
Expand All @@ -39,14 +62,15 @@ class AggregatedBackendData:
----------
backend : str
String representing the backend name.
query_compiler : QueryCompiler
qc_cls : type[QueryCompiler]
The query compiler sub-class for this backend.
"""

def __init__(self, backend: str, query_compiler: BaseQueryCompiler):
def __init__(self, backend: str, qc_cls: type[BaseQueryCompiler]):
self.backend = backend
self.qc_cls = type(query_compiler)
self.qc_cls = qc_cls
self.cost = 0
self.max_cost = query_compiler.max_cost()
self.max_cost = qc_cls.max_cost()


class BackendCostCalculator:
Expand All @@ -65,89 +89,149 @@ class BackendCostCalculator:
api_cls_name : str or None
Representing the class name of the function being called.
operation : str representing the operation being performed
query_compilers : list of query compiler arguments
preop_switch : bool
True if the operation is a pre-operation switch point.
"""

def __init__(
self,
*,
operation_arguments: MappingProxyType[str, Any],
api_cls_name: Optional[str],
operation: str,
query_compilers: list[BaseQueryCompiler],
preop_switch: bool,
):
self._backend_data: dict[str, AggregatedBackendData] = {}
from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

self._qc_list: list[BaseQueryCompiler] = []
self._result_backend = None
self._api_cls_name = api_cls_name
self._op = operation
self._operation_arguments = operation_arguments

def add_query_compiler(self, query_compiler: BaseQueryCompiler):
"""
Add a query compiler to be considered for casting.

Parameters
----------
query_compiler : QueryCompiler
"""
self._qc_list.append(query_compiler)
backend = query_compiler.get_backend()
backend_data = AggregatedBackendData(backend, query_compiler)
self._backend_data[backend] = backend_data
self._backend_data = {}
self._qc_list = query_compilers[:]
for query_compiler in query_compilers:
# If a QC's backend was not configured as active, we need to create an entry for it here.
backend = query_compiler.get_backend()
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)
if preop_switch and BackendJoinConsiderAllBackends.get():
# Initialize backend data for any backends not found among query compiler arguments.
# Because we default to the first query compiler's backend if no cost information is available,
# this initialization must occur after iterating over query compiler arguments to ensure
# correct ordering in dictionary arguments.
for backend in all_switchable_backends():
if backend not in self._backend_data:
self._backend_data[backend] = AggregatedBackendData(
backend,
FactoryDispatcher._get_prepared_factory_for_backend(
backend=backend
).io_cls.query_compiler_cls,
)

def calculate(self) -> str:
"""
Calculate which query compiler we should cast to.

Switching calculation is performed as follows:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to the documentation here.

- For every registered query compiler in qc_list, with backend `backend_from`, compute
`self_cost = qc_from.stay_cost(...)` and add it to the total cost for `backend_from`.
- For every valid target `backend_to`, compute `qc_from.move_to_cost(qc_cls_to, ...)`. If it
returns None, instead compute `qc_cls_to.move_to_me_cost(qc_from, ...)`. Add the result
to the cost for `backend_to`.
At a high level, the cost for choosing a particular backend is the sum of
(all stay costs for data already on that backend)
+ (cost of moving all other query compilers to this backend)

If the operation is a registered pre-operation switch point, then the list of target backends
is ALL active backends. Otherwise, only backends found among the arguments are considered.
Post-operation switch points are not yet supported.

If the arguments contain no query compilers for a particular backend, then there are no stay
costs. In this scenario, we expect the move_to cost for this backend to outweigh the corresponding
stay costs for each query compiler's original backend.

If no argument QCs have cost information for each other (that is, move_to_cost and move_to_me_cost
returns None), then we attempt to move all data to the backend of the first QC.

We considered a few alternative algorithms for switching calculation:

1. Instead of considering all active backends, consider only backends found among input QCs.
This was used in the calculator's original implementation, as we figured transfer cost to
unrelated backends would outweigh any possible gains in computation speed. However, certain
pathological cases that significantly changed the size of input or output data (e.g. cross join)
would create situations where transferring data after the computation became prohibitively
expensive, so we chose to allow switching to unrelated backends.
Additionally, the original implementation had a bug where stay_cost was only computed for the
_first_ query compiler of each backend, thus under-reporting the cost of computation for any
backend with multiple QCs present. In practice this very rarely affected the chosen result.
2. Compute stay/move costs only once for each backend pair, but force QCs to consider other
arguments when calculating.
This approach is the most robust and accurate for cases like cross join, where a product of
transfer costs between backends is more reflective of cost than size. This approach requires
more work in the query compiler, as each QC must be aware of when multiple QC arguments are
passed and adjust the cost computation accordingly. It is also unclear how often this would
make a meaningful difference compared to the summation approach.

Returns
-------
str
A string representing a backend.

Raises
------
ValueError
Raises ValueError when the reported transfer cost for every backend exceeds its maximum cost.
"""
if self._result_backend is not None:
return self._result_backend
if len(self._qc_list) == 1:
return self._qc_list[0].get_backend()
if len(self._qc_list) == 0:
raise ValueError("No query compilers registered")
qc_from_cls_costed = set()
# instance selection
# See docstring for explanation of switching decision algorithm.
for qc_from in self._qc_list:

# Add self cost for the current query compiler
if type(qc_from) not in qc_from_cls_costed:
self_cost = qc_from.stay_cost(
self._api_cls_name, self._op, self._operation_arguments
self_cost = qc_from.stay_cost(
self._api_cls_name, self._op, self._operation_arguments
)
backend_from = qc_from.get_backend()
if self_cost is not None:
self._add_cost_data(backend_from, self_cost)

for backend_to, agg_data_to in self._backend_data.items():
if backend_to == backend_from:
continue
qc_cls_to = agg_data_to.qc_cls
cost = qc_from.move_to_cost(
qc_cls_to,
self._api_cls_name,
self._op,
self._operation_arguments,
)
backend_from = qc_from.get_backend()
if self_cost is not None:
self._add_cost_data(backend_from, self_cost)
qc_from_cls_costed.add(type(qc_from))

qc_to_cls_costed = set()
for qc_to in self._qc_list:
qc_cls_to = type(qc_to)
if qc_cls_to not in qc_to_cls_costed:
qc_to_cls_costed.add(qc_cls_to)
backend_to = qc_to.get_backend()
cost = qc_from.move_to_cost(
qc_cls_to,
if cost is not None:
self._add_cost_data(backend_to, cost)
else:
# We have some information asymmetry in query compilers,
# qc_from does not know about qc_to types so we instead
# ask the same question but of qc_to.
cost = qc_cls_to.move_to_me_cost(
qc_from,
self._api_cls_name,
self._op,
self._operation_arguments,
)
if cost is not None:
self._add_cost_data(backend_to, cost)
else:
# We have some information asymmetry in query compilers,
# qc_from does not know about qc_to types so we instead
# ask the same question but of qc_to.
cost = qc_cls_to.move_to_me_cost(
qc_from,
self._api_cls_name,
self._op,
self._operation_arguments,
)
if cost is not None:
self._add_cost_data(backend_to, cost)

min_value = None
for k, v in self._backend_data.items():
Expand All @@ -159,7 +243,7 @@ def calculate(self) -> str:

if len(self._backend_data) > 1:
get_logger().info(
f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}"
f"BackendCostCalculator results for {'pd' if self._api_cls_name is None else self._api_cls_name}.{self._op}: {self._calc_result_log(self._result_backend)}"
)
# Does not need to be secure, should not use system entropy
metrics_group = "%04x" % random.randrange(16**4)
Expand All @@ -185,7 +269,7 @@ def calculate(self) -> str:

if self._result_backend is None:
raise ValueError(
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{','.join(self._backend_data.keys())}]"
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{', '.join(self._backend_data.keys())}]"
)
return self._result_backend

Expand Down Expand Up @@ -227,7 +311,7 @@ def _calc_result_log(self, selected_backend: str) -> str:
str
String representation of calculator state.
"""
return ",".join(
return ", ".join(
f"{'*'+k if k is selected_backend else k}:{v.cost}/{v.max_cost}"
for k, v in self._backend_data.items()
)
Loading
Loading