# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
from numbers import Integral
import numpy as np
import pandas as pd
from ... import opcodes as OperandDef
from ...core import ENTITY_TYPE, ExecutableTuple, OutputType, recursive_tile
from ...core.context import get_context
from ...serialization.serializables import (
AnyField,
BoolField,
Int32Field,
KeyField,
StringField,
)
from ...tensor import tensor as astensor
from ...tensor.core import TENSOR_TYPE, TensorOrder
from ...utils import has_unknown_shape
from ..core import INDEX_TYPE, SERIES_TYPE
from ..datasource.index import from_pandas as asindex
from ..initializer import Series as asseries
from ..operands import DataFrameOperand, DataFrameOperandMixin
from ..utils import parse_index
class DataFrameCut(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.CUT
_input = KeyField("input")
_bins = AnyField("bins")
_right = BoolField("right")
_labels = AnyField("labels")
_retbins = BoolField("retbins")
_precision = Int32Field("precision")
_include_lowest = BoolField("include_lowest")
_duplicates = StringField("duplicates")
_ordered = BoolField("ordered")
def __init__(
self,
bins=None,
right=None,
labels=None,
retbins=None,
precision=None,
include_lowest=None,
duplicates=None,
ordered=None,
**kw
):
super().__init__(
_bins=bins,
_right=right,
_labels=labels,
_retbins=retbins,
_precision=precision,
_include_lowest=include_lowest,
_duplicates=duplicates,
_ordered=ordered,
**kw
)
@property
def input(self):
return self._input
@property
def bins(self):
return self._bins
@property
def right(self):
return self._right
@property
def labels(self):
return self._labels
@property
def retbins(self):
return self._retbins
@property
def precision(self):
return self._precision
@property
def include_lowest(self):
return self._include_lowest
@property
def duplicates(self):
return self._duplicates
@property
def ordered(self):
return self._ordered
@property
def output_limit(self):
return 1 if not self._retbins else 2
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(self._inputs)
self._input = next(inputs_iter)
if isinstance(self._bins, ENTITY_TYPE):
self._bins = next(inputs_iter)
if isinstance(self._labels, ENTITY_TYPE):
self._labels = next(inputs_iter)
def __call__(self, x):
if isinstance(x, pd.Series):
x = asseries(x)
elif not isinstance(x, ENTITY_TYPE):
x = astensor(x)
if x.ndim != 1:
raise ValueError("Input array must be 1 dimensional")
if x.size == 0:
raise ValueError("Cannot cut empty array")
inputs = [x]
if self._labels is not None and not isinstance(
self._labels, (bool, ENTITY_TYPE)
):
self._labels = np.asarray(self._labels)
# infer dtype
x_empty = (
pd.Series([1], dtype=x.dtype)
if isinstance(x, SERIES_TYPE)
else np.asarray([1], dtype=x.dtype)
)
if isinstance(self._bins, INDEX_TYPE):
bins = self._bins.index_value.to_pandas()
inputs.append(self._bins)
bins_unknown = True
elif isinstance(self._bins, ENTITY_TYPE):
bins = np.asarray([2], dtype=self._bins.dtype)
inputs.append(self._bins)
bins_unknown = True
else:
bins = self._bins
bins_unknown = isinstance(self._bins, Integral)
if isinstance(self._labels, ENTITY_TYPE):
bins_unknown = True
labels = None
inputs.append(self._labels)
else:
if self._labels is False or not bins_unknown:
labels = self._labels
else:
labels = None
ret = pd.cut(
x_empty,
bins,
right=self._right,
labels=labels,
retbins=True,
include_lowest=self._include_lowest,
duplicates=self._duplicates,
)
kws = []
output_types = []
if bins_unknown and isinstance(ret[0].dtype, pd.CategoricalDtype):
# inaccurate dtype, just create an empty one
out_dtype = pd.CategoricalDtype()
else:
out_dtype = ret[0].dtype
if isinstance(ret[0], pd.Series):
output_types.append(OutputType.series)
kws.append(
{
"dtype": out_dtype,
"shape": x.shape,
"index_value": x.index_value,
"name": x.name,
}
)
elif isinstance(ret[0], np.ndarray):
output_types.append(OutputType.tensor)
kws.append(
{"dtype": out_dtype, "shape": x.shape, "order": TensorOrder.C_ORDER}
)
else:
assert isinstance(ret[0], pd.Categorical)
output_types.append(OutputType.categorical)
kws.append(
{
"dtype": out_dtype,
"shape": x.shape,
"categories_value": parse_index(
out_dtype.categories, store_data=True
),
}
)
if self._retbins:
if isinstance(self._bins, (pd.IntervalIndex, INDEX_TYPE)):
output_types.append(OutputType.index)
kws.append(
{
"dtype": self._bins.dtype,
"shape": self._bins.shape,
"index_value": self._bins.index_value
if isinstance(self._bins, INDEX_TYPE)
else parse_index(self._bins, store_data=False),
"name": self._bins.name,
}
)
else:
output_types.append(OutputType.tensor)
kws.append(
{
"dtype": ret[1].dtype,
"shape": ret[1].shape if ret[1].size > 0 else (np.nan,),
"order": TensorOrder.C_ORDER,
}
)
self.output_types = output_types
return ExecutableTuple(self.new_tileables(inputs, kws=kws))
@classmethod
def tile(cls, op):
if isinstance(op.bins, ENTITY_TYPE):
# check op.bins chunk shapes
if has_unknown_shape(op.bins):
yield
bins = yield from recursive_tile(op.bins.rechunk(op.bins.shape))
else:
bins = op.bins
if isinstance(op.labels, ENTITY_TYPE):
# check op.labels chunk shapes
if has_unknown_shape(op.labels):
yield
labels = yield from recursive_tile(op.labels.rechunk(op.labels.shape))
else:
labels = op.labels
if isinstance(op.bins, Integral):
input_min, input_max = yield from recursive_tile(
op.input.min(), op.input.max()
)
input_min_chunk = input_min.chunks[0]
input_max_chunk = input_max.chunks[0]
# let input min and max execute first
min_max_chunks = [input_min_chunk, input_max_chunk]
yield min_max_chunks + [c for inp in op.inputs for c in inp.chunks]
ctx = get_context()
keys = [input_min_chunk.key, input_max_chunk.key]
# get min and max of x
min_val, max_val = ctx.get_chunks_result(keys)
# calculate bins
if np.isinf(min_val) or np.isinf(max_val):
raise ValueError(
"cannot specify integer `bins` when input data contains infinity"
)
elif min_val == max_val: # adjust end points before binning
min_val -= 0.001 * abs(min_val) if min_val != 0 else 0.001
max_val += 0.001 * abs(max_val) if max_val != 0 else 0.001
bins = np.linspace(min_val, max_val, bins + 1, endpoint=True)
else: # adjust end points before binning
bins = np.linspace(min_val, max_val, bins + 1, endpoint=True)
adj = (max_val - min_val) * 0.001 # 0.1% of the range
if op.right:
bins[0] -= adj
else:
bins[-1] += adj
outs = op.outputs
out_chunks = []
for c in op.input.chunks:
chunk_op = op.copy().reset_key()
chunk_inputs = [c]
chunk_op._bins = bins
# do not return bins always for chunk
chunk_op._retbins = False
if isinstance(bins, ENTITY_TYPE):
chunk_inputs.append(bins.chunks[0])
chunk_op._labels = labels
if isinstance(labels, ENTITY_TYPE):
chunk_inputs.append(labels.chunks[0])
chunk_kws = []
if isinstance(outs[0], SERIES_TYPE):
chunk_kws.append(
{
"dtype": outs[0].dtype,
"shape": c.shape,
"index_value": c.index_value,
"name": c.name,
"index": c.index,
}
)
elif isinstance(outs[0], TENSOR_TYPE):
chunk_kws.append(
{
"dtype": outs[0].dtype,
"shape": c.shape,
"order": TensorOrder.C_ORDER,
"index": c.index,
}
)
else:
chunk_kws.append(
{
"dtype": outs[0].dtype,
"shape": c.shape,
"categories_value": outs[0].categories_value,
"index": c.index,
}
)
out_chunks.append(chunk_op.new_chunk(chunk_inputs, kws=chunk_kws))
kws = []
out_kw = outs[0].params
out_kw["chunks"] = out_chunks
out_kw["nsplits"] = op.input.nsplits
kws.append(out_kw)
if len(outs) == 2:
bins_kw = outs[1].params
bins_kw["chunks"] = bins_chunks = []
if isinstance(bins, ENTITY_TYPE):
bins_chunks.append(bins.chunks[0])
else:
if op.duplicates == "drop":
if isinstance(bins, (np.ndarray, list, tuple)):
bins = np.unique(bins)
else:
bins = bins.unique()
bins = bins.astype(outs[1].dtype, copy=False)
convert = (
astensor if not isinstance(bins, pd.IntervalIndex) else asindex
)
converted = yield from recursive_tile(
convert(bins, chunk_size=len(bins))
)
bins_chunks.append(converted.chunks[0])
bins_kw["nsplits"] = ((len(bins),),)
kws.append(bins_kw)
new_op = op.copy()
return new_op.new_tileables(op.inputs, kws=kws)
@classmethod
def execute(cls, ctx, op):
x = ctx[op.input.key]
bins = ctx[op.bins.key] if isinstance(op.bins, ENTITY_TYPE) else op.bins
labels = ctx[op.labels.key] if isinstance(op.labels, ENTITY_TYPE) else op.labels
if pd.__version__ >= "1.1.0":
cut = partial(
pd.cut,
right=op.right,
retbins=op.retbins,
precision=op.precision,
include_lowest=op.include_lowest,
duplicates=op.duplicates,
ordered=op.ordered,
)
else:
cut = partial(
pd.cut,
right=op.right,
retbins=op.retbins,
precision=op.precision,
include_lowest=op.include_lowest,
duplicates=op.duplicates,
)
try:
ret = cut(x, bins, labels=labels)
except ValueError:
# fail due to buffer source array is read-only
ret = cut(x.copy(), bins, labels=labels)
if op.retbins: # pragma: no cover
ctx[op.outputs[0].key] = ret[0]
ctx[op.outputs[1].key] = ret[1]
else:
ctx[op.outputs[0].key] = ret
[文档]def cut(
x,
bins,
right: bool = True,
labels=None,
retbins: bool = False,
precision: int = 3,
include_lowest: bool = False,
duplicates: str = "raise",
ordered: bool = True,
):
"""
Bin values into discrete intervals.
Use `cut` when you need to segment and sort data values into bins. This
function is also useful for going from a continuous variable to a
categorical variable. For example, `cut` could convert ages to groups of
age ranges. Supports binning into an equal number of bins, or a
pre-specified array of bins.
Parameters
----------
x : array-like
The input array to be binned. Must be 1-dimensional.
bins : int, sequence of scalars, or IntervalIndex
The criteria to bin by.
* int : Defines the number of equal-width bins in the range of `x`. The
range of `x` is extended by .1% on each side to include the minimum
and maximum values of `x`.
* sequence of scalars : Defines the bin edges allowing for non-uniform
width. No extension of the range of `x` is done.
* IntervalIndex : Defines the exact bins to be used. Note that
IntervalIndex for `bins` must be non-overlapping.
right : bool, default True
Indicates whether `bins` includes the rightmost edge or not. If
``right == True`` (the default), then the `bins` ``[1, 2, 3, 4]``
indicate (1,2], (2,3], (3,4]. This argument is ignored when
`bins` is an IntervalIndex.
labels : array or False, default None
Specifies the labels for the returned bins. Must be the same length as
the resulting bins. If False, returns only integer indicators of the
bins. This affects the type of the output container (see below).
This argument is ignored when `bins` is an IntervalIndex. If True,
raises an error.
retbins : bool, default False
Whether to return the bins or not. Useful when bins is provided
as a scalar.
precision : int, default 3
The precision at which to store and display the bins labels.
include_lowest : bool, default False
Whether the first interval should be left-inclusive or not.
duplicates : {default 'raise', 'drop'}, optional
If bin edges are not unique, raise ValueError or drop non-uniques.
ordered : bool, default True
Whether the labels are ordered or not. Applies to returned types
Categorical and Series (with Categorical dtype). If True, the resulting
categorical will be ordered. If False, the resulting categorical will be
unordered (labels must be provided).
Returns
-------
out : Categorical, Series, or Tensor
An array-like object representing the respective bin for each value
of `x`. The type depends on the value of `labels`.
* True (default) : returns a Series for Series `x` or a
Categorical for all other inputs. The values stored within
are Interval dtype.
* sequence of scalars : returns a Series for Series `x` or a
Categorical for all other inputs. The values stored within
are whatever the type in the sequence is.
* False : returns a tensor of integers.
bins : Tensor or IntervalIndex.
The computed or specified bins. Only returned when `retbins=True`.
For scalar or sequence `bins`, this is a tensor with the computed
bins. If set `duplicates=drop`, `bins` will drop non-unique bin. For
an IntervalIndex `bins`, this is equal to `bins`.
See Also
--------
qcut : Discretize variable into equal-sized buckets based on rank
or based on sample quantiles.
Categorical : Array type for storing data that come from a
fixed set of values.
Series : One-dimensional array with axis labels (including time series).
IntervalIndex : Immutable Index implementing an ordered, sliceable set.
Notes
-----
Any NA values will be NA in the result. Out of bounds values will be NA in
the resulting Series or Categorical object.
Examples
--------
Discretize into three equal-sized bins.
>>> import mars.tensor as mt
>>> import mars.dataframe as md
>>> md.cut(mt.array([1, 7, 5, 4, 6, 3]), 3).execute()
... # doctest: +ELLIPSIS
[(0.994, 3.0], (5.0, 7.0], (3.0, 5.0], (3.0, 5.0], (5.0, 7.0], ...
Categories (3, interval[float64]): [(0.994, 3.0] < (3.0, 5.0] ...
>>> md.cut(mt.array([1, 7, 5, 4, 6, 3]), 3, retbins=True).execute()
... # doctest: +ELLIPSIS
([(0.994, 3.0], (5.0, 7.0], (3.0, 5.0], (3.0, 5.0], (5.0, 7.0], ...
Categories (3, interval[float64]): [(0.994, 3.0] < (3.0, 5.0] ...
array([0.994, 3. , 5. , 7. ]))
Discovers the same bins, but assign them specific labels. Notice that
the returned Categorical's categories are `labels` and is ordered.
>>> md.cut(mt.array([1, 7, 5, 4, 6, 3]),
... 3, labels=["bad", "medium", "good"]).execute()
[bad, good, medium, medium, good, bad]
Categories (3, object): [bad < medium < good]
ordered=False will result in unordered categories when labels are passed. This parameter
can be used to allow non-unique labels:
>>> md.cut(np.array([1, 7, 5, 4, 6, 3]), 3,
... labels=["B", "A", "B"], ordered=False).execute()
['B', 'B', 'A', 'A', 'B', 'B']
Categories (2, object): ['A', 'B']
``labels=False`` implies you just want the bins back.
>>> md.cut([0, 1, 1, 2], bins=4, labels=False).execute()
array([0, 1, 1, 3])
Passing a Series as an input returns a Series with categorical dtype:
>>> s = md.Series(mt.array([2, 4, 6, 8, 10]),
... index=['a', 'b', 'c', 'd', 'e'])
>>> md.cut(s, 3).execute()
... # doctest: +ELLIPSIS
a (1.992, 4.667]
b (1.992, 4.667]
c (4.667, 7.333]
d (7.333, 10.0]
e (7.333, 10.0]
dtype: category
Categories (3, interval[float64]): [(1.992, 4.667] < (4.667, ...
Passing a Series as an input returns a Series with mapping value.
It is used to map numerically to intervals based on bins.
>>> s = md.Series(mt.array([2, 4, 6, 8, 10]),
... index=['a', 'b', 'c', 'd', 'e'])
>>> md.cut(s, [0, 2, 4, 6, 8, 10], labels=False, retbins=True, right=False).execute()
... # doctest: +ELLIPSIS
(a 0.0
b 1.0
c 2.0
d 3.0
e NaN
dtype: float64, array([0, 2, 4, 6, 8, 10]))
Use `drop` optional when bins is not unique
>>> md.cut(s, [0, 2, 4, 6, 10, 10], labels=False, retbins=True,
... right=False, duplicates='drop').execute()
... # doctest: +ELLIPSIS
(a 0.0
b 1.0
c 2.0
d 3.0
e NaN
dtype: float64, array([0, 2, 4, 6, 10]))
Passing an IntervalIndex for `bins` results in those categories exactly.
Notice that values not covered by the IntervalIndex are set to NaN. 0
is to the left of the first bin (which is closed on the right), and 1.5
falls between two bins.
>>> bins = md.Index(pd.IntervalIndex.from_tuples([(0, 1), (2, 3), (4, 5)]))
>>> md.cut([0, 0.5, 1.5, 2.5, 4.5], bins).execute()
[NaN, (0, 1], NaN, (2, 3], (4, 5]]
Categories (3, interval[int64]): [(0, 1] < (2, 3] < (4, 5]]
"""
if isinstance(bins, Integral) and bins < 1:
raise ValueError("`bins` should be a positive integer")
op = DataFrameCut(
bins=bins,
right=right,
labels=labels,
retbins=retbins,
precision=precision,
include_lowest=include_lowest,
duplicates=duplicates,
ordered=ordered,
)
ret = op(x)
if not retbins:
return ret[0]
else:
return ret