update #19

Merged: 25 commits, merged Jul 6, 2021

Changes from all commits (25 commits):
c661783
fix decoding error when clip grad op and python==2 (#33937)
jiweibo Jul 5, 2021
1cfa105
Update paddle_build.bat to remove op grad when build windows inferenc…
OliverLPH Jul 5, 2021
70100e4
Enhance error message when x or y is empty in elementwise_op (#33928)
zhiqiu Jul 5, 2021
2ef6188
【HeterPS】fix hdfs and fleet_util for supporting save/load/infer (#33903)
danleifeng Jul 5, 2021
0b91133
[HybridParallel] Add amp support for pipeline_parallel (#33951)
ForFishes Jul 5, 2021
9254183
Refine the dygraph ptq and the module of calculating KL threshold (#3…
juncaipeng Jul 5, 2021
bd559a2
fix bug of sync_parameters (#33955)
ForFishes Jul 5, 2021
75d247b
optimize grad add device (#33946)
wangxicoding Jul 5, 2021
a84e48b
[NPU] add abs and uniform_random op and npu dockerfile, test=develop …
qili93 Jul 5, 2021
fa5ddfd
[NPU] change Add to AddN in sum npu op (#33957)
pangyoki Jul 5, 2021
eae3185
Add fused elemwise gelu and optimize performance (#33480)
wangxicoding Jul 5, 2021
3629bf4
replace spatial with per_activation mode for bn op to improve perf (#…
limin2021 Jul 5, 2021
43876e8
make stop_gradient=True for random op in static graph (#33959)
zhiqiu Jul 5, 2021
9914dff
[hybrid performance] optimize pipeline performance
wangxicoding Jul 5, 2021
aa9fdd0
add `reduce_sum` op into amp black list (#33960)
thisjiang Jul 5, 2021
740f4e3
[Dy2Stat]Fix unique_name in create_static_variable_gast_node (#33963)
Aurelius84 Jul 5, 2021
72af57b
[pass_enhance] : seq_concat_fc_fuse_pass (#33961)
Wangzheee Jul 5, 2021
70ecf3b
correct define (#33966)
b3602sss Jul 5, 2021
7a47660
Reduce build time by deleting the template param BlockDim (#33901)
ZzSean Jul 5, 2021
389f8c5
[OP] fix histogram op when input tensor is empty, test=develop (#33970)
qili93 Jul 6, 2021
69ffb38
Optimize the forward of log_softmax for the case when axis is not the…
AshburnLee Jul 6, 2021
bfef7fe
【HETERPS】pipeline adaptive for heterps (#33159)
danleifeng Jul 6, 2021
f2068ee
Enhance error message for interpolate_v2 (#33941)
tink2123 Jul 6, 2021
ae74c40
[pass_enhance] embedding_eltwise_layernorm_fuse_pass (#33973)
Wangzheee Jul 6, 2021
dd33d28
[pass_enhance] conv_elementwise_add_mkldnn_fuse_pass (#33931)
Wangzheee Jul 6, 2021
5 changes: 0 additions & 5 deletions paddle/fluid/framework/device_worker_factory.cc
@@ -69,11 +69,6 @@ REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
#endif

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_DEVICE_WORKER_CLASS(HeterBoxWorker);
#endif

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
4 changes: 2 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
@@ -8,11 +8,11 @@ IF(WITH_GPU)
SET(HETERPS_DEPS ${HETERPS_DEPS} ${RPC_DEPS})
endif()
nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS ${HETERPS_DEPS})
nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
ENDIF()
IF(WITH_ROCM)
hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
hip_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
hip_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
hip_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
ENDIF()
@@ -765,7 +765,7 @@ x.second );
unsigned long long get_num_collisions() const { return m_collisions; }

void print() {
for (size_type i = 0; i < 10; ++i) {
for (size_type i = 0; i < 5; ++i) {
std::cout << i << ": " << m_hashtbl_values[i].first << ","
<< m_hashtbl_values[i].second << std::endl;
}
4 changes: 2 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h
@@ -115,7 +115,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
path_.resize(total_gpu);

if (!topo_aware_) {
VLOG(1) << "init path without topo aware";
VLOG(3) << "init path without topo aware";
for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) {
@@ -130,7 +130,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
}
}
} else {
VLOG(1) << "init path with topo aware";
VLOG(3) << "init path with topo aware";
for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) {
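
A note on the VLOG changes in this file (and similar ones in ps_gpu_wrapper.cc below): moving a message from VLOG(1) to VLOG(3) only lowers its default visibility, so it is emitted solely when verbose logging is raised. A minimal sketch of the semantics, assuming Paddle's VLOG follows the usual glog behaviour where a VLOG(n) line prints only when the verbosity setting (for example the GLOG_v environment variable) is at least n:

#include <glog/logging.h>

int main(int argc, char* argv[]) {
  google::InitGoogleLogging(argv[0]);
  // Emitted when the process runs with verbosity >= 1 (e.g. GLOG_v=1).
  VLOG(1) << "init path without topo aware";
  // After this PR the same message sits at level 3, so it is emitted
  // only when verbosity >= 3 (e.g. GLOG_v=3).
  VLOG(3) << "init path without topo aware";
  return 0;
}
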
133 changes: 117 additions & 16 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -40,8 +40,7 @@ namespace framework {
std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
bool PSGPUWrapper::is_initialized_ = false;

void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
uint64_t table_id, int feature_dim) {
void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
platform::Timer timeline;
timeline.Start();
@@ -68,8 +67,6 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
thread_keys_.resize(thread_keys_thread_num_);
for (int i = 0; i < thread_keys_thread_num_; i++) {
thread_keys_[i].resize(thread_keys_shard_num_);
for (int j = 0; j < thread_keys_shard_num_; j++) {
}
}
const std::deque<Record>& vec_data = input_channel->GetData();
size_t total_len = vec_data.size();
@@ -139,17 +136,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
local_ptr[i].resize(local_keys[i].size());
}
timeline.Start();
auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
&fleet_ptr](int i) {
auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
size_t key_size = local_keys[i].size();
#ifdef PADDLE_WITH_PSLIB
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
#endif
#ifdef PADDLE_WITH_PSCORE
auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
reinterpret_cast<char**>(local_ptr[i].data()), table_id,
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
#endif
tt.wait();
@@ -255,7 +251,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
}
}
#endif
VLOG(1) << "GpuPs build hbmps done";
VLOG(3) << "GpuPs build hbmps done";

device_mutex[dev]->unlock();
}
@@ -272,11 +268,8 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
<< " seconds.";
}

void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
int device_num = heter_devices_.size();
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();
BuildTask(gpu_task, table_id, feature_dim);
platform::Timer timeline;
timeline.Start();

@@ -291,15 +284,21 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
delete HeterPs_;
HeterPs_ = nullptr;
}
if (size_max <= 0) {
VLOG(1) << "Skip build gpu ps cause feasign nums = " << size_max;
return;
}
std::vector<std::thread> threads(device_num);
HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
auto build_func = [this, &gpu_task, &feature_keys_count](int i) {
std::cout << "building table: " << i << std::endl;
VLOG(3) << "building table: " << i;
this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
gpu_task->device_values_[i].data(),
feature_keys_count[i], 500000, 2);
HeterPs_->show_one_table(i);
if (feature_keys_count[i] > 0) {
HeterPs_->show_one_table(i);
}
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(build_func, i);
@@ -310,7 +309,109 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
timeline.Pause();
VLOG(1) << "GpuPs build table total costs: " << timeline.ElapsedSec()
<< " s.";
gpu_task_pool_.Push(gpu_task);
}

void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
platform::Timer timer;
VLOG(3) << "Begin LoadIntoMemory(), dataset[" << dataset_ << "]";
timer.Start();
dataset_->LoadIntoMemory();
timer.Pause();
VLOG(0) << "LoadIntoMemory cost: " << timer.ElapsedSec() << "s";

// local shuffle
if (is_shuffle) {
dataset_->LocalShuffle();
}

std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();
data_ready_channel_->Put(gpu_task);
VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]";
}

void PSGPUWrapper::start_build_thread() {
running_ = true;
VLOG(3) << "start build CPU&GPU ps thread.";
build_cpu_threads_ = std::thread([this] { build_cpu_thread(); });
build_gpu_threads_ = std::thread([this] { build_gpu_thread(); });
}

void PSGPUWrapper::build_cpu_thread() {
while (running_) {
std::shared_ptr<HeterContext> gpu_task = nullptr;
if (!data_ready_channel_->Get(gpu_task)) {
continue;
}
VLOG(3) << "thread BuildTask start.";
platform::Timer timer;
timer.Start();
// build cpu ps data process
BuildTask(gpu_task);
timer.Pause();
VLOG(1) << "thread BuildTask end, cost time: " << timer.ElapsedSec() << "s";
buildcpu_ready_channel_->Put(gpu_task);
}
VLOG(3) << "build cpu thread end";
}

void PSGPUWrapper::build_gpu_thread() {
while (running_) {
std::shared_ptr<HeterContext> gpu_task = nullptr;
if (!gpu_free_channel_->Get(gpu_task)) {
continue;
}
if (!buildcpu_ready_channel_->Get(gpu_task)) {
continue;
}
VLOG(3) << "thread BuildGPUTask start.";
platform::Timer timer;
timer.Start();
BuildGPUTask(gpu_task);
timer.Pause();
VLOG(1) << "thread BuildGPUTask end, cost time: " << timer.ElapsedSec()
<< "s";

gpu_task_pool_.Push(gpu_task);
train_ready_channel_->Put(gpu_task);
}
VLOG(3) << "build gpu thread end";
}

void PSGPUWrapper::BeginPass() {
platform::Timer timer;
timer.Start();
if (current_task_) {
PADDLE_THROW(
platform::errors::Fatal("[BeginPass] current task is not ended."));
}
// load+build done
if (!train_ready_channel_->Get(current_task_)) {
PADDLE_THROW(platform::errors::Fatal("train_ready_channel_ failed."));
}
timer.Pause();
VLOG(1) << "BeginPass end, cost time: " << timer.ElapsedSec() << "s";
}

void PSGPUWrapper::EndPass() {
if (!current_task_) {
PADDLE_THROW(
platform::errors::Fatal("[EndPass] current task has been ended."));
}
platform::Timer timer;
timer.Start();
size_t keysize_max = 0;
// in case of feasign_num = 0, skip dump_to_cpu
for (size_t i = 0; i < heter_devices_.size(); i++) {
keysize_max = std::max(keysize_max, current_task_->device_keys_[i].size());
}
if (keysize_max != 0) {
HeterPs_->end_pass();
}
current_task_ = nullptr;
gpu_free_channel_->Put(current_task_);
timer.Pause();
VLOG(1) << "EndPass end, cost time: " << timer.ElapsedSec() << "s";
}

void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
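
The ps_gpu_wrapper.cc changes above replace the single blocking BuildGPUPS(table_id, feature_dim) call with a channel-driven pipeline: LoadIntoMemory() puts a fresh HeterContext on data_ready_channel_, build_cpu_thread() consumes it and runs BuildTask(), hands it on through buildcpu_ready_channel_ to build_gpu_thread(), which runs BuildGPUTask() and publishes the result on train_ready_channel_, where BeginPass()/EndPass() pick it up. The sketch below is a minimal, self-contained illustration of that producer/consumer flow; it uses a hand-rolled BlockingQueue and a Task struct in place of paddle::framework::ChannelObject and HeterContext, runs each build stage once instead of looping on running_, and omits gpu_free_channel_, so treat every name in it as illustrative rather than Paddle API.

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Tiny stand-in for paddle::framework::ChannelObject: Put/Get with blocking Get.
template <typename T>
class BlockingQueue {
 public:
  void Put(T v) {
    {
      std::lock_guard<std::mutex> lk(m_);
      q_.push(std::move(v));
    }
    cv_.notify_one();
  }
  T Get() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return !q_.empty(); });
    T v = std::move(q_.front());
    q_.pop();
    return v;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<T> q_;
};

struct Task { int pass_id; };  // stand-in for HeterContext

int main() {
  BlockingQueue<std::shared_ptr<Task>> data_ready, buildcpu_ready, train_ready;

  // build_cpu_thread: consume loaded data and build the CPU-side ps tables.
  std::thread build_cpu([&] {
    auto task = data_ready.Get();
    std::cout << "BuildTask (CPU ps) for pass " << task->pass_id << "\n";
    buildcpu_ready.Put(task);
  });

  // build_gpu_thread: consume the CPU-built task and build the GPU tables.
  std::thread build_gpu([&] {
    auto task = buildcpu_ready.Get();
    std::cout << "BuildGPUTask for pass " << task->pass_id << "\n";
    train_ready.Put(task);
  });

  // LoadIntoMemory: hand a fresh task to the pipeline.
  data_ready.Put(std::make_shared<Task>(Task{0}));

  // BeginPass: blocks until load and both build stages are done.
  auto current = train_ready.Get();
  std::cout << "BeginPass: training pass " << current->pass_id << "\n";
  // ... train ...
  // EndPass would release the pass here and free the slot for the next build.

  build_cpu.join();
  build_gpu.join();
  return 0;
}
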
80 changes: 67 additions & 13 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -82,9 +82,33 @@ class PSGPUWrapper {
const int hidden_size, const int64_t total_length,
const int batch_size);

void BuildGPUPS(const uint64_t table_id, int feature_dim);
void BuildTask(std::shared_ptr<HeterContext> gpu_task, uint64_t table_id,
int feature_dim);
void BuildGPUTask(std::shared_ptr<HeterContext> gpu_task);
void BuildTask(std::shared_ptr<HeterContext> gpu_task);
void LoadIntoMemory(bool is_shuffle);
void BeginPass();
void EndPass();
void start_build_thread();
void build_cpu_thread();
void build_gpu_thread();

void Finalize() {
VLOG(3) << "PSGPUWrapper Begin Finalize.";
if (s_instance_ == nullptr) {
return;
}
data_ready_channel_->Close();
buildcpu_ready_channel_->Close();
gpu_free_channel_->Close();
train_ready_channel_->Close();
running_ = false;
VLOG(3) << "begin stop build_cpu_threads_";
build_cpu_threads_.join();
VLOG(3) << "begin stop build_gpu_threads_";
build_gpu_threads_.join();
s_instance_ = nullptr;
VLOG(3) << "PSGPUWrapper Finalize Finished.";
}

void InitializeGPU(const std::vector<int>& dev_ids) {
if (s_instance_ != NULL && is_initialized_ == false) {
VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
@@ -129,6 +153,24 @@
#endif
}
heter_devices_ = dev_ids;
data_ready_channel_->Open();
data_ready_channel_->SetCapacity(3);
buildcpu_ready_channel_->Open();
buildcpu_ready_channel_->SetCapacity(3);
gpu_free_channel_->Open();
gpu_free_channel_->SetCapacity(1);
train_ready_channel_->Open();
train_ready_channel_->SetCapacity(1);

current_task_ = nullptr;
gpu_free_channel_->Put(current_task_);

table_id_ = 1;
#ifdef PADDLE_WITH_PSLIB
table_id_ = 0;
#endif
// start build cpu&gpu ps thread
start_build_thread();
}
}

@@ -206,18 +248,8 @@
slot_vector_ = slot_vector;
}

void EndPass() { HeterPs_->end_pass(); }
void ShowOneTable(int index) { HeterPs_->show_one_table(index); }

void Finalize() {
VLOG(3) << "PSGPUWrapper Begin Finalize.";
if (s_instance_ == nullptr) {
return;
}
s_instance_ = nullptr;
VLOG(3) << "PSGPUWrapper Finalize Finished.";
}

private:
static std::shared_ptr<PSGPUWrapper> s_instance_;
Dataset* dataset_;
@@ -231,6 +263,7 @@
std::vector<int> slot_vector_;
int multi_node_{0};
int node_size_;
uint64_t table_id_;
std::vector<ncclComm_t> inner_comms_;
std::vector<ncclComm_t> inter_comms_;
std::vector<ncclUniqueId> inter_ncclids_;
@@ -242,6 +275,27 @@
int thread_keys_shard_num_ = 37;
uint64_t max_fea_num_per_pass_ = 5000000000;

std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
data_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
buildcpu_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
gpu_free_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
train_ready_channel_ =
paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
std::shared_ptr<HeterContext> current_task_ = nullptr;
std::thread build_cpu_threads_;
std::thread build_gpu_threads_;
bool running_ = false;

protected:
static bool is_initialized_;
};
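
Taken together, the new header members and methods suggest the following driver pattern for a pass-based training loop. This is a hypothetical usage sketch, not code from the PR: the GetInstance() accessor and the device-id list are assumptions (neither appears in this diff), dataset setup is omitted, and in the real runtime the next pass's LoadIntoMemory() can overlap with training of the current pass rather than running strictly in sequence.

#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"

void RunPasses(int num_passes) {
  // Assumed singleton accessor; this diff only shows the s_instance_ member.
  auto wrapper = paddle::framework::PSGPUWrapper::GetInstance();
  // Opens the four channels, seeds gpu_free_channel_, picks table_id_,
  // and starts build_cpu_thread / build_gpu_thread.
  wrapper->InitializeGPU({0, 1, 2, 3});
  for (int pass = 0; pass < num_passes; ++pass) {
    wrapper->LoadIntoMemory(/*is_shuffle=*/true);  // feeds data_ready_channel_
    wrapper->BeginPass();  // blocks on train_ready_channel_ until the builds finish
    // ... run training for this pass ...
    wrapper->EndPass();    // calls HeterPs_->end_pass() and releases the gpu_free_channel_ slot
  }
  wrapper->Finalize();     // closes the channels and joins both build threads
}
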