Merge pull request #62 from tqchen/master
add stream wait to all pushes, change kvstore to redef pinned memory
antinucleon committed Sep 12, 2015
2 parents e0e8ff9 + a28a83a commit 9ef1ec6
Showing 8 changed files with 127 additions and 112 deletions.
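Why these two changes belong together: the engine treats an operation as finished the moment its pushed callback returns, but CUDA kernel launches are asynchronous, so a GPU callback could return while its kernel is still running and downstream readers would observe stale memory. Every GPU push in this commit therefore ends with a wait on its stream. Independently, KVStoreLocal used to request a pinned-memory context unconditionally, which only makes sense in CUDA builds; it now falls back to plain CPU memory when MXNET_USE_CUDA is off. A minimal sketch of the wait pattern, with hypothetical stand-ins for the engine types (not the mxnet API):

#include <functional>

// Hypothetical stand-ins: the engine marks an op's outputs ready as soon
// as the pushed callback returns, so the callback itself must block until
// the asynchronously launched device work has completed.
struct Stream { void Wait() { /* block until queued work finishes */ } };
struct RunContext { Stream *stream; };

void Push(const std::function<void(RunContext)> &fn, RunContext ctx) {
  fn(ctx);  // after this returns, dependent ops are allowed to run
}

void Example(RunContext ctx) {
  Push([](RunContext rctx) {
    // launch_kernel_async(rctx.stream);  // asynchronous GPU launch
    rctx.stream->Wait();  // the fix: do not return before the kernel is done
  }, ctx);
}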
4 changes: 3 additions & 1 deletion include/mxnet/base.h
@@ -33,6 +33,9 @@
 #define MXNET_USE_CUDNN MSHADOW_USE_CUDNN
 #endif
 
+/*! \brief Error message for using gpu when MXNET_USE_CUDA==0 */
+#define MXNET_GPU_NOT_ENABLED_ERROR "GPU is not enabled"
+
 /*! \brief namespace of mxnet */
 namespace mxnet {
 /*! \brief mxnet cpu */
@@ -50,7 +53,6 @@
 typedef mshadow::TShape TShape;
 typedef mshadow::TBlob TBlob;
 }  // namespace mxnet
 
-
 //! \cond Doxygen_Suppress
 namespace dmlc {
 // Add a few patches to support TShape in dmlc/parameter.
9 changes: 9 additions & 0 deletions include/mxnet/context.h
@@ -83,6 +83,15 @@ struct RunContext {
    * \brief the stream of the device, can be NULL or Stream<gpu>* in GPU mode
    */
   void *stream;
+  /*!
+   * \brief get mshadow stream from Context
+   * \return the mshadow stream
+   * \tparam xpu the device type of the stream
+   */
+  template<typename xpu>
+  inline mshadow::Stream<xpu>* get_stream() const {
+    return static_cast<mshadow::Stream<xpu>*>(stream);
+  }
 };
 
 /*!
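A short usage sketch for the new accessor — how an engine callback now obtains a typed stream (illustrative only; assumes a CUDA build):

#include <mxnet/context.h>

// Sketch: inside an engine callback, fetch the typed mshadow stream from
// RunContext instead of static_cast-ing the raw void* member by hand.
void GpuCallback(mxnet::RunContext ctx) {
#if MXNET_USE_CUDA
  mshadow::Stream<mxnet::gpu> *s = ctx.get_stream<mxnet::gpu>();
  // ... enqueue mshadow/CUDA work on s ...
  s->Wait();  // returns only after everything queued on s has completed
#endif
}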
2 changes: 1 addition & 1 deletion include/mxnet/operator.h
@@ -53,7 +53,7 @@ struct OpContext {
    */
   template<typename xpu>
   inline mshadow::Stream<xpu>* get_stream() const {
-    return static_cast<mshadow::Stream<xpu>*>(run_ctx.stream);
+    return run_ctx.get_stream<xpu>();
   }
 };
 
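With the accessor now living on RunContext, OpContext::get_stream becomes a one-line forwarder, so the void*-to-Stream<xpu>* cast exists in exactly one place.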
14 changes: 7 additions & 7 deletions src/kvstore/kvstore_local.h
@@ -19,7 +19,12 @@ namespace mxnet {
  */
 class KVStoreLocal : public KVStore {
  public:
-  KVStoreLocal() : pinned_ctx_(cpu::kDevMask, Context::kPinnedMemoryID) {
+  KVStoreLocal() {
+#if MXNET_USE_CUDA
+    pinned_ctx_ = Context(cpu::kDevMask, Context::kPinnedMemoryID);
+#else
+    pinned_ctx_ = Context(cpu::kDevMask, 0);
+#endif
     Clear();
   }
 
@@ -44,11 +49,7 @@ class KVStoreLocal : public KVStore {
     for (size_t i = 0; i < keys.size(); ++i) {
       CHECK(local_.find(keys[i]) == local_.end())
          << "duplicate init of key " << keys[i];
-#if MXNET_USE_CUDA
      local_.insert({keys[i], values[i].Copy(pinned_ctx_)});
-#else
-      local_.insert({keys[i], values[i].Copy(local_ctx_)});
-#endif  // MXNET_USE_CUDA
    }
  }
 
@@ -121,7 +122,7 @@
    CHECK(val.size());
    auto& buf = merge_buf_[key];
    if (buf.merged.is_none()) {
-      buf.merged = val[0].Copy(local_ctx_);
+      buf.merged = val[0].Copy(pinned_ctx_);
    } else {
      CopyFromTo(val[0], &buf.merged);
    }
@@ -167,7 +168,6 @@
  /// \brief local storage
  std::unordered_map<int, NArray> local_;
 
-  Context local_ctx_;
  Context pinned_ctx_;
 
  Updater updater_;
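Background on the pinned-memory choice: page-locked (pinned) host buffers are what allow cudaMemcpyAsync to overlap copies with compute, while pageable buffers silently degrade to staged, synchronous copies. That benefit only exists in CUDA builds, hence the new fallback to a plain CPU context. A standalone CUDA illustration (plain runtime API, not mxnet code):

#include <cuda_runtime.h>

// Standalone CUDA sketch: pinned (page-locked) host memory enables truly
// asynchronous host->device copies; ordinary pageable memory forces the
// driver to stage the copy synchronously.
void UploadViaPinnedBuffer(float *device_dst, size_t n, cudaStream_t stream) {
  float *host_src = nullptr;
  cudaMallocHost(&host_src, n * sizeof(float));  // page-locked allocation
  // ... fill host_src ...
  cudaMemcpyAsync(device_dst, host_src, n * sizeof(float),
                  cudaMemcpyHostToDevice, stream);  // can overlap compute
  cudaStreamSynchronize(stream);  // done before host_src is reused/freed
  cudaFreeHost(host_src);
}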
187 changes: 88 additions & 99 deletions src/narray/narray.cc
@@ -5,6 +5,7 @@
  */
 #include <dmlc/logging.h>
 #include <dmlc/registry.h>
+#include <mxnet/base.h>
 #include <mxnet/narray.h>
 #include <mshadow/tensor.h>
 #include "./narray_function.h"
@@ -42,45 +43,34 @@ inline void BinaryOp(const NArray &lhs,
   }
   // important: callback must always capture by value
   NArray ret = *out;
+  // get the const variables
+  std::vector<Engine::VarHandle> const_vars;
+  if (lhs.ptr_->var != ret.ptr_->var) const_vars.push_back(lhs.ptr_->var);
+  if (rhs.ptr_->var != ret.ptr_->var) const_vars.push_back(rhs.ptr_->var);
 
   // redirect everything to mshadow operations
   switch (lhs.ctx().dev_mask) {
     case cpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<cpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
-      };
-      if (lhs.ptr_->var == ret.ptr_->var && rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {rhs.ptr_->var}, {ret.ptr_->var});
-      } else if (rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var, rhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
       break;
     }
 #if MXNET_USE_CUDA
     case gpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<gpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
-      };
-      if (lhs.ptr_->var == ret.ptr_->var && rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {rhs.ptr_->var}, {ret.ptr_->var});
-      } else if (rhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var, rhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu, OP>(lhs.data(), rhs.data(), &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
       break;
     }
 #endif
-    default: LOG(FATAL) << "GPU is not enabled";
+    default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
   }
 }

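The deleted four-way branch existed only to keep the output's variable from also appearing in the read list when an operand aliases the output (an in-place update). Collecting const_vars up front expresses the same rule once. A generic sketch of that filter (hypothetical helper, not part of mxnet):

#include <initializer_list>
#include <vector>

// Hypothetical helper mirroring the const_vars pattern above: keep each
// read dependency unless it is the variable being written, so in-place
// operands (out aliasing lhs or rhs) are never listed as both read and write.
template <typename Var>
std::vector<Var> ReadDeps(std::initializer_list<Var> reads, Var write) {
  std::vector<Var> const_vars;
  for (Var v : reads) {
    if (v != write) const_vars.push_back(v);
  }
  return const_vars;
}
// e.g. Engine::Get()->Push(fn, ctx, ReadDeps({lhs_var, rhs_var}, out_var),
//                          {out_var});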
@@ -90,26 +80,26 @@ inline void SetValueOp(const real_t &rhs, NArray *out) {
   NArray ret = *out;
   switch (ret.ctx().dev_mask) {
     case cpu::kDevMask: {
-      auto func = [rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<cpu>(rhs, &tmp, ctx);
-      };
-      Engine::Get()->Push(func, ret.ctx(), {}, {ret.ptr_->var});
+      Engine::Get()->Push([rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu>(rhs, &tmp, ctx);
+        }, ret.ctx(), {}, {ret.ptr_->var});
       break;
     }
 #if MXNET_USE_CUDA
     case gpu::kDevMask: {
-      auto func = [rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<gpu>(rhs, &tmp, ctx);
-      };
-      Engine::Get()->Push(func, ret.ctx(), {}, {ret.ptr_->var});
+      Engine::Get()->Push([rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu>(rhs, &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, ret.ctx(), {}, {ret.ptr_->var});
       break;
     }
 #endif
-    default: LOG(FATAL) << "GPU is not enabled";
+    default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
   }
 }
 /*!
@@ -124,45 +114,40 @@ inline void ScalarOp(const NArray &lhs,
                      const real_t &rhs,
                      NArray *out) {
   if (out->is_none()) {
-    *out = NArray(OP::GetShape(lhs.shape(), lhs.shape()), lhs.ctx(), true);
+    *out = NArray(lhs.shape(), lhs.ctx(), true);
   } else {
     CHECK(out->ctx() == lhs.ctx()) << "target context mismatch";
-    CHECK(out->shape() == OP::GetShape(lhs.shape(), lhs.shape()))
-        << "target shape mismatch";
+    CHECK(out->shape() == lhs.shape()) << "target shape mismatch";
   }
   // important: callback must always capture by value
   NArray ret = *out;
+  // get the const variables
+  std::vector<Engine::VarHandle> const_vars;
+  if (lhs.ptr_->var != ret.ptr_->var) const_vars.push_back(lhs.ptr_->var);
 
   // redirect everything to mshadow operations
   switch (lhs.ctx().dev_mask) {
     case cpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<cpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
-      };
-      if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<cpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
       break;
     }
 #if MXNET_USE_CUDA
     case gpu::kDevMask: {
-      auto func = [lhs, rhs, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Eval<gpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
-      };
-      if (lhs.ptr_->var == ret.ptr_->var) {
-        Engine::Get()->Push(func, lhs.ctx(), {}, {ret.ptr_->var});
-      } else {
-        Engine::Get()->Push(func, lhs.ctx(), {lhs.ptr_->var}, {ret.ptr_->var});
-      }
+      Engine::Get()->Push([lhs, rhs, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Eval<gpu, OP, reverse>(lhs.data(), rhs, &tmp, ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, lhs.ctx(), const_vars, {ret.ptr_->var});
       break;
     }
 #endif
-    default: LOG(FATAL) << "GPU is not enabled";
+    default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
   }
 }

@@ -175,48 +160,52 @@ void CopyFromTo(const NArray &from, NArray *to) {
   NArray ret = *to;
   int a = from.ctx().dev_mask;
   int b = to->ctx().dev_mask;
+
+  std::vector<Engine::VarHandle> const_vars;
+  if (from.ptr_->var != ret.ptr_->var) const_vars.push_back(from.ptr_->var);
+
   if (a == cpu::kDevMask && b == cpu::kDevMask) {
     Engine::Get()->Push([from, ret](RunContext ctx) {
         ret.ptr_->CheckAndAlloc();
         TBlob tmp = ret.data();
         narray::Copy<cpu, cpu>(from.data(), &tmp,
                                from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-  } else if (a == cpu::kDevMask && b == gpu::kDevMask) {
-#if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<cpu, gpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, ret.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-#else
-    LOG(FATAL) << "GPU is not enabled";
-#endif
-  } else if (a == gpu::kDevMask && b == cpu::kDevMask) {
-#if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<gpu, cpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
-#else
-    LOG(FATAL) << "GPU is not enabled";
-#endif
-  } else if (a == gpu::kDevMask && b == gpu::kDevMask) {
+      }, from.ctx(), const_vars, {ret.ptr_->var});
+  } else {
 #if MXNET_USE_CUDA
-    Engine::Get()->Push([from, ret](RunContext ctx) {
-        ret.ptr_->CheckAndAlloc();
-        TBlob tmp = ret.data();
-        narray::Copy<gpu, gpu>(from.data(), &tmp,
-                               from.ctx(), ret.ctx(), ctx);
-      }, from.ctx(), {from.ptr_->var}, {ret.ptr_->var});
+    if (a == cpu::kDevMask && b == gpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<cpu, gpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, ret.ctx(), const_vars, {ret.ptr_->var});
+    } else if (a == gpu::kDevMask && b == cpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<gpu, cpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, from.ctx(), const_vars, {ret.ptr_->var});
+    } else if (a == gpu::kDevMask && b == gpu::kDevMask) {
+      Engine::Get()->Push([from, ret](RunContext ctx) {
+          ret.ptr_->CheckAndAlloc();
+          TBlob tmp = ret.data();
+          narray::Copy<gpu, gpu>(from.data(), &tmp,
+                                 from.ctx(), ret.ctx(), ctx);
+          // Wait GPU kernel to complete
+          ctx.get_stream<gpu>()->Wait();
+        }, from.ctx(), const_vars, {ret.ptr_->var});
+    } else {
+      LOG(FATAL) << "unknown device mask";
+    }
 #else
-    LOG(FATAL) << "GPU is not enabled";
+    LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
 #endif
-  } else {
-    LOG(FATAL) << "unknown device mask";
   }
 }
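Two things to note in the CopyFromTo rewrite: the CPU-to-CPU path moves outside the CUDA guard so CPU-only builds still compile, and each GPU path is pushed to the context that owns a GPU stream (the destination for host-to-device, the source otherwise) and waits on that stream before returning. A standalone CUDA sketch of why the wait matters for copies:

#include <cuda_runtime.h>

// Standalone sketch: an asynchronous device->host copy is only safe to
// consume after the stream is synchronized; returning from the engine
// callback earlier would let readers of host_dst observe stale data.
void Download(float *host_dst, const float *device_src, size_t n,
              cudaStream_t stream) {
  cudaMemcpyAsync(host_dst, device_src, n * sizeof(float),
                  cudaMemcpyDeviceToHost, stream);
  cudaStreamSynchronize(stream);  // analogue of ctx.get_stream<gpu>()->Wait()
}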
19 changes: 17 additions & 2 deletions src/symbol/graph_executor.cc
@@ -169,7 +169,6 @@ inline std::vector<std::pair<T, T> > GraphExecutor::GetInplaceOption(
 inline GraphExecutor::OpExecEntry
 GraphExecutor::GetOpExecEntry(uint32_t nid) {
   OpNode& op_node = op_nodes_[nid];
-  Operator *op = op_node.op.get();
   std::vector<OpReqType> req;
   std::vector<TBlob> in_data, out_data, aux_states;
   in_data.reserve(graph_.nodes[nid].inputs.size());
@@ -199,14 +198,30 @@ GraphExecutor::GetOpExecEntry(uint32_t nid) {
     }
   }
 
+  // start setup exec function.
+  Operator* op = op_node.op.get();
   OpContext* op_ctx_ptr = &op_node.op_ctx;
-  exec.exec_fun = [op, op_ctx_ptr, in_data, req, out_data, aux_states] (RunContext ctx) {
+  bool is_gpu = op_node.ctx.dev_mask == gpu::kDevMask;
+  exec.exec_fun = [op, is_gpu, op_ctx_ptr, in_data, req, out_data, aux_states] (RunContext ctx) {
     op_ctx_ptr->run_ctx = ctx;
     op->Forward(*op_ctx_ptr, in_data, req, out_data, aux_states);
+    if (is_gpu) {
+#if MXNET_USE_CUDA
+      // Wait GPU kernel to finish.
+      ctx.get_stream<gpu>()->Wait();
+#else
+      LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
+#endif
+    }
   };
   return exec;
 }
 
+GraphExecutor::~GraphExecutor() {
+  // need to destruct after all previously issued operations are finished.
+  Engine::Get()->WaitForAll();
+}
+
 void GraphExecutor::InitGraph(Symbol symbol, Context ctx, bool need_backward) {
   // initialize all internal data structures
   symbol.ToStaticGraph(&graph_);
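The new destructor makes teardown ordering explicit: previously issued engine operations may still reference the executor's arrays, so the executor drains the engine before any member is destroyed. A minimal standalone analogue using std::async in place of the engine (hypothetical class, not mxnet code):

#include <future>
#include <numeric>
#include <vector>

// Minimal analogue of the destructor change: an object whose async work
// references its own members must wait for that work in its destructor,
// otherwise the callback races with member destruction.
class AsyncOwner {
 public:
  void Launch() {
    done_ = std::async(std::launch::async, [this] {
      sum_ = std::accumulate(data_.begin(), data_.end(), 0);
    });
  }
  ~AsyncOwner() {
    if (done_.valid()) done_.wait();  // the Engine::Get()->WaitForAll() analogue
  }
 private:
  std::vector<int> data_{1, 2, 3};
  std::future<void> done_;
  int sum_ = 0;
};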
2 changes: 1 addition & 1 deletion src/symbol/graph_executor.h
@@ -19,7 +19,7 @@ namespace mxnet {
  */
 class GraphExecutor : public Executor {
  public:
-  virtual ~GraphExecutor() {}
+  virtual ~GraphExecutor();
   virtual void Forward(bool is_train);
   virtual void Backward(const std::vector<NArray> &head_grads);
   virtual const std::vector<NArray> &heads() const {