
[Kunlun]Multi xpu dygraph performance optimization , add distributed.spawn support for multi xpu and some bug-fixes #31130

Merged: 15 commits merged into PaddlePaddle:develop on Mar 5, 2021

Conversation

@vslyu (Contributor) commented Feb 22, 2021

PR types

Performance optimization

PR changes

Others

Describe

List of major modifications and bug fixes for Baidu Kunlun XPU:

  1. Multi-XPU dygraph training performance optimization.
    Add new threads for multi-XPU communication in imperative/reducer.
  2. Add distributed.spawn support for multi XPU. PR31032 (merged into this PR).
    Register FLAGS_selected_xpus in pybind/global_value_getter_setter, and add an XPU interface in distributed/spawn and
    distributed/utils. Example at the end.
  3. Export the bkcl_comm_num interface to Python.
    Add the bkcl_comm_num interface in pybind/pybind, and remove the 'num_threads = 1' limit in fluid/compiler.
  4. Fix the bug in fleet/launch_utils where a device id (>10) was extended into multiple strings.
    Replace extend with append in fleet/launch_utils (see the sketch after this list).
  5. Fix some error messages and macro definitions in collective/c_comm_init_op and collective/gen_bkcl_id_op.
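
For illustration, a minimal Python sketch of the bug fixed in item 4, assuming the launcher collects card ids into a Python list (the variable names here are hypothetical, not the actual fleet/launch_utils code):

# extend() iterates over the string, so a two-digit card id is split into
# single characters; append() keeps the id intact.
selected_devices = []
device_id = "12"                      # an XPU card id with more than one digit

selected_devices.extend(device_id)    # buggy: ['1', '2']
print(selected_devices)

selected_devices = []
selected_devices.append(device_id)    # fixed: ['12']
print(selected_devices)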

Use environment variables:
export FLAGS_selected_xpus=0,1,2,3
to specify which cards to run on.

spawn example code:

from __future__ import print_function

import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist

class LinearNet(nn.Layer):
    def __init__(self):
        super(LinearNet, self).__init__()
        self._linear1 = nn.Linear(10, 10)
        self._linear2 = nn.Linear(10, 1)

    def forward(self, x):
        return self._linear2(self._linear1(x))

def train(print_result=False):
    # 1. initialize parallel environment
    dist.init_parallel_env()

    # 2. create data parallel layer & optimizer
    layer = LinearNet()
    dp_layer = paddle.DataParallel(layer)

    loss_fn = nn.MSELoss()
    adam = opt.Adam(
        learning_rate=0.001, parameters=dp_layer.parameters())

    # 3. run layer
    inputs = paddle.randn([10, 10], 'float32')
    outputs = dp_layer(inputs)
    labels = paddle.randn([10, 1], 'float32')
    loss = loss_fn(outputs, labels)

    if print_result is True:
        print("loss:", loss.numpy())

    loss.backward()

    adam.step()
    adam.clear_grad()

# Usage 1: only pass the function.
# If your training method does not need any arguments, and you want to
# use all visible devices for parallel training.
if __name__ == '__main__':
    dist.spawn(train)

# Usage 2: pass the function and its arguments.
# If your training method needs some arguments, and you want to
# use all visible devices for parallel training.
if __name__ == '__main__':
    dist.spawn(train, args=(True,))

# Usage 3: pass the function, its arguments, and nprocs.
# If your training method needs some arguments, and you want to
# use only part of the visible devices for parallel training.
# If your machine holds 8 cards {0,1,2,3,4,5,6,7},
# this case will use cards {0,1}.
if __name__ == '__main__':
    dist.spawn(train, args=(True,), nprocs=2)

# Usage 4: pass the function, its arguments, nprocs, and xpus.
# If your training method needs some arguments, and you want to
# use only part of the visible devices for parallel training,
# you can pass `xpus` to select the XPU cards you want to use.
# For example, this case will use cards {4,5} if your machine holds more than 6 cards.
if __name__ == '__main__':
    dist.spawn(train, args=(True,), nprocs=2, xpus='4,5')

The only difference between the XPU and GPU spawn interfaces is in usage 4 (pass `xpus` instead of `gpus`).
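
For comparison, a sketch of how usage 4 would look on GPUs with the existing `gpus` option, next to the `xpus` call added by this PR (only the keyword argument changes; the GPU line assumes a CUDA build):

# Usage 4 on GPU (existing interface) vs. XPU (added in this PR).
if __name__ == '__main__':
    dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')    # GPU cards {4,5}
    # dist.spawn(train, args=(True,), nprocs=2, xpus='4,5')  # XPU cards {4,5}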

Model benchmarks:

  • BERT
    Measurement: average over steps 10-110 of the first epoch
    Unit: step/s
    Card type: K200
    Per-card batch size = 16

    Cards / precision   1N1C          1N2C                    1N4C                    1N8C                    1N16C
    FP32                1.46 (1.0x)   1.42*2=2.88 (1.9452x)   0.86*4=3.44 (2.3561x)   0.77*8=6.16 (4.3380x)   0.52*16=8.32 (5.6986x)

  • ResNet
    Dygraph, multi-process multi-card
    Measurement: average over steps 10-110 of the first epoch
    Unit: ips
    Card type: K200
    Per-card batch size = 32

    Cards / precision   1N1C       1N2C                           1N4C                             1N8C                            1N16C
    FP32                52.56127   46.21590*2=92.4318 (1.7585x)   41.66224*4=166.64896 (3.1705x)   36.2901*8=290.32007 (5.5234x)   shm error

@paddle-bot-old commented:
Thanks for your contribution!
Please wait for the result of CI first. See Paddle CI Manual for details.

@@ -640,56 +641,80 @@ void Reducer::MarkGroupReady(size_t group_index) {
return;
}

{
std::lock_guard<std::mutex> lock(mutex_);
multi_device_op_count_ = 0;
Contributor:
Put this in the Reducer constructor.

Contributor Author:
done.

multi_device_op_count_ -= 1; // lock
cv_.notify_all();
}
// cnt -= 1; // lock
Contributor:
delete

Contributor Author:
done.

nprocs = core.get_cuda_device_count()
else:
Contributor:
elif device == 'xpu'

Contributor Author:
done.


// std::vector<std::unique_ptr<::ThreadPool>> pool_;
// ::ThreadPool comm_pool_;
::ThreadPool multi_device_op_pool_;
Contributor:
Just rename it to comm_pool as above, or come up with a better name. Also add some comments: this thread is used to schedule the allreduce communication.

Contributor Author:
just comm_pool_?

// std::vector<std::unique_ptr<::ThreadPool>> pool_;
// ::ThreadPool comm_pool_;
::ThreadPool multi_device_op_pool_;
uint32_t multi_device_op_count_;
Contributor:
Rename this one too, e.g. comm_op_count, or come up with something better.

Contributor Author:
just comm_op_count_?
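
For readers following the reducer changes, here is a minimal, language-agnostic sketch of the scheduling pattern these members implement: a single communication thread plus a counter and condition variable that the main thread can wait on. It is written in Python only for brevity; the names mirror, but are not, the actual C++ members.

import threading
from concurrent.futures import ThreadPoolExecutor

comm_pool = ThreadPoolExecutor(max_workers=1)   # analogous to comm_pool_(1)
comm_op_count = 0                               # analogous to comm_op_count_
mutex = threading.Lock()
cv = threading.Condition(mutex)

def fused_allreduce(group):
    # Placeholder for the real fused allreduce over the group's gradients.
    pass

def schedule_allreduce(group):
    global comm_op_count
    with mutex:
        comm_op_count += 1

    def task():
        global comm_op_count
        fused_allreduce(group)
        with mutex:
            comm_op_count -= 1
            cv.notify_all()

    comm_pool.submit(task)

def wait_all_comm_ops():
    # The main thread blocks until every scheduled allreduce has finished.
    with cv:
        cv.wait_for(lambda: comm_op_count == 0)

schedule_allreduce(group="group-0")
wait_all_comm_ops()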

for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
++next_group_) {
auto &group = groups_[next_group_];
int run_order = next_group_ % nrings_;

// For CUDA or XPU, compute_stream --> comm_stream.
// For CUDA or XPU, compute_stream --event--> comm_stream.
Contributor:
Was the event added as well?

Contributor:
No, the communication library is still blocking for now, so adding it would have no effect.

cv_.notify_all();
}
});
#else
Contributor:
The GPU WaitCompute got dropped.

Contributor Author:
done, added it back.

(card_id, ",".join(env_devices_list)))

if core.is_compiled_with_xpu():
args.selected_gpus = options.get('xpus', None)
Contributor:
Does XPU also use args.selected_gpus? It makes the code a bit confusing to read.

Contributor Author:
Done, uniformly defined args.selected_devices for multi-GPU and multi-XPU training and deleted args.selected_gpus.

raise ValueError(
"The number of selected gpus(%s) is not equal to "
"the number of spawn processes(%d), please ensure that the "
"correct `nprocs` and `gpus` arguments are passed." %
Contributor:
gpus -> xpus?

Contributor Author:
done.

(len(selected_gpu_list), nprocs))
for card_id in selected_gpu_list:
if card_id not in env_devices_list:
raise ValueError("The selected gpu card %s cannot found in "
Contributor:
gpu -> xpu?

Contributor Author:
done.
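
Both error messages quoted above come from the same validation step; a minimal sketch of that check for the XPU path, with illustrative names rather than the actual Paddle code:

# The number of selected XPU cards must match nprocs, and every card must
# be visible through FLAGS_selected_xpus.
def validate_selected_xpus(xpus, nprocs, env_devices):
    selected_xpu_list = xpus.split(',')
    env_devices_list = env_devices.split(',')
    if len(selected_xpu_list) != nprocs:
        raise ValueError(
            "The number of selected xpus(%s) is not equal to "
            "the number of spawn processes(%d), please ensure that the "
            "correct `nprocs` and `xpus` arguments are passed." %
            (len(selected_xpu_list), nprocs))
    for card_id in selected_xpu_list:
        if card_id not in env_devices_list:
            raise ValueError(
                "The selected xpu card %s cannot be found in "
                "FLAGS_selected_xpus (%s)." % (card_id, env_devices))

validate_selected_xpus('4,5', 2, '0,1,2,3,4,5,6,7')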

@vslyu vslyu changed the title [Kunlun]multi xpu dygraph performance optimization [Kunlun]Multi xpu dygraph performance optimization , add distributed.spawn support for multi xpu and some bug-fixes Mar 2, 2021
}

comm_pool_.enqueue([&] {
parallel_ctx_->WaitCompute(run_order);
Contributor:
What about SetXPUDevice?

Contributor Author:
done, added it.

group_size_limits_(group_size_limits),
find_unused_vars_(find_unused_vars),
comm_pool_(1),
comm_op_count_(0) {
Contributor:
only wrap the added code

Contributor Author:
done.

@@ -645,51 +652,84 @@ void Reducer::MarkGroupReady(size_t group_index) {
auto &group = groups_[next_group_];
int run_order = next_group_ % nrings_;

auto place = parallel_ctx_->GetDeviceContext(run_order)->GetPlace();

Member:
Why do we need to get place here? Isn't place already available? Is mixed GPU/XPU communication involved here?

Contributor:
reducer.h already has _place, so there is no need to fetch it here.

}
}

void Reducer::FusedAllReduceSchedule(int run_order, Group group) {
Member:
Change it to a const reference, otherwise it affects performance.


@@ -645,51 +652,84 @@ void Reducer::MarkGroupReady(size_t group_index) {
auto &group = groups_[next_group_];
int run_order = next_group_ % nrings_;

auto place = parallel_ctx_->GetDeviceContext(run_order)->GetPlace();

// For CUDA or XPU, compute_stream --> comm_stream.
// For CPU, do nothing.
// NOTE. Because concat uses the comm_stream,
// so we expose WaitCompute() interface and call
// it here.
parallel_ctx_->WaitCompute(run_order);
Contributor:
As a follow-up, try whether performance is better with WaitCompute here or inside the allreduce.

} else {
VLOG(3) << "The sparse group[" << next_group_
<< "] has no var to allreduce";
if (paddle::platform::is_xpu_place(place)) {
Contributor:
There is no need to check place here; just put the thread scheduling under the BKCL macro definition and schedule the rest normally.

comm_pool_->enqueue([&] {
auto dev_id = BOOST_GET_CONST(platform::XPUPlace, place).device;
platform::SetXPUDeviceId(dev_id);
FusedAllReduceSchedule(run_order, group);
Copy link
Contributor

@wangxicoding wangxicoding Mar 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

线程调度,记个TOTO,后续加上try cache。否则出异常了主线程不知道还一直在跑
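
The suggestion above is about the C++ lambda queued on comm_pool_; as a language-agnostic illustration of why it matters, a small Python sketch where the worker's exception is surfaced to the waiting thread instead of being silently lost:

from concurrent.futures import ThreadPoolExecutor

def comm_task():
    # Imagine the fused allreduce failing inside the communication thread.
    raise RuntimeError("bkcl allreduce failed")

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(comm_task)

try:
    # Without waiting on the future (or a try/catch around the task body),
    # the failure would go unnoticed and the main thread would keep running.
    future.result()
except RuntimeError as err:
    print("communication thread failed:", err)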

Xreki (Contributor) previously approved these changes on Mar 3, 2021:
LGTM for the modification of compiler.py

wangxicoding (Contributor) previously approved these changes on Mar 3, 2021:
LGTM

@ForFishes (Member) left a comment:
LGTM

@wangxicoding wangxicoding merged commit 9ebf05b into PaddlePaddle:develop Mar 5, 2021