Skip to content

Commit

Permalink
fix: adding intermediate transferPair function
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jun 11, 2024
1 parent 69bd1cd commit b6d2366
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void allocateBuffers() {
sizeBuffer = allocateBuffers(sizeAllocationSizeInBytes);
}

private ArrowBuf allocateBuffers(final long size) {
protected ArrowBuf allocateBuffers(final long size) {
final int curSize = (int) size;
ArrowBuf buffer = allocator.buffer(curSize);
buffer.readerIndex(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import org.apache.arrow.memory.util.ByteFunctionHelpers;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.UnionListViewReader;
import org.apache.arrow.vector.complex.impl.UnionListViewWriter;
Expand Down Expand Up @@ -363,23 +365,17 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
// TODO: https://github.com/apache/arrow/issues/41269
throw new UnsupportedOperationException(
"ListVector does not support getTransferPair(String, BufferAllocator, CallBack) yet");
return new TransferImpl(ref, allocator, callBack);
}

@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
// TODO: https://github.com/apache/arrow/issues/41269
throw new UnsupportedOperationException(
"ListVector does not support getTransferPair(Field, BufferAllocator, CallBack) yet");
return new TransferImpl(field, allocator, callBack);
}

@Override
public TransferPair makeTransferPair(ValueVector target) {
// TODO: https://github.com/apache/arrow/issues/41269
throw new UnsupportedOperationException(
"ListVector does not support makeTransferPair(ValueVector) yet");
return new TransferImpl((ListViewVector) target);
}

@Override
Expand Down Expand Up @@ -455,6 +451,148 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
throw new UnsupportedOperationException();
}

private class TransferImpl implements TransferPair {

ListViewVector to;
TransferPair dataTransferPair;

public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new ListViewVector(name, allocator, field.getFieldType(), callBack));
}

public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new ListViewVector(field, allocator, callBack));
}

public TransferImpl(ListViewVector to) {
this.to = to;
to.addOrGetVector(vector.getField().getFieldType());
if (to.getDataVector() instanceof ZeroVector) {
to.addOrGetVector(vector.getField().getFieldType());
}
dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
}

@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
to.sizeBuffer = transferBuffer(sizeBuffer, to.allocator);
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(startIndex >= 0 && length >= 0 && startIndex + length <= valueCount,
"Invalid parameters startIndex: %s, length: %s for valueCount: %s", startIndex, length, valueCount);
to.clear();
if (length > 0) {
final int startPoint = offsetBuffer.getInt((long) startIndex * OFFSET_WIDTH);
// we have to scan by index since there are out-of-order offsets
to.offsetBuffer = to.allocateBuffers((long) length * OFFSET_WIDTH);
to.sizeBuffer = to.allocateBuffers((long) length * SIZE_WIDTH);

/* splitAndTransfer the offset buffer and validity buffer */
int maxOffsetAndSizeSum = -1;
int minOffsetValue = -1;
for (int i = 0; i < length; i++) {
final int offsetValue = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH);
final int sizeValue = sizeBuffer.getInt((long) (startIndex + i) * SIZE_WIDTH);
final int relativeOffset = offsetValue - startPoint;
to.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeOffset);
to.sizeBuffer.setInt((long) i * SIZE_WIDTH, sizeValue);
if (maxOffsetAndSizeSum < offsetValue + sizeValue) {
maxOffsetAndSizeSum = offsetValue + sizeValue;
}
if (minOffsetValue == -1 || minOffsetValue > offsetValue) {
minOffsetValue = offsetValue;
}
}

/* splitAndTransfer the validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);

/* splitAndTransfer the data buffer */
final int childSliceLength = maxOffsetAndSizeSum - minOffsetValue;
dataTransferPair.splitAndTransfer(minOffsetValue, childSliceLength);
to.setValueCount(length);
}
}

/*
* transfer the validity.
*/
private void splitAndTransferValidityBuffer(int startIndex, int length, ListViewVector target) {
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
int offset = startIndex % 8;

if (length > 0) {
if (offset == 0) {
// slice
if (target.validityBuffer != null) {
target.validityBuffer.getReferenceManager().release();
}
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
target.validityBuffer.getReferenceManager().retain(1);
} else {
/* Copy data
* When the first bit starts from the middle of a byte (offset != 0),
* copy data from src BitVector.
* Each byte in the target is composed by a part in i-th byte,
* another part in (i+1)-th byte.
*/
target.allocateValidityBuffer(byteSizeTarget);

for (int i = 0; i < byteSizeTarget - 1; i++) {
byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer, firstByteSource + i + 1, offset);

target.validityBuffer.setByte(i, (b1 + b2));
}

/* Copying the last piece is done in the following manner:
* if the source vector has 1 or more bytes remaining, we copy
* the last piece as a byte formed by shifting data
* from the current byte and the next byte.
*
* if the source vector has no more bytes remaining
* (we are at the last byte), we copy the last piece as a byte
* by shifting data from the current byte.
*/
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
firstByteSource + byteSizeTarget - 1, offset);
byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer,
firstByteSource + byteSizeTarget, offset);

target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
} else {
byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
firstByteSource + byteSizeTarget - 1, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
}
}
}
}

@Override
public ValueVector getTo() {
return null;
}

@Override
public void copyValueSafe(int from, int to) {

}
}

@Override
protected FieldReader getReaderImpl() {
return new UnionListViewReader(this);
Expand Down
Loading

0 comments on commit b6d2366

Please sign in to comment.