# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from ....serialization.serializables import (
AnyField,
BoolField,
Int32Field,
Int64Field,
StringField,
)
from ....utils import pd_release_version
from ...core import DATAFRAME_TYPE
from ...utils import build_empty_df, build_empty_series, is_pandas_2, validate_axis
from ..core import Window
_window_has_method = pd_release_version >= (1, 3, 0)
_PAIRWISE_AGG = (
"corr",
"cov",
)
class Rolling(Window):
_window = AnyField("window")
_min_periods = Int64Field("min_periods")
_center = BoolField("center")
_win_type = StringField("win_type")
_on = StringField("on")
_axis = Int32Field("axis")
_closed = StringField("closed")
_method = StringField("method")
def __init__(
self,
window=None,
min_periods=None,
center=None,
win_type=None,
on=None,
axis=None,
closed=None,
method=None,
**kw
):
super().__init__(
_window=window,
_min_periods=min_periods,
_center=center,
_win_type=win_type,
_on=on,
_axis=axis,
_closed=closed,
_method=method,
**kw
)
@property
def window(self):
return self._window
@property
def min_periods(self):
return self._min_periods
@property
def center(self):
return self._center
@property
def win_type(self):
return self._win_type
@property
def on(self):
return self._on
@property
def axis(self):
return self._axis
@property
def closed(self):
return self._closed
@property
def method(self):
return self._method or "single"
@property
def params(self):
p = OrderedDict()
if not _window_has_method: # pragma: no cover
args = [
"window",
"min_periods",
"center",
"win_type",
"axis",
"on",
"closed",
]
else:
args = [
"window",
"min_periods",
"center",
"win_type",
"axis",
"on",
"closed",
"method",
]
for attr in args:
p[attr] = getattr(self, attr)
return p
def _repr_name(self):
return "Rolling" if self.win_type is None else "Window"
def validate(self):
# leverage pandas itself to do validation
pd_index = self._input.index_value.to_pandas()
if isinstance(self._input, DATAFRAME_TYPE):
empty_obj = build_empty_df(self._input.dtypes, index=pd_index[:0])
else:
empty_obj = build_empty_series(
self._input.dtype, index=pd_index[:0], name=self._input.name
)
pd_rolling = empty_obj.rolling(**self.params)
for k in self.params:
# update value according to pandas rolling
setattr(self, "_" + k, getattr(pd_rolling, k))
if is_pandas_2() and pd_rolling._win_freq_i8 is not None:
setattr(self, "_win_type", "freq")
[docs] def aggregate(self, func, *args, **kwargs):
from .aggregation import DataFrameRollingAgg
params = self.params
if func in _PAIRWISE_AGG:
# for convenience, since pairwise aggregations are axis irrelevant.
params["axis"] = 0
op = DataFrameRollingAgg(
func=func, func_args=args, func_kwargs=kwargs, **params
)
return op(self)
def agg(self, func, *args, **kwargs):
return self.aggregate(func, *args, **kwargs)
[docs] def count(self, *args, **kwargs):
return self.aggregate("count", *args, **kwargs)
[docs] def sum(self, *args, **kwargs):
return self.aggregate("sum", *args, **kwargs)
[docs] def mean(self, *args, **kwargs):
return self.aggregate("mean", *args, **kwargs)
[docs] def var(self, ddof=1, *args, **kwargs):
return self.aggregate("var", ddof=ddof, *args, **kwargs)
[docs] def std(self, ddof=1, *args, **kwargs):
return self.aggregate("std", ddof=ddof, *args, **kwargs)
[docs] def min(self, *args, **kwargs):
return self.aggregate("min", *args, **kwargs)
[docs] def max(self, *args, **kwargs):
return self.aggregate("max", *args, **kwargs)
[docs] def skew(self, *args, **kwargs):
return self.aggregate("skew", *args, **kwargs)
[docs] def kurt(self, *args, **kwargs):
return self.aggregate("kurt", *args, **kwargs)
[docs] def corr(self, **kwargs):
# not taking positional args since the tiling depends on "pairwise"
return self.aggregate("corr", **kwargs)
[docs] def cov(self, **kwargs):
# not taking positional args since the tiling depends on "pairwise"
return self.aggregate("cov", **kwargs)
def rolling(
obj,
window,
min_periods=None,
center=False,
win_type=None,
on=None,
axis=0,
closed=None,
):
"""
Provide rolling window calculations.
Parameters
----------
window : int, or offset
Size of the moving window. This is the number of observations used for
calculating the statistic. Each window will be a fixed size.
If its an offset then this will be the time period of each window. Each
window will be a variable sized based on the observations included in
the time-period. This is only valid for datetimelike indexes. This is
new in 0.19.0
min_periods : int, default None
Minimum number of observations in window required to have a value
(otherwise result is NA). For a window that is specified by an offset,
`min_periods` will default to 1. Otherwise, `min_periods` will default
to the size of the window.
center : bool, default False
Set the labels at the center of the window.
win_type : str, default None
Provide a window type. If ``None``, all points are evenly weighted.
See the notes below for further information.
on : str, optional
For a DataFrame, a datetime-like column on which to calculate the rolling
window, rather than the DataFrame's index. Provided integer column is
ignored and excluded from result since an integer index is not used to
calculate the rolling window.
axis : int or str, default 0
closed : str, default None
Make the interval closed on the 'right', 'left', 'both' or
'neither' endpoints.
For offset-based windows, it defaults to 'right'.
For fixed windows, defaults to 'both'. Remaining cases not implemented
for fixed windows.
Returns
-------
a Window or Rolling sub-classed for the particular operation
See Also
--------
expanding : Provides expanding transformations.
ewm : Provides exponential weighted functions.
Notes
-----
By default, the result is set to the right edge of the window. This can be
changed to the center of the window by setting ``center=True``.
To learn more about the offsets & frequency strings, please see `this link
<http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
The recognized win_types are:
* ``boxcar``
* ``triang``
* ``blackman``
* ``hamming``
* ``bartlett``
* ``parzen``
* ``bohman``
* ``blackmanharris``
* ``nuttall``
* ``barthann``
* ``kaiser`` (needs beta)
* ``gaussian`` (needs std)
* ``general_gaussian`` (needs power, width)
* ``slepian`` (needs width)
* ``exponential`` (needs tau), center is set to None.
If ``win_type=None`` all points are evenly weighted. To learn more about
different window types see `scipy.signal window functions
<https://docs.scipy.org/doc/scipy/reference/signal.html#window-functions>`__.
Examples
--------
>>> import numpy as np
>>> import mars.dataframe as md
>>> df = md.DataFrame({'B': [0, 1, 2, np.nan, 4]})
>>> df.execute()
B
0 0.0
1 1.0
2 2.0
3 NaN
4 4.0
Rolling sum with a window length of 2, using the 'triang'
window type.
>>> df.rolling(2, win_type='triang').sum().execute()
B
0 NaN
1 0.5
2 1.5
3 NaN
4 NaN
Rolling sum with a window length of 2, min_periods defaults
to the window length.
>>> df.rolling(2).sum().execute()
B
0 NaN
1 1.0
2 3.0
3 NaN
4 NaN
Same as above, but explicitly set the min_periods
>>> df.rolling(2, min_periods=1).sum().execute()
B
0 0.0
1 1.0
2 3.0
3 2.0
4 4.0
A ragged (meaning not-a-regular frequency), time-indexed DataFrame
>>> df = md.DataFrame({'B': [0, 1, 2, np.nan, 4]},
>>> index = [md.Timestamp('20130101 09:00:00'),
>>> md.Timestamp('20130101 09:00:02'),
>>> md.Timestamp('20130101 09:00:03'),
>>> md.Timestamp('20130101 09:00:05'),
>>> md.Timestamp('20130101 09:00:06')])
>>> df.execute()
B
2013-01-01 09:00:00 0.0
2013-01-01 09:00:02 1.0
2013-01-01 09:00:03 2.0
2013-01-01 09:00:05 NaN
2013-01-01 09:00:06 4.0
Contrasting to an integer rolling window, this will roll a variable
length window corresponding to the time period.
The default for min_periods is 1.
>>> df.rolling('2s').sum().execute()
B
2013-01-01 09:00:00 0.0
2013-01-01 09:00:02 1.0
2013-01-01 09:00:03 3.0
2013-01-01 09:00:05 NaN
2013-01-01 09:00:06 4.0
"""
axis = validate_axis(axis, obj)
r = Rolling(
input=obj,
window=window,
min_periods=min_periods,
center=center,
win_type=win_type,
on=on,
axis=axis,
closed=closed,
)
r.validate()
return r