Loading data
Recommended data formats
Xorbits supports reading data from various data sources, including CSV, Parquet, SQL, XML and other formats, but not every format supports parallel reading. We recommend using formats that support parallel reading, including:
The following are some best practices for reading Parquet files.
Parquet files
Parquet is a very popular columnar storage file format. Xorbits Pandas supports parallel reading and writing of Parquet files. This section describes how to use these functions and gives some best-practice recommendations.
xorbits.pandas.read_parquet() accepts the following input forms:
A single Parquet file
A folder containing Parquet files
A path string with wildcards
Each of these can point to local files or to remote storage.
import xorbits.pandas as pd
# single local file
df = pd.read_parquet("local.parquet")
# S3 directory
df = pd.read_parquet("s3://bucket-name/parquet/files",
                     storage_options={"key": "", "secret": ""})
# wildcard in path
df = pd.read_parquet("s3://bucket-name/*.parquet",
                     storage_options={"key": "", "secret": ""})
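For local paths, the three input forms can be pictured as resolving to a list of files. The sketch below does this with the standard library only; `expand_parquet_paths` is a hypothetical helper written for illustration, not part of Xorbits, and the rule that a folder contributes its `*.parquet` files is an assumption. Remote paths such as the S3 examples above would need a filesystem library rather than `glob`.

```python
import glob
import os
from typing import List


def expand_parquet_paths(path: str) -> List[str]:
    """Hypothetical helper: list the local Parquet files a path refers to."""
    if any(ch in path for ch in "*?["):
        # String with wildcards: glob matches the pattern.
        return sorted(glob.glob(path))
    if os.path.isdir(path):
        # Folder: assume every *.parquet file directly inside it belongs to the dataset.
        return sorted(glob.glob(os.path.join(path, "*.parquet")))
    # Single file: the path itself.
    return [path]
```

Since each file becomes one chunk when Xorbits reads in parallel, the length of the returned list hints at how many chunks a folder or wildcard would produce.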
Store as multiple files
When the data is large, store it as multiple Parquet files for the best performance. Xorbits uses multiprocessing or distributed workers to read different files in parallel. Each file becomes one Xorbits chunk, and more chunks allow higher concurrency. As a general guideline, keep each Parquet file between 16 MiB and 128 MiB: this avoids creating too many files while still providing enough concurrency.
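Assuming files of roughly equal size, the 16 MiB to 128 MiB guideline translates into a range of sensible file counts. The helper `file_count_range` below is hypothetical and only shows the arithmetic.

```python
import math
from typing import Tuple


def file_count_range(total_mib: float, min_file_mib: float = 16,
                     max_file_mib: float = 128) -> Tuple[int, int]:
    """Return (fewest, most) equal-sized files keeping each within [min_file_mib, max_file_mib]."""
    # Fewest files: make each file as large as the guideline allows.
    fewest = max(1, math.ceil(total_mib / max_file_mib))
    # Most files: make each file as small as the guideline allows (never fewer than `fewest`).
    most = max(fewest, math.floor(total_mib / min_file_mib))
    return fewest, most


print(file_count_range(200))  # (2, 12)
```

For the 200 MiB dataset in the next example, anything from 2 to 12 files fits the guideline; the 10-file folder used there works out to about 20 MiB per file.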
For example, reading 200 MiB of data stored in a single file:
In [1]: %time print(pd.read_parquet("single.parquet"))
100%|████████████████████████████████████| 100.00/100 [00:01<00:00, 80.31it/s]
col1 col2 col3
0 0.804201 Surface play great information. Make enjoy vot... 0
1 0.602314 Pattern arrive image everyone manager. Traditi... 1
2 0.416683 Mention central gun especially fish family. He... 2
3 0.697665 Congress others become that. Life reveal gener... 3
4 0.774197 Wife though bring inside industry drug. Unit w... 4
... ... ... ...
1999995 0.123357 Through child behavior scene. Character simply... 1999995
1999996 0.983500 Admit laugh peace west recently why free few. ... 1999996
1999997 0.341014 Class necessary event radio material nearly im... 1999997
1999998 0.790413 Operation that interesting summer a identify. ... 1999998
1999999 0.553956 Take receive future situation. Per industry ki... 1999999
[2000000 rows x 3 columns]
CPU times: user 402 ms, sys: 165 ms, total: 567 ms
Wall time: 1.81 s
With the same data stored as 10 Parquet files in a folder, reading the folder instead:
In [2]: %time print(pd.read_parquet("parquet_dir"))
100%|████████████████████████████████████| 100.00/100 [00:00<00:00, 419.56it/s]
col1 col2 col3
0 0.804201 Surface play great information. Make enjoy vot... 0
1 0.602314 Pattern arrive image everyone manager. Traditi... 1
2 0.416683 Mention central gun especially fish family. He... 2
3 0.697665 Congress others become that. Life reveal gener... 3
4 0.774197 Wife though bring inside industry drug. Unit w... 4
... ... ... ...
1999995 0.123357 Through child behavior scene. Character simply... 1999995
1999996 0.983500 Admit laugh peace west recently why free few. ... 1999996
1999997 0.341014 Class necessary event radio material nearly im... 1999997
1999998 0.790413 Operation that interesting summer a identify. ... 1999998
1999999 0.553956 Take receive future situation. Per industry ki... 1999999
[2000000 rows x 3 columns]
CPU times: user 117 ms, sys: 30.3 ms, total: 147 ms
Wall time: 302 ms
Comparing wall times (1.81 s versus 302 ms), reading the folder of multiple files takes only about 1/6 as long as reading the single file.
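The 1/6 figure follows directly from the two wall times reported above; the short calculation below just makes it explicit.

```python
def speedup(baseline_s: float, improved_s: float) -> float:
    """How many times faster the improved run is than the baseline."""
    return baseline_s / improved_s


# Wall times from the two runs above: single file vs. folder of 10 files.
single_file_s = 1.81
folder_s = 0.302

print(f"{speedup(single_file_s, folder_s):.1f}x faster")  # 6.0x faster
print(f"{folder_s / single_file_s:.0%} of the single-file time")  # 17% of the single-file time
```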
Single Parquet file with multiple row groups
If the data must be stored as a single file, splitting it into multiple row groups also enables parallel reading. First, pass the row_group_size parameter to to_parquet so that the data is written as multiple row groups:
In [3]: df.to_parquet("all.parquet", row_group_size=200_000)
When reading, specify groups_as_chunks=True:
In [4]: %time print(pd.read_parquet("all.parquet", groups_as_chunks=True))
100%|███████████████████████████████████| 100.00/100 [00:00<00:00, 231.36it/s]
col1 col2 col3
0 0.804201 Surface play great information. Make enjoy vot... 0
1 0.602314 Pattern arrive image everyone manager. Traditi... 1
2 0.416683 Mention central gun especially fish family. He... 2
3 0.697665 Congress others become that. Life reveal gener... 3
4 0.774197 Wife though bring inside industry drug. Unit w... 4
... ... ... ...
199995 0.123357 Through child behavior scene. Character simply... 1999995
199996 0.983500 Admit laugh peace west recently why free few. ... 1999996
199997 0.341014 Class necessary event radio material nearly im... 1999997
199998 0.790413 Operation that interesting summer a identify. ... 1999998
199999 0.553956 Take receive future situation. Per industry ki... 1999999
[2000000 rows x 3 columns]
CPU times: user 108 ms, sys: 39.5 ms, total: 147 ms
Wall time: 508 ms
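This also speeds up reading: compared with 1.81 s for the single file read earlier, the wall time drops to 508 ms, roughly 3.6 times faster, though still slower than reading the 10-file folder (302 ms).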
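With the 2,000,000-row DataFrame used here, and assuming row_group_size is the maximum number of rows per group (as in pyarrow's Parquet writer), a value of 200,000 gives 10 row groups. If groups_as_chunks=True turns each row group into one chunk, as its name suggests, that matches the parallelism of the 10-file folder. The helper below is a hypothetical sketch of that arithmetic only.

```python
def num_row_groups(n_rows: int, row_group_size: int) -> int:
    """Row groups needed when each group holds at most `row_group_size` rows."""
    # Integer ceiling division: a partial final group still counts as one group.
    return -(-n_rows // row_group_size)


print(num_row_groups(2_000_000, 200_000))  # 10
print(num_row_groups(2_000_001, 200_000))  # 11
```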
Use rebalance to redistribute data
If you cannot modify the data source and the data is a single file, it is read as a single chunk, which causes data skew in subsequent computations: one worker or process ends up doing all the work. In this case, call df.rebalance() after reading the Parquet file to distribute the data evenly across workers and processes.
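Conceptually, evening out the data means splitting the rows into similar-sized parts, one per worker or process. The sketch below shows only that balancing arithmetic on row positions; `even_partitions` is a hypothetical helper, not how rebalance is implemented, and choosing one part per process is an assumption.

```python
from typing import List, Tuple


def even_partitions(n_rows: int, n_parts: int) -> List[Tuple[int, int]]:
    """Split row positions [0, n_rows) into n_parts contiguous ranges whose sizes differ by at most one."""
    if n_parts < 1:
        raise ValueError("n_parts must be at least 1")
    base, extra = divmod(n_rows, n_parts)
    bounds = []
    start = 0
    for i in range(n_parts):
        # The first `extra` parts each take one additional row.
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


print(even_partitions(10, 3))  # [(0, 4), (4, 7), (7, 10)]
```

For the 2,000,000-row file in the example that follows, spread across, say, 8 processes, each part would hold 250,000 rows.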
Reading a single Parquet file and then calling apply does not take advantage of multiple cores:
In [5]: %time print(pd.read_parquet("all.parquet").apply(lambda row: len(row["col2"]) * row["col3"], axis=1))
100%|███████████████████████████████████| 100.00/100 [00:06<00:00, 16.10it/s]
0 0
1 117
2 312
3 519
4 780
...
1999995 205999485
1999996 219999560
1999997 373999439
1999998 397999602
1999999 369999815
Length: 2000000, dtype: int64
CPU times: user 39.9 ms, sys: 11.5 ms, total: 51.4 ms
Wall time: 6.22 s
After calling rebalance, the computation makes use of multiple cores. Although rebalance itself takes some extra time, the more computation follows it, the greater the gain.
In [6]: %time print(pd.read_parquet("all.parquet").rebalance().apply(lambda row: len(row["col2"]) * row["col3"], axis=1))
100%|███████████████████████████████████| 100.00/100 [00:04<00:00, 20.16it/s]
0 0
1 117
2 312
3 519
4 780
...
1999995 205999485
1999996 219999560
1999997 373999439
1999998 397999602
1999999 369999815
Length: 2000000, dtype: int64
CPU times: user 163 ms, sys: 46.9 ms, total: 210 ms
Wall time: 4.98 s
After rebalancing, the parallel apply brings the wall time of the whole calculation down from 6.22 s to 4.98 s, a saving of about 20% even with the cost of rebalance included.
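The 20% figure comes straight from the two wall times. The point that the gain grows with further computation can be illustrated with an idealized cost model, which is an assumption rather than a measurement: rebalancing costs a fixed time once, and each subsequent operation then runs `n_procs` times faster than on a single chunk. Both helpers are hypothetical.

```python
import math


def time_saved(before_s: float, after_s: float) -> float:
    """Fraction of wall time saved by going from `before_s` to `after_s`."""
    return (before_s - after_s) / before_s


def break_even_ops(rebalance_s: float, op_s: float, n_procs: int) -> int:
    """Smallest number of operations for which rebalancing first saves time (idealized model)."""
    if n_procs < 2:
        raise ValueError("rebalancing cannot pay off with fewer than 2 processes")
    # Time saved per operation when it runs on n_procs balanced chunks instead of one.
    saved_per_op = op_s - op_s / n_procs
    # Need k * saved_per_op > rebalance_s, i.e. k > rebalance_s / saved_per_op.
    return math.floor(rebalance_s / saved_per_op) + 1


# Measured wall times above: without and with rebalance().
print(f"{time_saved(6.22, 4.98):.0%} saved")  # 20% saved

# Hypothetical numbers: 3 s to rebalance, 1 s per operation, 2 processes.
print(break_even_ops(3.0, 1.0, 2))  # 7
```

Under this model every operation after the break-even point adds the same fixed saving, which is why longer pipelines benefit more from a single rebalance.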