Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion modin/core/storage_formats/base/query_compiler_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,20 @@ def calculate(self) -> str:
return self._qc_list[0].get_backend()
if len(self._qc_list) == 0:
raise ValueError("No query compilers registered")

qc_from_cls_costed = set()
# instance selection
for qc_from in self._qc_list:

# Add self cost for the current query compiler
if type(qc_from) not in qc_from_cls_costed:
self_cost = qc_from.stay_cost(
self._api_cls_name, self._op, self._operation_arguments
)
backend_from = qc_from.get_backend()
if self_cost is not None:
self._add_cost_data(backend_from, self_cost)
qc_from_cls_costed.add(type(qc_from))

qc_to_cls_costed = set()
for qc_to in self._qc_list:
qc_cls_to = type(qc_to)
Expand Down
6 changes: 6 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ def lazy_column_count(self):
"""
return not self.frame_has_materialized_columns

# The default implementation of stay_cost will cache some information
# which will violate some assumptions in test_internals. Since this class
# is only used for non-hybrid operations we simply return 0 here for now.
def stay_cost(self, api_cls_name, operation, arguments):
return 0

def finalize(self):
self._modin_frame.finalize()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
]
return {
CloudQC: QCCoercionCost.COST_ZERO,
CloudQCHighSelf: QCCoercionCost.COST_LOW,
ClusterQC: QCCoercionCost.COST_MEDIUM,
DefaultQC: QCCoercionCost.COST_MEDIUM,
LocalMachineQC: QCCoercionCost.COST_HIGH,
Expand All @@ -105,6 +106,14 @@ def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
OmniscientLazyQC: None,
}[other_qc_cls]

def stay_cost(self, api_cls_name, op, arguments):
return QCCoercionCost.COST_ZERO


class CloudQCHighSelf(CloudQC):
def get_backend(self):
return "Cloud_High_Self"

def stay_cost(self, api_cls_name, op, arguments):
return QCCoercionCost.COST_HIGH

Expand All @@ -121,6 +130,7 @@ def max_cost(self):
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_MEDIUM,
CloudQCHighSelf: QCCoercionCost.COST_MEDIUM,
ClusterQC: QCCoercionCost.COST_ZERO,
DefaultQC: None, # cluster qc knows nothing about default qc
LocalMachineQC: QCCoercionCost.COST_MEDIUM,
Expand All @@ -140,6 +150,7 @@ def max_cost(self):
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_MEDIUM,
CloudQCHighSelf: QCCoercionCost.COST_MEDIUM,
ClusterQC: QCCoercionCost.COST_LOW,
LocalMachineQC: QCCoercionCost.COST_ZERO,
PicoQC: QCCoercionCost.COST_MEDIUM,
Expand All @@ -158,6 +169,7 @@ def max_cost(self):
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_LOW,
CloudQCHighSelf: QCCoercionCost.COST_LOW,
ClusterQC: QCCoercionCost.COST_LOW,
LocalMachineQC: QCCoercionCost.COST_LOW,
PicoQC: QCCoercionCost.COST_ZERO,
Expand All @@ -173,6 +185,7 @@ def get_backend(self):
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: -1000,
CloudQCHighSelf: -1000,
ClusterQC: 10000,
AdversarialQC: QCCoercionCost.COST_ZERO,
}[other_qc_cls]
Expand Down Expand Up @@ -255,6 +268,9 @@ def stay_cost(self, api_cls_name, operation, arguments):
def get_backend(self) -> str:
return "Big_Data_Cloud"

def max_cost(self):
return QCCoercionCost.COST_IMPOSSIBLE * 10

@classmethod
def move_to_me_cost(cls, other_qc, api_cls_name, operation, arguments):
if api_cls_name in ("DataFrame", "Series") and operation == "__init__":
Expand Down Expand Up @@ -288,6 +304,9 @@ def __init__(self, pandas_frame):
def get_backend(self) -> str:
return "Small_Data_Local"

def max_cost(self):
return QCCoercionCost.COST_IMPOSSIBLE * 10


def register_backend(name, qc):
class TestCasterIO(BaseIO):
Expand All @@ -310,6 +329,7 @@ def prepare(cls):
register_backend("Pico", PicoQC)
register_backend("Cluster", ClusterQC)
register_backend("Cloud", CloudQC)
register_backend("Cloud_High_Self", CloudQCHighSelf)
register_backend("Local_Machine", LocalMachineQC)
register_backend("Adversarial", AdversarialQC)
register_backend("Eager", OmniscientEagerQC)
Expand All @@ -325,6 +345,11 @@ def cloud_df():
return pd.DataFrame(query_compiler=CloudQC(pandas.DataFrame([0, 1, 2])))


@pytest.fixture()
def cloud_high_self_df():
return pd.DataFrame(query_compiler=CloudQCHighSelf(pandas.DataFrame([0, 1, 2])))


@pytest.fixture()
def cluster_df():
return pd.DataFrame(query_compiler=ClusterQC(pandas.DataFrame([0, 1, 2])))
Expand Down Expand Up @@ -449,6 +474,30 @@ def test_no_solution(pico_df, local_df, cluster_df, cloud_df):
pd.concat(axis=1, objs=[pico_df, local_df, cluster_df, cloud_df])


def test_self_cost_causes_move(cloud_high_self_df, cluster_df):
"""
Test that ``self_cost`` is being properly considered.

Cost to stay on cloud_high_self is HIGH, but moving to cluster is MEDIUM.
Cost to stay on cluster is ZERO, and moving to cloud_high_self is MEDIUM.

With two dataframes, one on each backend, the total cost of using
``cloud_high_self`` as the final backend is:
``stay_cost(cloud_high_self) + move_cost(cluster->cloud_high_self)``
which is ``HIGH + MEDIUM``.
The total cost of using ``cluster`` as the final backend is:
``stay_cost(cluster) + move_cost(cloud_high_self->cluster)``
which is ``ZERO + MEDIUM``.

So we should select ``cluster``.
"""
result = pd.concat([cloud_high_self_df, cluster_df])
assert result.get_backend() == "Cluster"

result = pd.concat([cluster_df, cloud_high_self_df])
assert result.get_backend() == "Cluster"


@pytest.mark.parametrize(
"df1, df2, df3, df4, expected_result_backend",
[
Expand Down Expand Up @@ -633,23 +682,19 @@ def test_switch_local_to_cloud_with_iloc___setitem__(local_df, cloud_df, pin_loc
assert local_df.get_backend() == "Local_machine" if pin_local else "Cloud"


# Outlines a future generic function for determining when to stay
# or move to different engines. In the current state it is pretty
# trivial, but added for completeness
def test_stay_or_move_evaluation(cloud_df, default_df):
def test_stay_or_move_evaluation(cloud_high_self_df, default_df):
default_cls = type(default_df._get_query_compiler())
cloud_cls = type(cloud_df._get_query_compiler())
cloud_cls = type(cloud_high_self_df._get_query_compiler())
empty_arguments = MappingProxyType({})

stay_cost = cloud_df._get_query_compiler().stay_cost(
stay_cost = cloud_high_self_df._get_query_compiler().stay_cost(
"Series", "myop", arguments=empty_arguments
)
move_cost = cloud_df._get_query_compiler().move_to_cost(
move_cost = cloud_high_self_df._get_query_compiler().move_to_cost(
default_cls, "Series", "myop", arguments=empty_arguments
)
df = cloud_df
if stay_cost > move_cost:
df = cloud_df.move_to("Test_casting_default")
df = cloud_high_self_df.move_to("Test_casting_default")
else:
assert False

Expand Down
2 changes: 1 addition & 1 deletion modin/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_df_metrics(metric_client):
add_metric_handler(metric_client._metric_handler)
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
df.sum()
assert len(metric_client._metrics) == 54
assert len(metric_client._metrics) == 55
assert metric_client._metrics["modin.pandas-api.dataframe.sum"] is not None
assert metric_client._metrics["modin.pandas-api.dataframe.sum"] > 0.0

Expand Down
Loading