Source code for xorbits._mars.dataframe.core

# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import operator
import weakref
from collections.abc import Iterable
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from ..core import (
    ENTITY_TYPE,
    Chunk,
    ChunkData,
    HasShapeTileable,
    HasShapeTileableData,
    OutputType,
    Tileable,
    _ExecuteAndFetchMixin,
    is_build_mode,
    register_output_types,
)
from ..core.entity.utils import refresh_tileable_shape
from ..deploy.oscar.session import get_default_session
from ..lib.groupby_wrapper import GroupByWrapper
from ..serialization.serializables import (
    AnyField,
    BoolField,
    DataTypeField,
    DictField,
    FieldTypes,
    Int8Field,
    Int32Field,
    IntervalArrayField,
    ListField,
    NDArrayField,
    OneOfField,
    ReferenceField,
    Serializable,
    SerializableMeta,
    SeriesField,
    SliceField,
    StringField,
    TupleField,
)
from ..tensor import statistics
from ..utils import (
    calc_nsplits,
    ceildiv,
    estimate_pandas_size,
    lazy_import,
    on_deserialize_shape,
    on_serialize_numpy_type,
    on_serialize_shape,
    tokenize,
)
from .utils import (
    ReprSeries,
    fetch_corner_data,
    is_dataframe,
    is_pandas_2,
    merge_index_value,
    parse_index,
)

cudf = lazy_import("cudf")


class IndexValue(Serializable):
    """
    Meta class for index, held by IndexData, SeriesData and DataFrameData
    """

    __slots__ = ()

    class IndexMeta(SerializableMeta):
        def __instancecheck__(cls, instance):
            if not is_pandas_2():
                return cls in instance.__class__.__mro__
            else:
                if cls is IndexValue.Int64Index:
                    return type(
                        instance
                    ) == IndexValue.Index and instance._dtype == np.dtype("int64")
                elif cls is IndexValue.Float64Index:
                    return type(
                        instance
                    ) == IndexValue.Index and instance._dtype == np.dtype("float64")
                elif cls is IndexValue.UInt64Index:
                    return type(
                        instance
                    ) == IndexValue.Index and instance._dtype == np.dtype("uint64")
                else:
                    return cls in instance.__class__.__mro__
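
    # Illustrative note (hypothetical usage, assuming ``parse_index`` from
    # ``.utils``): under pandas 2.0, where the specialized Int64Index /
    # UInt64Index / Float64Index classes no longer exist, the metaclass above
    # still lets a plain ``IndexValue.Index`` with an int64 dtype satisfy
    # ``isinstance`` checks against the legacy classes:
    #
    #   >>> idx_value = parse_index(pd.Index([1, 2, 3])).value
    #   >>> isinstance(idx_value, IndexValue.Int64Index)
    #   True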

    class IndexBase(Serializable, metaclass=IndexMeta):
        _key = StringField("key")  # to identify if the index is the same
        _is_monotonic_increasing = BoolField("is_monotonic_increasing")
        _is_monotonic_decreasing = BoolField("is_monotonic_decreasing")
        _is_unique = BoolField("is_unique")
        _max_val = AnyField("max_val", on_serialize=on_serialize_numpy_type)
        _max_val_close = BoolField("max_val_close")
        _min_val = AnyField("min_val", on_serialize=on_serialize_numpy_type)
        _min_val_close = BoolField("min_val_close")

        @property
        def is_monotonic_increasing(self):
            return self._is_monotonic_increasing

        @property
        def is_monotonic_decreasing(self):
            return self._is_monotonic_decreasing

        @property
        def is_unique(self):
            return self._is_unique

        @property
        def min_val(self):
            return self._min_val

        @property
        def min_val_close(self):
            return self._min_val_close

        @property
        def max_val(self):
            return self._max_val

        @property
        def max_val_close(self):
            return self._max_val_close

        @property
        def key(self):
            return self._key

        @property
        def inferred_type(self):
            return None

        def to_pandas(self):
            kw = {
                field.tag: getattr(self, attr, None)
                for attr, field in self._FIELDS.items()
                if attr not in super(type(self), self)._FIELDS
            }
            kw = {k: v for k, v in kw.items() if v is not None}
            if kw.get("data") is None:
                kw["data"] = []

            pd_initializer = getattr(self, "_pd_initializer", None)
            if pd_initializer is None:
                pd_initializer = getattr(pd, type(self).__name__)
            return pd_initializer(**kw)
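
        # Illustrative example (assumed behavior): for ``IndexValue.Index``
        # the serializable fields not defined on ``IndexBase`` are ``name``,
        # ``data`` and ``dtype``, so ``to_pandas()`` effectively calls
        # ``pd.Index(data=..., name=..., dtype=...)``. Subclasses such as
        # ``Int64Index`` override ``_pd_initializer`` with ``pd.Index`` to
        # keep working on pandas 2.0, where the specialized index classes
        # were removed.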

    class Index(IndexBase):
        _name = AnyField("name")
        _data = NDArrayField("data")
        _dtype = DataTypeField("dtype")

    class RangeIndex(IndexBase):
        _name = AnyField("name")
        _slice = SliceField("slice")
        _dtype = DataTypeField("dtype")

        @property
        def slice(self):
            return self._slice

        @property
        def dtype(self):
            return getattr(self, "_dtype", np.dtype(np.intc))

        def to_pandas(self):
            slc = self._slice
            return pd.RangeIndex(
                slc.start, slc.stop, slc.step, name=getattr(self, "_name", None)
            )

    class CategoricalIndex(IndexBase):
        _name = AnyField("name")
        _data = NDArrayField("data")
        _categories = AnyField("categories")
        _ordered = BoolField("ordered")

        @property
        def inferred_type(self):
            return "categorical"

    class IntervalIndex(IndexBase):
        _name = AnyField("name")
        _data = IntervalArrayField("data")
        _closed = StringField("closed")

        @property
        def inferred_type(self):
            return "interval"

    class DatetimeIndex(IndexBase):
        _name = AnyField("name")
        _data = NDArrayField("data")
        _freq = AnyField("freq")
        _start = AnyField("start")
        _periods = AnyField("periods")
        _end = AnyField("end")
        _closed = AnyField("closed")
        _tz = AnyField("tz")
        _ambiguous = AnyField("ambiguous")
        _dayfirst = BoolField("dayfirst")
        _yearfirst = BoolField("yearfirst")

        @property
        def inferred_type(self):
            return "datetime64"

        @property
        def freq(self):
            return getattr(self, "_freq", None)

    class TimedeltaIndex(IndexBase):
        _name = AnyField("name")
        _data = NDArrayField("data")
        _unit = AnyField("unit")
        _freq = AnyField("freq")
        _start = AnyField("start")
        _periods = AnyField("periods")
        _end = AnyField("end")
        _closed = AnyField("closed")

        @property
        def inferred_type(self):
            return "timedelta64"

    class PeriodIndex(IndexBase):
        _name = AnyField("name")
        _data = NDArrayField("data")
        _freq = AnyField("freq")
        _start = AnyField("start")
        _periods = AnyField("periods")
        _end = AnyField("end")
        _year = AnyField("year")
        _month = AnyField("month")
        _quarter = AnyField("quarter")
        _day = AnyField("day")
        _hour = AnyField("hour")
        _minute = AnyField("minute")
        _second = AnyField("second")
        _tz = AnyField("tz")
        _dtype = DataTypeField("dtype")

        @property
        def inferred_type(self):
            return "period"

    class Int64Index(IndexBase):
        _pd_initializer = pd.Index

        _name = AnyField("name")
        _data = NDArrayField("data")
        _dtype = DataTypeField("dtype")

        @property
        def inferred_type(self):
            return "integer"

    class UInt64Index(IndexBase):
        _pd_initializer = pd.Index

        _name = AnyField("name")
        _data = NDArrayField("data")
        _dtype = DataTypeField("dtype")

        @property
        def inferred_type(self):
            return "integer"

    class Float64Index(IndexBase):
        _pd_initializer = pd.Index

        _name = AnyField("name")
        _data = NDArrayField("data")
        _dtype = DataTypeField("dtype")

        @property
        def inferred_type(self):
            return "floating"

    class MultiIndex(IndexBase):
        _names = ListField("names", on_serialize=list)
        _dtypes = ListField("dtypes", on_serialize=list)
        _data = NDArrayField("data")
        _sortorder = Int32Field("sortorder")

        @property
        def inferred_type(self):
            return "mixed"

        @property
        def names(self) -> list:
            return self._names

        def to_pandas(self):
            data = getattr(self, "_data", None)
            sortorder = getattr(self, "_sortorder", None)

            def _build_empty_array(dtype):
                try:
                    return np.array([], dtype=dtype)
                except TypeError:  # pragma: no cover
                    return pd.array([], dtype=dtype)

            if data is None:
                return pd.MultiIndex.from_arrays(
                    [_build_empty_array(dtype) for dtype in self._dtypes],
                    sortorder=sortorder,
                    names=self._names,
                )
            return pd.MultiIndex.from_tuples(
                [tuple(d) for d in data], sortorder=sortorder, names=self._names
            )

    _index_value = OneOfField(
        "index_value",
        index=Index,
        range_index=RangeIndex,
        categorical_index=CategoricalIndex,
        interval_index=IntervalIndex,
        datetime_index=DatetimeIndex,
        timedelta_index=TimedeltaIndex,
        period_index=PeriodIndex,
        int64_index=Int64Index,
        uint64_index=UInt64Index,
        float64_index=Float64Index,
        multi_index=MultiIndex,
    )

    def __mars_tokenize__(self):
        # return object for tokenize
        v = self._index_value
        return v._key

    @property
    def value(self):
        return self._index_value

    @property
    def key(self):
        return self._index_value.key

    @property
    def is_monotonic_increasing(self):
        return self._index_value.is_monotonic_increasing

    @property
    def is_monotonic_decreasing(self):
        return self._index_value.is_monotonic_decreasing

    @property
    def is_monotonic_increasing_or_decreasing(self):
        return self.is_monotonic_increasing or self.is_monotonic_decreasing

    @property
    def is_unique(self):
        return self._index_value.is_unique

    @property
    def min_val(self):
        return self._index_value.min_val

    @property
    def min_val_close(self):
        return self._index_value.min_val_close

    @property
    def max_val(self):
        return self._index_value.max_val

    @property
    def max_val_close(self):
        return self._index_value.max_val_close

    @property
    def min_max(self):
        return (
            self._index_value.min_val,
            self._index_value.min_val_close,
            self._index_value.max_val,
            self._index_value.max_val_close,
        )

    @property
    def name(self):
        return getattr(self._index_value, "_name", None)

    @property
    def inferred_type(self):
        return self._index_value.inferred_type

    def has_value(self):
        if isinstance(self._index_value, self.RangeIndex):
            if np.isnan(self._index_value.max_val):
                return False
            else:
                return True
        elif getattr(self._index_value, "_data", None) is not None:
            return True
        return False

    def to_pandas(self):
        return self._index_value.to_pandas()


class DtypesValue(Serializable):
    """
    Meta class for dtypes.
    """

    __slots__ = ()

    _key = StringField("key")
    _value = SeriesField("value")

    def __init__(self, key=None, value=None, **kw):
        super().__init__(_key=key, _value=value, **kw)
        if self._key is None:
            self._key = tokenize(self._value)

    @property
    def key(self):
        return self._key

    @property
    def value(self):
        return self._value


def refresh_index_value(tileable: ENTITY_TYPE):
    index_to_index_values = dict()
    for chunk in tileable.chunks:
        if chunk.ndim == 1:
            index_to_index_values[chunk.index] = chunk.index_value
        elif chunk.index[1] == 0:
            index_to_index_values[chunk.index] = chunk.index_value
    index_value = merge_index_value(index_to_index_values, store_data=False)
    # keep key as original index_value's
    if tileable.index_value is not None:
        index_value._index_value._key = tileable.index_value.key
    tileable._index_value = index_value


def refresh_dtypes(tileable: ENTITY_TYPE):
    all_dtypes = [c.dtypes_value.value for c in tileable.chunks if c.index[0] == 0]
    dtypes = pd.concat(all_dtypes)
    tileable._dtypes = dtypes
    columns_values = parse_index(dtypes.index, store_data=True)
    tileable._columns_value = columns_values
    tileable._dtypes_value = DtypesValue(key=tokenize(dtypes), value=dtypes)
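
# Illustrative example (hypothetical values): only chunks in chunk-row 0
# contribute to ``refresh_dtypes``; their per-chunk dtypes are concatenated
# column-wise to rebuild the tileable's full ``dtypes``:
#
#   >>> pd.concat([pd.Series({"a": np.dtype("int64")}),
#   ...            pd.Series({"b": np.dtype("float64")})])
#   a      int64
#   b    float64
#   dtype: object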


_tileable_key_property = "_tileable_key"
_tileable_dtypes_property = "_tileable_dtypes"
_tileable_index_value_property = "_tileable_index_value"
_tileable_columns_value_property = "_tileable_columns_value"
_nsplits_property = "_tileable_nsplits"
_lazy_chunk_meta_properties = (
    _tileable_key_property,
    _tileable_dtypes_property,
    _tileable_index_value_property,
    _tileable_columns_value_property,
    _nsplits_property,
)


class LazyMetaChunkData(ChunkData):
    __slots__ = _lazy_chunk_meta_properties

    def _set_tileable_meta(
        self,
        tileable_key: str = None,
        nsplits: Tuple[Tuple[int, ...]] = None,
        index_value: IndexValue = None,
        columns_value: IndexValue = None,
        dtypes: pd.Series = None,
    ):
        setattr(self, _tileable_key_property, tileable_key)
        setattr(self, _nsplits_property, nsplits)
        setattr(self, _tileable_index_value_property, index_value)
        setattr(self, _tileable_columns_value_property, columns_value)
        setattr(self, _tileable_dtypes_property, dtypes)


def is_chunk_meta_lazy(chunk: ChunkData) -> bool:
    chunk = chunk.data if hasattr(chunk, "data") else chunk
    return isinstance(chunk, LazyMetaChunkData) and hasattr(
        chunk, _tileable_key_property
    )


@functools.lru_cache(maxsize=128)
def _get_cum_nsplit(nsplit: Tuple[int]) -> List[int]:
    return [0] + np.cumsum(nsplit).tolist()


def _calc_axis_slice(nsplit: Tuple[int], index: int) -> slice:
    if not isinstance(nsplit, tuple):
        nsplit = tuple(nsplit)
    cum_nsplit = _get_cum_nsplit(nsplit)
    return slice(cum_nsplit[index], cum_nsplit[index + 1])
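
# Illustrative example (hypothetical nsplit): with ``nsplit = (3, 4, 5)`` the
# cached cumulative boundaries are ``[0, 3, 7, 12]``, so chunk index 1 on
# that axis maps to positions 3 through 6:
#
#   >>> _calc_axis_slice((3, 4, 5), 1)
#   slice(3, 7, None)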


class ChunkDtypesField(SeriesField):
    _tileable_key_index_to_dtypes = dict()

    @staticmethod
    def _gen_chunk_dtypes(instance: Chunk, index: int) -> Optional[pd.Series]:
        # dtypes of tileable
        try:
            tileable_key = getattr(instance, _tileable_key_property)
        except AttributeError:
            return
        cache = ChunkDtypesField._tileable_key_index_to_dtypes
        try:
            return cache[tileable_key, index]
        except KeyError:
            tileable_dtypes = getattr(instance, _tileable_dtypes_property)
            # nsplits of tileable
            nsplits = getattr(instance, _nsplits_property)[1]
            # calc slice
            slc = _calc_axis_slice(nsplits, index)
            dtypes = tileable_dtypes.iloc[slc]
            cache[tileable_key, index] = dtypes
            return dtypes

    def __get__(self, instance, owner=None):
        if not issubclass(owner, LazyMetaChunkData):  # pragma: no cover
            return super().__get__(instance, owner)

        try:
            value = self.get(instance, owner)
            if value is not None:
                return value
        except AttributeError:  # pragma: no cover
            pass

        if instance.index is None:
            return super().__get__(instance, owner)

        # get dtypes lazily
        index = instance.index[1]
        dtypes = self._gen_chunk_dtypes(instance, index)
        # cache dtypes
        self.set(instance, dtypes)
        return dtypes
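
    # Illustrative example (hypothetical shapes): if the owning tileable has
    # 10 columns split as ``nsplits[1] == (4, 6)``, a chunk at column-index 1
    # lazily resolves its dtypes to ``tileable_dtypes.iloc[4:10]``. The
    # result is memoized both in the class-level cache and on the chunk
    # itself; ``ChunkIndexValueField`` and ``ChunkColumnsValueField`` below
    # follow the same pattern for index and columns metadata.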


class ChunkIndexValueField(ReferenceField):
    _tileable_key_index_to_index_value = dict()

    @staticmethod
    def _gen_chunk_index_value(instance: Chunk, index: int) -> Optional[IndexValue]:
        # index_value of tileable
        try:
            tileable_key = getattr(instance, _tileable_key_property)
        except AttributeError:
            return
        cache = ChunkIndexValueField._tileable_key_index_to_index_value
        try:
            return cache[tileable_key, index]
        except KeyError:
            tileable_index_value = getattr(instance, _tileable_index_value_property)
            # nsplits of tileable
            nsplit = getattr(instance, _nsplits_property)[0]
            # calc slice
            slc = _calc_axis_slice(nsplit, index)
            pd_index = tileable_index_value.to_pandas()
            if np.isnan(slc.stop - slc.start):
                chunk_pd_index = pd_index[:0]
            else:
                chunk_pd_index = pd_index[slc]
            index_value = parse_index(
                chunk_pd_index,
                key=f"{tileable_index_value.key}_index_{index}_{slc.start}_{slc.stop}",
            )
            cache[tileable_key, index] = index_value
            return index_value

    def __get__(self, instance, owner=None):
        if not issubclass(owner, LazyMetaChunkData):  # pragma: no cover
            return super().__get__(instance, owner)

        try:
            value = self.get(instance, owner)
            if value is not None:
                return value
        except AttributeError:  # pragma: no cover
            pass

        if instance.index is None:
            return super().__get__(instance, owner)

        # get index_value lazily
        index = instance.index[0]
        index_value = self._gen_chunk_index_value(instance, index)
        # cache index_value
        self.set(instance, index_value)
        return index_value


class ChunkColumnsValueField(ReferenceField):
    _tileable_key_index_to_index_value = dict()

    @staticmethod
    def _gen_chunk_columns_value(instance: Chunk, index: int) -> Optional[IndexValue]:
        # columns_value of tileable
        try:
            tileable_key = getattr(instance, _tileable_key_property)
        except AttributeError:
            return
        cache = ChunkColumnsValueField._tileable_key_index_to_index_value
        try:
            return cache[tileable_key, index]
        except KeyError:
            tileable_columns_value = getattr(instance, _tileable_columns_value_property)
            # nsplits of tileable
            nsplit = getattr(instance, _nsplits_property)[1]
            # calc slice
            slc = _calc_axis_slice(nsplit, index)
            pd_index = tileable_columns_value.to_pandas()
            chunk_pd_index = (
                pd_index[:0] if np.isnan(slc.stop - slc.start) else pd_index[slc]
            )
            columns_value = parse_index(chunk_pd_index, store_data=True)
            cache[tileable_key, index] = columns_value
            return columns_value

    def __get__(self, instance, owner=None):
        if not issubclass(owner, LazyMetaChunkData):  # pragma: no cover
            return super().__get__(instance, owner)

        try:
            value = self.get(instance, owner)
            if value is not None:
                return value
        except AttributeError:  # pragma: no cover
            pass

        if instance.index is None:
            return super().__get__(instance, owner)

        # get columns_value lazily
        index = instance.index[1]
        columns_value = self._gen_chunk_columns_value(instance, index)
        # cache columns_value
        self.set(instance, columns_value)
        return columns_value


class IndexChunkData(ChunkData):
    __slots__ = ()
    type_name = "Index"

    # required fields
    _shape = TupleField(
        "shape",
        FieldTypes.int64,
        on_serialize=on_serialize_shape,
        on_deserialize=on_deserialize_shape,
    )
    # optional field
    _dtype = DataTypeField("dtype")
    _name = AnyField("name")
    _index_value = ReferenceField("index_value", IndexValue)

    def __init__(
        self,
        op=None,
        shape=None,
        index=None,
        dtype=None,
        name=None,
        index_value=None,
        **kw,
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _index=index,
            _dtype=dtype,
            _name=name,
            _index_value=index_value,
            **kw,
        )

    @property
    def params(self) -> Dict[str, Any]:
        # params returns the properties used to rebuild a new chunk
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "index": self.index,
            "index_value": self.index_value,
            "name": self.name,
        }

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        params.pop("index", None)  # index not needed to update
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        index_value = params.pop("index_value", None)
        if index_value is not None:
            self._index_value = index_value
        name = params.pop("name", None)
        if name is not None:
            self._name = name
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    @classmethod
    def get_params_from_data(
        cls, data: "Union[pd.Index, cudf.core.index.BaseIndex]"
    ) -> Dict[str, Any]:
        # cudf multi index doesn't have attribute 'dtype'.
        dtype = getattr(data, "dtype", np.dtype(object))
        return {
            "shape": data.shape,
            "dtype": dtype,
            "index_value": parse_index(data, store_data=False),
            "name": data.name,
        }

    @property
    def shape(self):
        return getattr(self, "_shape", None)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def dtype(self):
        return self._dtype

    @property
    def name(self):
        return self._name

    @property
    def index_value(self):
        return self._index_value


class IndexChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (IndexChunkData,)
    type_name = "Index"


def _on_deserialize_index_value(index_value):
    if index_value is None:
        return
    try:
        getattr(index_value, "value")
        return index_value
    except AttributeError:
        return


class _ToPandasMixin(_ExecuteAndFetchMixin):
    __slots__ = ()

    def to_pandas(self, session=None, **kw):
        return self._execute_and_fetch(session=session, **kw)


class _BatchedFetcher:
    __slots__ = ()

    def _iter(self, batch_size=None, session=None, **kw):
        from .indexing.iloc import iloc

        if batch_size is not None:
            size = self.shape[0]
            n_batch = ceildiv(size, batch_size)

            if n_batch > 1:
                for i in range(n_batch):
                    batch_data = iloc(self)[batch_size * i : batch_size * (i + 1)]
                    yield batch_data._fetch(session=session, **kw)
            else:
                yield self._fetch(session=session, **kw)
        else:
            # if batch_size is not specified, use first batch to estimate
            # batch_size.
            default_batch_bytes = 50 * 1024**2
            first_batch = 1000
            size = self.shape[0]

            if size >= first_batch:
                batch_data = iloc(self)[:first_batch]
                first_batch_data = batch_data._fetch(session=session, **kw)
                yield first_batch_data
                data_size = estimate_pandas_size(first_batch_data)
                batch_size = int(default_batch_bytes / data_size * first_batch)
                n_batch = ceildiv(size - first_batch, batch_size)
                for i in range(n_batch):
                    batch_data = iloc(self)[
                        first_batch
                        + batch_size * i : first_batch
                        + batch_size * (i + 1)
                    ]
                    yield batch_data._fetch(session=session, **kw)
            else:
                yield self._fetch(session=session, **kw)
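
    # Worked example of the estimation above (hypothetical numbers): if the
    # first 1000 rows occupy ~5 MiB, then with the 50 MiB default budget the
    # remaining rows are fetched in batches of
    # ``int(50 MiB / 5 MiB * 1000) == 10000`` rows each.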

    def iterbatch(self, batch_size=None, session=None, **kw):
        # trigger execution
        self.execute(session=session, **kw)
        return self._iter(batch_size=batch_size, session=session)

    def fetch(self, session=None, **kw):
        from .indexing.iloc import DataFrameIlocGetItem, SeriesIlocGetItem

        batch_size = kw.pop("batch_size", None)
        if isinstance(self.op, (DataFrameIlocGetItem, SeriesIlocGetItem)):
            # see GH#1871
            # already iloc, do not trigger batch fetch
            return self._fetch(session=session, **kw)
        else:
            batches = list(self._iter(batch_size=batch_size, session=session, **kw))
            return pd.concat(batches) if len(batches) > 1 else batches[0]

    def fetch_infos(self, fields=None, session=None, **kw):
        return self._fetch_infos(fields=fields, session=session, **kw)
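
# Illustrative usage of the batched fetch mix-in (hypothetical ``df``):
#
#   for batch in df.iterbatch(batch_size=100_000):
#       ...  # each ``batch`` is a plain pandas object
#
# ``fetch()`` concatenates the batches back into a single pandas object,
# except when the operand is already an ``iloc`` slice (see GH#1871), in
# which case it fetches directly.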


class IndexData(HasShapeTileableData, _ToPandasMixin):
    __slots__ = "_cache", "_accessors"
    type_name = "Index"

    # optional field
    _dtype = DataTypeField("dtype")
    _name = AnyField("name")
    _names = AnyField("names")
    _index_value = ReferenceField(
        "index_value", IndexValue, on_deserialize=_on_deserialize_index_value
    )
    _chunks = ListField(
        "chunks",
        FieldTypes.reference(IndexChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [IndexChunk(it) for it in x] if x is not None else x,
    )

    def __init__(
        self,
        op=None,
        shape=None,
        nsplits=None,
        dtype=None,
        name=None,
        names=None,
        index_value=None,
        chunks=None,
        **kw,
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _nsplits=nsplits,
            _dtype=dtype,
            _name=name,
            _names=names,
            _index_value=index_value,
            _chunks=chunks,
            **kw,
        )
        self._accessors = dict()

    @property
    def params(self) -> Dict[str, Any]:
        # params returns the properties used to rebuild a new tileable object
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "name": self.name,
            "index_value": self.index_value,
        }

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        index_value = params.pop("index_value", None)
        if index_value is not None:
            self._index_value = index_value
        name = params.pop("name", None)
        if name is not None:
            self._name = name
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    def refresh_params(self):
        # refresh params when chunks updated
        refresh_tileable_shape(self)
        refresh_index_value(self)
        if self._dtype is None:
            self._dtype = self.chunks[0].dtype
        if self._name is None:
            self._name = self.chunks[0].name

    def _to_str(self, representation=False):
        if is_build_mode() or len(self._executed_sessions) == 0:
            # in build mode, or not executed, just return representation
            if representation:
                return f"Index <op={type(self._op).__name__}, key={self.key}"
            else:
                return f"Index(op={type(self._op).__name__})"
        else:
            data = self.fetch(session=self._executed_sessions[-1])
            return repr(data) if representation else str(data)

    def __str__(self):
        return self._to_str(representation=False)

    def __repr__(self):
        return self._to_str(representation=True)

    def _to_arr(self):
        if len(self._executed_sessions) == 0:  # pragma: no cover
            raise NotImplementedError

        data = self.fetch(session=self._executed_sessions[-1])
        return np.asarray(data)

    def __array__(self):
        return self._to_arr()

    def _to_mars_tensor(self, dtype=None, order="K", extract_multi_index=False):
        tensor = self.to_tensor(extract_multi_index=extract_multi_index)
        dtype = dtype if dtype is not None else tensor.dtype
        return tensor.astype(dtype=dtype, order=order, copy=False)

    def __mars_tensor__(self, dtype=None, order="K"):
        return self._to_mars_tensor(dtype=dtype, order=order)

    @property
    def dtype(self):
        return getattr(self, "_dtype", None) or self.op.dtype

    @property
    def name(self):
        return self._name

    @property
    def names(self):
        return getattr(self, "_names", None) or [self.name]

    @property
    def index_value(self) -> IndexValue:
        return self._index_value

    @property
    def inferred_type(self):
        return self._index_value.inferred_type

    def to_tensor(self, dtype=None, extract_multi_index=False):
        from ..tensor.datasource.from_dataframe import from_index

        return from_index(self, dtype=dtype, extract_multi_index=extract_multi_index)


class Index(HasShapeTileable, _ToPandasMixin):
    __slots__ = "_df_or_series", "_parent_key", "_axis"
    _allow_data_type_ = (IndexData,)
    type_name = "Index"

    def __new__(cls, data: Union[pd.Index, IndexData] = None, **_):
        if data is not None and not isinstance(data, pd.Index):
            # create corresponding Index class
            # according to type of index_value
            clz = globals()[type(data.index_value.value).__name__]
        else:
            clz = cls
        return object.__new__(clz)
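
    # Illustrative note (assumed behavior): if ``data.index_value.value`` is
    # an ``IndexValue.DatetimeIndex``, the ``globals()`` lookup resolves to
    # the ``DatetimeIndex`` tileable class defined below, so callers receive
    # the specialized subclass rather than a plain ``Index``.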

    def __len__(self):
        return len(self._data)

    def __mars_tensor__(self, dtype=None, order="K"):
        return self._data.__mars_tensor__(dtype=dtype, order=order)

    def _get_df_or_series(self):
        obj = getattr(self, "_df_or_series", None)
        if obj is not None:
            return obj()
        return None

    def _set_df_or_series(self, df_or_series, axis):
        self._df_or_series = weakref.ref(df_or_series)
        self._parent_key = df_or_series.key
        self._axis = axis

    @property
    def T(self):
        """Return the transpose, which is by definition self."""
        return self

    @property
    def name(self):
        return self._data.name

    @name.setter
    def name(self, value):
        df_or_series = self._get_df_or_series()
        if df_or_series is not None and df_or_series.key == self._parent_key:
            df_or_series.rename_axis(value, axis=self._axis, inplace=True)
            self.data = df_or_series.axes[self._axis].data
        else:
            self.rename(value, inplace=True)

    @property
    def names(self):
        return self._data.names

    @names.setter
    def names(self, value):
        df_or_series = self._get_df_or_series()
        if df_or_series is not None:
            df_or_series.rename_axis(value, axis=self._axis, inplace=True)
            self.data = df_or_series.axes[self._axis].data
        else:
            self.rename(value, inplace=True)

    @property
    def values(self):
        return self.to_tensor()

    def to_frame(self, index: bool = True, name=None):
        """
        Create a DataFrame with a column containing the Index.

        Parameters
        ----------
        index : bool, default True
            Set the index of the returned DataFrame as the original Index.
        name : object, default None
            The passed name should substitute for the index name (if it has
            one).

        Returns
        -------
        DataFrame
            DataFrame containing the original Index data.

        See Also
        --------
        Index.to_series : Convert an Index to a Series.
        Series.to_frame : Convert Series to DataFrame.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> idx = md.Index(['Ant', 'Bear', 'Cow'], name='animal')
        >>> idx.to_frame().execute()
               animal
        animal
        Ant       Ant
        Bear     Bear
        Cow       Cow

        By default, the original Index is reused. To enforce a new Index:

        >>> idx.to_frame(index=False).execute()
          animal
        0    Ant
        1   Bear
        2    Cow

        To override the name of the resulting column, specify `name`:

        >>> idx.to_frame(index=False, name='zoo').execute()
             zoo
        0    Ant
        1   Bear
        2    Cow
        """
        from . import dataframe_from_tensor

        if isinstance(self.index_value.value, IndexValue.MultiIndex):
            old_names = self.index_value.value.names

            if (
                name is not None
                and not isinstance(name, Iterable)
                or isinstance(name, str)
            ):
                raise TypeError("'name' must be a list / sequence of column names.")

            name = list(name if name is not None else old_names)
            if len(name) != len(old_names):
                raise ValueError(
                    "'name' should have same length as number of levels on index."
                )
            columns = [
                old or new or idx for idx, (old, new) in enumerate(zip(old_names, name))
            ]
        else:
            columns = [name or self.name or 0]
        index_ = self if index else None
        return dataframe_from_tensor(
            self._data._to_mars_tensor(extract_multi_index=True),
            index=index_,
            columns=columns,
        )
    def to_series(self, index=None, name=None):
        """
        Create a Series with both index and values equal to the index keys.

        Useful with map for returning an indexer based on an index.

        Parameters
        ----------
        index : Index, optional
            Index of resulting Series. If None, defaults to original index.
        name : str, optional
            Name of resulting Series. If None, defaults to name of original
            index.

        Returns
        -------
        Series
            The dtype will be based on the type of the Index values.
        """
        from . import series_from_index

        return series_from_index(self, index=index, name=name)
    def copy(self, name=None, deep=False):
        """
        Make a copy of this object.

        Name is set on the new object.

        Parameters
        ----------
        name : Label, optional
            Set name for new object.
        deep : bool, default False

        Returns
        -------
        Index
            Index referring to a new object which is a copy of this object.

        Notes
        -----
        In most cases, there should be no functional difference from using
        ``deep``, but if ``deep`` is passed it will attempt to deepcopy.

        Examples
        --------
        >>> idx = pd.Index(['a', 'b', 'c'])
        >>> new_idx = idx.copy()
        >>> idx is new_idx
        False
        """
        return super().copy(name=name)
class RangeIndex(Index):
    __slots__ = ()


class CategoricalIndex(Index):
    __slots__ = ()


class IntervalIndex(Index):
    __slots__ = ()


class DatetimeIndex(Index):
    __slots__ = ()


class TimedeltaIndex(Index):
    __slots__ = ()


class PeriodIndex(Index):
    __slots__ = ()


class Int64Index(Index):
    __slots__ = ()


class UInt64Index(Index):
    __slots__ = ()


class Float64Index(Index):
    __slots__ = ()


class MultiIndex(Index):
    __slots__ = ()


class BaseSeriesChunkData(LazyMetaChunkData):
    __slots__ = ()

    # required fields
    _shape = TupleField(
        "shape",
        FieldTypes.int64,
        on_serialize=on_serialize_shape,
        on_deserialize=on_deserialize_shape,
    )
    # optional field
    _dtype = DataTypeField("dtype")
    _name = AnyField("name")
    _index_value = ChunkIndexValueField(
        "index_value", IndexValue, on_deserialize=_on_deserialize_index_value
    )

    def __init__(
        self,
        op=None,
        shape=None,
        index=None,
        dtype=None,
        name=None,
        index_value=None,
        **kw,
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _index=index,
            _dtype=dtype,
            _name=name,
            _index_value=index_value,
            **kw,
        )

    def _get_params(self) -> Dict[str, Any]:
        # params returns the properties used to rebuild a new chunk
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "index": self.index,
            "index_value": self.index_value,
            "name": self.name,
        }

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        params.pop("index", None)  # index not needed to update
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        index_value = params.pop("index_value", None)
        if index_value is not None:
            self._index_value = index_value
        name = params.pop("name", None)
        if name is not None:
            self._name = name
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    params = property(_get_params, _set_params)

    @classmethod
    def get_params_from_data(cls, data: pd.Series) -> Dict[str, Any]:
        return {
            "shape": data.shape,
            "dtype": data.dtype,
            "index_value": parse_index(data.index, store_data=False),
            "name": data.name,
        }

    @property
    def shape(self):
        return getattr(self, "_shape", None)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def dtype(self):
        return self._dtype

    @property
    def name(self):
        return self._name

    @property
    def index_value(self):
        return self._index_value


class SeriesChunkData(BaseSeriesChunkData):
    type_name = "Series"


class SeriesChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (SeriesChunkData,)
    type_name = "Series"


class BaseSeriesData(HasShapeTileableData, _ToPandasMixin):
    __slots__ = "_cache", "_accessors"

    # optional field
    _dtype = DataTypeField("dtype")
    _name = AnyField("name")
    _index_value = ReferenceField(
        "index_value", IndexValue, on_deserialize=_on_deserialize_index_value
    )
    _chunks = ListField(
        "chunks",
        FieldTypes.reference(SeriesChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [SeriesChunk(it) for it in x] if x is not None else x,
    )

    def __init__(
        self,
        op=None,
        shape=None,
        nsplits=None,
        dtype=None,
        name=None,
        index_value=None,
        chunks=None,
        **kw,
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _nsplits=nsplits,
            _dtype=dtype,
            _name=name,
            _index_value=index_value,
            _chunks=chunks,
            **kw,
        )
        self._accessors = dict()

    def _get_params(self) -> Dict[str, Any]:
        # params returns the properties used to rebuild a new tileable object
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "name": self.name,
            "index_value": self.index_value,
        }

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        index_value = params.pop("index_value", None)
        if index_value is not None:
            self._index_value = index_value
        name = params.pop("name", None)
        if name is not None:
            self._name = name
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    params = property(_get_params, _set_params)

    def refresh_params(self):
        # refresh params when chunks updated
        refresh_tileable_shape(self)
        refresh_index_value(self)
        if self._dtype is None:
            self._dtype = self.chunks[0].dtype
        if self._name is None:
            self._name = self.chunks[0].name

    def _to_str(self, representation=False):
        if is_build_mode() or len(self._executed_sessions) == 0:
            # in build mode, or not executed, just return representation
            if representation:
                return (
                    f"{self.type_name} <op={type(self._op).__name__}, key={self.key}>"
                )
            else:
                return f"{self.type_name}(op={type(self._op).__name__})"
        else:
            corner_data = fetch_corner_data(self, session=self._executed_sessions[-1])

            buf = StringIO()
            max_rows = pd.get_option("display.max_rows")
            corner_max_rows = (
                max_rows if self.shape[0] <= max_rows else corner_data.shape[0] - 1
            )  # make sure max_rows < corner_data
            with pd.option_context("display.max_rows", corner_max_rows):
                if self.shape[0] <= max_rows:
                    corner_series = corner_data
                else:
                    corner_series = ReprSeries(corner_data, self.shape)
                buf.write(repr(corner_series) if representation else str(corner_series))

            return buf.getvalue()

    def __str__(self):
        return self._to_str(representation=False)

    def __repr__(self):
        return self._to_str(representation=True)

    def _to_arr(self):
        if len(self._executed_sessions) == 0:  # pragma: no cover
            raise NotImplementedError

        data = self.fetch(session=self._executed_sessions[-1])
        return np.asarray(data)

    def __array__(self):
        return self._to_arr()

    @property
    def dtype(self):
        return getattr(self, "_dtype", None) or getattr(self.op, "dtype", None)

    @property
    def name(self):
        return self._name

    @property
    def index_value(self):
        return self._index_value

    @property
    def index(self):
        from .datasource.index import from_tileable

        return from_tileable(self)

    @property
    def axes(self):
        return [self.index]

    @property
    def empty(self):
        shape = getattr(self, "_shape")
        if np.any(np.isnan(shape)):
            raise ValueError("Tileable object must be executed first")
        return shape == (0,)

    def to_tensor(self, dtype=None):
        from ..tensor.datasource.from_dataframe import from_series

        return from_series(self, dtype=dtype)

    @staticmethod
    def from_tensor(in_tensor, index=None, name=None):
        from .datasource.from_tensor import series_from_tensor

        return series_from_tensor(in_tensor, index=index, name=name)


class SeriesData(_BatchedFetcher, BaseSeriesData):
    type_name = "Series"

    def __mars_tensor__(self, dtype=None, order="K"):
        tensor = self.to_tensor()
        dtype = dtype if dtype is not None else tensor.dtype
        return tensor.astype(dtype=dtype, order=order, copy=False)

    def items(self, batch_size=10000, session=None):
        for batch_data in self.iterbatch(batch_size=batch_size, session=session):
            yield from getattr(batch_data, "items")()

    def to_dict(self, into=dict, batch_size=10000, session=None):
        fetch_kwargs = dict(batch_size=batch_size)
        return self.to_pandas(session=session, fetch_kwargs=fetch_kwargs).to_dict(
            into=into
        )


class Series(HasShapeTileable, _ToPandasMixin):
    __slots__ = ("_cache",)
    _allow_data_type_ = (SeriesData,)
    type_name = "Series"

    def to_tensor(self, dtype=None):
        return self._data.to_tensor(dtype=dtype)

    def from_tensor(self, in_tensor, index=None, name=None):
        return self._data.from_tensor(in_tensor, index=index, name=name)

    @property
    def T(self):
        """Return the transpose, which is by definition self."""
        return self

    @property
    def ndim(self):
        """
        Return an int representing the number of axes / array dimensions.

        Return 1 if Series. Otherwise return 2 if DataFrame.

        See Also
        --------
        ndarray.ndim : Number of array dimensions.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series({'a': 1, 'b': 2, 'c': 3})
        >>> s.ndim.execute()
        1

        >>> df = md.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
        >>> df.ndim.execute()
        2
        """
        return super().ndim

    @property
    def index(self):
        """
        The index (axis labels) of the Series.
        """
        idx = self._data.index
        idx._set_df_or_series(self, 0)
        return idx

    @index.setter
    def index(self, new_index):
        self.set_axis(new_index, axis=0, copy=False)

    @property
    def name(self):
        return self._data.name

    @name.setter
    def name(self, val):
        from .indexing.rename import DataFrameRename

        op = DataFrameRename(new_name=val, output_types=[OutputType.series])
        new_series = op(self)
        self.data = new_series.data

    @property
    def dtype(self):
        """
        Return the dtype object of the underlying data.
        """
        return self._data.dtype
    def copy(self, deep=True):  # pylint: disable=arguments-differ
        """
        Make a copy of this object's indices and data.

        When ``deep=True`` (default), a new object will be created with a
        copy of the calling object's data and indices. Modifications to
        the data or indices of the copy will not be reflected in the
        original object (see notes below).

        When ``deep=False``, a new object will be created without copying
        the calling object's data or index (only references to the data
        and index are copied). Any changes to the data of the original
        will be reflected in the shallow copy (and vice versa).

        Parameters
        ----------
        deep : bool, default True
            Make a deep copy, including a copy of the data and the indices.
            With ``deep=False`` neither the indices nor the data are copied.

        Returns
        -------
        copy : Series or DataFrame
            Object type matches caller.
        """
        if deep is False:
            raise NotImplementedError("Not support `deep=False` for now")
        return super().copy()
    def __len__(self):
        return len(self._data)

    def __mars_tensor__(self, dtype=None, order="K"):
        return self._data.__mars_tensor__(dtype=dtype, order=order)
    def keys(self):
        """
        Return alias for index.

        Returns
        -------
        Index
            Index of the Series.
        """
        return self.index
    @property
    def values(self):
        return self.to_tensor()
    def items(self, batch_size=10000, session=None):
        """
        Lazily iterate over (index, value) tuples.

        This method returns an iterable tuple (index, value). This is
        convenient if you want to create a lazy iterator.

        Returns
        -------
        iterable
            Iterable of tuples containing the (index, value) pairs from a
            Series.

        See Also
        --------
        DataFrame.items : Iterate over (column name, Series) pairs.
        DataFrame.iterrows : Iterate over DataFrame rows as (index, Series) pairs.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series(['A', 'B', 'C'])
        >>> for index, value in s.items():
        ...     print(f"Index : {index}, Value : {value}")
        Index : 0, Value : A
        Index : 1, Value : B
        Index : 2, Value : C
        """
        return self._data.items(batch_size=batch_size, session=session)
    def to_dict(self, into=dict, batch_size=10000, session=None):
        """
        Convert Series to {label -> value} dict or dict-like object.

        Parameters
        ----------
        into : class, default dict
            The collections.abc.Mapping subclass to use as the return
            object. Can be the actual class or an empty instance of the
            mapping type you want. If you want a collections.defaultdict,
            you must pass it initialized.

        Returns
        -------
        collections.abc.Mapping
            Key-value representation of Series.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series([1, 2, 3, 4])
        >>> s.to_dict()
        {0: 1, 1: 2, 2: 3, 3: 4}
        >>> from collections import OrderedDict, defaultdict
        >>> s.to_dict(OrderedDict)
        OrderedDict([(0, 1), (1, 2), (2, 3), (3, 4)])
        >>> dd = defaultdict(list)
        >>> s.to_dict(dd)
        defaultdict(<class 'list'>, {0: 1, 1: 2, 2: 3, 3: 4})
        """
        return self._data.to_dict(into=into, batch_size=batch_size, session=session)
    def to_frame(self, name=None):
        """
        Convert Series to DataFrame.

        Parameters
        ----------
        name : object, default None
            The passed name should substitute for the series name (if it has
            one).

        Returns
        -------
        DataFrame
            DataFrame representation of Series.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series(["a", "b", "c"], name="vals")
        >>> s.to_frame().execute()
          vals
        0    a
        1    b
        2    c
        """
        from . import dataframe_from_tensor

        name = name or self.name or 0
        return dataframe_from_tensor(self, columns=[name])
    def between(self, left, right, inclusive="both"):
        """
        Return boolean Series equivalent to left <= series <= right.

        This function returns a boolean vector containing `True` wherever
        the corresponding Series element is between the boundary values
        `left` and `right`. NA values are treated as `False`.

        Parameters
        ----------
        left : scalar or list-like
            Left boundary.
        right : scalar or list-like
            Right boundary.
        inclusive : {"both", "neither", "left", "right"}
            Include boundaries. Whether to set each bound as closed or open.

        Returns
        -------
        Series
            Series representing whether each element is between left and
            right (inclusive).

        See Also
        --------
        Series.gt : Greater than of series and other.
        Series.lt : Less than of series and other.

        Notes
        -----
        This function is equivalent to ``(left <= ser) & (ser <= right)``

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series([2, 0, 4, 8, np.nan])

        Boundary values are included by default:

        >>> s.between(1, 4).execute()
        0     True
        1    False
        2     True
        3    False
        4    False
        dtype: bool

        With `inclusive` set to ``"neither"`` boundary values are excluded:

        >>> s.between(1, 4, inclusive="neither").execute()
        0     True
        1    False
        2    False
        3    False
        4    False
        dtype: bool

        `left` and `right` can be any scalar value:

        >>> s = md.Series(['Alice', 'Bob', 'Carol', 'Eve'])
        >>> s.between('Anna', 'Daniel').execute()
        0    False
        1     True
        2     True
        3    False
        dtype: bool
        """
        if isinstance(inclusive, bool):  # pragma: no cover
            # for pandas < 1.3.0
            if inclusive:
                inclusive = "both"
            else:
                inclusive = "neither"
        if inclusive == "both":
            lmask = self >= left
            rmask = self <= right
        elif inclusive == "left":
            lmask = self >= left
            rmask = self < right
        elif inclusive == "right":
            lmask = self > left
            rmask = self <= right
        elif inclusive == "neither":
            lmask = self > left
            rmask = self < right
        else:
            raise ValueError(
                "Inclusive has to be either string of 'both', "
                "'left', 'right', or 'neither'."
            )

        return lmask & rmask
    def median(
        self, axis=None, skipna=True, out=None, overwrite_input=False, keepdims=False
    ):
        """
        Return the median of the values over the requested axis.

        Parameters
        ----------
        axis : {index (0)}
            Axis or axes along which the medians are computed. The default
            is to compute the median along a flattened version of the tensor.
        skipna : bool, optional, default True
            Exclude NA/null values when computing the result.
        out : Tensor, default None
            Output tensor in which to place the result. It must
            have the same shape and buffer length as the expected output,
            but the type (of the output) will be cast if necessary.
        overwrite_input : bool, default False
            Just for compatibility with NumPy, would not take effect.
        keepdims : bool, default False
            If this is set to True, the axes which are reduced are left
            in the result as dimensions with size one. With this option,
            the result will broadcast correctly against the original `arr`.

        Returns
        -------
        median : scalar
            Return the median of the values over the requested axis.

        See Also
        --------
        tensor.mean, tensor.percentile

        Notes
        -----
        Given a vector ``V`` of length ``N``, the median of ``V`` is the
        middle value of a sorted copy of ``V``, ``V_sorted`` - i.e.,
        ``V_sorted[(N-1)/2]``, when ``N`` is odd, and the average of the
        two middle values of ``V_sorted`` when ``N`` is even.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> a = md.Series([10, 7, 4, 3, 2, 1])
        >>> a.median().execute()
        3.5

        >>> a = md.Series([10, 7, 4, None, 2, 1])
        >>> a.median().execute()
        4.0
        >>> a.median(skipna=False).execute()
        nan
        """
        if skipna:
            return statistics.median(
                self.dropna(),
                axis=None,
                out=None,
                overwrite_input=False,
                keepdims=False,
            )
        else:
            return statistics.median(
                self, axis=None, out=None, overwrite_input=False, keepdims=False
            )
class BaseDataFrameChunkData(LazyMetaChunkData): __slots__ = ("_dtypes_value",) _no_copy_attrs_ = ChunkData._no_copy_attrs_ | {"_dtypes", "_columns_value"} # required fields _shape = TupleField( "shape", FieldTypes.int64, on_serialize=on_serialize_shape, on_deserialize=on_deserialize_shape, ) # optional fields _dtypes = ChunkDtypesField("dtypes") _index_value = ChunkIndexValueField( "index_value", IndexValue, on_deserialize=_on_deserialize_index_value ) _columns_value = ChunkColumnsValueField("columns_value", IndexValue) def __init__( self, op=None, shape=None, index=None, dtypes=None, index_value=None, columns_value=None, **kw, ): super().__init__( _op=op, _shape=shape, _index=index, _dtypes=dtypes, _index_value=index_value, _columns_value=columns_value, **kw, ) self._dtypes_value = None def __on_deserialize__(self): super(BaseDataFrameChunkData, self).__on_deserialize__() self._dtypes_value = None def __len__(self): return self.shape[0] def _get_params(self) -> Dict[str, Any]: # params return the properties which useful to rebuild a new chunk return { "shape": self.shape, "dtypes": self.dtypes, "dtypes_value": self.dtypes_value, "index": self.index, "index_value": self.index_value, "columns_value": self.columns_value, } def _set_params(self, new_params: Dict[str, Any]): params = new_params.copy() params.pop("index", None) # index not needed to update new_shape = params.pop("shape", None) if new_shape is not None: self._shape = new_shape index_value = params.pop("index_value", None) if index_value is not None: self._index_value = index_value dtypes = params.pop("dtypes", None) if dtypes is not None: self._dtypes = dtypes columns_value = params.pop("columns_value", None) if columns_value is not None: self._columns_value = columns_value dtypes_value = params.pop("dtypes_value", None) if dtypes_value is not None: if dtypes is None: self._dtypes = dtypes_value.value if columns_value is None: self._columns_value = parse_index(self._dtypes.index, store_data=True) self._dtypes_value = dtypes_value if params: # pragma: no cover raise TypeError(f"Unknown params: {list(params)}") params = property(_get_params, _set_params) @classmethod def get_params_from_data(cls, data: pd.DataFrame) -> Dict[str, Any]: parse_index(data.index, store_data=False) return { "shape": data.shape, "index_value": parse_index(data.index, store_data=False), "dtypes_value": DtypesValue(key=tokenize(data.dtypes), value=data.dtypes), } @property def shape(self): return getattr(self, "_shape", None) @property def ndim(self): return len(self.shape) @property def dtypes(self): dt = getattr(self, "_dtypes", None) if dt is not None: return dt return getattr(self.op, "dtypes", None) @property def dtypes_value(self): if self._dtypes_value is not None: return self._dtypes_value # TODO(qinxuye): when creating Dataframe, # dtypes_value instead of dtypes later must be passed into dtypes = self.dtypes if dtypes is not None: self._dtypes_value = DtypesValue(key=tokenize(dtypes), value=dtypes) return self._dtypes_value @property def index_value(self): return self._index_value @property def columns_value(self): return self._columns_value class DataFrameChunkData(BaseDataFrameChunkData): type_name = "DataFrame" class DataFrameChunk(Chunk): __slots__ = () _allow_data_type_ = (DataFrameChunkData,) type_name = "DataFrame" def __len__(self): return len(self._data) class BaseDataFrameData(HasShapeTileableData, _ToPandasMixin): __slots__ = "_accessors", "_dtypes_value", "_dtypes_dict" # optional fields _dtypes = SeriesField("dtypes") _index_value = 
ReferenceField( "index_value", IndexValue, on_deserialize=_on_deserialize_index_value ) _columns_value = ReferenceField("columns_value", IndexValue) _chunks = ListField( "chunks", FieldTypes.reference(DataFrameChunkData), on_serialize=lambda x: [it.data for it in x] if x is not None else x, on_deserialize=lambda x: [DataFrameChunk(it) for it in x] if x is not None else x, ) def __init__( self, op=None, shape=None, nsplits=None, dtypes=None, index_value=None, columns_value=None, chunks=None, **kw, ): super().__init__( _op=op, _shape=shape, _nsplits=nsplits, _dtypes=dtypes, _index_value=index_value, _columns_value=columns_value, _chunks=chunks, **kw, ) self._accessors = dict() self._dtypes_value = None self._dtypes_dict = None def __on_deserialize__(self): super().__on_deserialize__() self._accessors = dict() self._dtypes_value = None self._dtypes_dict = None def _get_params(self) -> Dict[str, Any]: # params return the properties which useful to rebuild a new tileable object return { "shape": self.shape, "dtypes": self.dtypes, "index_value": self.index_value, "columns_value": self.columns_value, "dtypes_value": self.dtypes_value, } def _set_params(self, new_params: Dict[str, Any]): params = new_params.copy() new_shape = params.pop("shape", None) if new_shape is not None: self._shape = new_shape index_value = params.pop("index_value", None) if index_value is not None: self._index_value = index_value dtypes = params.pop("dtypes", None) if dtypes is not None: self._dtypes = dtypes columns_value = params.pop("columns_value", None) if columns_value is not None: self._columns_value = columns_value dtypes_value = params.pop("dtypes_value", None) if dtypes_value is not None: if dtypes is None: self._dtypes = dtypes_value.value if columns_value is None: self._columns_value = parse_index(self._dtypes.index, store_data=True) self._dtypes_value = dtypes_value if params: # pragma: no cover raise TypeError(f"Unknown params: {list(params)}") params = property(_get_params, _set_params) def refresh_params(self): # refresh params when chunks updated refresh_tileable_shape(self) refresh_index_value(self) refresh_dtypes(self) @property def dtypes(self): dt = getattr(self, "_dtypes", None) if dt is not None: return dt return getattr(self.op, "dtypes", None) @property def dtypes_value(self): if self._dtypes_value is not None: return self._dtypes_value # TODO(qinxuye): when creating Dataframe, # dtypes_value instead of dtypes later must be passed into dtypes = self.dtypes if dtypes is not None: self._dtypes_value = DtypesValue(key=tokenize(dtypes), value=dtypes) return self._dtypes_value @property def index_value(self): return self._index_value @property def columns_value(self): return self._columns_value @property def empty(self): shape = getattr(self, "_shape") if np.any(np.isnan(shape)): raise ValueError("Tileable object must be executed first") return 0 in shape def to_tensor(self, dtype=None): from ..tensor.datasource.from_dataframe import from_dataframe return from_dataframe(self, dtype=dtype) @staticmethod def from_tensor(in_tensor, index=None, columns=None): from .datasource.from_tensor import dataframe_from_tensor return dataframe_from_tensor(in_tensor, index=index, columns=columns) @staticmethod def from_records(records, **kw): from .datasource.from_records import from_records return from_records(records, **kw) @property def index(self): from .datasource.index import from_tileable return from_tileable(self) @property def columns(self): from .datasource.index import from_pandas as from_pandas_index return 
    @property
    def axes(self):
        return [self.index, self.columns]

    def _get_dtypes_dict(self):
        if self._dtypes_dict is None:
            self._dtypes_dict = d = dict()
            for k, v in self.dtypes.items():
                try:
                    obj_list = d[k]
                except KeyError:
                    obj_list = d[k] = []
                obj_list.append(v)
        return self._dtypes_dict

    def _get_dtypes_by_columns(self, columns: list):
        dtypes_dict = self._get_dtypes_dict()
        return functools.reduce(operator.add, (dtypes_dict[c] for c in columns), [])

    def _get_columns_by_columns(self, columns: list):
        dtypes_dict = self._get_dtypes_dict()
        return functools.reduce(
            operator.add, ([c] * len(dtypes_dict[c]) for c in columns), []
        )


class DataFrameData(_BatchedFetcher, BaseDataFrameData):
    type_name = "DataFrame"

    def _to_str(self, representation=False):
        if is_build_mode() or len(self._executed_sessions) == 0:
            # in build mode, or not executed, just return representation
            if representation:
                return (
                    f"{self.type_name} <op={type(self._op).__name__}, key={self.key}>"
                )
            else:
                return f"{self.type_name}(op={type(self._op).__name__})"
        else:
            corner_data = fetch_corner_data(self, session=self._executed_sessions[-1])

            buf = StringIO()
            max_rows = pd.get_option("display.max_rows")
            if self.shape[0] <= max_rows:
                buf.write(repr(corner_data) if representation else str(corner_data))
            else:
                # remember we cannot directly call repr(df),
                # because the [... rows x ... columns] may show wrong rows
                with pd.option_context(
                    "display.show_dimensions",
                    False,
                    "display.max_rows",
                    corner_data.shape[0] - 1,
                ):
                    if representation:
                        s = repr(corner_data)
                    else:
                        s = str(corner_data)
                    buf.write(s)
                if pd.get_option("display.show_dimensions"):
                    n_rows, n_cols = self.shape
                    buf.write(f"\n\n[{n_rows} rows x {n_cols} columns]")
            return buf.getvalue()

    def __str__(self):
        return self._to_str(representation=False)

    def _to_arr(self):
        if len(self._executed_sessions) == 0:
            raise NotImplementedError
        data = self.fetch(session=self._executed_sessions[-1])
        return np.asarray(data)

    def __array__(self):
        return self._to_arr()

    def __repr__(self):
        return self._to_str(representation=True)

    def __mars_tensor__(self, dtype=None, order="K"):
        tensor = self.to_tensor()
        dtype = dtype if dtype is not None else tensor.dtype
        return tensor.astype(dtype=dtype, order=order, copy=False)

    def _repr_html_(self):
        if len(self._executed_sessions) == 0:
            # not executed before, fall back to normal repr
            raise NotImplementedError

        corner_data = fetch_corner_data(self, session=self._executed_sessions[-1])

        buf = StringIO()
        max_rows = pd.get_option("display.max_rows")
        if self.shape[0] <= max_rows:
            buf.write(corner_data._repr_html_())
        else:
            with pd.option_context(
                "display.show_dimensions",
                False,
                "display.max_rows",
                corner_data.shape[0] - 1,
            ):
                buf.write(corner_data._repr_html_().rstrip().rstrip("</div>"))
            if pd.get_option("display.show_dimensions"):
                n_rows, n_cols = self.shape
                buf.write(f"<p>{n_rows} rows × {n_cols} columns</p>\n")
            buf.write("</div>")
        return buf.getvalue()

    def items(self):
        for col_name in self.dtypes.index:
            yield col_name, self[col_name]

    iteritems = items

    def iterrows(self, batch_size=1000, session=None):
        for batch_data in self.iterbatch(batch_size=batch_size, session=session):
            yield from getattr(batch_data, "iterrows")()

    def itertuples(self, index=True, name="Pandas", batch_size=1000, session=None):
        for batch_data in self.iterbatch(batch_size=batch_size, session=session):
            yield from getattr(batch_data, "itertuples")(index=index, name=name)

    def _need_execution(self):
        if self._dtypes is None:
            return True
        return False
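
# NOTE (illustrative sketch, not part of the original module): `iterrows` and
# `itertuples` above delegate to `_BatchedFetcher.iterbatch`, which fetches
# `batch_size` rows per round trip rather than one row per fetch. The helper
# below is a hypothetical, pandas-only rendering of that batching pattern.
def _iter_row_batches_sketch(df, batch_size=1000):
    # yield contiguous row slices of at most `batch_size` rows each,
    # so downstream iteration only materializes one batch at a time
    for start in range(0, len(df), batch_size):
        yield df.iloc[start : start + batch_size]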
class DataFrame(HasShapeTileable, _ToPandasMixin):
    __slots__ = ("_cache",)
    _allow_data_type_ = (DataFrameData,)
    type_name = "DataFrame"

    def __len__(self):
        return len(self._data)

    def to_tensor(self):
        return self._data.to_tensor()

    def from_tensor(self, in_tensor, index=None, columns=None):
        return self._data.from_tensor(in_tensor, index=index, columns=columns)
    def from_records(self, records, **kw):
        return self._data.from_records(records, **kw)
    def __mars_tensor__(self, dtype=None, order="K"):
        return self._data.__mars_tensor__(dtype=dtype, order=order)

    def __getattr__(self, key):
        if self.dtypes is not None and key in self.dtypes:
            return self[key]
        return getattr(self._data, key)

    def __dir__(self):
        result = list(super().__dir__())
        return sorted(
            result
            + [k for k in self.dtypes.index if isinstance(k, str) and k.isidentifier()]
        )

    @property
    def T(self):
        return self.transpose()

    @property
    def ndim(self):
        """
        Return an int representing the number of axes / array dimensions.

        Return 1 if Series. Otherwise return 2 if DataFrame.

        See Also
        --------
        ndarray.ndim : Number of array dimensions.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> s = md.Series({'a': 1, 'b': 2, 'c': 3})
        >>> s.ndim.execute()
        1

        >>> df = md.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
        >>> df.ndim.execute()
        2
        """
        return super().ndim

    @property
    def index(self):
        idx = self._data.index
        idx._set_df_or_series(self, 0)
        return idx

    @index.setter
    def index(self, new_index):
        self.set_axis(new_index, axis=0, copy=False)

    @property
    def columns(self):
        col = self._data.columns
        col._set_df_or_series(self, 1)
        return col

    @columns.setter
    def columns(self, new_columns):
        self.set_axis(new_columns, axis=1, copy=False)
    def keys(self):
        """
        Get the 'info axis' (see Indexing for more).

        This is index for Series, columns for DataFrame.

        Returns
        -------
        Index
            Info axis.
        """
        return self.columns
    @property
    def values(self):
        return self.to_tensor()

    @property
    def dtypes(self):
        """
        Return the dtypes in the DataFrame.

        This returns a Series with the data type of each column.
        The result's index is the original DataFrame's columns. Columns
        with mixed types are stored with the ``object`` dtype. See
        :ref:`the User Guide <basics.dtypes>` for more.

        Returns
        -------
        pandas.Series
            The data type of each column.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> df = md.DataFrame({'float': [1.0],
        ...                    'int': [1],
        ...                    'datetime': [md.Timestamp('20180310')],
        ...                    'string': ['foo']})
        >>> df.dtypes
        float              float64
        int                  int64
        datetime    datetime64[ns]
        string              object
        dtype: object
        """
        return self._data.dtypes
    def iterrows(self, batch_size=1000, session=None):
        """
        Iterate over DataFrame rows as (index, Series) pairs.

        Yields
        ------
        index : label or tuple of label
            The index of the row. A tuple for a `MultiIndex`.
        data : Series
            The data of the row as a Series.

        it : generator
            A generator that iterates over the rows of the frame.

        See Also
        --------
        DataFrame.itertuples : Iterate over DataFrame rows as namedtuples of
            the values.
        DataFrame.items : Iterate over (column name, Series) pairs.

        Notes
        -----
        1. Because ``iterrows`` returns a Series for each row,
           it does **not** preserve dtypes across the rows (dtypes are
           preserved across columns for DataFrames). For example,

           >>> import mars.dataframe as md
           >>> df = md.DataFrame([[1, 1.5]], columns=['int', 'float'])
           >>> row = next(df.iterrows())[1]
           >>> row
           int      1.0
           float    1.5
           Name: 0, dtype: float64

           >>> print(row['int'].dtype)
           float64
           >>> print(df['int'].dtype)
           int64

           To preserve dtypes while iterating over the rows, it is better
           to use :meth:`itertuples` which returns namedtuples of the values
           and which is generally faster than ``iterrows``.

        2. You should **never modify** something you are iterating over.
           This is not guaranteed to work in all cases. Depending on the
           data types, the iterator returns a copy and not a view, and writing
           to it will have no effect.
        """
        return self._data.iterrows(batch_size=batch_size, session=session)
    def itertuples(self, index=True, name="Pandas", batch_size=1000, session=None):
        """
        Iterate over DataFrame rows as namedtuples.

        Parameters
        ----------
        index : bool, default True
            If True, return the index as the first element of the tuple.
        name : str or None, default "Pandas"
            The name of the returned namedtuples or None to return regular
            tuples.

        Returns
        -------
        iterator
            An object to iterate over namedtuples for each row in the
            DataFrame with the first field possibly being the index and
            following fields being the column values.

        See Also
        --------
        DataFrame.iterrows : Iterate over DataFrame rows as (index, Series)
            pairs.
        DataFrame.items : Iterate over (column name, Series) pairs.

        Notes
        -----
        The column names will be renamed to positional names if they are
        invalid Python identifiers, repeated, or start with an underscore.
        On python versions < 3.7 regular tuples are returned for DataFrames
        with a large number of columns (>254).

        Examples
        --------
        >>> import mars.dataframe as md
        >>> df = md.DataFrame({'num_legs': [4, 2], 'num_wings': [0, 2]},
        ...                   index=['dog', 'hawk'])
        >>> df.execute()
              num_legs  num_wings
        dog          4          0
        hawk         2          2
        >>> for row in df.itertuples():
        ...     print(row)
        ...
        Pandas(Index='dog', num_legs=4, num_wings=0)
        Pandas(Index='hawk', num_legs=2, num_wings=2)

        By setting the `index` parameter to False we can remove the index
        as the first element of the tuple:

        >>> for row in df.itertuples(index=False):
        ...     print(row)
        ...
        Pandas(num_legs=4, num_wings=0)
        Pandas(num_legs=2, num_wings=2)

        With the `name` parameter set we set a custom name for the yielded
        namedtuples:

        >>> for row in df.itertuples(name='Animal'):
        ...     print(row)
        ...
        Animal(Index='dog', num_legs=4, num_wings=0)
        Animal(Index='hawk', num_legs=2, num_wings=2)
        """
        return self._data.itertuples(
            batch_size=batch_size, session=session, index=index, name=name
        )
    def assign(self, **kwargs):
        """
        Assign new columns to a DataFrame.

        Returns a new object with all original columns in addition to new ones.
        Existing columns that are re-assigned will be overwritten.

        Parameters
        ----------
        **kwargs : dict of {str: callable or Series}
            The column names are keywords. If the values are
            callable, they are computed on the DataFrame and
            assigned to the new columns. The callable must not
            change input DataFrame (though pandas doesn't check it).
            If the values are not callable, (e.g. a Series, scalar, or array),
            they are simply assigned.

        Returns
        -------
        DataFrame
            A new DataFrame with the new columns in addition to
            all the existing columns.

        Notes
        -----
        Assigning multiple columns within the same ``assign`` is possible.
        Later items in 'kwargs' may refer to newly created or modified
        columns in 'df'; items are computed and assigned into 'df' in order.

        Examples
        --------
        >>> import mars.dataframe as md
        >>> df = md.DataFrame({'temp_c': [17.0, 25.0]},
        ...                   index=['Portland', 'Berkeley'])
        >>> df.execute()
                  temp_c
        Portland    17.0
        Berkeley    25.0

        Where the value is a callable, evaluated on `df`:

        >>> df.assign(temp_f=lambda x: x.temp_c * 9 / 5 + 32).execute()
                  temp_c  temp_f
        Portland    17.0    62.6
        Berkeley    25.0    77.0

        Alternatively, the same behavior can be achieved by directly
        referencing an existing Series or sequence:

        >>> df.assign(temp_f=df['temp_c'] * 9 / 5 + 32).execute()
                  temp_c  temp_f
        Portland    17.0    62.6
        Berkeley    25.0    77.0

        You can create multiple columns within the same assign where one
        of the columns depends on another one defined within the same assign:

        >>> df.assign(temp_f=lambda x: x['temp_c'] * 9 / 5 + 32,
        ...           temp_k=lambda x: (x['temp_f'] + 459.67) * 5 / 9).execute()
                  temp_c  temp_f  temp_k
        Portland    17.0    62.6  290.15
        Berkeley    25.0    77.0  298.15
        """

        def apply_if_callable(maybe_callable, obj, **kwargs):
            if callable(maybe_callable):
                return maybe_callable(obj, **kwargs)
            return maybe_callable

        data = self.copy()
        for k, v in kwargs.items():
            data[k] = apply_if_callable(v, data)
        return data
    def copy(self, deep=True):
        if deep is False:
            raise NotImplementedError("`deep=False` is not supported for now")
        return super().copy()
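
# NOTE (illustrative, not part of the original module): because
# `DataFrame.__getattr__` falls back to column lookup when the attribute name
# appears in `dtypes`, attribute access and item access are interchangeable
# for column names that are valid Python identifiers:
#
#     >>> import mars.dataframe as md
#     >>> df = md.DataFrame({'a': [1, 2]})
#     >>> (df.a == df['a']).all().execute()
#     True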
class DataFrameGroupByChunkData(BaseDataFrameChunkData):
    type_name = "DataFrameGroupBy"

    _key_dtypes = SeriesField("key_dtypes")
    _selection = AnyField("selection")

    @property
    def key_dtypes(self):
        return self._key_dtypes

    @property
    def selection(self):
        return self._selection

    def _get_params(self) -> Dict[str, Any]:
        p = super()._get_params()
        p.update(dict(key_dtypes=self.key_dtypes, selection=self.selection))
        return p

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        key_dtypes = params.pop("key_dtypes", None)
        if key_dtypes is not None:
            self._key_dtypes = key_dtypes
        selection = params.pop("selection", None)
        if selection is not None:
            self._selection = selection
        super()._set_params(params)

    params = property(_get_params, _set_params)

    @classmethod
    def get_params_from_data(cls, data: GroupByWrapper) -> Dict[str, Any]:
        params = super().get_params_from_data(data.obj)
        if data.selection:
            dtypes = params["dtypes_value"].value[data.selection]
            params["dtypes_value"] = DtypesValue(value=dtypes)
        params["shape"] = data.shape
        return params

    def __init__(self, key_dtypes=None, selection=None, **kw):
        super().__init__(_key_dtypes=key_dtypes, _selection=selection, **kw)


class DataFrameGroupByChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (DataFrameGroupByChunkData,)
    type_name = "DataFrameGroupBy"

    def __len__(self):
        return len(self._data)


class SeriesGroupByChunkData(BaseSeriesChunkData):
    type_name = "SeriesGroupBy"

    _key_dtypes = AnyField("key_dtypes")

    @property
    def key_dtypes(self):
        return self._key_dtypes

    def _get_params(self) -> Dict[str, Any]:
        p = super()._get_params()
        p["key_dtypes"] = self.key_dtypes
        return p

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        key_dtypes = params.pop("key_dtypes", None)
        if key_dtypes is not None:
            self._key_dtypes = key_dtypes
        super()._set_params(params)

    params = property(_get_params, _set_params)

    @classmethod
    def get_params_from_data(cls, data: GroupByWrapper):
        series_name = data.selection or data.obj.name
        if hasattr(data.obj, "dtype"):
            dtype = data.obj.dtype
        else:
            dtype = data.obj.dtypes[series_name]
        return {
            "shape": (data.obj.shape[0],),
            "dtype": dtype,
            "index_value": parse_index(data.obj.index, store_data=False),
            "name": series_name,
        }

    def __init__(self, key_dtypes=None, **kw):
        super().__init__(_key_dtypes=key_dtypes, **kw)


class SeriesGroupByChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (SeriesGroupByChunkData,)
    type_name = "SeriesGroupBy"

    def __len__(self):
        return len(self._data)


class DataFrameGroupByData(BaseDataFrameData):
    type_name = "DataFrameGroupBy"

    _key_dtypes = SeriesField("key_dtypes")
    _selection = AnyField("selection")
    _chunks = ListField(
        "chunks",
        FieldTypes.reference(DataFrameGroupByChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [DataFrameGroupByChunk(it) for it in x]
        if x is not None
        else x,
    )

    @property
    def key_dtypes(self):
        return self._key_dtypes

    @property
    def selection(self):
        return self._selection

    def _get_params(self) -> Dict[str, Any]:
        p = super()._get_params()
        p.update(dict(key_dtypes=self.key_dtypes, selection=self.selection))
        return p

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        key_dtypes = params.pop("key_dtypes", None)
        if key_dtypes is not None:
            self._key_dtypes = key_dtypes
        selection = params.pop("selection", None)
        if selection is not None:
            self._selection = selection
        super()._set_params(params)

    params = property(_get_params, _set_params)

    def __init__(self, key_dtypes=None, selection=None, **kw):
        super().__init__(_key_dtypes=key_dtypes, _selection=selection, **kw)
    def _equal(self, o):
        # FIXME We need to implement a true `==` operator for DataFrameGroupBy
        if is_build_mode():
            return self is o
        else:
            return self == o


class SeriesGroupByData(BaseSeriesData):
    type_name = "SeriesGroupBy"

    _key_dtypes = AnyField("key_dtypes")
    _chunks = ListField(
        "chunks",
        FieldTypes.reference(SeriesGroupByChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [SeriesGroupByChunk(it) for it in x]
        if x is not None
        else x,
    )

    @property
    def key_dtypes(self):
        return self._key_dtypes

    def _get_params(self) -> Dict[str, Any]:
        p = super()._get_params()
        p["key_dtypes"] = self.key_dtypes
        return p

    def _set_params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        key_dtypes = params.pop("key_dtypes", None)
        if key_dtypes is not None:
            self._key_dtypes = key_dtypes
        super()._set_params(params)

    params = property(_get_params, _set_params)

    def __init__(self, key_dtypes=None, **kw):
        super().__init__(_key_dtypes=key_dtypes, **kw)

    def _equal(self, o):
        # FIXME We need to implement a true `==` operator for SeriesGroupBy
        if is_build_mode():
            return self is o
        else:
            return self == o


class GroupBy(Tileable, _ToPandasMixin):
    __slots__ = ()


class DataFrameGroupBy(GroupBy):
    __slots__ = ()
    _allow_data_type_ = (DataFrameGroupByData,)
    type_name = "DataFrameGroupBy"

    def __eq__(self, other):
        return self._equal(other)

    def __hash__(self):
        # NB: we have customized __eq__ explicitly, thus we need to define
        # __hash__ explicitly as well.
        return super().__hash__()

    def __getattr__(self, item):
        try:
            return super().__getattr__(item)
        except AttributeError:
            if item in self.dtypes:
                return self[item]
            else:
                raise

    def __dir__(self):
        result = list(super().__dir__())
        return sorted(
            result
            + [k for k in self.dtypes.index if isinstance(k, str) and k.isidentifier()]
        )


class SeriesGroupBy(GroupBy):
    __slots__ = ()
    _allow_data_type_ = (SeriesGroupByData,)
    type_name = "SeriesGroupBy"

    def __eq__(self, other):
        return self._equal(other)

    def __hash__(self):
        # NB: we have customized __eq__ explicitly, thus we need to define
        # __hash__ explicitly as well.
        return super().__hash__()
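
# NOTE (illustrative sketch, not part of the original module): the `_equal`
# methods above make `==` degrade to an identity test while a tileable graph
# is being built, where a real `__eq__` would otherwise emit a comparison
# operator node. The standalone helper below mirrors that idiom;
# `_build_mode_equal_sketch` is a hypothetical name.
def _build_mode_equal_sketch(left, right):
    # `is_build_mode` is imported at the top of this module
    if is_build_mode():
        return left is right  # identity during graph construction
    return left == right  # genuine comparison at runtime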
class CategoricalChunkData(ChunkData):
    __slots__ = ()
    type_name = "Categorical"

    # required fields
    _shape = TupleField(
        "shape",
        FieldTypes.int64,
        on_serialize=on_serialize_shape,
        on_deserialize=on_deserialize_shape,
    )
    # optional field
    _dtype = DataTypeField("dtype")
    _categories_value = ReferenceField(
        "categories_value", IndexValue, on_deserialize=_on_deserialize_index_value
    )

    def __init__(
        self, op=None, shape=None, index=None, dtype=None, categories_value=None, **kw
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _index=index,
            _dtype=dtype,
            _categories_value=categories_value,
            **kw,
        )

    @property
    def params(self) -> Dict[str, Any]:
        # params returns the properties which are useful to rebuild a new chunk
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "index": self.index,
            "categories_value": self.categories_value,
        }

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        params.pop("index", None)  # index not needed to update
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        categories_value = params.pop("categories_value", None)
        if categories_value is not None:
            self._categories_value = categories_value
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    @classmethod
    def get_params_from_data(cls, data: pd.Categorical) -> Dict[str, Any]:
        return {
            "shape": data.shape,
            "dtype": data.dtype,
            "categories_value": parse_index(data.categories, store_data=True),
        }

    @property
    def shape(self):
        return getattr(self, "_shape", None)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def dtype(self):
        return self._dtype

    @property
    def categories_value(self):
        return self._categories_value


class CategoricalChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (CategoricalChunkData,)
    type_name = "Categorical"


class CategoricalData(HasShapeTileableData, _ToPandasMixin):
    __slots__ = ("_cache",)
    type_name = "Categorical"

    # optional field
    _dtype = DataTypeField("dtype")
    _categories_value = ReferenceField(
        "categories_value", IndexValue, on_deserialize=_on_deserialize_index_value
    )
    _chunks = ListField(
        "chunks",
        FieldTypes.reference(CategoricalChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [CategoricalChunk(it) for it in x]
        if x is not None
        else x,
    )

    def __init__(
        self,
        op=None,
        shape=None,
        nsplits=None,
        dtype=None,
        categories_value=None,
        chunks=None,
        **kw,
    ):
        super().__init__(
            _op=op,
            _shape=shape,
            _nsplits=nsplits,
            _dtype=dtype,
            _categories_value=categories_value,
            _chunks=chunks,
            **kw,
        )

    @property
    def params(self) -> Dict[str, Any]:
        # params returns the properties which are useful to rebuild a new tileable object
        return {
            "shape": self.shape,
            "dtype": self.dtype,
            "categories_value": self.categories_value,
        }

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        params = new_params.copy()
        new_shape = params.pop("shape", None)
        if new_shape is not None:
            self._shape = new_shape
        dtype = params.pop("dtype", None)
        if dtype is not None:
            self._dtype = dtype
        categories_value = params.pop("categories_value", None)
        if categories_value is not None:
            self._categories_value = categories_value
        if params:  # pragma: no cover
            raise TypeError(f"Unknown params: {list(params)}")

    def refresh_params(self):
        # refresh params when chunks updated
        refresh_tileable_shape(self)
        if self._dtype is None:
            self._dtype = self.chunks[0].dtype
        if self._categories_value is None:
            categories = []
            for chunk in self.chunks:
                categories.extend(chunk.categories_value.to_pandas())
            self._categories_value = parse_index(
                pd.Categorical(categories).categories, store_data=True
            )
    def _to_str(self, representation=False):
        if is_build_mode() or len(self._executed_sessions) == 0:
            # in build mode, or not executed, just return representation
            if representation:
                return f"{self.type_name} <op={type(self.op).__name__}, key={self.key}>"
            else:
                return f"{self.type_name}(op={type(self.op).__name__})"
        else:
            data = self.fetch(session=self._executed_sessions[-1])
            return repr(data) if representation else str(data)

    def __str__(self):
        return self._to_str(representation=False)

    def __repr__(self):
        return self._to_str(representation=True)

    def _equal(self, o):
        # FIXME We need to implement a true `==` operator for Categorical
        if is_build_mode():
            return self is o
        else:  # pragma: no cover
            return self == o

    @property
    def dtype(self):
        return getattr(self, "_dtype", None) or self.op.dtype

    @property
    def categories_value(self):
        return self._categories_value

    def __eq__(self, other):
        return self._equal(other)

    def __hash__(self):
        # NB: we have customized __eq__ explicitly, thus we need to define
        # __hash__ explicitly as well.
        return super().__hash__()


class Categorical(HasShapeTileable, _ToPandasMixin):
    __slots__ = ()
    _allow_data_type_ = (CategoricalData,)
    type_name = "Categorical"

    def __len__(self):
        return len(self._data)

    def __eq__(self, other):
        return self._equal(other)

    def __hash__(self):
        # NB: we have customized __eq__ explicitly, thus we need to define
        # __hash__ explicitly as well.
        return super().__hash__()


class DataFrameOrSeriesChunkData(ChunkData):
    __slots__ = ()
    type_name = "DataFrameOrSeries"

    _collapse_axis = Int8Field("collapse_axis")
    _data_type = StringField("data_type")
    _data_params = DictField("data_params")

    def __init__(
        self,
        op=None,
        index=None,
        collapse_axis=None,
        data_type=None,
        data_params=None,
        **kw,
    ):
        self._collapse_axis = collapse_axis
        self._index = index
        self._data_type = data_type
        self._data_params = data_params or dict()
        super().__init__(_op=op, **kw)

    def __getattr__(self, item):
        if item in self._data_params:
            return self._data_params[item]
        raise AttributeError(f"'{type(self)}' object has no attribute '{item}'")

    @property
    def ndim(self) -> int:
        return (self._data_type == "dataframe") + 1

    @property
    def params(self) -> Dict[str, Any]:
        return {
            "collapse_axis": self._collapse_axis,
            "index": self._index,
            "data_type": self._data_type,
            "data_params": self._data_params,
        }

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        self._data_type = new_params.get("data_type")
        if self._collapse_axis is not None and self._data_type == "series":
            self._index = (self._index[1 - self._collapse_axis],)
        if self._collapse_axis is None and self._data_type == "dataframe":
            self._index = (self._index[0], 0)
        data_params = new_params["data_params"]
        if self._data_type == "dataframe":
            data_params["dtypes"] = data_params["dtypes_value"].value
            data_params["columns_value"] = parse_index(
                data_params["dtypes_value"].value.index, store_data=True
            )
        self._data_params = {k: v for k, v in data_params.items()}

    @classmethod
    def get_params_from_data(cls, data: Any) -> Dict[str, Any]:
        if is_dataframe(data):
            return {
                "data_type": "dataframe",
                "data_params": DataFrameChunkData.get_params_from_data(data),
            }
        else:
            return {
                "data_type": "series",
                "data_params": SeriesChunkData.get_params_from_data(data),
            }


class DataFrameOrSeriesChunk(Chunk):
    __slots__ = ()
    _allow_data_type_ = (DataFrameOrSeriesChunkData,)
    type_name = "DataFrameOrSeries"
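
# NOTE (illustrative, not part of the original module): `ndim` above relies on
# Python's bool-to-int arithmetic: the comparison contributes 1 for a
# dataframe and 0 for a series, so
#
#     >>> ("dataframe" == "dataframe") + 1
#     2
#     >>> ("series" == "dataframe") + 1
#     1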
class DataFrameOrSeriesData(HasShapeTileableData, _ToPandasMixin):
    __slots__ = ()

    _chunks = ListField(
        "chunks",
        FieldTypes.reference(DataFrameOrSeriesChunkData),
        on_serialize=lambda x: [it.data for it in x] if x is not None else x,
        on_deserialize=lambda x: [DataFrameOrSeriesChunk(it) for it in x]
        if x is not None
        else x,
    )
    _data_type = StringField("data_type")
    _data_params = DictField("data_params")

    def __init__(
        self,
        op=None,
        chunks=None,
        data_type=None,
        data_params=None,
        **kw,
    ):
        self._data_type = data_type
        self._data_params = data_params or dict()
        super().__init__(
            _op=op,
            _chunks=chunks,
            **kw,
        )

    def __getattr__(self, item):
        if item in self._data_params:
            return self._data_params[item]
        raise AttributeError(f"'{type(self)}' object has no attribute '{item}'")

    @property
    def shape(self):
        return self._data_params.get("shape", None)

    @property
    def nsplits(self):
        return self._data_params.get("nsplits", None)

    @property
    def data_type(self):
        return self._data_type

    @property
    def data_params(self):
        return self._data_params

    @property
    def params(self) -> Dict[str, Any]:
        return {"data_type": self._data_type, "data_params": self._data_params}

    @params.setter
    def params(self, new_params: Dict[str, Any]):
        # After execution, a DataFrameFetch is created, and the data
        # corresponding to the original key is still of DataFrameOrSeries
        # type, so when restoring the DataFrameOrSeries type,
        # there is no "data_type" field in params.
        if "data_type" not in new_params:
            if "dtype" in new_params:
                self._data_type = "series"
            else:
                self._data_type = "dataframe"
            self._data_params = new_params.copy()
        else:
            self._data_type = new_params.get("data_type")
            self._data_params = {
                k: v for k, v in new_params.get("data_params", {}).items()
            }

    def refresh_params(self):
        index_to_index_values = dict()
        for chunk in self.chunks:
            if chunk.ndim == 1:
                index_to_index_values[chunk.index] = chunk.index_value
            elif chunk.index[1] == 0:
                index_to_index_values[chunk.index] = chunk.index_value
        index_value = merge_index_value(index_to_index_values, store_data=False)

        nsplits = calc_nsplits({c.index: c.shape for c in self.chunks})
        shape = tuple(sum(ns) for ns in nsplits)

        data_params = dict()
        data_params["nsplits"] = nsplits
        data_params["shape"] = shape
        data_params["index_value"] = index_value

        self._data_type = self._chunks[0]._data_type
        if self.data_type == "dataframe":
            all_dtypes = [c.dtypes_value.value for c in self.chunks if c.index[0] == 0]
            dtypes = pd.concat(all_dtypes)
            data_params["dtypes"] = dtypes
            columns_values = parse_index(dtypes.index, store_data=True)
            data_params["columns_value"] = columns_values
            data_params["dtypes_value"] = DtypesValue(
                key=tokenize(dtypes), value=dtypes
            )
        else:
            data_params["dtype"] = self.chunks[0].dtype
            data_params["name"] = self.chunks[0].name
        self._data_params.update(data_params)

    def ensure_data(self):
        from .fetch.core import DataFrameFetch

        self.execute()
        default_sess = get_default_session()
        self._detach_session(default_sess._session)
        fetch_tileable = default_sess._session._tileable_to_fetch[self]

        new = DataFrameFetch(
            output_types=[getattr(OutputType, self.data_type)]
        ).new_tileable(
            [],
            _key=self.key,
            chunks=fetch_tileable.chunks,
            nsplits=fetch_tileable.nsplits,
            **self.data_params,
        )
        new._attach_session(default_sess._session)
        return new


class DataFrameOrSeries(HasShapeTileable, _ToPandasMixin):
    __slots__ = ()
    _allow_data_type_ = (DataFrameOrSeriesData,)
    type_name = "DataFrameOrSeries"


INDEX_TYPE = (Index, IndexData)
INDEX_CHUNK_TYPE = (IndexChunk, IndexChunkData)
SERIES_TYPE = (Series, SeriesData)
SERIES_CHUNK_TYPE = (SeriesChunk, SeriesChunkData)
DATAFRAME_OR_SERIES_TYPE = (DataFrameOrSeries, DataFrameOrSeriesData)
DATAFRAME_OR_SERIES_CHUNK_TYPE = (DataFrameOrSeriesChunk, DataFrameOrSeriesChunkData)
DATAFRAME_TYPE = (DataFrame, DataFrameData)
DATAFRAME_CHUNK_TYPE = (DataFrameChunk, DataFrameChunkData)
DATAFRAME_GROUPBY_TYPE = (DataFrameGroupBy, DataFrameGroupByData)
DATAFRAME_GROUPBY_CHUNK_TYPE = (DataFrameGroupByChunk, DataFrameGroupByChunkData)
SERIES_GROUPBY_TYPE = (SeriesGroupBy, SeriesGroupByData)
SERIES_GROUPBY_CHUNK_TYPE = (SeriesGroupByChunk, SeriesGroupByChunkData)
GROUPBY_TYPE = (GroupBy,) + DATAFRAME_GROUPBY_TYPE + SERIES_GROUPBY_TYPE
GROUPBY_CHUNK_TYPE = DATAFRAME_GROUPBY_CHUNK_TYPE + SERIES_GROUPBY_CHUNK_TYPE
CATEGORICAL_TYPE = (Categorical, CategoricalData)
CATEGORICAL_CHUNK_TYPE = (CategoricalChunk, CategoricalChunkData)
TILEABLE_TYPE = (
    INDEX_TYPE + SERIES_TYPE + DATAFRAME_TYPE + GROUPBY_TYPE + CATEGORICAL_TYPE
)
CHUNK_TYPE = (
    INDEX_CHUNK_TYPE
    + SERIES_CHUNK_TYPE
    + DATAFRAME_CHUNK_TYPE
    + GROUPBY_CHUNK_TYPE
    + CATEGORICAL_CHUNK_TYPE
)

register_output_types(OutputType.dataframe, DATAFRAME_TYPE, DATAFRAME_CHUNK_TYPE)
register_output_types(OutputType.series, SERIES_TYPE, SERIES_CHUNK_TYPE)
register_output_types(
    OutputType.df_or_series, DATAFRAME_OR_SERIES_TYPE, DATAFRAME_OR_SERIES_CHUNK_TYPE
)
register_output_types(OutputType.index, INDEX_TYPE, INDEX_CHUNK_TYPE)
register_output_types(OutputType.categorical, CATEGORICAL_TYPE, CATEGORICAL_CHUNK_TYPE)
register_output_types(
    OutputType.dataframe_groupby, DATAFRAME_GROUPBY_TYPE, DATAFRAME_GROUPBY_CHUNK_TYPE
)
register_output_types(
    OutputType.series_groupby, SERIES_GROUPBY_TYPE, SERIES_GROUPBY_CHUNK_TYPE
)
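
# NOTE (illustrative, not part of the original module): each *_TYPE tuple above
# pairs the user-facing entity with its data counterpart so that a single
# isinstance check accepts either form, e.g.:
#
#     >>> import mars.dataframe as md
#     >>> df = md.DataFrame({'a': [1]})
#     >>> isinstance(df, DATAFRAME_TYPE)        # the DataFrame wrapper
#     True
#     >>> isinstance(df.data, DATAFRAME_TYPE)   # the underlying DataFrameData
#     True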