Commit

Fix bugs and improve convergence rate.
ZHUI committed Feb 15, 2022
1 parent 88a0008 commit c6d6b8f
Showing 5 changed files with 97 additions and 51 deletions.
70 changes: 38 additions & 32 deletions paddle/fluid/memory/allocation/mmap_allocator.cc
@@ -102,14 +102,16 @@ void AllocateMemoryMap(std::string filename, int flags, size_t size,
}
}

std::shared_ptr<MemoryMapAllocation> AllocateMemoryMapAllocation(
std::string filename, int flags, size_t size) {
int fd = -1;
void *base_ptr = nullptr;
AllocateMemoryMap(filename, flags, size, &base_ptr, &fd);
return std::make_shared<MemoryMapAllocation>(base_ptr, size, filename, flags,
fd);
}
// Temporarily commented out for convergence rate @ZHUI
// std::shared_ptr<MemoryMapAllocation> AllocateMemoryMapAllocation(
// std::string filename, int flags, size_t size) {
// int fd = -1;
// void *base_ptr = nullptr;
// AllocateMemoryMap(filename, flags, size, &base_ptr, &fd);
// return std::make_shared<MemoryMapAllocation>(base_ptr, size, filename,
// flags,
// fd);
// }

std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
@@ -136,30 +138,34 @@ void MemoryMapAllocation::close() {
return;
}
closed_ = true;
if (map_ptr_ == nullptr) {
return;
}
if (flags_ & MAPPED_KEEPFD) {
PADDLE_ENFORCE_NE(
::close(fd_), -1,
platform::errors::Unavailable("could not close file descriptor ", fd_,
" :", strerror(errno), " (", errno, ")"));
}

PADDLE_ENFORCE_NE(
munmap(map_ptr_, map_size_), -1,
platform::errors::Unavailable("could not unmap the shared memory file: ",
strerror(errno), " (", errno, ")"));

if (!(flags_ & (MAPPED_FROMFD | MAPPED_UNLINK))) {
if (flags_ & MAPPED_SHAREDMEM) {
PADDLE_ENFORCE_NE(
shm_unlink(ipc_name_.c_str()), -1,
platform::errors::Unavailable(
"could not unlink the shared memory file ", ipc_name_, " : ",
strerror(errno), " (", errno, ")"));
}
}
// Temporarily commented out for convergence rate @ZHUI
// if (map_ptr_ == nullptr) {
// return;
// }
// if (flags_ & MAPPED_KEEPFD) {
// PADDLE_ENFORCE_NE(
// ::close(fd_), -1,
// platform::errors::Unavailable("could not close file descriptor ",
// fd_,
// " :", strerror(errno), " (", errno,
// ")"));
// }

// PADDLE_ENFORCE_NE(
// munmap(map_ptr_, map_size_), -1,
// platform::errors::Unavailable("could not unmap the shared memory file:
// ",
// strerror(errno), " (", errno, ")"));

// if (!(flags_ & (MAPPED_FROMFD | MAPPED_UNLINK))) {
// if (flags_ & MAPPED_SHAREDMEM) {
// PADDLE_ENFORCE_NE(
// shm_unlink(ipc_name_.c_str()), -1,
// platform::errors::Unavailable(
// "could not unlink the shared memory file ", ipc_name_, " : ",
// strerror(errno), " (", errno, ")"));
// }
// }
}

MemoryMapAllocation::~MemoryMapAllocation() { close(); }
5 changes: 3 additions & 2 deletions paddle/fluid/memory/allocation/mmap_allocator.h
@@ -91,8 +91,9 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
void AllocateMemoryMap(std::string filename, int flags, size_t size,
void **base_ptr_, int *fd_);

std::shared_ptr<MemoryMapAllocation> AllocateMemoryMapAllocation(
std::string filename, int flags, size_t size);
// Temporarily commented out for convergence rate @ZHUI
// std::shared_ptr<MemoryMapAllocation> AllocateMemoryMapAllocation(
// std::string filename, int flags, size_t size);

std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
24 changes: 24 additions & 0 deletions paddle/fluid/pybind/pybind.cc
@@ -1428,6 +1428,30 @@ PYBIND11_MODULE(core_noavx, m) {
tensor_from_shared = paddle.to_tensor(paddle.fluid.core.LoDTensor._new_shared_filename(metainfo))
)DOC")
.def("_shared_incref",
[](framework::Tensor &self) {
auto *mmap_allocation = dynamic_cast<
memory::allocation::RefcountedMemoryMapAllocation *>(
self.Holder().get());
if (mmap_allocation) {
mmap_allocation->incref();
}
},
R"DOC(
Increase reference count of share_filename tensor.
)DOC")
.def("_shared_decref",
[](framework::Tensor &self) {
auto *mmap_allocation = dynamic_cast<
memory::allocation::RefcountedMemoryMapAllocation *>(
self.Holder().get());
if (mmap_allocation) {
mmap_allocation->decref();
}
},
R"DOC(
Decrease reference count of share_filename tensor.
)DOC")
.def(py::pickle(
[](const framework::Tensor &t) { // __getstate__
auto holder = t.Holder();
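The two new bindings expose the mmap allocation's reference count to Python, mirroring the calls made in reductions.py below. A minimal sketch of the intended pairing, assuming a CPU tensor whose storage is shared through _share_filename (variable names are illustrative, not part of the patch):

    import paddle

    paddle.set_device("cpu")
    lodtensor = paddle.ones([5, 5]).value().get_tensor()

    # Producer: move the storage into a shared memory-mapped file and keep it
    # alive for the consumer. metainfo is (ipc_name, size, type_idx, dims, lod).
    metainfo = lodtensor._share_filename()
    lodtensor._shared_incref()

    # Consumer: reopen the same file, then drop the producer's extra reference.
    shared = paddle.fluid.core.LoDTensor._new_shared_filename(metainfo)
    shared._shared_decref()
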
24 changes: 21 additions & 3 deletions python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py
@@ -40,6 +40,16 @@ def send_tensor(queue, event, device, dtype):
event.wait()


def send_parambase(queue, event, device, dtype):
tensor = paddle.nn.Layer().create_parameter(
[5, 5],
dtype=dtype,
default_initializer=paddle.nn.initializer.Constant(value=1.0))
queue.put(tensor)
queue.put(tensor)
event.wait()


class leak_checker(object):
def __init__(self, test_case):
self.checked_pids = [os.getpid()]
@@ -96,6 +106,13 @@ def get_parameter(self):
default_initializer=paddle.nn.initializer.Constant(value=0.0))
return w

def _test_empty(self, dtype="float32"):
q = mp.Queue()
empty = paddle.to_tensor([], dtype=dtype)
q.put(empty)
out = q.get(timeout=1)
self.assertEqual(str(out), str(empty))

def _test_sharing(self,
ctx=mp,
device='cpu',
@@ -135,7 +152,8 @@ def test_receive():
event = ctx.Event()

process = ctx.Process(
target=send_tensor, args=(queue, event, device, dtype))
target=send_parambase if param else send_tensor,
args=(queue, event, device, dtype))
process.daemon = True
lc.check_pid(process.pid)
process.start()
@@ -152,8 +170,7 @@ def test_receive():
with leak_checker(self) as lc:
for _ in range(repeat):
test_fill()
if device != "gpu":
test_receive()
test_receive()


class TestMultiprocessingCpu(TestMultiprocessingBase):
@@ -167,6 +184,7 @@ def test_pass_parambase(self):

def test_pass_empty(self):
paddle.set_device("cpu")
self._test_empty()


class TestMultiprocessingGpu(TestMultiprocessingBase):
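The new _test_empty helper round-trips a zero-size tensor through a multiprocessing queue. For reference, a standalone sketch of the same check outside the unittest harness (the paddle.multiprocessing import path for the test's mp wrapper is an assumption from this file's context):

    import paddle
    # Assumed to be the wrapper this test imports as `mp`; importing it
    # registers the tensor reductions from python/paddle/multiprocessing.
    import paddle.multiprocessing as mp

    paddle.set_device("cpu")

    queue = mp.Queue()
    empty = paddle.to_tensor([], dtype="float32")
    queue.put(empty)            # pickling takes the empty-tensor branch in reductions.py
    out = queue.get(timeout=1)  # rebuilt as an empty tensor of the same dtype
    assert str(out) == str(empty)
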
25 changes: 11 additions & 14 deletions python/paddle/multiprocessing/reductions.py
@@ -84,15 +84,6 @@ def cuda_from_cache(key):
return lodtensor


#TODO: support cuda ipc event
def rebuild_event():
pass


def reduce_event():
pass


def rebuild_tensor(cls, lodtensor, metadata):
if cls == paddle.fluid.framework.ParamBase:
tensor = paddle.fluid.framework.ParamBase(lodtensor.shape(),
@@ -102,7 +93,10 @@ def rebuild_tensor(cls, lodtensor, metadata):
else:
size, stop_gradient = metadata
tensor = paddle.fluid.core.VarBase()
tensor.value().get_tensor()._share_data_with(lodtensor)
if lodtensor._is_initialized():
tensor.value().get_tensor()._share_data_with(lodtensor)
else:
tensor = paddle.to_tensor([], dtype=lodtensor._dtype())
tensor.stop_gradient = stop_gradient
return tensor

@@ -131,6 +125,7 @@ def reduce_tensor(tensor):

def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
lodtensor._shared_decref()
return lodtensor


Expand Down Expand Up @@ -163,15 +158,17 @@ def rebuild_lodtensor_empty(cls):
def reduce_lodtensor(lodtensor):
if lodtensor._place().is_cpu_place() or lodtensor._place(
).is_cuda_pinned_place():
for dim in lodtensor.shape():
if dim == 0:
# Empty tensors have nothing be mmapped.
return (rebuild_lodtensor_empty, (type(lodtensor), ))

# Default use share filename stratege
metadata = lodtensor._share_filename(
) # ipc_name, size, type_idx, dims, lod
rebuild = rebuild_lodtensor_filename
lodtensor._shared_incref()
# TODO, maintain reference for lodtensor
for dim in lodtensor.shape():
if dim == 0:
# Empty tensors have nothing be mmapped.
return (rebuild_lod_empty, (type(lodtensor), ))
# TODO: support file_discriptor stratege
elif lodtensor._place().is_gpu_place():
metadata = lodtensor._share_cuda()
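Taken together, the share_filename path now pairs an incref when a tensor is pickled with a decref when it is rebuilt, so the mapped file stays valid across processes. A rough end-to-end sketch under the same assumed paddle.multiprocessing import (worker logic is illustrative):

    import paddle
    import paddle.multiprocessing as mp  # assumed wrapper that registers these reductions


    def consumer(queue):
        t = queue.get()          # rebuild_lodtensor_filename(): remap, then _shared_decref()
        assert t.shape == [5, 5]


    if __name__ == "__main__":
        paddle.set_device("cpu")
        queue = mp.Queue()
        proc = mp.Process(target=consumer, args=(queue,))
        proc.start()

        # reduce_lodtensor(): _share_filename() moves the storage into a shared
        # mmap file, then _shared_incref() keeps it alive until the child rebuilds.
        queue.put(paddle.ones([5, 5]))
        proc.join()
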
