[arXiv'18] Horovod: fast and easy distributed deep learning in TensorFlow 论文阅读

Horovod 是 Uber 于 2017 年发布的一个易于使用的高性能的分布式深度训练框架,支持 TensorFlow,Keras,PyTorch 和 MXNet。

(博主注: 2017 年 Google 刚刚提出 Transformer,2018 年 BERT 发布,2019 年 GPT-2 发布,2020 年 GPT-3 发布。回头看这篇 2017 年的论文仅供学习。)

#1. 引言

近年来,深度学习蓬勃发展。User 公司选择了 TensorFlow 作为其深度学习框架。原因是:

  1. 广泛使用,用户技术大,新用户容易上手
  2. 具有高性能,支持对底层细节的控制: 例如有 Keras 这样的高级 API,也可以通过 CUDA 实现自定义的算子
  3. 支持广泛的使用场景: 从云服务平台到端侧设备

2017 年 Uber 团队提出了 Michelangelo 机器学习平台,本文介绍了其中一个开源组件,Horovod,开源地址在 https://github.com/uber/horovod

#2. 开始分布式

当 Uber 内部的深度学习模型变得越来越多时,模型的大小和数据消耗显著增加。在大多数情况下,单台机器(单卡或者多卡)可以满足需求。但是训练耗时很长,有时需要一周甚至更长的时间。为了解决这个问题,Uber 开始探索分布式训练。

Uber 团队首先使用了标准的 TensorFlow 分布式训练 API。发现两个明显的问题:

  1. 用户不知道如何使用: 做深度学习的人往往不懂底层概念, 不清楚该如何修改自己的代码,Debug 也很麻烦
  2. 可伸缩性: Uber 的服务规模过大,TensorFlow 在这种情况下的性能不佳。例如在 128 卡跑 TF 的 benchmark 时,利用率不到 50%。

#3. 使用不同的算法

Uber 团队尝试了 Facebook Accurate, large minibatch SGD: Training ImageNet in 1 hour 论文中的数据并行(Data Parallel)方法。

从概念上说,DP 方法很简单:每个训练脚本副本读取一个数据库,进行 forward 之后计算梯度,多个副本之间平均梯度,然后更新参数。

标准的 TensorFlow 分布式训练 API 使用了 Parameter Server 方法来平均梯度。虽然能够提高性能,但是遇到两个挑战:

  1. 难以确定计算节点和服务节点的比例
  2. 增加应用程序的复杂性

2017 年初百度开源了一个基于 Ring-AllReduce 方法的 TensorFlow Fork。

在 Ring-AllReduce 方法中,每个节点与其他两个相邻的节点交换梯度,经过多次迭代后,所有节点都拥有相同的梯度。该算法能更好地利用网络带宽。

此外,这种方法也更容易理解,用户使用 MPI API 进行编程,使用mpirun启动训练,修改也更简单。

#4. Horovod

Uber 团队在百度的 Fork 的基础上构建了 Horovod。

  1. 相关代码被放进了一个单独的 Python 包中,方便安装和使用
  2. 使用 NCCL 替换了百度的 Ring-AllReduce 实现
  3. 增加了对单机器多卡的支持,之前只支持单卡
  4. 根据用户反馈进行改进,增加了全局同步的广播操作。用户只需要增加 4 个 API 即可。

#5. 如何使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank(one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...

opt = tf.train.AdagradOptimizer(0.01)

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes
# during initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing
# when done or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir="/tmp/train_logs", config=config,hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Perform synchronous training.
mon_sess.run(train_op)

用户可以使用

1
2
$ mpirun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
...

来启动训练。-np 16表示使用 16 个进程,-H表示主机列表。

#6. Horovod Timeline

Horovod Timeline 是一个可视化工具,可以帮助用户分析训练过程中的性能瓶颈。

#7. Tensor 聚合

在调用 Ring-AllReduce 之前,Horovod 可以对 Tensor 进行聚合以可以减少通信开销。

#8. 评估

使用 Horovod 运行 Tensorflow 的 Benchmark,训练速度能够达到 Tensorflow 的 2 倍。

由于 MPI 和 NCCL 都支持 RDMA,作者也测试了使用 RDMA 的性能。在某些情况下能够提升。

#FAQ

#1. NCCL 和 MPI 是什么?

NCCL(NVIDIA Collective Communications Library)是 NVIDIA 针对 GPU 设计的一种规约库,可以实现多 GPU 间的直接数据同步,避免内存和显存的,CPU 和 GPU 间的数据拷贝成本。

MPI(Massage Passing Interface)是消息传递函数库的标准规范,主要是被应用在科学计算,尤其是超算领域。由于容错性一般,故在机器学习场景下使用较少。