diff --git a/.gitignore b/.gitignore
index e6dfe19bb9807..935f5fb488efe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,4 @@ cpp/.idea/
python/.eggs/
.vscode
.idea/
+
diff --git a/java/.gitignore b/java/.gitignore
index 03f5bf76e60d2..e596e627597fa 100644
--- a/java/.gitignore
+++ b/java/.gitignore
@@ -20,4 +20,6 @@ CMakeFiles
Makefile
cmake_install.cmake
install_manifest.txt
+*.dat
?/
+
diff --git a/java/memory/pom.xml b/java/memory/pom.xml
index 7efc8e6aa470c..c3a0347ba0589 100644
--- a/java/memory/pom.xml
+++ b/java/memory/pom.xml
@@ -40,6 +40,10 @@
org.slf4j
slf4j-api
+
+ org.apache.mnemonic
+ mnemonic-core
+
diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java
new file mode 100644
index 0000000000000..8d8295499a5cb
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import org.apache.mnemonic.CommonAllocator;
+
+/**
+ * Simplistic {@link ByteBufAllocator} implementation that does not pool anything.
+ */
+public final class MnemonicUnpooledByteBufAllocator> extends AbstractByteBufAllocator {
+
+ private A mcalloc;
+
+ /**
+ * Default instance
+ */
+ // public static final UnpooledByteBufAllocator DEFAULT =
+ // new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+
+ /**
+ * Create a new instance
+ *
+ * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
+ * a heap buffer
+ */
+ public MnemonicUnpooledByteBufAllocator(boolean preferDirect, A mcallocator) {
+ super(preferDirect);
+ this.mcalloc = mcallocator;
+ }
+
+ public A getAllocator() {
+ return this.mcalloc;
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ assert null != this.mcalloc;
+ ByteBuf buf;
+ if (PlatformDependent.hasUnsafe()) {
+ buf = new MnemonicUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
+ } else {
+ buf = new MnemonicUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
+ }
+
+ return toLeakAwareBuffer(buf);
+ }
+
+ @Override
+ public boolean isDirectBufferPooled() {
+ return false;
+ }
+}
diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java
new file mode 100644
index 0000000000000..33569117a7e7d
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java
@@ -0,0 +1,620 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import org.apache.arrow.memory.OutOfMemoryException;
+
+import org.apache.mnemonic.CommonAllocator;
+import org.apache.mnemonic.MemBufferHolder;
+
+/**
+ * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)}
+ * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
+ * constructor explicitly.
+ */
+public class MnemonicUnpooledDirectByteBuf> extends AbstractReferenceCountedByteBuf {
+
+ private final MnemonicUnpooledByteBufAllocator alloc;
+
+ private MemBufferHolder bufholder;
+ private ByteBuffer buffer;
+ private ByteBuffer tmpNioBuf;
+ private int capacity;
+ private boolean doNotFree;
+
+ /**
+ * Creates a new direct buffer.
+ *
+ * @param initialCapacity the initial capacity of the underlying direct buffer
+ * @param maxCapacity the maximum capacity of the underlying direct buffer
+ */
+ protected MnemonicUnpooledDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
+ super(maxCapacity);
+ if (alloc == null) {
+ throw new NullPointerException("alloc");
+ }
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
+ }
+ if (maxCapacity < 0) {
+ throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
+ }
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
+ }
+
+ this.alloc = alloc;
+ setByteBuffer(allocateDirect(initialCapacity));
+ }
+
+ /**
+ * Creates a new direct buffer by wrapping the specified initial buffer.
+ *
+ * @param maxCapacity the maximum capacity of the underlying direct buffer
+ */
+ protected MnemonicUnpooledDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, MemBufferHolder initialBufHolder, int maxCapacity) {
+ super(maxCapacity);
+ if (alloc == null) {
+ throw new NullPointerException("alloc");
+ }
+ if (initialBufHolder == null || initialBufHolder.get() == null) {
+ throw new NullPointerException("initialBufHolder");
+ }
+ if (!initialBufHolder.get().isDirect()) {
+ throw new IllegalArgumentException("initialBufHolder is not a direct buffer.");
+ }
+ if (initialBufHolder.get().isReadOnly()) {
+ throw new IllegalArgumentException("initialBufHolder is a read-only buffer.");
+ }
+
+ int initialCapacity = initialBufHolder.get().remaining();
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
+ }
+
+ this.alloc = alloc;
+ doNotFree = true;
+ setByteBuffer(initialBufHolder);
+ writerIndex(initialCapacity);
+ }
+
+ /**
+ * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
+ */
+ protected MemBufferHolder allocateDirect(int initialCapacity) {
+ MemBufferHolder mbholder = this.alloc.getAllocator().createBuffer(initialCapacity);
+ if (null == mbholder) {
+ throw new OutOfMemoryException("No more memory resource for this MnemonicUnpooledDirectByteBuf instance");
+ }
+ return mbholder;
+ }
+
+ /**
+ * Free a direct {@link ByteBuffer}
+ */
+ protected void freeDirect(MemBufferHolder bufholder) {
+ if (this.bufholder != null && this.bufholder.get() != null) {
+ this.bufholder.destroy();
+ }
+ this.buffer = null;
+ this.bufholder = null;
+ }
+
+ private void setByteBuffer(MemBufferHolder bufholder) {
+ MemBufferHolder oldBufholder = this.bufholder;
+ if (oldBufholder != null) {
+ if (doNotFree) {
+ doNotFree = false;
+ } else {
+ freeDirect(oldBufholder);
+ }
+ }
+
+ this.bufholder = bufholder;
+ this.buffer = this.bufholder.get();
+ tmpNioBuf = null;
+ capacity = this.buffer.remaining();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public int capacity() {
+ return capacity;
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ ensureAccessible();
+ if (newCapacity < 0 || newCapacity > maxCapacity()) {
+ throw new IllegalArgumentException("newCapacity: " + newCapacity);
+ }
+
+ int readerIndex = readerIndex();
+ int writerIndex = writerIndex();
+
+ int oldCapacity = capacity;
+ if (newCapacity > oldCapacity) {
+ ByteBuffer oldBuffer = buffer;
+ MemBufferHolder newBufholder = allocateDirect(newCapacity);
+ ByteBuffer newBuffer = newBufholder.get();
+ oldBuffer.position(0).limit(oldBuffer.capacity());
+ newBuffer.position(0).limit(oldBuffer.capacity());
+ newBuffer.put(oldBuffer);
+ newBuffer.clear();
+ setByteBuffer(newBufholder);
+ } else if (newCapacity < oldCapacity) {
+ ByteBuffer oldBuffer = buffer;
+ MemBufferHolder newBufholder = allocateDirect(newCapacity);
+ ByteBuffer newBuffer = newBufholder.get();
+ if (readerIndex < newCapacity) {
+ if (writerIndex > newCapacity) {
+ writerIndex(writerIndex = newCapacity);
+ }
+ oldBuffer.position(readerIndex).limit(writerIndex);
+ newBuffer.position(readerIndex).limit(writerIndex);
+ newBuffer.put(oldBuffer);
+ newBuffer.clear();
+ } else {
+ setIndex(newCapacity, newCapacity);
+ }
+ setByteBuffer(newBufholder);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return alloc;
+ }
+
+ @Override
+ public ByteOrder order() {
+ return ByteOrder.BIG_ENDIAN;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return false;
+ }
+
+ @Override
+ public byte[] array() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public int arrayOffset() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return false;
+ }
+
+ @Override
+ public long memoryAddress() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte getByte(int index) {
+ ensureAccessible();
+ return _getByte(index);
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return buffer.get(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ ensureAccessible();
+ return _getShort(index);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ ensureAccessible();
+ return _getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
+ }
+
+ @Override
+ public int getInt(int index) {
+ ensureAccessible();
+ return _getInt(index);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ ensureAccessible();
+ return _getLong(index);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return buffer.getLong(index);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ checkDstIndex(index, length, dstIndex, dst.capacity());
+ if (dst.hasArray()) {
+ getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
+ } else if (dst.nioBufferCount() > 0) {
+ for (ByteBuffer bb: dst.nioBuffers(dstIndex, length)) {
+ int bbLen = bb.remaining();
+ getBytes(index, bb);
+ index += bbLen;
+ }
+ } else {
+ dst.setBytes(dstIndex, this, index, length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ getBytes(index, dst, dstIndex, length, false);
+ return this;
+ }
+
+ private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) {
+ checkDstIndex(index, length, dstIndex, dst.length);
+
+ if (dstIndex < 0 || dstIndex > dst.length - length) {
+ throw new IndexOutOfBoundsException(String.format(
+ "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length));
+ }
+
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index).limit(index + length);
+ tmpBuf.get(dst, dstIndex, length);
+ }
+
+ @Override
+ public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
+ checkReadableBytes(length);
+ getBytes(readerIndex, dst, dstIndex, length, true);
+ readerIndex += length;
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ getBytes(index, dst, false);
+ return this;
+ }
+
+ private void getBytes(int index, ByteBuffer dst, boolean internal) {
+ checkIndex(index);
+ if (dst == null) {
+ throw new NullPointerException("dst");
+ }
+
+ int bytesToCopy = Math.min(capacity() - index, dst.remaining());
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index).limit(index + bytesToCopy);
+ dst.put(tmpBuf);
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuffer dst) {
+ int length = dst.remaining();
+ checkReadableBytes(length);
+ getBytes(readerIndex, dst, true);
+ readerIndex += length;
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ ensureAccessible();
+ _setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ buffer.put(index, (byte) value);
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ ensureAccessible();
+ _setShort(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ buffer.putShort(index, (short) value);
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ ensureAccessible();
+ _setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ setByte(index, (byte) (value >>> 16));
+ setByte(index + 1, (byte) (value >>> 8));
+ setByte(index + 2, (byte) value);
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ ensureAccessible();
+ _setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ buffer.putInt(index, value);
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ ensureAccessible();
+ _setLong(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ buffer.putLong(index, value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ checkSrcIndex(index, length, srcIndex, src.capacity());
+ if (src.nioBufferCount() > 0) {
+ for (ByteBuffer bb: src.nioBuffers(srcIndex, length)) {
+ int bbLen = bb.remaining();
+ setBytes(index, bb);
+ index += bbLen;
+ }
+ } else {
+ src.getBytes(srcIndex, this, index, length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ checkSrcIndex(index, length, srcIndex, src.length);
+ ByteBuffer tmpBuf = internalNioBuffer();
+ tmpBuf.clear().position(index).limit(index + length);
+ tmpBuf.put(src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ ensureAccessible();
+ ByteBuffer tmpBuf = internalNioBuffer();
+ if (src == tmpBuf) {
+ src = src.duplicate();
+ }
+
+ tmpBuf.clear().position(index).limit(index + src.remaining());
+ tmpBuf.put(src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ getBytes(index, out, length, false);
+ return this;
+ }
+
+ private void getBytes(int index, OutputStream out, int length, boolean internal) throws IOException {
+ ensureAccessible();
+ if (length == 0) {
+ return;
+ }
+
+ if (buffer.hasArray()) {
+ out.write(buffer.array(), index + buffer.arrayOffset(), length);
+ } else {
+ byte[] tmp = new byte[length];
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index);
+ tmpBuf.get(tmp);
+ out.write(tmp);
+ }
+ }
+
+ @Override
+ public ByteBuf readBytes(OutputStream out, int length) throws IOException {
+ checkReadableBytes(length);
+ getBytes(readerIndex, out, length, true);
+ readerIndex += length;
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ return getBytes(index, out, length, false);
+ }
+
+ private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
+ ensureAccessible();
+ if (length == 0) {
+ return 0;
+ }
+
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index).limit(index + length);
+ return out.write(tmpBuf);
+ }
+
+ @Override
+ public int readBytes(GatheringByteChannel out, int length) throws IOException {
+ checkReadableBytes(length);
+ int readBytes = getBytes(readerIndex, out, length, true);
+ readerIndex += readBytes;
+ return readBytes;
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ ensureAccessible();
+ if (buffer.hasArray()) {
+ return in.read(buffer.array(), buffer.arrayOffset() + index, length);
+ } else {
+ byte[] tmp = new byte[length];
+ int readBytes = in.read(tmp);
+ if (readBytes <= 0) {
+ return readBytes;
+ }
+ ByteBuffer tmpBuf = internalNioBuffer();
+ tmpBuf.clear().position(index);
+ tmpBuf.put(tmp, 0, readBytes);
+ return readBytes;
+ }
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ ensureAccessible();
+ ByteBuffer tmpBuf = internalNioBuffer();
+ tmpBuf.clear().position(index).limit(index + length);
+ try {
+ return in.read(tmpNioBuf);
+ } catch (ClosedChannelException ignored) {
+ return -1;
+ }
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return 1;
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return new ByteBuffer[] { nioBuffer(index, length) };
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ ensureAccessible();
+ ByteBuffer src;
+ try {
+ src = (ByteBuffer) buffer.duplicate().clear().position(index).limit(index + length);
+ } catch (IllegalArgumentException ignored) {
+ throw new IndexOutOfBoundsException("Too many bytes to read - Need " + (index + length));
+ }
+
+ return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ checkIndex(index, length);
+ return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
+ }
+
+ private ByteBuffer internalNioBuffer() {
+ ByteBuffer tmpNioBuf = this.tmpNioBuf;
+ if (tmpNioBuf == null) {
+ this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
+ }
+ return tmpNioBuf;
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ checkIndex(index, length);
+ return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice();
+ }
+
+ @Override
+ protected void deallocate() {
+ ByteBuffer buffer = this.buffer;
+ if (buffer == null) {
+ return;
+ }
+
+ this.buffer = null;
+
+ if (!doNotFree) {
+ freeDirect(this.bufholder);
+ }
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return null;
+ }
+}
diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java
new file mode 100644
index 0000000000000..aed9ff0c6aae1
--- /dev/null
+++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java
@@ -0,0 +1,542 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import org.apache.arrow.memory.OutOfMemoryException;
+
+import org.apache.mnemonic.CommonAllocator;
+import org.apache.mnemonic.MemBufferHolder;
+
+/**
+ * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)}
+ * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
+ * constructor explicitly.
+ */
+public class MnemonicUnpooledUnsafeDirectByteBuf> extends AbstractReferenceCountedByteBuf {
+
+ private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+ private final MnemonicUnpooledByteBufAllocator alloc;
+
+ private long memoryAddress;
+ private MemBufferHolder bufholder;
+ private ByteBuffer buffer;
+ private ByteBuffer tmpNioBuf;
+ private int capacity;
+ private boolean doNotFree;
+
+ /**
+ * Creates a new direct buffer.
+ *
+ * @param initialCapacity the initial capacity of the underlying direct buffer
+ * @param maxCapacity the maximum capacity of the underlying direct buffer
+ */
+ protected MnemonicUnpooledUnsafeDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
+ super(maxCapacity);
+ if (alloc == null) {
+ throw new NullPointerException("alloc");
+ }
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
+ }
+ if (maxCapacity < 0) {
+ throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
+ }
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
+ }
+
+ this.alloc = alloc;
+
+ setByteBuffer(allocateDirect(initialCapacity));
+ }
+
+ /**
+ * Creates a new direct buffer by wrapping the specified initial buffer.
+ *
+ * @param maxCapacity the maximum capacity of the underlying direct buffer
+ */
+ protected MnemonicUnpooledUnsafeDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, MemBufferHolder initialBufHolder, int maxCapacity) {
+ super(maxCapacity);
+ if (alloc == null) {
+ throw new NullPointerException("alloc");
+ }
+ if (initialBufHolder == null || initialBufHolder.get() == null) {
+ throw new NullPointerException("initialBufHolder");
+ }
+ if (!initialBufHolder.get().isDirect()) {
+ throw new IllegalArgumentException("initialBufHolder is not a direct buffer.");
+ }
+ if (initialBufHolder.get().isReadOnly()) {
+ throw new IllegalArgumentException("initialBufHolder is a read-only buffer.");
+ }
+
+ int initialCapacity = initialBufHolder.get().remaining();
+ if (initialCapacity > maxCapacity) {
+ throw new IllegalArgumentException(String.format(
+ "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
+ }
+
+ this.alloc = alloc;
+ doNotFree = true;
+ setByteBuffer(initialBufHolder);
+ writerIndex(initialCapacity);
+ }
+
+ /**
+ * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
+ */
+ protected MemBufferHolder allocateDirect(int initialCapacity) {
+ MemBufferHolder mbholder = this.alloc.getAllocator().createBuffer(initialCapacity);
+ if (null == mbholder) {
+ throw new OutOfMemoryException("No more memory resource for this MnemonicUnpooledUnsafeDirectByteBuf instance");
+ }
+ return mbholder;
+ }
+
+ /**
+ * Free a direct {@link ByteBuffer}
+ */
+ protected void freeDirect(MemBufferHolder bufholder) {
+ if (this.bufholder != null && this.bufholder.get() != null) {
+ this.bufholder.destroy();
+ }
+ this.buffer = null;
+ this.bufholder = null;
+ }
+
+ private void setByteBuffer(MemBufferHolder bufholder) {
+ MemBufferHolder oldBufholder = this.bufholder;
+ if (oldBufholder != null) {
+ if (doNotFree) {
+ doNotFree = false;
+ } else {
+ freeDirect(oldBufholder);
+ }
+ }
+
+ this.bufholder = bufholder;
+ this.buffer = this.bufholder.get();
+ memoryAddress = PlatformDependent.directBufferAddress(this.buffer);
+ tmpNioBuf = null;
+ capacity = (int)this.bufholder.getSize();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public int capacity() {
+ return capacity;
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ ensureAccessible();
+ if (newCapacity < 0 || newCapacity > maxCapacity()) {
+ throw new IllegalArgumentException("newCapacity: " + newCapacity);
+ }
+
+ int readerIndex = readerIndex();
+ int writerIndex = writerIndex();
+
+ int oldCapacity = capacity;
+ if (newCapacity > oldCapacity) {
+ ByteBuffer oldBuffer = buffer;
+ MemBufferHolder newBufholder = allocateDirect(newCapacity);
+ ByteBuffer newBuffer = newBufholder.get();
+ oldBuffer.position(0).limit(oldBuffer.capacity());
+ newBuffer.position(0).limit(oldBuffer.capacity());
+ newBuffer.put(oldBuffer);
+ newBuffer.clear();
+ setByteBuffer(newBufholder);
+ } else if (newCapacity < oldCapacity) {
+ ByteBuffer oldBuffer = buffer;
+ MemBufferHolder newBufholder = allocateDirect(newCapacity);
+ ByteBuffer newBuffer = newBufholder.get();
+ if (readerIndex < newCapacity) {
+ if (writerIndex > newCapacity) {
+ writerIndex(writerIndex = newCapacity);
+ }
+ oldBuffer.position(readerIndex).limit(writerIndex);
+ newBuffer.position(readerIndex).limit(writerIndex);
+ newBuffer.put(oldBuffer);
+ newBuffer.clear();
+ } else {
+ setIndex(newCapacity, newCapacity);
+ }
+ setByteBuffer(newBufholder);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return alloc;
+ }
+
+ @Override
+ public ByteOrder order() {
+ return buffer.order();
+ }
+
+ @Override
+ public boolean hasArray() {
+ return false;
+ }
+
+ @Override
+ public byte[] array() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public int arrayOffset() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return true;
+ }
+
+ @Override
+ public long memoryAddress() {
+ ensureAccessible();
+ return memoryAddress;
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return PlatformDependent.getByte(addr(index));
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ short v = PlatformDependent.getShort(addr(index));
+ return NATIVE_ORDER? v : Short.reverseBytes(v);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ long addr = addr(index);
+ return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+ (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+ PlatformDependent.getByte(addr + 2) & 0xff;
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ int v = PlatformDependent.getInt(addr(index));
+ return NATIVE_ORDER? v : Integer.reverseBytes(v);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ long v = PlatformDependent.getLong(addr(index));
+ return NATIVE_ORDER? v : Long.reverseBytes(v);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ checkIndex(index, length);
+ if (dst == null) {
+ throw new NullPointerException("dst");
+ }
+ if (dstIndex < 0 || dstIndex > dst.capacity() - length) {
+ throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
+ }
+
+ if (dst.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(addr(index), dst.memoryAddress() + dstIndex, length);
+ } else if (dst.hasArray()) {
+ PlatformDependent.copyMemory(addr(index), dst.array(), dst.arrayOffset() + dstIndex, length);
+ } else {
+ dst.setBytes(dstIndex, this, index, length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ checkIndex(index, length);
+ if (dst == null) {
+ throw new NullPointerException("dst");
+ }
+ if (dstIndex < 0 || dstIndex > dst.length - length) {
+ throw new IndexOutOfBoundsException(String.format(
+ "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length));
+ }
+
+ if (length != 0) {
+ PlatformDependent.copyMemory(addr(index), dst, dstIndex, length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ getBytes(index, dst, false);
+ return this;
+ }
+
+ private void getBytes(int index, ByteBuffer dst, boolean internal) {
+ checkIndex(index);
+ if (dst == null) {
+ throw new NullPointerException("dst");
+ }
+
+ int bytesToCopy = Math.min(capacity() - index, dst.remaining());
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index).limit(index + bytesToCopy);
+ dst.put(tmpBuf);
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuffer dst) {
+ int length = dst.remaining();
+ checkReadableBytes(length);
+ getBytes(readerIndex, dst, true);
+ readerIndex += length;
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ PlatformDependent.putByte(addr(index), (byte) value);
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ PlatformDependent.putShort(addr(index), NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ long addr = addr(index);
+ PlatformDependent.putByte(addr, (byte) (value >>> 16));
+ PlatformDependent.putByte(addr + 1, (byte) (value >>> 8));
+ PlatformDependent.putByte(addr + 2, (byte) value);
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ PlatformDependent.putInt(addr(index), NATIVE_ORDER ? value : Integer.reverseBytes(value));
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ PlatformDependent.putLong(addr(index), NATIVE_ORDER ? value : Long.reverseBytes(value));
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ checkIndex(index, length);
+ if (src == null) {
+ throw new NullPointerException("src");
+ }
+ if (srcIndex < 0 || srcIndex > src.capacity() - length) {
+ throw new IndexOutOfBoundsException("srcIndex: " + srcIndex);
+ }
+
+ if (length != 0) {
+ if (src.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr(index), length);
+ } else if (src.hasArray()) {
+ PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr(index), length);
+ } else {
+ src.getBytes(srcIndex, this, index, length);
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ checkIndex(index, length);
+ if (length != 0) {
+ PlatformDependent.copyMemory(src, srcIndex, addr(index), length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ ensureAccessible();
+ ByteBuffer tmpBuf = internalNioBuffer();
+ if (src == tmpBuf) {
+ src = src.duplicate();
+ }
+
+ tmpBuf.clear().position(index).limit(index + src.remaining());
+ tmpBuf.put(src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ ensureAccessible();
+ if (length != 0) {
+ byte[] tmp = new byte[length];
+ PlatformDependent.copyMemory(addr(index), tmp, 0, length);
+ out.write(tmp);
+ }
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ return getBytes(index, out, length, false);
+ }
+
+ private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
+ ensureAccessible();
+ if (length == 0) {
+ return 0;
+ }
+
+ ByteBuffer tmpBuf;
+ if (internal) {
+ tmpBuf = internalNioBuffer();
+ } else {
+ tmpBuf = buffer.duplicate();
+ }
+ tmpBuf.clear().position(index).limit(index + length);
+ return out.write(tmpBuf);
+ }
+
+ @Override
+ public int readBytes(GatheringByteChannel out, int length) throws IOException {
+ checkReadableBytes(length);
+ int readBytes = getBytes(readerIndex, out, length, true);
+ readerIndex += readBytes;
+ return readBytes;
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ checkIndex(index, length);
+ byte[] tmp = new byte[length];
+ int readBytes = in.read(tmp);
+ if (readBytes > 0) {
+ PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
+ }
+ return readBytes;
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ ensureAccessible();
+ ByteBuffer tmpBuf = internalNioBuffer();
+ tmpBuf.clear().position(index).limit(index + length);
+ try {
+ return in.read(tmpBuf);
+ } catch (ClosedChannelException ignored) {
+ return -1;
+ }
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return 1;
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return new ByteBuffer[] { nioBuffer(index, length) };
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ checkIndex(index, length);
+ ByteBuf copy = alloc().directBuffer(length, maxCapacity());
+ if (length != 0) {
+ if (copy.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(addr(index), copy.memoryAddress(), length);
+ copy.setIndex(0, length);
+ } else {
+ copy.writeBytes(this, index, length);
+ }
+ }
+ return copy;
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ checkIndex(index, length);
+ return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
+ }
+
+ private ByteBuffer internalNioBuffer() {
+ ByteBuffer tmpNioBuf = this.tmpNioBuf;
+ if (tmpNioBuf == null) {
+ this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
+ }
+ return tmpNioBuf;
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ checkIndex(index, length);
+ return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice();
+ }
+
+ @Override
+ protected void deallocate() {
+ ByteBuffer buffer = this.buffer;
+ if (buffer == null) {
+ return;
+ }
+
+ this.buffer = null;
+
+ if (!doNotFree) {
+ freeDirect(this.bufholder);
+ }
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return null;
+ }
+
+ long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ @Override
+ protected SwappedByteBuf newSwappedByteBuf() {
+ return new UnsafeDirectSwappedByteBuf(this);
+ }
+}
diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index b6de2e3aa2acb..acec32fbc7ce0 100644
--- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -39,6 +39,9 @@ public class PooledByteBufAllocatorL {
private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
public final UnsafeDirectLittleEndian empty;
+
+ private static MnemonicUnpooledByteBufAllocator> mubballoc = null;
+
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
@@ -50,6 +53,17 @@ public PooledByteBufAllocatorL() {
empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
}
+ public static void setUpMnemonicUnpooledByteBufAllocator(MnemonicUnpooledByteBufAllocator> mubballocator) {
+ if (null == mubballocator) {
+ throw new RuntimeException("MnemonicUnpooledByteBufAllocator is null for setup");
+ }
+ mubballoc = mubballocator;
+ }
+
+ public static void clearMnemonicUnpooledByteBufAllocator() {
+ mubballoc = null;
+ }
+
public UnsafeDirectLittleEndian allocate(int size) {
try {
return allocator.directBuffer(size, Integer.MAX_VALUE);
@@ -156,9 +170,9 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
if (directArena != null) {
- if (initialCapacity > directArena.chunkSize) {
+ if (initialCapacity > directArena.chunkSize || null != mubballoc) {
// This is beyond chunk size so we'll allocate separately.
- ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+ ByteBuf buf = null != mubballoc ? mubballoc.directBuffer(initialCapacity, maxCapacity) : UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
hugeBufferSize.addAndGet(buf.capacity());
hugeBufferCount.incrementAndGet();
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java
new file mode 100644
index 0000000000000..be08842ac41be
--- /dev/null
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java
@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import org.apache.mnemonic.VolatileMemAllocator;
+import org.apache.mnemonic.Utils;
+
+import io.netty.buffer.ArrowBuf;
+import io.netty.buffer.ArrowBuf.TransferResult;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.MnemonicUnpooledByteBufAllocator;
+
+public class TestMnemonicBackedBaseAllocator {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
+
+ private final static int MAX_ALLOCATION = 8 * 1024;
+
+ private final static long MNEMONIC_CAPACITY = 1024 * 1024 * 40;
+ private static VolatileMemAllocator bdmalloc;
+
+ @BeforeClass
+ public static void setupUpBeforeClass() throws Exception {
+ bdmalloc = new VolatileMemAllocator(
+ Utils.getVolatileMemoryAllocatorService("pmalloc"),
+ MNEMONIC_CAPACITY, "./base_allocator_test.dat");
+ MnemonicUnpooledByteBufAllocator mubba = new MnemonicUnpooledByteBufAllocator(true, bdmalloc);
+ PooledByteBufAllocatorL.setUpMnemonicUnpooledByteBufAllocator(mubba);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PooledByteBufAllocatorL.clearMnemonicUnpooledByteBufAllocator();
+ if (null != bdmalloc) {
+ bdmalloc.close();
+ }
+ }
+
+/*
+ // ---------------------------------------- DEBUG -----------------------------------
+
+ @After
+ public void checkBuffers() {
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ if (bufferCount != 0) {
+ UnsafeDirectLittleEndian.logBuffers(logger);
+ UnsafeDirectLittleEndian.releaseBuffers();
+ }
+
+ assertEquals(0, bufferCount);
+ }
+
+// @AfterClass
+// public static void dumpBuffers() {
+// UnsafeDirectLittleEndian.logBuffers(logger);
+// }
+
+ // ---------------------------------------- DEBUG ------------------------------------
+*/
+
+
+ @Test
+ public void test_privateMax() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf1);
+
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf2);
+ arrowBuf2.release();
+ }
+
+ arrowBuf1.release();
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRootAllocator_closeWithOutstanding() throws Exception {
+ try {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = rootAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ }
+ } finally {
+ /*
+ * We expect there to be one unreleased underlying buffer because we're closing
+ * without releasing it.
+ */
+/*
+ // ------------------------------- DEBUG ---------------------------------
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ UnsafeDirectLittleEndian.releaseBuffers();
+ assertEquals(1, bufferCount);
+ // ------------------------------- DEBUG ---------------------------------
+*/
+ }
+ }
+
+ @Test
+ public void testRootAllocator_getEmpty() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = rootAllocator.buffer(0);
+ assertNotNull("allocation failed", arrowBuf);
+ assertEquals("capacity was non-zero", 0, arrowBuf.capacity());
+ arrowBuf.release();
+ }
+ }
+
+ @Ignore // TODO(DRILL-2740)
+ @Test(expected = IllegalStateException.class)
+ public void testAllocator_unreleasedEmpty() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf = rootAllocator.buffer(0);
+ }
+ }
+
+ @Test
+ public void testAllocator_transferOwnership() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 =
+ rootAllocator.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
+ rootAllocator.verify();
+ TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2);
+ assertEquiv(arrowBuf1, transferOwnership.buffer);
+ final boolean allocationFit = transferOwnership.allocationFit;
+ rootAllocator.verify();
+ assertTrue(allocationFit);
+
+ arrowBuf1.release();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ transferOwnership.buffer.release();
+ childAllocator2.close();
+ }
+ }
+
+ @Test
+ public void testAllocator_shareOwnership() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, MAX_ALLOCATION);
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
+ rootAllocator.verify();
+
+ // share ownership of buffer.
+ final ArrowBuf arrowBuf2 = arrowBuf1.retain(childAllocator2);
+ rootAllocator.verify();
+ assertNotNull(arrowBuf2);
+ assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
+
+ // release original buffer (thus transferring ownership to allocator 2. (should leave allocator 1 in empty state)
+ arrowBuf1.release();
+ rootAllocator.verify();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("shareOwnership3", 0, MAX_ALLOCATION);
+ final ArrowBuf arrowBuf3 = arrowBuf1.retain(childAllocator3);
+ assertNotNull(arrowBuf3);
+ assertNotEquals(arrowBuf3, arrowBuf1);
+ assertNotEquals(arrowBuf3, arrowBuf2);
+ assertEquiv(arrowBuf1, arrowBuf3);
+ rootAllocator.verify();
+
+ arrowBuf2.release();
+ rootAllocator.verify();
+ childAllocator2.close();
+ rootAllocator.verify();
+
+ arrowBuf3.release();
+ rootAllocator.verify();
+ childAllocator3.close();
+ }
+ }
+
+ @Test
+ public void testRootAllocator_createChildAndUse() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createChildAndUse", 0,
+ MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf = childAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ arrowBuf.release();
+ }
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRootAllocator_createChildDontClose() throws Exception {
+ try {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createChildDontClose", 0,
+ MAX_ALLOCATION);
+ final ArrowBuf arrowBuf = childAllocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ }
+ } finally {
+ /*
+ * We expect one underlying buffer because we closed a child allocator without
+ * releasing the buffer allocated from it.
+ */
+/*
+ // ------------------------------- DEBUG ---------------------------------
+ final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
+ UnsafeDirectLittleEndian.releaseBuffers();
+ assertEquals(1, bufferCount);
+ // ------------------------------- DEBUG ---------------------------------
+*/
+ }
+ }
+
+ private static void allocateAndFree(final BufferAllocator allocator) {
+ final ArrowBuf arrowBuf = allocator.buffer(512);
+ assertNotNull("allocation failed", arrowBuf);
+ arrowBuf.release();
+
+ final ArrowBuf arrowBuf2 = allocator.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf2);
+ arrowBuf2.release();
+
+ final int nBufs = 8;
+ final ArrowBuf[] arrowBufs = new ArrowBuf[nBufs];
+ for (int i = 0; i < arrowBufs.length; ++i) {
+ ArrowBuf arrowBufi = allocator.buffer(MAX_ALLOCATION / nBufs);
+ assertNotNull("allocation failed", arrowBufi);
+ arrowBufs[i] = arrowBufi;
+ }
+ for (ArrowBuf arrowBufi : arrowBufs) {
+ arrowBufi.release();
+ }
+ }
+
+ @Test
+ public void testAllocator_manyAllocations() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) {
+ allocateAndFree(childAllocator);
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_overAllocate() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) {
+ allocateAndFree(childAllocator);
+
+ try {
+ childAllocator.buffer(MAX_ALLOCATION + 1);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_overAllocateParent() throws Exception {
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator =
+ rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf1);
+ final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
+ assertNotNull("allocation failed", arrowBuf2);
+
+ try {
+ childAllocator.buffer(MAX_ALLOCATION / 4);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+
+ arrowBuf1.release();
+ arrowBuf2.release();
+ }
+ }
+ }
+
+ private static void testAllocator_sliceUpBufferAndRelease(
+ final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
+ final ArrowBuf arrowBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2);
+ rootAllocator.verify();
+
+ final ArrowBuf arrowBuf2 = arrowBuf1.slice(16, arrowBuf1.capacity() - 32);
+ rootAllocator.verify();
+ final ArrowBuf arrowBuf3 = arrowBuf2.slice(16, arrowBuf2.capacity() - 32);
+ rootAllocator.verify();
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf4 = arrowBuf3.slice(16, arrowBuf3.capacity() - 32);
+ rootAllocator.verify();
+
+ arrowBuf3.release(); // since they share refcounts, one is enough to release them all
+ rootAllocator.verify();
+ }
+
+ @Test
+ public void testAllocator_createSlices() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
+
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ }
+ rootAllocator.verify();
+
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
+
+ try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator2 =
+ childAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
+ final ArrowBuf arrowBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+ @SuppressWarnings("unused")
+ final ArrowBuf arrowBuf2 = arrowBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ arrowBuf1.release();
+ rootAllocator.verify();
+ }
+ rootAllocator.verify();
+
+ testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
+ }
+ rootAllocator.verify();
+ }
+ }
+
+ @Test
+ public void testAllocator_sliceRanges() throws Exception {
+// final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ // Populate a buffer with byte values corresponding to their indices.
+ final ArrowBuf arrowBuf = rootAllocator.buffer(256);
+ assertEquals(256, arrowBuf.capacity());
+ assertEquals(0, arrowBuf.readerIndex());
+ assertEquals(0, arrowBuf.readableBytes());
+ assertEquals(0, arrowBuf.writerIndex());
+ assertEquals(256, arrowBuf.writableBytes());
+
+ final ArrowBuf slice3 = (ArrowBuf) arrowBuf.slice();
+ assertEquals(0, slice3.readerIndex());
+ assertEquals(0, slice3.readableBytes());
+ assertEquals(0, slice3.writerIndex());
+// assertEquals(256, slice3.capacity());
+// assertEquals(256, slice3.writableBytes());
+
+ for (int i = 0; i < 256; ++i) {
+ arrowBuf.writeByte(i);
+ }
+ assertEquals(0, arrowBuf.readerIndex());
+ assertEquals(256, arrowBuf.readableBytes());
+ assertEquals(256, arrowBuf.writerIndex());
+ assertEquals(0, arrowBuf.writableBytes());
+
+ final ArrowBuf slice1 = (ArrowBuf) arrowBuf.slice();
+ assertEquals(0, slice1.readerIndex());
+ assertEquals(256, slice1.readableBytes());
+ for (int i = 0; i < 10; ++i) {
+ assertEquals(i, slice1.readByte());
+ }
+ assertEquals(256 - 10, slice1.readableBytes());
+ for (int i = 0; i < 256; ++i) {
+ assertEquals((byte) i, slice1.getByte(i));
+ }
+
+ final ArrowBuf slice2 = arrowBuf.slice(25, 25);
+ assertEquals(0, slice2.readerIndex());
+ assertEquals(25, slice2.readableBytes());
+ for (int i = 25; i < 50; ++i) {
+ assertEquals(i, slice2.readByte());
+ }
+
+/*
+ for(int i = 256; i > 0; --i) {
+ slice3.writeByte(i - 1);
+ }
+ for(int i = 0; i < 256; ++i) {
+ assertEquals(255 - i, slice1.getByte(i));
+ }
+*/
+
+ arrowBuf.release(); // all the derived buffers share this fate
+ }
+ }
+
+ @Test
+ public void testAllocator_slicesOfSlices() throws Exception {
+// final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
+ try (final RootAllocator rootAllocator =
+ new RootAllocator(MAX_ALLOCATION)) {
+ // Populate a buffer with byte values corresponding to their indices.
+ final ArrowBuf arrowBuf = rootAllocator.buffer(256);
+ for (int i = 0; i < 256; ++i) {
+ arrowBuf.writeByte(i);
+ }
+
+ // Slice it up.
+ final ArrowBuf slice0 = arrowBuf.slice(0, arrowBuf.capacity());
+ for (int i = 0; i < 256; ++i) {
+ assertEquals((byte) i, arrowBuf.getByte(i));
+ }
+
+ final ArrowBuf slice10 = slice0.slice(10, arrowBuf.capacity() - 10);
+ for (int i = 10; i < 256; ++i) {
+ assertEquals((byte) i, slice10.getByte(i - 10));
+ }
+
+ final ArrowBuf slice20 = slice10.slice(10, arrowBuf.capacity() - 20);
+ for (int i = 20; i < 256; ++i) {
+ assertEquals((byte) i, slice20.getByte(i - 20));
+ }
+
+ final ArrowBuf slice30 = slice20.slice(10, arrowBuf.capacity() - 30);
+ for (int i = 30; i < 256; ++i) {
+ assertEquals((byte) i, slice30.getByte(i - 30));
+ }
+
+ arrowBuf.release();
+ }
+ }
+
+ @Test
+ public void testAllocator_transferSliced() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+ final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+
+ final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2);
+ final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2);
+
+ rootAllocator.verify();
+
+ TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1);
+ assertEquiv(arrowBuf2s, result1.buffer);
+ rootAllocator.verify();
+ TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2);
+ assertEquiv(arrowBuf1s, result2.buffer);
+ rootAllocator.verify();
+
+ result1.buffer.release();
+ result2.buffer.release();
+
+ arrowBuf1s.release(); // releases arrowBuf1
+ arrowBuf2s.release(); // releases arrowBuf2
+
+ childAllocator1.close();
+ childAllocator2.close();
+ }
+ }
+
+ @Test
+ public void testAllocator_shareSliced() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+ final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
+
+ final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2);
+ final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2);
+
+ rootAllocator.verify();
+
+ final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1);
+ assertEquiv(arrowBuf2s, arrowBuf2s1);
+ final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2);
+ assertEquiv(arrowBuf1s, arrowBuf1s2);
+ rootAllocator.verify();
+
+ arrowBuf1s.release(); // releases arrowBuf1
+ arrowBuf2s.release(); // releases arrowBuf2
+ rootAllocator.verify();
+
+ arrowBuf2s1.release(); // releases the shared arrowBuf2 slice
+ arrowBuf1s2.release(); // releases the shared arrowBuf1 slice
+
+ childAllocator1.close();
+ childAllocator2.close();
+ }
+ }
+
+ @Test
+ public void testAllocator_transferShared() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION);
+ final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION);
+
+ final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
+
+ boolean allocationFit;
+
+ ArrowBuf arrowBuf2 = arrowBuf1.retain(childAllocator2);
+ rootAllocator.verify();
+ assertNotNull(arrowBuf2);
+ assertNotEquals(arrowBuf2, arrowBuf1);
+ assertEquiv(arrowBuf1, arrowBuf2);
+
+ TransferResult result = arrowBuf1.transferOwnership(childAllocator3);
+ allocationFit = result.allocationFit;
+ final ArrowBuf arrowBuf3 = result.buffer;
+ assertTrue(allocationFit);
+ assertEquiv(arrowBuf1, arrowBuf3);
+ rootAllocator.verify();
+
+ // Since childAllocator3 now has childAllocator1's buffer, 1, can close
+ arrowBuf1.release();
+ childAllocator1.close();
+ rootAllocator.verify();
+
+ arrowBuf2.release();
+ childAllocator2.close();
+ rootAllocator.verify();
+
+ final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION);
+ TransferResult result2 = arrowBuf3.transferOwnership(childAllocator4);
+ allocationFit = result.allocationFit;
+ final ArrowBuf arrowBuf4 = result2.buffer;
+ assertTrue(allocationFit);
+ assertEquiv(arrowBuf3, arrowBuf4);
+ rootAllocator.verify();
+
+ arrowBuf3.release();
+ childAllocator3.close();
+ rootAllocator.verify();
+
+ arrowBuf4.release();
+ childAllocator4.close();
+ rootAllocator.verify();
+ }
+ }
+
+ @Test
+ public void testAllocator_unclaimedReservation() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator childAllocator1 =
+ rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
+ try (final AllocationReservation reservation = childAllocator1.newReservation()) {
+ assertTrue(reservation.add(64));
+ }
+ rootAllocator.verify();
+ }
+ }
+ }
+
+ @Test
+ public void testAllocator_claimedReservation() throws Exception {
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+
+ try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("claimedReservation", 0,
+ MAX_ALLOCATION)) {
+
+ try (final AllocationReservation reservation = childAllocator1.newReservation()) {
+ assertTrue(reservation.add(32));
+ assertTrue(reservation.add(32));
+
+ final ArrowBuf arrowBuf = reservation.allocateBuffer();
+ assertEquals(64, arrowBuf.capacity());
+ rootAllocator.verify();
+
+ arrowBuf.release();
+ rootAllocator.verify();
+ }
+ rootAllocator.verify();
+ }
+ }
+ }
+
+ @Test
+ public void multiple() throws Exception {
+ final String owner = "test";
+ try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+ final int op = 100000;
+
+ BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE);
+ BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b11 = allocator11.buffer(1000000);
+
+ allocator.verify();
+
+ BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b12 = allocator12.buffer(500000);
+
+ allocator.verify();
+
+ BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b22 = allocator22.buffer(2000000);
+
+ allocator.verify();
+
+ BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE);
+
+ allocator.verify();
+
+ BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE);
+ ArrowBuf b31a = allocator31.buffer(200000);
+
+ allocator.verify();
+
+ // Previously running operator completes
+ b22.release();
+
+ allocator.verify();
+
+ allocator22.close();
+
+ b31a.release();
+ allocator31.close();
+
+ b12.release();
+ allocator12.close();
+
+ allocator21.close();
+
+ b11.release();
+ allocator11.close();
+
+ frag1.close();
+ frag2.close();
+ frag3.close();
+
+ }
+ }
+
+ public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
+ assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
+ assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 0a0f2e0ce8f65..4d664deb17933 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -35,9 +35,11 @@
4.0.49.Final
2.7.9
2.7.1
+ 0.9.0-incubating-SNAPSHOT
1.2.0-3f79e055
2
false
+ ${session.executionRootDirectory}/target/service-dist
@@ -80,7 +82,13 @@
-
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.4.0.Final
+
+
org.apache.rat
@@ -272,7 +280,6 @@
-
org.apache.maven.plugins
maven-checkstyle-plugin
@@ -333,6 +340,63 @@
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 2.10
+
+
+ copy
+ package
+
+ copy
+
+
+
+
+ org.apache.mnemonic
+ mnemonic-nvml-vmem-service
+ ${dep.mnemonic.version}
+ ${os.detected.classifier}
+ jar
+
+
+ org.apache.mnemonic
+ mnemonic-sys-vmem-service
+ ${dep.mnemonic.version}
+ ${os.detected.classifier}
+ jar
+
+
+ org.apache.mnemonic
+ mnemonic-nvml-pmem-service
+ ${dep.mnemonic.version}
+ ${os.detected.classifier}
+ jar
+
+
+ org.apache.mnemonic
+ mnemonic-pmalloc-service
+ ${dep.mnemonic.version}
+ ${os.detected.classifier}
+ jar
+
+
+ org.apache.mnemonic
+ mnemonic-utilities-service
+ ${dep.mnemonic.version}
+ ${os.detected.classifier}
+ jar
+
+
+ ${service.dist.dir}
+ false
+ false
+ true
+
+
+
+
@@ -374,6 +438,7 @@
-Darrow.vector.max_allocation_bytes=1048576
+ -Djava.ext.dirs=${service.dist.dir}
@@ -524,6 +589,11 @@
slf4j-api
${dep.slf4j.version}
+
+ org.apache.mnemonic
+ mnemonic-core
+ ${dep.mnemonic.version}
+
@@ -588,7 +658,6 @@
0.9.44
test
-
diff --git a/java/vector/pom.xml b/java/vector/pom.xml
index 46e06aa1e3f97..1cd58853a55cf 100644
--- a/java/vector/pom.xml
+++ b/java/vector/pom.xml
@@ -82,6 +82,10 @@
org.slf4j
slf4j-api
+
+ org.apache.mnemonic
+ mnemonic-core
+
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java
new file mode 100644
index 0000000000000..bedacb19fc6f9
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java
@@ -0,0 +1,1616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+import org.apache.arrow.vector.holders.VarCharHolder;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+
+import static org.apache.arrow.vector.TestUtils.newNullableVarBinaryVector;
+import static org.apache.arrow.vector.TestUtils.newNullableVarCharVector;
+import static org.apache.arrow.vector.TestUtils.newVector;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.TypeLayout;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.TransferPair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import org.apache.mnemonic.VolatileMemAllocator;
+import org.apache.mnemonic.Utils;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.MnemonicUnpooledByteBufAllocator;
+import io.netty.buffer.ArrowBuf;
+
+
+public class TestMnemonicBackedValueVector{
+
+ private final static String EMPTY_SCHEMA_PATH = "";
+
+ private BufferAllocator allocator;
+
+ private final static long MNEMONIC_CAPACITY = 1024 * 1024 * 1024 * 1;
+ private static VolatileMemAllocator bdmalloc;
+
+ @BeforeClass
+ public static void setupUpBeforeClass() throws Exception {
+ bdmalloc = new VolatileMemAllocator(
+ Utils.getVolatileMemoryAllocatorService("pmalloc"),
+ MNEMONIC_CAPACITY, "./value_vector_test.dat");
+ MnemonicUnpooledByteBufAllocator mubba = new MnemonicUnpooledByteBufAllocator(true, bdmalloc);
+ PooledByteBufAllocatorL.setUpMnemonicUnpooledByteBufAllocator(mubba);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ PooledByteBufAllocatorL.clearMnemonicUnpooledByteBufAllocator();
+ if (null != bdmalloc) {
+ bdmalloc.close();
+ }
+ }
+
+ @Before
+ public void init() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ private final static Charset utf8Charset = Charset.forName("UTF-8");
+ private final static byte[] STR1 = "AAAAA1".getBytes(utf8Charset);
+ private final static byte[] STR2 = "BBBBBBBBB2".getBytes(utf8Charset);
+ private final static byte[] STR3 = "CCCC3".getBytes(utf8Charset);
+ private final static byte[] STR4 = "DDDDDDDD4".getBytes(utf8Charset);
+ private final static byte[] STR5 = "EEE5".getBytes(utf8Charset);
+ private final static byte[] STR6 = "FFFFF6".getBytes(utf8Charset);
+ private final static int MAX_VALUE_COUNT =
+ Integer.getInteger("arrow.vector.max_allocation_bytes", Integer.MAX_VALUE)/4;
+ private final static int MAX_VALUE_COUNT_8BYTE = MAX_VALUE_COUNT/2;
+
+ @After
+ public void terminate() throws Exception {
+ allocator.close();
+ }
+
+ /*
+ * Tests for Fixed-Width vectors
+ *
+ * Covered types as of now
+ *
+ * -- UInt4Vector
+ * -- IntVector
+ * -- Float4Vector
+ * -- Float8Vector
+ *
+ * -- NullableUInt4Vector
+ * -- NullableIntVector
+ * -- NullableFloat4Vector
+ *
+ * TODO:
+ *
+ * -- SmallIntVector
+ * -- BigIntVector
+ * -- TinyIntVector
+ */
+
+ @Test /* UInt4Vector */
+ public void testFixedType1() {
+
+ // Create a new value vector for 1024 integers.
+ try (final UInt4Vector vector = new UInt4Vector(EMPTY_SCHEMA_PATH, allocator)) {
+
+ boolean error = false;
+ int initialCapacity = 0;
+ final UInt4Vector.Mutator mutator = vector.getMutator();
+ final UInt4Vector.Accessor accessor = vector.getAccessor();
+
+ vector.allocateNew(1024);
+ initialCapacity = vector.getValueCapacity();
+ assertEquals(1024, initialCapacity);
+
+ // Put and set a few values
+ mutator.setSafe(0, 100);
+ mutator.setSafe(1, 101);
+ mutator.setSafe(100, 102);
+ mutator.setSafe(1022, 103);
+ mutator.setSafe(1023, 104);
+
+ assertEquals(100, accessor.get(0));
+ assertEquals(101, accessor.get(1));
+ assertEquals(102, accessor.get(100));
+ assertEquals(103, accessor.get(1022));
+ assertEquals(104, accessor.get(1023));
+
+ try {
+ mutator.set(1024, 10000);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ try {
+ accessor.get(1024);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(1024, 10000);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* check vector data after realloc */
+ assertEquals(100, accessor.get(0));
+ assertEquals(101, accessor.get(1));
+ assertEquals(102, accessor.get(100));
+ assertEquals(103, accessor.get(1022));
+ assertEquals(104, accessor.get(1023));
+ assertEquals(10000, accessor.get(1024));
+
+ /* reset the vector */
+ vector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should have been zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i));
+ }
+ }
+ }
+
+ @Test /* IntVector */
+ public void testFixedType2() {
+ try (final IntVector intVector = new IntVector(EMPTY_SCHEMA_PATH, allocator)) {
+ final IntVector.Mutator mutator = intVector.getMutator();
+ final IntVector.Accessor accessor = intVector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 16;
+
+ /* we should not throw exception for these values of capacity */
+ intVector.setInitialCapacity(MAX_VALUE_COUNT - 1);
+ intVector.setInitialCapacity(MAX_VALUE_COUNT);
+
+ try {
+ intVector.setInitialCapacity(MAX_VALUE_COUNT + 1);
+ }
+ catch (OversizedAllocationException oe) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ intVector.setInitialCapacity(initialCapacity);
+ /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */
+ assertEquals(0, intVector.getValueCapacity());
+
+ /* allocate 64 bytes (16 * 4) */
+ intVector.allocateNew();
+ /* underlying buffer should be able to store 16 values */
+ assertEquals(initialCapacity, intVector.getValueCapacity());
+
+ /* populate the vector */
+ int j = 1;
+ for(int i = 0; i < 16; i += 2) {
+ mutator.set(i, j);
+ j++;
+ }
+
+ try {
+ mutator.set(16, 9);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* check vector contents */
+ j = 1;
+ for(int i = 0; i < 16; i += 2) {
+ assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+ j++;
+ }
+
+ try {
+ accessor.get(16);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(16, 9);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, intVector.getValueCapacity());
+
+ /* vector data should still be intact after realloc */
+ j = 1;
+ for(int i = 0; i <= 16; i += 2) {
+ assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+ j++;
+ }
+
+ /* reset the vector */
+ intVector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, intVector.getValueCapacity());
+
+ /* vector data should have been zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i));
+ }
+ }
+ }
+
+ @Test /* Float4Vector */
+ public void testFixedType3() {
+ try (final Float4Vector floatVector = new Float4Vector(EMPTY_SCHEMA_PATH, allocator)) {
+ final Float4Vector.Mutator mutator = floatVector.getMutator();
+ final Float4Vector.Accessor accessor = floatVector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 16;
+
+ /* we should not throw exception for these values of capacity */
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT - 1);
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT);
+
+ try {
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT + 1);
+ }
+ catch (OversizedAllocationException oe) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ floatVector.setInitialCapacity(initialCapacity);
+ /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */
+ assertEquals(0, floatVector.getValueCapacity());
+
+ /* allocate 64 bytes (16 * 4) */
+ floatVector.allocateNew();
+ /* underlying buffer should be able to store 16 values */
+ assertEquals(initialCapacity, floatVector.getValueCapacity());
+
+ floatVector.zeroVector();
+
+ /* populate the vector */
+ mutator.set(0, 1.5f);
+ mutator.set(2, 2.5f);
+ mutator.set(4, 3.3f);
+ mutator.set(6, 4.8f);
+ mutator.set(8, 5.6f);
+ mutator.set(10, 6.6f);
+ mutator.set(12, 7.8f);
+ mutator.set(14, 8.5f);
+
+ try {
+ mutator.set(16, 9.5f);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* check vector contents */
+ assertEquals(1.5f, accessor.get(0), 0);
+ assertEquals(2.5f, accessor.get(2), 0);
+ assertEquals(3.3f, accessor.get(4), 0);
+ assertEquals(4.8f, accessor.get(6), 0);
+ assertEquals(5.6f, accessor.get(8), 0);
+ assertEquals(6.6f, accessor.get(10), 0);
+ assertEquals(7.8f, accessor.get(12), 0);
+ assertEquals(8.5f, accessor.get(14), 0);
+
+ try {
+ accessor.get(16);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(16, 9.5f);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, floatVector.getValueCapacity());
+
+ /* vector data should still be intact after realloc */
+ assertEquals(1.5f, accessor.get(0), 0);
+ assertEquals(2.5f, accessor.get(2), 0);
+ assertEquals(3.3f, accessor.get(4), 0);
+ assertEquals(4.8f, accessor.get(6), 0);
+ assertEquals(5.6f, accessor.get(8), 0);
+ assertEquals(6.6f, accessor.get(10), 0);
+ assertEquals(7.8f, accessor.get(12), 0);
+ assertEquals(8.5f, accessor.get(14), 0);
+ assertEquals(9.5f, accessor.get(16), 0);
+
+ /* reset the vector */
+ floatVector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, floatVector.getValueCapacity());
+
+ /* vector data should be zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i), 0);
+ }
+ }
+ }
+
+ @Test /* Float8Vector */
+ public void testFixedType4() {
+ try (final Float8Vector floatVector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) {
+ final Float8Vector.Mutator mutator = floatVector.getMutator();
+ final Float8Vector.Accessor accessor = floatVector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 16;
+
+ /* we should not throw exception for these values of capacity */
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE - 1);
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE);
+
+ try {
+ floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE + 1);
+ }
+ catch (OversizedAllocationException oe) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ floatVector.setInitialCapacity(initialCapacity);
+ /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */
+ assertEquals(0, floatVector.getValueCapacity());
+
+ /* allocate 128 bytes (16 * 8) */
+ floatVector.allocateNew();
+ /* underlying buffer should be able to store 16 values */
+ assertEquals(initialCapacity, floatVector.getValueCapacity());
+
+ /* populate the vector */
+ mutator.set(0, 1.55);
+ mutator.set(2, 2.53);
+ mutator.set(4, 3.36);
+ mutator.set(6, 4.82);
+ mutator.set(8, 5.67);
+ mutator.set(10, 6.67);
+ mutator.set(12, 7.87);
+ mutator.set(14, 8.56);
+
+ try {
+ mutator.set(16, 9.53);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* check vector contents */
+ assertEquals(1.55, accessor.get(0), 0);
+ assertEquals(2.53, accessor.get(2), 0);
+ assertEquals(3.36, accessor.get(4), 0);
+ assertEquals(4.82, accessor.get(6), 0);
+ assertEquals(5.67, accessor.get(8), 0);
+ assertEquals(6.67, accessor.get(10), 0);
+ assertEquals(7.87, accessor.get(12), 0);
+ assertEquals(8.56, accessor.get(14), 0);
+
+ try {
+ accessor.get(16);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(16, 9.53);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, floatVector.getValueCapacity());
+
+ /* vector data should still be intact after realloc */
+ assertEquals(1.55, accessor.get(0), 0);
+ assertEquals(2.53, accessor.get(2), 0);
+ assertEquals(3.36, accessor.get(4), 0);
+ assertEquals(4.82, accessor.get(6), 0);
+ assertEquals(5.67, accessor.get(8), 0);
+ assertEquals(6.67, accessor.get(10), 0);
+ assertEquals(7.87, accessor.get(12), 0);
+ assertEquals(8.56, accessor.get(14), 0);
+ assertEquals(9.53, accessor.get(16), 0);
+
+ /* reset the vector */
+ floatVector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, floatVector.getValueCapacity());
+
+ /* vector data should be zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i), 0);
+ }
+ }
+ }
+
+ @Test /* NullableUInt4Vector */
+ public void testNullableFixedType1() {
+
+ // Create a new value vector for 1024 integers.
+ try (final NullableUInt4Vector vector = newVector(NullableUInt4Vector.class, EMPTY_SCHEMA_PATH, new ArrowType.Int(32, false), allocator);) {
+ final NullableUInt4Vector.Mutator mutator = vector.getMutator();
+ final NullableUInt4Vector.Accessor accessor = vector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 1024;
+
+ vector.setInitialCapacity(initialCapacity);
+ /* no memory allocation has happened yet */
+ assertEquals(0, vector.getValueCapacity());
+
+ vector.allocateNew();
+ assertEquals(initialCapacity, vector.getValueCapacity());
+
+ // Put and set a few values
+ mutator.set(0, 100);
+ mutator.set(1, 101);
+ mutator.set(100, 102);
+ mutator.set(1022, 103);
+ mutator.set(1023, 104);
+
+ /* check vector contents */
+ assertEquals(100, accessor.get(0));
+ assertEquals(101, accessor.get(1));
+ assertEquals(102, accessor.get(100));
+ assertEquals(103, accessor.get(1022));
+ assertEquals(104, accessor.get(1023));
+
+ int val = 0;
+
+ /* check unset bits/null values */
+ for (int i = 2, j = 101; i <= 99 || j <= 1021; i++, j++) {
+ if (i <= 99) {
+ assertTrue(accessor.isNull(i));
+ }
+ if(j <= 1021) {
+ assertTrue(accessor.isNull(j));
+ }
+ }
+
+ try {
+ mutator.set(1024, 10000);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ try {
+ accessor.get(1024);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* should trigger a realloc of the underlying bitvector and valuevector */
+ mutator.setSafe(1024, 10000);
+
+ /* check new capacity */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector contents should still be intact after realloc */
+ assertEquals(100, accessor.get(0));
+ assertEquals(101, accessor.get(1));
+ assertEquals(102, accessor.get(100));
+ assertEquals(103, accessor.get(1022));
+ assertEquals(104, accessor.get(1023));
+ assertEquals(10000, accessor.get(1024));
+
+ val = 0;
+
+ /* check unset bits/null values */
+ for (int i = 2, j = 101; i < 99 || j < 1021; i++, j++) {
+ if (i <= 99) {
+ assertTrue(accessor.isNull(i));
+ }
+ if(j <= 1021) {
+ assertTrue(accessor.isNull(j));
+ }
+ }
+
+ /* reset the vector */
+ vector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should be zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+ }
+ }
+
+ @Test /* NullableFloat4Vector */
+ public void testNullableFixedType2() {
+ // Create a new value vector for 1024 integers
+ try (final NullableFloat4Vector vector = newVector(NullableFloat4Vector.class, EMPTY_SCHEMA_PATH, MinorType.FLOAT4, allocator);) {
+ final NullableFloat4Vector.Mutator mutator = vector.getMutator();
+ final NullableFloat4Vector.Accessor accessor = vector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 16;
+
+ vector.setInitialCapacity(initialCapacity);
+ /* no memory allocation has happened yet */
+ assertEquals(0, vector.getValueCapacity());
+
+ vector.allocateNew();
+ assertEquals(initialCapacity, vector.getValueCapacity());
+
+ /* populate the vector */
+ mutator.set(0, 100.5f);
+ mutator.set(2, 201.5f);
+ mutator.set(4, 300.3f);
+ mutator.set(6, 423.8f);
+ mutator.set(8, 555.6f);
+ mutator.set(10, 66.6f);
+ mutator.set(12, 78.8f);
+ mutator.set(14, 89.5f);
+
+ try {
+ mutator.set(16, 90.5f);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* check vector contents */
+ assertEquals(100.5f, accessor.get(0), 0);
+ assertTrue(accessor.isNull(1));
+ assertEquals(201.5f, accessor.get(2), 0);
+ assertTrue(accessor.isNull(3));
+ assertEquals(300.3f, accessor.get(4), 0);
+ assertTrue(accessor.isNull(5));
+ assertEquals(423.8f, accessor.get(6), 0);
+ assertTrue(accessor.isNull(7));
+ assertEquals(555.6f, accessor.get(8), 0);
+ assertTrue(accessor.isNull(9));
+ assertEquals(66.6f, accessor.get(10), 0);
+ assertTrue(accessor.isNull(11));
+ assertEquals(78.8f, accessor.get(12), 0);
+ assertTrue(accessor.isNull(13));
+ assertEquals(89.5f, accessor.get(14), 0);
+ assertTrue(accessor.isNull(15));
+
+ try {
+ accessor.get(16);
+ }
+ catch (IndexOutOfBoundsException ie) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ error = false;
+ }
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(16, 90.5f);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should still be intact after realloc */
+ assertEquals(100.5f, accessor.get(0), 0);
+ assertTrue(accessor.isNull(1));
+ assertEquals(201.5f, accessor.get(2), 0);
+ assertTrue(accessor.isNull(3));
+ assertEquals(300.3f, accessor.get(4), 0);
+ assertTrue(accessor.isNull(5));
+ assertEquals(423.8f, accessor.get(6), 0);
+ assertTrue(accessor.isNull(7));
+ assertEquals(555.6f, accessor.get(8), 0);
+ assertTrue(accessor.isNull(9));
+ assertEquals(66.6f, accessor.get(10), 0);
+ assertTrue(accessor.isNull(11));
+ assertEquals(78.8f, accessor.get(12), 0);
+ assertTrue(accessor.isNull(13));
+ assertEquals(89.5f, accessor.get(14), 0);
+ assertTrue(accessor.isNull(15));
+ assertEquals(90.5f, accessor.get(16), 0);
+
+ /* reset the vector */
+ vector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should be zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+ }
+ }
+
+ @Test /* NullableIntVector */
+ public void testNullableFixedType3() {
+ // Create a new value vector for 1024 integers
+ try (final NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, allocator)) {
+ final NullableIntVector.Mutator mutator = vector.getMutator();
+ final NullableIntVector.Accessor accessor = vector.getAccessor();
+ boolean error = false;
+ int initialCapacity = 1024;
+
+ /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */
+ assertEquals(0, vector.getValueCapacity());
+ /* allocate space for 4KB data (1024 * 4) */
+ vector.allocateNew(initialCapacity);
+ /* underlying buffer should be able to store 16 values */
+ assertEquals(initialCapacity, vector.getValueCapacity());
+
+ mutator.set(0, 1);
+ mutator.set(1, 2);
+ mutator.set(100, 3);
+ mutator.set(1022, 4);
+ mutator.set(1023, 5);
+
+ /* check vector contents */
+ int j = 1;
+ for(int i = 0; i <= 1023; i++) {
+ if((i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+ else {
+ assertFalse("null data not expected at index: " + i, accessor.isNull(i));
+ assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+ j++;
+ }
+ }
+
+ mutator.setValueCount(1024);
+ Field field = vector.getField();
+ TypeLayout typeLayout = field.getTypeLayout();
+
+ List buffers = vector.getFieldBuffers();
+
+ assertEquals(2, typeLayout.getVectors().size());
+ assertEquals(2, buffers.size());
+
+ ArrowBuf validityVectorBuf = buffers.get(0);
+
+ /* bitvector tracks 1024 integers --> 1024 bits --> 128 bytes */
+ assertEquals(128, validityVectorBuf.readableBytes());
+ assertEquals(3, validityVectorBuf.getByte(0)); // 1st and second bit defined
+ for (int i = 1; i < 12; i++) {
+ assertEquals(0, validityVectorBuf.getByte(i)); // nothing defined until 100
+ }
+ assertEquals(16, validityVectorBuf.getByte(12)); // 100th bit is defined (12 * 8 + 4)
+ for (int i = 13; i < 127; i++) {
+ assertEquals(0, validityVectorBuf.getByte(i)); // nothing defined between 100th and 1022nd
+ }
+ assertEquals(-64, validityVectorBuf.getByte(127)); // 1022nd and 1023rd bit defined
+
+ /* this should trigger a realloc() */
+ mutator.setSafe(1024, 6);
+
+ /* underlying buffer should now be able to store double the number of values */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should still be intact after realloc */
+ j = 1;
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ if((i > 1024) || (i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+ else {
+ assertFalse("null data not expected at index: " + i, accessor.isNull(i));
+ assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+ j++;
+ }
+ }
+
+ /* reset the vector */
+ vector.reset();
+
+ /* capacity shouldn't change after reset */
+ assertEquals(initialCapacity * 2, vector.getValueCapacity());
+
+ /* vector data should have been zeroed out */
+ for(int i = 0; i < (initialCapacity * 2); i++) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+
+ vector.allocateNew(4096);
+ // vector has been erased
+ for(int i = 0; i < 4096; i++) {
+ assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+ }
+ }
+ }
+
+ /*
+ * Tests for Variable Width Vectors
+ *
+ * Covered types as of now
+ *
+ * -- NullableVarCharVector
+ * -- NullableVarBinaryVector
+ *
+ * TODO:
+ *
+ * -- VarCharVector
+ * -- VarBinaryVector
+ */
+
+ @Test /* NullableVarCharVector */
+ public void testNullableVarType1() {
+
+ // Create a new value vector for 1024 integers.
+ try (final NullableVarCharVector vector = newNullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
+ final NullableVarCharVector.Mutator m = vector.getMutator();
+ vector.allocateNew(1024 * 10, 1024);
+
+ m.set(0, STR1);
+ m.set(1, STR2);
+ m.set(2, STR3);
+ m.setSafe(3, STR3, 1, STR3.length - 1);
+ m.setSafe(4, STR3, 2, STR3.length - 2);
+ ByteBuffer STR3ByteBuffer = ByteBuffer.wrap(STR3);
+ m.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1);
+ m.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2);
+
+ // Check the sample strings.
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(3));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(4));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(5));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(6));
+
+ // Ensure null value throws.
+ boolean b = false;
+ try {
+ vector.getAccessor().get(7);
+ } catch (IllegalStateException e) {
+ b = true;
+ } finally {
+ assertTrue(b);
+ }
+ }
+ }
+
+ @Test /* NullableVarBinaryVector */
+ public void testNullableVarType2() {
+
+ // Create a new value vector for 1024 integers.
+ try (final NullableVarBinaryVector vector = newNullableVarBinaryVector(EMPTY_SCHEMA_PATH, allocator)) {
+ final NullableVarBinaryVector.Mutator m = vector.getMutator();
+ vector.allocateNew(1024 * 10, 1024);
+
+ m.set(0, STR1);
+ m.set(1, STR2);
+ m.set(2, STR3);
+ m.setSafe(3, STR3, 1, STR3.length - 1);
+ m.setSafe(4, STR3, 2, STR3.length - 2);
+ ByteBuffer STR3ByteBuffer = ByteBuffer.wrap(STR3);
+ m.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1);
+ m.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2);
+
+ // Check the sample strings.
+ final NullableVarBinaryVector.Accessor accessor = vector.getAccessor();
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(3));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(4));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(5));
+ assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(6));
+
+ // Ensure null value throws.
+ boolean b = false;
+ try {
+ vector.getAccessor().get(7);
+ } catch (IllegalStateException e) {
+ b = true;
+ } finally {
+ assertTrue(b);
+ }
+ }
+ }
+
+
+ /*
+ * generic tests
+ *
+ * -- lastSet() and setValueCount()
+ * -- fillEmpties()
+ * -- VectorLoader and VectorUnloader
+ * -- some realloc tests
+ *
+ * TODO:
+ *
+ * The realloc() related tests below should be moved up and we need to
+ * add realloc related tests (edge cases) for more vector types.
+ */
+
+ @Test /* Float8Vector */
+ public void testReallocAfterVectorTransfer1() {
+ try (final Float8Vector vector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) {
+ final Float8Vector.Mutator mutator = vector.getMutator();
+ final Float8Vector.Accessor accessor = vector.getAccessor();
+ final int initialDefaultCapacity = 4096;
+ boolean error = false;
+
+ /* use the default capacity; 4096*8 => 32KB */
+ vector.allocateNew();
+
+ assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+ double baseValue = 100.375;
+
+ for (int i = 0; i < initialDefaultCapacity; i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ /* the above setSafe calls should not have triggered a realloc as
+ * we are within the capacity. check the vector contents
+ */
+ assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+ for (int i = 0; i < initialDefaultCapacity; i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* this should trigger a realloc */
+ mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity);
+ assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity());
+
+ for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ for (int i = 0; i < (initialDefaultCapacity * 2); i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* this should trigger a realloc */
+ mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2));
+ assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity());
+
+ for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* at this point we are working with a 128KB buffer data for this
+ * vector. now let's transfer this vector
+ */
+
+ TransferPair transferPair = vector.getTransferPair(allocator);
+ transferPair.transfer();
+
+ Float8Vector toVector = (Float8Vector)transferPair.getTo();
+
+ /* now let's realloc the toVector */
+ toVector.reAlloc();
+ assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity());
+
+ final Float8Vector.Accessor toAccessor = toVector.getAccessor();
+
+ for (int i = 0; i < (initialDefaultCapacity * 8); i++) {
+ double value = toAccessor.get(i);
+ if (i < (initialDefaultCapacity * 4)) {
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+ else {
+ assertEquals(0, value, 0);
+ }
+ }
+
+ toVector.close();
+ }
+ }
+
+ @Test /* NullableFloat8Vector */
+ public void testReallocAfterVectorTransfer2() {
+ try (final NullableFloat8Vector vector = new NullableFloat8Vector(EMPTY_SCHEMA_PATH, allocator)) {
+ final NullableFloat8Vector.Mutator mutator = vector.getMutator();
+ final NullableFloat8Vector.Accessor accessor = vector.getAccessor();
+ final int initialDefaultCapacity = 4096;
+ boolean error = false;
+
+ vector.allocateNew(initialDefaultCapacity);
+
+ assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+ double baseValue = 100.375;
+
+ for (int i = 0; i < initialDefaultCapacity; i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ /* the above setSafe calls should not have triggered a realloc as
+ * we are within the capacity. check the vector contents
+ */
+ assertEquals(initialDefaultCapacity, vector.getValueCapacity());
+
+ for (int i = 0; i < initialDefaultCapacity; i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* this should trigger a realloc */
+ mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity);
+ assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity());
+
+ for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ for (int i = 0; i < (initialDefaultCapacity * 2); i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* this should trigger a realloc */
+ mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2));
+ assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity());
+
+ for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) {
+ mutator.setSafe(i, baseValue + (double)i);
+ }
+
+ for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+ double value = accessor.get(i);
+ assertEquals(baseValue + (double)i, value, 0);
+ }
+
+ /* at this point we are working with a 128KB buffer data for this
+ * vector. now let's transfer this vector
+ */
+
+ TransferPair transferPair = vector.getTransferPair(allocator);
+ transferPair.transfer();
+
+ NullableFloat8Vector toVector = (NullableFloat8Vector)transferPair.getTo();
+ final NullableFloat8Vector.Accessor toAccessor = toVector.getAccessor();
+
+ /* check toVector contents before realloc */
+ for (int i = 0; i < (initialDefaultCapacity * 4); i++) {
+ assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i));
+ double value = toAccessor.get(i);
+ assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0);
+ }
+
+ /* now let's realloc the toVector and check contents again */
+ toVector.reAlloc();
+ assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity());
+
+ for (int i = 0; i < (initialDefaultCapacity * 8); i++) {
+ if (i < (initialDefaultCapacity * 4)) {
+ assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i));
+ double value = toAccessor.get(i);
+ assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0);
+ }
+ else {
+ assertTrue("unexpected non-null value at index: " + i, toAccessor.isNull(i));
+ }
+ }
+
+ toVector.close();
+ }
+ }
+
+ @Test /* NullableVarCharVector */
+ public void testReallocAfterVectorTransfer3() {
+ try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
+ final NullableVarCharVector.Mutator mutator = vector.getMutator();
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+
+ /* 4096 values with 10 byte per record */
+ vector.allocateNew(4096 * 10, 4096);
+ int valueCapacity = vector.getValueCapacity();
+
+ /* populate the vector */
+ for (int i = 0; i < valueCapacity; i++) {
+ if ((i & 1) == 1) {
+ mutator.set(i, STR1);
+ }
+ else {
+ mutator.set(i, STR2);
+ }
+ }
+
+ /* Check the vector output */
+ for (int i = 0; i < valueCapacity; i++) {
+ if ((i & 1) == 1) {
+ assertArrayEquals(STR1, accessor.get(i));
+ }
+ else {
+ assertArrayEquals(STR2, accessor.get(i));
+ }
+ }
+
+ /* trigger first realloc */
+ mutator.setSafe(valueCapacity, STR2, 0, STR2.length);
+
+ /* populate the remaining vector */
+ for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+ if ((i & 1) == 1) {
+ mutator.set(i, STR1);
+ }
+ else {
+ mutator.set(i, STR2);
+ }
+ }
+
+ /* Check the vector output */
+ valueCapacity = vector.getValueCapacity();
+ for (int i = 0; i < valueCapacity; i++) {
+ if ((i & 1) == 1) {
+ assertArrayEquals(STR1, accessor.get(i));
+ }
+ else {
+ assertArrayEquals(STR2, accessor.get(i));
+ }
+ }
+
+ /* trigger second realloc */
+ mutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+
+ /* populate the remaining vector */
+ for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+ if ((i & 1) == 1) {
+ mutator.set(i, STR1);
+ }
+ else {
+ mutator.set(i, STR2);
+ }
+ }
+
+ /* Check the vector output */
+ valueCapacity = vector.getValueCapacity();
+ for (int i = 0; i < valueCapacity; i++) {
+ if ((i & 1) == 1) {
+ assertArrayEquals(STR1, accessor.get(i));
+ }
+ else {
+ assertArrayEquals(STR2, accessor.get(i));
+ }
+ }
+
+ /* we are potentially working with 4x the size of vector buffer
+ * that we initially started with. Now let's transfer the vector.
+ */
+
+ TransferPair transferPair = vector.getTransferPair(allocator);
+ transferPair.transfer();
+ NullableVarCharVector toVector = (NullableVarCharVector)transferPair.getTo();
+ NullableVarCharVector.Mutator toMutator = toVector.getMutator();
+ NullableVarCharVector.Accessor toAccessor = toVector.getAccessor();
+
+ valueCapacity = toVector.getValueCapacity();
+
+ /* trigger a realloc of this toVector */
+ toMutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+
+ toVector.close();
+ }
+ }
+
+ @Test
+ public void testReAllocNullableFixedWidthVector() {
+ // Create a new value vector for 1024 integers
+ try (final NullableFloat4Vector vector = newVector(NullableFloat4Vector.class, EMPTY_SCHEMA_PATH, MinorType.FLOAT4, allocator)) {
+ final NullableFloat4Vector.Mutator m = vector.getMutator();
+ vector.allocateNew(1024);
+
+ assertEquals(1024, vector.getValueCapacity());
+
+ // Put values in indexes that fall within the initial allocation
+ m.setSafe(0, 100.1f);
+ m.setSafe(100, 102.3f);
+ m.setSafe(1023, 104.5f);
+
+ // Now try to put values in space that falls beyond the initial allocation
+ m.setSafe(2000, 105.5f);
+
+ // Check valueCapacity is more than initial allocation
+ assertEquals(1024 * 2, vector.getValueCapacity());
+
+ final NullableFloat4Vector.Accessor accessor = vector.getAccessor();
+ assertEquals(100.1f, accessor.get(0), 0);
+ assertEquals(102.3f, accessor.get(100), 0);
+ assertEquals(104.5f, accessor.get(1023), 0);
+ assertEquals(105.5f, accessor.get(2000), 0);
+
+ // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+ // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
+ // vector
+ m.setValueCount(vector.getValueCapacity() + 200);
+ }
+ }
+
+ @Test
+ public void testReAllocNullableVariableWidthVector() {
+ // Create a new value vector for 1024 integers
+ try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
+ final NullableVarCharVector.Mutator m = vector.getMutator();
+ vector.allocateNew();
+
+ int initialCapacity = vector.getValueCapacity();
+
+ // Put values in indexes that fall within the initial allocation
+ m.setSafe(0, STR1, 0, STR1.length);
+ m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
+
+ // Now try to put values in space that falls beyond the initial allocation
+ m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
+
+ // Check valueCapacity is more than initial allocation
+ assertEquals((initialCapacity + 1) * 2 - 1, vector.getValueCapacity());
+
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(initialCapacity - 1));
+ assertArrayEquals(STR3, accessor.get(initialCapacity + 200));
+
+ // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+ // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
+ m.setValueCount(vector.getValueCapacity() + 200);
+ }
+ }
+
+ @Test
+ public void testFillEmptiesNotOverfill() {
+ try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
+ vector.allocateNew();
+
+ vector.getMutator().setSafe(4094, "hello".getBytes(), 0, 5);
+ vector.getMutator().setValueCount(4095);
+
+ assertEquals(4096 * 4, vector.getFieldBuffers().get(1).capacity());
+ }
+ }
+
+ @Test
+ public void testCopyFromWithNulls() {
+ try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator);
+ final NullableVarCharVector vector2 = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
+ vector.allocateNew();
+
+ for (int i = 0; i < 4095; i++) {
+ if (i % 3 == 0) {
+ continue;
+ }
+ byte[] b = Integer.toString(i).getBytes();
+ vector.getMutator().setSafe(i, b, 0, b.length);
+ }
+
+ vector.getMutator().setValueCount(4095);
+
+ vector2.allocateNew();
+
+ for (int i = 0; i < 4095; i++) {
+ vector2.copyFromSafe(i, i, vector);
+ }
+
+ vector2.getMutator().setValueCount(4095);
+
+ for (int i = 0; i < 4095; i++) {
+ if (i % 3 == 0) {
+ assertNull(vector2.getAccessor().getObject(i));
+ } else {
+ assertEquals(Integer.toString(i), vector2.getAccessor().getObject(i).toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSetLastSetUsage() {
+ try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
+
+ final NullableVarCharVector.Mutator mutator = vector.getMutator();
+
+ vector.allocateNew(1024 * 10, 1024);
+
+ setBytes(0, STR1, vector);
+ setBytes(1, STR2, vector);
+ setBytes(2, STR3, vector);
+ setBytes(3, STR4, vector);
+ setBytes(4, STR5, vector);
+ setBytes(5, STR6, vector);
+
+ /* Check current lastSet */
+ assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet()));
+
+ /* Check the vector output */
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+
+ /*
+ * If we don't do setLastSe(5) before setValueCount(), then the latter will corrupt
+ * the value vector by filling in all positions [0,valuecount-1] will empty byte arrays.
+ * Run the test by commenting out next line and we should see incorrect vector output.
+ */
+ mutator.setLastSet(5);
+ mutator.setValueCount(20);
+
+ /* Check the vector output again */
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+ }
+ }
+
+ @Test
+ public void testVectorLoadUnload() {
+
+ try (final NullableVarCharVector vector1 = new NullableVarCharVector("myvector", allocator)) {
+
+ final NullableVarCharVector.Mutator mutator1 = vector1.getMutator();
+
+ vector1.allocateNew(1024 * 10, 1024);
+
+ mutator1.set(0, STR1);
+ mutator1.set(1, STR2);
+ mutator1.set(2, STR3);
+ mutator1.set(3, STR4);
+ mutator1.set(4, STR5);
+ mutator1.set(5, STR6);
+ assertEquals(Integer.toString(5), Integer.toString(mutator1.getLastSet()));
+ mutator1.setValueCount(15);
+ assertEquals(Integer.toString(14), Integer.toString(mutator1.getLastSet()));
+
+ /* Check the vector output */
+ final NullableVarCharVector.Accessor accessor1 = vector1.getAccessor();
+ assertArrayEquals(STR1, accessor1.get(0));
+ assertArrayEquals(STR2, accessor1.get(1));
+ assertArrayEquals(STR3, accessor1.get(2));
+ assertArrayEquals(STR4, accessor1.get(3));
+ assertArrayEquals(STR5, accessor1.get(4));
+ assertArrayEquals(STR6, accessor1.get(5));
+
+ Field field = vector1.getField();
+ String fieldName = field.getName();
+
+ List fields = new ArrayList();
+ List fieldVectors = new ArrayList();
+
+ fields.add(field);
+ fieldVectors.add(vector1);
+
+ Schema schema = new Schema(fields);
+
+ VectorSchemaRoot schemaRoot1 = new VectorSchemaRoot(schema, fieldVectors, accessor1.getValueCount());
+ VectorUnloader vectorUnloader = new VectorUnloader(schemaRoot1);
+
+ try (
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("new vector", 0, Long.MAX_VALUE);
+ VectorSchemaRoot schemaRoot2 = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+ ) {
+
+ VectorLoader vectorLoader = new VectorLoader(schemaRoot2);
+ vectorLoader.load(recordBatch);
+
+ NullableVarCharVector vector2 = (NullableVarCharVector) schemaRoot2.getVector(fieldName);
+ NullableVarCharVector.Mutator mutator2 = vector2.getMutator();
+
+ /*
+ * lastSet would have internally been set by VectorLoader.load() when it invokes
+ * loadFieldBuffers.
+ */
+ assertEquals(Integer.toString(14), Integer.toString(mutator2.getLastSet()));
+ mutator2.setValueCount(25);
+ assertEquals(Integer.toString(24), Integer.toString(mutator2.getLastSet()));
+
+ /* Check the vector output */
+ final NullableVarCharVector.Accessor accessor2 = vector2.getAccessor();
+ assertArrayEquals(STR1, accessor2.get(0));
+ assertArrayEquals(STR2, accessor2.get(1));
+ assertArrayEquals(STR3, accessor2.get(2));
+ assertArrayEquals(STR4, accessor2.get(3));
+ assertArrayEquals(STR5, accessor2.get(4));
+ assertArrayEquals(STR6, accessor2.get(5));
+ }
+ }
+ }
+
+ @Test
+ public void testFillEmptiesUsage() {
+ try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
+
+ final NullableVarCharVector.Mutator mutator = vector.getMutator();
+
+ vector.allocateNew(1024 * 10, 1024);
+
+ setBytes(0, STR1, vector);
+ setBytes(1, STR2, vector);
+ setBytes(2, STR3, vector);
+ setBytes(3, STR4, vector);
+ setBytes(4, STR5, vector);
+ setBytes(5, STR6, vector);
+
+ /* Check current lastSet */
+ assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet()));
+
+ /* Check the vector output */
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+
+ mutator.setLastSet(5);
+ /* fill empty byte arrays from index [6, 9] */
+ mutator.fillEmpties(10);
+
+ /* Check current lastSet */
+ assertEquals(Integer.toString(9), Integer.toString(mutator.getLastSet()));
+
+ /* Check the vector output */
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9)));
+
+ setBytes(10, STR1, vector);
+ setBytes(11, STR2, vector);
+
+ mutator.setLastSet(11);
+ /* fill empty byte arrays from index [12, 14] */
+ mutator.setValueCount(15);
+
+ /* Check current lastSet */
+ assertEquals(Integer.toString(14), Integer.toString(mutator.getLastSet()));
+
+ /* Check the vector output */
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9)));
+ assertArrayEquals(STR1, accessor.get(10));
+ assertArrayEquals(STR2, accessor.get(11));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(12)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(13)));
+ assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(14)));
+
+ /* Check offsets */
+ final UInt4Vector.Accessor offsetAccessor = vector.values.offsetVector.getAccessor();
+ assertEquals(Integer.toString(0), Integer.toString(offsetAccessor.get(0)));
+ assertEquals(Integer.toString(6), Integer.toString(offsetAccessor.get(1)));
+ assertEquals(Integer.toString(16), Integer.toString(offsetAccessor.get(2)));
+ assertEquals(Integer.toString(21), Integer.toString(offsetAccessor.get(3)));
+ assertEquals(Integer.toString(30), Integer.toString(offsetAccessor.get(4)));
+ assertEquals(Integer.toString(34), Integer.toString(offsetAccessor.get(5)));
+
+ assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(6)));
+ assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(7)));
+ assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(8)));
+ assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(9)));
+ assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(10)));
+
+ assertEquals(Integer.toString(46), Integer.toString(offsetAccessor.get(11)));
+ assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(12)));
+
+ assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(13)));
+ assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(14)));
+ assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(15)));
+ }
+ }
+
+ @Test /* NullableVarCharVector */
+ public void testGetBufferAddress1() {
+
+ try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
+
+ final NullableVarCharVector.Mutator mutator = vector.getMutator();
+ final NullableVarCharVector.Accessor accessor = vector.getAccessor();
+
+ vector.allocateNew(1024 * 10, 1024);
+
+ /* populate the vector */
+ mutator.set(0, STR1);
+ mutator.set(1, STR2);
+ mutator.set(2, STR3);
+ mutator.set(3, STR4);
+ mutator.set(4, STR5);
+ mutator.set(5, STR6);
+
+ mutator.setValueCount(15);
+
+ /* check the vector output */
+ assertArrayEquals(STR1, accessor.get(0));
+ assertArrayEquals(STR2, accessor.get(1));
+ assertArrayEquals(STR3, accessor.get(2));
+ assertArrayEquals(STR4, accessor.get(3));
+ assertArrayEquals(STR5, accessor.get(4));
+ assertArrayEquals(STR6, accessor.get(5));
+
+ List buffers = vector.getFieldBuffers();
+ long bitAddress = vector.getValidityBufferAddress();
+ long offsetAddress = vector.getOffsetBufferAddress();
+ long dataAddress = vector.getDataBufferAddress();
+
+ assertEquals(3, buffers.size());
+ assertEquals(bitAddress, buffers.get(0).memoryAddress());
+ assertEquals(offsetAddress, buffers.get(1).memoryAddress());
+ assertEquals(dataAddress, buffers.get(2).memoryAddress());
+ }
+ }
+
+ @Test /* NullableIntVector */
+ public void testGetBufferAddress2() {
+
+ try (final NullableIntVector vector = new NullableIntVector("myvector", allocator)) {
+
+ final NullableIntVector.Mutator mutator = vector.getMutator();
+ final NullableIntVector.Accessor accessor = vector.getAccessor();
+ boolean error = false;
+
+ vector.allocateNew(16);
+
+ /* populate the vector */
+ for(int i = 0; i < 16; i += 2) {
+ mutator.set(i, i+10);
+ }
+
+ /* check the vector output */
+ for(int i = 0; i < 16; i += 2) {
+ assertEquals(i+10, accessor.get(i));
+ }
+
+ List buffers = vector.getFieldBuffers();
+ long bitAddress = vector.getValidityBufferAddress();
+ long dataAddress = vector.getDataBufferAddress();
+
+ try {
+ long offsetAddress = vector.getOffsetBufferAddress();
+ }
+ catch (UnsupportedOperationException ue) {
+ error = true;
+ }
+ finally {
+ assertTrue(error);
+ }
+
+ assertEquals(2, buffers.size());
+ assertEquals(bitAddress, buffers.get(0).memoryAddress());
+ assertEquals(dataAddress, buffers.get(1).memoryAddress());
+ }
+ }
+
+ @Test
+ public void testMultipleClose() {
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("vector_allocator", 0, Long.MAX_VALUE);
+ NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, vectorAllocator);
+ vector.close();
+ vectorAllocator.close();
+ vector.close();
+ vectorAllocator.close();
+ }
+
+ public static void setBytes(int index, byte[] bytes, NullableVarCharVector vector) {
+ final int currentOffset = vector.values.offsetVector.getAccessor().get(index);
+
+ vector.bits.getMutator().setToOne(index);
+ vector.values.offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
+ vector.values.data.setBytes(currentOffset, bytes, 0, bytes.length);
+ }
+}