Skip to content

Commit

Permalink
ps支持变server数加载
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyang.gxy committed Jul 3, 2019
1 parent ab3b8b3 commit 1dafefc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
4 changes: 2 additions & 2 deletions xdl/ps-plus/ps-plus/plugins/hdfs/libhdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ Status LibHDFS::LoadAndBind() {
const char* hdfs_root = getenv("HADOOP_HDFS_HOME");
if (hdfs_root == nullptr) {
return Status::NotFound("HADOOP_HDFS_HOME is not set");
}
}
const std::string& libhdfs = std::string(hdfs_root) + "/lib/native/libhdfs.so";
void* dl = dlopen(libhdfs.c_str(), RTLD_LAZY);
if (dl == nullptr) {
return Status::NotFound("cannot find $HADOOP_HDFS_HOME/lib/native/libhdfs.so");
return Status::NotFound(dlerror());
}
return LoadSymbols(dl);
}
Expand Down
47 changes: 42 additions & 5 deletions xdl/ps-plus/ps-plus/scheduler/scheduler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,25 @@ Status SchedulerImpl::UpdateVariableInfo(Version version,
unique_lock<mutex> lock(m_);
if (!ready_) { return Status::NotReady("Cluster is not ready"); }
if (version != version_) { return VersionMismatch(version_, version); }
return InternalUpdateVariableInfo(info, result);
}

// Merges the caller-supplied variable descriptions with the ones already
// tracked in variable_info_, runs placement over the union, and stores the
// placed result as the new tracked set.
//
// info:   variable descriptions supplied by the caller.
// result: receives the output of placementer_->Placement(); on success its
//         contents are also copied into variable_info_.
// Returns the placement status when placement fails, Status::Ok() otherwise.
//
// This function takes no lock itself; UpdateVariableInfo calls it while
// holding m_.
Status SchedulerImpl::InternalUpdateVariableInfo(const vector<VariableInfo>& info, vector<VariableInfo>* result) {
  map<string, VariableInfo> merged;
  for (const auto& incoming : info) { merged[incoming.name] = incoming; }

  // For hash variables the incoming first dimension replaces the tracked one.
  // find() is used rather than operator[] so a tracked variable that is absent
  // from `info` is skipped instead of default-inserting an empty VariableInfo
  // and reading its `type`.
  for (auto& tracked : variable_info_) {
    const auto it = merged.find(tracked.name);
    if (it == merged.end()) { continue; }
    if (it->second.type == VariableInfo::Type::kHash) {
      tracked.shape[0] = it->second.shape[0];
    }
  }

  // Tracked entries win over incoming entries that share a name.
  for (const auto& tracked : variable_info_) { merged[tracked.name] = tracked; }

  vector<VariableInfo> candidates;
  candidates.reserve(merged.size());
  for (const auto& entry : merged) { candidates.push_back(entry.second); }

  const auto& st = placementer_->Placement(candidates, result, placement_arg_, service_->GetServerSize(0));
  if (!st.IsOk()) { return st; }

  variable_info_.clear();
  for (const auto& placed : *result) { variable_info_.push_back(placed); }
  return Status::Ok();
}

Status SchedulerImpl::UpdateVariableVisitInfo(Version version, const std::string& var_name, int64_t ids) {
Expand Down Expand Up @@ -376,6 +385,7 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) {
{
std::unique_ptr<FileSystem::ReadStream> s;
Status st = FileSystem::OpenReadStreamAny(checkpoint_path_ + "/checkpoints", &s);
LOG(INFO) << "st is " << st.ToString();
// Ignore st fail when we use a fresh checkpoint dir
if (st.IsOk()) {
size_t size;
Expand Down Expand Up @@ -430,11 +440,20 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) {
}

if (old_server != service_->GetServerSize(0)) {
//TODO
return Status::NotImplemented("Server Count Change is not supported");
} else {
variable_info_ = infos;
LOG(INFO) << "Change ps_num from " << old_server << " to " << service_->GetServerSize(0);
}
LOG(INFO) << "Info from checkpoint " << PrintVariableInfo(infos);
vector<VariableInfo> result;
vector<VariableInfo> input_infos = infos;
for (auto& i: input_infos) {
i.parts.clear();
}
Status st = InternalUpdateVariableInfo(input_infos, &result);
if (!st.IsOk()) {
return st;
}
variable_info_ = result;
LOG(INFO) << "Info from placement " << PrintVariableInfo(variable_info_);
} else {
infos = variable_info_;
}
Expand All @@ -456,6 +475,8 @@ Status SchedulerImpl::InternalRestore(const string& checkpoint) {
if (!st.IsOk() && collect.IsOk()) {
collect = st;
}
std::string st_str = st.IsOk() ? "OK" : st.Msg();
LOG(INFO) << "server " << id << " finish restore, status " << st_str << " waiting " << count_down - 1 << " more.";
if (--count_down == 0) {
lock.unlock();
result.set_value(collect);
Expand Down Expand Up @@ -907,3 +928,19 @@ void SchedulerImpl::InternalSynchronizeLeave(Version version, int id, int64_t to
}
sync->Leave(id, token, cb);
}

// Builds a single-line textual dump of `infos` for logging.
//
// Each entry is rendered as
//   name<shape[d0,d1,]parts[server:size,server:size,]>
// (every dimension and every part is followed by a comma) and the entries are
// concatenated with no separator between them. An empty input yields "".
std::string SchedulerImpl::PrintVariableInfo(const std::vector<VariableInfo>& infos) {
  std::string text;
  for (const auto& info : infos) {
    text += info.name;
    text += "<shape[";
    for (const auto& extent : info.shape) {
      text += std::to_string(extent);
      text += ",";
    }
    text += "]parts[";
    for (const auto& piece : info.parts) {
      text += std::to_string(piece.server);
      text += ":";
      text += std::to_string(piece.size);
      text += ",";
    }
    text += "]>";
  }
  return text;
}
2 changes: 2 additions & 0 deletions xdl/ps-plus/ps-plus/scheduler/scheduler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class SchedulerImpl {
void WorkerBarrier(Version version, int id, int worker_count, std::function<void (const Status&)> cb);
ps::Status UpdateVariableVisitInfo(Version version, const std::string& var_name, int64_t ids);
ps::Status WriteMetaInfo();
std::string PrintVariableInfo(const std::vector<VariableInfo>& infos);

private:
std::unique_ptr<std::thread> main_thread_;
Expand Down Expand Up @@ -110,6 +111,7 @@ class SchedulerImpl {
void WaitForServers();

std::vector<ps::VariableInfo> variable_info_;
Status InternalUpdateVariableInfo(const std::vector<VariableInfo>& info, std::vector<VariableInfo>* result);
Status InternalRestore(const std::string& checkpoint);
Status InternalSave(const std::string& checkpoint);
Status InternalTriggerStreamingDense(Version version);
Expand Down

0 comments on commit 1dafefc

Please sign in to comment.