Source code for xorbits._mars.learn.contrib.xgboost.predict

# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pickle

import numpy as np
import pandas as pd

from .... import opcodes as OperandDef
from ....core import recursive_tile
from ....dataframe.core import DATAFRAME_CHUNK_TYPE, SERIES_CHUNK_TYPE
from ....dataframe.utils import parse_index
from ....serialization.serializables import BytesField, DictField, FieldTypes, KeyField
from ....tensor.core import TENSOR_TYPE, TensorOrder
from ....utils import ensure_own_data, has_unknown_shape
from ...operands import LearnOperand, LearnOperandMixin, OutputType
from .dmatrix import ToDMatrix, check_data


class XGBPredict(LearnOperand, LearnOperandMixin):
    """Operand that runs ``model.predict`` over every row-chunk of ``data``.

    The output kind (tensor, DataFrame or Series) is taken from
    ``output_types[0]``.  When the model carries a ``num_class`` attribute the
    result is 2-D with ``num_class`` columns, otherwise it is 1-D.
    """

    _op_type_ = OperandDef.XGBOOST_PREDICT

    # Input tileable (or chunk, after tiling) holding the samples to predict.
    data = KeyField("data", default=None)
    # NOTE(review): the model is round-tripped through pickle; ``pickle.loads``
    # must only ever see payloads produced by trusted peers.
    model = BytesField(
        "model", on_serialize=pickle.dumps, on_deserialize=pickle.loads, default=None
    )
    # Keyword arguments forwarded to ``model.predict`` (``None`` values are
    # dropped at execution time).
    kwargs = DictField("kwargs", key_type=FieldTypes.string, default_factory=dict)

    def __init__(self, output_types=None, gpu=None, **kw):
        super().__init__(_output_types=output_types, gpu=gpu, **kw)

    def _set_inputs(self, inputs):
        """Rebind ``data`` to the first input after inputs are (re)assigned."""
        super()._set_inputs(inputs)
        self.data = self._inputs[0]

    def __call__(self):
        """Create the output tileable describing the prediction result.

        Returns
        -------
        A tensor, DataFrame or Series tileable (depending on
        ``output_types[0]``) with float32 values and as many rows as ``data``.
        """
        num_class = self.model.attr("num_class")
        if num_class is not None:
            num_class = int(num_class)
            shape = (self.data.shape[0], num_class)
        else:
            shape = (self.data.shape[0],)
        inputs = [self.data]
        if self.output_types[0] == OutputType.tensor:
            # tensor
            return self.new_tileable(
                inputs,
                shape=shape,
                dtype=np.dtype(np.float32),
                order=TensorOrder.C_ORDER,
            )
        elif self.output_types[0] == OutputType.dataframe:
            # dataframe: probe an empty float32 frame to obtain dtypes/columns
            dtypes = pd.DataFrame(np.empty((0, num_class), dtype=np.float32)).dtypes
            return self.new_tileable(
                inputs,
                shape=shape,
                dtypes=dtypes,
                columns_value=parse_index(dtypes.index),
                index_value=self.data.index_value,
            )
        else:
            # series
            return self.new_tileable(
                inputs,
                shape=shape,
                index_value=self.data.index_value,
                name="predictions",
                dtype=np.dtype(np.float32),
            )

    @classmethod
    def tile(cls, op: "XGBPredict"):
        """Emit one prediction chunk per row-chunk of the input.

        If the input is split along axis 1 it is first rechunked so that every
        chunk spans all columns.
        """
        out = op.outputs[0]
        out_chunks = []
        data = op.data
        if data.chunk_shape[1] > 1:
            if has_unknown_shape(op.data):
                # shapes are needed for the rechunk below; wait until known
                yield
            data = yield from recursive_tile(data.rechunk({1: op.data.shape[1]}))

        # The model attribute does not change between chunks: read it once.
        num_class = op.model.attr("num_class")
        if num_class is not None:
            num_class = int(num_class)

        for in_chunk in data.chunks:
            chunk_op = op.copy().reset_key()
            chunk_index = (in_chunk.index[0],)
            if num_class is not None:
                chunk_shape = (in_chunk.shape[0], num_class)
                # a single column-chunk covers all classes
                chunk_index += (0,)
            else:
                chunk_shape = (in_chunk.shape[0],)
            if op.output_types[0] == OutputType.tensor:
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtype=out.dtype,
                    order=out.order,
                    index=chunk_index,
                )
            elif op.output_types[0] == OutputType.dataframe:
                # dataframe chunk: column metadata must describe the
                # prediction frame (``out``), not the input feature frame.
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtypes=out.dtypes,
                    columns_value=out.columns_value,
                    index_value=in_chunk.index_value,
                    index=chunk_index,
                )
            else:
                # series chunk
                out_chunk = chunk_op.new_chunk(
                    [in_chunk],
                    shape=chunk_shape,
                    dtype=out.dtype,
                    index_value=in_chunk.index_value,
                    name=out.name,
                    index=chunk_index,
                )
            out_chunks.append(out_chunk)

        new_op = op.copy()
        params = out.params
        params["chunks"] = out_chunks
        nsplits = (data.nsplits[0],)
        if out.ndim > 1:
            nsplits += ((out.shape[1],),)
        params["nsplits"] = nsplits
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def execute(cls, ctx, op: "XGBPredict"):
        """Run the prediction for one chunk and store the result in ``ctx``."""
        from xgboost import DMatrix

        raw_data = data = ctx[op.data.key]
        if isinstance(data, tuple):
            data = ToDMatrix.get_xgb_dmatrix(ensure_own_data(data))
        else:
            data = data.spmatrix if hasattr(data, "spmatrix") else data
            data = DMatrix(data)

        # do not pass arguments that are None
        kwargs = {k: v for k, v in op.kwargs.items() if v is not None}
        result = op.model.predict(data, **kwargs)

        # NOTE(review): ``raw_data.index`` is only a pandas index when the
        # input chunk is a DataFrame/Series; presumably DataFrame/Series
        # outputs are never paired with tuple inputs -- confirm.
        if isinstance(op.outputs[0], DATAFRAME_CHUNK_TYPE):
            result = pd.DataFrame(result, index=raw_data.index)
        elif isinstance(op.outputs[0], SERIES_CHUNK_TYPE):
            result = pd.Series(result, index=raw_data.index, name="predictions")

        ctx[op.outputs[0].key] = result


def predict(
    model,
    data,
    output_margin=False,
    ntree_limit=None,
    validate_features=True,
    base_margin=None,
    session=None,
    run_kwargs=None,
    run=True,
):
    """Build (and by default execute) a prediction of ``model`` on ``data``.

    Parameters
    ----------
    model
        Must be an ``xgboost.Booster``.
    data
        Samples to predict on; validated/converted by ``check_data``.
    output_margin, ntree_limit, validate_features, base_margin
        Collected into the operand's ``kwargs`` for the model's ``predict``
        call.
    session
        Passed to ``result.execute`` when ``run`` is true.
    run_kwargs
        Extra keyword arguments for ``result.execute``; ``None`` means none.
    run
        If true, execute the result before returning it.

    Returns
    -------
    The result tileable: a tensor when ``data`` is a tensor, otherwise a
    DataFrame if the model has a ``num_class`` attribute, else a Series.

    Raises
    ------
    TypeError
        If ``model`` is not an ``xgboost.Booster``.
    """
    import xgboost

    data = check_data(data)
    if not isinstance(model, xgboost.Booster):
        raise TypeError(f"model has to be a xgboost.Booster, got {type(model)} instead")

    num_class = model.attr("num_class")
    if isinstance(data, TENSOR_TYPE):
        output_types = [OutputType.tensor]
    elif num_class is not None:
        output_types = [OutputType.dataframe]
    else:
        output_types = [OutputType.series]

    kwargs = {
        "output_margin": output_margin,
        "ntree_limit": ntree_limit,
        "validate_features": validate_features,
        "base_margin": base_margin,
    }
    op = XGBPredict(
        data=data,
        model=model,
        kwargs=kwargs,
        gpu=data.op.gpu,
        output_types=output_types,
    )
    result = op()
    if run:
        result.execute(session=session, **(run_kwargs or dict()))
    return result