# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from collections.abc import Iterable
from contextlib import contextmanager
import numpy as np
from ...config import options
from ...core import recursive_tile
from ...serialization.serializables import FieldTypes, Int32Field, TupleField
from ...utils import is_same_module
from ..array_utils import array_module, device
from ..base import broadcast_to
from ..core import TENSOR_CHUNK_TYPE, TENSOR_TYPE
from ..datasource import tensor as astensor
from ..operands import TensorMapReduceOperand, TensorOperand, TensorOperandMixin
from ..utils import broadcast_shape, decide_chunk_sizes, gen_random_seeds
class RandomState:
    """
    Thin wrapper around :class:`numpy.random.RandomState`.

    Parameters
    ----------
    seed : int or 1-d array_like, optional
        Seed forwarded to ``numpy.random.RandomState``.
    """

    def __init__(self, seed=None):
        self._random_state = np.random.RandomState(seed=seed)

    def seed(self, seed=None):
        """
        Seed the generator.

        This method is called when `RandomState` is initialized. It can be
        called again to re-seed the generator. For details, see `RandomState`.

        Parameters
        ----------
        seed : int or 1-d array_like, optional
            Seed for `RandomState`.
            Must be convertible to 32 bit unsigned integers.

        See Also
        --------
        RandomState
        """
        self._random_state.seed(seed=seed)

    def to_numpy(self):
        """Return the wrapped ``numpy.random.RandomState`` (not a copy)."""
        return self._random_state

    @classmethod
    def from_numpy(cls, np_random_state):
        """
        Wrap an existing ``numpy.random.RandomState`` without copying it.

        Returns an instance of ``cls`` so that subclasses round-trip to their
        own type.
        """
        # Bypass __init__: it would build (and immediately discard) a fresh
        # entropy-seeded numpy generator, and subclasses may require args.
        state = cls.__new__(cls)
        state._random_state = np_random_state
        return state

    @classmethod
    def _handle_size(cls, size):
        """
        Normalise a ``size`` argument.

        ``None`` is returned unchanged. Otherwise every element of ``size`` is
        converted with ``int`` and returned as a tuple; if that raises
        ``TypeError`` (e.g. ``size`` is a scalar), ``(size,)`` is returned.
        """
        if size is None:
            return size
        try:
            return tuple(int(s) for s in size)
        except TypeError:
            return (size,)
# Module-level default ``RandomState`` wrapper, created without an explicit
# seed (``seed=None`` is forwarded to ``numpy.random.RandomState``).
_random_state = RandomState()
def handle_array(arg):
    """
    Return a representative element of ``arg`` (e.g. for dtype probing).

    - Non-tensor, non-iterable values are returned unchanged.
    - Other non-tensor values are converted with ``np.asarray``. The first
      element is returned, or the scalar itself for 0-d input.
    - Tensors whose ``op`` carries ``data`` return the first element of
      that data.
    - Empty data, and tensors without ``op.data``, yield an empty 1-d
      array of the matching dtype.
    """
    if not isinstance(arg, TENSOR_TYPE):
        if not isinstance(arg, Iterable):
            return arg
        data = np.asarray(arg)
        ndim = data.ndim
        dtype = data.dtype
    elif hasattr(arg, "op") and hasattr(arg.op, "data"):
        data = arg.op.data
        ndim = arg.ndim
        dtype = arg.dtype
    else:
        return np.empty((0,), dtype=arg.dtype)

    # Indexing [0, ...] into an empty array raises IndexError; fall back to
    # an empty array that still carries the dtype.
    if 0 in getattr(data, "shape", ()):
        return np.empty((0,), dtype=dtype)
    # ``(0,) * ndim`` is ``()`` for 0-d data, which selects the scalar itself.
    # The previous ``max(1, ndim)`` produced ``(0,)`` and raised there.
    return data[(0,) * ndim]
class TensorRandomOperandMixin(TensorOperandMixin):
    """
    Shared tiling, execution and construction logic for random tensor operands.

    Subclasses provide ``_func_name`` (the ``RandomState`` method to call).
    They may also declare:

    - ``_input_fields_``: fields that may hold tensor inputs.
    - ``_into_one_chunk_fields_``: input fields that are kept as a single
      chunk and are not broadcast to the output shape.
    """

    __slots__ = ()

    @classmethod
    def tile(cls, op):
        """
        Tile ``op`` into output chunks, each with its own derived seed.

        This is a generator: it delegates to ``recursive_tile`` for inputs
        that need rechunking, then returns the tiled output tensors.
        """
        tensor = op.outputs[0]

        chunk_size = tensor.extra_params.raw_chunk_size or options.chunk_size
        nsplits = decide_chunk_sizes(tensor.shape, chunk_size, tensor.dtype.itemsize)
        fields = getattr(op, "_input_fields_", [])
        to_one_chunk_fields = set(getattr(op, "_into_one_chunk_fields_", list()))

        # Align each tensor input with the output chunking, or collapse it to
        # one chunk for fields listed in ``_into_one_chunk_fields_``.
        new_inputs = []
        changed = False
        for field in fields:
            t = getattr(op, field)
            if not isinstance(t, TENSOR_TYPE):
                continue

            if field not in to_one_chunk_fields:
                t_nsplits = nsplits
            else:
                t_nsplits = t.shape  # into 1 chunk
            rechunked = t.rechunk(t_nsplits)
            if rechunked is not t:
                # NOTE(review): the generator's return value is discarded, so
                # this relies on recursive_tile tiling ``rechunked`` in
                # place — confirm.
                yield from recursive_tile(rechunked)
                changed = True
                new_inputs.append(rechunked)
            else:
                new_inputs.append(t)
        if changed:
            op.inputs = new_inputs

        idxes = list(itertools.product(*[range(len(s)) for s in nsplits]))
        # One seed per output chunk, drawn deterministically from ``op.seed``.
        seeds = gen_random_seeds(len(idxes), np.random.RandomState(op.seed))

        out_chunks = []
        for seed, idx, shape in zip(seeds, idxes, itertools.product(*nsplits)):
            inputs = []
            for inp in op.inputs:
                # Single-chunk inputs are shared by every output chunk.
                if len(inp.chunks) == 1:
                    inputs.append(inp.chunks[0])
                else:
                    inputs.append(inp.cix[idx])
            # Derive the chunk-level ``size`` from the chunk shape:
            # - tuple-like size: the leading ``len(size)`` dims;
            # - scalar size: the first dim;
            # - ``None``: stays ``None``;
            # - no ``size`` attribute at all: the whole chunk shape.
            try:
                s = len(tuple(op.size))
                size = shape[:s]
            except TypeError:
                if op.size is None:
                    size = None
                else:
                    size = shape[:1]
            except AttributeError:
                size = shape

            chunk_op = op.copy().reset_key()
            chunk_op.seed = int(seed)
            chunk_op.size = size
            out_chunk = chunk_op.new_chunk(
                inputs, shape=shape, index=idx, order=tensor.order
            )
            out_chunks.append(out_chunk)

        new_op = op.copy()
        return new_op.new_tensors(
            op.inputs,
            tensor.shape,
            order=tensor.order,
            chunks=out_chunks,
            nsplits=nsplits,
            **tensor.extra_params
        )

    @classmethod
    def execute(cls, ctx, op):
        """
        Generate the data for chunk ``op`` and store it in ``ctx``.

        Call patterns by method:

        - ``rand``/``randn``: called with ``*op.size``.
        - ``randint``: called with ``low``, ``high`` and ``size=op.size``.
        - Any other method: called with the values of ``op.args``.

        A ``dtype=`` keyword is tried first and dropped on ``TypeError``. The
        result is then cast to ``op.dtype`` when it differs. On a non-numpy
        array module, an ``AttributeError`` triggers a numpy fallback whose
        result is converted with ``xp.asarray``.
        """
        xp = array_module(op.gpu)
        if is_same_module(xp, np):
            device_id = -1
        else:
            device_id = op.device or 0

        def get_val(x):
            # Chunk inputs are replaced by their computed values from ``ctx``.
            return ctx[x.key] if isinstance(x, TENSOR_CHUNK_TYPE) else x

        with device(device_id):
            rs = xp.random.RandomState(op.seed)

            method_name = getattr(cls, "_func_name")
            try:
                if method_name in ("rand", "randn"):
                    try:
                        res = getattr(rs, method_name)(*op.size, dtype=op.dtype)
                    except TypeError:
                        res = getattr(rs, method_name)(*op.size)
                elif method_name == "randint":
                    try:
                        res = rs.randint(
                            get_val(op.low),
                            get_val(op.high),
                            size=op.size,
                            dtype=op.dtype,
                        )
                    except TypeError:
                        res = rs.randint(
                            get_val(op.low), get_val(op.high), size=op.size
                        )
                else:
                    try:
                        res = getattr(rs, method_name)(
                            *(get_val(getattr(op, arg)) for arg in op.args),
                            dtype=op.dtype
                        )
                    except TypeError:
                        res = getattr(rs, method_name)(
                            *(get_val(getattr(op, arg)) for arg in op.args)
                        )
                # Scalar draws may be plain Python numbers without ``dtype``.
                if hasattr(res, "dtype") and res.dtype != op.dtype:
                    res = res.astype(op.dtype, copy=False)
                if xp is not np:
                    ctx[op.outputs[0].key] = xp.asarray(res)
                else:
                    ctx[op.outputs[0].key] = res
            except AttributeError:
                if xp is not np:
                    # cupy cannot generate data, fallback to numpy
                    rs = np.random.RandomState(op.seed)
                    if method_name in ("rand", "randn"):
                        res = getattr(rs, method_name)(*op.size)
                    else:
                        res = getattr(rs, method_name)(
                            *(get_val(getattr(op, arg)) for arg in op.args)
                        )
                    # Same guard as the main path: reading ``.dtype`` on a
                    # Python scalar would raise from inside this handler.
                    if hasattr(res, "dtype") and res.dtype != op.dtype:
                        res = res.astype(op.dtype, copy=False)
                    ctx[op.outputs[0].key] = xp.asarray(res)
                else:
                    raise

    def _calc_shape(self, shapes):
        """Broadcast ``shapes``, plus ``self.size`` when set, into one shape."""
        shapes = list(shapes)
        if getattr(self, "size", None) is not None:
            shapes.append(getattr(self, "size"))
        return broadcast_shape(*shapes)

    @classmethod
    def _handle_arg(cls, arg, chunk_size):
        """Convert list/ndarray arguments to tensors; pass others through."""
        if isinstance(arg, (list, np.ndarray)):
            arg = astensor(arg, chunk_size=chunk_size)
        return arg

    @contextmanager
    def _get_inputs_shape_by_given_fields(
        self, inputs, shape, raw_chunk_size=None, tensor=True
    ):
        """
        Bind ``inputs`` to the operand's ``_input_fields_`` around creation.

        On enter:

        - If the first input field is still ``None``, the raw ``inputs`` are
          assigned to the fields in order. For non-one-chunk fields, lists
          become arrays and, when ``tensor`` is true, go through
          ``_handle_arg``.
        - Otherwise, the fields that already hold tensors/chunks are paired
          with ``inputs`` in order.
        - When ``tensor`` is true, a missing ``shape`` is computed with
          ``_calc_shape``, and non-one-chunk inputs are broadcast to it.

        Yields ``(input_objects, shape)``. After the ``with`` body finishes,
        the tensor-valued fields are rebound, in field order, to the objects
        in ``self._inputs``.
        """
        fields = getattr(self, "_input_fields_", [])
        to_one_chunk_fields = set(getattr(self, "_into_one_chunk_fields_", list()))

        field_to_obj = dict()
        to_broadcast_shapes = []
        if fields:
            if getattr(self, fields[0], None) is None:
                # create from beginning
                for field, val in zip(fields, inputs):
                    if field not in to_one_chunk_fields:
                        if isinstance(val, list):
                            val = np.asarray(val)
                        if tensor:
                            val = self._handle_arg(val, raw_chunk_size)
                    if isinstance(val, TENSOR_TYPE + TENSOR_CHUNK_TYPE):
                        field_to_obj[field] = val
                        if field not in to_one_chunk_fields:
                            to_broadcast_shapes.append(val.shape)
                    setattr(self, field, val)
            else:
                inputs_iter = iter(inputs)
                for field in fields:
                    if isinstance(
                        getattr(self, field), TENSOR_TYPE + TENSOR_CHUNK_TYPE
                    ):
                        field_to_obj[field] = next(inputs_iter)

        if tensor:
            if shape is None:
                shape = self._calc_shape(to_broadcast_shapes)

            for field, inp in field_to_obj.items():
                if field not in to_one_chunk_fields:
                    field_to_obj[field] = broadcast_to(inp, shape)

        yield [field_to_obj[f] for f in fields if f in field_to_obj], shape

        inputs_iter = iter(getattr(self, "_inputs"))
        for field in fields:
            if field in field_to_obj:
                setattr(self, field, next(inputs_iter))

    @classmethod
    def _get_shape(cls, kws, kw):
        """Return ``kw["shape"]``, else the first ``kws`` entry's shape, else None."""
        if kw.get("shape") is not None:
            return kw.get("shape")
        elif kws is not None and len(kws) > 0:
            return kws[0].get("shape")

    def _new_tileables(self, inputs, kws=None, **kw):
        """Create output tensors after binding and broadcasting tensor inputs."""
        raw_chunk_size = kw.get("chunk_size", None)
        shape = self._get_shape(kws, kw)
        with self._get_inputs_shape_by_given_fields(
            inputs, shape, raw_chunk_size, True
        ) as (inputs, shape):
            kw["shape"] = shape
            return super()._new_tileables(inputs, kws=kws, **kw)

    def _new_chunks(self, inputs, kws=None, **kw):
        """Create output chunks after binding chunk inputs (no broadcasting)."""
        shape = self._get_shape(kws, kw)
        with self._get_inputs_shape_by_given_fields(inputs, shape, None, False) as (
            inputs,
            shape,
        ):
            kw["shape"] = shape
            return super()._new_chunks(inputs, kws=kws, **kw)
def _on_serialize_random_state(rs):
return rs.get_state() if rs is not None else None
def _on_deserialize_random_state(tup):
if tup is None:
return None
rs = np.random.RandomState()
rs.set_state(tup)
return rs
def RandomStateField(name, **kwargs):
    """
    Build a ``TupleField`` that stores a numpy ``RandomState`` as its state tuple.

    The serialize/deserialize hooks always override any caller-supplied
    ``on_serialize`` / ``on_deserialize`` keywords.
    """
    kwargs["on_serialize"] = _on_serialize_random_state
    kwargs["on_deserialize"] = _on_deserialize_random_state
    return TupleField(name, **kwargs)
class TensorSeedOperandMixin(object):
    """Mixin exposing ``seed`` and ``args`` for random tensor operands."""

    @property
    def seed(self):
        """
        Fallback seed accessor; returns ``None`` when no seed is stored.

        Concrete operands in this module shadow this property with a
        ``seed`` field. The old body looked up ``"seed"`` again, which
        re-entered this very property and recursed forever. It now reads the
        private ``_seed`` attribute instead.
        """
        return getattr(self, "_seed", None)

    @property
    def args(self):
        """
        Names of the operand-specific arguments.

        Uses ``_fields_`` when defined. Otherwise returns the entries of
        ``_FIELDS`` that are not fields of ``TensorRandomOperand``.
        """
        if hasattr(self, "_fields_"):
            return self._fields_
        return [
            field
            for field in self._FIELDS
            if field not in TensorRandomOperand._FIELDS
        ]
class TensorRandomOperand(TensorSeedOperandMixin, TensorOperand):
    """Random tensor operand carrying an int32 ``seed`` field."""

    seed = Int32Field("seed")

    def __init__(self, dtype=None, **kw):
        # A public ``state`` keyword is forwarded under the ``_state`` name.
        if "state" in kw:
            kw["_state"] = kw.pop("state")
        # Normalise dtype-like values to ``np.dtype``; leave ``None`` alone.
        if dtype is not None:
            dtype = np.dtype(dtype)
        super().__init__(dtype=dtype, **kw)
class TensorRandomMapReduceOperand(TensorSeedOperandMixin, TensorMapReduceOperand):
    """Map-reduce random tensor operand carrying an int32 ``seed`` field."""

    seed = Int32Field("seed")

    def __init__(self, dtype=None, **kw):
        # A public ``state`` keyword is forwarded under the ``_state`` name.
        if "state" in kw:
            kw["_state"] = kw.pop("state")
        # Normalise dtype-like values to ``np.dtype``; leave ``None`` alone.
        if dtype is not None:
            dtype = np.dtype(dtype)
        super().__init__(dtype=dtype, **kw)
class TensorDistribution(TensorRandomOperand):
    """Random operand that calls a ``RandomState`` method with ``op.args``."""

    size = TupleField("size", FieldTypes.int64)

    @classmethod
    def execute(cls, ctx, op):
        """
        Draw the chunk's samples with ``RandomState.<_func_name>(*op.args)``.

        Chunk-valued arguments are replaced by their computed values from
        ``ctx``. On a non-numpy array module, an ``AttributeError`` from the
        method lookup/call falls back to numpy. The result is cast to
        ``op.dtype`` when it differs, consistent with
        ``TensorRandomOperandMixin.execute``; previously a requested dtype was
        silently ignored here.
        """
        xp = array_module(op.gpu)
        if is_same_module(xp, np):
            device_id = -1
        else:
            device_id = op.device or 0

        with device(device_id):
            rs = xp.random.RandomState(op.seed)

            args = []
            for k in op.args:
                val = getattr(op, k, None)
                if isinstance(val, TENSOR_CHUNK_TYPE):
                    args.append(ctx[val.key])
                else:
                    args.append(val)

            method_name = getattr(cls, "_func_name")
            try:
                res = getattr(rs, method_name)(*args)
            except AttributeError:
                if xp is np:
                    raise
                # cupy cannot generate, fall back to numpy
                rs = np.random.RandomState(op.seed)
                res = getattr(rs, method_name)(*args)

            # Scalar draws may be plain Python numbers without ``dtype``.
            if (
                op.dtype is not None
                and hasattr(res, "dtype")
                and res.dtype != op.dtype
            ):
                res = res.astype(op.dtype, copy=False)
            if xp is not np:
                res = xp.asarray(res)
            ctx[op.outputs[0].key] = res
class TensorSimpleRandomData(TensorRandomOperand):
    """Random operand parameterised by an optional ``size`` tuple."""

    size = TupleField("size", FieldTypes.int64)

    def __init__(self, size=None, **kw):
        # ``size`` is stored in a tuple field, so promote a bare integer to a
        # 1-tuple. Numpy integers are accepted too (the old
        # ``type(size) is int`` check let them through un-wrapped); bool is
        # still excluded, as before.
        if isinstance(size, (int, np.integer)) and not isinstance(size, bool):
            size = (int(size),)
        super().__init__(size=size, **kw)