Construct dist tensor (#54425)
* construct dist tensor

* move constructor to header
LiYuRio authored Jun 13, 2023
1 parent 38f38a9 commit e32c437
Showing 18 changed files with 637 additions and 18 deletions.
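This commit threads an optional dist_attr argument through the eager Tensor constructor so a DistTensor can be built directly from Python. A rough usage sketch of the new path follows; the import paths and the PADDLE_WITH_DISTRIBUTE build requirement are assumptions on my part, only the dist_attr keyword itself comes from this diff:

import numpy as np
from paddle.fluid import core  # assumed module path for this Paddle revision

# TensorDistAttr is bound in auto_parallel_py.cc (first file below).
attr = core.TensorDistAttr()

# ndarray + dist_attr (case 3 of the TensorInit docstring) -> DistTensor.
t = core.eager.Tensor(np.ones([2, 3], dtype="float32"), dist_attr=attr)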
9 changes: 7 additions & 2 deletions paddle/fluid/pybind/auto_parallel_py.cc
@@ -43,6 +43,8 @@ using phi::distributed::auto_parallel::Machine;
using phi::distributed::auto_parallel::ProcessMesh;
using phi::distributed::auto_parallel::TensorDistAttr;

PyTypeObject *g_tensor_dist_attr_pytype = nullptr;

static inline const ProcessMesh *get_tensor_process_mesh(
const TensorDistAttr &self) {
if (self.process_mesh().empty()) {
@@ -225,8 +227,11 @@ void BindAutoParallel(py::module *m) {
py::arg("memo"))
.def("__str__", &DeviceMesh::to_string);

py::class_<TensorDistAttr>(*m, "TensorDistAttr")
.def(py::init<>())
py::class_<TensorDistAttr, std::shared_ptr<TensorDistAttr>> py_dist_attr(
*m, "TensorDistAttr");
g_tensor_dist_attr_pytype =
reinterpret_cast<PyTypeObject *>(py_dist_attr.ptr());
py_dist_attr.def(py::init<>())
.def(py::init([](const VarDesc &var_desc) {
auto shape =
paddle::distributed::auto_parallel::get_tensor_shape(&var_desc);
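Note on the binding change above: the class now uses std::shared_ptr<TensorDistAttr> as its pybind11 holder, so an attribute created in Python can be co-owned by C++ (the DistTensor built in eager.cc stores a shared_ptr), and the type object is cached in g_tensor_dist_attr_pytype so CastPyArg2DistAttr can recognize the type when parsing constructor arguments. A minimal Python-side sketch, assuming the class is exposed on paddle.fluid.core:

from paddle.fluid import core  # assumed module path

attr = core.TensorDistAttr()   # py::init<>()
print(attr)                    # __str__ -> TensorDistAttr::to_string()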
210 changes: 196 additions & 14 deletions paddle/fluid/pybind/eager.cc
@@ -41,6 +41,14 @@ limitations under the License. */
#include "paddle/fluid/pybind/exception.h"
#include "paddle/fluid/pybind/tensor_py.h"
#include "paddle/phi/core/string_tensor.h"

#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/phi/core/distributed/auto_parallel/dist_attr.h"
#include "paddle/phi/core/distributed/auto_parallel/dist_tensor.h"
using phi::distributed::auto_parallel::DistTensor;
using phi::distributed::auto_parallel::TensorDistAttr;
#endif

namespace paddle {
namespace pybind {

@@ -60,6 +68,52 @@ PyObject* TensorNew(PyTypeObject* type, PyObject* args, PyObject* kwargs) {
return obj;
}

#ifdef PADDLE_WITH_DISTRIBUTE
void EmptyDistTensorInitializer(
TensorObject* self,
const std::string& name,
const paddle::platform::Place& place,
const std::shared_ptr<TensorDistAttr>& dist_attr,
bool persistable = false,
int stop_gradient = -1,
framework::proto::VarType::Type dtype =
paddle::framework::proto::VarType::FP32,
const std::vector<int>& dims = {0}) {
auto ddims = phi::make_ddim(dims);
self->tensor.set_name(name);
auto autograd_meta = egr::EagerUtils::autograd_meta(&(self->tensor));
autograd_meta->SetPersistable(persistable);
if (stop_gradient != -1) {
autograd_meta->SetStopGradient(static_cast<bool>(stop_gradient));
}

std::shared_ptr<DistTensor> dist_tensor = nullptr;
if (dims.size() == 1 && dims[0] == 0) {
std::shared_ptr<phi::Allocation> allocation_ptr = nullptr;
dist_tensor = std::make_shared<DistTensor>(
allocation_ptr,
phi::DenseTensorMeta(paddle::framework::TransToPhiDataType(dtype),
ddims),
dist_attr);
} else {
dist_tensor = std::make_shared<DistTensor>(
std::make_shared<phi::Allocation>(),
phi::DenseTensorMeta(paddle::framework::TransToPhiDataType(dtype),
ddims),
dist_attr);
}
self->tensor.set_impl(dist_tensor);

if (!autograd_meta->GetMutableGradNode()) {
autograd_meta->SetGradNode(
std::make_shared<egr::GradNodeAccumulation>(autograd_meta));
VLOG(3) << "Tensor(" << name
<< ") have not GradNode, add GradNodeAccumulation"
<< autograd_meta->GradNode() << " for it.";
}
}
#endif
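EmptyDistTensorInitializer mirrors the dense EmptyTensorInitializer: dims == {0} is the "no data yet" sentinel that skips a real allocation, and stop_gradient is tri-state (-1 keeps the autograd default). A toy Python mirror of that branching, with invented names, just to make the control flow explicit:

def empty_dist_tensor_plan(dims=(0,), stop_gradient=-1):
    # dims == (0,) is the sentinel: defer the real allocation.
    plan = {"allocate": not (len(dims) == 1 and dims[0] == 0)}
    if stop_gradient != -1:  # -1 means "leave the autograd default alone"
        plan["stop_gradient"] = bool(stop_gradient)
    return plan

print(empty_dist_tensor_plan())           # {'allocate': False}
print(empty_dist_tensor_plan((2, 3), 1))  # {'allocate': True, 'stop_gradient': True}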

// TODO(jiabin): Overload this once we need more constructor in Python
void EmptyTensorInitializer(TensorObject* self,
const std::string& name,
@@ -82,6 +136,7 @@ void EmptyTensorInitializer(TensorObject* self,
// TODO(jiabin): Maybe support LOD later
std::shared_ptr<phi::DenseTensor> dense_tensor = nullptr;
if (dims.size() == 1 && dims[0] == 0) {
VLOG(0) << "Create dense tensor with dims[0] equal to 0";
std::shared_ptr<phi::Allocation> allocation_ptr = nullptr;
dense_tensor = std::make_shared<phi::DenseTensor>(
allocation_ptr,
@@ -129,6 +184,48 @@ void EmptyStringTensorInitializer(TensorObject* self,
self->tensor.set_impl(string_tensor);
}

#ifdef PADDLE_WITH_DISTRIBUTE
void InitDistTensorWithNumpyValue(TensorObject* self,
const py::object& array,
const paddle::platform::Place& place,
bool zero_copy = false) {
PADDLE_ENFORCE_EQ(
self->tensor.defined(),
true,
paddle::platform::errors::Fatal(
"Calling InitDistTensorWithNumpyValue of Eager Tensor without "
"EmptyDistTensorInitializer is "
"forbidden. Please check your code and make sure you new a "
"eager tensor before init it with NumPy."));
DistTensor* dist_tensor_ptr =
static_cast<DistTensor*>(self->tensor.impl().get());
phi::DenseTensor* impl_ptr =
static_cast<phi::DenseTensor*>(dist_tensor_ptr->mutable_value());

if (platform::is_cpu_place(place)) {
SetTensorFromPyArray<platform::CPUPlace>(impl_ptr, array, place, zero_copy);
} else if (platform::is_xpu_place(place)) {
SetTensorFromPyArray<platform::XPUPlace>(impl_ptr, array, place, zero_copy);
} else if (platform::is_gpu_place(place)) {
SetTensorFromPyArray<platform::CUDAPlace>(
impl_ptr, array, place, zero_copy);
} else if (platform::is_cuda_pinned_place(place)) {
SetTensorFromPyArray<platform::CUDAPinnedPlace>(
impl_ptr, array, place, zero_copy);
} else if (platform::is_custom_place(place)) {
SetTensorFromPyArray<platform::CustomPlace>(
impl_ptr, array, place, zero_copy);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Place should be one of "
"CPUPlace/XPUPlace/CUDAPlace/CUDAPinnedPlace/CustomPlace"));
}

// TODO(dev): dist_tensor meta is not equal to dense tensor meta
dist_tensor_ptr->set_meta(impl_ptr->meta());
}
#endif
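InitDistTensorWithNumpyValue reuses the dense place dispatch but writes into the DistTensor's local DenseTensor (mutable_value()), then copies that dense meta onto the DistTensor; the TODO records that a genuinely distributed meta will eventually differ from the local dense one. A toy mirror of the copy-then-sync-meta flow (all names invented):

import numpy as np

class ToyDistTensor:
    # Stand-in for DistTensor: a local dense payload plus a meta.
    def __init__(self, dist_attr):
        self.dist_attr = dist_attr
        self.value = None   # the local DenseTensor (mutable_value())
        self.meta = None    # synced from the dense meta for now (see TODO)

    def set_from_numpy(self, arr):
        self.value = np.asarray(arr)                      # SetTensorFromPyArray<...>
        self.meta = (self.value.shape, self.value.dtype)  # set_meta(impl_ptr->meta())

t = ToyDistTensor(dist_attr="replicated")
t.set_from_numpy(np.ones([2, 3], dtype="float32"))
print(t.meta)  # ((2, 3), dtype('float32'))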

void InitTensorWithNumpyValue(TensorObject* self,
const py::object& array,
const paddle::platform::Place& place,
@@ -143,6 +240,7 @@ void InitTensorWithNumpyValue(TensorObject* self,
"eager tensor before init it with NumPy."));
phi::DenseTensor* impl_ptr =
static_cast<phi::DenseTensor*>(self->tensor.impl().get());

if (platform::is_cpu_place(place)) {
SetTensorFromPyArray<platform::CPUPlace>(impl_ptr, array, place, zero_copy);
} else if (platform::is_xpu_place(place)) {
@@ -186,6 +284,39 @@ void InitStringTensorWithNumpyValue(TensorObject* self, const py::object& obj) {
}
}

#ifdef PADDLE_WITH_DISTRIBUTE
void InitDistTensorWithTensor(
TensorObject* self,
const paddle::Tensor& src,
const paddle::platform::Place& place,
const std::string& name,
const std::shared_ptr<TensorDistAttr>& dist_attr) {
PADDLE_ENFORCE(src.is_dense_tensor(),
paddle::platform::errors::InvalidArgument(
"DistTensor can only initialize by DenseTensor"));
self->tensor.set_name(name);
if (place == src.place()) {
std::shared_ptr<phi::DenseTensor> tensor =
std::static_pointer_cast<phi::DenseTensor>(src.impl());
self->tensor.set_impl(std::make_shared<DistTensor>(tensor, dist_attr));
VLOG(4) << "Same place, do ShareDataWith";
} else {
std::shared_ptr<phi::DenseTensor> tensor =
std::static_pointer_cast<phi::DenseTensor>(
src.copy_to(place, true).impl());
self->tensor.set_impl(std::make_shared<DistTensor>(tensor, dist_attr));
VLOG(4) << "Different place, do TensorCopy";
}
if (src.get_autograd_meta()) {
egr::EagerUtils::autograd_meta(&(self->tensor))
->SetPersistable(
egr::EagerUtils::unsafe_autograd_meta(src)->Persistable());
} else {
egr::EagerUtils::autograd_meta(&(self->tensor))->SetPersistable(false);
}
}
#endif
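InitDistTensorWithTensor only accepts a DenseTensor source: when the source already lives on the target place its storage is shared, otherwise the data is first copied over with a blocking copy_to. A hedged usage sketch of this constructor path (case 6 in the docstring further down); the module paths are assumptions and a PADDLE_WITH_DISTRIBUTE build is required:

import paddle
from paddle.fluid import core  # assumed module path

src = paddle.ones([2, 3])      # source must be a DenseTensor
attr = core.TensorDistAttr()

# place == src.place(): "Same place, do ShareDataWith" (storage is shared);
# otherwise:            "Different place, do TensorCopy" (data copied first).
dist = core.eager.Tensor(src, place=paddle.CPUPlace(), dist_attr=attr)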

void InitTensorWithTensor(TensorObject* self,
const paddle::Tensor& src,
const paddle::platform::Place& place,
@@ -283,6 +414,25 @@ paddle::platform::Place ParsePlace(
return place;
}

#ifdef PADDLE_WITH_DISTRIBUTE
std::shared_ptr<TensorDistAttr> ParseDistAttrArgs(
std::unordered_map<std::string, PyObject*> kws_map,
std::unordered_map<std::string, Py_ssize_t> kw_order_map,
PyObject* args,
bool flag_kwargs,
Py_ssize_t args_num) {
std::shared_ptr<TensorDistAttr> dist_attr = nullptr;
if (kw_order_map["dist_attr"] <= args_num) {
dist_attr = CastPyArg2DistAttr(
PyTuple_GET_ITEM(args, kw_order_map["dist_attr"] - 1),
kw_order_map["dist_attr"] - 1);
} else if (flag_kwargs && kws_map["dist_attr"] != NULL) {
dist_attr = CastPyArg2DistAttr(kws_map["dist_attr"], 0);
}
return dist_attr;
}
#endif
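ParseDistAttrArgs follows the same convention as the other argument parsers: kw_order_map holds 1-based positional slots (dist_attr is slot 7 in AutoInitTensorByPyArray and slot 4 in AutoInitTensorByTensor), so the value is taken positionally when enough positional arguments were passed, and from the keyword map otherwise. A small Python restatement of that rule (invented names):

def parse_dist_attr(args, kwargs, slot=7):
    # slot is the 1-based position from kw_order_map.
    if slot <= len(args):              # kw_order_map["dist_attr"] <= args_num
        return args[slot - 1]          # PyTuple_GET_ITEM(args, slot - 1)
    return kwargs.get("dist_attr")     # flag_kwargs && kws_map["dist_attr"] != NULL

print(parse_dist_attr(("v", "p", 1, 0, "n", 0, "ATTR"), {}))  # ATTR (positional)
print(parse_dist_attr(("v",), {"dist_attr": "ATTR"}))         # ATTR (keyword)
print(parse_dist_attr(("v",), {}))                            # None (absent)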

// boolean arguments: zero_copy, stop_gradient, persistable
int ParseBooleanArgs(std::string key,
std::unordered_map<std::string, PyObject*> kws_map,
@@ -347,13 +497,13 @@ void AutoInitTensorByPyArray(TensorObject* py_tensor_ptr,
// kw_order_map's value is the position of the arguments respectively.
// If u want to update this constructor with new arguments,
// need to update this map and to add or change related code.
std::unordered_map<std::string, Py_ssize_t> kw_order_map{
{"value", 1},
{"place", 2},
{"persistable", 3},
{"zero_copy", 4},
{"name", 5},
{"stop_gradient", 6}};
std::unordered_map<std::string, Py_ssize_t> kw_order_map{{"value", 1},
{"place", 2},
{"persistable", 3},
{"zero_copy", 4},
{"name", 5},
{"stop_gradient", 6},
{"dist_attr", 7}};

py::object numpy_value = py::object();
paddle::platform::Place place =
@@ -378,6 +528,18 @@ void AutoInitTensorByPyArray(TensorObject* py_tensor_ptr,
stop_gradient = ParseBooleanArgs(
"stop_gradient", kws_map, kw_order_map, args, flag_kwargs, args_num);

#ifdef PADDLE_WITH_DISTRIBUTE
std::shared_ptr<TensorDistAttr> dist_attr =
ParseDistAttrArgs(kws_map, kw_order_map, args, flag_kwargs, args_num);

if (dist_attr) {
EmptyDistTensorInitializer(
py_tensor_ptr, act_name, place, dist_attr, persistable, stop_gradient);
InitDistTensorWithNumpyValue(py_tensor_ptr, numpy_value, place, zero_copy);
return;
}
#endif

EmptyTensorInitializer(
py_tensor_ptr, act_name, place, persistable, stop_gradient);
InitTensorWithNumpyValue(py_tensor_ptr, numpy_value, place, zero_copy);
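With that, AutoInitTensorByPyArray branches exactly once: when a dist_attr was supplied (and the build has PADDLE_WITH_DISTRIBUTE), the tensor goes through the dist pair of helpers and the function returns early; otherwise it falls through to the dense pair above. A compact sketch of just that routing (invented names):

def auto_init_by_pyarray(value, dist_attr=None):
    if dist_attr is not None:  # the early-return path added by this commit
        return f"DistTensor(value={value!r}, dist_attr={dist_attr!r})"
    return f"DenseTensor(value={value!r})"

print(auto_init_by_pyarray([1.0]))                # DenseTensor(value=[1.0])
print(auto_init_by_pyarray([1.0], "replicated"))  # DistTensor(value=[1.0], dist_attr='replicated')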
@@ -399,7 +561,7 @@ void AutoInitTensorByTensor(TensorObject* py_tensor_ptr,
// If u want to update this constructor with new arguments,
// need to update this map and to add or change related code.
std::unordered_map<std::string, Py_ssize_t> kw_order_map{
{"value", 1}, {"place", 2}, {"name", 3}};
{"value", 1}, {"place", 2}, {"name", 3}, {"dist_attr", 4}};

paddle::platform::Place place =
egr::Controller::Instance().GetExpectedPlace();
@@ -408,6 +570,11 @@ void AutoInitTensorByTensor(TensorObject* py_tensor_ptr,
place = ParsePlace(kws_map, kw_order_map, args, flag_kwargs, args_num);
act_name = ParseName(kws_map, kw_order_map, args, flag_kwargs, args_num);

#ifdef PADDLE_WITH_DISTRIBUTE
std::shared_ptr<TensorDistAttr> dist_attr =
ParseDistAttrArgs(kws_map, kw_order_map, args, flag_kwargs, args_num);
#endif

if (init_by_egr_tensor) {
paddle::Tensor src_tensor;
if (kw_order_map["value"] <= args_num) {
@@ -426,7 +593,16 @@ void AutoInitTensorByTensor(TensorObject* py_tensor_ptr,
"way."));
}
}
#ifdef PADDLE_WITH_DISTRIBUTE
if (dist_attr) {
InitDistTensorWithTensor(
py_tensor_ptr, src_tensor, place, act_name, dist_attr);
} else {
InitTensorWithTensor(py_tensor_ptr, src_tensor, place, act_name);
}
#else
InitTensorWithTensor(py_tensor_ptr, src_tensor, place, act_name);
#endif
} else {
// init by framework tensor
phi::DenseTensor src_tensor;
@@ -545,7 +721,8 @@ void AutoInitStringTensorByStringTensor(
* ** persistable: bool,
* ** zero_copy: bool,
* ** name: std::string,
* ** stop_gradient: bool)
* ** stop_gradient: bool,
* ** dist_attr: phi::distributed::TensorDistAttr)
* 4.
* def __init__ (
* ** value: ndarray)
@@ -558,7 +735,8 @@ void AutoInitStringTensorByStringTensor(
* def __init__ (
* ** tensor: Tensor,
* ** place: paddle::platform::Place,
* ** name: std::string)
* ** name: std::string,
* ** dist_attr: phi::distributed::TensorDistAttr)
* 7. (multi-place) (should have at least one parameter, one parameter similar
* to case 5, zero parameter equals to case 1.)
* def __init__ (
@@ -583,6 +761,7 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
PyObject* kw_dims = NULL;
PyObject* kw_dtype = NULL;
PyObject* kw_type = NULL;
PyObject* kw_dist_attr = NULL;

// the keywords argument
static char* kwlist[] = {const_cast<char*>("value"),
@@ -594,6 +773,7 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
const_cast<char*>("dims"),
const_cast<char*>("dtype"),
const_cast<char*>("type"),
const_cast<char*>("dist_attr"),
NULL};

// 'O' Store a Python object (without any conversion) in a C object pointer,
@@ -604,7 +784,7 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
// which enhance case2, case3, case4, case5, case6, case7.
bool flag_ = PyArg_ParseTupleAndKeywords(args,
kwargs,
"|OOOOOOOOO",
"|OOOOOOOOOO",
kwlist,
&kw_value,
&kw_place,
@@ -614,7 +794,8 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
&kw_stop_gradient,
&kw_dims,
&kw_dtype,
&kw_type);
&kw_type,
&kw_dist_attr);

// helper map
std::unordered_map<std::string, PyObject*> kws_map{
@@ -626,7 +807,8 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
{"stop_gradient", kw_stop_gradient},
{"dims", kw_dims},
{"dtype", kw_dtype},
{"type", kw_type}};
{"type", kw_type},
{"dist_attr", kw_dist_attr}};

PADDLE_ENFORCE_EQ(flag_,
true,
@@ -636,7 +818,7 @@ int TensorInit(PyObject* self, PyObject* args, PyObject* kwargs) {
"sure you are on the right way. "
"The expected arguments as follow: ("
"value, place, persistable, zero_copy, "
"name, stop_gradient, dims, dtype, type)"));
"name, stop_gradient, dims, dtype, type, dist_attr)"));

PADDLE_ENFORCE_NOT_NULL(
self,