Commit: merge develop

YuanRisheng committed Jan 21, 2022
2 parents 5986410 + ab1abd4, commit 9ef36c0
Showing 619 changed files with 13,361 additions and 6,800 deletions.
8 changes: 1 addition & 7 deletions cmake/inference_lib.cmake
@@ -228,13 +228,7 @@ copy(inference_lib_dist
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/api/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/pten/common/*.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/bfloat16.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/complex.h
${PADDLE_SOURCE_DIR}/paddle/fluid/platform/float16.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/pten/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/any.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
5 changes: 0 additions & 5 deletions paddle/fluid/CMakeLists.txt
@@ -1,8 +1,3 @@
# Adapt to custom op mechanism: Include the header files related to the data type
# to avoid exposing the path of the underlying file, remove it after moving
# float16.h/complex.h/bfloat16.h into pten
include_directories(${PADDLE_SOURCE_DIR}/paddle/fluid/platform)

add_subdirectory(memory)
add_subdirectory(platform)
add_subdirectory(distributed)
1 change: 0 additions & 1 deletion paddle/fluid/distributed/fleet.h
@@ -36,7 +36,6 @@ limitations under the License. */

namespace paddle {
namespace framework {
class Tensor;
class Scope;
class SelectedRows;
class Variable;
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -12,8 +12,8 @@ endif()

cc_library(task_loop_thread_pool SRCS task_loop_thread_pool.cc task_loop_thread.cc task_loop.cc DEPS enforce glog)

cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc
interceptor.cc compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc
cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc interceptor.cc
compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc dist_model_tensor_wrapper.cc
DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto task_loop_thread_pool collective_helper
op_registry executor_gc_helper gflags glog ${BRPC_DEPS})

222 changes: 216 additions & 6 deletions paddle/fluid/distributed/fleet_executor/dist_model.cc
@@ -15,7 +15,11 @@
#include <glog/logging.h>

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/naive_executor.h"
#include "paddle/fluid/framework/op_proto_maker.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
@@ -37,24 +41,179 @@ bool IsPersistable(const framework::VarDesc *var) {

bool DistModel::Init() {
/* TODO(fleet exe dev): implement this funct */
place_ = paddle::platform::CUDAPlace(config_.device_id);
if (!PrepareScope()) {
bool init_method = (!config_.model_dir.empty() || config_.program_desc);
PADDLE_ENFORCE_EQ(init_method, true,
platform::errors::InvalidArgument(
"One of model dir or program desc must be provided to "
"dist model inference."));
if (config_.program_desc) {
PADDLE_ENFORCE_NOT_NULL(
config_.scope, platform::errors::InvalidArgument(
"Scope must be provided to dist model inference if "
"program desc has been provided."));
}
if (!PreparePlace()) {
return false;
}
if (!PrepareProgram()) {
if (!config_.program_desc) {
if (config_.scope) {
LOG(WARNING) << "The provided scope will be ignored if model dir has "
"also been provided.";
}
if (!PrepareScope()) {
return false;
}
if (!PrepareProgram()) {
return false;
}
} else {
program_.reset(config_.program_desc);
scope_.reset(config_.scope);
}
if (!PrepareFeedAndFetch()) {
return false;
}
if (!CommInit()) {
return false;
}
if (!PrepareFleetExe()) {
return false;
}
return true;
}
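
A hedged sketch of the two configuration paths Init() accepts, based only on the fields shown in DistModelConfig further down; the paths and the existing_program/existing_scope names are hypothetical placeholders, not part of this commit:

// Sketch only: the two ways to initialize a DistModel.
#include "paddle/fluid/distributed/fleet_executor/dist_model.h"

// Path 1: point DistModel at a saved model directory; it builds its own
// Scope and ProgramDesc via PrepareScope()/PrepareProgram().
paddle::distributed::DistModelConfig dir_cfg;
dir_cfg.model_dir = "/path/to/saved_model";  // hypothetical path
dir_cfg.place = "GPU";
dir_cfg.device_id = 0;

// Path 2: reuse an existing ProgramDesc and Scope. Both must be non-null
// (Init() enforces this), and a scope passed together with model_dir is ignored.
paddle::distributed::DistModelConfig desc_cfg;
desc_cfg.program_desc = existing_program;  // framework::ProgramDesc* (hypothetical)
desc_cfg.scope = existing_scope;           // framework::Scope* (hypothetical)
desc_cfg.place = "CPU";
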

bool DistModel::PreparePlace() {
if (config_.place == "GPU") {
place_ = paddle::platform::CUDAPlace(config_.device_id);
} else if (config_.place == "CPU") {
place_ = paddle::platform::CPUPlace();
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Place must be choosen from GPU or CPU, but got %s.", config_.place));
}
return true;
}

bool DistModel::CommInit() {
// TODO(fleet executor): init the comm
// NOTE (Yuang Liu): The peer endpoints will be obtained with the assumption
// that mp part is always on inner side and pp part is always on outer side.
// TODO(fleet exe dev): The peer endpoints could be configured by users.
PADDLE_ENFORCE_EQ(
config_.pp_degree * config_.mp_degree, config_.nranks,
platform::errors::InvalidArgument(
"The mp_degree multiplies pp_degree is not equal with nranks"));
std::unique_ptr<framework::ProgramDesc> comm_init_program(
new framework::ProgramDesc());
framework::BlockDesc *comm_init_block = comm_init_program->MutableBlock(0);
if (config_.mp_degree > 1) {
PADDLE_ENFORCE_GE(
config_.mp_ring_id, 0,
platform::errors::InvalidArgument(
"mp ring id must be provided for inference under mp."));
VLOG(3) << "Init comm group for mp.";
std::vector<std::string> peer_endpoints;
for (int64_t
idx = (config_.local_rank / config_.mp_degree) * config_.mp_degree,
i = 0;
i < config_.mp_degree; ++idx, ++i) {
if (config_.trainer_endpoints[idx] == config_.current_endpoint) {
continue;
}
peer_endpoints.emplace_back(config_.trainer_endpoints[idx]);
}
// get nranks in a mp group and inner group rank for local rank
int64_t mp_group_nranks = config_.nranks / config_.pp_degree;
int64_t mp_group_rank = config_.local_rank % config_.mp_degree;
InsertCommOp("mp_comm_id", mp_group_nranks, mp_group_rank, peer_endpoints,
comm_init_block, config_.mp_ring_id);
}
if (config_.pp_degree) {
// NOTE: the last pp stage doesn't need init pp comm
VLOG(3) << "Init comm group for pp.";
if (config_.local_rank - config_.mp_degree >= 0) {
PADDLE_ENFORCE_EQ(config_.pp_upstream_ring_id >= 0, true,
platform::errors::InvalidArgument(
"pp upstream ring id must be provided for "
"non-first pp stage if inference under pp."));
// not the first pp stage, has upstream
std::vector<std::string> upstream_peer_endpoints;
upstream_peer_endpoints.emplace_back(
config_.trainer_endpoints[config_.local_rank - config_.mp_degree]);
InsertCommOp("pp_upstream_comm_id", 2, 1, upstream_peer_endpoints,
comm_init_block, config_.pp_upstream_ring_id);
}

if (config_.local_rank + config_.mp_degree < config_.nranks) {
PADDLE_ENFORCE_EQ(config_.pp_downstream_ring_id >= 0, true,
platform::errors::InvalidArgument(
"pp downstream ring id must be provided for "
"non-last pp stage if inference under pp."));
// not the last pp stage, has downstream
std::vector<std::string> downstream_peer_endpoints;
downstream_peer_endpoints.emplace_back(
config_.trainer_endpoints[config_.local_rank + config_.mp_degree]);
InsertCommOp("pp_downstream_comm_id", 2, 0, downstream_peer_endpoints,
comm_init_block, config_.pp_downstream_ring_id);
}
}
framework::NaiveExecutor e(place_);
e.CreateVariables(*comm_init_program, 0, true, scope_.get());
e.Prepare(scope_.get(), *comm_init_program, 0, false);
e.Run();
VLOG(3) << "Comm init successful.";
return true;
}
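
To make the NOTE about the peer-endpoint layout concrete, the following standalone sketch (not part of the commit; degrees and endpoints are made up) reproduces the index arithmetic used above for mp_degree = 2 and pp_degree = 2, with the mp group on the inner dimension and pp stages on the outer dimension:

// Standalone illustration of the rank layout assumed by CommInit().
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main() {
  const int64_t mp_degree = 2, pp_degree = 2;
  const int64_t nranks = mp_degree * pp_degree;  // 4
  const std::vector<std::string> endpoints = {
      "127.0.0.1:6000", "127.0.0.1:6001",   // pp stage 0: mp slots 0 and 1
      "127.0.0.1:6002", "127.0.0.1:6003"};  // pp stage 1: mp slots 0 and 1
  for (int64_t local_rank = 0; local_rank < nranks; ++local_rank) {
    std::cout << "rank " << local_rank << " mp peers:";
    // mp peers: the other ranks inside the same inner (mp) group.
    int64_t group_start = (local_rank / mp_degree) * mp_degree;
    for (int64_t idx = group_start; idx < group_start + mp_degree; ++idx) {
      if (idx != local_rank) std::cout << " " << endpoints[idx];
    }
    // pp neighbours sit exactly one mp group away in either direction.
    if (local_rank - mp_degree >= 0)
      std::cout << " | pp upstream: " << endpoints[local_rank - mp_degree];
    if (local_rank + mp_degree < nranks)
      std::cout << " | pp downstream: " << endpoints[local_rank + mp_degree];
    std::cout << "\n";
  }
  return 0;
}
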

void DistModel::InsertCommOp(std::string tmp_var_name, int nranks, int rank,
const std::vector<std::string> &peer_endpoints,
framework::BlockDesc *block, int ring_id) {
/*
* tmp_var_name: the var name for var comm_id
* nranks: number of total ranks
* rank: the rank of local rank in the comm group
* peer_endpoints: peer's endpoints
* block: the block where to insert the comm ops
* ring_id: the ring_id to be inited
*/
std::string &endpoint = config_.current_endpoint;
std::stringstream ss;
ss << "Init comm with tmp var: " << tmp_var_name
<< ". The ring id is: " << ring_id << ". The group has: " << nranks
<< " ranks. Current rank in the group is: " << rank
<< ". The endpoint is: " << endpoint << ". Peer endpoints are: ";
for (auto ep : peer_endpoints) {
ss << ep << ", ";
}
VLOG(3) << ss.str();
if (config_.place == "GPU") {
framework::VarDesc *new_var = block->Var(tmp_var_name);
new_var->SetType(framework::proto::VarType::RAW);
new_var->SetPersistable(true);
framework::OpDesc *gen_nccl_id_op = block->AppendOp();
gen_nccl_id_op->SetType("c_gen_nccl_id");
gen_nccl_id_op->SetOutput("Out", {tmp_var_name});
gen_nccl_id_op->SetAttr("rank", rank);
gen_nccl_id_op->SetAttr("endpoint", config_.current_endpoint);
gen_nccl_id_op->SetAttr("other_endpoints", peer_endpoints);
gen_nccl_id_op->SetAttr("ring_id", ring_id);
gen_nccl_id_op->SetAttr("op_role",
static_cast<int>(framework::OpRole::kForward));
gen_nccl_id_op->CheckAttrs();
framework::OpDesc *comm_init_op = block->AppendOp();
comm_init_op->SetType("c_comm_init");
comm_init_op->SetInput("X", {tmp_var_name});
comm_init_op->SetAttr("rank", rank);
comm_init_op->SetAttr("nranks", nranks);
comm_init_op->SetAttr("ring_id", ring_id);
comm_init_op->SetAttr("op_role",
static_cast<int>(framework::OpRole::kForward));
comm_init_op->CheckAttrs();
} else {
LOG(WARNING) << "DistModelInf doesn't init comm.";
// TODO(fleet exe dev): comm init for more devices
}
}

bool DistModel::PrepareScope() {
scope_.reset(new framework::Scope());
return true;
@@ -119,6 +278,8 @@ bool DistModel::LoadParameters() {
new_var->SetLoDLevel(var->GetLoDLevel());
new_var->SetPersistable(true);
params.push_back(new_var->Name());
// NOTE: if the params are stored in different files, 'load' op should be
// added here
}
}

@@ -145,8 +306,57 @@ bool DistModel::LoadParameters() {
return true;
}

void DistModel::Run(const std::vector<framework::Tensor> &input_data,
std::vector<framework::Tensor> *output_data) {
bool DistModel::PrepareFleetExe() {
task_node_.reset(new TaskNode(program_.get(), config_.local_rank));
if (config_.local_rank - config_.mp_degree >= 0) {
task_node_->AddUpstreamTask(config_.local_rank - config_.mp_degree);
}
if (config_.local_rank + config_.mp_degree < config_.nranks) {
task_node_->AddDownstreamTask(config_.local_rank + config_.mp_degree);
}
task_node_->SetType("Compute");
task_node_->Init();
executor_desc_ = FleetExecutorDesc();
executor_desc_.set_cur_rank(config_.local_rank);
std::unordered_map<int64_t, int64_t> id_to_rank;
for (int i = 0; i < config_.nranks; ++i) {
RankInfo *rank_info = executor_desc_.add_cluster_info();
rank_info->set_rank(i);
rank_info->set_ip_port(config_.trainer_endpoints[i]);
id_to_rank.insert({i, i});
}
fleet_exe.reset(new FleetExecutor(executor_desc_));
fleet_exe->Init("inference", *(program_.get()), scope_.get(), place_, 1,
{task_node_.get()}, id_to_rank);
return true;
}

bool DistModel::PrepareFeedAndFetch() {
for (auto *op : program_->Block(0).AllOps()) {
if (op->Type() == "feed") {
VLOG(3) << "feed op with feed var: " << op->Output("Out")[0];
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
if (feeds_.size() <= static_cast<size_t>(idx)) {
feeds_.resize(idx + 1);
}
feeds_[idx] = op;
feed_names_[op->Output("Out")[0]] = idx;
idx_to_feeds_[idx] = op->Output("Out")[0];
} else if (op->Type() == "fetch") {
VLOG(3) << "fetch op with fetch var: " << op->Input("X")[0];
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
if (fetches_.size() <= static_cast<size_t>(idx)) {
fetches_.resize(idx + 1);
}
fetches_[idx] = op;
id_to_fetches_[idx] = op->Input("X")[0];
}
}
return true;
}

void DistModel::Run(const std::vector<DistModelTensor> &input_data,
std::vector<DistModelTensor> *output_data) {
/* TODO(fleet exe dev): implement this function */
}

35 changes: 30 additions & 5 deletions paddle/fluid/distributed/fleet_executor/dist_model.h
@@ -17,36 +17,48 @@
#include <string>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/platform/place.h"

namespace paddle {

namespace framework {
class ProgramDesc;
class Scope;
class Tensor;
class BlockDesc;
}

namespace distributed {

class TaskNode;
class FleetExecutor;

struct DistModelConfig {
std::string model_dir{};
framework::ProgramDesc* program_desc{nullptr};
framework::Scope* scope{nullptr};
std::string place{};
int64_t device_id{0};
std::vector<std::string> trainer_endpoints{};
std::string current_endpoint{};
int64_t nranks{1};
int64_t local_rank{0};
int64_t device_id{0};
int64_t mp_degree{1};
int64_t pp_degree{1};
int64_t mp_ring_id{-1};
int64_t pp_upstream_ring_id{-1};
int64_t pp_downstream_ring_id{-1};
};

class DistModel {
public:
explicit DistModel(const DistModelConfig& config) : config_(config) {}
bool Init();
void Run(const std::vector<framework::Tensor>& input_data,
std::vector<framework::Tensor>* output_data);
void Run(const std::vector<DistModelTensor>& input_data,
std::vector<DistModelTensor>* output_data);
~DistModel() = default;

private:
@@ -56,12 +68,25 @@ class DistModel {
bool PrepareProgram();
bool LoadProgram();
bool LoadParameters();
bool PreparePlace();
bool CommInit();
bool PrepareFeedAndFetch();
bool PrepareFleetExe();
void InsertCommOp(std::string tmp_var_name, int nranks, int rank,
const std::vector<std::string>& peer_endpoints,
framework::BlockDesc* block, int ring_id);

std::vector<framework::OpDesc*> feeds_;
std::map<std::string, int64_t> feed_names_;
std::map<int64_t, std::string> idx_to_feeds_;
std::vector<framework::OpDesc*> fetches_;
std::map<int64_t, std::string> id_to_fetches_;
DistModelConfig config_;
FleetExecutorDesc executor_desc_;
platform::Place place_;
std::shared_ptr<FleetExecutor> fleet_exe;
std::shared_ptr<TaskNode> task_node_;
std::shared_ptr<framework::Scope> scope_;
paddle::platform::Place place_;
std::shared_ptr<framework::ProgramDesc> program_;
};

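
For reference, a hedged end-to-end usage sketch of the interface declared above; the endpoints, paths, and parallel degrees are illustrative, DistModelTensor construction is omitted because dist_model_tensor_wrapper.h is not shown in this excerpt, and Run() is still a stub in this commit:

// Sketch only: configure and initialize a DistModel for a 2-rank mp group.
#include <vector>
#include "paddle/fluid/distributed/fleet_executor/dist_model.h"

int main() {
  using paddle::distributed::DistModel;
  using paddle::distributed::DistModelConfig;
  using paddle::distributed::DistModelTensor;

  DistModelConfig config;
  config.model_dir = "/path/to/saved_model";  // hypothetical path
  config.place = "GPU";
  config.device_id = 0;
  config.trainer_endpoints = {"127.0.0.1:6000", "127.0.0.1:6001"};
  config.current_endpoint = "127.0.0.1:6000";
  config.nranks = 2;
  config.local_rank = 0;
  config.mp_degree = 2;   // one pp stage, mp group of two ranks
  config.pp_degree = 1;   // so mp_degree * pp_degree == nranks
  config.mp_ring_id = 0;  // required whenever mp_degree > 1

  DistModel model(config);
  if (!model.Init()) return 1;  // prepares place, scope, program, comm, fleet executor

  std::vector<DistModelTensor> inputs, outputs;  // filled via the tensor wrapper API
  model.Run(inputs, &outputs);  // no-op at this commit (TODO stub)
  return 0;
}
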