
ARROW-1372: [Plasma] enable HUGETLB support on Linux to improve plasma put performance #974

Closed
atumanov wants to merge 16 commits from the putperf branch

Conversation

atumanov

This PR makes it possible to back the Plasma object store with a pre-mounted hugetlbfs.

}
ARROW_LOG(INFO) << "mlocking pointer " << pointer << " size " << size
<< " success " << rv;
memset(pointer, 0xff, size);
Contributor

@pcmoritz Aug 18, 2017

If I get rid of this memset, writing in my benchmarks takes 2x to 3x as long (that's the only performance difference I can measure from this commit); if there is no other way to achieve the same effect, we will need to bring it back.
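For context, a minimal sketch (my own, not the PR's exact code) of the effect being discussed: touching every byte of a freshly mmapped region forces the kernel to fault the pages in up front, so later writes into the buffer don't pay that cost.

```
#include <sys/mman.h>
#include <cstring>

// Hypothetical helper: map `size` bytes of `fd` and pre-fault the pages by
// writing to them, as the memset above does. Returns MAP_FAILED on error.
void* MapAndPrefault(int fd, size_t size) {
  void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (pointer == MAP_FAILED) {
    return MAP_FAILED;
  }
  // Writing the whole region makes the kernel allocate physical pages now
  // instead of on the first client write, which is where the 2x-3x write-time
  // difference mentioned above comes from.
  std::memset(pointer, 0xff, size);
  return pointer;
}
```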

@pcmoritz
Contributor

@atumanov Yeah, I think the best way to deal with the case where -h is specified and -d is not is to give an error; there is no canonical hugetlbfs directory on Linux in the way that there is /dev/shm (systemd seems to use /dev/hugepages and Debian seems to prefer /hugepages).
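A rough sketch of the check being proposed (flag names taken from this discussion; the variable names are hypothetical and the store's actual option handling may differ):

```
// Refuse to start if hugepages were requested (-h) but no hugetlbfs mount
// point was given (-d), since there is no canonical default to fall back on.
if (hugepages_enabled && plasma_directory.empty()) {
  ARROW_LOG(FATAL) << "if you specify -h (use hugepages), you must also "
                      "specify -d (the directory of a pre-mounted hugetlbfs)";
}
```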

@@ -40,7 +42,7 @@ int fake_munmap(void*, int64_t);
#define USE_DL_PREFIX
#define HAVE_MORECORE 0
#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T
#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)
#define DEFAULT_GRANULARITY ((size_t) 1024U * 1024U * 1024U) // 1GB
Contributor

Will this behave reasonably on machines with less than 1GB memory?

Contributor

My plan is to change it back to the default if hugepages are not active (unfortunately this can only be done at compile time, since it is a macro and we can't set it at runtime).

Author

As part of a separate PR we should implement reserving and mmapping as much of the configured memory capacity as possible on startup. This would alleviate the need for DEFAULT_GRANULARITY altogether.

Author

In fact, one improvement I've been thinking about is exponential backoff on the default granularity plus an mmap retry on mmap failure. Because the files we attempt to mmap double in size each time, we can easily get stuck with only 50% of the memory capacity mmapped and be unable to mmap the remaining 50% (due to ENOMEM).
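A rough sketch of that backoff idea (my own, not part of this PR); a real implementation would also need to resize the backing file to whatever size ends up being mapped:

```
#include <sys/mman.h>
#include <cerrno>

// Hypothetical retry loop: if mapping the doubled-size region fails with
// ENOMEM, halve the request and try again until we succeed or hit a minimum.
void* MmapWithBackoff(int fd, size_t requested, size_t minimum, size_t* mapped) {
  for (size_t size = requested; size >= minimum; size /= 2) {
    void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (pointer != MAP_FAILED) {
      *mapped = size;  // the caller needs the size that actually succeeded
      return pointer;
    }
    if (errno != ENOMEM) break;  // some other error; don't keep retrying
  }
  return MAP_FAILED;
}
```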

@@ -129,6 +130,8 @@ struct PlasmaStoreInfo {
/// The amount of memory (in bytes) that we allow to be allocated in the
/// store.
int64_t memory_capacity;
bool hugetlb_enabled;
std::string directory;
Contributor

These two fields should be documented.


```
sudo mkdir -p /mnt/hugepages
sudo mount -t hugetlbfs -o uid=ubuntu -o gid=adm none /mnt/hugepages
Contributor

ubuntu -> $USER

gid=adm might need to be changed also, right?

Author

There's some flexibility in setting up hugepages. We've been talking about creating an AMI with an entry in /etc/fstab, for instance. These instructions are just one example. I can make them more generic if needed, for instance:

gid=`id -g`
uid=`id -u`
sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group "
sudo bash -c "echo 2048 >  /proc/sys/vm/nr_hugepages"

@pcmoritz
Contributor

pcmoritz commented Aug 19, 2017

This is the benchmark I'm running:

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import time

client = plasma.connect("/tmp/plasma", "", 0)

for size in [20000000, 20000000, 200000000, 200000000]:
    data = np.random.randn(size)
    tensor = pa.Tensor.from_numpy(data)

    object_id = plasma.ObjectID(np.random.bytes(20))
    a = time.time()
    buf = client.create(object_id, pa.get_tensor_size(tensor))
    print("creating ", size, " took", time.time() - a)

    stream = pa.FixedSizeBufferOutputStream(buf)
    a = time.time()
    pa.write_tensor(tensor, stream)
    print("writing ", size, " took ", time.time() - a)
    client.seal(object_id)

Here are the results (timings are in seconds; where no error is given, it is very small;
all measurements are with the dlmalloc(1024) call on startup):

Legend:

M: memset
L: locking (have to edit /etc/security/limits.conf to set the memlock limit to unlimited)
P: populate
H: huge pages (have to use the -h flag and mount a hugetlbfs)

     | creating 20M | writing 20M    | creating 200M | writing 200M
-----+--------------+----------------+---------------+-------------
M  H | 0.00016      | 0.020          | 0.81 +- 0.2   | 0.20
 L H | 0.00016      | 0.020          | 0.64 +- 0.2   | 0.20
  PH | 0.00015      | 0.020          | 0.65 +- 0.2   | 0.20
   H | 0.00060      | 0.056          | 0.002         | 0.58
M    | 0.00016      | 0.046          | 1.7 +- 0.5    | 0.43
 L   | 0.00017      | 0.047          | 1.5 +- 0.5    | 0.44
  P  | 0.00016      | 0.047          | 1.3 +- 0.1    | 0.44
     | 0.00020      | 0.10           | 0.15          | 0.96

Put performance is creating + writing.

The time spent in creating for M, L, and P is the latency from memsetting, locking, or populating pages in the store, and the error is the influence of the size of the memory-mapped file (we increase the size by 2x every time we map a new file).

@atumanov
Author

Just to elaborate on Philipp's microbenchmark results: the overall write-performance improvement with H + {M | L | P} is ~5x (both for 20M and 200M objects)! This is consistent with the ray.put performance improvement I saw when benchmarking with Ray. The key is that the reported cost of create should be largely absorbed at object store startup. Subsequent writes to the same mmapped file should not (and did not for me) pay the reported create cost, just the writing cost.

BTW, you may also try to increase the number of threads from 1 to 8 as another dimension. I think the default is 1.

@pcmoritz
Contributor

Yeah, good point! The default threadpool size is indeed 1, see

static constexpr int kMemcopyDefaultNumThreads = 1;

Thanks for the improvements, that looks great! I'll go ahead and fix the linting and the other points we collected above and then we can get this merged.

@pcmoritz
Contributor

Based on the observations so far, here is another set of benchmarks. This time it measures the quantity that matters most for performance in practice, namely the throughput in the steady state (when all the memory-mapped files have been created):

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import time

client = plasma.connect("/tmp/plasma", "", 0)

create_times = []
write_times = []

data = np.random.randn(200000000)
tensor = pa.Tensor.from_numpy(data)

for i in range(200):
    print("iteration", i)
    object_id = plasma.ObjectID(np.random.bytes(20))
    a = time.time()
    buf = client.create(object_id, pa.get_tensor_size(tensor))
    if i >= 20:
        create_times.append(time.time() - a)

    stream = pa.FixedSizeBufferOutputStream(buf)
    a = time.time()
    pa.write_tensor(tensor, stream)
    if i >= 20:
        write_times.append(time.time() - a)
    client.seal(object_id)

print("create mean is", np.mean(create_times))
print("create stddev is", np.std(create_times))
print("write mean is", np.mean(write_times))
print("write stddev is", np.std(write_times))
Results on Linux (20GB plasma store):

      | creating 200M         | writing 200M
------+----------------------+-----------------
P     | 0.02 +- 0.02         | 0.431 +- 0.005
P   H | 0.000121 +- 2e-5     | 0.201 +- 0.004
      | 0.021 +- 0.026       | 0.441 +- 0.007
    H | 0.0001 +- 2e-1       | 0.200 +- 0.004

Results on Mac (2GB plasma store):

    | creating 20M         | writing 20M
----+----------------------+----------------
M+L | 0.000260 +- 0.000201 | 0.0302 +- 0.010
    | 0.000231 +- 0.000128 | 0.0274 +- 0.002

I'll incorporate Robert's comment and put DEFAULT_GRANULARITY back to 128K on Mac and on non-hugepage Linux, and remove the warmup (it only makes a difference if DEFAULT_GRANULARITY is large) as well as the memset and mlock in the store, since these don't make a difference in the steady state. This makes the behavior of the system similar to before the PR if huge pages are deactivated.

If desired, we can create a follow-up PR that allocates all the memory in advance, as Alexey suggests.

@pcmoritz
Contributor

pcmoritz commented Aug 19, 2017

With huge pages we now get the following results for copying 1GB of data (with different numbers of threads and blocksizes):

Format: num_threads = n: blocksize seconds; blocksize seconds; ...

num_threads = 1: 1 0.1326; 2 0.1361; 4 0.1327; 8 0.1328; 16 0.1350; 32 0.1370; 64 0.1366; 128 0.1339; 
num_threads = 2: 1 0.0721; 2 0.0718; 4 0.0714; 8 0.0769; 16 0.0716; 32 0.0718; 64 0.0725; 128 0.0723; 
num_threads = 3: 1 0.0541; 2 0.0539; 4 0.0544; 8 0.0543; 16 0.0541; 32 0.0540; 64 0.0537; 128 0.0581; 
num_threads = 4: 1 0.0482; 2 0.0477; 4 0.0454; 8 0.0454; 16 0.0452; 32 0.0455; 64 0.0457; 128 0.0479; 
num_threads = 5: 1 0.0430; 2 0.0430; 4 0.0431; 8 0.0432; 16 0.0437; 32 0.0431; 64 0.0431; 128 0.0434; 
num_threads = 6: 1 0.0437; 2 0.0436; 4 0.0440; 8 0.0450; 16 0.0436; 32 0.0450; 64 0.0437; 128 0.0449; 
num_threads = 7: 1 0.0416; 2 0.0416; 4 0.0417; 8 0.0425; 16 0.0429; 32 0.0418; 64 0.0417; 128 0.0419; 
num_threads = 8: 1 0.0456; 2 0.0434; 4 0.0427; 8 0.0440; 16 0.0424; 32 0.0446; 64 0.0427; 128 0.0427; 
num_threads = 9: 1 0.0480; 2 0.0458; 4 0.0468; 8 0.0463; 16 0.0476; 32 0.0458; 64 0.0468; 128 0.0470; 
num_threads = 10: 1 0.0479; 2 0.0477; 4 0.0481; 8 0.0476; 16 0.0481; 32 0.0478; 64 0.0476; 128 0.0478; 
num_threads = 11: 1 0.0465; 2 0.0466; 4 0.0468; 8 0.0463; 16 0.0461; 32 0.0469; 64 0.0460; 128 0.0465; 
num_threads = 12: 1 0.0458; 2 0.0458; 4 0.0454; 8 0.0462; 16 0.0460; 32 0.0461; 64 0.0458; 128 0.0462; 
num_threads = 13: 1 0.0437; 2 0.0432; 4 0.0428; 8 0.0436; 16 0.0429; 32 0.0432; 64 0.0438; 128 0.0436; 
num_threads = 14: 1 0.0450; 2 0.0452; 4 0.0453; 8 0.0454; 16 0.0455; 32 0.0453; 64 0.0454; 128 0.0451; 
num_threads = 15: 1 0.0446; 2 0.0438; 4 0.0438; 8 0.0451; 16 0.0442; 32 0.0445; 64 0.0442; 128 0.0441;

The best setting (num_threads = 7, where copying the 1GB buffer takes about 0.0416s) corresponds to roughly 24 GB/s.

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import time

client = plasma.connect("/tmp/plasma", "", 0)

data = np.random.randn(1024 * 1024 * 1024 // 8)
tensor = pa.Tensor.from_numpy(data)

for num_threads in range(1, 16):
    print("\nnum_threads =", num_threads, end=": ")
    for blocksize in [1, 2, 4, 8, 16, 32, 64, 128]:
        print(blocksize, end=" ")
        times = []
        for i in range(10):
            object_id = plasma.ObjectID(np.random.bytes(20))
            buf = client.create(object_id, pa.get_tensor_size(tensor))

            stream = pa.FixedSizeBufferOutputStream(buf)
            stream.set_memcopy_threads(num_threads)
            stream.set_memcopy_blocksize(blocksize)
            a = time.time()
            pa.write_tensor(tensor, stream)
            times.append(time.time() - a)
            client.seal(object_id)
        print("%.4f" % np.mean(times), end="; ")

@pcmoritz force-pushed the putperf branch 2 times, most recently from 28362e9 to 1bfa7f8 on August 19, 2017 09:14
@@ -34,6 +34,14 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=20

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")

option(ARROW_PLASMA_HUGEPAGES
Member

Could we make this a command line option instead of a compile-time option?

Member

Ah, this only enables compilation of the related code.

Contributor

Got rid of this now; it's not really needed.

#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)

#ifndef ARROW_PLASMA_HUGEPAGES
#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) // 128KB
Member

Would making this a configuration option that can be changed upon initialization of Plasma be expensive?

Contributor

Actually looking at this again, I think it can be done without too much effort, let me try.

Contributor

The code is refactored and this is set at runtime now. Let's not expose it to the user via a configuration option yet; it doesn't seem that there would be benefits that justify the additional complexity. I made it 1GB in the huge page case, since it seems unlikely that anybody would use huge pages if they have < 1GB of memory. Happy to revisit this in the future if desired.
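A minimal sketch of the runtime choice described here (the names are mine, not the PR's actual refactor):

```
#include <cstdint>

// 1GB granularity when hugepages are enabled, the old 128KB dlmalloc default
// otherwise, chosen once at store startup instead of via a compile-time macro.
int64_t ChooseMmapGranularity(bool hugepages_enabled) {
  constexpr int64_t kHugePageGranularity = 1024LL * 1024 * 1024;  // 1GB
  constexpr int64_t kDefaultGranularity = 128LL * 1024;           // 128KB
  return hugepages_enabled ? kHugePageGranularity : kDefaultGranularity;
}
```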

@@ -122,8 +130,18 @@ void* fake_mmap(size_t size) {

int fd = create_buffer(size);
ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
#ifdef __linux__
void* pointer =
mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
Contributor

Document why we are using MAP_POPULATE and what it does.
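Something along these lines could work as the requested comment (my wording, not what ended up in the tree):

```
// MAP_POPULATE asks the kernel to pre-fault the whole mapping at mmap() time
// (populate the page tables up front), so the cost of faulting pages in is
// paid once when the file is mapped rather than on every first write by a
// client. It is Linux-specific, hence the #ifdef __linux__ above.
```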

ARROW_LOG(FATAL) << "ftruncate error";
return -1;
if (!plasma::plasma_config->hugepages_enabled) {
if (ftruncate(fd, (off_t)size) != 0) {
Contributor

Document what ftruncate does and why we are calling it.
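As with the MAP_POPULATE comment, here is one possible wording (mine; the hugetlbfs part reflects my understanding of why the call is skipped in that case):

```
// The buffer file is created empty, so ftruncate() grows it to `size` bytes to
// give the subsequent mmap real backing storage. For hugetlbfs-backed files the
// call is skipped: their size is governed by the huge-page mapping itself
// rather than by ftruncate.
```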

@@ -129,6 +130,11 @@ struct PlasmaStoreInfo {
/// The amount of memory (in bytes) that we allow to be allocated in the
/// store.
int64_t memory_capacity;
/// Boolean flag indicating whether to start the object store with hugepages
/// support enabled.
bool hugepages_enabled;
Contributor

Consider also explaining why huge pages improve performance

return -1;
}
if (unlink(file_name) != 0) {
ARROW_LOG(FATAL) << "unlink error";
if (unlink(&file_name[0]) != 0) {
Contributor

Not related to this PR, but we should really document why we are doing this unlink call
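For what it's worth, a candidate comment (my understanding of the existing behavior, not text from this PR):

```
// The file is unlinked immediately after it has been opened and mapped: the
// open file descriptor and the mapping keep the memory alive, but removing the
// directory entry means the kernel reclaims the storage automatically once the
// store and any clients holding the mapping exit, even after a crash, instead
// of leaving stale files behind in /dev/shm or the hugetlbfs mount.
```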

@pcmoritz
Contributor

+1. The AppVeyor build failure is due to ARROW-1375.

@asfgit closed this in 10f7158 on Aug 20, 2017