diff --git a/processing/src/main/java/io/druid/data/input/BulkRowSequence.java b/processing/src/main/java/io/druid/data/input/BulkRowSequence.java index 999f644a9d98..13c51077692a 100644 --- a/processing/src/main/java/io/druid/data/input/BulkRowSequence.java +++ b/processing/src/main/java/io/druid/data/input/BulkRowSequence.java @@ -81,12 +81,16 @@ public BulkRowSequence(final Sequence sequence, final List types category[i] = 2; page[i] = new double[max]; break; - case STRING: + case BOOLEAN: category[i] = 3; + page[i] = new boolean[max]; + break; + case STRING: + category[i] = 4; page[i] = new BytesOutputStream(4096); break; default: - category[i] = 4; + category[i] = 5; page[i] = new Object[max]; } nulls[i] = new BitSet(); @@ -185,7 +189,8 @@ public OutType accumulate(OutType prevValue, Row current) case 0: ((float[]) page[i])[ix] = ((Number) values[i]).floatValue(); break; case 1: ((long[]) page[i])[ix] = ((Number) values[i]).longValue(); break; case 2: ((double[]) page[i])[ix] = ((Number) values[i]).doubleValue(); break; - case 3: + case 3: ((boolean[]) page[i])[ix] = (Boolean) values[i]; break; + case 4: final byte[] bytes = values[i] instanceof UTF8Bytes ? ((UTF8Bytes) values[i]).getValue() : StringUtils.toUtf8WithNullToEmpty((String) values[i]); ((BytesOutputStream) page[i]).writeVarSizeBytes(bytes); @@ -207,7 +212,8 @@ private OutType asBulkRow() case 0: copy[i] = copy((float[]) page[i], size, nulls[i]); break; case 1: copy[i] = copy((long[]) page[i], size, nulls[i]); break; case 2: copy[i] = copy((double[]) page[i], size, nulls[i]); break; - case 3: + case 3: copy[i] = copy((boolean[]) page[i], size, nulls[i]); break; + case 4: final BytesOutputStream stream = (BytesOutputStream) page[i]; final byte[] compressed = new byte[Integer.BYTES + LZ4.maxCompressedLength(stream.size())]; System.arraycopy(Ints.toByteArray(stream.size()), 0, compressed, 0, Integer.BYTES); @@ -215,7 +221,7 @@ private OutType asBulkRow() compressed, Integer.BYTES + LZ4.compress(stream.toByteArray(), 0, stream.size(), compressed, Integer.BYTES) ); - stream.reset(); + stream.clear(); break; default: copy[i] = Arrays.copyOf((Object[]) page[i], size); break; } @@ -269,4 +275,19 @@ private Object copy(final double[] array, final int size, final BitSet nulls) nulls.clear(); return copy; } + + private Object copy(final boolean[] array, final int size, final BitSet nulls) + { + if (nulls.isEmpty()) { + return Arrays.copyOf(array, size); + } + final Boolean[] copy = new Boolean[size]; + for (int i = 0; i < copy.length; i++) { + if (!nulls.get(i)) { + copy[i] = array[i]; + } + } + nulls.clear(); + return copy; + } } diff --git a/processing/src/main/java/io/druid/data/input/BytesOutputStream.java b/processing/src/main/java/io/druid/data/input/BytesOutputStream.java index 56bec22ce344..8f9f4b4bb6a1 100644 --- a/processing/src/main/java/io/druid/data/input/BytesOutputStream.java +++ b/processing/src/main/java/io/druid/data/input/BytesOutputStream.java @@ -183,11 +183,27 @@ public void writeUTF(String s) } } + public void mark(int mark) + { + this.mark = mark; + } + + @Override + public void reset() + { + count = mark; + } + public void reset(int mark) { count = mark; } + public void clear() + { + mark = count = 0; + } + public void writeVarSizeBytes(byte[] value) { writeUnsignedVarInt(value.length);