Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@

# instance selection
for qc_from in self._qc_list:
self_cost = qc_from.stay_cost(

Check warning on line 114 in modin/core/storage_formats/base/query_compiler_calculator.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler_calculator.py#L114

Added line #L114 was not covered by tests
self._api_cls_name, self._op, self._operation_arguments
)
backend_from = qc_from.get_backend()
self._add_cost_data(backend_from, self_cost)

Check warning on line 118 in modin/core/storage_formats/base/query_compiler_calculator.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler_calculator.py#L117-L118

Added lines #L117 - L118 were not covered by tests
qc_to_cls_costed = set()
for qc_to in self._qc_list:
qc_cls_to = type(qc_to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
]
return {
CloudQC: QCCoercionCost.COST_ZERO,
CloudQCHighSelf: QCCoercionCost.COST_LOW,
ClusterQC: QCCoercionCost.COST_MEDIUM,
DefaultQC: QCCoercionCost.COST_MEDIUM,
LocalMachineQC: QCCoercionCost.COST_HIGH,
Expand All @@ -106,8 +107,14 @@
}[other_qc_cls]

def stay_cost(self, api_cls_name, op, arguments):
    """
    Return the cost of keeping the operation on this query compiler.

    The parameters are accepted for interface compatibility and are not
    consulted; the result is always ``QCCoercionCost.COST_ZERO``.
    """
    # Only the post-change ``COST_ZERO`` return is kept. The superseded
    # ``COST_HIGH`` return preceded it and made it unreachable; the high
    # stay cost is provided by the ``CloudQCHighSelf`` subclass instead.
    return QCCoercionCost.COST_ZERO

class CloudQCHighSelf(CloudQC):
    """``CloudQC`` variant that reports a high cost for staying in place."""

    def stay_cost(self, api_cls_name, op, arguments):
        """Return ``QCCoercionCost.COST_HIGH``; the arguments are not consulted."""
        high_cost = QCCoercionCost.COST_HIGH
        return high_cost

    def get_backend(self):
        """Return the backend name ``"Cloud_High_Self"``."""
        backend_name = "Cloud_High_Self"
        return backend_name

class ClusterQC(CalculatorTestQc):
"Represents a local network cluster query compiler"
Expand All @@ -121,6 +128,7 @@
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_MEDIUM,
CloudQCHighSelf: QCCoercionCost.COST_MEDIUM,
ClusterQC: QCCoercionCost.COST_ZERO,
DefaultQC: None, # cluster qc knows nothing about default qc
LocalMachineQC: QCCoercionCost.COST_MEDIUM,
Expand All @@ -140,6 +148,7 @@
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_MEDIUM,
CloudQCHighSelf: QCCoercionCost.COST_MEDIUM,
ClusterQC: QCCoercionCost.COST_LOW,
LocalMachineQC: QCCoercionCost.COST_ZERO,
PicoQC: QCCoercionCost.COST_MEDIUM,
Expand All @@ -158,6 +167,7 @@
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
return {
CloudQC: QCCoercionCost.COST_LOW,
CloudQCHighSelf: QCCoercionCost.COST_LOW,
ClusterQC: QCCoercionCost.COST_LOW,
LocalMachineQC: QCCoercionCost.COST_LOW,
PicoQC: QCCoercionCost.COST_ZERO,
Expand All @@ -173,6 +183,7 @@
def move_to_cost(self, other_qc_cls, api_cls_name, op, arguments):
    """
    Look up the cost of moving to ``other_qc_cls`` in a fixed table.

    Only ``other_qc_cls`` is consulted; the remaining parameters are
    ignored. A class that is not in the table raises ``KeyError``.
    """
    # The table mixes raw integers with a ``QCCoercionCost`` member,
    # exactly as listed; entries keep their original order.
    cost_by_target = {
        CloudQC: -1000,
        CloudQCHighSelf: -1000,
        ClusterQC: 10000,
        AdversarialQC: QCCoercionCost.COST_ZERO,
    }
    return cost_by_target[other_qc_cls]
Expand Down Expand Up @@ -254,6 +265,9 @@

def get_backend(self) -> str:
    """Return the backend name ``"Big_Data_Cloud"``."""
    backend_name = "Big_Data_Cloud"
    return backend_name

def max_cost(self):
    """Return ten times ``QCCoercionCost.COST_IMPOSSIBLE``."""
    impossible = QCCoercionCost.COST_IMPOSSIBLE
    return impossible * 10

@classmethod
def move_to_me_cost(cls, other_qc, api_cls_name, operation, arguments):
Expand Down Expand Up @@ -287,6 +301,9 @@

def get_backend(self) -> str:
    """Return the backend name ``"Small_Data_Local"``."""
    backend_name = "Small_Data_Local"
    return backend_name

def max_cost(self):
    """Return ten times ``QCCoercionCost.COST_IMPOSSIBLE``."""
    impossible = QCCoercionCost.COST_IMPOSSIBLE
    return impossible * 10


def register_backend(name, qc):
Expand All @@ -310,6 +327,7 @@
register_backend("Pico", PicoQC)
register_backend("Cluster", ClusterQC)
register_backend("Cloud", CloudQC)
register_backend("Cloud_High_Self", CloudQCHighSelf)
register_backend("Local_Machine", LocalMachineQC)
register_backend("Adversarial", AdversarialQC)
register_backend("Eager", OmniscientEagerQC)
Expand All @@ -324,6 +342,9 @@
def cloud_df():
    """
    Return a ``pd.DataFrame`` whose query compiler is a ``CloudQC``
    wrapping ``pandas.DataFrame([0, 1, 2])``.
    """
    source_frame = pandas.DataFrame([0, 1, 2])
    compiler = CloudQC(source_frame)
    return pd.DataFrame(query_compiler=compiler)

@pytest.fixture()
def cloud_high_self_df():
    """
    Return a ``pd.DataFrame`` whose query compiler is a ``CloudQCHighSelf``
    wrapping ``pandas.DataFrame([0, 1, 2])``.
    """
    source_frame = pandas.DataFrame([0, 1, 2])
    compiler = CloudQCHighSelf(source_frame)
    return pd.DataFrame(query_compiler=compiler)

@pytest.fixture()
def cluster_df():
Expand Down Expand Up @@ -449,6 +470,30 @@
pd.concat(axis=1, objs=[pico_df, local_df, cluster_df, cloud_df])


def test_self_cost_causes_move(cloud_high_self_df, cluster_df):
    """
    Check that the cost of staying put is included when choosing a backend.

    Staying on ``cloud_high_self`` costs HIGH, while moving from it to
    ``cluster`` costs MEDIUM. Staying on ``cluster`` costs ZERO, while
    moving from it to ``cloud_high_self`` costs MEDIUM.

    With one dataframe on each backend, the candidate totals are:

    * ``cloud_high_self`` as the final backend:
      ``stay_cost(cloud_high_self) + move_cost(cluster->cloud_high_self)``,
      i.e. ``HIGH + MEDIUM``;
    * ``cluster`` as the final backend:
      ``stay_cost(cluster) + move_cost(cloud_high_self->cluster)``,
      i.e. ``ZERO + MEDIUM``.

    ``cluster`` is therefore the expected choice.
    """
    # The selected backend must not depend on the order of the operands.
    orderings = (
        [cloud_high_self_df, cluster_df],
        [cluster_df, cloud_high_self_df],
    )
    for frames in orderings:
        result = pd.concat(frames)
        assert result.get_backend() == "Cluster"


@pytest.mark.parametrize(
"df1, df2, df3, df4, expected_result_backend",
[
Expand Down Expand Up @@ -633,23 +678,20 @@
assert local_df.get_backend() == "Local_machine" if pin_local else "Cloud"


# Outlines a future generic function for determining when to stay
# or move to different engines. In the current state it is pretty
# trivial, but added for completeness
def test_stay_or_move_evaluation(cloud_df, default_df):
def test_stay_or_move_evaluation(cloud_high_self_df, default_df):
default_cls = type(default_df._get_query_compiler())
cloud_cls = type(cloud_df._get_query_compiler())
cloud_cls = type(cloud_high_self_df._get_query_compiler())
empty_arguments = MappingProxyType({})

stay_cost = cloud_df._get_query_compiler().stay_cost(
stay_cost = cloud_high_self_df._get_query_compiler().stay_cost(
"Series", "myop", arguments=empty_arguments
)
move_cost = cloud_df._get_query_compiler().move_to_cost(
move_cost = cloud_high_self_df._get_query_compiler().move_to_cost(
default_cls, "Series", "myop", arguments=empty_arguments
)
df = cloud_df
df = cloud_high_self_df
if stay_cost > move_cost:
df = cloud_df.move_to("Test_casting_default")
df = cloud_high_self_df.move_to("Test_casting_default")
else:
assert False

Expand Down
Loading