Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from modin.error_message import ErrorMessage
from modin.logging import disable_logging, get_logger
from modin.logging.metrics import emit_metric
from modin.utils import sentinel
from modin.utils import _inherit_docstrings, sentinel

Fn = TypeVar("Fn", bound=Any)

Expand Down Expand Up @@ -270,7 +270,11 @@ def get_backend(self) -> str:

@abstractmethod
def set_backend(
self, backend: str, inplace: bool, *, switch_operation: Optional[str] = None
self,
backend: str,
inplace: bool = False,
*,
switch_operation: Optional[str] = None,
) -> Optional[Self]:
"""
Set the backend of this object.
Expand All @@ -294,6 +298,18 @@ def set_backend(
"""
pass

@_inherit_docstrings(set_backend)
def move_to(
    self,
    backend: str,
    inplace: bool = False,
    *,
    switch_operation: Optional[str] = None,
) -> Optional[Self]:
    # Alias for `set_backend`: every argument is forwarded unchanged and the
    # result of `set_backend` is returned as-is. No docstring is written here;
    # presumably the `_inherit_docstrings` decorator above supplies it from
    # `set_backend` (confirm against modin.utils).
    forwarded_kwargs = {
        "backend": backend,
        "inplace": inplace,
        "switch_operation": switch_operation,
    }
    return self.set_backend(**forwarded_kwargs)

@abstractmethod
def _copy_into(self, other: Self) -> None:
"""
Expand Down
84 changes: 79 additions & 5 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
EXTENSION_DICT_TYPE,
EXTENSION_NO_LOOKUP,
QueryCompilerCaster,
visit_nested_args,
)
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger, disable_logging
Expand Down Expand Up @@ -187,18 +188,91 @@ def get_backend(self) -> str:
return self._df.get_backend()

@disable_logging
@_inherit_docstrings(QueryCompilerCaster.set_backend)
def set_backend(
    self,
    backend: str,
    inplace: bool = False,
    *,
    switch_operation: Optional[str] = None,
) -> Optional[Self]:
    """
    Move the data in this groupby object to a different backend.

    Parameters
    ----------
    backend : str
        The name of the backend to switch to.
    inplace : bool, default: False
        Whether to perform the operation in-place.
    switch_operation : str, optional
        The operation being performed that triggered the backend switch.

    Returns
    -------
    DataFrameGroupBy or None
        If inplace=False, returns a new groupby object with the specified backend.
        If inplace=True, returns None and changes the backend of the current object.

    Notes
    -----
    When `inplace=True`, this method will move the data between backends
    for all parent objects (the DataFrame/Series used to create this
    groupby, and any DataFrames/Series in the `by` list). When
    `inplace=False`, new copies of the parent objects are created with their
    data in the target backend for the returned groupby object, leaving the
    original parent objects unchanged.
    """

    def set_instance_variable_backend(arg: Any) -> Any:
        # groupby object _by and _df fields may include both
        # QueryCompilerCaster objects and BaseQueryCompiler objects,
        # so we have to be able to set the backend on both of those.

        if isinstance(arg, QueryCompilerCaster):
            result = arg.set_backend(
                backend=backend, inplace=inplace, switch_operation=switch_operation
            )
            # An in-place switch returns None, so hand back the (now moved)
            # original object; otherwise hand back the new copy.
            return arg if inplace else result
        if isinstance(arg, BaseQueryCompiler):
            # Use a cyclic import here because query compilers themselves
            # do not implement set_backend().
            from modin.pandas import DataFrame

            # Wrap the bare query compiler in a temporary DataFrame so we
            # can reuse DataFrame.set_backend(), then unwrap the result.
            # This branch always produces a new query compiler, regardless
            # of `inplace`.
            return (
                DataFrame(query_compiler=arg)
                .set_backend(backend=backend, inplace=False)
                ._query_compiler
            )
        # Anything else (column labels, Groupers, None, ...) is passed
        # through untouched.
        return arg

    # visit_nested_args operates on an argument list, so wrap each field in
    # a one-element list and take the single result back out.
    new_by = visit_nested_args([self._by], set_instance_variable_backend)[0]
    new_df = visit_nested_args([self._df], set_instance_variable_backend)[0]

    if inplace:
        self._df = new_df
        self._query_compiler = new_df._query_compiler
        self._by = new_by
        return None
    return type(self)(
        df=new_df,
        by=new_by,
        axis=self._axis,
        level=self._level,
        as_index=self._as_index,
        sort=self._sort,
        group_keys=self._kwargs["group_keys"],
        idx_name=self._idx_name,
        drop=self._drop,
        backend_pinned=self._backend_pinned,
        # We have added as_index, sort, group_keys, and level to the kwargs
        # dictionary, so we need to remove them from the keyword arguments
        # that we pass to the new DataFrameGroupBy object.
        **{
            k: v
            for k, v in self._kwargs.items()
            if k not in ["as_index", "sort", "group_keys", "level"]
        },
    )

@_inherit_docstrings(QueryCompilerCaster.is_backend_pinned)
def is_backend_pinned(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1399,11 +1399,6 @@ def test___init___with_in_memory_data_uses_native_query_compiler(
param(
BIG_DATA_CLOUD_MIN_NUM_ROWS - 1,
"Small_Data_Local",
marks=pytest.mark.xfail(
strict=True,
raises=NotImplementedError,
reason="https://github.com/modin-project/modin/issues/7542",
),
),
(BIG_DATA_CLOUD_MIN_NUM_ROWS, "Big_Data_Cloud"),
],
Expand Down
213 changes: 212 additions & 1 deletion modin/tests/pandas/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
import re
from unittest.mock import patch

import pandas
import pytest
import tqdm.auto

import modin.pandas as pd
from modin.config import Backend
from modin.config import context as config_context
from modin.tests.pandas.utils import df_equals
from modin.tests.pandas.utils import (
create_test_dfs,
default_to_pandas_ignore_string,
df_equals,
)

WINDOWS_RAY_SKIP_MARK = pytest.mark.skipif(
platform.system() == "Windows",
Expand All @@ -32,6 +37,10 @@
),
)

# Some modin methods warn about defaulting to pandas at the API layer. In this
# module that warning is expected, so it is not treated as an error the way it
# normally would be.
pytestmark = pytest.mark.filterwarnings(default_to_pandas_ignore_string)


def test_new_dataframe_uses_default_backend():
# We run this test with `Backend` set to just one value (instead of
Expand Down Expand Up @@ -205,3 +214,205 @@ def test_set_backend_docstrings(setter_method):
assert dataframe_method.__doc__ == series_method.__doc__.replace(
"Series", "DataFrame"
)


class TestGroupbySetBackend:
    """Check ``set_backend`` / ``move_to`` on DataFrame and Series groupby objects."""

    # Parametrizations shared by `test_dataframe` and `test_series`. They are
    # defined once here and applied as decorators below so that the two tests
    # cannot drift apart.
    _parametrize_setter_method = pytest.mark.parametrize(
        "setter_method", ["set_backend", "move_to"]
    )
    _parametrize_inplace_kwargs = pytest.mark.parametrize(
        "inplace_kwargs",
        [
            pytest.param({"inplace": True}, id="inplace"),
            pytest.param({"inplace": False}, id="not_inplace"),
            pytest.param({}, id="no_inplace_kwargs"),
        ],
    )
    _parametrize_backends = pytest.mark.parametrize(
        "starting_backend, new_backend",
        [
            pytest.param(Backend.get(), "Pandas", id="current_to_pandas"),
            pytest.param("Pandas", Backend.get(), id="pandas_to_current"),
            pytest.param(Backend.get(), "Python_Test", id="current_to_python"),
            pytest.param("Python_Test", Backend.get(), id="python_to_current"),
            pytest.param("Python_Test", "Pandas", id="python_to_pandas"),
            pytest.param("Pandas", "Python_Test", id="pandas_to_python"),
        ],
    )

    @staticmethod
    def _make_multiindex():
        """Build the two-level index used by both tests for level-based grouping."""
        return pd.MultiIndex.from_tuples(
            [
                ("foo", 1),
                ("foo", 2),
                ("bar", 1),
                ("bar", 2),
                ("baz", 1),
                ("baz", 2),
            ],
            names=["first", "second"],
        )

    @staticmethod
    def _switch_backend_and_verify(
        do_groupby,
        modin_obj,
        pandas_obj,
        setter_method,
        inplace_kwargs,
        starting_backend,
        new_backend,
    ):
        """
        Switch the backend of a modin groupby and check the outcome.

        Parameters
        ----------
        do_groupby : callable
            Takes a modin or pandas object and returns its groupby object.
        modin_obj : modin DataFrame or Series
            Object whose groupby is moved to `new_backend`.
        pandas_obj : pandas DataFrame or Series
            Equivalent pandas object used to compute the expected result.
        setter_method : str
            Name of the groupby method to call ("set_backend" or "move_to").
        inplace_kwargs : dict
            Keyword arguments forwarded to the setter; may contain "inplace".
        starting_backend : str
            Backend the modin object started on.
        new_backend : str
            Backend to switch to.
        """
        # When "inplace" is not passed, the setter is expected to behave as
        # inplace=False.
        inplace = inplace_kwargs.get("inplace", False)

        original_groupby = do_groupby(modin_obj)
        setter_result = getattr(original_groupby, setter_method)(
            new_backend, **inplace_kwargs
        )

        if inplace:
            assert setter_result is None
            result_groupby = original_groupby
            # Verify that the underlying object's backend was also changed
            assert original_groupby._df.get_backend() == new_backend
        else:
            assert setter_result is not original_groupby
            result_groupby = setter_result
            # Verify the original object's backend was not changed
            assert original_groupby._df.get_backend() == starting_backend

        # Verify backend was changed
        assert result_groupby.get_backend() == new_backend

        # Verify that groupby still works correctly after the backend switch.
        # The expected value comes from a fresh pandas groupby to avoid mixed
        # backend states.
        pandas_groupby_sum = do_groupby(pandas_obj).sum()
        df_equals(result_groupby.sum(), pandas_groupby_sum)
        if not inplace:
            # The untouched original groupby must still produce the same answer.
            df_equals(original_groupby.sum(), pandas_groupby_sum)

    @_parametrize_setter_method
    @_parametrize_inplace_kwargs
    @_parametrize_backends
    @pytest.mark.parametrize(
        "by_level_factory",
        [
            pytest.param(lambda df: ("C", None), id="by_string_column"),
            pytest.param(lambda df: (["C", "D"], None), id="by_list_of_strings"),
            pytest.param(lambda df: (df["C"], None), id="by_series"),
            pytest.param(lambda df: (["C", df["D"]], None), id="by_list_mixed"),
            pytest.param(lambda df: (pandas.Grouper(key="C"), None), id="by_grouper"),
            pytest.param(lambda df: (None, 0), id="level_scalar"),
            pytest.param(lambda df: (None, [0, 1]), id="level_list"),
        ],
    )
    def test_dataframe(
        self,
        setter_method,
        inplace_kwargs,
        starting_backend,
        new_backend,
        by_level_factory,
    ):
        """Test set_backend functionality for DataFrame groupby objects with various 'by' and 'level' combinations."""
        with config_context(Backend=starting_backend):

            def do_groupby(df):
                by, level = by_level_factory(df)
                return df.groupby(by=by, level=level)

            original_modin_df, original_pandas_df = create_test_dfs(
                pandas.DataFrame(
                    data={
                        "A": [1, 2, 3, 4, 5, 6],
                        "B": [10, 20, 30, 40, 50, 60],
                        "C": ["x", "y", "x", "y", "x", "y"],
                        "D": ["p", "p", "q", "q", "r", "r"],
                    },
                    index=self._make_multiindex(),
                )
            )

            self._switch_backend_and_verify(
                do_groupby,
                original_modin_df,
                original_pandas_df,
                setter_method,
                inplace_kwargs,
                starting_backend,
                new_backend,
            )

    @_parametrize_setter_method
    @_parametrize_inplace_kwargs
    @_parametrize_backends
    @pytest.mark.parametrize(
        "by_level_factory",
        [
            pytest.param(
                lambda series: (pandas.Grouper(level=0), None),
                id="by_grouper_level",
            ),
            pytest.param(lambda series: (None, 0), id="level_scalar"),
            pytest.param(lambda series: (None, [0, 1]), id="level_list"),
            pytest.param(lambda series: (series, None), id="by_self"),
            pytest.param(lambda series: (series % 2, None), id="by_self_modulo_2"),
        ],
    )
    def test_series(
        self,
        setter_method,
        inplace_kwargs,
        starting_backend,
        new_backend,
        by_level_factory,
    ):
        """Test set_backend functionality for Series groupby objects with various 'by' and 'level' combinations."""
        with config_context(Backend=starting_backend):
            # Create test data with MultiIndex to support level-based grouping
            idx = self._make_multiindex()
            original_pandas_series = pandas.Series([1, 2, 1, 3, 4, 5], index=idx)
            original_modin_series = pd.Series([1, 2, 1, 3, 4, 5], index=idx)

            def do_groupby(series):
                by, level = by_level_factory(series)
                return series.groupby(by=by, level=level)

            self._switch_backend_and_verify(
                do_groupby,
                original_modin_series,
                original_pandas_series,
                setter_method,
                inplace_kwargs,
                starting_backend,
                new_backend,
            )
Loading