Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 130 additions & 40 deletions bigframes/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
from __future__ import annotations

import datetime
import typing
from typing import Literal, Mapping, Sequence, TYPE_CHECKING, Union

import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling
import numpy
import pandas

from bigframes import dtypes
from bigframes.core import agg_expressions
from bigframes.core import expression as ex
from bigframes.core import log_adapter, ordering, window_spec
from bigframes.core import log_adapter, ordering, utils, window_spec
import bigframes.core.blocks as blocks
from bigframes.core.window import ordering as window_ordering
import bigframes.operations.aggregations as agg_ops

if TYPE_CHECKING:
import bigframes.dataframe as df
import bigframes.series as series


@log_adapter.class_logger
class Window(vendored_pandas_rolling.Window):
Expand All @@ -37,7 +42,7 @@ def __init__(
self,
block: blocks.Block,
window_spec: window_spec.WindowSpec,
value_column_ids: typing.Sequence[str],
value_column_ids: Sequence[str],
drop_null_groups: bool = True,
is_series: bool = False,
skip_agg_column_id: str | None = None,
Expand All @@ -52,55 +57,106 @@ def __init__(
self._skip_agg_column_id = skip_agg_column_id

def count(self):
return self._apply_aggregate(agg_ops.count_op)
return self._apply_aggregate_op(agg_ops.count_op)

def sum(self):
return self._apply_aggregate(agg_ops.sum_op)
return self._apply_aggregate_op(agg_ops.sum_op)

def mean(self):
return self._apply_aggregate(agg_ops.mean_op)
return self._apply_aggregate_op(agg_ops.mean_op)

def var(self):
return self._apply_aggregate(agg_ops.var_op)
return self._apply_aggregate_op(agg_ops.var_op)

def std(self):
return self._apply_aggregate(agg_ops.std_op)
return self._apply_aggregate_op(agg_ops.std_op)

def max(self):
return self._apply_aggregate(agg_ops.max_op)
return self._apply_aggregate_op(agg_ops.max_op)

def min(self):
return self._apply_aggregate(agg_ops.min_op)
return self._apply_aggregate_op(agg_ops.min_op)

def _apply_aggregate(
self,
op: agg_ops.UnaryAggregateOp,
):
agg_block = self._aggregate_block(op)
def agg(self, func) -> Union[df.DataFrame, series.Series]:
if utils.is_dict_like(func):
return self._agg_dict(func)
elif utils.is_list_like(func):
return self._agg_list(func)
else:
return self._agg_func(func)

if self._is_series:
from bigframes.series import Series
aggregate = agg

def _agg_func(self, func) -> df.DataFrame:
ids, labels = self._aggregated_columns()
aggregations = [agg(col_id, agg_ops.lookup_agg_func(func)[0]) for col_id in ids]
return self._apply_aggs(aggregations, labels)

def _agg_dict(self, func: Mapping) -> df.DataFrame:
aggregations: list[agg_expressions.Aggregation] = []
column_labels = []
function_labels = []

return Series(agg_block)
want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())

for label, funcs_for_id in func.items():
col_id = self._block.label_to_col_id[label][-1] # get last matching column
func_list = (
funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id]
)
for f in func_list:
f_op, f_label = agg_ops.lookup_agg_func(f)
aggregations.append(agg(col_id, f_op))
column_labels.append(label)
function_labels.append(f_label)
if want_aggfunc_level:
result_labels: pandas.Index = utils.combine_indices(
pandas.Index(column_labels),
pandas.Index(function_labels),
)
else:
from bigframes.dataframe import DataFrame
result_labels = pandas.Index(column_labels)

# Preserve column order.
column_labels = [
self._block.col_id_to_label[col_id] for col_id in self._value_column_ids
]
return DataFrame(agg_block)._reindex_columns(column_labels)
return self._apply_aggs(aggregations, result_labels)

def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block:
agg_col_ids = [
col_id
for col_id in self._value_column_ids
if col_id != self._skip_agg_column_id
def _agg_list(self, func: Sequence) -> df.DataFrame:
ids, labels = self._aggregated_columns()
aggregations = [
agg(col_id, agg_ops.lookup_agg_func(f)[0]) for col_id in ids for f in func
]
block, result_ids = self._block.multi_apply_window_op(
agg_col_ids,
op,
self._window_spec,

if self._is_series:
# if series, no need to rebuild
result_cols_idx = pandas.Index(
[agg_ops.lookup_agg_func(f)[1] for f in func]
)
else:
if self._block.column_labels.nlevels > 1:
# Restructure MultiIndex for proper format: (idx1, idx2, func)
# rather than ((idx1, idx2), func).
column_labels = [
tuple(label) + (agg_ops.lookup_agg_func(f)[1],)
for label in labels.to_frame(index=False).to_numpy()
for f in func
]
else: # Single-level index
column_labels = [
(label, agg_ops.lookup_agg_func(f)[1])
for label in labels
for f in func
]
result_cols_idx = pandas.MultiIndex.from_tuples(
column_labels, names=[*self._block.column_labels.names, None]
)
return self._apply_aggs(aggregations, result_cols_idx)

def _apply_aggs(
self, exprs: Sequence[agg_expressions.Aggregation], labels: pandas.Index
):
block, ids = self._block.apply_analytic(
agg_exprs=exprs,
window=self._window_spec,
result_labels=labels,
skip_null_groups=self._drop_null_groups,
)

Expand All @@ -115,24 +171,50 @@ def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block:
)
block = block.set_index(col_ids=index_ids)

labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
if self._skip_agg_column_id is not None:
result_ids = [self._skip_agg_column_id, *result_ids]
labels.insert(0, self._block.col_id_to_label[self._skip_agg_column_id])
block = block.select_columns([self._skip_agg_column_id, *ids])
else:
block = block.select_columns(ids).with_column_labels(labels)

if self._is_series and (len(block.value_columns) == 1):
import bigframes.series as series

return series.Series(block)
else:
import bigframes.dataframe as df

return df.DataFrame(block)

def _apply_aggregate_op(
self,
op: agg_ops.UnaryAggregateOp,
):
ids, labels = self._aggregated_columns()
aggregations = [agg(col_id, op) for col_id in ids]
return self._apply_aggs(aggregations, labels)

return block.select_columns(result_ids).with_column_labels(labels)
def _aggregated_columns(self) -> tuple[Sequence[str], pandas.Index]:
agg_col_ids = [
col_id
for col_id in self._value_column_ids
if col_id != self._skip_agg_column_id
]
labels: pandas.Index = pandas.Index(
[self._block.col_id_to_label[col] for col in agg_col_ids]
)
return agg_col_ids, labels


def create_range_window(
block: blocks.Block,
window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
*,
value_column_ids: typing.Sequence[str] = tuple(),
value_column_ids: Sequence[str] = tuple(),
min_periods: int | None,
on: str | None = None,
closed: typing.Literal["right", "left", "both", "neither"],
closed: Literal["right", "left", "both", "neither"],
is_series: bool,
grouping_keys: typing.Sequence[str] = tuple(),
grouping_keys: Sequence[str] = tuple(),
drop_null_groups: bool = True,
) -> Window:

Expand Down Expand Up @@ -184,3 +266,11 @@ def create_range_window(
skip_agg_column_id=None if on is None else rolling_key_col_id,
drop_null_groups=drop_null_groups,
)


def agg(input: str, op: agg_ops.AggregateOp) -> agg_expressions.Aggregation:
if isinstance(op, agg_ops.UnaryAggregateOp):
return agg_expressions.UnaryAggregation(op, ex.deref(input))
else:
assert isinstance(op, agg_ops.NullaryAggregateOp)
return agg_expressions.NullaryAggregation(op)
69 changes: 69 additions & 0 deletions tests/system/small/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,75 @@ def test_dataframe_window_agg_ops(scalars_dfs, windowing, agg_op):
pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.parametrize(
("windowing"),
[
pytest.param(lambda x: x.expanding(), id="expanding"),
pytest.param(lambda x: x.rolling(3, min_periods=3), id="rolling"),
pytest.param(
lambda x: x.groupby(level=0).rolling(3, min_periods=3), id="rollinggroupby"
),
pytest.param(
lambda x: x.groupby("int64_too").expanding(min_periods=2),
id="expandinggroupby",
),
],
)
@pytest.mark.parametrize(
("func"),
[
pytest.param("sum", id="sum_by_name"),
pytest.param(np.sum, id="sum_by_by_np"),
pytest.param([np.sum, np.mean], id="list_of_funcs"),
pytest.param(
{"int64_col": np.sum, "float64_col": "mean"}, id="dict_of_single_funcs"
),
pytest.param(
{"int64_col": np.sum, "float64_col": ["mean", np.max]},
id="dict_of_lists_and_single_funcs",
),
],
)
def test_dataframe_window_agg_func(scalars_dfs, windowing, func):
bf_df, pd_df = scalars_dfs
target_columns = ["int64_too", "float64_col", "bool_col", "int64_col"]
index_column = "bool_col"
bf_df = bf_df[target_columns].set_index(index_column)
pd_df = pd_df[target_columns].set_index(index_column)

bf_result = windowing(bf_df).agg(func).to_pandas()

pd_result = windowing(pd_df).agg(func)

pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)


def test_series_window_agg_single_func(scalars_dfs):
bf_df, pd_df = scalars_dfs
index_column = "bool_col"
bf_series = bf_df.set_index(index_column).int64_too
pd_series = pd_df.set_index(index_column).int64_too

bf_result = bf_series.expanding().agg("sum").to_pandas()

pd_result = pd_series.expanding().agg("sum")

pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


def test_series_window_agg_multi_func(scalars_dfs):
bf_df, pd_df = scalars_dfs
index_column = "bool_col"
bf_series = bf_df.set_index(index_column).int64_too
pd_series = pd_df.set_index(index_column).int64_too

bf_result = bf_series.expanding().agg(["sum", np.mean]).to_pandas()

pd_result = pd_series.expanding().agg(["sum", np.mean])

pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"])
@pytest.mark.parametrize(
"window", # skipped numpy timedelta because Pandas does not support it.
Expand Down
49 changes: 49 additions & 0 deletions third_party/bigframes_vendored/pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,52 @@ def max(self):
def min(self):
"""Calculate the weighted window minimum."""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def agg(self, func):
"""
Aggregate using one or more operations over the specified axis.

**Examples:**

>>> import bigframes.pandas as bpd

>>> df = bpd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
>>> df
A B C
0 1 4 7
1 2 5 8
2 3 6 9
<BLANKLINE>
[3 rows x 3 columns]

>>> df.rolling(2).sum()
A B C
0 <NA> <NA> <NA>
1 3 9 15
2 5 11 17
<BLANKLINE>
[3 rows x 3 columns]

>>> df.rolling(2).agg({"A": "sum", "B": "min"})
A B
0 <NA> <NA>
1 3 4
2 5 5
<BLANKLINE>
[3 rows x 2 columns]

Args:
func (function, str, list or dict):
Function to use for aggregating the data.

Accepted combinations are:

- string function name
- list of function names, e.g. ``['sum', 'mean']``
- dict of axis labels -> function names or list of such.

Returns:
Series or DataFrame

"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)