# Source code for xorbits._mars.dataframe.window.expanding.core
# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from ....serialization.serializables import Int32Field, Int64Field, StringField
from ....utils import pd_release_version
from ...utils import validate_axis
from ..core import Window
# pandas 1.3.0 introduced the ``method`` keyword for expanding windows;
# ``Expanding.params`` only forwards ``method`` when this flag is true so
# older pandas releases do not receive an unknown keyword argument.
_window_has_method = (1, 3, 0) <= pd_release_version
class Expanding(Window):
    """
    Expanding-window object produced by :func:`expanding`.

    Stores the window parameters (``min_periods``, ``axis`` and ``method``)
    and exposes aggregation methods that build a ``DataFrameExpandingAgg``
    operand over the wrapped input.
    """

    _min_periods = Int64Field("min_periods")
    _axis = Int32Field("axis")
    _method = StringField("method")

    def __init__(self, min_periods=None, axis=None, method=None, **kw):
        super().__init__(_min_periods=min_periods, _axis=axis, _method=method, **kw)

    @property
    def min_periods(self):
        """Minimum number of observations required to produce a value."""
        return self._min_periods

    @property
    def axis(self):
        """Axis along which the window expands."""
        return self._axis

    @property
    def center(self):
        """
        Centering flag; ``None`` unless a ``_center`` attribute exists.

        ``Expanding`` declares no ``_center`` field and ``__init__`` never
        sets one, so reading ``self._center`` unconditionally would raise
        ``AttributeError``. Fall back to ``None`` instead.
        """
        # NOTE(review): appears to be a leftover from a removed ``center``
        # parameter; consider dropping the property once no caller uses it.
        return getattr(self, "_center", None)

    @property
    def method(self):
        """Window method forwarded to pandas; ``"single"`` when unset/empty."""
        return self._method or "single"

    def __call__(self, df):
        """
        Call ``df.expanding`` with this window's parameters.

        ``df`` is presumably a pandas DataFrame/Series chunk — TODO confirm.
        """
        return df.expanding(**self.params)

    @property
    def params(self):
        """
        Window keyword arguments in a fixed order.

        ``method`` is included only when ``_window_has_method`` is true, so
        pandas releases that do not know the keyword never receive it.
        """
        if _window_has_method:
            names = ("min_periods", "axis", "method")
        else:  # pragma: no cover
            names = ("min_periods", "axis")
        return OrderedDict((name, getattr(self, name)) for name in names)

    def aggregate(self, func, **kwargs):
        """
        Aggregate over the expanding window.

        Parameters
        ----------
        func
            Aggregation spec passed to ``DataFrameExpandingAgg``
            (e.g. ``"sum"``).
        **kwargs
            Only the private ``_count_always_valid`` flag (default ``False``)
            is read and forwarded as ``count_always_valid``.

        Returns
        -------
        The result of applying the ``DataFrameExpandingAgg`` operand to this
        window.
        """
        # Imported lazily — presumably to avoid a circular import.
        from .aggregation import DataFrameExpandingAgg

        count_always_valid = kwargs.pop("_count_always_valid", False)
        # NOTE(review): any remaining kwargs are silently ignored.
        op = DataFrameExpandingAgg(
            func=func, count_always_valid=count_always_valid, **self.params
        )
        return op(self)

    agg = aggregate

    def sum(self):
        """Expanding sum; shorthand for ``aggregate("sum")``."""
        return self.aggregate("sum")

    def count(self):
        """Expanding count; shorthand for ``aggregate("count")``."""
        return self.aggregate("count")

    def min(self):
        """Expanding minimum; shorthand for ``aggregate("min")``."""
        return self.aggregate("min")

    def max(self):
        """Expanding maximum; shorthand for ``aggregate("max")``."""
        return self.aggregate("max")

    def mean(self):
        """Expanding mean; shorthand for ``aggregate("mean")``."""
        return self.aggregate("mean")

    def var(self):
        """Expanding variance; shorthand for ``aggregate("var")``."""
        return self.aggregate("var")

    def std(self):
        """Expanding standard deviation; shorthand for ``aggregate("std")``."""
        return self.aggregate("std")
def expanding(obj, min_periods=1, axis=0):
    """
    Provide expanding transformations.

    Parameters
    ----------
    min_periods : int, default 1
        Minimum number of observations in window required to have a value
        (otherwise result is NA).
    axis : int or str, default 0
        Axis along which the window expands. Only axis 0 is currently
        supported.

    Returns
    -------
    Expanding
        An expanding-window object; call an aggregation method such as
        ``sum`` or ``mean`` on it to compute the result.

    Raises
    ------
    NotImplementedError
        If ``axis`` resolves to 1.

    See Also
    --------
    rolling : Provides rolling window calculations.
    ewm : Provides exponential weighted functions.

    Notes
    -----
    The window always starts at the first observation and grows by one row
    at a time, so each result covers every observation up to and including
    the current row. This function takes no ``center`` argument.

    Examples
    --------
    >>> import numpy as np
    >>> import mars.dataframe as md
    >>> df = md.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df.execute()
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0
    >>> df.expanding(2).sum().execute()
         B
    0  NaN
    1  1.0
    2  3.0
    3  3.0
    4  7.0
    """
    # Normalize ``axis`` (int or str) against ``obj`` before checking it.
    axis = validate_axis(axis, obj)
    if axis == 1:
        raise NotImplementedError("axis other than 0 is not supported")
    return Expanding(input=obj, min_periods=min_periods, axis=axis)