# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from ... import opcodes as OperandDef
from ...core import OutputType, TileStatus, recursive_tile
from ...core.context import get_context
from ...core.operand import MapReduceOperand, OperandStage
from ...serialization.serializables import (
AnyField,
BoolField,
DictField,
Int32Field,
KeyField,
NamedTupleField,
StringField,
TupleField,
)
from ...typing import TileableType
from ...utils import has_unknown_shape, lazy_import
from ..base.bloom_filter import filter_by_bloom_filter
from ..base.core import DataFrameAutoMergeMixin
from ..core import DataFrame, DataFrameChunk, Series
from ..operands import DataFrameOperand, DataFrameOperandMixin, DataFrameShuffleProxy
from ..utils import (
auto_merge_chunks,
build_concatenated_rows_frame,
build_df,
hash_dataframe_on,
infer_index_value,
is_cudf,
parse_index,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Fallback for the "apply_chunk_size_threshold" bloom filter option; it is
# compared against the total number of input chunks when deciding whether a
# bloom filter is worth applying.
DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD = 10
# use bloom filter to filter large DataFrame
# Keys accepted in the ``bloom_filter_options`` dict of ``merge``.
BLOOM_FILTER_OPTIONS = [
    "max_elements",
    "error_rate",
    "apply_chunk_size_threshold",
    "filter",
    "combine_size",
]
# Values accepted for the "filter" bloom filter option, and its default.
BLOOM_FILTER_ON_OPTIONS = ["large", "small", "both"]
DEFAULT_BLOOM_FILTER_ON = "large"

# GPU DataFrame backend, obtained through ``lazy_import`` so that this module
# does not import it directly.
cudf = lazy_import("cudf")
class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin):
    """
    Map-reduce operand that re-partitions DataFrame chunks by hashing the merge key(s).

    Map stage: the input chunk is split into ``index_shuffle_size`` pieces with
    ``hash_dataframe_on``; every piece is stored for its reducer together with
    the mapper id and the index of the current chunk.

    Reduce stage: for output ``i`` the pieces produced by mappers whose
    ``mapper_id`` equals ``i`` are concatenated in ascending row-index order.
    """

    _op_type_ = OperandDef.DATAFRAME_SHUFFLE_MERGE_ALIGN

    # number of pieces every mapper splits its chunk into
    index_shuffle_size = Int32Field("index_shuffle_size")
    # label or list of labels to hash on; ``None`` means hashing on the index
    shuffle_on = AnyField("shuffle_on")
    input = KeyField("input")
    # for mapper
    mapper_id = Int32Field("mapper_id", default=0)

    def __init__(self, output_types=None, **kw):
        super().__init__(_output_types=output_types, **kw)
        if output_types is None:
            # a mapper emits one DataFrame, a reducer emits one per merge side
            if self.stage == OperandStage.map:
                output_types = [OutputType.dataframe]
            elif self.stage == OperandStage.reduce:
                output_types = [OutputType.dataframe] * 2
        self._output_types = output_types

    @property
    def output_limit(self) -> int:
        """Number of outputs, i.e. the number of declared output types."""
        return len(self.output_types)

    @classmethod
    def execute_map(cls, ctx, op):
        """
        Split the input chunk by key hash and store one piece per reducer.

        Each stored value is the tuple ``(mapper_id, current chunk index, piece)``
        under the key ``(chunk.key, reducer_index)``.
        """
        chunk = op.outputs[0]
        df = ctx[op.inputs[0].key]
        shuffle_on = op.shuffle_on

        if shuffle_on is not None:
            # shuffle on field may be resident in index
            if isinstance(shuffle_on, (list, tuple)):
                candidates = shuffle_on
            else:
                candidates = [shuffle_on]
            # Collect each individual label that is not a column. The previous
            # code appended the whole ``shuffle_on`` list for every missing
            # label, handing ``reset_index`` a nested list.
            to_reset_index_names = [
                name for name in candidates if name not in df.dtypes
            ]
            if to_reset_index_names:
                df = df.reset_index(to_reset_index_names)

        filters = hash_dataframe_on(df, shuffle_on, op.index_shuffle_size)

        # loop invariant: only a cudf frame with a MultiIndex needs the slow path
        cudf_multi_index = is_cudf(df) and isinstance(df.index, cudf.MultiIndex)

        # shuffle on index
        for index_idx, index_filter in enumerate(filters):
            reducer_index = (index_idx, chunk.index[1])
            if cudf_multi_index:
                # for MultiIndex in cudf,
                # get each line of df and then concat them.
                filtered_dfs = [
                    df.iloc[int(index) : int(index) + 1]
                    for index in index_filter.values
                ]
                if filtered_dfs:
                    filtered_df = cudf.concat(filtered_dfs, axis=0)
                else:  # empty dataframe
                    filtered_df = df.iloc[0:0]
            else:
                filtered_df = df.iloc[index_filter]
            ctx[chunk.key, reducer_index] = (
                op.mapper_id,
                ctx.get_current_chunk().index,
                filtered_df,
            )

    @classmethod
    def execute_reduce(cls, ctx, op: "DataFrameMergeAlign"):
        """Concatenate, per output, the pieces coming from the matching mapper id."""
        for i, chunk in enumerate(op.outputs):
            # output ``i`` only consumes data emitted by mappers with id ``i``
            input_idx_to_df = {
                partition_index: data
                for mapper_id, partition_index, data in op.iter_mapper_data(
                    ctx, skip_none=True
                )
                if mapper_id == i
            }
            row_idxes = sorted({idx[0] for idx in input_idx_to_df})

            res = []
            for row_idx in row_idxes:
                row_df = input_idx_to_df.get((row_idx, 0), None)
                if row_df is not None:
                    res.append(row_df)
            # pick the concat implementation matching the data's backend
            xdf = cudf if is_cudf(res[0]) else pd
            ctx[chunk.key] = xdf.concat(res, axis=0)

    @classmethod
    def execute(cls, ctx, op):
        """Dispatch to the map or reduce implementation according to ``op.stage``."""
        if op.stage == OperandStage.map:
            cls.execute_map(ctx, op)
        else:
            cls.execute_reduce(ctx, op)
# Attached to merge chunks of a broadcast merge whose non-broadcast input has to
# be split at execution time: ``split_side`` ("left" or "right") names the input
# to split, ``nsplits`` is the number of hash partitions and ``split_index`` the
# partition that the chunk keeps.
MergeSplitInfo = namedtuple("MergeSplitInfo", "split_side, split_index, nsplits")
class MergeMethod(Enum):
    """Tiling strategies that ``DataFrameMerge`` chooses between."""

    # handled by ``DataFrameMerge._tile_one_chunk``
    one_chunk = 0
    # handled by ``DataFrameMerge._tile_broadcast``
    broadcast = 1
    # handled by ``DataFrameMerge._tile_shuffle``
    shuffle = 2
class DataFrameMerge(DataFrameOperand, DataFrameAutoMergeMixin):
    # Operand implementing ``merge`` for chunked DataFrames: it validates the
    # arguments on empty frames, tiles the computation with one of the
    # ``MergeMethod`` strategies and executes the per-chunk merge.
    _op_type_ = OperandDef.DATAFRAME_MERGE

    # merge arguments forwarded to the underlying ``merge`` call
    how = StringField("how")
    on = AnyField("on")
    left_on = AnyField("left_on")
    right_on = AnyField("right_on")
    left_index = BoolField("left_index")
    right_index = BoolField("right_index")
    sort = BoolField("sort")
    suffixes = TupleField("suffixes")
    # holds the ``copy`` argument; ``copy`` itself is called as a method on
    # operands (``op.copy()``), hence the trailing underscore
    copy_ = BoolField("copy_")
    indicator = BoolField("indicator")
    validate = AnyField("validate")
    # requested tiling strategy: "auto", "shuffle" or "broadcast"
    method = StringField("method")
    # auto merge settings read by ``tile``
    auto_merge = StringField("auto_merge")
    auto_merge_threshold = Int32Field("auto_merge_threshold")
    # True, False or "auto", plus the option dict validated in ``merge``
    bloom_filter = AnyField("bloom_filter")
    bloom_filter_options = DictField("bloom_filter_options")
    # only for broadcast merge
    split_info = NamedTupleField("split_info")
def __init__(self, copy=None, **kwargs):
    # The public ``copy`` argument is stored in the ``copy_`` field; every other
    # keyword is passed to the parent constructor unchanged.
    super().__init__(copy_=copy, **kwargs)
def __call__(self, left, right):
    """
    Build the (untiled) merged DataFrame for ``left`` and ``right``.

    The merge is first run on the frames returned by ``build_df`` so that
    invalid arguments are rejected early and the output dtypes and columns
    are known. The number of output rows stays unknown (``np.nan``).
    """
    left_mock = build_df(left)
    right_mock = build_df(right)

    # validate arguments.
    merge_kwargs = dict(
        how=self.how,
        on=self.on,
        left_on=self.left_on,
        right_on=self.right_on,
        left_index=self.left_index,
        right_index=self.right_index,
        sort=self.sort,
        suffixes=self.suffixes,
        copy=self.copy_,
        indicator=self.indicator,
        validate=self.validate,
    )
    merged = left_mock.merge(right_mock, **merge_kwargs)

    # update default values.
    no_key_given = (
        self.on is None and self.left_on is None and self.right_on is None
    )
    if no_key_given and not (self.left_index and self.right_index):
        # use the common columns
        shared_columns = left_mock.columns.intersection(right_mock.columns)
        self.left_on = self.right_on = list(shared_columns)

    # the `index_value` doesn't matter.
    tokenize_args = [
        left,
        right,
        self.how,
        self.left_on,
        self.right_on,
        self.left_index,
        self.right_index,
    ]
    out_index_value = parse_index(merged.index[:0], *tokenize_args)
    out_columns_value = parse_index(merged.columns, store_data=True)
    return self.new_dataframe(
        [left, right],
        shape=(np.nan, merged.shape[1]),
        dtypes=merged.dtypes,
        index_value=out_index_value,
        columns_value=out_columns_value,
    )
@classmethod
def _gen_map_chunk(
    cls,
    chunk: DataFrameChunk,
    shuffle_on: Union[List, str],
    out_size: int,
    mapper_id: int = 0,
):
    """
    Wrap ``chunk`` in a map-stage ``DataFrameMergeAlign`` chunk.

    ``out_size`` becomes the operand's ``index_shuffle_size`` and ``mapper_id``
    tags the data so reducers can tell the merge sides apart. The new chunk
    inherits index and metadata from ``chunk``; its shape is unknown.
    """
    align_op = DataFrameMergeAlign(
        stage=OperandStage.map,
        shuffle_on=shuffle_on,
        sparse=chunk.issparse(),
        mapper_id=mapper_id,
        index_shuffle_size=out_size,
    )
    chunk_params = dict(
        shape=(np.nan, np.nan),
        dtypes=chunk.dtypes,
        index=chunk.index,
        index_value=chunk.index_value,
        columns_value=chunk.columns_value,
    )
    return align_op.new_chunk([chunk], **chunk_params)
@classmethod
def _gen_shuffle_chunks(
    cls,
    out_shape: Tuple,
    shuffle_on: Union[List, str],
    df: Union[DataFrame, Series],
):
    """
    Shuffle a single DataFrame: map chunks -> shuffle proxy -> reduce chunks.

    One reduce chunk is created for every index in the grid described by
    ``out_shape``; the list of reduce chunks is returned.
    """
    # gen map chunks
    map_chunks = []
    for source_chunk in df.chunks:
        map_chunks.append(cls._gen_map_chunk(source_chunk, shuffle_on, out_shape[0]))

    proxy_op = DataFrameShuffleProxy(output_types=[OutputType.dataframe])
    proxy_chunk = proxy_op.new_chunk(
        map_chunks,
        shape=(),
        dtypes=df.dtypes,
        index_value=df.index_value,
        columns_value=df.columns_value,
    )

    # gen reduce chunks
    out_indices = list(itertools.product(*map(range, out_shape)))
    reducer_count = len(out_indices)

    def _new_reduce_chunk(out_idx):
        # a fresh operand per reducer, each with a single DataFrame output
        reduce_op = DataFrameMergeAlign(
            stage=OperandStage.reduce,
            n_reducers=reducer_count,
            sparse=proxy_chunk.issparse(),
            output_types=[OutputType.dataframe],
        )
        return reduce_op.new_chunk(
            [proxy_chunk],
            shape=(np.nan, np.nan),
            dtypes=proxy_chunk.dtypes,
            index=out_idx,
            index_value=proxy_chunk.index_value,
            columns_value=proxy_chunk.columns_value,
        )

    return [_new_reduce_chunk(out_idx) for out_idx in out_indices]
@classmethod
def _gen_both_shuffle_chunks(
    cls,
    out_shape: Tuple,
    left_shuffle_on: Union[List, str],
    right_shuffle_on: Union[List, str],
    left: Union[DataFrame, Series],
    right: Union[DataFrame, Series],
):
    """
    Shuffle both merge sides through one shared shuffle proxy.

    Left chunks are mapped with ``mapper_id=0`` and right chunks with
    ``mapper_id=1``. Every reduce operand yields two chunks, the aligned left
    and right partitions. Returns ``(left_reduce_chunks, right_reduce_chunks)``.
    """
    n_partitions = out_shape[0]

    # gen map chunks
    # for left dataframe, use 0 as mapper_id
    # for right dataframe, use 1 as mapper_id
    map_chunks = [
        cls._gen_map_chunk(c, left_shuffle_on, n_partitions, mapper_id=0)
        for c in left.chunks
    ] + [
        cls._gen_map_chunk(c, right_shuffle_on, n_partitions, mapper_id=1)
        for c in right.chunks
    ]

    proxy_op = DataFrameShuffleProxy(output_types=[OutputType.dataframe])
    proxy_chunk = proxy_op.new_chunk(
        map_chunks,
        shape=(),
        dtypes=left.dtypes,
        index_value=left.index_value,
        columns_value=left.columns_value,
    )

    def _side_params(side, out_idx):
        # chunk parameters of one reducer output, taken from the given side
        return {
            "shape": (np.nan, np.nan),
            "dtypes": side.dtypes,
            "index": out_idx,
            "index_value": side.index_value,
            "columns_value": side.columns_value,
        }

    # gen reduce chunks
    out_indices = list(itertools.product(*map(range, out_shape)))
    left_reduce_chunks, right_reduce_chunks = [], []
    for out_idx in out_indices:
        reduce_op = DataFrameMergeAlign(
            stage=OperandStage.reduce,
            sparse=proxy_chunk.issparse(),
            n_reducers=len(out_indices),
        )
        left_out, right_out = reduce_op.new_chunks(
            [proxy_chunk],
            kws=[_side_params(left, out_idx), _side_params(right, out_idx)],
        )
        left_reduce_chunks.append(left_out)
        right_reduce_chunks.append(right_out)
    return left_reduce_chunks, right_reduce_chunks
@classmethod
def _apply_bloom_filter(
    cls,
    left: TileableType,
    right: TileableType,
    left_on: Union[List, str],
    right_on: Union[List, str],
    op: "DataFrameMerge",
):
    """
    Filter one or both inputs with ``filter_by_bloom_filter``.

    The "filter" option selects the side to filter: "large" filters the side
    with more chunks, "small" the side with fewer chunks and "both" filters
    left first and then right against the already filtered left.
    Returns the (possibly replaced) ``left`` and ``right``.
    """
    options = op.bloom_filter_options or dict()
    filter_kwargs = {
        name: options[name]
        for name in ("max_elements", "error_rate", "combine_size")
        if name in options
    }
    if "max_elements" not in filter_kwargs:
        # default to the row count of the biggest input chunk
        filter_kwargs["max_elements"] = max(
            c.shape[0] for c in left.chunks + right.chunks
        )

    filter_on = options.get("filter", DEFAULT_BLOOM_FILTER_ON)
    if filter_on == "large":
        filter_left = len(left.chunks) > len(right.chunks)
    elif filter_on == "small":
        filter_left = len(left.chunks) < len(right.chunks)
    else:
        assert filter_on == "both"
        # both
        left = filter_by_bloom_filter(left, right, left_on, right_on, **filter_kwargs)
        right = filter_by_bloom_filter(
            right, left, right_on, left_on, **filter_kwargs
        )
        return left, right

    if filter_left:
        left = filter_by_bloom_filter(left, right, left_on, right_on, **filter_kwargs)
    else:
        right = filter_by_bloom_filter(
            right, left, right_on, left_on, **filter_kwargs
        )
    return left, right
@classmethod
def _tile_one_chunk(
    cls,
    op: "DataFrameMerge",
    left: Union[DataFrame, Series],
    right: Union[DataFrame, Series],
):
    """
    Tile a merge in which at least one side is expected to have a single chunk.

    With one chunk on both sides a single merge chunk is produced. Otherwise
    the single-chunk side (left if it has one chunk, else right) is flagged as
    broadcaster and merged with every chunk of the other side.
    """
    out_df = op.outputs[0]
    n_columns = out_df.shape[1]

    if len(left.chunks) == 1 and len(right.chunks) == 1:
        merge_op = op.copy().reset_key()
        only_chunk = merge_op.new_chunk(
            [left.chunks[0], right.chunks[0]],
            shape=out_df.shape,
            index=left.chunks[0].index,
            index_value=out_df.index_value,
            dtypes=out_df.dtypes,
            columns_value=out_df.columns_value,
        )
        out_chunks = [only_chunk]
        nsplits = ((np.nan,), (n_columns,))
    else:
        left_is_single = len(left.chunks) == 1
        if left_is_single:
            single_chunk, other_chunks = left.chunks[0], right.chunks
        else:
            single_chunk, other_chunks = right.chunks[0], left.chunks
        # set `is_broadcaster` as True
        single_chunk.is_broadcaster = True

        out_chunks = []
        for other in other_chunks:
            merge_op = op.copy().reset_key()
            # inputs always keep the (left, right) order
            pair = [single_chunk, other] if left_is_single else [other, single_chunk]
            out_chunks.append(
                merge_op.new_chunk(
                    pair,
                    shape=(np.nan, n_columns),
                    index=other.index,
                    index_value=infer_index_value(
                        single_chunk.index_value, other.index_value
                    ),
                    dtypes=out_df.dtypes,
                    columns_value=out_df.columns_value,
                )
            )
        nsplits = ((np.nan,) * len(other_chunks), (n_columns,))

    tiled_op = op.copy()
    return tiled_op.new_dataframes(
        op.inputs,
        out_df.shape,
        nsplits=nsplits,
        chunks=out_chunks,
        dtypes=out_df.dtypes,
        index_value=out_df.index_value,
        columns_value=out_df.columns_value,
    )
@classmethod
def _tile_shuffle(
    cls,
    op: "DataFrameMerge",
    left: Union[DataFrame, Series],
    right: Union[DataFrame, Series],
):
    """
    Tile a merge by shuffling both sides on their merge keys.

    The number of partitions is the larger row-chunk count of the two inputs;
    the aligned left/right partitions are then merged pairwise.
    """
    out_df = op.outputs[0]
    n_columns = out_df.shape[1]
    n_partitions = max(left.chunk_shape[0], right.chunk_shape[0])

    left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
    right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)

    # do shuffle
    left_parts, right_parts = cls._gen_both_shuffle_chunks(
        (n_partitions, 1), left_on, right_on, left, right
    )

    def _merge_pair(left_part, right_part):
        # merge one aligned pair of partitions
        merge_op = op.copy().reset_key()
        return merge_op.new_chunk(
            [left_part, right_part],
            shape=(np.nan, n_columns),
            index=left_part.index,
            index_value=infer_index_value(
                left_part.index_value, right_part.index_value
            ),
            dtypes=out_df.dtypes,
            columns_value=out_df.columns_value,
        )

    out_chunks = [_merge_pair(lp, rp) for lp, rp in zip(left_parts, right_parts)]

    tiled_op = op.copy()
    return tiled_op.new_dataframes(
        op.inputs,
        out_df.shape,
        nsplits=((np.nan,) * n_partitions, (n_columns,)),
        chunks=out_chunks,
        dtypes=out_df.dtypes,
        index_value=out_df.index_value,
        columns_value=out_df.columns_value,
    )
@classmethod
def _tile_broadcast(
    cls,
    op: "DataFrameMerge",
    left: Union[DataFrame, Series],
    right: Union[DataFrame, Series],
):
    """
    Tile a merge by broadcasting the side with fewer row chunks.

    Every chunk of the non-broadcast side is merged with every chunk of the
    broadcast side and the partial results are concatenated with
    ``DataFrameConcat``, giving one output chunk per chunk of the
    non-broadcast side.

    For ``how != "inner"`` the broadcast side is first shuffled on its merge
    key and each partial merge carries a ``MergeSplitInfo`` so that, at
    execution time, the non-broadcast chunk is reduced to the rows of the
    matching hash partition.
    """
    from .concat import DataFrameConcat

    out_df = op.outputs[0]
    out_chunks = []
    if left.chunk_shape[0] < right.chunk_shape[0]:
        # broadcast left
        if op.how == "inner":
            left_chunks = left.chunks
            need_split = False
        else:
            left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
            left_chunks = cls._gen_shuffle_chunks(left.chunk_shape, left_on, left)
            need_split = True
        # set is_broadcaster property
        for c in left_chunks:
            c.is_broadcaster = True
        right_chunks = right.chunks
        for right_chunk in right_chunks:
            merged_chunks = []
            # concat all merged results
            for j, left_chunk in enumerate(left_chunks):
                merge_op = op.copy().reset_key()
                if need_split:
                    # the right chunk keeps only hash partition ``j`` when executed
                    merge_op.split_info = MergeSplitInfo(
                        "right", j, len(left_chunks)
                    )
                merged_chunks.append(
                    merge_op.new_chunk(
                        [left_chunk, right_chunk],
                        index=(j, 0),
                        shape=(np.nan, out_df.shape[1]),
                        columns_value=out_df.columns_value,
                    )
                )
            concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
            out_chunks.append(
                concat_op.new_chunk(
                    merged_chunks,
                    shape=(np.nan, out_df.shape[1]),
                    dtypes=out_df.dtypes,
                    index=right_chunk.index,
                    index_value=infer_index_value(
                        left_chunks[0].index_value, right_chunk.index_value
                    ),
                    columns_value=out_df.columns_value,
                )
            )
        nsplits = ((np.nan,) * len(right.chunks), (out_df.shape[1],))
    else:
        # broadcast right
        if op.how == "inner":
            need_split = False
            right_chunks = right.chunks
        else:
            need_split = True
            right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
            right_chunks = cls._gen_shuffle_chunks(
                right.chunk_shape, right_on, right
            )
        # set is_broadcaster property
        for c in right_chunks:
            c.is_broadcaster = True
        left_chunks = left.chunks
        for left_chunk in left_chunks:
            merged_chunks = []
            # concat all merged results
            for j, right_chunk in enumerate(right_chunks):
                merge_op = op.copy().reset_key()
                if need_split:
                    # the left chunk keeps only hash partition ``j`` when executed
                    merge_op.split_info = MergeSplitInfo(
                        "left", j, len(right_chunks)
                    )
                merged_chunks.append(
                    merge_op.new_chunk(
                        [left_chunk, right_chunk],
                        shape=(np.nan, out_df.shape[1]),
                        index=(j, 0),
                        columns_value=out_df.columns_value,
                    )
                )
            concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
            out_chunks.append(
                concat_op.new_chunk(
                    merged_chunks,
                    shape=(np.nan, out_df.shape[1]),
                    dtypes=out_df.dtypes,
                    index=left_chunk.index,
                    index_value=infer_index_value(
                        left_chunk.index_value, right_chunks[0].index_value
                    ),
                    columns_value=out_df.columns_value,
                )
            )
        nsplits = ((np.nan,) * len(left.chunks), (out_df.shape[1],))

    new_op = op.copy()
    return new_op.new_dataframes(
        op.inputs,
        out_df.shape,
        nsplits=tuple(tuple(ns) for ns in nsplits),
        chunks=out_chunks,
        dtypes=out_df.dtypes,
        index_value=out_df.index_value,
        columns_value=out_df.columns_value,
    )
@classmethod
def _can_merge_with_one_chunk(
cls, left: TileableType, right: TileableType, how: str
) -> bool:
return (len(left.chunks) == 1 and how in ["right", "inner"]) or (
len(right.chunks) == 1 and how in ["left", "inner"]
)
@classmethod
def _can_merge_with_broadcast(
cls, big_chunk_size: int, small_chunk_size: int, big_side: str, how: str
) -> bool:
return how in [big_side, "inner"] and np.log2(big_chunk_size) > small_chunk_size
@classmethod
def _choose_merge_method(
    cls, op: "DataFrameMerge", left: TileableType, right: TileableType
):
    """
    Select the ``MergeMethod`` used to tile ``op``.

    "auto" tries one-chunk, then broadcast, then falls back to shuffle.
    "broadcast" tries one-chunk first and raises ``ValueError`` when ``how``
    rules out broadcasting. Any other value must be "shuffle".
    """
    how, method = op.how, op.method
    n_left = left.chunk_shape[0]
    n_right = right.chunk_shape[0]
    if n_left > n_right:
        big_side, big_chunk_size, small_chunk_size = "left", n_left, n_right
    else:
        big_side, big_chunk_size, small_chunk_size = "right", n_right, n_left

    if method not in ("auto", "broadcast"):
        assert method == "shuffle"
        return MergeMethod.shuffle

    # both remaining modes prefer the one-chunk strategy when it applies
    if cls._can_merge_with_one_chunk(left, right, how):
        return MergeMethod.one_chunk

    if method == "auto":
        if cls._can_merge_with_broadcast(
            big_chunk_size, small_chunk_size, big_side, how
        ):
            return MergeMethod.broadcast
        return MergeMethod.shuffle

    # method == "broadcast"
    if how in [big_side, "inner"]:
        return MergeMethod.broadcast
    raise ValueError("Cannot specify merge method `broadcast`")  # pragma: no cover
@classmethod
def _if_apply_bloom_filter(
    cls,
    method: MergeMethod,
    op: "DataFrameMerge",
    left: TileableType,
    right: TileableType,
):
    """
    Decide whether the inputs are filtered with a bloom filter before merging.

    Only an inner merge with ``bloom_filter=True`` currently returns True;
    every ``bloom_filter="auto"`` path returns False.
    """
    # bloom filter can only work for inner merge
    if op.how != "inner" or op.bloom_filter is False:
        return False
    if op.bloom_filter is True:
        return True

    # bloom_filter == auto
    options = op.bloom_filter_options or dict()
    chunk_threshold = options.get(
        "apply_chunk_size_threshold", DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD
    )
    total_chunks = len(left.chunks + right.chunks)

    # TODO(hks): disable bloom_filter for now, when it is ready, turn it on them
    if total_chunks <= chunk_threshold:
        # if size of input chunks <= threshold, skip bloom filter
        return False
    if method == MergeMethod.shuffle:
        # placeholder for enabling the filter on shuffle; disabled for now
        return False
    return False
@classmethod
def tile(cls, op: "DataFrameMerge"):
    """
    Tile the merge operand (generator-based tiling).

    Steps: concatenate each input's chunks along columns into row chunks, run
    the "before" auto merge, choose a merge method, optionally apply a bloom
    filter (which yields to have the filtered inputs computed and then auto
    merges their chunks), tile with the chosen method and, for inner merges,
    run the "after" auto merge.
    """
    left = build_concatenated_rows_frame(op.inputs[0])
    right = build_concatenated_rows_frame(op.inputs[1])
    ctx = get_context()
    auto_merge_threshold = op.auto_merge_threshold
    # NOTE(review): ``_get_auto_merge_options``, ``_merge_before`` and
    # ``_merge_after`` are not defined in this class; presumably they come
    # from ``DataFrameAutoMergeMixin`` -- confirm.
    auto_merge_before, auto_merge_after = cls._get_auto_merge_options(op.auto_merge)

    merge_before_res = yield from cls._merge_before(
        op, auto_merge_before, auto_merge_threshold, left, right, logger
    )
    left, right = merge_before_res[0], merge_before_res[1]

    method = cls._choose_merge_method(op, left, right)
    if cls._if_apply_bloom_filter(method, op, left, right):
        if has_unknown_shape(left, right):  # pragma: no cover
            # hand the input chunks back via TileStatus while shapes are unknown
            yield TileStatus(left.chunks + right.chunks, progress=0.3)
        left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
        right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
        # only used for the log message below
        small_one = right if len(left.chunks) > len(right.chunks) else left
        logger.info(
            "Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
            op,
            small_one,
        )
        left, right = yield from recursive_tile(
            *cls._apply_bloom_filter(left, right, left_on, right_on, op)
        )
        # auto merge after bloom filter
        yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.5)
        left = auto_merge_chunks(ctx, left)
        right = auto_merge_chunks(ctx, right)

    if op.method == "auto":
        # if method is auto, select new method after auto merge
        method = cls._choose_merge_method(op, left, right)
    logger.info("Choose %s method for merge operand %s.", method, op)
    if method == MergeMethod.one_chunk:
        ret = cls._tile_one_chunk(op, left, right)
    elif method == MergeMethod.broadcast:
        ret = cls._tile_broadcast(op, left, right)
    else:
        assert method == MergeMethod.shuffle
        ret = cls._tile_shuffle(op, left, right)

    if op.how == "inner":
        # if how=="inner", output data size will reduce greatly with high probability,
        # use auto_merge_chunks to combine small chunks.
        ret = yield from cls._merge_after(
            op, auto_merge_after, auto_merge_threshold, ret, logger
        )
    return ret
@classmethod
def execute(cls, ctx, op):
    """
    Merge the two input chunks of ``op`` and store the result in ``ctx``.

    If the operand carries a ``split_info`` (broadcast merge), the named side
    is first reduced to the rows that fall into hash partition ``split_index``
    out of ``nsplits``. The result columns are reordered to match the order
    recorded in the output chunk's ``columns_value`` when they differ.
    """
    chunk = op.outputs[0]
    left, right = ctx[op.inputs[0].key], ctx[op.inputs[1].key]

    split_info = getattr(op, "split_info", None)
    if split_info is not None:
        # Hash on the same key that the broadcast side was shuffled on
        # (``_prepare_shuffle_on``). Hashing on ``op.on`` alone partitions by
        # index whenever the keys were given through ``left_on``/``right_on``
        # or defaulted to the common columns, so matching rows could land in
        # different partitions.
        if split_info.split_side == "left":
            split_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
            index = hash_dataframe_on(left, on=split_on, size=split_info.nsplits)[
                split_info.split_index
            ]
            left = left.iloc[index]
        else:
            split_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
            index = hash_dataframe_on(right, on=split_on, size=split_info.nsplits)[
                split_info.split_index
            ]
            right = right.iloc[index]

    def execute_merge(x, y):
        """
        All merge and join operations are transformed into specific merge operations.

        In particular, for join operations on the GPU without the ``on`` column
        (CUDF does not support join with ``on`` parameter),
        The key of merge is always transformed into a column on one side (left_on / right_on) and
        an index on the other side (left_index=True / right_index=True).
        In this case, CUDF takes the result of the ``on`` column as the index instead of the data column,
        which is different from pandas.
        so we should reset the index before the real calculation, and
        then restore the column to the ``on`` data column.
        """
        left_index = None
        left_on = None
        right_index = None
        right_on = None
        rename_mapping = None
        magic_index_col_name = "__index__"

        if not op.gpu:
            # ``op.copy`` is the operand's copy() method; the user's ``copy``
            # argument lives in the ``copy_`` field.
            kwargs = dict(
                copy=op.copy_, validate=op.validate, indicator=op.indicator
            )
        else:
            # cudf doesn't support 'validate' and 'copy'
            kwargs = dict(indicator=op.indicator)
            # TODO: support MultiIndex case
            if not (
                isinstance(x.index, cudf.MultiIndex)
                or isinstance(y.index, cudf.MultiIndex)
            ):
                if op.how == "left" and op.left_index is True:
                    # turn the left index into a temporary key column
                    x.index.name = magic_index_col_name
                    x = x.reset_index()
                    left_on = magic_index_col_name
                    left_index = False
                    rename_mapping = {magic_index_col_name: op.right_on}
                elif op.how == "right" and op.right_index is True:
                    # turn the right index into a temporary key column
                    y.index.name = magic_index_col_name
                    y = y.reset_index()
                    right_on = magic_index_col_name
                    right_index = False
                    rename_mapping = {magic_index_col_name: op.left_on}

        res = x.merge(
            y,
            how=op.how,
            on=op.on,
            left_on=left_on if left_on is not None else op.left_on,
            right_on=right_on if right_on is not None else op.right_on,
            left_index=left_index if left_index is not None else op.left_index,
            right_index=right_index if right_index is not None else op.right_index,
            sort=op.sort,
            suffixes=op.suffixes,
            **kwargs,
        )

        if rename_mapping is not None:
            # move the temporary key column back under the ``on`` column name
            col = list(rename_mapping.values())[0]
            if col in res.columns:
                res[col] = res[magic_index_col_name]
                res = res.drop(columns=[magic_index_col_name])
            else:
                res = res.rename(columns=rename_mapping)
        return res

    # workaround for: https://github.com/pandas-dev/pandas/issues/27943
    try:
        r = execute_merge(left, right)
    except ValueError:
        r = execute_merge(left.copy(deep=True), right.copy(deep=True))

    # make sure column's order
    expected_columns = list(chunk.columns_value.to_pandas())
    if not all(n1 == n2 for n1, n2 in zip(expected_columns, r.columns)):
        r = r[expected_columns]
    ctx[chunk.key] = r
def _prepare_shuffle_on(use_index, side_on, on):
# consistent with pandas: `left_index` precedes `left_on` and `right_index` precedes `right_on`
if use_index:
# `None` means we will shuffle on df.index.
return None
elif side_on is not None:
return side_on
else:
return on
def merge(
    df: Union[DataFrame, Series],
    right: Union[DataFrame, Series],
    how: str = "inner",
    on: str = None,
    left_on: str = None,
    right_on: str = None,
    left_index: bool = False,
    right_index: bool = False,
    sort: bool = False,
    suffixes: Tuple[Optional[str], Optional[str]] = ("_x", "_y"),
    copy: bool = True,
    indicator: bool = False,
    validate: str = None,
    method: str = "auto",
    auto_merge: str = "both",
    auto_merge_threshold: int = 8,
    bloom_filter: Union[bool, str] = "auto",
    bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
    """
    Merge DataFrame or named Series objects with a database-style join.

    A named Series object is treated as a DataFrame with a single named column.

    The join is done on columns or indexes. If joining columns on
    columns, the DataFrame indexes *will be ignored*. Otherwise if joining indexes
    on indexes or indexes on a column or columns, the index will be passed on.
    When performing a cross merge, no column specifications to merge on are
    allowed.

    Parameters
    ----------
    right : DataFrame or named Series
        Object to merge with.
    how : {'left', 'right', 'outer', 'inner'}, default 'inner'
        Type of merge to be performed.

        * left: use only keys from left frame, similar to a SQL left outer join;
          preserve key order.
        * right: use only keys from right frame, similar to a SQL right outer join;
          preserve key order.
        * outer: use union of keys from both frames, similar to a SQL full outer
          join; sort keys lexicographically.
        * inner: use intersection of keys from both frames, similar to a SQL inner
          join; preserve the order of the left keys.

    on : label or list
        Column or index level names to join on. These must be found in both
        DataFrames. If `on` is None and not merging on indexes then this defaults
        to the intersection of the columns in both DataFrames.
    left_on : label or list, or array-like
        Column or index level names to join on in the left DataFrame. Can also
        be an array or list of arrays of the length of the left DataFrame.
        These arrays are treated as if they are columns.
    right_on : label or list, or array-like
        Column or index level names to join on in the right DataFrame. Can also
        be an array or list of arrays of the length of the right DataFrame.
        These arrays are treated as if they are columns.
    left_index : bool, default False
        Use the index from the left DataFrame as the join key(s). If it is a
        MultiIndex, the number of keys in the other DataFrame (either the index
        or a number of columns) must match the number of levels.
    right_index : bool, default False
        Use the index from the right DataFrame as the join key. Same caveats as
        left_index.
    sort : bool, default False
        Sort the join keys lexicographically in the result DataFrame. If False,
        the order of the join keys depends on the join type (how keyword).
    suffixes : list-like, default is ("_x", "_y")
        A length-2 sequence where each element is optionally a string
        indicating the suffix to add to overlapping column names in
        `left` and `right` respectively. Pass a value of `None` instead
        of a string to indicate that the column name from `left` or
        `right` should be left as-is, with no suffix. At least one of the
        values must not be None.
    copy : bool, default True
        If False, avoid copy if possible.
    indicator : bool or str, default False
        If True, adds a column to the output DataFrame called "_merge" with
        information on the source of each row. The column can be given a different
        name by providing a string argument. The column will have a Categorical
        type with the value of "left_only" for observations whose merge key only
        appears in the left DataFrame, "right_only" for observations
        whose merge key only appears in the right DataFrame, and "both"
        if the observation's merge key is found in both DataFrames.
    validate : str, optional
        If specified, checks if merge is of specified type.

        * "one_to_one" or "1:1": check if merge keys are unique in both
          left and right datasets.
        * "one_to_many" or "1:m": check if merge keys are unique in left
          dataset.
        * "many_to_one" or "m:1": check if merge keys are unique in right
          dataset.
        * "many_to_many" or "m:m": allowed, but does not result in checks.

    method : {"auto", "shuffle", "broadcast"}, default auto
        "broadcast" is recommended when one DataFrame is much smaller than the other,
        otherwise, "shuffle" will be a better choice. By default, we choose method
        according to actual data size.
    auto_merge : {"both", "none", "before", "after"}, default both
        Auto merge small chunks before or after merge

        * "both": auto merge small chunks before and after,
        * "none": do not merge small chunks
        * "before": only merge small chunks before merge
        * "after": only merge small chunks after merge

    auto_merge_threshold : int, default 8
        When how is "inner", merged result could be much smaller than original DataFrame,
        if the number of chunks is greater than the threshold,
        it will merge small chunks automatically.
    bloom_filter: bool, str, default "auto"
        Use bloom filter to optimize merge
    bloom_filter_options: dict
        * "max_elements": max elements in bloom filter,
          default value is the max size of all input chunks
        * "error_rate": error rate, default 0.1.
        * "apply_chunk_size_threshold": min chunk size of input chunks to apply bloom filter, default 10
          when chunk size of left and right is greater than this threshold, apply bloom filter
        * "filter": "large", "small", "both", default "large"
          decides to filter on large, small or both DataFrames.

    Returns
    -------
    DataFrame
        A DataFrame of the two merged objects.

    Examples
    --------
    >>> import mars.dataframe as md
    >>> df1 = md.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
    ...                     'value': [1, 2, 3, 5]})
    >>> df2 = md.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
    ...                     'value': [5, 6, 7, 8]})
    >>> df1.execute()
      lkey  value
    0  foo      1
    1  bar      2
    2  baz      3
    3  foo      5
    >>> df2.execute()
      rkey  value
    0  foo      5
    1  bar      6
    2  baz      7
    3  foo      8

    Merge df1 and df2 on the lkey and rkey columns. The value columns have
    the default suffixes, _x and _y, appended.

    >>> df1.merge(df2, left_on='lkey', right_on='rkey').execute()
      lkey  value_x rkey  value_y
    0  foo        1  foo        5
    1  foo        1  foo        8
    2  foo        5  foo        5
    3  foo        5  foo        8
    4  bar        2  bar        6
    5  baz        3  baz        7

    Merge DataFrames df1 and df2 with specified left and right suffixes
    appended to any overlapping columns.

    >>> df1.merge(df2, left_on='lkey', right_on='rkey',
    ...           suffixes=('_left', '_right')).execute()
      lkey  value_left rkey  value_right
    0  foo           1  foo            5
    1  foo           1  foo            8
    2  foo           5  foo            5
    3  foo           5  foo            8
    4  bar           2  bar            6
    5  baz           3  baz            7

    Merge DataFrames df1 and df2, but raise an exception if the DataFrames have
    any overlapping columns.

    >>> df1.merge(df2, left_on='lkey', right_on='rkey', suffixes=(False, False)).execute()
    Traceback (most recent call last):
    ...
    ValueError: columns overlap but no suffix specified:
        Index(['value'], dtype='object')

    >>> df1 = md.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
    >>> df2 = md.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
    >>> df1.execute()
         a  b
    0  foo  1
    1  bar  2
    >>> df2.execute()
         a  c
    0  foo  3
    1  baz  4

    >>> df1.merge(df2, how='inner', on='a').execute()
         a  b  c
    0  foo  1  3

    >>> df1.merge(df2, how='left', on='a').execute()
         a  b    c
    0  foo  1  3.0
    1  bar  2  NaN
    """
    if method is None:
        method = "auto"
    if method not in [
        "auto",
        "shuffle",
        "broadcast",
    ]:  # pragma: no cover
        raise NotImplementedError(f"{method} merge is not supported")
    if auto_merge not in ["both", "none", "before", "after"]:  # pragma: no cover
        raise ValueError(
            f"auto_merge can only be `both`, `none`, `before` or `after`, got {auto_merge}"
        )
    if (df.op.gpu or right.op.gpu) and bloom_filter is not False:
        # the bloom filter is switched off for GPU inputs instead of failing
        logger.warning(
            "Currently we do not support ``bloom_filter`` option on GPU, due to some limitations of CUDF."
        )
        bloom_filter = False
    if bloom_filter not in [True, False, "auto"]:
        raise ValueError(
            f'bloom_filter can only be True, False, or "auto", got {bloom_filter}'
        )
    if bloom_filter_options:
        if not isinstance(bloom_filter_options, dict):
            raise TypeError(
                f"bloom_filter_options must be a dict, got {type(bloom_filter_options)}"
            )
        for k, v in bloom_filter_options.items():
            if k not in BLOOM_FILTER_OPTIONS:
                raise ValueError(
                    f"Invalid bloom filter option {k}, available: {BLOOM_FILTER_OPTIONS}"
                )
            if k == "filter" and v not in BLOOM_FILTER_ON_OPTIONS:
                # report the offending value; the key is always "filter" here
                raise ValueError(
                    f"Invalid filter {v}, available: {BLOOM_FILTER_ON_OPTIONS}"
                )
    op = DataFrameMerge(
        how=how,
        on=on,
        left_on=left_on,
        right_on=right_on,
        left_index=left_index,
        right_index=right_index,
        sort=sort,
        suffixes=suffixes,
        copy=copy,
        indicator=indicator,
        validate=validate,
        method=method,
        auto_merge=auto_merge,
        auto_merge_threshold=auto_merge_threshold,
        bloom_filter=bloom_filter,
        bloom_filter_options=bloom_filter_options,
        output_types=[OutputType.dataframe],
    )
    return op(df, right)
def join(
    df: Union[DataFrame, Series],
    other: Union[DataFrame, Series],
    on: Optional[Union[str, List[str]]] = None,
    how: str = "left",
    lsuffix: str = "",
    rsuffix: str = "",
    sort: bool = False,
    method: Optional[str] = None,
    auto_merge: str = "both",
    auto_merge_threshold: int = 8,
    bloom_filter: Union[bool, str] = True,
    bloom_filter_options: Optional[Dict[str, Any]] = None,
) -> DataFrame:
    """
    Join columns of another DataFrame.

    Join columns with `other` DataFrame either on index or on a key
    column. Efficiently join multiple DataFrame objects by index at once by
    passing a list.

    This is a thin wrapper around :func:`merge`: `other` is always matched
    on its index, while the caller is matched on its index when `on` is
    None and on the column(s) named by `on` otherwise.

    Parameters
    ----------
    other : DataFrame, Series, or list of DataFrame
        Index should be similar to one of the columns in this one. If a
        Series is passed, its name attribute must be set, and that will be
        used as the column name in the resulting joined DataFrame.
    on : str, list of str, or array-like, optional
        Column or index level name(s) in the caller to join on the index
        in `other`, otherwise joins index-on-index. If multiple
        values given, the `other` DataFrame must have a MultiIndex. Can
        pass an array as the join key if it is not already contained in
        the calling DataFrame. Like an Excel VLOOKUP operation.
    how : {'left', 'right', 'outer', 'inner'}, default 'left'
        How to handle the operation of the two objects.

        * left: use calling frame's index (or column if on is specified)
        * right: use `other`'s index.
        * outer: form union of calling frame's index (or column if on is
          specified) with `other`'s index, and sort it lexicographically.
        * inner: form intersection of calling frame's index (or column if
          on is specified) with `other`'s index, preserving the order
          of the calling's one.
    lsuffix : str, default ''
        Suffix to use from left frame's overlapping columns.
    rsuffix : str, default ''
        Suffix to use from right frame's overlapping columns.
    sort : bool, default False
        Order result DataFrame lexicographically by the join key. If False,
        the order of the join key depends on the join type (how keyword).
    method : {"auto", "shuffle", "broadcast"}, default None
        "broadcast" is recommended when one DataFrame is much smaller than
        the other, otherwise, "shuffle" will be a better choice. None is
        treated as "auto", which chooses the method according to actual
        data size.
    auto_merge : {"both", "none", "before", "after"}, default "both"
        Auto merge small chunks before or after merge

        * "both": auto merge small chunks before and after,
        * "none": do not merge small chunks
        * "before": only merge small chunks before merge
        * "after": only merge small chunks after merge
    auto_merge_threshold : int, default 8
        When how is "inner", merged result could be much smaller than
        original DataFrame, if the number of chunks is greater than the
        threshold, it will merge small chunks automatically.
    bloom_filter : bool or "auto", default True
        Use bloom filter to optimize merge. Only ``True``, ``False`` and
        ``"auto"`` are accepted. When either input is a GPU DataFrame the
        option is switched off (with a warning) by ``merge``.
    bloom_filter_options : dict, optional
        * "max_elements": max elements in bloom filter,
          default value is the max size of all input chunks
        * "error_rate": error rate, default 0.1.
        * "apply_chunk_size_threshold": min chunk size of input chunks to
          apply bloom filter, default 10.
          When chunk size of left and right is greater than this threshold,
          apply bloom filter
        * "filter": "large", "small", "both", default "large"
          decides to filter on large, small or both DataFrames.
        * "combine_size": accepted as an option key; its effect is defined
          by the bloom filter implementation and is not described here.

    Returns
    -------
    DataFrame
        A dataframe containing columns from both the caller and `other`.

    Raises
    ------
    NotImplementedError
        If `method` is not one of the supported merge methods.
    ValueError
        If `auto_merge` or `bloom_filter` has an unsupported value, or if
        `bloom_filter_options` contains an unknown key or an invalid
        "filter" value.
    TypeError
        If `bloom_filter_options` is given but is not a dict.

    See Also
    --------
    DataFrame.merge : For column(s)-on-column(s) operations.

    Examples
    --------
    >>> import mars.dataframe as md
    >>> df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
    ...                    'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})

    >>> df.execute()
      key   A
    0  K0  A0
    1  K1  A1
    2  K2  A2
    3  K3  A3
    4  K4  A4
    5  K5  A5

    >>> other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
    ...                       'B': ['B0', 'B1', 'B2']})

    >>> other.execute()
      key   B
    0  K0  B0
    1  K1  B1
    2  K2  B2

    Join DataFrames using their indexes.

    >>> df.join(other, lsuffix='_caller', rsuffix='_other').execute()
      key_caller   A key_other    B
    0         K0  A0        K0   B0
    1         K1  A1        K1   B1
    2         K2  A2        K2   B2
    3         K3  A3       NaN  NaN
    4         K4  A4       NaN  NaN
    5         K5  A5       NaN  NaN

    If we want to join using the key columns, we need to set key to be
    the index in both `df` and `other`. The joined DataFrame will have
    key as its index.

    >>> df.set_index('key').join(other.set_index('key')).execute()
          A    B
    key
    K0   A0   B0
    K1   A1   B1
    K2   A2   B2
    K3   A3  NaN
    K4   A4  NaN
    K5   A5  NaN

    Another option to join using the key columns is to use the `on`
    parameter. DataFrame.join always uses `other`'s index but we can use
    any column in `df`. This method preserves the original DataFrame's
    index in the result.

    >>> df.join(other.set_index('key'), on='key').execute()
      key   A    B
    0  K0  A0   B0
    1  K1  A1   B1
    2  K2  A2   B2
    3  K3  A3  NaN
    4  K4  A4  NaN
    5  K5  A5  NaN

    Using non-unique key values shows how they are matched.

    >>> df = md.DataFrame({'key': ['K0', 'K1', 'K1', 'K3', 'K0', 'K1'],
    ...                    'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})

    >>> df.execute()
      key   A
    0  K0  A0
    1  K1  A1
    2  K1  A2
    3  K3  A3
    4  K0  A4
    5  K1  A5

    >>> df.join(other.set_index('key'), on='key').execute()
      key   A    B
    0  K0  A0   B0
    1  K1  A1   B1
    2  K1  A2   B1
    3  K3  A3  NaN
    4  K0  A4   B0
    5  K1  A5   B1
    """
    # The right-hand side is always matched by its index. The left-hand side
    # falls back to its own index only when no explicit `on` key is supplied;
    # otherwise `on` names the caller's column(s) to match against that index.
    return merge(
        df,
        other,
        left_on=on,
        how=how,
        left_index=on is None,
        right_index=True,
        # join() exposes the two suffixes separately; merge() takes a pair.
        suffixes=(lsuffix, rsuffix),
        sort=sort,
        method=method,
        auto_merge=auto_merge,
        auto_merge_threshold=auto_merge_threshold,
        bloom_filter=bloom_filter,
        bloom_filter_options=bloom_filter_options,
    )