# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd
from ... import opcodes as OperandDef
from ...core import ENTITY_TYPE, OutputType, recursive_tile
from ...serialization.serializables import (
AnyField,
BoolField,
FieldTypes,
ListField,
StringField,
)
from ...utils import has_unknown_shape, lazy_import
from ..operands import SERIES_TYPE, DataFrameOperand, DataFrameOperandMixin
from ..utils import (
build_empty_df,
build_empty_series,
parse_index,
standardize_range_index,
validate_axis,
)
cudf = lazy_import("cudf")
class DataFrameConcat(DataFrameOperand, DataFrameOperandMixin):
    """Operand implementing ``concat`` over Mars DataFrames and Series.

    The stored fields mirror the parameters of :func:`pandas.concat`.
    Tiling rechunks inputs so that the non-concat axis is aligned, then
    stacks chunks along ``axis``; execution delegates to pandas (or cuDF
    when the chunk data lives on GPU).
    """

    _op_type_ = OperandDef.CONCATENATE

    # axis to concatenate along: 0 stacks rows, 1 stacks columns
    axis = AnyField("axis", default=None)
    # column alignment policy ("outer" or "inner"), as in pandas
    join = StringField("join", default=None)
    # when True, the result gets a fresh RangeIndex
    ignore_index = BoolField("ignore_index", default=None)
    # the options below are accepted for pandas API compatibility; the
    # top-level ``concat`` wrapper rejects unsupported non-default values
    keys = ListField("keys", default=None)
    levels = ListField("levels", default=None)
    names = ListField("names", default=None)
    verify_integrity = BoolField("verify_integrity", default=None)
    sort = BoolField("sort", default=None)
    # named ``copy_`` to avoid shadowing the built-in ``copy`` name;
    # serialized under the plain "copy" tag
    copy_ = BoolField("copy", default=None)

    def __init__(self, copy=None, output_types=None, **kw):
        super().__init__(copy_=copy, _output_types=output_types, **kw)

    @property
    def level(self):
        """Alias of ``levels`` for pandas API parity."""
        return self.levels

    @property
    def name(self):
        """Alias of ``names`` for pandas API parity."""
        return self.names

    @classmethod
    def _tile_dataframe(cls, op):
        """Tile DataFrame inputs: align the non-concat axis, then stack chunks.

        Yields for re-tiling when chunk shapes are unknown; returns the new
        tiled DataFrame entity.
        """
        from ..indexing.iloc import DataFrameIlocGetItem

        out_df = op.outputs[0]
        inputs = op.inputs
        axis = op.axis
        if not all(
            inputs[i].nsplits[1 - axis] == inputs[i + 1].nsplits[1 - axis]
            for i in range(len(inputs) - 1)
        ):
            # splits on the non-concat axis differ between inputs,
            # so rechunk everything to the first input's layout
            if has_unknown_shape(*inputs):
                yield
            normalized_nsplits = {1 - axis: inputs[0].nsplits[1 - axis]}
            new_inputs = []
            for inp in inputs:
                new_inputs.append(
                    (yield from recursive_tile(inp.rechunk(normalized_nsplits)))
                )
            inputs = new_inputs

        out_chunks = []
        nsplits = []
        cum_index = 0
        for df in inputs:
            for c in df.chunks:
                # shift the chunk index along the concat axis so chunks of
                # successive inputs do not collide
                if op.axis == 0:
                    index = (c.index[0] + cum_index, c.index[1])
                else:
                    index = (c.index[0], c.index[1] + cum_index)
                # wrap in a full-slice iloc so each output chunk carries its
                # own operand while reusing the input chunk's data
                iloc_op = DataFrameIlocGetItem(indexes=[slice(None)] * 2)
                out_chunks.append(
                    iloc_op.new_chunk(
                        [c],
                        shape=c.shape,
                        index=index,
                        dtypes=c.dtypes,
                        index_value=c.index_value,
                        columns_value=c.columns_value,
                    )
                )
            nsplits.extend(df.nsplits[op.axis])
            cum_index += len(df.nsplits[op.axis])
        out_nsplits = (
            (tuple(nsplits), inputs[0].nsplits[1])
            if op.axis == 0
            else (inputs[0].nsplits[0], tuple(nsplits))
        )

        if op.ignore_index:
            # materialize chunks first, then rewrite indexes into one RangeIndex
            yield out_chunks
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        return new_op.new_dataframes(
            op.inputs,
            out_df.shape,
            nsplits=out_nsplits,
            chunks=out_chunks,
            dtypes=out_df.dtypes,
            index_value=out_df.index_value,
            columns_value=out_df.columns_value,
        )

    @classmethod
    def _tile_series(cls, op: "DataFrameConcat"):
        """Tile Series inputs.

        ``axis=0`` produces a Series; ``axis=1`` converts every chunk into a
        one-column DataFrame and produces a DataFrame.
        """
        from ..datasource.from_tensor import DataFrameFromTensor
        from ..indexing.iloc import DataFrameIlocGetItem, SeriesIlocGetItem

        out = op.outputs[0]
        inputs = op.inputs
        out_chunks = []
        if op.axis == 1:
            # column-wise concat requires identical row splits on all inputs
            if has_unknown_shape(*inputs):
                yield
            new_inputs = []
            for inp in inputs:
                new_inputs.append(
                    (yield from recursive_tile(inp.rechunk(op.inputs[0].nsplits)))
                )
            inputs = new_inputs
        cum_index = 0
        offset = 0
        nsplits = []
        for series in inputs:
            for c in series.chunks:
                if op.axis == 0:
                    index = (c.index[0] + cum_index,)
                    shape = c.shape
                    iloc_op = SeriesIlocGetItem(indexes=(slice(None),))
                    out_chunks.append(
                        iloc_op.new_chunk(
                            [c],
                            shape=shape,
                            index=index,
                            index_value=c.index_value,
                            dtype=c.dtype,
                            name=c.name,
                        )
                    )
                else:
                    # promote the series chunk to a single-column DataFrame chunk
                    index = (c.index[0], cum_index)
                    shape = (c.shape[0], 1)
                    to_frame_op = DataFrameFromTensor(
                        input=c,
                        index=None,
                        columns=None,
                    )
                    if c.name:
                        dtypes = pd.Series([c.dtype], index=[c.name])
                    else:
                        # unnamed series get a positional column label
                        dtypes = pd.Series(
                            [c.dtype], index=pd.RangeIndex(offset, offset + 1)
                        )
                    df_chunk = to_frame_op.new_chunk(
                        [c],
                        shape=shape,
                        index=index,
                        index_value=c.index_value,
                        columns_value=parse_index(dtypes.index, store_data=True),
                        dtypes=dtypes,
                    )
                    iloc_op = DataFrameIlocGetItem(indexes=[slice(None)] * 2)
                    out_chunks.append(
                        iloc_op.new_chunk(
                            [df_chunk],
                            shape=df_chunk.shape,
                            index=index,
                            dtypes=df_chunk.dtypes,
                            index_value=df_chunk.index_value,
                            columns_value=df_chunk.columns_value,
                        )
                    )
            if op.axis == 0:
                nsplits.extend(series.nsplits[0])
                cum_index += len(series.nsplits[op.axis])
            else:
                # each input series contributes exactly one column
                nsplits.append(1)
                cum_index += 1
                offset += 1

        if op.ignore_index:
            yield out_chunks
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        if op.axis == 0:
            nsplits = (tuple(nsplits),)
            return new_op.new_seriess(
                op.inputs,
                out.shape,
                nsplits=nsplits,
                chunks=out_chunks,
                dtype=out.dtype,
                index_value=out.index_value,
                name=out.name,
            )
        else:
            nsplits = (inputs[0].nsplits[0], tuple(nsplits))
            return new_op.new_dataframes(
                op.inputs,
                out.shape,
                nsplits=nsplits,
                chunks=out_chunks,
                dtypes=out.dtypes,
                index_value=out.index_value,
                columns_value=out.columns_value,
            )

    @classmethod
    def tile(cls, op: "DataFrameConcat"):
        """Dispatch tiling based on the type of the first input."""
        if isinstance(op.inputs[0], SERIES_TYPE):
            return (yield from cls._tile_series(op))
        else:
            return (yield from cls._tile_dataframe(op))

    @classmethod
    def execute(cls, ctx, op: "DataFrameConcat"):
        """Concatenate the materialized input chunks with pandas/cuDF."""

        def _base_concat(chunk, inputs):
            # auto generated concat when executing a DataFrame, Series or Index
            if chunk.op.output_types[0] == OutputType.dataframe:
                return _auto_concat_dataframe_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.series:
                return _auto_concat_series_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.index:
                return _auto_concat_index_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.categorical:
                return _auto_concat_categorical_chunks(chunk, inputs)
            else:  # pragma: no cover
                raise TypeError(
                    "Only DataFrameChunk, SeriesChunk, IndexChunk, "
                    "and CategoricalChunk can be automatically concatenated"
                )

        def _auto_concat_dataframe_chunks(chunk, inputs):
            # pick pandas for host data, cuDF for GPU data
            xdf = (
                pd
                if isinstance(inputs[0], (pd.DataFrame, pd.Series)) or cudf is None
                else cudf
            )
            if chunk.op.axis is not None:
                return xdf.concat(inputs, axis=op.axis)
            # auto generated concat when executing a DataFrame: inputs form a
            # row-major grid of n_rows x n_cols chunks
            if len(inputs) == 1:
                ret = inputs[0]
            else:
                n_rows = len(set(inp.index[0] for inp in chunk.inputs))
                n_cols = int(len(inputs) // n_rows)
                assert n_rows * n_cols == len(inputs)

                concats = []
                for i in range(n_rows):
                    if n_cols == 1:
                        concats.append(inputs[i])
                    else:
                        concat = xdf.concat(
                            [inputs[i * n_cols + j] for j in range(n_cols)], axis=1
                        )
                        concats.append(concat)

                if xdf is pd:
                    # The `sort=False` is to suppress a `FutureWarning` of pandas,
                    # when the index or column of chunks to concatenate is not aligned,
                    # which may happens for certain ops.
                    #
                    # See also Note [Columns of Left Join] in test_merge_execution.py.
                    ret = xdf.concat(concats, sort=False)
                else:
                    ret = xdf.concat(concats)
                    # cuDF loses the index name when concatenating series,
                    # so restore it explicitly
                    ret.index.name = concats[0].index.name
            return ret

        def _auto_concat_series_chunks(chunk, inputs):
            # auto generated concat when executing a Series
            if len(inputs) == 1:
                concat = inputs[0]
            else:
                xdf = pd if isinstance(inputs[0], pd.Series) or cudf is None else cudf
                if chunk.op.axis is not None:
                    concat = xdf.concat(inputs, axis=chunk.op.axis)
                else:
                    concat = xdf.concat(inputs)
            return concat

        def _auto_concat_index_chunks(chunk, inputs):
            # concat indexes by wrapping them in empty DataFrames
            if len(inputs) == 1:
                xdf = pd if isinstance(inputs[0], pd.Index) or cudf is None else cudf
                concat_df = xdf.DataFrame(index=inputs[0])
            else:
                xdf = pd if isinstance(inputs[0], pd.Index) or cudf is None else cudf
                empty_dfs = [xdf.DataFrame(index=inp) for inp in inputs]
                concat_df = xdf.concat(empty_dfs, axis=0)
            return concat_df.index

        def _auto_concat_categorical_chunks(_, inputs):
            if len(inputs) == 1:  # pragma: no cover
                return inputs[0]
            else:
                # convert categorical into array, then rebuild with the
                # categories/order of the first input
                arrays = [np.asarray(inp) for inp in inputs]
                array = np.concatenate(arrays)
                return pd.Categorical(
                    array, categories=inputs[0].categories, ordered=inputs[0].ordered
                )

        chunk = op.outputs[0]
        inputs = [ctx[input.key] for input in op.inputs]

        if isinstance(inputs[0], tuple):
            # tuple-typed chunk data: concat element-wise across inputs
            ctx[chunk.key] = tuple(
                _base_concat(chunk, [input[i] for input in inputs])
                for i in range(len(inputs[0]))
            )
        else:
            ctx[chunk.key] = _base_concat(chunk, inputs)

    @classmethod
    def _concat_index(cls, prev_index: pd.Index, cur_index: pd.Index):
        """Concatenate two pandas indexes for metadata inference.

        Avoids materializing huge indexes: non-continuous RangeIndexes are
        replaced by empty indexes of the same dtype.
        """
        if isinstance(prev_index, pd.RangeIndex) and isinstance(
            cur_index, pd.RangeIndex
        ):
            # handle RangeIndex that append may generate huge amount of data
            # e.g. pd.RangeIndex(10_000) and pd.RangeIndex(10_000)
            # will generate a Int64Index full of data
            # for details see GH#1647
            prev_stop = prev_index.start + prev_index.size * prev_index.step
            cur_start = cur_index.start
            if prev_stop == cur_start and prev_index.step == cur_index.step:
                # continuous RangeIndex, still return RangeIndex
                return prev_index.append(cur_index)
            else:
                # otherwise, return an empty index
                return pd.Index([], dtype=prev_index.dtype)
        elif isinstance(prev_index, pd.RangeIndex):
            return pd.Index([], prev_index.dtype).append(cur_index)
        elif isinstance(cur_index, pd.RangeIndex):
            return prev_index.append(pd.Index([], cur_index.dtype))
        return prev_index.append(cur_index)

    def _call_series(self, objs):
        """Infer output metadata when all inputs are Series."""
        if self.axis == 0:
            row_length = 0
            index = None
            for series in objs:
                if index is None:
                    index = series.index_value.to_pandas()
                else:
                    index = self._concat_index(index, series.index_value.to_pandas())
                row_length += series.shape[0]
            if self.ignore_index:  # pragma: no cover
                index_value = parse_index(pd.RangeIndex(row_length))
            else:
                index_value = parse_index(index, objs)
            return self.new_series(
                objs,
                shape=(row_length,),
                dtype=objs[0].dtype,
                index_value=index_value,
                name=objs[0].name,
            )
        else:
            col_length = 0
            columns = []
            dtypes = dict()
            undefined_name = 0
            for series in objs:
                if series.name is None:
                    # record the positional label *before* bumping the counter
                    # so ``columns`` agrees with the keys stored in ``dtypes``
                    # (previously the post-increment value was appended)
                    dtypes[undefined_name] = series.dtype
                    columns.append(undefined_name)
                    undefined_name += 1
                else:
                    dtypes[series.name] = series.dtype
                    columns.append(series.name)
                col_length += 1
            if self.ignore_index or undefined_name == len(objs):
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(pd.Index(columns), store_data=True)
            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(
                objs,
                shape=shape,
                dtypes=pd.Series(dtypes),
                index_value=objs[0].index_value,
                columns_value=columns_value,
            )

    def _call_dataframes(self, objs):
        """Infer output metadata when inputs contain DataFrames."""
        if self.axis == 0:
            row_length = 0
            index = None
            empty_dfs = []
            for df in objs:
                if index is None:
                    index = df.index_value.to_pandas()
                else:
                    index = self._concat_index(index, df.index_value.to_pandas())
                row_length += df.shape[0]
                if df.ndim == 2:
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            # concat empty frames to let pandas determine the result columns
            empty_result = pd.concat(empty_dfs, join=self.join, sort=self.sort)
            shape = (row_length, empty_result.shape[1])
            columns_value = parse_index(empty_result.columns, store_data=True)
            if self.join == "inner":
                objs = [o[list(empty_result.columns)] for o in objs]
            if self.ignore_index:  # pragma: no cover
                index_value = parse_index(pd.RangeIndex(row_length))
            else:
                index_value = parse_index(index, objs)
            new_objs = []
            for obj in objs:
                if obj.ndim != 2:
                    # series: promote to frame with the result columns
                    new_obj = obj.to_frame().reindex(columns=empty_result.dtypes.index)
                else:
                    # dataframe: realign columns when they differ
                    if list(obj.dtypes.index) != list(empty_result.dtypes.index):
                        new_obj = obj.reindex(columns=empty_result.dtypes.index)
                    else:
                        new_obj = obj
                new_objs.append(new_obj)

            return self.new_dataframe(
                new_objs,
                shape=shape,
                dtypes=empty_result.dtypes,
                index_value=index_value,
                columns_value=columns_value,
            )
        else:
            col_length = 0
            empty_dfs = []
            for df in objs:
                if df.ndim == 2:
                    # DataFrame
                    col_length += df.shape[1]
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    # Series
                    col_length += 1
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            empty_result = pd.concat(empty_dfs, join=self.join, axis=1, sort=True)
            if self.ignore_index:
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(
                    pd.Index(empty_result.columns), store_data=True
                )
            if self.ignore_index or len({o.index_value.key for o in objs}) == 1:
                new_objs = [obj if obj.ndim == 2 else obj.to_frame() for obj in objs]
            else:  # pragma: no cover
                raise NotImplementedError(
                    "Does not support concat dataframes which has different index"
                )
            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(
                new_objs,
                shape=shape,
                dtypes=empty_result.dtypes,
                index_value=objs[0].index_value,
                columns_value=columns_value,
            )

    def __call__(self, objs):
        """Build the output entity; Series inputs only yield a Series result."""
        if all(isinstance(obj, SERIES_TYPE) for obj in objs):
            self.output_types = [OutputType.series]
            return self._call_series(objs)
        else:
            self.output_types = [OutputType.dataframe]
            return self._call_dataframes(objs)
class GroupByConcat(DataFrameOperand, DataFrameOperandMixin):
    """Operand that concatenates grouped chunks and rebuilds the groupby.

    ``groups`` holds the wrapped groupby chunks to merge; ``groupby_params``
    stores the original ``groupby`` call arguments (including an optional
    ``selection`` applied after regrouping).
    """

    _op_type_ = OperandDef.GROUPBY_CONCAT

    _groups = ListField("groups", FieldTypes.key)
    _groupby_params = AnyField("groupby_params")

    def __init__(self, groups=None, groupby_params=None, output_types=None, **kw):
        super().__init__(
            _groups=groups,
            _groupby_params=groupby_params,
            _output_types=output_types,
            **kw
        )

    @property
    def groups(self):
        return self._groups

    @property
    def groupby_params(self):
        return self._groupby_params

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        input_iter = iter(self._inputs)
        # the first len(_groups) inputs are the group chunks, in order
        self._groups = [next(input_iter) for _ in self._groups]
        # remaining inputs replace any entity-typed "by" items, in order
        if isinstance(self._groupby_params["by"], list):
            self._groupby_params["by"] = [
                next(input_iter) if isinstance(item, ENTITY_TYPE) else item
                for item in self._groupby_params["by"]
            ]

    @classmethod
    def execute(cls, ctx, op):
        grouped_chunks = [ctx[group.key] for group in op.groups]
        combined = pd.concat([chunk.obj for chunk in grouped_chunks])

        params = op.groupby_params.copy()
        if isinstance(params["by"], list):
            # swap entity placeholders for their materialized data
            params["by"] = [
                ctx[item.key] if isinstance(item, ENTITY_TYPE) else item
                for item in params["by"]
            ]
        selection = params.pop("selection", None)

        regrouped = combined.groupby(**params)
        if selection:
            regrouped = regrouped[selection]

        ctx[op.outputs[0].key] = regrouped
def concat(
    objs,
    axis=0,
    join="outer",
    ignore_index=False,
    keys=None,
    levels=None,
    names=None,
    verify_integrity=False,
    sort=False,
    copy=True,
):
    """Concatenate Mars DataFrame or Series objects along a particular axis.

    Mirrors :func:`pandas.concat`. ``verify_integrity``, ``sort`` and
    ``keys`` are accepted for API compatibility but not supported yet;
    ``join="inner"`` is not supported together with ``axis=1``.

    Raises
    ------
    TypeError
        If ``objs`` is not a list, tuple or dict of objects.
    NotImplementedError
        For the unsupported argument combinations described above.
    """
    # handle mappings first: previously this branch sat after the list/tuple
    # check and was unreachable, since a dict raised TypeError before it
    if isinstance(objs, dict):  # pragma: no cover
        keys = list(objs.keys())
        objs = list(objs.values())
    if not isinstance(objs, (list, tuple)):  # pragma: no cover
        raise TypeError(
            "first argument must be an iterable of dataframe or series objects"
        )
    axis = validate_axis(axis)
    if axis == 1 and join == "inner":  # pragma: no cover
        raise NotImplementedError("inner join is not support when specify `axis=1`")
    if verify_integrity or sort or keys:  # pragma: no cover
        raise NotImplementedError(
            "verify_integrity, sort, keys arguments are not supported now"
        )
    op = DataFrameConcat(
        axis=axis,
        join=join,
        ignore_index=ignore_index,
        keys=keys,
        levels=levels,
        names=names,
        verify_integrity=verify_integrity,
        sort=sort,
        copy=copy,
    )

    return op(objs)