# Copyright 2022-2023 XProbe Inc.
# derived from copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import zipfile
from typing import Dict
from urllib.parse import urlparse
import numpy as np
import pandas as pd
try:
import pyarrow as pa
import pyarrow.parquet as pq
arrow_dtype = pa.DataType
except ImportError:
pa = None
arrow_dtype = None
try:
import fastparquet
except ImportError:
fastparquet = None
import fsspec
from ... import opcodes as OperandDef
from ...config import options
from ...serialization.serializables import (
AnyField,
BoolField,
DictField,
Int32Field,
Int64Field,
ListField,
StringField,
)
from ...utils import is_object_dtype, lazy_import
from ..operands import OutputType
from ..utils import PD_VERSION_GREATER_THAN_2_10, arrow_dtype_kwargs, parse_index
from .core import (
ColumnPruneSupportedDataSourceMixin,
IncrementalIndexDatasource,
IncrementalIndexDataSourceMixin,
merge_small_files,
)
from .utils import convert_to_abspath
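# Rough multipliers applied to the raw parquet file size when estimating the
# in-memory size of a chunk (see ``DataFrameReadParquet.estimate_size``);
# object-dtype string columns additionally cost ``STRING_FIELD_OVERHEAD``
# bytes per row.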
PARQUET_MEMORY_SCALE = 10
PARQUET_MEMORY_SCALE_WITH_ARROW_DTYPE = 3
STRING_FIELD_OVERHEAD = 50
cudf = lazy_import("cudf")
def check_engine(engine):
if engine == "auto":
if pa is not None:
return "pyarrow"
elif fastparquet is not None: # pragma: no cover
return "fastparquet"
else: # pragma: no cover
raise RuntimeError("Please install either pyarrow or fastparquet.")
elif engine == "pyarrow":
if pa is None: # pragma: no cover
raise RuntimeError("Please install pyarrow first.")
return engine
elif engine == "fastparquet":
if fastparquet is None: # pragma: no cover
raise RuntimeError("Please install fastparquet first.")
return engine
else: # pragma: no cover
raise RuntimeError("Unsupported engine {} to read parquet.".format(engine))
def get_engine(engine):
if engine == "pyarrow":
return ArrowEngine()
elif engine == "fastparquet":
        return FastparquetEngine()
else: # pragma: no cover
raise RuntimeError("Unsupported engine {}".format(engine))
class ParquetEngine:
"""Read parquet by arrow / fastparquet instead of pandas is to read the
parquet file by group, please refer to `groups_as_chunks`."""
def get_row_num(self, f):
raise NotImplementedError
def read_dtypes(self, f, **kwargs):
raise NotImplementedError
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
raise NotImplementedError
def read_group_to_pandas(
self, f, group_index, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
raise NotImplementedError
def read_partitioned_to_pandas(
self,
f,
partitions: Dict,
partition_keys: Dict,
columns=None,
nrows=None,
use_arrow_dtype=None,
**kwargs,
):
raw_df = self.read_to_pandas(
f, columns=columns, nrows=nrows, use_arrow_dtype=use_arrow_dtype, **kwargs
)
for col, value in partition_keys.items():
dictionary = partitions[col]
raw_df[col] = pd.Series(
value,
dtype=pd.CategoricalDtype(categories=dictionary.tolist()),
index=raw_df.index,
)
return raw_df
def read_partitioned_dtypes(
self, fs: fsspec.AbstractFileSystem, directory, storage_options
):
        # ParquetDataset would iterate over all files,
        # so we just locate one file to infer dtypes from
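        # a hive-style partitioned dataset is laid out as
        # ``root/col1=a/col2=b/part-0.parquet``; each directory level
        # contributes one partition column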
current_path = directory
partition_cols = []
while fs.isdir(current_path):
_, dirs, files = next(fs.walk(current_path))
dirs = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
if len(files) == 0:
# directory as partition
partition_cols.append(dirs[0].split("=", 1)[0])
current_path = os.path.join(current_path, dirs[0])
elif len(dirs) == 0:
# parquet files in deepest directory
current_path = os.path.join(current_path, files[0])
else: # pragma: no cover
raise ValueError(
"Files and directories are mixed in an intermediate directory"
)
# current path is now a parquet file
of = fsspec.open(current_path, storage_options=storage_options)
with of as f:
dtypes = self.read_dtypes(f)
for partition in partition_cols:
dtypes[partition] = pd.CategoricalDtype()
return dtypes
def _parse_prefix(path):
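    """Return the ``scheme://netloc`` prefix of a path, e.g.
    ``_parse_prefix("s3://bucket/key")`` gives ``"s3://bucket"``;
    paths without a scheme yield an empty string."""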
path_prefix = ""
if isinstance(path, str):
parsed_path = urlparse(path)
if parsed_path.scheme:
path_prefix = f"{parsed_path.scheme}://{parsed_path.netloc}"
return path_prefix
class ArrowEngine(ParquetEngine):
def get_row_num(self, f):
file = pq.ParquetFile(f)
return file.metadata.num_rows
def read_dtypes(self, f, **kwargs):
types_mapper = kwargs.pop("types_mapper", None)
file = pq.ParquetFile(f)
return (
file.schema_arrow.empty_table().to_pandas(types_mapper=types_mapper).dtypes
)
@classmethod
def _table_to_pandas(cls, t, nrows=None, use_arrow_dtype=None):
if nrows is not None:
t = t.slice(0, nrows)
if use_arrow_dtype:
df = t.to_pandas(types_mapper=pd.ArrowDtype)
else:
df = t.to_pandas()
return df
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = pq.ParquetFile(f)
t = file.read(columns=columns, **kwargs)
return self._table_to_pandas(t, nrows=nrows, use_arrow_dtype=use_arrow_dtype)
def read_group_to_pandas(
self, f, group_index, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = pq.ParquetFile(f)
t = file.read_row_group(group_index, columns=columns, **kwargs)
return self._table_to_pandas(t, nrows=nrows, use_arrow_dtype=use_arrow_dtype)
class FastparquetEngine(ParquetEngine):
def get_row_num(self, f):
file = fastparquet.ParquetFile(f)
return file.count()
def read_dtypes(self, f, **kwargs):
file = fastparquet.ParquetFile(f)
dtypes_dict = file._dtypes()
        return pd.Series({c: dtypes_dict[c] for c in file.columns})
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = fastparquet.ParquetFile(f)
df = file.to_pandas(columns, **kwargs)
if nrows is not None:
df = df.head(nrows)
return df
class CudfEngine:
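    """Read parquet into cudf DataFrames for GPU execution."""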
@classmethod
def read_to_cudf(cls, file, columns: list = None, nrows: int = None, **kwargs):
df = cudf.read_parquet(file, columns=columns, **kwargs)
if nrows is not None:
df = df.head(nrows)
return df
def read_group_to_cudf(
self, file, group_index: int, columns: list = None, nrows: int = None, **kwargs
):
return self.read_to_cudf(
file, columns=columns, nrows=nrows, row_groups=group_index, **kwargs
)
@classmethod
def read_partitioned_to_cudf(
cls,
file,
partitions: Dict,
partition_keys: Dict,
columns=None,
nrows=None,
**kwargs,
):
        # cudf reads entire partitions even if only one partition is provided,
        # so we read with pyarrow instead and convert to a cudf DataFrame
file = pq.ParquetFile(file)
t = file.read(columns=columns, **kwargs)
t = t.slice(0, nrows) if nrows is not None else t
t = pa.table(t.columns, names=t.column_names)
raw_df = cudf.DataFrame.from_arrow(t)
for col, value in partition_keys.items():
dictionary = partitions[col].tolist()
codes = cudf.core.column.as_column(
dictionary.index(value), length=len(raw_df)
)
raw_df[col] = cudf.core.column.build_categorical_column(
categories=dictionary,
codes=codes,
size=codes.size,
offset=codes.offset,
ordered=False,
)
return raw_df
class DataFrameReadParquet(
IncrementalIndexDatasource,
ColumnPruneSupportedDataSourceMixin,
IncrementalIndexDataSourceMixin,
):
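    """Operand that reads parquet file(s) into a DataFrame, producing one chunk
    per file (or per row group when ``groups_as_chunks`` is True)."""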
_op_type_ = OperandDef.READ_PARQUET
path = AnyField("path")
chunk_path = AnyField("chunk_path")
engine = StringField("engine")
columns = ListField("columns")
use_arrow_dtype = BoolField("use_arrow_dtype")
groups_as_chunks = BoolField("groups_as_chunks")
group_index = Int32Field("group_index")
read_kwargs = DictField("read_kwargs")
incremental_index = BoolField("incremental_index")
storage_options = DictField("storage_options")
is_partitioned = BoolField("is_partitioned")
merge_small_files = BoolField("merge_small_files")
merge_small_file_options = DictField("merge_small_file_options")
is_http_url = BoolField("is_http_url", None)
# for chunk
partitions = DictField("partitions", default=None)
partition_keys = DictField("partition_keys", default=None)
num_group_rows = Int64Field("num_group_rows", default=None)
    # reading metadata for every file may be too time-consuming when the number
    # of files is large, so we only read the first file to get its row number
    # and raw file size
first_chunk_row_num = Int64Field("first_chunk_row_num")
first_chunk_raw_bytes = Int64Field("first_chunk_raw_bytes")
def get_columns(self):
return self.columns
def set_pruned_columns(self, columns, *, keep_order=None):
self.columns = columns
@classmethod
def _tile_partitioned(cls, op: "DataFrameReadParquet"):
out_df = op.outputs[0]
shape = (np.nan, out_df.shape[1])
dtypes = out_df.dtypes
dataset = pq.ParquetDataset(op.path, use_legacy_dataset=False)
path_prefix = _parse_prefix(op.path)
chunk_index = 0
out_chunks = []
first_chunk_row_num, first_chunk_raw_bytes = None, None
for i, fragment in enumerate(dataset.fragments):
chunk_op = op.copy().reset_key()
chunk_op.path = chunk_path = path_prefix + fragment.path
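            # relative to the dataset root, the fragment path looks like
            # ``col1=a/col2=b/part-0.parquet``; each directory component
            # encodes one ``column=value`` partition key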
relpath = os.path.relpath(chunk_path, op.path)
partition_keys = dict(
tuple(s.split("=")) for s in relpath.split(os.sep)[:-1]
)
chunk_op.partition_keys = partition_keys
chunk_op.partitions = dict(
zip(
dataset.partitioning.schema.names, dataset.partitioning.dictionaries
)
)
if i == 0:
first_row_group = fragment.row_groups[0]
first_chunk_raw_bytes = first_row_group.total_byte_size
first_chunk_row_num = first_row_group.num_rows
chunk_op.first_chunk_row_num = first_chunk_row_num
chunk_op.first_chunk_raw_bytes = first_chunk_raw_bytes
new_chunk = chunk_op.new_chunk(
None,
shape=shape,
index=(chunk_index, 0),
index_value=out_df.index_value,
columns_value=out_df.columns_value,
dtypes=dtypes,
)
out_chunks.append(new_chunk)
chunk_index += 1
new_op = op.copy()
nsplits = ((np.nan,) * len(out_chunks), (out_df.shape[1],))
return new_op.new_dataframes(
None,
out_df.shape,
dtypes=dtypes,
index_value=out_df.index_value,
columns_value=out_df.columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile_no_partitioned(cls, op: "DataFrameReadParquet"):
chunk_index = 0
out_chunks = []
out_df = op.outputs[0]
dtypes = out_df.dtypes
shape = (np.nan, out_df.shape[1])
z = None
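        # ``z`` holds an open ZipFile when ``op.path`` points to a zip archive
        # bundling multiple parquet files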
fs, _, _ = fsspec.get_fs_token_paths(
op.path, storage_options=op.storage_options
)
if isinstance(op.path, (tuple, list)):
paths = op.path
elif fs.isdir(op.path):
paths = fs.ls(op.path)
paths = sorted(paths)
if not isinstance(fs, fsspec.implementations.local.LocalFileSystem):
parsed_path = urlparse(op.path)
paths = [f"{parsed_path.scheme}://{path}" for path in paths]
elif isinstance(op.path, str) and op.path.endswith(".zip"):
file = fs.open(op.path, storage_options=op.storage_options)
z = zipfile.ZipFile(file)
paths = z.namelist()
paths = [
path
for path in paths
if path.endswith(".parquet") and not path.startswith("__MACOSX")
]
else:
paths = fs.glob(op.path)
if not isinstance(fs, fsspec.implementations.local.LocalFileSystem):
parsed_path = urlparse(op.path)
paths = [f"{parsed_path.scheme}://{path}" for path in paths]
first_chunk_row_num, first_chunk_raw_bytes = None, None
for i, pth in enumerate(paths):
if i == 0:
if z is not None:
with z.open(pth) as f:
first_chunk_row_num = get_engine(op.engine).get_row_num(f)
first_chunk_raw_bytes = sys.getsizeof(f)
else:
of = fs.open(pth)
first_chunk_row_num = get_engine(op.engine).get_row_num(of)
first_chunk_raw_bytes = fsspec.get_fs_token_paths(
pth, storage_options=op.storage_options
)[0].size(pth)
if op.groups_as_chunks:
if z is not None:
with z.open(pth) as f:
num_row_groups = pq.ParquetFile(f).num_row_groups
else:
num_row_groups = pq.ParquetFile(pth).num_row_groups
for group_idx in range(num_row_groups):
chunk_op = op.copy().reset_key()
if z is not None:
chunk_op.path = op.path
chunk_op.chunk_path = pth
else:
chunk_op.path = pth
chunk_op.group_index = group_idx
chunk_op.first_chunk_row_num = first_chunk_row_num
chunk_op.first_chunk_raw_bytes = first_chunk_raw_bytes
chunk_op.num_group_rows = num_row_groups
new_chunk = chunk_op.new_chunk(
None,
shape=shape,
index=(chunk_index, 0),
index_value=out_df.index_value,
columns_value=out_df.columns_value,
dtypes=dtypes,
)
out_chunks.append(new_chunk)
chunk_index += 1
else:
chunk_op = op.copy().reset_key()
if z is not None:
chunk_op.path = op.path
chunk_op.chunk_path = pth
else:
chunk_op.path = pth
chunk_op.first_chunk_row_num = first_chunk_row_num
chunk_op.first_chunk_raw_bytes = first_chunk_raw_bytes
new_chunk = chunk_op.new_chunk(
None,
shape=shape,
index=(chunk_index, 0),
index_value=out_df.index_value,
columns_value=out_df.columns_value,
dtypes=dtypes,
)
out_chunks.append(new_chunk)
chunk_index += 1
if z is not None:
z.close()
new_op = op.copy()
nsplits = ((np.nan,) * len(out_chunks), (out_df.shape[1],))
return new_op.new_dataframes(
None,
out_df.shape,
dtypes=dtypes,
index_value=out_df.index_value,
columns_value=out_df.columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile_http_url(cls, op: "DataFrameReadParquet"):
out_chunks = []
out_df = op.outputs[0]
z = None
if op.path[0].endswith(".zip"):
fs, _, _ = fsspec.get_fs_token_paths(op.path[0])
zip_filename = fs.open(op.path[0])
z = zipfile.ZipFile(zip_filename)
paths = z.namelist()
paths = [
path
for path in paths
if path.endswith(".parquet") and not path.startswith("__MACOSX")
]
else:
paths = op.path
for i, url in enumerate(paths):
chunk_op = op.copy().reset_key()
if z is not None:
chunk_op.path = op.path[0]
chunk_op.chunk_path = url
else:
chunk_op.path = url
out_chunks.append(
chunk_op.new_chunk(None, index=(i, 0), shape=(np.nan, np.nan))
)
if z is not None:
z.close()
new_op = op.copy()
nsplits = ((np.nan,) * len(out_chunks), (np.nan,))
return new_op.new_dataframes(
None,
out_df.shape,
dtypes=out_df.dtypes,
index_value=out_df.index_value,
columns_value=out_df.columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile(cls, op: "DataFrameReadParquet"):
if op.is_http_url:
tiled = cls._tile_http_url(op)
elif op.is_partitioned:
tiled = cls._tile_partitioned(op)
else:
tiled = cls._tile_no_partitioned(op)
if op.merge_small_files:
tiled = [
merge_small_files(tiled[0], **(op.merge_small_file_options or dict()))
]
return tiled
@classmethod
def _execute_partitioned(cls, ctx, op: "DataFrameReadParquet"):
out = op.outputs[0]
engine = get_engine(op.engine)
of = fsspec.open(op.path, storage_options=op.storage_options)
with of as f:
ctx[out.key] = engine.read_partitioned_to_pandas(
f,
op.partitions,
op.partition_keys,
columns=op.columns,
nrows=op.nrows,
use_arrow_dtype=op.use_arrow_dtype,
**op.read_kwargs or dict(),
)
@classmethod
def _pandas_read_parquet(cls, ctx: dict, op: "DataFrameReadParquet"):
out = op.outputs[0]
path = op.path
z = None
if op.is_http_url:
if op.path.endswith(".zip"):
fs, _, _ = fsspec.get_fs_token_paths(op.path)
zip_filename = fs.open(op.path)
z = zipfile.ZipFile(zip_filename)
f = z.open(op.chunk_path)
else:
f = op.path
read_kwargs = op.read_kwargs or dict()
if op.use_arrow_dtype:
read_kwargs.update(arrow_dtype_kwargs())
r = pd.read_parquet(
f,
columns=op.columns,
engine=op.engine,
**read_kwargs,
)
if z is not None:
z.close()
f.close()
ctx[out.key] = r
return
if op.partitions is not None:
return cls._execute_partitioned(ctx, op)
engine = get_engine(op.engine)
z = None
fs = fsspec.get_fs_token_paths(path, storage_options=op.storage_options)[0]
if op.path.endswith(".zip"):
file = fs.open(op.path, storage_options=op.storage_options)
z = zipfile.ZipFile(file)
f = z.open(op.chunk_path)
else:
f = fs.open(path, storage_options=op.storage_options)
use_arrow_dtype = op.use_arrow_dtype
if op.groups_as_chunks:
df = engine.read_group_to_pandas(
f,
op.group_index,
columns=op.columns,
nrows=op.nrows,
use_arrow_dtype=use_arrow_dtype,
**op.read_kwargs or dict(),
)
else:
df = engine.read_to_pandas(
f,
columns=op.columns,
nrows=op.nrows,
use_arrow_dtype=use_arrow_dtype,
**op.read_kwargs or dict(),
)
ctx[out.key] = df
if z is not None:
z.close()
f.close()
@classmethod
def _cudf_read_parquet(cls, ctx: dict, op: "DataFrameReadParquet"):
out = op.outputs[0]
path = op.path
z = None
fs = fsspec.get_fs_token_paths(path, storage_options=op.storage_options)[0]
if path.endswith(".zip"):
z = zipfile.ZipFile(path)
engine = CudfEngine()
if os.path.exists(path) and z is None:
file = op.path
close = lambda: None
else: # pragma: no cover
if z is not None:
file = z.open(op.chunk_path)
else:
file = fs.open(path, storage_options=op.storage_options)
close = file.close
try:
if op.partitions is not None:
ctx[out.key] = engine.read_partitioned_to_cudf(
file,
op.partitions,
op.partition_keys,
columns=op.columns,
nrows=op.nrows,
**op.read_kwargs or dict(),
)
else:
if op.groups_as_chunks:
df = engine.read_group_to_cudf(
file,
op.group_index,
columns=op.columns,
nrows=op.nrows,
**op.read_kwargs or dict(),
)
else:
df = engine.read_to_cudf(
file,
columns=op.columns,
nrows=op.nrows,
**op.read_kwargs or dict(),
)
ctx[out.key] = df
finally:
close()
@classmethod
def execute(cls, ctx, op: "DataFrameReadParquet"):
if not op.gpu:
cls._pandas_read_parquet(ctx, op)
else:
cls._cudf_read_parquet(ctx, op)
@classmethod
def estimate_size(cls, ctx, op: "DataFrameReadParquet"):
if op.is_http_url:
return super().estimate_size(ctx, op)
first_chunk_row_num = op.first_chunk_row_num
first_chunk_raw_bytes = op.first_chunk_raw_bytes
if isinstance(op.path, str) and op.path.endswith(".zip"):
with fsspec.open(op.path, storage_options=op.storage_options) as zip_file:
with zipfile.ZipFile(zip_file) as z:
with z.open(op.chunk_path) as f:
raw_bytes = sys.getsizeof(f)
else:
raw_bytes = fsspec.get_fs_token_paths(
op.path, storage_options=op.storage_options
)[0].size(op.path)
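        # when each chunk maps to a single row group, approximate the chunk's
        # share of the file size by dividing by the number of row groups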
if op.num_group_rows:
raw_bytes = (
np.ceil(np.divide(raw_bytes, op.num_group_rows)).astype(np.int64).item()
)
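        # extrapolate this chunk's row count from the first chunk's
        # rows-per-byte ratio: rows ~= first_rows * (raw_bytes / first_raw_bytes)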
estimated_row_num = (
np.ceil(first_chunk_row_num * (raw_bytes / first_chunk_raw_bytes))
.astype(np.int64)
.item()
)
if op.columns is not None:
of = fsspec.open(op.path, storage_options=op.storage_options)
with of as f:
all_columns = list(get_engine(op.engine).read_dtypes(f).index)
else:
all_columns = list(op.outputs[0].dtypes.index)
columns = op.columns if op.columns else all_columns
if op.use_arrow_dtype:
scale = op.memory_scale or PARQUET_MEMORY_SCALE_WITH_ARROW_DTYPE
else:
scale = op.memory_scale or PARQUET_MEMORY_SCALE
phy_size = raw_bytes * scale * len(columns) / len(all_columns)
n_strings = len(
[
dt
for col, dt in op.outputs[0].dtypes.items()
if col in columns and is_object_dtype(dt)
]
)
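        # plain object-dtype string columns carry extra per-row overhead in
        # pandas, while arrow-backed string columns are already covered by the
        # smaller arrow scale factor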
if op.use_arrow_dtype:
pd_size = phy_size
else:
pd_size = phy_size + n_strings * estimated_row_num * STRING_FIELD_OVERHEAD
ctx[op.outputs[0].key] = (pd_size, pd_size)
def __call__(self, index_value=None, columns_value=None, dtypes=None):
self._output_types = [OutputType.dataframe]
if dtypes is not None:
shape = (np.nan, len(dtypes))
else:
shape = (np.nan, np.nan)
return self.new_dataframe(
None,
shape,
dtypes=dtypes,
index_value=index_value,
columns_value=columns_value,
)
def read_parquet(
path,
engine: str = "auto",
columns: list = None,
groups_as_chunks: bool = False,
use_arrow_dtype: bool = None,
incremental_index: bool = False,
storage_options: dict = None,
memory_scale: int = None,
merge_small_files: bool = True,
merge_small_file_options: dict = None,
gpu: bool = None,
**kwargs,
):
"""
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        Any valid string path is acceptable. The string could be a URL.
        For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables``.
        By file-like object, we refer to objects with a ``read()`` method,
        such as a file handler (e.g. via builtin ``open`` function)
        or ``StringIO``.
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. The default behavior is to try 'pyarrow',
        falling back to 'fastparquet' if 'pyarrow' is unavailable.
    columns : list, default None
        If not None, only these columns will be read from the file.
    groups_as_chunks : bool, default False
        If True, each row group corresponds to a chunk;
        if False, each file corresponds to a chunk.
        Only available for the 'pyarrow' engine.
    incremental_index : bool, default False
        If index_col is not specified, ensure the range index is incremental;
        setting it to False gains slightly better performance.
    use_arrow_dtype : bool, default None
        If True, use arrow dtypes to store columns. Enabled by default
        if pandas >= 2.1.
    storage_options : dict, optional
        Options for the storage connection.
    memory_scale : int, optional
        Scale factor between the real memory occupation and the raw file size.
    merge_small_files : bool, default True
        Whether to merge chunks read from small files.
    merge_small_file_options : dict
        Options for merging small files.
    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    Mars DataFrame
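
    Examples
    --------
    A minimal sketch; the paths and column names below are illustrative only::

        df = read_parquet("data/*.parquet", columns=["a", "b"])
        # read a hive-partitioned directory with arrow-backed dtypes
        df = read_parquet("data/partitioned_table", use_arrow_dtype=True)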
"""
engine_type = check_engine(engine)
engine = get_engine(engine_type)
# We enable arrow dtype by default if pandas >= 2.1
if use_arrow_dtype is None and engine_type == "pyarrow":
use_arrow_dtype = PD_VERSION_GREATER_THAN_2_10
single_path = path[0] if isinstance(path, list) else path
is_partitioned = False
if isinstance(single_path, str) and (
single_path.startswith("http://")
or single_path.startswith("https://")
or single_path.startswith("ftp://")
or single_path.startswith("sftp://")
):
urls = path if isinstance(path, (list, tuple)) else [path]
op = DataFrameReadParquet(
path=urls,
engine=engine_type,
columns=columns,
groups_as_chunks=groups_as_chunks,
use_arrow_dtype=use_arrow_dtype,
read_kwargs=kwargs,
incremental_index=incremental_index,
storage_options=storage_options,
memory_scale=memory_scale,
merge_small_files=merge_small_files,
merge_small_file_options=merge_small_file_options,
is_http_url=True,
gpu=gpu,
)
return op()
fs, _, _ = fsspec.get_fs_token_paths(single_path, storage_options=storage_options)
if use_arrow_dtype is None:
use_arrow_dtype = options.dataframe.use_arrow_dtype
if use_arrow_dtype and engine_type != "pyarrow":
raise ValueError(
f"The 'use_arrow_dtype' argument is not supported for the {engine_type} engine"
)
# We enable arrow dtype by default if pandas >= 2.1
if use_arrow_dtype is None:
use_arrow_dtype = PD_VERSION_GREATER_THAN_2_10
types_mapper = pd.ArrowDtype if use_arrow_dtype else None
if fs.isdir(single_path):
paths = fs.ls(path)
if all(fs.isdir(p) for p in paths):
# If all are directories, it is read as a partitioned dataset.
dtypes = engine.read_partitioned_dtypes(fs, path, storage_options)
is_partitioned = True
else:
with fs.open(paths[0], mode="rb") as f:
dtypes = engine.read_dtypes(f, types_mapper=types_mapper)
elif isinstance(path, str) and path.endswith(".zip"):
with fsspec.open(path, "rb") as file:
with zipfile.ZipFile(file) as z:
with z.open(z.namelist()[0]) as f:
dtypes = engine.read_dtypes(f, types_mapper=types_mapper)
else:
if not isinstance(path, list):
file_path = fs.glob(path)[0]
else:
file_path = path[0]
with fs.open(file_path) as f:
dtypes = engine.read_dtypes(f, types_mapper=types_mapper)
if columns:
dtypes = dtypes[columns]
index_value = parse_index(pd.RangeIndex(-1))
columns_value = parse_index(dtypes.index, store_data=True)
# convert path to abs_path
abs_path = convert_to_abspath(path, storage_options)
op = DataFrameReadParquet(
path=abs_path,
engine=engine_type,
columns=columns,
groups_as_chunks=groups_as_chunks,
use_arrow_dtype=use_arrow_dtype,
read_kwargs=kwargs,
incremental_index=incremental_index,
storage_options=storage_options,
is_partitioned=is_partitioned,
memory_scale=memory_scale,
merge_small_files=merge_small_files,
merge_small_file_options=merge_small_file_options,
gpu=gpu,
)
return op(index_value=index_value, columns_value=columns_value, dtypes=dtypes)