SLURM 部署#
如果你有一个 SLURM 集群,你可以使用本教程去提交一个 Xorbits 作业。其他HPC作业调度器(例如Torque或者LSF等)与之类似。你需要先阅读 集群部署 以了解一些 Xorbits 集群的基础知识。
安装#
HPC集群有一个多计算节点共享的存储。你可以用 module
,conda
或 mamba
创建一个Python环境,然后用 pip
安装 Xorbits,Xorbits 会被安装到共享的存储上。安装问题请查看 安装文档。
SLURM Xorbits 流程#
在一个SLURM集群,获取和使用计算资源需要使用 sbatch
命令并编写一个 SLURM 脚本文件,SLURM 脚本文件中声明所需计算资源。等这些计算资源被分配后,你可以得到一个主机名列表。
在这个SLURM脚本文件中,你需要启动一个 Xorbits 集群,启动集群需要使用 srun
命令;然后执行 Python 程序,Python 程序连接到刚刚启动的 Xorbits 集群。你需要先启动一个 supervisor,然后启动一系列 worker。
整个流程如下:
向SLURM集群申请所需要的计算资源。
加载环境。
获取计算节点和他们主机名。
在其中一个节点(头节点)启动 supervisor。
在其他节点启动 worker,保证 worker 与头节点连接。
向该 Xorbits 集群提交用户作业。
Script Method#
SLURM 脚本文件#
在这个 SLURM 脚本文件中,你需要告诉 SLURM 分配计算节点给 Xorbits 作业。在本例中,我们申请了4个计算节点,每个计算节点上设置 --cpus-per-task=24
和 --ntasks-per-node=1
,意味着每个计算节点使用24个CPU核。根据你的工作负载来修改这些设置。类似地,你也可以通过 --gpus=1
参数来修改所申请的GPU的数量。你需要将 --partition
改为你的站点所设置的分区。你也可以添加其他SLURM参数。
#!/bin/bash
#SBATCH --job-name=xorbits
#SBATCH --nodes=4
#SBATCH --cpus-per-task=24
#SBATCH --ntasks-per-node=1
#SBATCH --partition=cpu24c
#SBATCH --time=00:30:00
加载环境#
你需要使用 conda
或 module
将 Xorbits 安装到特定环境。计算节点被分配后,环境会被切换到你安装 Xorbits 的环境。本例中,我们将 Xorbits 安装到了一个名为 df
的 conda 环境。
# Example: module load xorbits
# Example: source activate my-env
source activate df
获取节点#
接下来,你需要获取所有被分配的计算节点。
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
现在, nodes_array
是一个数组,它存储着分配给这个作业的节点主机名。
启动 Supervisor#
选择 nodes_array
中的第一个节点作为头节点,头节点用于启动 supervisor ,其他节点用于启动 worker 。
head_node=${nodes_array[0]}
获取到头节点的主机名后,我们将在头节点上运行 supervisor。我们将使用 srun
来启动 supervisor。xorbits-supervisor
是启动 supervisor 的命令。你必须配置主机名,端口和 web 端口。注意,你需要让程序休眠几秒,因为 supervisor 需要一些时间来启动。否则,其他 worker 节点可能无法连接上 supervisor。
port=16380
web_port=16379
echo "Starting SUPERVISOR at ${head_node}"
srun --nodes=1 --ntasks=1 -w "${head_node}" \
xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}" &
sleep 10
启动 Worker#
其他的机器被用来作为 worker:
# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
port_i=$((port + i))
echo "Starting WORKER $i at ${node_i}"
srun --nodes=1 --ntasks=1 -w "${node_i}" \
xorbits-worker -H "${node_i}" -p "${port_i}" -s "${head_node}":"${port}" &
done
sleep 5
连接到创建的集群#
至此,Xorbits 集群已创建。address
是访问这个集群的地址,你通过这个地址可以连接到 supervisor,然后提交你的 Xorbits 作业。
address=http://"${head_node}":"${web_port}"
python -u test.py --endpoint "${address}"
test.py
像这样:
import argparse
import xorbits
import xorbits.numpy as np
parser = argparse.ArgumentParser(description="test")
parser.add_argument(
"--endpoint",
type=str,
default="0.0.0.0",
required=True,
)
args = parser.parse_args()
xorbits.init(args.endpoint)
print(np.random.rand(100, 100).mean())
将 SLURM 脚本文件命名为 xorbits_slurm.sh
。提交这个作业:
sbatch xorbits_slurm.sh
整合汇总#
将以上汇总, SLURM 脚本文件像这样:
#!/bin/bash
#SBATCH --job-name=xorbits
#SBATCH --nodes=4
#SBATCH --cpus-per-task=24
#SBATCH --ntasks-per-node=1
#SBATCH --partition=cpu24c
#SBATCH --time=00:30:00
source activate df
### Use the debug mode to see if the shell commands are correct.
### If you do not want the shell command logs, delete the following line.
set -x
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
head_node=${nodes_array[0]}
port=16380
web_port=16379
echo "Starting SUPERVISOR at ${head_node}"
srun --nodes=1 --ntasks=1 -w "${head_node}" \
xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}" &
sleep 10
# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
port_i=$((port + i))
echo "Starting WORKER $i at ${node_i}"
srun --nodes=1 --ntasks=1 -w "${node_i}" \
xorbits-worker -H "${node_i}" -p "${port_i}" -s "${head_node}":"${port}" &
done
sleep 5
address=http://"${head_node}":"${web_port}"
python -u test.py --endpoint "${address}"
Code Method#
Initialization#
To create an instance of the SLURMCluster class, you can use the following parameters:
job_name (str, optional): Name of the Slurm job.
num_nodes (int, optional): Number of nodes in the Slurm cluster.
partition_option (str, optional): Request a specific partition for resource allocation.
load_env (str, optional): Conda Environment to load.
output_path (str, optional): Path for log output.
error_path (str, optional): Path for log errors.
work_dir (str, optional): Slurm’s working directory, the default location for logs and results.
time (str, optional): Minimum time limit for job allocation.
processes (int, optional): Number of processes.
cores (int, optional): Number of cores.
memory (str, optional): Specify the real memory required per node. Default units are megabytes.
account (str, optional): Charge resources used by this job to the specified account.
webport (int, optional): Xorbits’ web port.
**kwargs: Additional parameters that can be added using the Slurm interface.
from xorbits.deploy.slurm import SLURMCluster
cluster = SLURMCluster(
job_name="my_job",
num_nodes=4,
partition_option="compute",
load_env="my_env",
output_path="logs/output.log",
error_path="logs/error.log",
work_dir="/path/to/work_dir",
time="1:00:00",
processes=8,
cores=2,
memory="8G",
account="my_account",
webport=16379,
custom_param1="value1",
custom_param2="value2"
)
备注
Modify the parameters as needed for your specific use case.
Running the Job#
To submit the job to SLURM, use the run() method. It will return the job’s address.
address = cluster.run()
Getting Job Information#
get_job_id(): This method extracts the job ID from the output of the sbatch command.
job_id = cluster.get_job_id()
cancel_job(): This method cancels the job using the scancel command. A hook is designed so that while canceling the program, the Slurm task will also be canceled.
cluster.cancel_job(job_id)
update_head_node(): This method retrieves the head node information from the SLURM job.
cluster.update_head_node()
get_job_address(retry_attempts=10, sleep_interval=30): This method retrieves the job address after deployment. It retries several times to get the job data.
job_address = cluster.get_job_address(retry_attempts=10, sleep_interval=30)
Example#
Here’s an example of how to use the SLURMCluster class
import pandas as pd
from xorbits.deploy.slurm import SLURMCluster
test_cluster = SLURMCluster(
job_name="xorbits",
num_nodes=2,
output_path="/shared_space/output.out",
time="00:30:00",
)
address = test_cluster.run()
xorbits.init(address)
assert pd.Series([1, 2, 3]).sum() == 6