Pytorch DDP使用方法以及注意点

本文最后更新于:2021年12月10日 中午

Pytorch DDP使用方法以及注意点

[TOC]

Pytorch的DDP指的是DistributedDataParallel,位于torch.nn.parallel中,用于多GPU的模型训练。相比于之前的DP,DDP的速度快了很多。DDP支持多卡多机器,但我没有多机器,所以本文针对最常用的单机器多卡。

原理

DDP加速的原理是通过启动多个进程,提高同时训练的batch size来增加并行度的,每一个进程都会加载一个模型,用不同的数据进行训练之后得到各自的梯度,然后通过Ring-Reduce算法获得所有进程的梯度,然后进行相同的梯度下降。注意在训练前和训练后,所有进程的模型参数都是同步了的。

Ring-Reduce是很简单理解的一个算法:每个进程都从左手边获得一份梯度,然后从右手发送一份梯度(一份指的是一个GPU得出的梯度),经过$n$次迭代之后,所有进程都获得了相同的完整的梯度。如下图,假设梯度$\nabla w$是GPU0算出来的,第一次$\nabla w$被发送到GPU1,第二次被GPU1发送到GPU2,第三次被GPU2发送到GPU3,第四次被发送到GPU4,第五次被发回给GPU0。这样每个进程都只需要接收一个,发送一个,而且能够清楚知道什么时候结束。

基础概念

先来了解基础概念:

  • world_size:并行数,即总共用的卡数。
  • rank:当前进程全局序号,范围0~world_size。
  • local_rank:本地序号,由于本文介绍单机器,所以和上面一样。
  • master_port:DDP需要进程间传递数据,所以需要使用端口。
  • master_address:同上,需要使用IP地址。

使用方法

基本逻辑是这样:

graph
A[初始化多进程并获取rank号]-->B[使用DistributedSampler提供数据]
B --> C[将数据和模型都放在GPU上]
C --> D[将模型用DDP包裹]
D --> E[训练]
E --> F[loss.backward同步梯度]
F --> G[保存参数]

初始化多进程、获取rank号

1
2
3
4
5
6
7
8
9
# 使用这种方法获取local_rank
# 其他获取local_rank的方法已经被弃用
local_rank = int(os.environ["LOCAL_RANK"])
# 设置device
torch.cuda.set_device(local_rank)
# 用nccl后端初始化多进程,一般都用这个
dist.init_process_group(backend='nccl')
# 获取device,之后的模型和张量都.to(device)
device = torch.device("cuda", local_rank)

数据

对于$n$张卡上的$n$个模型,我们当然希望将数据分成不同的$n$部分,分别送给它们训练,所以需要使用torch.utils.data.distributed.DistributedSampler帮助我们自动分配数据。

注意!只有训练集才要用Sampler!在train之后,经常使用验证集对数据集进行验证得到validation_loss,此时没有必要使用多卡,只需要在一个进程上进行验证。

在多卡模式下要进行只在一个进程上的操作,通过model.module(inputs)而不是model(inputs)来调用forward()前向传播,而其他进程通过torch.distributed.barrier()来等待主进程完成validate操作。

假如要多卡推理,参考这篇文章写一个新的sampler[原创][深度][PyTorch] DDP系列第三篇:实战与技巧 - 知乎 (zhihu.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
train_dataset = MyDataset()
# shuffle不在dataloader中设置,而是在sampler中设置
train_sampler = DistributedSampler(train_dataset, shuffle=True)
# batch_size指的是一张卡上的batch_size,总batch_size应该是要乘并行数
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=64,
sampler=train_sampler)

for epoch in range(EPOCHS):
# 每轮开始前需要调用这个方法进行正确的数据shuffle
# 否则每一轮用的都是相同的顺序
train_sampler.set_epoch(epoch)
...

把数据和模型放在GPU上

1
2
3
4
# 直接to(device)即可,注意要收到返回值
model = YourModel()
model = model.to(device)
your_data = your_data.to(device)

把模型用DDP包裹

1
2
3
4
# local_rank就是进程号
# 从此之后,调用model()就是用DDP模式的前向传播
# 要使用原始的前向传播,需要model.module()
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

训练、同步梯度

1
2
3
4
5
6
7
8
9
10
11
12
# 仍然可以直接调用模型的train()方法
# 但是假如要调用其他你自己写的方法,就得model.module.func()
model.train()
for data in dataloader:
loss = model(data)
optimizer.zero_grad()
loss.backward() # 这个操作自动同步梯度
optimizer.step()
# 但是仍然需要累加得到所有进程loss的值的和
dist.all_reduce(loss, op=dist.ReduceOp.SUM)
# 然后除以并行数,就是这个batch的loss值了
loss /= world_size

保存参数

1
2
# 保存的是参数,不需要DDP包裹
torch.save(model.module.state_dict())

运行

1
2
3
4
# 通过外部命令运行 
# 通过CUDA_VISIBLE_DEVICES控制可见的卡数
# 通过--nproc_per_node确定使用多少卡
CUDA_VISIBLE_DEVICES="0,1,2,3" python -m torch.distributed.run --nproc_per_node 4 train.py

DDP注意点复习!

  1. 要把模型和数据放在进程对应的那张卡上
  2. 要使用Sampler来分发训练数据,并且shuffle不设置在Dataloder中而是Sampler中,每个epoch还需要调用Sampler的set_epoch()方法。
  3. 训练和验证区分较大,验证一般在主进程中进行一次验证即可,不需要sampler,操作和单卡一样,之后将数据同步给其他进程。
  4. 在多卡时要调用模型的其他方法或者使用单卡的模式,需要用model.module来获得原始模型,同样保存参数时也保存的是model.module的参数而不是DDP包裹的。

DDP小技巧

数据同步

使用dist.all_reduce(loss, op=dist.ReduceOp.SUM)可以同步tensor的数据,由于算法限制,要算平均值只能用求和运算dist.ReduceOp.SUM之后再除以world_size

假如要同步的不是tensor,可以创建Tensor然后放进对应的GPU,再同步。

假如需要获得每个进程的某个tensor的值(即有n个GPU就获得n个值),那么使用dist.all_gather可以获得tensor列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 官网API文档
# All tensors below are of torch.int64 dtype.
# We have 2 process groups, 2 ranks.
tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
tensor_list
# [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
tensor
# tensor([1, 2]) # Rank 0
# tensor([3, 4]) # Rank 1
dist.all_gather(tensor_list, tensor)
tensor_list
# [tensor([1, 2]), tensor([3, 4])] # Rank 0
# [tensor([1, 2]), tensor([3, 4])] # Rank 1

同步数据时假如要控制所有进程同时,可以使用torch.distributed.barrier(),让快的进程等一下慢的进程,假如timeout了,可以看看代码是否能优化,或者在运行之前提供参数提高timeout的值。

假如dist.barrier()失效,可能是这种情况DistributedDataParallel barrier doesn’t work as expected during evaluation - distributed - PyTorch Forums

参考文献:

[原创][深度][PyTorch] DDP系列第一篇:入门教程 - 知乎 (zhihu.com)

ring allreduce和tree allreduce的具体区别是什么? - 知乎 (zhihu.com)

DistributedDataParallel barrier doesn’t work as expected during evaluation - distributed - PyTorch Forums