Source code for xorbits._mars.learn.contrib.lightgbm._predict

# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pickle

import numpy as np
import pandas as pd

from .... import opcodes
from ....core import recursive_tile
from ....dataframe.utils import parse_index
from ....serialization.serializables import BoolField, BytesField, DictField, KeyField
from ....tensor.core import TENSOR_TYPE, TensorOrder
from ...operands import LearnOperand, LearnOperandMixin, OutputType


class LGBMPredict(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.LGBM_PREDICT

    _data = KeyField("data")
    _model = BytesField("model", on_serialize=pickle.dumps, on_deserialize=pickle.loads)
    _proba = BoolField("proba")
    _kwds = DictField("kwds")

    def __init__(
        self, data=None, model=None, proba=None, kwds=None, output_types=None, **kw
    ):
        super().__init__(
            _data=data,
            _model=model,
            _proba=proba,
            _kwds=kwds,
            _output_types=output_types,
            **kw,
        )

    @property
    def data(self):
        return self._data

    @property
    def model(self):
        return self._model

    @property
    def proba(self) -> bool:
        return self._proba

    @property
    def kwds(self) -> dict:
        return self._kwds

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        it = iter(inputs)
        self._data = next(it)

    def __call__(self):
        num_class = int(getattr(self.model, "n_classes_", 2))
        if self.proba:
            shape = (self.data.shape[0], num_class)
        else:
            shape = (self.data.shape[0],)

        if self._proba:
            dtype = np.dtype(np.float_)
        elif hasattr(self.model, "classes_"):
            dtype = np.array(self.model.classes_).dtype
        else:
            dtype = getattr(self.model, "out_dtype_", np.dtype("float"))

        if self.output_types[0] == OutputType.tensor:
            # tensor
            return self.new_tileable(
                [self.data], shape=shape, dtype=dtype, order=TensorOrder.C_ORDER
            )
        elif self.output_types[0] == OutputType.dataframe:
            # dataframe
            dtypes = pd.Series([dtype] * num_class)
            columns_value = parse_index(pd.Index(self.model.classes_), store_data=True)
            return self.new_tileable(
                [self.data],
                shape=shape,
                dtypes=dtypes,
                columns_value=columns_value,
                index_value=self.data.index_value,
            )
        else:
            return self.new_tileable(
                [self.data],
                shape=shape,
                index_value=self.data.index_value,
                name="predictions",
                dtype=dtype,
            )

    @classmethod
    def tile(cls, op: "LGBMPredict"):
        out = op.outputs[0]
        out_chunks = []
        data = op.data
        if data.chunk_shape[1] > 1:
            data = yield from recursive_tile(data.rechunk({1: op.data.shape[1]}))

        for in_chunk in data.chunks:
            chunk_op = op.copy().reset_key()
            chunk_index = (in_chunk.index[0],)

            if len(out.shape) > 1:
                chunk_shape = (in_chunk.shape[0], out.shape[1])
                chunk_index += (0,)
            else:
                chunk_shape = (in_chunk.shape[0],)

            if op.output_types[0] == OutputType.tensor:
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtype=out.dtype,
                    order=out.order,
                    index=chunk_index,
                )
            elif op.output_types[0] == OutputType.dataframe:
                # dataframe chunk
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtypes=out.dtypes,
                    columns_value=out.columns_value,
                    index_value=in_chunk.index_value,
                    index=chunk_index,
                )
            else:
                # series chunk
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtype=out.dtype,
                    index_value=in_chunk.index_value,
                    name=out.name,
                    index=chunk_index,
                )
            out_chunks.append(out_chunk)

        new_op = op.copy()
        params = out.params
        params["chunks"] = out_chunks
        nsplits = (data.nsplits[0],)
        if out.ndim > 1:
            nsplits += ((out.shape[1],),)
        params["nsplits"] = nsplits
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def execute(cls, ctx, op: "LGBMPredict"):
        in_data = ctx[op.data.key]
        in_data = in_data.spmatrix if hasattr(in_data, "spmatrix") else in_data
        out = op.outputs[0]

        if op.data.shape[0] == 0:
            result = np.array([])
        elif op.proba:
            result = op.model.predict_proba(in_data, **op.kwds)
        else:
            result = op.model.predict(in_data, **op.kwds)

        if op.output_types[0] == OutputType.dataframe:
            result = pd.DataFrame(
                result, index=in_data.index, columns=out.columns_value.to_pandas()
            )
        elif op.output_types[0] == OutputType.series:
            result = pd.Series(result, index=in_data.index, name="predictions")

        ctx[out.key] = result


def predict_base(model, data, session=None, run_kwargs=None, run=True, **kwargs):
    from lightgbm import LGBMModel

    if not isinstance(model, LGBMModel):
        raise TypeError(
            f"model has to be a lightgbm.LGBMModel, got {type(model)} instead"
        )
    model = model.to_local() if hasattr(model, "to_local") else model

    proba = kwargs.pop("proba", hasattr(model, "classes_"))

    if isinstance(data, TENSOR_TYPE):
        output_types = [OutputType.tensor]
    elif proba:
        output_types = [OutputType.dataframe]
    else:
        output_types = [OutputType.series]

    op = LGBMPredict(
        data=data,
        model=model,
        gpu=data.op.gpu,
        output_types=output_types,
        proba=proba,
        kwds=kwargs,
    )
    result = op()
    if run:
        result.execute(session=session, **(run_kwargs or dict()))
    return result


[docs]def predict(model, data, session=None, run_kwargs=None, run=True, **kw): if hasattr(model, "classes_"): return predict_base( model, data, session=session, run_kwargs=run_kwargs, proba=False, run=run, **kw, ) else: return predict_base( model, data, session=session, run_kwargs=run_kwargs, run=run, **kw )
[docs]def predict_proba(model, data, session=None, run_kwargs=None, run=True, **kw): return predict_base( model, data, session=session, run_kwargs=run_kwargs, run=run, proba=True, **kw )