This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Fix shared memory with gluon dataloader, add option pin_memory #11908

Merged
13 commits merged on Aug 10, 2018
fix unittest for windows, update doc that windows mp is available
zhreshold committed Aug 6, 2018

Verified: this commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b7481f9fdc8466273607635ad59046912e1f179b
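The commit message states that multiprocessing-based data loading is now documented as available on Windows. A minimal, hedged sketch of what that enables follows; the dataset contents and shapes are illustrative and not taken from this PR:

# Illustrative only: multi-worker DataLoader usage that, per this commit's
# message, is documented to work on Windows as well as Linux/macOS.
import numpy as np
from mxnet.gluon.data import ArrayDataset, DataLoader

dataset = ArrayDataset(np.random.uniform(size=(100, 20)).astype('float32'))
# num_workers > 0 is the multiprocessing path referenced by the commit message.
loader = DataLoader(dataset, batch_size=10, num_workers=2)
for batch in loader:
    pass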
2 changes: 1 addition & 1 deletion python/mxnet/gluon/data/dataloader.py
@@ -151,7 +151,7 @@ def default_mp_batchify_fn(data):


def _as_in_context(data, ctx):
    """Move data into new context. """
    """Move data into new context."""
    if isinstance(data, nd.NDArray):
        return data.as_in_context(ctx)
    elif isinstance(data, (list, tuple)):
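The PR title also mentions a new pin_memory option, which is outside this hunk. A hedged sketch of how such an option could reuse the _as_in_context helper together with MXNet's cpu_pinned context; the helper name _maybe_pin and the flag are illustrative, not taken from this diff:

from mxnet import context

def _maybe_pin(batch, pin_memory):
    # Illustrative helper (not from this PR's diff): copy a collated batch into
    # page-locked (pinned) CPU memory so later host-to-device copies can be faster.
    if pin_memory:
        return _as_in_context(batch, context.cpu_pinned())
    return batch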
169 changes: 81 additions & 88 deletions tests/python/unittest/test_gluon_data.py
@@ -130,99 +130,92 @@ def test_multi_worker():
    for i, batch in enumerate(loader):
        assert (batch.asnumpy() == i).all()

class _Dummy(Dataset):
    """Dummpy dataset for randomized shape arrays."""
Review comment — Contributor: typo?

Reply — Member Author (zhreshold): will fix that

    def __init__(self, random_shape):
        self.random_shape = random_shape

    def __getitem__(self, idx):
        key = idx
        if self.random_shape:
            out = np.random.uniform(size=(random.randint(1000, 1100), 40))
            labels = np.random.uniform(size=(random.randint(10, 15)))
        else:
            out = np.random.uniform(size=(1000, 40))
            labels = np.random.uniform(size=(10))
        return key, out, labels

    def __len__(self):
        return 50

def _batchify_list(data):
    """
    return list of ndarray without stack/concat/pad
    """
    if isinstance(data, (tuple, list)):
        return list(data)
    if isinstance(data, mx.nd.NDArray):
        return [data]
    return data

def _batchify(data):
    """
    Collate data into batch. Use shared memory for stacking.
    :param data: a list of array, with layout of 'NTC'.
    :return either x and x's unpadded lengths, or x, x's unpadded lengths, y and y's unpadded lengths
            if labels are not supplied.
    """

    # input layout is NTC
    keys, inputs, labels = [item[0] for item in data], [item[1] for item in data], \
                           [item[2] for item in data]

    if len(data) > 1:
        max_data_len = max([seq.shape[0] for seq in inputs])
        max_labels_len = 0 if not labels else max([seq.shape[0] for seq in labels])
    else:
        max_data_len = inputs[0].shape[0]
        max_labels_len = 0 if not labels else labels[0].shape[0]

    x_lens = [item.shape[0] for item in inputs]
    y_lens = [item.shape[0] for item in labels]

    for i, seq in enumerate(inputs):
        pad_len = max_data_len - seq.shape[0]
        inputs[i] = np.pad(seq, ((0, pad_len), (0, 0)), 'constant', constant_values=0)
        labels[i] = np.pad(labels[i], (0, max_labels_len - labels[i].shape[0]),
                           'constant', constant_values=-1)

    inputs = np.asarray(inputs, dtype=np.float32)
    if labels is not None:
        labels = np.asarray(labels, dtype=np.float32)
    inputs = inputs.transpose((1, 0, 2))
    labels = labels.transpose((1, 0))

    return (nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)),
            nd.array(x_lens, ctx=context.Context('cpu_shared', 0))) \
        if labels is None else (
        nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)),
        nd.array(x_lens, ctx=context.Context('cpu_shared', 0)),
        nd.array(labels, dtype=labels.dtype, ctx=context.Context('cpu_shared', 0)),
        nd.array(y_lens, ctx=context.Context('cpu_shared', 0)))
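For reference, a hedged usage sketch combining the helpers above with the option named in the PR title; it assumes pin_memory is exposed as a boolean keyword argument on DataLoader, which is not shown in this diff:

# Assumed usage of the pin_memory keyword added by this PR (illustrative only).
data = _Dummy(False)
loader = DataLoader(data, batch_size=40, batchify_fn=_batchify,
                    num_workers=2, pin_memory=True)
for batch in loader:
    pass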

@with_seed()
def test_multi_worker_forked_data_loader():
    """
    Test should successfully run its course of multi-process/forked data loader without errors
    """
    class Dummy(Dataset):
        def __init__(self, random_shape):
            self.random_shape = random_shape

        def __getitem__(self, idx):
            key = idx
            if self.random_shape:
                out = np.random.uniform(size=(random.randint(1000, 1100), 40))
                labels = np.random.uniform(size=(random.randint(10, 15)))
            else:
                out = np.random.uniform(size=(1000, 40))
                labels = np.random.uniform(size=(10))
            return key, out, labels

        def __len__(self):
            return 50

        def batchify_list(self, data):
            """
            return list of ndarray without stack/concat/pad
            """
            if isinstance(data, (tuple, list)):
                return list(data)
            if isinstance(data, mx.nd.NDArray):
                return [data]
            return data

        def batchify(self, data):
            """
            Collate data into batch. Use shared memory for stacking.

            :param data: a list of array, with layout of 'NTC'.
            :return either x and x's unpadded lengths, or x, x's unpadded lengths, y and y's unpadded lengths
                    if labels are not supplied.
            """

            # input layout is NTC
            keys, inputs, labels = [item[0] for item in data], [item[1] for item in data], \
                                   [item[2] for item in data]

            if len(data) > 1:
                max_data_len = max([seq.shape[0] for seq in inputs])
                max_labels_len = 0 if not labels else max([seq.shape[0] for seq in labels])
            else:
                max_data_len = inputs[0].shape[0]
                max_labels_len = 0 if not labels else labels[0].shape[0]

            x_lens = [item.shape[0] for item in inputs]
            y_lens = [item.shape[0] for item in labels]

            for i, seq in enumerate(inputs):
                pad_len = max_data_len - seq.shape[0]
                inputs[i] = np.pad(seq, ((0, pad_len), (0, 0)), 'constant', constant_values=0)
                labels[i] = np.pad(labels[i], (0, max_labels_len - labels[i].shape[0]),
                                   'constant', constant_values=-1)

            inputs = np.asarray(inputs, dtype=np.float32)
            if labels is not None:
                labels = np.asarray(labels, dtype=np.float32)
            inputs = inputs.transpose((1, 0, 2))
            labels = labels.transpose((1, 0))

            return (nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)),
                    nd.array(x_lens, ctx=context.Context('cpu_shared', 0))) \
                if labels is None else (
                nd.array(inputs, dtype=inputs.dtype, ctx=context.Context('cpu_shared', 0)),
                nd.array(x_lens, ctx=context.Context('cpu_shared', 0)),
                nd.array(labels, dtype=labels.dtype, ctx=context.Context('cpu_shared', 0)),
                nd.array(y_lens, ctx=context.Context('cpu_shared', 0)))


    # This test is pointless on Windows because Windows doesn't fork
    if platform.system() != 'Windows':
        data = Dummy(True)
        loader = DataLoader(data, batch_size=40, batchify_fn=data.batchify, num_workers=2)
        for epoch in range(1):
            for i, data in enumerate(loader):
                if i % 100 == 0:
                    print(data)
                    print('{}:{}'.format(epoch, i))

        data = Dummy(True)
        loader = DataLoader(data, batch_size=40, batchify_fn=data.batchify_list, num_workers=2)
        for epoch in range(1):
            for i, data in enumerate(loader):
                if i % 100 == 0:
                    print(data)
                    print('{}:{}'.format(epoch, i))
    data = _Dummy(False)
    loader = DataLoader(data, batch_size=40, batchify_fn=_batchify, num_workers=2)
    for epoch in range(1):
        for i, data in enumerate(loader):
            pass

    data = _Dummy(True)
    loader = DataLoader(data, batch_size=40, batchify_fn=_batchify_list, num_workers=2)
    for epoch in range(1):
        for i, data in enumerate(loader):
            pass

if __name__ == '__main__':
    import nose
    nose.runmodule()