Skip to content

Commit

Permalink
apache#2999 Support boolean type for bulk row
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Dec 26, 2019
1 parent fc51fad commit 5aeb132
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
31 changes: 26 additions & 5 deletions processing/src/main/java/io/druid/data/input/BulkRowSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ public BulkRowSequence(final Sequence<Row> sequence, final List<ValueDesc> types
category[i] = 2;
page[i] = new double[max];
break;
case STRING:
case BOOLEAN:
category[i] = 3;
page[i] = new boolean[max];
break;
case STRING:
category[i] = 4;
page[i] = new BytesOutputStream(4096);
break;
default:
category[i] = 4;
category[i] = 5;
page[i] = new Object[max];
}
nulls[i] = new BitSet();
Expand Down Expand Up @@ -185,7 +189,8 @@ public OutType accumulate(OutType prevValue, Row current)
case 0: ((float[]) page[i])[ix] = ((Number) values[i]).floatValue(); break;
case 1: ((long[]) page[i])[ix] = ((Number) values[i]).longValue(); break;
case 2: ((double[]) page[i])[ix] = ((Number) values[i]).doubleValue(); break;
case 3:
case 3: ((boolean[]) page[i])[ix] = (Boolean) values[i]; break;
case 4:
final byte[] bytes = values[i] instanceof UTF8Bytes ? ((UTF8Bytes) values[i]).getValue()
: StringUtils.toUtf8WithNullToEmpty((String) values[i]);
((BytesOutputStream) page[i]).writeVarSizeBytes(bytes);
Expand All @@ -207,15 +212,16 @@ private OutType asBulkRow()
case 0: copy[i] = copy((float[]) page[i], size, nulls[i]); break;
case 1: copy[i] = copy((long[]) page[i], size, nulls[i]); break;
case 2: copy[i] = copy((double[]) page[i], size, nulls[i]); break;
case 3:
case 3: copy[i] = copy((boolean[]) page[i], size, nulls[i]); break;
case 4:
final BytesOutputStream stream = (BytesOutputStream) page[i];
final byte[] compressed = new byte[Integer.BYTES + LZ4.maxCompressedLength(stream.size())];
System.arraycopy(Ints.toByteArray(stream.size()), 0, compressed, 0, Integer.BYTES);
copy[i] = new BytesRef(
compressed,
Integer.BYTES + LZ4.compress(stream.toByteArray(), 0, stream.size(), compressed, Integer.BYTES)
);
stream.reset();
stream.clear();
break;
default: copy[i] = Arrays.copyOf((Object[]) page[i], size); break;
}
Expand Down Expand Up @@ -269,4 +275,19 @@ private Object copy(final double[] array, final int size, final BitSet nulls)
nulls.clear();
return copy;
}

private Object copy(final boolean[] array, final int size, final BitSet nulls)
{
if (nulls.isEmpty()) {
return Arrays.copyOf(array, size);
}
final Boolean[] copy = new Boolean[size];
for (int i = 0; i < copy.length; i++) {
if (!nulls.get(i)) {
copy[i] = array[i];
}
}
nulls.clear();
return copy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,27 @@ public void writeUTF(String s)
}
}

public void mark(int mark)
{
this.mark = mark;
}

@Override
public void reset()
{
count = mark;
}

public void reset(int mark)
{
count = mark;
}

public void clear()
{
mark = count = 0;
}

public void writeVarSizeBytes(byte[] value)
{
writeUnsignedVarInt(value.length);
Expand Down

0 comments on commit 5aeb132

Please sign in to comment.