Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java pinned memory pool allocator #2872

Merged
merged 5 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- PR #2838 CSV Reader: Support ARROW_RANDOM_FILE input
- PR #2655 CuPy-based Series and Dataframe .values property
- PR #2803 Added `edit_distance_matrix()` function to calculate pairwise edit distance for each string on a given nvstrings object.
- PR #2872 Add Java pinned memory pool allocator

## Improvements

Expand Down
11 changes: 11 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@ public final void ensureOnDevice() {
}
}

/**
* Drop any data stored on the host.
*/
public final void dropHostData() {
ensureOnDevice();
if (offHeap.hostData != null) {
offHeap.hostData.close();
offHeap.hostData = null;
}
}

/**
* Be sure the data is on the host.
*/
Expand Down
16 changes: 16 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Cuda.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ public class Cuda {
*/
public static native CudaMemInfo memGetInfo() throws CudaException;

/**
* Allocate pinned memory on the host. This call takes a long time, but can really speed up
* memory transfers.
* @param size how much memory, in bytes, to allocate.
* @return the address to the allocated memory.
* @throws CudaException on any error.
*/
static native long hostAllocPinned(long size) throws CudaException;

/**
* Free memory allocated with hostAllocPinned.
* @param ptr the pointer returned by hostAllocPinned.
* @throws CudaException on any error.
*/
static native void freePinned(long ptr) throws CudaException;

/**
* Copies count bytes from the memory area pointed to by src to the memory area pointed to by
* dst.
Expand Down
27 changes: 24 additions & 3 deletions java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* NOTE: Instances must be explicitly closed or native memory will be leaked!
*/
public class HostMemoryBuffer extends MemoryBuffer {
private static final boolean defaultPreferPinned = Boolean.getBoolean(
"ai.rapids.cudf.prefer-pinned");
private static final Logger log = LoggerFactory.getLogger(HostMemoryBuffer.class);

private static final class HostBufferCleaner extends MemoryCleaner.Cleaner {
Expand All @@ -56,8 +58,19 @@ protected boolean cleanImpl(boolean logErrorIfNotClean) {
}
}

HostMemoryBuffer(long address, long length) {
super(address, length, new HostBufferCleaner(address));
/**
* Factory method to create this buffer
* @param bytes - size in bytes to allocate
* @return - return this newly created buffer
*/
public static HostMemoryBuffer allocate(long bytes, boolean preferPinned) {
if (preferPinned) {
HostMemoryBuffer pinnedBuffer = PinnedMemoryPool.tryAllocate(bytes);
if (pinnedBuffer != null) {
return pinnedBuffer;
}
}
return new HostMemoryBuffer(UnsafeMemoryAccessor.allocate(bytes), bytes);
}

/**
Expand All @@ -66,7 +79,15 @@ protected boolean cleanImpl(boolean logErrorIfNotClean) {
* @return - return this newly created buffer
*/
public static HostMemoryBuffer allocate(long bytes) {
return new HostMemoryBuffer(UnsafeMemoryAccessor.allocate(bytes), bytes);
return allocate(bytes, defaultPreferPinned);
}

HostMemoryBuffer(long address, long length) {
super(address, length, new HostBufferCleaner(address));
}

HostMemoryBuffer(long address, long length, MemoryCleaner.Cleaner cleaner) {
super(address, length, cleaner);
}

private HostMemoryBuffer(long address, long lengthInBytes, HostMemoryBuffer parent) {
Expand Down
292 changes: 292 additions & 0 deletions java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
*
* Copyright (c) 2019, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* This provides a pool of pinned memory similar to what RMM does for device memory.
*/
public final class PinnedMemoryPool implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(PinnedMemoryPool.class);
private static final long ALIGNMENT = 8;

// These static fields should only ever be accessed when class-synchronized.
// Do NOT use singleton_ directly! Use the getSingleton accessor instead.
private static volatile PinnedMemoryPool singleton_ = null;
private static ExecutorService initService = null;
private static Future<PinnedMemoryPool> initFuture = null;

private long pinnedPoolBase;
private PriorityQueue<MemorySection> freeHeap = new PriorityQueue<>(new SortedBySize());
private int numAllocatedSections = 0;
private long availableBytes;

private static class SortedBySize implements Comparator<MemorySection> {
@Override
public int compare(MemorySection s0, MemorySection s1) {
// We want the largest ones first...
int ret = Long.compare(s1.size, s0.size);
if (ret == 0) {
ret = Long.compare(s0.baseAddress, s1.baseAddress);
}
return ret;
}
}

private static class MemorySection {
private long baseAddress;
private long size;

MemorySection(long baseAddress, long size) {
this.baseAddress = baseAddress;
this.size = size;
}

boolean canCombine(MemorySection other) {
boolean ret = (other.baseAddress + other.size) == baseAddress ||
(baseAddress + size) == other.baseAddress;
log.trace("CAN {} COMBINE WITH {} ? {}", this, other, ret);
return ret;
}

void combineWith(MemorySection other) {
assert canCombine(other);
razajafri marked this conversation as resolved.
Show resolved Hide resolved
log.trace("COMBINING {} AND {}", this, other);
this.baseAddress = Math.min(baseAddress, other.baseAddress);
this.size = other.size + this.size;
log.trace("COMBINED TO {}\n", this);
}

MemorySection splitOff(long newSize) {
assert this.size > newSize;
MemorySection ret = new MemorySection(baseAddress, newSize);
this.baseAddress += newSize;
this.size -= newSize;
return ret;
}

@Override
public String toString() {
return "PINNED: " + size + " bytes (0x" + Long.toHexString(baseAddress)
+ " to 0x" + Long.toHexString(baseAddress + size) + ")";
}
}

private static final class PinnedHostBufferCleaner extends MemoryCleaner.Cleaner {
private MemorySection section;

PinnedHostBufferCleaner(MemorySection section) {
this.section = section;
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
if (section != null) {
PinnedMemoryPool.freeInternal(section);
section = null;
neededCleanup = true;
}
if (neededCleanup && logErrorIfNotClean) {
log.error("A PINNED HOST BUFFER WAS LEAKED!!!!");
logRefCountDebug("Leaked pinned host buffer");
}
return neededCleanup;
}
}

private static PinnedMemoryPool getSingleton() {
if (singleton_ == null) {
if (initFuture == null) {
return null;
}

synchronized (PinnedMemoryPool.class) {
if (singleton_ == null) {
initService.shutdown();
initService = null;
try {
singleton_ = initFuture.get();
} catch (Exception e) {
throw new RuntimeException("Error initializing pinned memory pool", e);
}
initFuture = null;
}
}
}
return singleton_;
}

private static void freeInternal(MemorySection section) {
Objects.requireNonNull(getSingleton()).free(section);
}

/**
* Initialize the pool.
* @param poolSize size of the pool to initialize.
*/
public static synchronized void initialize(long poolSize) {
if (isInitialized()) {
throw new IllegalStateException("Can only initialize the pool once.");
}
initService = Executors.newSingleThreadExecutor();
initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize));
}

/**
* Check if the pool has been initialized or not.
*/
public static boolean isInitialized() {
return getSingleton() != null;
}

/**
* Shut down the pool of memory. If there are outstanding allocations this may fail.
*/
public static synchronized void shutdown() {
PinnedMemoryPool pool = getSingleton();
if (pool != null) {
pool.close();
}
initFuture = null;
singleton_ = null;
}

/**
* Factory method to create a pinned host memory buffer.
* @param bytes size in bytes to allocate
* @return newly created buffer or null if insufficient pinned memory
*/
public static HostMemoryBuffer tryAllocate(long bytes) {
HostMemoryBuffer result = null;
PinnedMemoryPool pool = getSingleton();
if (pool != null) {
result = pool.tryAllocateInternal(bytes);
}
return result;
}

/**
* Factory method to create a host buffer but preferably pointing to pinned memory.
* It is not guaranteed that the returned buffer will be pointer to pinned memory.
* @param bytes size in bytes to allocate
* @return newly created buffer
*/
public static HostMemoryBuffer allocate(long bytes) {
HostMemoryBuffer result = tryAllocate(bytes);
if (result == null) {
result = HostMemoryBuffer.allocate(bytes, false);
}
return result;
}

/**
* Get the number of bytes free in the pinned memory pool.
* @return amount of free memory in bytes or 0 if the pool is not initialized
*/
public static long getAvailableBytes() {
PinnedMemoryPool pool = getSingleton();
if (pool != null) {
return pool.getAvailableBytesInternal();
}
return 0;
}

private PinnedMemoryPool(long poolSize) {
this.pinnedPoolBase = Cuda.hostAllocPinned(poolSize);
freeHeap.add(new MemorySection(pinnedPoolBase, poolSize));
this.availableBytes = poolSize;
}

@Override
public void close() {
assert numAllocatedSections == 0;
Cuda.freePinned(pinnedPoolBase);
}

private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {
if (freeHeap.isEmpty()) {
log.debug("No free pinned memory left");
return null;
}
// Align the allocation
long alignedBytes = ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
MemorySection largest = freeHeap.peek();
if (largest.size < alignedBytes) {
log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size);
return null;
}
log.debug("Allocating {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}",
bytes, alignedBytes, largest, freeHeap.size(), numAllocatedSections);
freeHeap.remove(largest);
MemorySection allocated;
if (largest.size == alignedBytes) {
allocated = largest;
} else {
allocated = largest.splitOff(alignedBytes);
freeHeap.add(largest);
}
numAllocatedSections++;
availableBytes -= allocated.size;
log.debug("Allocated {} free {} outstanding {}", allocated, freeHeap, numAllocatedSections);
return new HostMemoryBuffer(allocated.baseAddress, allocated.size,
new PinnedHostBufferCleaner(allocated));
}

private synchronized void free(MemorySection section) {
log.debug("Freeing {} with {} outstanding {}", section, freeHeap, numAllocatedSections);
// This looks inefficient, but in reality it will only walk through the heap about 2 times.
// Because we keep entries up to date, each new entry will at most combine with one above it
// and one below it. That will happen in a single pass through the heap. We do a second pass
// simply out of an abundance of caution.
// Adding it in will be a log(N) operation because it is a heap.
availableBytes += section.size;
boolean anyReplaced;
do {
anyReplaced = false;
Iterator<MemorySection> it = freeHeap.iterator();
while(it.hasNext()) {
MemorySection current = it.next();
if (section.canCombine(current)) {
it.remove();
anyReplaced = true;
section.combineWith(current);
}
}
} while(anyReplaced);
freeHeap.add(section);
numAllocatedSections--;
log.debug("After freeing {} outstanding {}", freeHeap, numAllocatedSections);
}

private synchronized long getAvailableBytesInternal() {
return this.availableBytes;
}
}
Loading