流水线并行

DeepSpeed v0.3 包含对流水线并行的全新支持!流水线并行通过将模型的层划分为可以并行处理的阶段,从而提高深度学习训练的内存和计算效率。DeepSpeed 的训练引擎提供混合数据和流水线并行,并且可以进一步与模型并行(例如 Megatron-LM)结合使用。下图显示了 3D 并行的示意图。我们最新的 结果 表明,这种 3D 并行使训练具有超过 **万亿** 个参数的模型成为可能。

3D parallelism in DeepSpeed

DeepSpeed 使用梯度累积来提取流水线并行(如下所示)。每个训练数据批次被划分为可以由流水线阶段并行处理的微批次。一旦一个阶段完成了一个微批次的前向传递,激活内存就会传递到流水线中的下一个阶段。类似地,当下一个阶段完成对一个微批次的向后传递时,关于激活的梯度会通过流水线向后传递。每个向后传递都会在本地累积梯度。接下来,所有数据并行组并行执行梯度的归约。最后,优化器更新模型权重。

下图说明了 DeepSpeed 如何使用混合双路数据并行和两阶段流水线并行来训练一个包含八个微批次的批次。GPU 0 和 2 以流水线方式排列,并将交替进行前向 (F) 和后向 (B) 传递。然后,它们将分别与其数据并行对应项 GPU 1 和 3 进行全归约 (AR) 梯度。最后,两个流水线阶段更新其模型权重。

Pipeline Schedule

流水线并行入门

DeepSpeed 致力于加速简化流水线并行训练的过程。本节通过准备 torchvisionAlexNet 模型,提供了混合数据和流水线并行训练的第一步。

表达流水线模型

流水线并行要求模型表示为一系列层。在前向传递中,每一层都使用前一层的输出。实际上,无需为流水线并行模型指定 forward()!流水线并行模型的前向传递隐式地采用以下形式

def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x

PyTorch 的 torch.nn.Sequential 是表达流水线并行模型的便捷容器,并且可以由 DeepSpeed 无需修改地进行并行化

net = nn.Sequential(
    nn.Linear(in_features, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)

PipelineModule 使用其 layers 参数作为构成模型的层序列。初始化后,net 被分成两个流水线阶段,并且其层被移动到相应的 GPU。如果存在两个以上的 GPU,DeepSpeed 还将使用混合数据并行。

注意:GPU 的总数必须能够被流水线阶段数整除。

注意:对于大型模型训练,请参阅 内存高效的模型构建

AlexNet

让我们看一下 torchvisionAlexNet 的简化实现

class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            ...
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            ...
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

AlexNet 主要由几个 Sequential 子模块组成。我们可以通过将其子模块展平为单个层序列来将其转换为 PipelineModule

class AlexNetPipe(AlexNet):
    def to_layers(self):
        layers = [
            *self.features,
            self.avgpool,
            lambda x: torch.flatten(x, 1),
            *self.classifier
        ]
        return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)

注意:上面 layers 中间的 lambda 不是 torch.nn.Module 类型。任何实现了 __call__() 的对象都可以是 PipelineModule 中的一层:这允许在流水线中进行方便的数据转换。

输入和输出

遵循 torch.nn.Sequential,每一层的输入和输出必须是单个 torch.Tensor 或张量 tuple。在实践中,某些模型可能需要修改其前向传递以将参数打包和解包到 forward() 中。考虑 Transformer 块堆栈的简化实现

class TransformerBlock(nn.Module)
    ...
    def forward(self, hidden, mask):
        output = self.compute(hidden, mask)
        return output
    ...

stack = [ TransformerBlock() for _ in range(num_layers) ]

需要对 TransformerBlock 进行两次修改

  1. 参数必须收集到一个 tuple 中。
  2. mask 也必须从 forward() 返回以传递到下一层。

这些修改可以通过一个简短的子类来完成

class TransformerBlockPipe(TransformerBlock)
    def forward(self, inputs):
        hidden, mask = inputs
        output = super().forward(hidden, mask)
        return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]

训练循环

流水线并行交替进行前向和后向传递,因此训练循环不能划分为 forward()backward()step() 的单独阶段。相反,DeepSpeed 的流水线引擎提供了一个 train_batch() 方法,该方法推进流水线引擎,直到消耗下一批训练数据并更新模型权重。

train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)

以上 train_batch() 示例等效于使用传统数据并行 DeepSpeed 的以下示例

train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
    batch = next(data_iter)
    loss = engine(batch)
    engine.backward(loss)
    engine.step()

处理数据

数据并行训练通常让每个工作器在每个批次的开始独立执行 IO。但是,在流水线并行环境中,只有第一个阶段使用输入数据,只有最后一个阶段使用标签进行损失计算。

注意:流水线引擎期望数据加载器返回一个包含两项的 tuple。返回的第一项是输入批次数据,第二项是用于损失计算的数据。与之前一样,输入和标签应该是 torch.Tensor 类型或张量 tuple

为方便起见,当数据集提供给 deepspeed.initialize() 时,DeepSpeed 流水线引擎可以构建一个分布式数据加载器。DeepSpeed 处理数据加载的其余复杂性,因此流水线训练循环变为

engine, _, _, _ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=[p for p in net.parameters() if p.requires_grad],
    training_data=cifar_trainset())

for step in range(args.steps):
    loss = engine.train_batch()

当然,DeepSpeed 可以与您希望使用的任何数据加载器一起使用。数据加载器应由流水线中的第一阶段和最后一阶段构建。每个工作器应加载大小为 engine.train_micro_batch_size_per_gpu() 的微批次,并且每个 train_batch() 将被查询总共 engine.gradient_accumulation_steps() 次。

注意!流水线引擎迭代器中提取数据,而不是迭代它。在训练批次中间数据流不为空至关重要。每次调用 train_batch() 将从数据迭代器中提取总共 engine.gradient_accumulation_steps() 个微批次的数据。

DeepSpeed 提供了一个便捷类 deepspeed.utils.RepeatingLoader,它只是包装了一个可迭代对象(例如数据加载器)并在每次到达末尾时重新启动它

train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
    loss = engine.train_batch(data_iter=train_iter)

高级主题

流水线模块负载均衡

流水线并行训练的性能很大程度上依赖于负载均衡。DeepSpeed 提供了几种跨 GPU 分区模型的机制。这些策略可以通过 PipelineModulepartition_method 关键字参数来设置。以下是 DeepSpeed 当前提供的分区方法

  • partition_method="parameters"(**默认**)平衡每个流水线阶段的可训练参数数量。这在内存受限的环境中以及当层的尺寸与计算时间成比例时特别有用。
  • partition_method="type:[regex]" 平衡其类名与 [regex] 匹配的层。正则表达式不区分大小写。例如,partition_method="type:transformer" 将平衡每个阶段的 Transformer 层数。
  • partition_method="uniform" 平衡每个阶段的层数。

内存高效的模型构建

构建 Sequential 容器并将其提供给 PipelineModule 是指定流水线并行模型的便捷方法。但是,这种方法在大型模型中会遇到可扩展性问题,因为每个工作器都会在 CPU 内存中复制整个模型。例如,一台具有 16 个 GPU 的机器必须拥有 16 倍于模型大小的本地 CPU 内存。

DeepSpeed 提供了一个 LayerSpec 类,该类将模块的构建延迟到模型层已跨工作器分区之后。然后,每个工作器将只分配分配给它的层。因此,与上一段的示例相比,使用 LayerSpec,一台具有 16 个 GPU 的机器只需要在其 CPU 内存上分配 1 倍模型大小,而不是 16 倍。

这是一个简化版 AlexNet 模型的示例,但仅使用 LayerSpec 来表示。请注意,语法几乎没有变化:nn.ReLU(inplace=True) 只是变成了 LayerSpec(nn.ReLU, inplace=True)

from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
    def __init__(self, num_classes=10, **kwargs):
        self.num_classes = num_classes
        specs = [
            LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
            LayerSpec(nn.ReLU, inplace=True),
            ...
            LayerSpec(nn.ReLU, inplace=True),
            LayerSpec(nn.Linear, 4096, self.num_classes),
        ]
        super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

绑定层

有些模型无法完全表示为流水线并行模型,因为某些层在流水线中被重复使用。例如,基于 Transformer 的语言模型通常在流水线的早期使用嵌入层将词汇映射到隐藏状态,然后在流水线的末尾使用嵌入层将隐藏状态映射回词汇。如果模型仅限于纯流水线并行,则此嵌入重复使用将禁止流水线并行。

DeepSpeed 提供了一个 TiedLayerSpec,它是 LayerSpec 的扩展。 TiedLayerSpec 需要一个额外的参数:key。每个层的重复使用都用 TiedLayerSpec 指定,key 字段用于识别层在何处被重复使用。

绑定层在拥有重复使用实例的每个流水线阶段上进行复制。然后,训练照常进行,但在所有反向传播完成后,会额外添加一个绑定的梯度的全减操作。全减操作确保绑定层的权重在流水线阶段之间保持同步。

更新: