SLURM 部署#

如果你有一个 SLURM 集群,你可以使用本教程去提交一个 Xorbits 作业。其他HPC作业调度器(例如Torque或者LSF等)与之类似。你需要先阅读 集群部署 以了解一些 Xorbits 集群的基础知识。

安装#

HPC集群有一个多计算节点共享的存储。你可以用 modulecondamamba 创建一个Python环境,然后用 pip 安装 Xorbits,Xorbits 会被安装到共享的存储上。安装问题请查看 安装文档

SLURM Xorbits 流程#

在一个SLURM集群,获取和使用计算资源需要使用 sbatch 命令并编写一个 SLURM 脚本文件,SLURM 脚本文件中声明所需计算资源。等这些计算资源被分配后,你可以得到一个主机名列表。

在这个SLURM脚本文件中,你需要启动一个 Xorbits 集群,启动集群需要使用 srun 命令;然后执行 Python 程序,Python 程序连接到刚刚启动的 Xorbits 集群。你需要先启动一个 supervisor,然后启动一系列 worker。

整个流程如下:

  1. 向SLURM集群申请所需要的计算资源。

  2. 加载环境。

  3. 获取计算节点和他们主机名。

  4. 在其中一个节点(头节点)启动 supervisor。

  5. 在其他节点启动 worker,保证 worker 与头节点连接。

  6. 向该 Xorbits 集群提交用户作业。

Script Method#

SLURM 脚本文件#

在这个 SLURM 脚本文件中,你需要告诉 SLURM 分配计算节点给 Xorbits 作业。在本例中,我们申请了4个计算节点,每个计算节点上设置 --cpus-per-task=24--ntasks-per-node=1,意味着每个计算节点使用24个CPU核。根据你的工作负载来修改这些设置。类似地,你也可以通过 --gpus=1 参数来修改所申请的GPU的数量。你需要将 --partition 改为你的站点所设置的分区。你也可以添加其他SLURM参数。

#!/bin/bash
#SBATCH --job-name=xorbits
#SBATCH --nodes=4
#SBATCH --cpus-per-task=24
#SBATCH --ntasks-per-node=1
#SBATCH --partition=cpu24c
#SBATCH --time=00:30:00

加载环境#

你需要使用 condamodule 将 Xorbits 安装到特定环境。计算节点被分配后,环境会被切换到你安装 Xorbits 的环境。本例中,我们将 Xorbits 安装到了一个名为 df 的 conda 环境。

# Example: module load xorbits
# Example: source activate my-env

source activate df

获取节点#

接下来,你需要获取所有被分配的计算节点。

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

现在, nodes_array 是一个数组,它存储着分配给这个作业的节点主机名。

启动 Supervisor#

选择 nodes_array 中的第一个节点作为头节点,头节点用于启动 supervisor ,其他节点用于启动 worker 。

head_node=${nodes_array[0]}

获取到头节点的主机名后,我们将在头节点上运行 supervisor。我们将使用 srun 来启动 supervisor。xorbits-supervisor 是启动 supervisor 的命令。你必须配置主机名,端口和 web 端口。注意,你需要让程序休眠几秒,因为 supervisor 需要一些时间来启动。否则,其他 worker 节点可能无法连接上 supervisor。

port=16380
web_port=16379

echo "Starting SUPERVISOR at ${head_node}"
srun --nodes=1 --ntasks=1 -w "${head_node}" \
    xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}" &
sleep 10

启动 Worker#

其他的机器被用来作为 worker:

# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))

for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    port_i=$((port + i))

    echo "Starting WORKER $i at ${node_i}"
    srun --nodes=1 --ntasks=1 -w "${node_i}" \
        xorbits-worker -H "${node_i}"  -p "${port_i}" -s "${head_node}":"${port}" &
done
sleep 5

连接到创建的集群#

至此,Xorbits 集群已创建。address 是访问这个集群的地址,你通过这个地址可以连接到 supervisor,然后提交你的 Xorbits 作业。

address=http://"${head_node}":"${web_port}"

python -u test.py --endpoint "${address}"

test.py 像这样:

import argparse

import xorbits
import xorbits.numpy as np

parser = argparse.ArgumentParser(description="test")
parser.add_argument(
    "--endpoint",
    type=str,
    default="0.0.0.0",
    required=True,
)

args = parser.parse_args()

xorbits.init(args.endpoint)
print(np.random.rand(100, 100).mean())

将 SLURM 脚本文件命名为 xorbits_slurm.sh。提交这个作业:

sbatch xorbits_slurm.sh

整合汇总#

将以上汇总, SLURM 脚本文件像这样:

#!/bin/bash

#SBATCH --job-name=xorbits
#SBATCH --nodes=4
#SBATCH --cpus-per-task=24
#SBATCH --ntasks-per-node=1
#SBATCH --partition=cpu24c
#SBATCH --time=00:30:00

source activate df

### Use the debug mode to see if the shell commands are correct.
### If you do not want the shell command logs, delete the following line.
set -x

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

head_node=${nodes_array[0]}
port=16380
web_port=16379

echo "Starting SUPERVISOR at ${head_node}"
srun --nodes=1 --ntasks=1 -w "${head_node}" \
    xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}" &
sleep 10

# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))

for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    port_i=$((port + i))

    echo "Starting WORKER $i at ${node_i}"
    srun --nodes=1 --ntasks=1 -w "${node_i}" \
        xorbits-worker -H "${node_i}"  -p "${port_i}" -s "${head_node}":"${port}" &
done
sleep 5

address=http://"${head_node}":"${web_port}"

python -u test.py --endpoint "${address}"

Code Method#

Initialization#

To create an instance of the SLURMCluster class, you can use the following parameters:

  • job_name (str, optional): Name of the Slurm job.

  • num_nodes (int, optional): Number of nodes in the Slurm cluster.

  • partition_option (str, optional): Request a specific partition for resource allocation.

  • load_env (str, optional): Conda Environment to load.

  • output_path (str, optional): Path for log output.

  • error_path (str, optional): Path for log errors.

  • work_dir (str, optional): Slurm’s working directory, the default location for logs and results.

  • time (str, optional): Minimum time limit for job allocation.

  • processes (int, optional): Number of processes.

  • cores (int, optional): Number of cores.

  • memory (str, optional): Specify the real memory required per node. Default units are megabytes.

  • account (str, optional): Charge resources used by this job to the specified account.

  • webport (int, optional): Xorbits’ web port.

  • **kwargs: Additional parameters that can be added using the Slurm interface.

from xorbits.deploy.slurm import SLURMCluster
cluster = SLURMCluster(
      job_name="my_job",
      num_nodes=4,
      partition_option="compute",
      load_env="my_env",
      output_path="logs/output.log",
      error_path="logs/error.log",
      work_dir="/path/to/work_dir",
      time="1:00:00",
      processes=8,
      cores=2,
      memory="8G",
      account="my_account",
      webport=16379,
      custom_param1="value1",
      custom_param2="value2"
)

备注

Modify the parameters as needed for your specific use case.

Running the Job#

To submit the job to SLURM, use the run() method. It will return the job’s address.

address = cluster.run()

Getting Job Information#

  • get_job_id(): This method extracts the job ID from the output of the sbatch command.

job_id = cluster.get_job_id()
  • cancel_job(): This method cancels the job using the scancel command. A hook is designed so that while canceling the program, the Slurm task will also be canceled.

cluster.cancel_job(job_id)
  • update_head_node(): This method retrieves the head node information from the SLURM job.

cluster.update_head_node()
  • get_job_address(retry_attempts=10, sleep_interval=30): This method retrieves the job address after deployment. It retries several times to get the job data.

job_address = cluster.get_job_address(retry_attempts=10, sleep_interval=30)

Example#

Here’s an example of how to use the SLURMCluster class

import pandas as pd
from xorbits.deploy.slurm import SLURMCluster

test_cluster = SLURMCluster(
      job_name="xorbits",
      num_nodes=2,
      output_path="/shared_space/output.out",
      time="00:30:00",
  )
address = test_cluster.run()
xorbits.init(address)
assert pd.Series([1, 2, 3]).sum() == 6