Loading data#

Parquet files#

Parquet is a popular columnar storage file format. Xorbits pandas supports reading and writing Parquet files in parallel. This section introduces how to use these functions, along with some best-practice recommendations.

xorbits.pandas.read_parquet() accepts the following input forms:

  • A single Parquet file

  • A folder containing Parquet files

  • A path string with wildcards

All of these can be local files or paths on remote storage.

import xorbits.pandas as pd

# single local file
df = pd.read_parquet("local.parquet")

# S3 directory
df = pd.read_parquet("s3://bucket-name/parquet/files",
                     storage_options={"key": "", "secret": ""})

# wildcard in path
df = pd.read_parquet("s3://bucket-name/*.parquet",
                     storage_options={"key": "", "secret": ""})

Store as multiple files#

When the data is large, it is best to store it as multiple Parquet files for optimal performance. Xorbits will use multiprocessing or distributed workers to read different files in parallel, which accelerates reading. Each file becomes one Xorbits chunk, and more chunks allow higher concurrency. We generally recommend keeping each Parquet file between 16 MiB and 128 MiB, so that there are not too many files while concurrency is still guaranteed.

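If the data currently lives in a single DataFrame, one simple way to produce such a folder is to split it yourself and write each slice as a separate file. The sketch below uses plain pandas (not the Xorbits API); the write_partitioned helper, the folder name and the file count are illustrative assumptions:

import os

import pandas


def write_partitioned(df: pandas.DataFrame, out_dir: str, n_files: int = 10) -> None:
    # Hypothetical helper: split df into n_files slices and write one Parquet file per slice.
    os.makedirs(out_dir, exist_ok=True)
    rows_per_file = -(-len(df) // n_files)  # ceiling division
    for i in range(n_files):
        part = df.iloc[i * rows_per_file : (i + 1) * rows_per_file]
        # zero-padded names keep the files ordered when the folder is listed
        part.to_parquet(os.path.join(out_dir, f"part-{i:04d}.parquet"))


# e.g. turn a ~200 MiB DataFrame into 10 files of roughly 20 MiB each
# write_partitioned(big_df, "parquet_dir", n_files=10)
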
For example, reading 200 MiB of data stored as a single file:

In [1]: %time print(pd.read_parquet("single.parquet"))
100%|████████████████████████████████████| 100.00/100 [00:01<00:00, 80.31it/s]
             col1                                               col2     col3
0        0.804201  Surface play great information. Make enjoy vot...        0
1        0.602314  Pattern arrive image everyone manager. Traditi...        1
2        0.416683  Mention central gun especially fish family. He...        2
3        0.697665  Congress others become that. Life reveal gener...        3
4        0.774197  Wife though bring inside industry drug. Unit w...        4
...           ...                                                ...      ...
1999995  0.123357  Through child behavior scene. Character simply...  1999995
1999996  0.983500  Admit laugh peace west recently why free few. ...  1999996
1999997  0.341014  Class necessary event radio material nearly im...  1999997
1999998  0.790413  Operation that interesting summer a identify. ...  1999998
1999999  0.553956  Take receive future situation. Per industry ki...  1999999

[2000000 rows x 3 columns]
CPU times: user 402 ms, sys: 165 ms, total: 567 ms
Wall time: 1.81 s

Storing the same data as 10 Parquet files in a folder and reading the folder:

In [2]: %time print(pd.read_parquet("parquet_dir"))
100%|████████████████████████████████████| 100.00/100 [00:00<00:00, 419.56it/s]
             col1                                               col2     col3
0        0.804201  Surface play great information. Make enjoy vot...        0
1        0.602314  Pattern arrive image everyone manager. Traditi...        1
2        0.416683  Mention central gun especially fish family. He...        2
3        0.697665  Congress others become that. Life reveal gener...        3
4        0.774197  Wife though bring inside industry drug. Unit w...        4
...           ...                                                ...      ...
1999995  0.123357  Through child behavior scene. Character simply...  1999995
1999996  0.983500  Admit laugh peace west recently why free few. ...  1999996
1999997  0.341014  Class necessary event radio material nearly im...  1999997
1999998  0.790413  Operation that interesting summer a identify. ...  1999998
1999999  0.553956  Take receive future situation. Per industry ki...  1999999

[2000000 rows x 3 columns]
CPU times: user 117 ms, sys: 30.3 ms, total: 147 ms
Wall time: 302 ms

From the wall times we can see that reading multiple files takes only about one sixth of the time needed for a single file.

Single Parquet file with multiple row groups#

If the data must be stored as a single file, splitting it into multiple row groups also allows parallel reading. First, use the row_group_size parameter to write the data into multiple row groups.

In [3]: df.to_parquet("all.parquet", row_group_size=200_000)
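
To check how the file was split, you can inspect it directly with pyarrow; this is a plain pyarrow check, independent of Xorbits:

import pyarrow.parquet as pq

pf = pq.ParquetFile("all.parquet")
# 2,000,000 rows written with row_group_size=200_000 should yield 10 row groups
print(pf.metadata.num_row_groups)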

When reading, specify groups_as_chunks=True so that each row group is read as a separate chunk:

In [4]: %time print(pd.read_parquet("all.parquet", groups_as_chunks=True))
100%|███████████████████████████████████| 100.00/100 [00:00<00:00, 231.36it/s]
            col1                                               col2     col3
0       0.804201  Surface play great information. Make enjoy vot...        0
1       0.602314  Pattern arrive image everyone manager. Traditi...        1
2       0.416683  Mention central gun especially fish family. He...        2
3       0.697665  Congress others become that. Life reveal gener...        3
4       0.774197  Wife though bring inside industry drug. Unit w...        4
...          ...                                                ...      ...
199995  0.123357  Through child behavior scene. Character simply...  1999995
199996  0.983500  Admit laugh peace west recently why free few. ...  1999996
199997  0.341014  Class necessary event radio material nearly im...  1999997
199998  0.790413  Operation that interesting summer a identify. ...  1999998
199999  0.553956  Take receive future situation. Per industry ki...  1999999

[2000000 rows x 3 columns]
CPU times: user 108 ms, sys: 39.5 ms, total: 147 ms
Wall time: 508 ms

This also achieves a noticeable speedup.

Use rebalance to redistribute data#

If you cannot modify the data source, having just a single file will cause data skew in subsequent computations. In this case, call df.rebalance after reading the Parquet file to distribute the data evenly across workers and processes.
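
Written out step by step, the pattern looks like this (a minimal sketch; the apply function is simply the example used below):

df = pd.read_parquet("all.parquet")  # a single file is read as a single chunk
df = df.rebalance()                  # redistribute the data into multiple chunks
result = df.apply(lambda row: len(row[1]) * row[2], axis=1)  # now runs on multiple cores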

Reading a single Parquet file and then calling apply does not leverage multi-core parallelism:

In [5]: %time print(pd.read_parquet("all.parquet").apply(lambda row: len(row[1]) * row[2], axis=1))
100%|███████████████████████████████████| 100.00/100 [00:06<00:00, 16.10it/s]
0                  0
1                117
2                312
3                519
4                780
             ...
1999995    205999485
1999996    219999560
1999997    373999439
1999998    397999602
1999999    369999815
Length: 2000000, dtype: int64
CPU times: user 39.9 ms, sys: 11.5 ms, total: 51.4 ms
Wall time: 6.22 s

After calling rebalance, the computation makes use of multiple cores. Although rebalance itself consumes some additional time, the more subsequent computation there is, the higher the gain.

In [6]: %time print(pd.read_parquet("all.parquet").rebalance().apply(lambda row: len(row[1]) * row[2], axis=1))
100%|███████████████████████████████████| 100.00/100 [00:04<00:00, 20.16it/s]
0                  0
1                117
2                312
3                519
4                780
             ...
1999995    205999485
1999996    219999560
1999997    373999439
1999998    397999602
1999999    369999815
Length: 2000000, dtype: int64
CPU times: user 163 ms, sys: 46.9 ms, total: 210 ms
Wall time: 4.98 s

After repartitioning the data, the accelerated apply saved about 20% of the total computing time for the whole calculation.