Skip to content

Commit e4083eb

Browse files
FEAT-#7607: Support pinning groupby objects in place. (#7608)
Resolves #7607

---------

Signed-off-by: sfc-gh-mvashishtha <[email protected]>
1 parent 5b2fadf commit e4083eb

File tree

4 files changed

+248
-3
lines changed

4 files changed

+248
-3
lines changed

modin/pandas/dataframe.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ def groupby(
614614
drop=drop,
615615
dropna=dropna,
616616
return_tuple_when_iterating=return_tuple_when_iterating,
617+
backend_pinned=self.is_backend_pinned(),
617618
)
618619

619620
def keys(self) -> pandas.Index: # noqa: RT01, D200

modin/pandas/groupby.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(
126126
group_keys,
127127
idx_name,
128128
drop,
129+
backend_pinned: bool,
129130
**kwargs,
130131
):
131132
self._axis = axis
@@ -141,6 +142,8 @@ def __init__(
141142
self._return_tuple_when_iterating = kwargs.pop(
142143
"return_tuple_when_iterating", False
143144
)
145+
# Whether the backend of this groupby object has been pinned.
146+
self._backend_pinned = backend_pinned
144147

145148
if (
146149
level is None
@@ -199,15 +202,26 @@ def set_backend(
199202

200203
@_inherit_docstrings(QueryCompilerCaster.is_backend_pinned)
201204
def is_backend_pinned(self) -> bool:
202-
return False
205+
return self._backend_pinned
203206

204207
@_inherit_docstrings(QueryCompilerCaster._set_backend_pinned)
205208
def _set_backend_pinned(self, pinned: bool, inplace: bool) -> Optional[Self]:
206-
ErrorMessage.not_implemented()
209+
if not inplace:
210+
ErrorMessage.not_implemented(
211+
"Only inplace=True is supported for groupby pinning"
212+
)
213+
214+
self._backend_pinned = pinned
215+
return None
207216

208217
@_inherit_docstrings(QueryCompilerCaster.pin_backend)
209218
def pin_backend(self, inplace: bool = False) -> Optional[Self]:
210-
ErrorMessage.not_implemented()
219+
if not inplace:
220+
ErrorMessage.not_implemented(
221+
"Only inplace=True is supported for groupby pinning"
222+
)
223+
224+
return self._set_backend_pinned(True, inplace=True)
211225

212226
@disable_logging
213227
@_inherit_docstrings(QueryCompilerCaster._get_query_compiler)
@@ -237,6 +251,7 @@ def _override(self, **kwargs):
237251
axis=self._axis,
238252
idx_name=self._idx_name,
239253
drop=self._drop,
254+
backend_pinned=self._backend_pinned,
240255
**self._kwargs,
241256
)
242257
new_kw.update(kwargs)
@@ -925,6 +940,7 @@ def __getitem__(self, key):
925940
return DataFrameGroupBy(
926941
self._df[key],
927942
drop=self._drop,
943+
backend_pinned=self._backend_pinned,
928944
**kwargs,
929945
)
930946
if (
@@ -939,6 +955,7 @@ def __getitem__(self, key):
939955
return SeriesGroupBy(
940956
self._df[key],
941957
drop=False,
958+
backend_pinned=self._backend_pinned,
942959
**kwargs,
943960
)
944961

@@ -1223,6 +1240,7 @@ def size(self):
12231240
0,
12241241
drop=self._drop,
12251242
idx_name=self._idx_name,
1243+
backend_pinned=self._backend_pinned,
12261244
**self._kwargs,
12271245
).size()
12281246
result = self._wrap_aggregation(
@@ -1422,6 +1440,7 @@ def fillna(
14221440
axis=self._axis,
14231441
idx_name=self._idx_name,
14241442
drop=self._drop,
1443+
backend_pinned=self._backend_pinned,
14251444
**new_groupby_kwargs,
14261445
)
14271446
return work_object._wrap_aggregation(

modin/pandas/series.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,6 +1280,7 @@ def groupby(
12801280
observed=observed,
12811281
drop=False,
12821282
dropna=dropna,
1283+
backend_pinned=self.is_backend_pinned(),
12831284
)
12841285

12851286
def gt(

modin/tests/pandas/native_df_interoperability/test_compiler_caster.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,111 @@ def test_auto_switch_config_can_disable_groupby_agg_auto_switch(
977977
)
978978
assert modin_groupby.get_backend() == "Big_Data_Cloud"
979979

980+
@pytest.mark.parametrize(
981+
"groupby_class,groupby_operation,agg_operation",
982+
[
983+
param(
984+
"DataFrameGroupBy",
985+
lambda df: df.groupby("col0"),
986+
lambda groupby: groupby.sum(),
987+
id="DataFrameGroupBy",
988+
),
989+
param(
990+
"SeriesGroupBy",
991+
lambda df: df.groupby("col0")["col1"],
992+
lambda groupby: groupby.sum(),
993+
id="SeriesGroupBy",
994+
),
995+
],
996+
)
997+
@backend_test_context(
998+
test_backend="Big_Data_Cloud",
999+
choices=("Big_Data_Cloud", "Small_Data_Local"),
1000+
)
1001+
def test_pinned_dataframe_prevents_groupby_backend_switch(
1002+
self, groupby_class, groupby_operation, agg_operation
1003+
):
1004+
"""Test that pinning a DataFrame prevents groupby operations from switching backends."""
1005+
modin_df, pandas_df = create_test_dfs(
1006+
{
1007+
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1008+
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
1009+
}
1010+
)
1011+
1012+
assert modin_df.get_backend() == "Big_Data_Cloud"
1013+
1014+
# Pin the DataFrame
1015+
modin_df.pin_backend(inplace=True)
1016+
assert modin_df.is_backend_pinned()
1017+
1018+
# Create groupby object - should inherit pin status from dataframe
1019+
modin_groupby = groupby_operation(modin_df)
1020+
pandas_groupby = groupby_operation(pandas_df)
1021+
assert modin_groupby.is_backend_pinned() # Inherited from DataFrame
1022+
1023+
# Register a post-op switch that would normally move to Small_Data_Local
1024+
register_function_for_post_op_switch(
1025+
class_name=groupby_class, backend="Big_Data_Cloud", method="sum"
1026+
)
1027+
1028+
# The operation should stay on Big_Data_Cloud due to inherited pinning
1029+
modin_result = agg_operation(modin_groupby)
1030+
pandas_result = agg_operation(pandas_groupby)
1031+
df_equals(modin_result, pandas_result)
1032+
assert modin_result.get_backend() == "Big_Data_Cloud"
1033+
1034+
@pytest.mark.parametrize(
1035+
"groupby_class,groupby_operation,agg_operation",
1036+
[
1037+
param(
1038+
"DataFrameGroupBy",
1039+
lambda df: df.groupby("col0"),
1040+
lambda groupby: groupby.sum(),
1041+
id="DataFrameGroupBy",
1042+
),
1043+
param(
1044+
"SeriesGroupBy",
1045+
lambda df: df.groupby("col0")["col1"],
1046+
lambda groupby: groupby.sum(),
1047+
id="SeriesGroupBy",
1048+
),
1049+
],
1050+
)
1051+
@backend_test_context(
1052+
test_backend="Big_Data_Cloud",
1053+
choices=("Big_Data_Cloud", "Small_Data_Local"),
1054+
)
1055+
def test_pinned_groupby_prevents_backend_switch(
1056+
self, groupby_class, groupby_operation, agg_operation
1057+
):
1058+
"""Test that pinning a GroupBy object prevents operations from switching backends."""
1059+
modin_df, pandas_df = create_test_dfs(
1060+
{
1061+
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1062+
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
1063+
}
1064+
)
1065+
1066+
assert modin_df.get_backend() == "Big_Data_Cloud"
1067+
1068+
# Create groupby object and pin it directly
1069+
modin_groupby = groupby_operation(modin_df)
1070+
pandas_groupby = groupby_operation(pandas_df)
1071+
modin_groupby.pin_backend(inplace=True)
1072+
assert modin_groupby.is_backend_pinned()
1073+
1074+
# Register a post-op switch that would normally move to Small_Data_Local
1075+
register_function_for_post_op_switch(
1076+
class_name=groupby_class, backend="Big_Data_Cloud", method="sum"
1077+
)
1078+
1079+
# The operation should stay on Big_Data_Cloud due to pinning
1080+
modin_result = agg_operation(modin_groupby)
1081+
pandas_result = agg_operation(pandas_groupby)
1082+
df_equals(modin_result, pandas_result)
1083+
assert modin_result.get_backend() == "Big_Data_Cloud"
1084+
9801085

9811086
class TestSwitchBackendPreOp:
9821087
@pytest.mark.parametrize(
@@ -1386,6 +1491,125 @@ def test_concat_with_pin(pin_backends, expected_backend):
13861491
)
13871492

13881493

1494+
@pytest.mark.parametrize(
1495+
"groupby_operation",
1496+
[
1497+
param(
1498+
lambda df: df.groupby("col0"),
1499+
id="DataFrameGroupBy",
1500+
),
1501+
param(
1502+
lambda df: df.groupby("col0")["col1"],
1503+
id="SeriesGroupBy",
1504+
),
1505+
],
1506+
)
1507+
def test_pin_groupby_in_place(groupby_operation):
1508+
"""Test that groupby objects can be pinned with inplace=True."""
1509+
modin_df = pd.DataFrame(
1510+
{
1511+
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1512+
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
1513+
}
1514+
)
1515+
1516+
groupby_object = groupby_operation(modin_df)
1517+
assert not groupby_object.is_backend_pinned()
1518+
1519+
groupby_object.pin_backend(inplace=True)
1520+
assert groupby_object.is_backend_pinned()
1521+
1522+
groupby_object.unpin_backend(inplace=True)
1523+
assert not groupby_object.is_backend_pinned()
1524+
1525+
1526+
@pytest.mark.parametrize(
1527+
"groupby_operation",
1528+
[
1529+
param(
1530+
lambda df: df.groupby("col0"),
1531+
id="DataFrameGroupBy",
1532+
),
1533+
param(
1534+
lambda df: df.groupby("col0")["col1"],
1535+
id="SeriesGroupBy",
1536+
),
1537+
],
1538+
)
1539+
@pytest.mark.parametrize("method", ["pin_backend", "unpin_backend"])
1540+
@pytest.mark.xfail(
1541+
strict=True, raises=NotImplementedError, reason="Only inplace=True is supported"
1542+
)
1543+
def test_pin_or_unpin_groupby_not_in_place(groupby_operation, method):
1544+
"""Test that pinning or unpinning a groupby object with inplace=False raises NotImplementedError."""
1545+
modin_df = pd.DataFrame(
1546+
{
1547+
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1548+
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
1549+
}
1550+
)
1551+
1552+
groupby_obj = groupby_operation(modin_df)
1553+
1554+
getattr(groupby_obj, method)(inplace=False)
1555+
1556+
1557+
@pytest.mark.parametrize(
1558+
"data_type,data_factory,groupby_factory",
1559+
[
1560+
param(
1561+
"DataFrame",
1562+
lambda: pd.DataFrame(
1563+
{
1564+
"col0": list(range(BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1565+
"col1": list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)),
1566+
}
1567+
),
1568+
lambda obj: obj.groupby("col0"),
1569+
id="DataFrame",
1570+
),
1571+
param(
1572+
"Series",
1573+
lambda: pd.Series(list(range(1, BIG_DATA_CLOUD_MIN_NUM_ROWS)), name="data"),
1574+
lambda obj: obj.groupby([0] * (BIG_DATA_CLOUD_MIN_NUM_ROWS - 1)),
1575+
id="Series",
1576+
),
1577+
],
1578+
)
1579+
def test_groupby_pinning_reflects_parent_object_pin_status(
1580+
data_type, data_factory, groupby_factory
1581+
):
1582+
"""Test that a new groupby object inherits the pin status of its parent object (DataFrame/Series) when it is created, and that its pin status can then be changed independently of the parent."""
1583+
modin_obj = data_factory()
1584+
1585+
old_groupby_obj = groupby_factory(modin_obj)
1586+
1587+
# Initially not pinned
1588+
assert not old_groupby_obj.is_backend_pinned()
1589+
assert not modin_obj.is_backend_pinned()
1590+
1591+
# Pin the parent object - new groupby objects should inherit this
1592+
modin_obj.pin_backend(inplace=True)
1593+
1594+
# Create a new groupby object after pinning parent object
1595+
new_groupby_obj = groupby_factory(modin_obj)
1596+
1597+
# New groupby should inherit the pinned status
1598+
assert new_groupby_obj.is_backend_pinned()
1599+
assert modin_obj.is_backend_pinned()
1600+
1601+
# But we can still modify groupby pinning independently
1602+
new_groupby_obj.unpin_backend(inplace=True)
1603+
1604+
# Parent object should remain pinned, groupby should be unpinned
1605+
assert not new_groupby_obj.is_backend_pinned()
1606+
assert modin_obj.is_backend_pinned()
1607+
1608+
assert not old_groupby_obj.is_backend_pinned()
1609+
old_groupby_obj.pin_backend(inplace=True)
1610+
assert old_groupby_obj.is_backend_pinned()
1611+
1612+
13891613
def test_second_init_only_calls_from_pandas_once_github_issue_7559():
13901614
with config_context(Backend="Big_Data_Cloud"):
13911615
# Create a dataframe once first so that we can initialize the dummy

0 commit comments

Comments
 (0)