1 change: 1 addition & 0 deletions modin/pandas/dataframe.py
@@ -614,6 +614,7 @@ def groupby(
drop=drop,
dropna=dropna,
return_tuple_when_iterating=return_tuple_when_iterating,
backend_pinned=self.is_backend_pinned(),
)

def keys(self) -> pandas.Index: # noqa: RT01, D200
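For context, a minimal usage sketch of what forwarding backend_pinned from the frame enables (assumed setup: a Modin build with the pin_backend / is_backend_pinned API exercised by the tests later in this diff):

import modin.pandas as pd

df = pd.DataFrame({"col0": [1, 1, 2], "col1": [10, 20, 30]})
df.pin_backend(inplace=True)   # pin the frame to its current backend
gb = df.groupby("col0")        # DataFrame.groupby now forwards the frame's pin status
assert gb.is_backend_pinned()  # the new groupby object starts out pinned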
25 changes: 22 additions & 3 deletions modin/pandas/groupby.py
@@ -126,6 +126,7 @@ def __init__(
group_keys,
idx_name,
drop,
backend_pinned: bool,
**kwargs,
):
self._axis = axis
@@ -141,6 +142,8 @@ def __init__(
self._return_tuple_when_iterating = kwargs.pop(
"return_tuple_when_iterating", False
)
# Whether the backend of this groupby object has been pinned.
self._backend_pinned = backend_pinned

if (
level is None
@@ -199,15 +202,26 @@ def set_backend(

@_inherit_docstrings(QueryCompilerCaster.is_backend_pinned)
def is_backend_pinned(self) -> bool:
return False
return self._backend_pinned

@_inherit_docstrings(QueryCompilerCaster._set_backend_pinned)
def _set_backend_pinned(self, pinned: bool, inplace: bool) -> Optional[Self]:
ErrorMessage.not_implemented()
if not inplace:
ErrorMessage.not_implemented(
"Only inplace=True is supported for groupby pinning"
)

self._backend_pinned = pinned
return None

@_inherit_docstrings(QueryCompilerCaster.pin_backend)
def pin_backend(self, inplace: bool = False) -> Optional[Self]:
ErrorMessage.not_implemented()
if not inplace:
ErrorMessage.not_implemented(
"Only inplace=True is supported for groupby pinning"
)

return self._set_backend_pinned(True, inplace=True)

@disable_logging
@_inherit_docstrings(QueryCompilerCaster._get_query_compiler)
@@ -237,6 +251,7 @@ def _override(self, **kwargs):
axis=self._axis,
idx_name=self._idx_name,
drop=self._drop,
backend_pinned=self._backend_pinned,
**self._kwargs,
)
new_kw.update(kwargs)
@@ -925,6 +940,7 @@ def __getitem__(self, key):
return DataFrameGroupBy(
self._df[key],
drop=self._drop,
backend_pinned=self._backend_pinned,
**kwargs,
)
if (
@@ -939,6 +955,7 @@ def __getitem__(self, key):
return SeriesGroupBy(
self._df[key],
drop=False,
backend_pinned=self._backend_pinned,
**kwargs,
)

@@ -1223,6 +1240,7 @@ def size(self):
0,
drop=self._drop,
idx_name=self._idx_name,
backend_pinned=self._backend_pinned,
**self._kwargs,
).size()
result = self._wrap_aggregation(
@@ -1422,6 +1440,7 @@ def fillna(
axis=self._axis,
idx_name=self._idx_name,
drop=self._drop,
backend_pinned=self._backend_pinned,
**new_groupby_kwargs,
)
return work_object._wrap_aggregation(
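A short sketch of the groupby-level pinning semantics added above (names taken from this diff; illustrative only): pin_backend and unpin_backend act on the groupby object itself, and only the in-place form is supported.

import modin.pandas as pd

df = pd.DataFrame({"col0": [1, 1, 2], "col1": [10, 20, 30]})
gb = df.groupby("col0")

gb.pin_backend(inplace=True)     # pin the groupby object in place
assert gb.is_backend_pinned()
gb.unpin_backend(inplace=True)   # unpin it again
assert not gb.is_backend_pinned()

# inplace=False (the default) raises NotImplementedError, as the
# xfail test further down asserts.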
1 change: 1 addition & 0 deletions modin/pandas/series.py
@@ -1280,6 +1280,7 @@ def groupby(
observed=observed,
drop=False,
dropna=dropna,
backend_pinned=self.is_backend_pinned(),
)

def gt(
@@ -977,6 +977,111 @@ def test_auto_switch_config_can_disable_groupby_agg_auto_switch(
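Series.groupby now forwards the pin status in the same way. One consequence, demonstrated by the inheritance test below: the status is read when the groupby object is created, so pinning the parent afterwards does not retroactively pin an existing groupby. A hedged sketch:

import modin.pandas as pd

ser = pd.Series([1, 2, 3], name="data")
gb_before = ser.groupby([0, 0, 1])

ser.pin_backend(inplace=True)
gb_after = ser.groupby([0, 0, 1])

assert not gb_before.is_backend_pinned()  # created before the pin was set
assert gb_after.is_backend_pinned()       # inherits the pin at creation time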
)
assert modin_groupby.get_backend() == "Big_Data_Cloud"

@pytest.mark.parametrize(
"groupby_class,groupby_operation,agg_operation",
[
param(
"DataFrameGroupBy",
lambda df: df.groupby("col0"),
lambda groupby: groupby.sum(),
id="DataFrameGroupBy",
),
param(
"SeriesGroupBy",
lambda df: df.groupby("col0")["col1"],
lambda groupby: groupby.sum(),
id="SeriesGroupBy",
),
],
)
@backend_test_context(
test_backend="Big_Data_Cloud",
choices=("Big_Data_Cloud", "Small_Data_Local"),
)
def test_pinned_dataframe_prevents_groupby_backend_switch(
self, groupby_class, groupby_operation, agg_operation
):
"""Test that pinning a DataFrame prevents groupby operations from switching backends."""
modin_df, pandas_df = create_test_dfs(
{
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
}
)

assert modin_df.get_backend() == "Big_Data_Cloud"

# Pin the DataFrame
modin_df.pin_backend(inplace=True)
assert modin_df.is_backend_pinned()

# Create groupby object - should inherit pin status from dataframe
modin_groupby = groupby_operation(modin_df)
pandas_groupby = groupby_operation(pandas_df)
assert modin_groupby.is_backend_pinned() # Inherited from DataFrame

# Register a post-op switch that would normally move to Small_Data_Local
register_function_for_post_op_switch(
class_name=groupby_class, backend="Big_Data_Cloud", method="sum"
)

# The operation should stay on Big_Data_Cloud due to inherited pinning
modin_result = agg_operation(modin_groupby)
pandas_result = agg_operation(pandas_groupby)
df_equals(modin_result, pandas_result)
assert modin_result.get_backend() == "Big_Data_Cloud"

@pytest.mark.parametrize(
"groupby_class,groupby_operation,agg_operation",
[
param(
"DataFrameGroupBy",
lambda df: df.groupby("col0"),
lambda groupby: groupby.sum(),
id="DataFrameGroupBy",
),
param(
"SeriesGroupBy",
lambda df: df.groupby("col0")["col1"],
lambda groupby: groupby.sum(),
id="SeriesGroupBy",
),
],
)
@backend_test_context(
test_backend="Big_Data_Cloud",
choices=("Big_Data_Cloud", "Small_Data_Local"),
)
def test_pinned_groupby_prevents_backend_switch(
self, groupby_class, groupby_operation, agg_operation
):
"""Test that pinning a GroupBy object prevents operations from switching backends."""
modin_df, pandas_df = create_test_dfs(
{
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
}
)

assert modin_df.get_backend() == "Big_Data_Cloud"

# Create groupby object and pin it directly
modin_groupby = groupby_operation(modin_df)
pandas_groupby = groupby_operation(pandas_df)
modin_groupby.pin_backend(inplace=True)
assert modin_groupby.is_backend_pinned()

# Register a post-op switch that would normally move to Small_Data_Local
register_function_for_post_op_switch(
class_name=groupby_class, backend="Big_Data_Cloud", method="sum"
)

# The operation should stay on Big_Data_Cloud due to pinning
modin_result = agg_operation(modin_groupby)
pandas_result = agg_operation(pandas_groupby)
df_equals(modin_result, pandas_result)
assert modin_result.get_backend() == "Big_Data_Cloud"


class TestSwitchBackendPreOp:
@pytest.mark.parametrize(
@@ -1386,6 +1491,125 @@ def test_concat_with_pin(pin_backends, expected_backend):
)


@pytest.mark.parametrize(
"groupby_operation",
[
param(
lambda df: df.groupby("col0"),
id="DataFrameGroupBy",
),
param(
lambda df: df.groupby("col0")["col1"],
id="SeriesGroupBy",
),
],
)
def test_pin_groupby_in_place(groupby_operation):
"""Test that groupby objects can be pinned with inplace=True."""
modin_df = pd.DataFrame(
{
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
}
)

groupby_object = groupby_operation(modin_df)
assert not groupby_object.is_backend_pinned()

groupby_object.pin_backend(inplace=True)
assert groupby_object.is_backend_pinned()

groupby_object.unpin_backend(inplace=True)
assert not groupby_object.is_backend_pinned()


@pytest.mark.parametrize(
"groupby_operation",
[
param(
lambda df: df.groupby("col0"),
id="DataFrameGroupBy",
),
param(
lambda df: df.groupby("col0")["col1"],
id="SeriesGroupBy",
),
],
)
@pytest.mark.parametrize("method", ["pin_backend", "unpin_backend"])
@pytest.mark.xfail(
strict=True, raises=NotImplementedError, reason="Only inplace=True is supported"
)
def test_pin_or_unpin_groupby_not_in_place(groupby_operation, method):
"""Test that groupby pinning and unpinning with inplace=False."""
modin_df = pd.DataFrame(
{
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
}
)

groupby_obj = groupby_operation(modin_df)

getattr(groupby_obj, method)(inplace=False)


@pytest.mark.parametrize(
"data_type,data_factory,groupby_factory",
[
param(
"DataFrame",
lambda: pd.DataFrame(
{
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
}
),
lambda obj: obj.groupby("col0"),
id="DataFrame",
),
param(
"Series",
lambda: pd.Series(list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)), name="data"),
lambda obj: obj.groupby([0] * (BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
id="Series",
),
],
)
def test_groupby_pinning_reflects_parent_object_pin_status(
data_type, data_factory, groupby_factory
):
"""Test that groupby pinning inherits from parent object (DataFrame/Series) pin status but can be modified independently."""
modin_obj = data_factory()

old_groupby_obj = groupby_factory(modin_obj)

# Initially not pinned
assert not old_groupby_obj.is_backend_pinned()
assert not modin_obj.is_backend_pinned()

# Pin the parent object - new groupby objects should inherit this
modin_obj.pin_backend(inplace=True)

# Create a new groupby object after pinning parent object
new_groupby_obj = groupby_factory(modin_obj)

# New groupby should inherit the pinned status
assert new_groupby_obj.is_backend_pinned()
assert modin_obj.is_backend_pinned()

# But we can still modify groupby pinning independently
new_groupby_obj.unpin_backend(inplace=True)

# Parent object should remain pinned, groupby should be unpinned
assert not new_groupby_obj.is_backend_pinned()
assert modin_obj.is_backend_pinned()

assert not old_groupby_obj.is_backend_pinned()
old_groupby_obj.pin_backend(inplace=True)
assert old_groupby_obj.is_backend_pinned()


def test_second_init_only_calls_from_pandas_once_github_issue_7559():
with config_context(Backend="Big_Data_Cloud"):
# Create a dataframe once first so that we can initialize the dummy