Source code for xorbits._mars.tensor.base.unique

# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools

import numpy as np

from ... import opcodes as OperandDef
from ...config import options
from ...core import recursive_tile
from ...core.operand import OperandStage
from ...lib import sparse
from ...lib.sparse.core import get_array_module as get_sparse_array_module
from ...serialization.serializables import BoolField, Int32Field, Int64Field
from ...utils import has_unknown_shape, is_same_module
from ..array_utils import as_same_device, device
from ..core import TensorOrder
from ..operands import TensorMapReduceOperand, TensorOperandMixin, TensorShuffleProxy
from ..utils import hash_on_axis, validate_axis


class TensorUnique(TensorMapReduceOperand, TensorOperandMixin):
    _op_type_ = OperandDef.UNIQUE

    _return_index = BoolField("return_index")
    _return_inverse = BoolField("return_inverse")
    _return_counts = BoolField("return_counts")
    _axis = Int32Field("axis")
    _aggregate_size = Int32Field("aggregate_size")

    _start_pos = Int64Field("start_pos")

    def __init__(
        self,
        return_index=None,
        return_inverse=None,
        return_counts=None,
        axis=None,
        start_pos=None,
        aggregate_size=None,
        **kw
    ):
        super().__init__(
            _return_index=return_index,
            _return_inverse=return_inverse,
            _return_counts=return_counts,
            _axis=axis,
            _start_pos=start_pos,
            _aggregate_size=aggregate_size,
            **kw
        )

    @property
    def output_limit(self):
        if self.stage == OperandStage.map:
            return 1
        return (
            1
            + bool(self._return_index)
            + bool(self._return_inverse)
            + bool(self._return_counts)
        )

    @property
    def return_index(self):
        return self._return_index

    @property
    def return_inverse(self):
        return self._return_inverse

    @property
    def return_counts(self):
        return self._return_counts

    @property
    def axis(self):
        return self._axis

    @property
    def aggregate_size(self):
        return self._aggregate_size

    @property
    def start_pos(self):
        return self._start_pos

    @classmethod
    def _gen_kws(cls, op, input_obj, chunk=False, chunk_index=None):
        kws = []

        # unique tensor
        shape = list(input_obj.shape)
        shape[op.axis] = np.nan
        kw = {"shape": tuple(shape), "dtype": input_obj.dtype, "gpu": input_obj.op.gpu}
        if chunk:
            idx = [0] * len(shape)
            idx[op.axis] = chunk_index or 0
            kw["index"] = tuple(idx)
        kws.append(kw)

        # unique indices tensor
        if op.return_index:
            kw = {
                "shape": (np.nan,),
                "dtype": np.dtype(np.intp),
                "gpu": input_obj.op.gpu,
                "type": "indices",
            }
            if chunk:
                kw["index"] = (chunk_index or 0,)
            kws.append(kw)

        # unique inverse tensor
        if op.return_inverse:
            kw = {
                "shape": (input_obj.shape[op.axis],),
                "dtype": np.dtype(np.intp),
                "gpu": input_obj.op.gpu,
                "type": "inverse",
            }
            if chunk:
                kw["index"] = (chunk_index or 0,)
            kws.append(kw)

        # unique counts tensor
        if op.return_counts:
            kw = {
                "shape": (np.nan,),
                "dtype": np.dtype(np.int_),
                "gpu": input_obj.op.gpu,
                "type": "counts",
            }
            if chunk:
                kw["index"] = (chunk_index or 0,)
            kws.append(kw)

        return kws

    def __call__(self, ar):
        from .atleast_1d import atleast_1d

        ar = atleast_1d(ar)
        if self.axis is None:
            if ar.ndim > 1:
                ar = ar.flatten()
            self._axis = 0
        else:
            self._axis = validate_axis(ar.ndim, self._axis)

        kws = self._gen_kws(self, ar)
        tensors = self.new_tensors([ar], kws=kws, order=TensorOrder.C_ORDER)
        if len(tensors) == 1:
            return tensors[0]
        return tensors

    @classmethod
    def _tile_one_chunk(cls, op):
        outs = op.outputs
        ins = op.inputs

        chunk_op = op.copy().reset_key()
        in_chunk = ins[0].chunks[0]
        kws = cls._gen_kws(chunk_op, in_chunk, chunk=True)
        out_chunks = chunk_op.new_chunks([in_chunk], kws=kws, order=outs[0].order)
        new_op = op.copy()
        kws = [out.params.copy() for out in outs]
        for kw, out_chunk in zip(kws, out_chunks):
            kw["chunks"] = [out_chunk]
            kw["nsplits"] = tuple((s,) for s in out_chunk.shape)
        return new_op.new_tensors(ins, kws=kws, order=outs[0].order)

    @classmethod
    def _tile_via_shuffle(cls, op):
        # rechunk the axes except the axis to do unique into 1 chunk
        inp = op.inputs[0]
        if has_unknown_shape(inp):
            yield

        if inp.ndim > 1:
            new_chunk_size = dict()
            for axis in range(inp.ndim):
                if axis == op.axis:
                    continue
                if np.isnan(inp.shape[axis]):
                    yield
                new_chunk_size[axis] = inp.shape[axis]
            if has_unknown_shape(inp):
                yield
            inp = yield from recursive_tile(inp.rechunk(new_chunk_size))

        aggregate_size = op.aggregate_size
        if aggregate_size is None:
            aggregate_size = max(inp.chunk_shape[op.axis] // options.combine_size, 1)

        unique_on_chunk_sizes = inp.nsplits[op.axis]
        start_poses = np.cumsum((0,) + unique_on_chunk_sizes).tolist()[:-1]
        map_chunks = []
        for c in inp.chunks:
            map_op = TensorUnique(
                stage=OperandStage.map,
                return_index=op.return_index,
                return_inverse=op.return_inverse,
                return_counts=op.return_counts,
                axis=op.axis,
                aggregate_size=aggregate_size,
                start_pos=start_poses[c.index[op.axis]],
                dtype=inp.dtype,
            )
            shape = list(c.shape)
            shape[op.axis] = np.nan
            map_chunks.append(map_op.new_chunk([c], shape=tuple(shape), index=c.index))

        shuffle_chunk = TensorShuffleProxy(
            dtype=inp.dtype, _tensor_keys=[inp.op.key]
        ).new_chunk(map_chunks, shape=())

        reduce_chunks = [list() for _ in range(len(op.outputs))]
        for i in range(aggregate_size):
            reduce_op = TensorUnique(
                stage=OperandStage.reduce,
                return_index=op.return_index,
                return_inverse=op.return_inverse,
                return_counts=op.return_counts,
                axis=op.axis,
                reducer_index=(i,),
                reducer_phase="agg",
                n_reducers=aggregate_size,
            )
            kws = cls._gen_kws(op, inp, chunk=True, chunk_index=i)
            chunks = reduce_op.new_chunks(
                [shuffle_chunk], kws=kws, order=op.outputs[0].order
            )
            if op.return_inverse:
                inverse_idx = 2 if op.return_index else 1
                for j, chk in enumerate(chunks):
                    if j == inverse_idx:
                        chk.is_mapper = True
                    else:
                        chk.is_mapper = False
            for j, c in enumerate(chunks):
                reduce_chunks[j].append(c)

        if op.return_inverse:
            inverse_pos = 2 if op.return_index else 1
            map_inverse_chunks = reduce_chunks[inverse_pos]
            inverse_shuffle_chunk = TensorShuffleProxy(
                dtype=map_inverse_chunks[0].dtype
            ).new_chunk(map_inverse_chunks, shape=())
            inverse_chunks = []
            for j, cs in enumerate(unique_on_chunk_sizes):
                chunk_op = TensorUnique(
                    stage=OperandStage.reduce,
                    n_reducers=len(unique_on_chunk_sizes),
                    dtype=map_inverse_chunks[0].dtype,
                    reducer_index=(j,),
                    reducer_phase="inverse",
                )
                inverse_chunk = chunk_op.new_chunk(
                    [inverse_shuffle_chunk], shape=(cs,), index=(j,)
                )
                inverse_chunks.append(inverse_chunk)
            reduce_chunks[inverse_pos] = inverse_chunks

        kws = [out.params for out in op.outputs]
        for kw, chunks in zip(kws, reduce_chunks):
            kw["chunks"] = chunks
        unique_nsplits = list(inp.nsplits)
        unique_nsplits[op.axis] = (np.nan,) * len(reduce_chunks[0])
        kws[0]["nsplits"] = tuple(unique_nsplits)
        i = 1
        if op.return_index:
            kws[i]["nsplits"] = ((np.nan,) * len(reduce_chunks[i]),)
            i += 1
        if op.return_inverse:
            kws[i]["nsplits"] = (inp.nsplits[op.axis],)
            i += 1
        if op.return_counts:
            kws[i]["nsplits"] = ((np.nan,) * len(reduce_chunks[i]),)

        new_op = op.copy()
        return new_op.new_tensors(op.inputs, kws=kws)

    @classmethod
    def tile(cls, op: "TensorUnique"):
        if len(op.inputs[0].chunks) == 1:
            return cls._tile_one_chunk(op)
        else:
            return (yield from cls._tile_via_shuffle(op))

    @classmethod
    def _execute_map(cls, ctx, op: "TensorUnique"):
        (ar,), device_id, xp = as_same_device(
            [ctx[c.key] for c in op.inputs], device=op.device, ret_extra=True
        )
        n_reducers = op.aggregate_size

        with device(device_id):
            results = xp.unique(
                ar,
                return_index=op.return_index,
                return_inverse=op.return_inverse,
                return_counts=op.return_counts,
                axis=op.axis,
            )
            results = (results,) if not isinstance(results, tuple) else results
            results_iter = iter(results)
            unique_ar = next(results_iter)
            indices_ar = next(results_iter) + op.start_pos if op.return_index else None
            inverse_ar = next(results_iter) if op.return_inverse else None
            counts_ar = next(results_iter) if op.return_counts else None

            if is_same_module(xp, sparse):
                dense_xp = get_sparse_array_module(unique_ar)
            else:
                dense_xp = xp
            unique_index = (
                dense_xp.arange(unique_ar.shape[op.axis])
                if inverse_ar is not None
                else None
            )
            if unique_ar.size > 0:
                unique_reducers = dense_xp.asarray(
                    hash_on_axis(unique_ar, op.axis, n_reducers)
                )
            else:
                unique_reducers = dense_xp.empty_like(unique_ar)
            ind_ar = dense_xp.arange(ar.shape[op.axis])

            for reducer in range(n_reducers):
                res = []
                cond = unique_reducers == reducer
                # unique
                slc = (slice(None),) * op.axis + (cond,)
                res.append(unique_ar[slc])
                # indices
                if indices_ar is not None:
                    res.append(indices_ar[cond])
                # inverse
                if inverse_ar is not None:
                    index_selected = unique_index[cond]
                    inv_cond = xp.isin(inverse_ar, index_selected)
                    inv_selected = xp.searchsorted(index_selected, inverse_ar[inv_cond])
                    ind_selected = ind_ar[inv_cond]
                    res.append(xp.stack([ind_selected, inv_selected]))
                # counts
                if counts_ar is not None:
                    res.append(counts_ar[cond])
                ctx[op.outputs[0].key, (reducer,)] = (
                    ctx.get_current_chunk().index,
                    tuple(res),
                )

    @classmethod
    def _execute_agg_reduce(cls, ctx, op: "TensorUnique"):
        input_indexes, input_data = zip(*list(op.iter_mapper_data(ctx)))

        inputs = list(zip(*input_data))
        flatten, device_id, xp = as_same_device(
            list(itertools.chain(*inputs)), device=op.device, ret_extra=True
        )
        n_ret = len(inputs[0])
        inputs = [flatten[i * n_ret : (i + 1) * n_ret] for i in range(len(inputs))]

        inputs_iter = iter(inputs)
        unique_arrays = next(inputs_iter)
        indices_arrays = next(inputs_iter) if op.return_index else None
        inverse_arrays = next(inputs_iter) if op.return_inverse else None
        counts_arrays = next(inputs_iter) if op.return_counts else None

        with device(device_id):
            ar = xp.concatenate(unique_arrays, axis=op.axis)
            result_return_inverse = op.return_inverse or op.return_counts
            axis = op.axis
            if ar.size == 0 or ar.shape[axis] == 0:
                # empty array on the axis
                results = [xp.empty(ar.shape)]
                i = 1
                for it in (op.return_index, op.return_inverse, op.return_counts):
                    if it:
                        results.append(xp.empty([], dtype=op.outputs[i].dtype))
                        i += 1
                results = tuple(results)
            else:
                results = xp.unique(
                    ar,
                    return_index=op.return_index,
                    return_inverse=result_return_inverse,
                    axis=axis,
                )
            results = (results,) if not isinstance(results, tuple) else results
            results_iter = iter(results)
            outputs_iter = iter(op.outputs)
            # unique array
            ctx[next(outputs_iter).key] = next(results_iter)

            if op.output_limit == 1:
                return

            # calc indices
            if op.return_index:
                ctx[next(outputs_iter).key] = xp.concatenate(indices_arrays)[
                    next(results_iter)
                ]
            # calc inverse
            try:
                inverse_result = next(results_iter)
                if op.return_inverse:
                    unique_sizes = tuple(ua.shape[op.axis] for ua in unique_arrays)
                    cum_unique_sizes = np.cumsum((0,) + unique_sizes)
                    indices_out_key = next(outputs_iter).key
                    for i, inverse_array in enumerate(inverse_arrays):
                        p = inverse_result[
                            cum_unique_sizes[i] : cum_unique_sizes[i + 1]
                        ]
                        r = xp.empty(inverse_array.shape, dtype=inverse_array.dtype)
                        if inverse_array.size > 0:
                            r[0] = inverse_array[0]
                            r[1] = p[inverse_array[1]]
                        # return unique length and
                        ctx[indices_out_key, (input_indexes[i][op.axis],)] = (
                            results[0].shape[op.axis],
                            r,
                        )
                # calc counts
                if op.return_counts:
                    result_counts = xp.zeros(results[0].shape[op.axis], dtype=int)
                    t = np.stack([inverse_result, np.concatenate(counts_arrays)])

                    def acc(a):
                        i, v = a
                        result_counts[i] += v

                    np.apply_along_axis(acc, 0, t)
                    ctx[next(outputs_iter).key] = xp.asarray(result_counts)
            except StopIteration:
                pass

    @classmethod
    def _execute_inverse_reduce(cls, ctx, op: "TensorUnique"):
        out = op.outputs[0]
        inputs = list(op.iter_mapper_data(ctx))
        unique_sizes = [inp[0] for inp in inputs]
        cum_unique_sizes = np.cumsum([0] + unique_sizes)
        invs, device_id, xp = as_same_device(
            [inp[1] for inp in inputs], device=op.device, ret_extra=True
        )
        with device(device_id):
            ret = xp.empty(out.shape, dtype=out.dtype)
            for i, inv in enumerate(invs):
                ret[inv[0]] = cum_unique_sizes[i] + inv[1]
            ctx[out.key] = ret

    @classmethod
    def execute(cls, ctx, op: "TensorUnique"):
        if op.stage == OperandStage.map:
            cls._execute_map(ctx, op)
        elif op.stage == OperandStage.reduce:
            if op.reducer_phase == "agg":
                cls._execute_agg_reduce(ctx, op)
            else:
                assert op.reducer_phase == "inverse"
                cls._execute_inverse_reduce(ctx, op)
        else:
            (ar,), device_id, xp = as_same_device(
                [ctx[c.key] for c in op.inputs], device=op.device, ret_extra=True
            )

            with device(device_id):
                kw = dict(
                    return_index=op.return_index,
                    return_inverse=op.return_inverse,
                    return_counts=op.return_counts,
                )
                if ar.dtype != object and sum(ar.shape) > 0:
                    # axis cannot pass when dtype is object or array size is 0
                    kw["axis"] = op.axis
                results = xp.unique(ar, **kw)
                outs = op.outputs
                if len(outs) == 1:
                    ctx[outs[0].key] = results
                    return

                assert len(outs) == len(results)
                for out, result in zip(outs, results):
                    ctx[out.key] = result


[docs]def unique( ar, return_index=False, return_inverse=False, return_counts=False, axis=None, aggregate_size=None, ): """ Find the unique elements of a tensor. Returns the sorted unique elements of a tensor. There are three optional outputs in addition to the unique elements: * the indices of the input tensor that give the unique values * the indices of the unique tensor that reconstruct the input tensor * the number of times each unique value comes up in the input tensor Parameters ---------- ar : array_like Input tensor. Unless `axis` is specified, this will be flattened if it is not already 1-D. return_index : bool, optional If True, also return the indices of `ar` (along the specified axis, if provided, or in the flattened tensor) that result in the unique tensor. return_inverse : bool, optional If True, also return the indices of the unique tensor (for the specified axis, if provided) that can be used to reconstruct `ar`. return_counts : bool, optional If True, also return the number of times each unique item appears in `ar`. axis : int or None, optional The axis to operate on. If None, `ar` will be flattened. If an integer, the subarrays indexed by the given axis will be flattened and treated as the elements of a 1-D tensor with the dimension of the given axis, see the notes for more details. Object tensors or structured tensors that contain objects are not supported if the `axis` kwarg is used. The default is None. aggregate_size: int or None, optional How many chunks will be after unique, default as #input.chunks / options.combine_size Returns ------- unique : Tensor The sorted unique values. unique_indices : Tensor, optional The indices of the first occurrences of the unique values in the original tensor. Only provided if `return_index` is True. unique_inverse : Tensor, optional The indices to reconstruct the original tensor from the unique tensor. Only provided if `return_inverse` is True. unique_counts : Tensor, optional The number of times each of the unique values comes up in the original tensor. Only provided if `return_counts` is True. Examples -------- >>> import mars.tensor as mt >>> mt.unique([1, 1, 2, 2, 3, 3]).execute() array([1, 2, 3]) >>> a = mt.array([[1, 1], [2, 3]]) >>> mt.unique(a).execute() array([1, 2, 3]) Return the unique rows of a 2D tensor >>> a = mt.array([[1, 0, 0], [1, 0, 0], [2, 3, 4]]) >>> mt.unique(a, axis=0).execute() array([[1, 0, 0], [2, 3, 4]]) Return the indices of the original tensor that give the unique values: >>> a = mt.array(['a', 'b', 'b', 'c', 'a']) >>> u, indices = mt.unique(a, return_index=True) >>> u.execute() array(['a', 'b', 'c'], dtype='|S1') >>> indices.execute() array([0, 1, 3]) >>> a[indices].execute() array(['a', 'b', 'c'], dtype='|S1') Reconstruct the input array from the unique values: >>> a = mt.array([1, 2, 6, 4, 2, 3, 2]) >>> u, indices = mt.unique(a, return_inverse=True) >>> u.execute() array([1, 2, 3, 4, 6]) >>> indices.execute() array([0, 1, 4, 3, 1, 2, 1]) >>> u[indices].execute() array([1, 2, 6, 4, 2, 3, 2]) """ op = TensorUnique( return_index=return_index, return_inverse=return_inverse, return_counts=return_counts, axis=axis, aggregate_size=aggregate_size, ) return op(ar)