diff --git a/src/main/java/com/facebook/presto/AggregationFunction.java b/src/main/java/com/facebook/presto/AggregationFunction.java index 27569689e7a6..21110f963a09 100644 --- a/src/main/java/com/facebook/presto/AggregationFunction.java +++ b/src/main/java/com/facebook/presto/AggregationFunction.java @@ -2,6 +2,9 @@ public interface AggregationFunction { + TupleInfo getTupleInfo(); + void add(ValueBlock values, PositionBlock relevantPositions); - Object evaluate(); + + Tuple evaluate(); } diff --git a/src/main/java/com/facebook/presto/BasicSliceOutput.java b/src/main/java/com/facebook/presto/BasicSliceOutput.java new file mode 100644 index 000000000000..83f703930f65 --- /dev/null +++ b/src/main/java/com/facebook/presto/BasicSliceOutput.java @@ -0,0 +1,218 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package com.facebook.presto; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +public class BasicSliceOutput extends SliceOutput +{ + private final Slice slice; + private int size; + + protected BasicSliceOutput(Slice slice) + { + this.slice = slice; + } + + @Override + public void reset() + { + size = 0; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isWritable() + { + return writableBytes() > 0; + } + + @Override + public int writableBytes() + { + return slice.length() - size; + } + + @Override + public void writeByte(int value) + { + slice.setByte(size++, value); + } + + @Override + public void writeShort(int value) + { + slice.setShort(size, value); + size += 2; + } + + @Override + public void writeInt(int value) + { + slice.setInt(size, value); + size += 4; + } + + @Override + public void writeLong(long value) + { + slice.setLong(size, value); + size += 8; + } + + @Override + public void writeBytes(byte[] source, int sourceIndex, int length) + { + slice.setBytes(size, source, sourceIndex, length); + size += length; + } + + @Override + public void writeBytes(byte[] source) + { + writeBytes(source, 0, source.length); + } + + @Override + public void writeBytes(Slice source) + { + writeBytes(source, 0, source.length()); + } + + @Override + public void writeBytes(SliceInput source, int length) + { + if (length > source.available()) { + throw new IndexOutOfBoundsException(); + } + writeBytes(source.readBytes(length)); + } + + @Override + public void writeBytes(Slice source, int sourceIndex, int length) + { + slice.setBytes(size, source, sourceIndex, length); + size += length; + } + + @Override + public void writeBytes(ByteBuffer source) + { + int length = source.remaining(); + slice.setBytes(size, source); + size += length; + } + + @Override + public int 
writeBytes(InputStream in, int length) + throws IOException + { + int writtenBytes = slice.setBytes(size, in, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) + throws IOException + { + int writtenBytes = slice.setBytes(size, in, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public int writeBytes(FileChannel in, int position, int length) + throws IOException + { + int writtenBytes = slice.setBytes(size, in, position, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public void writeZero(int length) + { + if (length == 0) { + return; + } + if (length < 0) { + throw new IllegalArgumentException( + "length must be 0 or greater than 0."); + } + int nLong = length >>> 3; + int nBytes = length & 7; + for (int i = nLong; i > 0; i--) { + writeLong(0); + } + if (nBytes == 4) { + writeInt(0); + } + else if (nBytes < 4) { + for (int i = nBytes; i > 0; i--) { + writeByte((byte) 0); + } + } + else { + writeInt(0); + for (int i = nBytes - 4; i > 0; i--) { + writeByte((byte) 0); + } + } + } + + @Override + public Slice slice() + { + return slice.slice(0, size); + } + + @Override + public ByteBuffer toByteBuffer() + { + return slice.toByteBuffer(0, size); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + '(' + + "size=" + size + ", " + + "capacity=" + slice.length() + + ')'; + } + + public String toString(Charset charset) + { + return slice.toString(0, size, charset); + } +} diff --git a/src/main/java/com/facebook/presto/BlockBuilder.java b/src/main/java/com/facebook/presto/BlockBuilder.java index 60c9ac32be1b..d931d0cb0d12 100644 --- a/src/main/java/com/facebook/presto/BlockBuilder.java +++ b/src/main/java/com/facebook/presto/BlockBuilder.java @@ -1,13 +1,71 @@ package com.facebook.presto; +import 
com.google.common.base.Preconditions; +import io.airlift.units.DataSize; +import io.airlift.units.DataSize.Unit; + public class BlockBuilder { - public void append(Object value) + private static final DataSize DEFAULT_MAX_BLOCK_SIZE = new DataSize(64, Unit.KILOBYTE); + + private final long startPosition; + private final TupleInfo tupleInfo; + private final int maxBlockSize; + private final DynamicSliceOutput sliceOutput; + + public BlockBuilder(long startPosition, TupleInfo tupleInfo) + { + this(startPosition, tupleInfo, DEFAULT_MAX_BLOCK_SIZE); + } + + public BlockBuilder(long startPosition, TupleInfo tupleInfo, DataSize blockSize) + { + Preconditions.checkArgument(startPosition >= 0, "startPosition is negative"); + Preconditions.checkNotNull(blockSize, "blockSize is null"); + + this.startPosition = startPosition; + this.tupleInfo = tupleInfo; + maxBlockSize = (int) blockSize.toBytes(); + sliceOutput = new DynamicSliceOutput((int) blockSize.toBytes()); + } + + public boolean isFull() + { + return sliceOutput.size() > maxBlockSize; + } + + public void append(byte value) + { + sliceOutput.write(value); + } + + public void append(int value) + { + sliceOutput.writeInt(value); + } + + public void append(long value) + { + sliceOutput.writeLong(value); + } + + public void append(byte[] value) + { + sliceOutput.writeBytes(value); + } + + public void append(Slice value) + { + sliceOutput.writeBytes(value); + } + + public void append(Tuple tuple) { + tuple.writeTo(sliceOutput); } - public ValueBlock build() + public UncompressedValueBlock build() { - return null; + return new UncompressedValueBlock(startPosition, tupleInfo, sliceOutput.slice()); } } diff --git a/src/main/java/com/facebook/presto/CsvFileScanner.java b/src/main/java/com/facebook/presto/CsvFileScanner.java index 39e6d3d8338b..33d8fd084d75 100644 --- a/src/main/java/com/facebook/presto/CsvFileScanner.java +++ b/src/main/java/com/facebook/presto/CsvFileScanner.java @@ -12,35 +12,40 @@ import 
java.io.InputStreamReader; import java.util.Iterator; -public class CsvFileScanner implements Iterable +public class CsvFileScanner implements Iterable { private final InputSupplier inputSupplier; private final Splitter columnSplitter; private final int columnIndex; + private final TupleInfo tupleInfo; - public CsvFileScanner(InputSupplier inputSupplier, int columnIndex, char columnSeparator) + public CsvFileScanner(InputSupplier inputSupplier, int columnIndex, char columnSeparator, TupleInfo tupleInfo) { - this.columnIndex = columnIndex; Preconditions.checkNotNull(inputSupplier, "inputSupplier is null"); + + this.columnIndex = columnIndex; + this.tupleInfo = tupleInfo; this.inputSupplier = inputSupplier; columnSplitter = Splitter.on(columnSeparator); } @Override - public Iterator iterator() + public Iterator iterator() { - return new ColumnIterator(inputSupplier, columnIndex, columnSplitter); + return new ColumnIterator(inputSupplier, columnIndex, columnSplitter, tupleInfo); } - private static class ColumnIterator extends AbstractIterator + private static class ColumnIterator extends AbstractIterator { - private long position; private final LineReader reader; - private int columnIndex; - private Splitter columnSplitter; + private final TupleInfo tupleInfo; + private final int columnIndex; + private final Splitter columnSplitter; + private long position; - public ColumnIterator(InputSupplier inputSupplier, int columnIndex, Splitter columnSplitter) + public ColumnIterator(InputSupplier inputSupplier, int columnIndex, Splitter columnSplitter, TupleInfo tupleInfo) { + this.tupleInfo = tupleInfo; try { this.reader = new LineReader(inputSupplier.getInput()); } @@ -52,23 +57,42 @@ public ColumnIterator(InputSupplier inputSupplier, int column } @Override - protected ValueBlock computeNext() + protected UncompressedValueBlock computeNext() { - String line; - try { - line = reader.readLine(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } + String line = 
nextLine(); if (line == null) { endOfData(); return null; } - Iterable split = columnSplitter.split(line); - String value = Iterables.get(split, columnIndex); - return new UncompressedValueBlock(position++, value); + BlockBuilder blockBuilder = new BlockBuilder(position, tupleInfo); + do { + Iterable split = columnSplitter.split(line); + String value = Iterables.get(split, columnIndex); + + // parse the selected column as a primitive long (no boxing) + // todo add support for other column types + blockBuilder.append(Long.parseLong(value)); + + if (blockBuilder.isFull()) { + break; + } + line = nextLine(); + } while (line != null); + + UncompressedValueBlock block = blockBuilder.build(); + position += block.getCount(); + return block; + } + + private String nextLine() + { + try { + return reader.readLine(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } } } } diff --git a/src/main/java/com/facebook/presto/DataScan1.java b/src/main/java/com/facebook/presto/DataScan1.java index 7aadcd4cec1d..0c6bd7e0d797 100644 --- a/src/main/java/com/facebook/presto/DataScan1.java +++ b/src/main/java/com/facebook/presto/DataScan1.java @@ -9,9 +9,9 @@ public class DataScan1 extends AbstractIterator { private final Iterator source; - private final Predicate predicate; + private final Predicate predicate; - public DataScan1(Iterator source, Predicate predicate) + public DataScan1(Iterator source, Predicate predicate) { this.predicate = predicate; this.source = source; diff --git a/src/main/java/com/facebook/presto/DataScan2.java b/src/main/java/com/facebook/presto/DataScan2.java index dcf2210b1692..68e90db3a78e 100644 --- a/src/main/java/com/facebook/presto/DataScan2.java +++ b/src/main/java/com/facebook/presto/DataScan2.java @@ -9,9 +9,9 @@ public class DataScan2 extends AbstractIterator { private final Iterator source; - private final Predicate predicate; + private final Predicate predicate; - public DataScan2(Iterator source, 
Predicate predicate) { this.predicate = predicate; this.source = source; diff --git a/src/main/java/com/facebook/presto/DynamicSliceOutput.java b/src/main/java/com/facebook/presto/DynamicSliceOutput.java new file mode 100644 index 000000000000..d832110e1e6b --- /dev/null +++ b/src/main/java/com/facebook/presto/DynamicSliceOutput.java @@ -0,0 +1,230 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.facebook.presto; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +public class DynamicSliceOutput extends SliceOutput +{ + private Slice slice; + private int size; + + public DynamicSliceOutput(int estimatedSize) + { + this.slice = new Slice(estimatedSize); + } + + @Override + public void reset() + { + size = 0; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isWritable() + { + return writableBytes() > 0; + } + + @Override + public int writableBytes() + { + return slice.length() - size; + } + + @Override + public void writeByte(int value) + { + slice = Slices.ensureSize(slice, size + 1); + slice.setByte(size++, value); + } + + @Override + public void writeShort(int value) + { + slice = Slices.ensureSize(slice, size + 2); + slice.setShort(size, value); + size += 2; + } + + 
@Override + public void writeInt(int value) + { + slice = Slices.ensureSize(slice, size + 4); + slice.setInt(size, value); + size += 4; + } + + @Override + public void writeLong(long value) + { + slice = Slices.ensureSize(slice, size + 8); + slice.setLong(size, value); + size += 8; + } + + @Override + public void writeBytes(byte[] source) + { + writeBytes(source, 0, source.length); + } + + @Override + public void writeBytes(byte[] source, int sourceIndex, int length) + { + slice = Slices.ensureSize(slice, size + length); + slice.setBytes(size, source, sourceIndex, length); + size += length; + } + + @Override + public void writeBytes(Slice source) + { + writeBytes(source, 0, source.length()); + } + + @Override + public void writeBytes(SliceInput source, int length) + { + if (length > source.available()) { + throw new IndexOutOfBoundsException(); + } + writeBytes(source.readBytes(length)); /* honor length, matching BasicSliceOutput; source.slice() ignored it */ + } + + @Override + public void writeBytes(Slice source, int sourceIndex, int length) + { + slice = Slices.ensureSize(slice, size + length); + slice.setBytes(size, source, sourceIndex, length); + size += length; + } + + @Override + public void writeBytes(ByteBuffer source) + { + int length = source.remaining(); + slice = Slices.ensureSize(slice, size + length); + slice.setBytes(size, source); + size += length; + } + + @Override + public int writeBytes(InputStream in, int length) + throws IOException + { + slice = Slices.ensureSize(slice, size + length); + int writtenBytes = slice.setBytes(size, in, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) + throws IOException + { + slice = Slices.ensureSize(slice, size + length); + int writtenBytes = slice.setBytes(size, in, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public int writeBytes(FileChannel in, int position, int length) + throws IOException + { + slice = 
Slices.ensureSize(slice, size + length); + int writtenBytes = slice.setBytes(size, in, position, length); + if (writtenBytes > 0) { + size += writtenBytes; + } + return writtenBytes; + } + + @Override + public void writeZero(int length) + { + if (length == 0) { + return; + } + if (length < 0) { + throw new IllegalArgumentException( + "length must be 0 or greater than 0."); + } + slice = Slices.ensureSize(slice, size + length); + int nLong = length >>> 3; + int nBytes = length & 7; + for (int i = nLong; i > 0; i--) { + writeLong(0); + } + if (nBytes == 4) { + writeInt(0); + } + else if (nBytes < 4) { + for (int i = nBytes; i > 0; i--) { + writeByte((byte) 0); + } + } + else { + writeInt(0); + for (int i = nBytes - 4; i > 0; i--) { + writeByte((byte) 0); + } + } + } + + @Override + public Slice slice() + { + return slice.slice(0, size); + } + + @Override + public ByteBuffer toByteBuffer() + { + return slice.toByteBuffer(0, size); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + '(' + + "size=" + size + ", " + + "capacity=" + slice.length() + + ')'; + } + + @Override + public String toString(Charset charset) + { + return slice.toString(0, size, charset); + } +} diff --git a/src/main/java/com/facebook/presto/EmptyValueBlock.java b/src/main/java/com/facebook/presto/EmptyValueBlock.java index ab955a675356..c2361e10f741 100644 --- a/src/main/java/com/facebook/presto/EmptyValueBlock.java +++ b/src/main/java/com/facebook/presto/EmptyValueBlock.java @@ -10,16 +10,16 @@ import java.util.Iterator; public class EmptyValueBlock - implements ValueBlock + implements ValueBlock { @Override - public PositionBlock selectPositions(Predicate predicate) + public PositionBlock selectPositions(Predicate predicate) { return new EmptyPositionBlock(); } @Override - public ValueBlock selectPairs(Predicate predicate) + public ValueBlock selectPairs(Predicate predicate) { return this; } @@ -79,7 +79,7 @@ public Range getRange() } @Override - public Iterator 
iterator() + public Iterator iterator() { return Iterators.emptyIterator(); } diff --git a/src/main/java/com/facebook/presto/GroupBy.java b/src/main/java/com/facebook/presto/GroupBy.java index 60ab0abe41a3..a10e31112da0 100644 --- a/src/main/java/com/facebook/presto/GroupBy.java +++ b/src/main/java/com/facebook/presto/GroupBy.java @@ -33,7 +33,7 @@ protected RunLengthEncodedBlock computeNext() // form a group from the current position, until the value changes Pair entry = currentGroupByBlock.next(); - Object groupByKey = entry.getValue(); + Tuple groupByKey = entry.getValue(); long startPosition = entry.getPosition(); while (true) { diff --git a/src/main/java/com/facebook/presto/HashAggregation.java b/src/main/java/com/facebook/presto/HashAggregation.java index 715a5a62c783..1c4aa189af55 100644 --- a/src/main/java/com/facebook/presto/HashAggregation.java +++ b/src/main/java/com/facebook/presto/HashAggregation.java @@ -18,16 +18,20 @@ public class HashAggregation private final Provider functionProvider; - private Iterator> aggregations; + private Iterator> aggregations; private long position; + private final TupleInfo tupleInfo; - public HashAggregation(Iterator keySource, SeekableIterator valueSource, Provider functionProvider) + public HashAggregation(TupleInfo tupleInfo, Iterator keySource, + SeekableIterator valueSource, + Provider functionProvider) { this.groupBySource = keySource; this.aggregationSource = valueSource; this.functionProvider = functionProvider; + this.tupleInfo = tupleInfo; } @Override @@ -35,7 +39,7 @@ protected ValueBlock computeNext() { // process all data ahead of time if (aggregations == null) { - Map aggregationMap = new HashMap<>(); + Map aggregationMap = new HashMap<>(); while (groupBySource.hasNext()) { RunLengthEncodedBlock group = groupBySource.next(); @@ -56,13 +60,24 @@ protected ValueBlock computeNext() return null; } - // get next aggregation - Entry aggregation = aggregations.next(); + BlockBuilder blockBuilder = new 
BlockBuilder(position, tupleInfo); + while (!blockBuilder.isFull() && aggregations.hasNext()) { + // get next aggregation + Entry aggregation = aggregations.next(); - // calculate final value for this group - Object value = aggregation.getValue().evaluate(); + // calculate final value for this group + Tuple key = aggregation.getKey(); + Tuple value = aggregation.getValue().evaluate(); + blockBuilder.append(key); + blockBuilder.append(value); + } + + // build output block + ValueBlock block = blockBuilder.build(); + + // update position + position += block.getCount(); - // build an output block - return new UncompressedValueBlock(position++, new Tuple(aggregation.getKey(), value)); + return block; } } diff --git a/src/main/java/com/facebook/presto/Merge.java b/src/main/java/com/facebook/presto/Merge.java index 44a6f991182a..d58e93431c39 100644 --- a/src/main/java/com/facebook/presto/Merge.java +++ b/src/main/java/com/facebook/presto/Merge.java @@ -7,22 +7,22 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import static com.google.common.base.Predicates.not; public class Merge - extends AbstractIterator + extends AbstractIterator { - private final static int TUPLES_PER_BLOCK = 1000; + private final List> sources; + private final TupleInfo tupleInfo; + private long position; - private final List> sources; - - public Merge(List> sources) + public Merge(List> sources, TupleInfo tupleInfo) { - ImmutableList.Builder> builder = ImmutableList.builder(); + ImmutableList.Builder> builder = ImmutableList.builder(); + this.tupleInfo = tupleInfo; for (Iterator source : sources) { builder.add(Iterators.concat(Iterators.transform(source, toIterator()))); @@ -31,12 +31,12 @@ public Merge(List> sources) this.sources = builder.build(); } - private static Function> toIterator() + private static Function> toIterator() { - return new Function>() + return new 
Function>() { @Override - public Iterator apply(ValueBlock input) + public Iterator apply(ValueBlock input) { return input.iterator(); } @@ -51,26 +51,29 @@ protected ValueBlock computeNext() return null; } - List tuples = new ArrayList<>(TUPLES_PER_BLOCK); + BlockBuilder blockBuilder = new BlockBuilder(position, tupleInfo); - while (tuples.size() < TUPLES_PER_BLOCK && Iterables.all(sources, hasNextPredicate())) { - ImmutableList.Builder builder = ImmutableList.builder(); - for (Iterator source : sources) { - builder.add(source.next()); + do { + for (Iterator source : sources) { + blockBuilder.append(source.next()); } - tuples.add(new Tuple(builder.build())); - } + if (blockBuilder.isFull()) { + break; + } + } while (Iterables.all(sources, hasNextPredicate())); - return new UncompressedValueBlock(0, (List) tuples); + UncompressedValueBlock block = blockBuilder.build(); + position += block.getCount(); + return block; } - private Predicate> hasNextPredicate() + private Predicate> hasNextPredicate() { - return new Predicate>() + return new Predicate>() { @Override - public boolean apply(Iterator input) + public boolean apply(Iterator input) { return input.hasNext(); } diff --git a/src/main/java/com/facebook/presto/Pair.java b/src/main/java/com/facebook/presto/Pair.java index b397fe796ef0..64273f2604b7 100644 --- a/src/main/java/com/facebook/presto/Pair.java +++ b/src/main/java/com/facebook/presto/Pair.java @@ -5,9 +5,9 @@ public class Pair { private final long position; - private final Object value; + private final Tuple value; - public Pair(long position, Object value) + public Pair(long position, Tuple value) { this.position = position; this.value = value; @@ -18,7 +18,7 @@ public long getPosition() return position; } - public Object getValue() + public Tuple getValue() { return value; } @@ -35,12 +35,12 @@ public Long apply(Pair input) }; } - public static Function valueGetter() + public static Function valueGetter() { - return new Function() + return new 
Function() { @Override - public Object apply(Pair input) + public Tuple apply(Pair input) { return input.getValue(); } diff --git a/src/main/java/com/facebook/presto/PairsIterator.java b/src/main/java/com/facebook/presto/PairsIterator.java index 923aed66fe90..475eb2620c6f 100644 --- a/src/main/java/com/facebook/presto/PairsIterator.java +++ b/src/main/java/com/facebook/presto/PairsIterator.java @@ -8,10 +8,10 @@ public class PairsIterator extends AbstractIterator { - private final Iterator blockIterator; + private final Iterator blockIterator; private PeekingIterator currentBlock; - public PairsIterator(Iterator blockIterator) + public PairsIterator(Iterator blockIterator) { this.blockIterator = blockIterator; } @@ -42,5 +42,4 @@ private boolean advance() currentBlock = blockIterator.next().pairIterator(); return true; } - } diff --git a/src/main/java/com/facebook/presto/PipelinedAggregation.java b/src/main/java/com/facebook/presto/PipelinedAggregation.java index 4519da902b43..e15af3781731 100644 --- a/src/main/java/com/facebook/presto/PipelinedAggregation.java +++ b/src/main/java/com/facebook/presto/PipelinedAggregation.java @@ -12,9 +12,15 @@ public class PipelinedAggregation private final SeekableIterator aggregationSource; private final Provider functionProvider; + private final TupleInfo tupleInfo; + private long position; - public PipelinedAggregation(Iterator keySource, SeekableIterator valueSource, Provider functionProvider) + public PipelinedAggregation(TupleInfo tupleInfo, + Iterator keySource, + SeekableIterator valueSource, + Provider functionProvider) { + this.tupleInfo = tupleInfo; this.groupBySource = keySource; this.aggregationSource = valueSource; @@ -30,18 +36,28 @@ protected ValueBlock computeNext() return null; } - // get next group - RunLengthEncodedBlock group = groupBySource.next(); + BlockBuilder builder = new BlockBuilder(position, tupleInfo); - // create a new aggregate for this group - AggregationFunction aggregationFunction = 
functionProvider.get(); + do { + // get next group + RunLengthEncodedBlock group = groupBySource.next(); - AggregationUtil.processGroup(aggregationSource, aggregationFunction, group.getRange()); + // create a new aggregate for this group + AggregationFunction aggregationFunction = functionProvider.get(); - // calculate final value for this group - Object value = aggregationFunction.evaluate(); + AggregationUtil.processGroup(aggregationSource, aggregationFunction, group.getRange()); + + // calculate final value for this group + Tuple value = aggregationFunction.evaluate(); + + builder.append(group.getValue()); + builder.append(value); + } + while (!builder.isFull() && groupBySource.hasNext()); // build an output block - return new UncompressedValueBlock(group.getRange().lowerEndpoint(), new Tuple(group.getValue(), value)); + UncompressedValueBlock block = builder.build(); + position += block.getCount(); + return block; } } diff --git a/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java b/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java index 3bcd8a915900..b7df0b8ff55a 100644 --- a/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java +++ b/src/main/java/com/facebook/presto/RunLengthEncodedBlock.java @@ -3,7 +3,6 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.DiscreteDomains; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Range; @@ -12,30 +11,30 @@ import java.util.Iterator; public class RunLengthEncodedBlock - implements ValueBlock + implements ValueBlock { - private final Object value; + private final Tuple value; private final Range range; - public RunLengthEncodedBlock(Object value, Range range) + public RunLengthEncodedBlock(Tuple value, Range range) { this.value = value; this.range = range; } - public Object getValue() + public Tuple 
getValue() { return value; } @Override - public PositionBlock selectPositions(Predicate predicate) + public PositionBlock selectPositions(Predicate predicate) { return null; } @Override - public ValueBlock selectPairs(Predicate predicate) + public ValueBlock selectPairs(Predicate predicate) { return null; } @@ -43,25 +42,32 @@ public ValueBlock selectPairs(Predicate predicate) @Override public ValueBlock filter(PositionBlock positions) { - ImmutableList.Builder builder = ImmutableList.builder(); - for (Long position : positions.getPositions()) { + // todo this is wrong, it should produce either another RLE block or a BitVectorBlock + int matches = 0; + for (long position : positions.getPositions()) { if (range.contains(position)) { - builder.add(new Pair(position, value)); + matches++; } } - - ImmutableList pairs = builder.build(); - if (pairs.isEmpty()) { + if (matches == 0) { return new EmptyValueBlock(); } - return new UncompressedValueBlock(pairs); + Slice newSlice = Slices.allocate(matches * value.getTupleInfo().size()); + SliceOutput sliceOutput = newSlice.output(); + for (int i = 0; i < matches; i++) { + value.writeTo(sliceOutput); + } + + // todo what is the start position + return new UncompressedValueBlock(0, value.getTupleInfo(), newSlice); } @Override public PeekingIterator pairIterator() { - return Iterators.peekingIterator(Iterators.transform(getPositions().iterator(), new Function() { + return Iterators.peekingIterator(Iterators.transform(getPositions().iterator(), new Function() + { @Override public Pair apply(Long position) { @@ -70,6 +76,12 @@ public Pair apply(Long position) })); } + @Override + public Iterator iterator() + { + return Iterators.peekingIterator(Collections.nCopies(getCount(), value).iterator()); + } + @Override public boolean isEmpty() { @@ -116,10 +128,4 @@ public String toString() { return Iterators.toString(pairIterator()); } - - @Override - public Iterator iterator() - { - return 
Iterators.peekingIterator(Collections.nCopies(getCount(), value).iterator()); - } } diff --git a/src/main/java/com/facebook/presto/SizeOf.java b/src/main/java/com/facebook/presto/SizeOf.java new file mode 100644 index 000000000000..8ff051a1a9e7 --- /dev/null +++ b/src/main/java/com/facebook/presto/SizeOf.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2011 the original author or authors. + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto; + +public final class SizeOf +{ + public static final byte SIZE_OF_BYTE = 1; + public static final byte SIZE_OF_SHORT = 2; + public static final byte SIZE_OF_INT = 4; + public static final byte SIZE_OF_LONG = 8; + + private SizeOf() + { + } +} diff --git a/src/main/java/com/facebook/presto/Slice.java b/src/main/java/com/facebook/presto/Slice.java new file mode 100644 index 000000000000..8a1904d71754 --- /dev/null +++ b/src/main/java/com/facebook/presto/Slice.java @@ -0,0 +1,716 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.facebook.presto; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; +import java.util.Arrays; + +import static com.facebook.presto.SizeOf.SIZE_OF_BYTE; +import static com.facebook.presto.SizeOf.SIZE_OF_INT; +import static com.facebook.presto.SizeOf.SIZE_OF_LONG; +import static com.facebook.presto.SizeOf.SIZE_OF_SHORT; +import static java.nio.ByteOrder.LITTLE_ENDIAN; + +/** + * Little Endian slice of a byte array. + */ +public final class Slice implements Comparable +{ + private final byte[] data; + private final int offset; + private final int length; + + private int hash; + + public Slice(int length) + { + data = new byte[length]; + this.offset = 0; + this.length = length; + } + + public Slice(byte[] data) + { + Preconditions.checkNotNull(data, "array is null"); + this.data = data; + this.offset = 0; + this.length = data.length; + } + + public Slice(byte[] data, int offset, int length) + { + Preconditions.checkNotNull(data, "array is null"); + this.data = data; + this.offset = offset; + this.length = length; + } + + /** + * Length of this slice. + */ + public int length() + { + return length; + } + + /** + * Gets the array underlying this slice. 
+ */ + public byte[] getRawArray() + { + return data; + } + + /** + * Gets the offset of this slice in the underlying array. + */ + public int getRawOffset() + { + return offset; + } + + /** + * Gets a byte at the specified absolute {@code index} in this buffer. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 1} is greater than {@code this.capacity} + */ + public byte getByte(int index) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_BYTE, this.length); + index += offset; + return data[index]; + } + + /** + * Gets an unsigned byte at the specified absolute {@code index} in this + * buffer. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 1} is greater than {@code this.capacity} + */ + public short getUnsignedByte(int index) + { + return (short) (getByte(index) & 0xFF); + } + + /** + * Gets a 16-bit short integer at the specified absolute {@code index} in + * this slice. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 2} is greater than {@code this.capacity} + */ + public short getShort(int index) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_SHORT, this.length); + index += offset; + return (short) (data[index] & 0xFF | data[index + 1] << 8); + } + + /** + * Gets a 32-bit integer at the specified absolute {@code index} in + * this buffer. 
+ * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 4} is greater than {@code this.capacity} + */ + public int getInt(int index) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_INT, this.length); + index += offset; + return (data[index] & 0xff) | + (data[index + 1] & 0xff) << 8 | + (data[index + 2] & 0xff) << 16 | + (data[index + 3] & 0xff) << 24; + } + + /** + * Gets a 64-bit long integer at the specified absolute {@code index} in + * this buffer. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 8} is greater than {@code this.capacity} + */ + public long getLong(int index) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_LONG, this.length); + index += offset; + return ((long) data[index] & 0xff) | + ((long) data[index + 1] & 0xff) << 8 | + ((long) data[index + 2] & 0xff) << 16 | + ((long) data[index + 3] & 0xff) << 24 | + ((long) data[index + 4] & 0xff) << 32 | + ((long) data[index + 5] & 0xff) << 40 | + ((long) data[index + 6] & 0xff) << 48 | + ((long) data[index + 7] & 0xff) << 56; + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the specified absolute {@code index}. + * + * @param dstIndex the first index of the destination + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0}, + * if the specified {@code dstIndex} is less than {@code 0}, + * if {@code index + length} is greater than + * {@code this.capacity}, or + * if {@code dstIndex + length} is greater than + * {@code dst.capacity} + */ + public void getBytes(int index, Slice dst, int dstIndex, int length) + { + getBytes(index, dst.data, dstIndex, length); + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the specified absolute {@code index}. 
+ * + * @param destinationIndex the first index of the destination + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0}, + * if the specified {@code dstIndex} is less than {@code 0}, + * if {@code index + length} is greater than + * {@code this.capacity}, or + * if {@code dstIndex + length} is greater than + * {@code dst.length} + */ + public void getBytes(int index, byte[] destination, int destinationIndex, int length) + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + Preconditions.checkPositionIndexes(destinationIndex, destinationIndex + length, destination.length); + index += offset; + System.arraycopy(data, index, destination, destinationIndex, length); + } + + public byte[] getBytes() + { + return getBytes(0, length); + } + + public byte[] getBytes(int index, int length) + { + index += offset; + if (index == 0) { + return Arrays.copyOf(data, length); + } + else { + byte[] value = new byte[length]; + System.arraycopy(data, index, value, 0, length); + return value; + } + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the specified absolute {@code index} until the destination's position + * reaches its limit. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + dst.remaining()} is greater than + * {@code this.capacity} + */ + public void getBytes(int index, ByteBuffer destination) + { + Preconditions.checkPositionIndex(index, this.length); + index += offset; + destination.put(data, index, Math.min(length, destination.remaining())); + } + + /** + * Transfers this buffer's data to the specified stream starting at the + * specified absolute {@code index}. 
+ * + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + length} is greater than + * {@code this.capacity} + * @throws java.io.IOException if the specified stream threw an exception during I/O + */ + public void getBytes(int index, OutputStream out, int length) + throws IOException + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + out.write(data, index, length); + } + + /** + * Transfers this buffer's data to the specified channel starting at the + * specified absolute {@code index}. + * + * @param length the maximum number of bytes to transfer + * @return the actual number of bytes written out to the specified channel + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + length} is greater than + * {@code this.capacity} + * @throws java.io.IOException if the specified channel threw an exception during I/O + */ + public int getBytes(int index, GatheringByteChannel out, int length) + throws IOException + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + return out.write(ByteBuffer.wrap(data, index, length)); + } + + /** + * Sets the specified 16-bit short integer at the specified absolute + * {@code index} in this buffer. The 16 high-order bits of the specified + * value are ignored. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 2} is greater than {@code this.capacity} + */ + public void setShort(int index, int value) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_SHORT, this.length); + index += offset; + data[index] = (byte) (value); + data[index + 1] = (byte) (value >>> 8); + } + + /** + * Sets the specified 32-bit integer at the specified absolute + * {@code index} in this buffer. 
+ * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 4} is greater than {@code this.capacity} + */ + public void setInt(int index, int value) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_INT, this.length); + index += offset; + data[index] = (byte) (value); + data[index + 1] = (byte) (value >>> 8); + data[index + 2] = (byte) (value >>> 16); + data[index + 3] = (byte) (value >>> 24); + } + + /** + * Sets the specified 64-bit long integer at the specified absolute + * {@code index} in this buffer. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 8} is greater than {@code this.capacity} + */ + public void setLong(int index, long value) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_LONG, this.length); + index += offset; + data[index] = (byte) (value); + data[index + 1] = (byte) (value >>> 8); + data[index + 2] = (byte) (value >>> 16); + data[index + 3] = (byte) (value >>> 24); + data[index + 4] = (byte) (value >>> 32); + data[index + 5] = (byte) (value >>> 40); + data[index + 6] = (byte) (value >>> 48); + data[index + 7] = (byte) (value >>> 56); + } + + /** + * Sets the specified byte at the specified absolute {@code index} in this + * buffer. The 24 high-order bits of the specified value are ignored. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * {@code index + 1} is greater than {@code this.capacity} + */ + public void setByte(int index, int value) + { + Preconditions.checkPositionIndexes(index, index + SIZE_OF_BYTE, this.length); + index += offset; + data[index] = (byte) value; + } + + /** + * Transfers the specified source buffer's data to this buffer starting at + * the specified absolute {@code index}. 
+ * + * @param srcIndex the first index of the source + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0}, + * if the specified {@code srcIndex} is less than {@code 0}, + * if {@code index + length} is greater than + * {@code this.capacity}, or + * if {@code srcIndex + length} is greater than + * {@code src.capacity} + */ + public void setBytes(int index, Slice src, int srcIndex, int length) + { + setBytes(index, src.data, src.offset + srcIndex, length); + } + + /** + * Transfers the specified source array's data to this buffer starting at + * the specified absolute {@code index}. + * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0}, + * if the specified {@code srcIndex} is less than {@code 0}, + * if {@code index + length} is greater than + * {@code this.capacity}, or + * if {@code srcIndex + length} is greater than {@code src.length} + */ + public void setBytes(int index, byte[] source, int sourceIndex, int length) + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + Preconditions.checkPositionIndexes(sourceIndex, sourceIndex + length, source.length); + index += offset; + System.arraycopy(source, sourceIndex, data, index, length); + } + + /** + * Transfers the specified source buffer's data to this buffer starting at + * the specified absolute {@code index} until the source buffer's position + * reaches its limit. 
+ * + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + src.remaining()} is greater than + * {@code this.capacity} + */ + public void setBytes(int index, ByteBuffer source) + { + Preconditions.checkPositionIndexes(index, index + source.remaining(), this.length); + index += offset; + source.get(data, index, source.remaining()); + } + + /** + * Transfers the content of the specified source stream to this buffer + * starting at the specified absolute {@code index}. + * + * @param length the number of bytes to transfer + * @return the actual number of bytes read in from the specified channel. + * {@code -1} if the specified channel is closed. + * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + length} is greater than {@code this.capacity} + * @throws java.io.IOException if the specified stream threw an exception during I/O + */ + public int setBytes(int index, InputStream in, int length) + throws IOException + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + int readBytes = 0; + do { + int localReadBytes = in.read(data, index, length); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } + else { + break; + } + } + readBytes += localReadBytes; + index += localReadBytes; + length -= localReadBytes; + } while (length > 0); + + return readBytes; + } + + /** + * Transfers the content of the specified source channel to this buffer + * starting at the specified absolute {@code index}. + * + * @param length the maximum number of bytes to transfer + * @return the actual number of bytes read in from the specified channel. + * {@code -1} if the specified channel is closed. 
+ * @throws IndexOutOfBoundsException if the specified {@code index} is less than {@code 0} or + * if {@code index + length} is greater than {@code this.capacity} + * @throws java.io.IOException if the specified channel threw an exception during I/O + */ + public int setBytes(int index, ScatteringByteChannel in, int length) + throws IOException + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + ByteBuffer buf = ByteBuffer.wrap(data, index, length); + int readBytes = 0; + + do { + int localReadBytes; + try { + localReadBytes = in.read(buf); + } + catch (ClosedChannelException e) { + localReadBytes = -1; + } + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } + else { + break; + } + } + else if (localReadBytes == 0) { + break; + } + readBytes += localReadBytes; + } while (readBytes < length); + + return readBytes; + } + + public int setBytes(int index, FileChannel in, int position, int length) + throws IOException + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + ByteBuffer buf = ByteBuffer.wrap(data, index, length); + int readBytes = 0; + + do { + int localReadBytes; + try { + localReadBytes = in.read(buf, position + readBytes); + } + catch (ClosedChannelException e) { + localReadBytes = -1; + } + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } + else { + break; + } + } + else if (localReadBytes == 0) { + break; + } + readBytes += localReadBytes; + } while (readBytes < length); + + return readBytes; + } + + public Slice copySlice() + { + return copySlice(0, length); + } + + /** + * Returns a copy of this buffer's sub-region. Modifying the content of + * the returned buffer or this buffer does not affect each other at all. 
+ */ + public Slice copySlice(int index, int length) + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + + index += offset; + byte[] copiedArray = new byte[length]; + System.arraycopy(data, index, copiedArray, 0, length); + return new Slice(copiedArray); + } + + public byte[] copyBytes() + { + return copyBytes(0, length); + } + + public byte[] copyBytes(int index, int length) + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + if (index == 0) { + return Arrays.copyOf(data, length); + } + else { + byte[] value = new byte[length]; + System.arraycopy(data, index, value, 0, length); + return value; + } + } + + /** + * Returns a slice of this buffer's readable bytes. Modifying the content + * of the returned buffer or this buffer affects each other's content + * while they maintain separate indexes and marks. + */ + public Slice slice() + { + return slice(0, length); + } + + /** + * Returns a slice of this buffer's sub-region. Modifying the content of + * the returned buffer or this buffer affects each other's content while + * they maintain separate indexes and marks. + */ + public Slice slice(int index, int length) + { + if (index == 0 && length == this.length) { + return this; + } + + Preconditions.checkPositionIndexes(index, index + length, this.length); + if (index >= 0 && length == 0) { + return Slices.EMPTY_SLICE; + } + return new Slice(data, offset + index, length); + } + + /** + * Creates an input stream over this slice. + */ + public SliceInput input() + { + return new SliceInput(this); + } + + /** + * Creates an output stream over this slice. + */ + public SliceOutput output() + { + return new BasicSliceOutput(this); + } + + /** + * Converts this buffer's readable bytes into a NIO buffer. The returned + * buffer shares the content with this buffer. 
+ */ + public ByteBuffer toByteBuffer() + { + return toByteBuffer(0, length); + } + + /** + * Converts this buffer's sub-region into a NIO buffer. The returned + * buffer shares the content with this buffer. + */ + public ByteBuffer toByteBuffer(int index, int length) + { + Preconditions.checkPositionIndexes(index, index + length, this.length); + index += offset; + return ByteBuffer.wrap(data, index, length).order(LITTLE_ENDIAN); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Slice slice = (Slice) o; + + // do lengths match + if (length != slice.length) { + return false; + } + + // if arrays have same base offset, some optimizations can be taken... + if (offset == slice.offset && data == slice.data) { + return true; + } + for (int i = 0; i < length; i++) { + if (data[offset + i] != slice.data[slice.offset + i]) { + return false; + } + } + return true; + } + + @Override + public int hashCode() + { + if (hash != 0) { + return hash; + } + + int result = length; + for (int i = offset; i < offset + length; i++) { + result = 31 * result + data[i]; + } + if (result == 0) { + result = 1; + } + hash = result; + return hash; + } + + /** + * Compares the content of the specified buffer to the content of this + * buffer. This comparison is performed byte by byte using an unsigned + * comparison. 
+ */ + public int compareTo(Slice that) + { + if (this == that) { + return 0; + } + if (this.data == that.data && length == that.length && offset == that.offset) { + return 0; + } + + int minLength = Math.min(this.length, that.length); + for (int i = 0; i < minLength; i++) { + int thisByte = 0xFF & this.data[this.offset + i]; + int thatByte = 0xFF & that.data[that.offset + i]; + if (thisByte != thatByte) { + return (thisByte) - (thatByte); + } + } + return this.length - that.length; + } + + /** + * Decodes this buffer's readable bytes into a string with the specified + * character set name. + */ + public String toString(Charset charset) + { + return toString(0, length, charset); + } + + /** + * Decodes this buffer's sub-region into a string with the specified + * character set. + */ + public String toString(int index, int length, Charset charset) + { + if (length == 0) { + return ""; + } + + return Slices.decodeString(toByteBuffer(index, length), charset); + } + + public String toString() + { + return getClass().getSimpleName() + '(' + + "length=" + length() + + ')'; + } +} diff --git a/src/main/java/com/facebook/presto/SliceInput.java b/src/main/java/com/facebook/presto/SliceInput.java new file mode 100644 index 000000000000..faa5dc9160ee --- /dev/null +++ b/src/main/java/com/facebook/presto/SliceInput.java @@ -0,0 +1,457 @@ +package com.facebook.presto; + +import java.io.DataInput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; +import java.nio.charset.Charset; + +public final class SliceInput extends InputStream implements DataInput +{ + private final Slice slice; + private int position; + + public SliceInput(Slice slice) + { + this.slice = slice; + } + + /** + * Returns the {@code position} of this buffer. + */ + public int position() + { + return position; + } + + /** + * Sets the {@code position} of this buffer. 
+ * + * @throws IndexOutOfBoundsException if the specified {@code position} is + * less than {@code 0} or + * greater than {@code this.writerIndex} + */ + public void setPosition(int position) + { + if (position < 0 || position > slice.length()) { + throw new IndexOutOfBoundsException(); + } + this.position = position; + } + + /** + * Returns {@code true} + * if and only if {@code available()} is greater + * than {@code 0}. + */ + public boolean isReadable() + { + return available() > 0; + } + + /** + * Returns the number of readable bytes which is equal to + * {@code (this.slice.length() - this.position)}. + */ + public int available() + { + return slice.length() - position; + } + + @Override + public boolean readBoolean() + throws IOException + { + return readByte() != 0; + } + + @Override + public int read() + { + return readByte(); + } + + /** + * Gets a byte at the current {@code position} and increases + * the {@code position} by {@code 1} in this buffer. + * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 1} + */ + public byte readByte() + { + if (position == slice.length()) { + throw new IndexOutOfBoundsException(); + } + return slice.getByte(position++); + } + + /** + * Gets an unsigned byte at the current {@code position} and increases + * the {@code position} by {@code 1} in this buffer. + * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 1} + */ + public int readUnsignedByte() + { + return (short) (readByte() & 0xFF); + } + + /** + * Gets a 16-bit short integer at the current {@code position} + * and increases the {@code position} by {@code 2} in this buffer. 
+ * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 2} + */ + public short readShort() + { + short v = slice.getShort(position); + position += 2; + return v; + } + + @Override + public int readUnsignedShort() + throws IOException + { + return readShort() & 0xff; + } + + /** + * Gets a 32-bit integer at the current {@code position} + * and increases the {@code position} by {@code 4} in this buffer. + * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 4} + */ + public int readInt() + { + int v = slice.getInt(position); + position += 4; + return v; + } + + /** + * Gets an unsigned 32-bit integer at the current {@code position} + * and increases the {@code position} by {@code 4} in this buffer. + * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 4} + */ + public long readUnsignedInt() + { + return readInt() & 0xFFFFFFFFL; + } + + /** + * Gets a 64-bit integer at the current {@code position} + * and increases the {@code position} by {@code 8} in this buffer. + * + * @throws IndexOutOfBoundsException if {@code this.available()} is less than {@code 8} + */ + public long readLong() + { + long v = slice.getLong(position); + position += 8; + return v; + } + + public byte[] readByteArray(int length) + { + byte[] value = slice.copyBytes(position, length); + position += length; + return value; + } + + /** + * Transfers this buffer's data to a newly created buffer starting at + * the current {@code position} and increases the {@code position} + * by the number of the transferred bytes (= {@code length}). + * The returned buffer's {@code position} and {@code writerIndex} are + * {@code 0} and {@code length} respectively. 
+ * + * @param length the number of bytes to transfer + * @return the newly created buffer which contains the transferred bytes + * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.available()} + */ + public Slice readBytes(int length) + { + if (length == 0) { + return Slices.EMPTY_SLICE; + } + Slice value = slice.slice(position, length); + position += length; + return value; + } + + /** + * Returns a new slice of this buffer's sub-region starting at the current + * {@code position} and increases the {@code position} by the size + * of the new slice (= {@code length}). + * + * @param length the size of the new slice + * @return the newly created slice + * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.available()} + */ + public Slice readSlice(int length) + { + Slice newSlice = slice.slice(position, length); + position += length; + return newSlice; + } + + @Override + public void readFully(byte[] destination) + { + readBytes(destination); + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} and increases the {@code position} + * by the number of the transferred bytes (= {@code dst.length}). + * + * @throws IndexOutOfBoundsException if {@code dst.length} is greater than {@code this.available()} + */ + public void readBytes(byte[] destination) + { + readBytes(destination, 0, destination.length); + } + + @Override + public void readFully(byte[] destination, int offset, int length) + { + readBytes(destination, offset, length); + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} and increases the {@code position} + * by the number of the transferred bytes (= {@code length}). 
+ * + * @param destinationIndex the first index of the destination + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code destinationIndex} is less than {@code 0}, + * if {@code length} is greater than {@code this.available()}, or + * if {@code destinationIndex + length} is greater than {@code destination.length} + */ + public void readBytes(byte[] destination, int destinationIndex, int length) + { + slice.getBytes(position, destination, destinationIndex, length); + position += length; + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} until the destination becomes + * non-writable, and increases the {@code position} by the number of the + * transferred bytes. This method is basically same with + * {@link #readBytes(Slice, int, int)}, except that this method + * increases the {@code writerIndex} of the destination by the number of + * the transferred bytes while {@link #readBytes(Slice, int, int)} + * does not. + * + * @throws IndexOutOfBoundsException if {@code destination.writableBytes} is greater than + * {@code this.available()} + */ + public void readBytes(Slice destination) + { + readBytes(destination, destination.length()); + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} and increases the {@code position} + * by the number of the transferred bytes (= {@code length}). This method + * is basically same with {@link #readBytes(Slice, int, int)}, + * except that this method increases the {@code writerIndex} of the + * destination by the number of the transferred bytes (= {@code length}) + * while {@link #readBytes(Slice, int, int)} does not. 
+ * + * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.available()} or + * if {@code length} is greater than {@code destination.writableBytes} + */ + public void readBytes(Slice destination, int length) + { + if (length > destination.length()) { + throw new IndexOutOfBoundsException(); + } + readBytes(destination, destination.length(), length); + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} and increases the {@code position} + * by the number of the transferred bytes (= {@code length}). + * + * @param destinationIndex the first index of the destination + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if the specified {@code destinationIndex} is less than {@code 0}, + * if {@code length} is greater than {@code this.available()}, or + * if {@code destinationIndex + length} is greater than + * {@code destination.capacity} + */ + public void readBytes(Slice destination, int destinationIndex, int length) + { + slice.getBytes(position, destination, destinationIndex, length); + position += length; + } + + /** + * Transfers this buffer's data to the specified destination starting at + * the current {@code position} until the destination's position + * reaches its limit, and increases the {@code position} by the + * number of the transferred bytes. + * + * @throws IndexOutOfBoundsException if {@code destination.remaining()} is greater than + * {@code this.available()} + */ + public void readBytes(ByteBuffer destination) + { + int length = destination.remaining(); + slice.getBytes(position, destination); + position += length; + } + + /** + * Transfers this buffer's data to the specified stream starting at the + * current {@code position}. 
+ * + * @param length the maximum number of bytes to transfer + * @return the actual number of bytes written out to the specified channel + * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.available()} + * @throws java.io.IOException if the specified channel threw an exception during I/O + */ + public int readBytes(GatheringByteChannel out, int length) + throws IOException + { + int readBytes = slice.getBytes(position, out, length); + position += readBytes; + return readBytes; + } + + /** + * Transfers this buffer's data to the specified stream starting at the + * current {@code position}. + * + * @param length the number of bytes to transfer + * @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.available()} + * @throws java.io.IOException if the specified stream threw an exception during I/O + */ + public void readBytes(OutputStream out, int length) + throws IOException + { + slice.getBytes(position, out, length); + position += length; + } + + public int skipBytes(int length) + { + length = Math.min(length, available()); + position += length; + return length; + } + + /** + * Returns a slice of this buffer's readable bytes. Modifying the content + * of the returned buffer or this buffer affects each other's content + * while they maintain separate indexes and marks. This method is + * identical to {@code buf.slice(buf.position(), buf.available()())}. + * This method does not modify {@code position} or {@code writerIndex} of + * this buffer. + */ + public Slice slice() + { + return slice.slice(position, available()); + } + + /** + * Converts this buffer's readable bytes into a NIO buffer. The returned + * buffer might or might not share the content with this buffer, while + * they have separate indexes and marks. This method is identical to + * {@code buf.toByteBuffer(buf.position(), buf.available()())}. + * This method does not modify {@code position} or {@code writerIndex} of + * this buffer. 
+ */ + public ByteBuffer toByteBuffer() + { + return slice.toByteBuffer(position, available()); + } + + /** + * Decodes this buffer's readable bytes into a string with the specified + * character set name. This method is identical to + * {@code buf.toString(buf.position(), buf.available()(), charsetName)}. + * This method does not modify {@code position} or {@code writerIndex} of + * this buffer. + * + * @throws java.nio.charset.UnsupportedCharsetException if the specified character set name is not supported by the + * current VM + */ + public String toString(Charset charset) + { + return slice.toString(position, available(), charset); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + '(' + + "ridx=" + position + ", " + + "cap=" + slice.length() + + ')'; + } + + // + // Unsupported operations + // + + /** + * Unsupported operation + * + * @throws UnsupportedOperationException always + */ + @Override + public char readChar() + { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported operation + * + * @throws UnsupportedOperationException always + */ + @Override + public float readFloat() + { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() + { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported operation + * + * @throws UnsupportedOperationException always + */ + @Override + public String readLine() + { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported operation + * + * @throws UnsupportedOperationException always + */ + @Override + public String readUTF() + { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/com/facebook/presto/SliceOutput.java b/src/main/java/com/facebook/presto/SliceOutput.java new file mode 100644 index 000000000000..0950e9e637b3 --- /dev/null +++ b/src/main/java/com/facebook/presto/SliceOutput.java @@ -0,0 +1,336 @@ +package com.facebook.presto; + +import java.io.DataOutput; 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+public abstract class SliceOutput extends OutputStream implements DataOutput
+{
+    /**
+     * Resets this stream to the initial position.
+     */
+    public abstract void reset();
+
+    /**
+     * Returns the {@code writerIndex} of this buffer.
+     */
+    public abstract int size();
+
+    /**
+     * Returns the number of writable bytes which is equal to
+     * {@code (this.capacity - this.writerIndex)}.
+     */
+    public abstract int writableBytes();
+
+    /**
+     * Returns {@code true}
+     * if and only if {@code (this.capacity - this.writerIndex)} is greater
+     * than {@code 0}.
+     */
+    public abstract boolean isWritable();
+
+    @Override
+    public final void writeBoolean(boolean value)
+    {
+        writeByte(value ? 1 : 0); // booleans are stored as a single 0/1 byte
+    }
+
+    @Override
+    public final void write(int value)
+    {
+        writeByte(value); // OutputStream entry point; delegates to the abstract byte writer
+    }
+
+    /**
+     * Sets the specified byte at the current {@code writerIndex}
+     * and increases the {@code writerIndex} by {@code 1} in this buffer.
+     * The 24 high-order bits of the specified value are ignored.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code this.writableBytes} is less than {@code 1}
+     */
+    public abstract void writeByte(int value);
+
+    /**
+     * Sets the specified 16-bit short integer at the current
+     * {@code writerIndex} and increases the {@code writerIndex} by {@code 2}
+     * in this buffer. The 16 high-order bits of the specified value are ignored.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code this.writableBytes} is less than {@code 2}
+     */
+    public abstract void writeShort(int value);
+
+    /**
+     * Sets the specified 32-bit integer at the current {@code writerIndex}
+     * and increases the {@code writerIndex} by {@code 4} in this buffer.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code this.writableBytes} is less than {@code 4}
+     */
+    public abstract void writeInt(int value);
+
+    /**
+     * Sets the specified 64-bit long integer at the current
+     * {@code writerIndex} and increases the {@code writerIndex} by {@code 8}
+     * in this buffer.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code this.writableBytes} is less than {@code 8}
+     */
+    public abstract void writeLong(long value);
+
+    /**
+     * Transfers the specified source buffer's data to this buffer starting at
+     * the current {@code writerIndex} until the source buffer becomes
+     * unreadable, and increases the {@code writerIndex} by the number of
+     * the transferred bytes. This method is basically the same as
+     * {@link #writeBytes(Slice, int, int)}, except that this method
+     * increases the {@code readerIndex} of the source buffer by the number of
+     * the transferred bytes while {@link #writeBytes(Slice, int, int)}
+     * does not.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code source.readableBytes} is greater than
+     *            {@code this.writableBytes}
+     *
+     */
+    public abstract void writeBytes(Slice source);
+
+    /**
+     * Transfers the specified source buffer's data to this buffer starting at
+     * the current {@code writerIndex} and increases the {@code writerIndex}
+     * by the number of the transferred bytes (= {@code length}). This method
+     * is basically the same as {@link #writeBytes(Slice, int, int)},
+     * except that this method increases the {@code readerIndex} of the source
+     * buffer by the number of the transferred bytes (= {@code length}) while
+     * {@link #writeBytes(Slice, int, int)} does not.
+     *
+     * @param length the number of bytes to transfer
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code length} is greater than {@code this.writableBytes} or
+     *         if {@code length} is greater than {@code source.readableBytes}
+     */
+    public abstract void writeBytes(SliceInput source, int length);
+
+    /**
+     * Transfers the specified source buffer's data to this buffer starting at
+     * the current {@code writerIndex} and increases the {@code writerIndex}
+     * by the number of the transferred bytes (= {@code length}).
+     *
+     * @param sourceIndex the first index of the source
+     * @param length the number of bytes to transfer
+     *
+     * @throws IndexOutOfBoundsException
+     *         if the specified {@code sourceIndex} is less than {@code 0},
+     *         if {@code sourceIndex + length} is greater than
+     *            {@code source.capacity}, or
+     *         if {@code length} is greater than {@code this.writableBytes}
+     */
+    public abstract void writeBytes(Slice source, int sourceIndex, int length);
+
+    @Override
+    public final void write(byte[] source)
+            throws IOException
+    {
+        writeBytes(source); // OutputStream entry point; delegates to the whole-array writer
+    }
+
+    /**
+     * Transfers the specified source array's data to this buffer starting at
+     * the current {@code writerIndex} and increases the {@code writerIndex}
+     * by the number of the transferred bytes (= {@code source.length}).
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code source.length} is greater than {@code this.writableBytes}
+     */
+    public abstract void writeBytes(byte[] source);
+
+    @Override
+    public final void write(byte[] source, int sourceIndex, int length)
+    {
+        writeBytes(source, sourceIndex, length); // OutputStream entry point; delegates to the ranged writer
+    }
+
+    /**
+     * Transfers the specified source array's data to this buffer starting at
+     * the current {@code writerIndex} and increases the {@code writerIndex}
+     * by the number of the transferred bytes (= {@code length}).
+     *
+     * @param sourceIndex the first index of the source
+     * @param length the number of bytes to transfer
+     *
+     * @throws IndexOutOfBoundsException
+     *         if the specified {@code sourceIndex} is less than {@code 0},
+     *         if {@code sourceIndex + length} is greater than
+     *            {@code source.length}, or
+     *         if {@code length} is greater than {@code this.writableBytes}
+     */
+    public abstract void writeBytes(byte[] source, int sourceIndex, int length);
+
+    /**
+     * Transfers the specified source buffer's data to this buffer starting at
+     * the current {@code writerIndex} until the source buffer's position
+     * reaches its limit, and increases the {@code writerIndex} by the
+     * number of the transferred bytes.
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code source.remaining()} is greater than
+     *            {@code this.writableBytes}
+     */
+    public abstract void writeBytes(ByteBuffer source);
+
+    /**
+     * Transfers the content of the specified stream to this buffer
+     * starting at the current {@code writerIndex} and increases the
+     * {@code writerIndex} by the number of the transferred bytes.
+     *
+     * @param length the number of bytes to transfer
+     *
+     * @return the actual number of bytes read in from the specified stream
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code length} is greater than {@code this.writableBytes}
+     * @throws java.io.IOException
+     *         if the specified stream threw an exception during I/O
+     */
+    public abstract int writeBytes(InputStream in, int length) throws IOException;
+
+    /**
+     * Transfers the content of the specified channel to this buffer
+     * starting at the current {@code writerIndex} and increases the
+     * {@code writerIndex} by the number of the transferred bytes.
+     *
+     * @param length the maximum number of bytes to transfer
+     *
+     * @return the actual number of bytes read in from the specified channel
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code length} is greater than {@code this.writableBytes}
+     * @throws java.io.IOException
+     *         if the specified channel threw an exception during I/O
+     */
+    public abstract int writeBytes(ScatteringByteChannel in, int length) throws IOException;
+
+    public abstract int writeBytes(FileChannel in, int position, int length) throws IOException; // NOTE(review): undocumented; presumably reads from the file at `position` — confirm
+
+    /**
+     * Fills this buffer with <tt>NUL (0x00)</tt> starting at the current
+     * {@code writerIndex} and increases the {@code writerIndex} by the
+     * specified {@code length}.
+     *
+     * @param length the number of <tt>NUL</tt>s to write to the buffer
+     *
+     * @throws IndexOutOfBoundsException
+     *         if {@code length} is greater than {@code this.writableBytes}
+     */
+    public abstract void writeZero(int length);
+
+    /**
+     * Returns a slice of this buffer's readable bytes. Modifying the content
+     * of the returned buffer or this buffer affects each other's content
+     * while they maintain separate indexes and marks. This method does not
+     * modify the {@code writerIndex} of this buffer.
+     * NOTE(review): the earlier wording equated this with {@code buf.slice(buf.readerIndex(), buf.readableBytes())},
+     * but this class declares neither method; confirm the exact range against the implementing subclass.
+     */
+    public abstract Slice slice();
+
+    /**
+     * Converts this buffer's readable bytes into a NIO buffer. The returned
+     * buffer might or might not share the content with this buffer, while
+     * they have separate indexes and marks. This method does not
+     * modify the {@code writerIndex} of this buffer.
+     * NOTE(review): the earlier wording equated this with {@code buf.toByteBuffer(buf.readerIndex(), buf.readableBytes())},
+     * but this class declares neither method; confirm the exact range against the implementing subclass.
+     */
+    public abstract ByteBuffer toByteBuffer();
+
+    /**
+     * Decodes this buffer's readable bytes into a string with the specified
+     * character set. This method does not modify the {@code writerIndex} of
+     * this buffer. NOTE(review): the earlier wording referred to
+     * {@code readerIndex()}, {@code readableBytes()} and a {@code charsetName}; none exist here,
+     * so confirm the decoded range against the implementing subclass.
+     *
+     * @throws java.nio.charset.UnsupportedCharsetException if the specified character set name is not supported by the
+     * current VM
+     */
+    public abstract String toString(Charset charset);
+
+    //
+    // Unsupported operations
+    //
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeChar(int value)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeFloat(float v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeDouble(double v)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeChars(String s)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeUTF(String s)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsupported operation
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeBytes(String s)
+    {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/src/main/java/com/facebook/presto/Slices.java b/src/main/java/com/facebook/presto/Slices.java
new file mode 100644
index 000000000000..60aae9ee072c
--- /dev/null
+++ b/src/main/java/com/facebook/presto/Slices.java
@@ -0,0 +1,265 @@
+/**
+ * Copyright (C) 2011 the original author or authors.
+ * See the notice.md file distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+public final class Slices
+{
+    public static Slice readLengthPrefixedBytes(SliceInput sliceInput)
+    {
+        int length = VariableLengthQuantity.readVariableLengthInt(sliceInput); // variable-length length prefix first
+        return sliceInput.readBytes(length);
+    }
+
+    public static void writeLengthPrefixedBytes(SliceOutput sliceOutput, Slice value)
+    {
+        VariableLengthQuantity.writeVariableLengthInt(value.length(), sliceOutput); // mirror of readLengthPrefixedBytes
+        sliceOutput.writeBytes(value);
+    }
+
+    /**
+     * A buffer whose capacity is {@code 0}.
+     */
+    public static final Slice EMPTY_SLICE = new Slice(0);
+
+    private Slices()
+    {
+    }
+
+    public static Slice ensureSize(Slice existingSlice, int minWritableBytes)
+    {
+        if (existingSlice == null) {
+            existingSlice = EMPTY_SLICE;
+        }
+
+        if (minWritableBytes <= existingSlice.length()) {
+            return existingSlice;
+        }
+
+        int newCapacity;
+        if (existingSlice.length() == 0) {
+            newCapacity = 1;
+        }
+        else {
+            newCapacity = existingSlice.length();
+        }
+        int minNewCapacity = existingSlice.length() + minWritableBytes; // NOTE(review): grows by minWritableBytes beyond the current length, while the early return above compares it to the total length — confirm intent
+        while (newCapacity < minNewCapacity) {
+            newCapacity <<= 1; // doubling; NOTE(review): the shift can overflow int when minNewCapacity exceeds 2^30 — confirm callers stay below that
+        }
+
+        Slice newSlice = Slices.allocate(newCapacity);
+        newSlice.setBytes(0, existingSlice, 0, existingSlice.length()); // carry the existing content over
+        return newSlice;
+    }
+
+    public static Slice allocate(int capacity)
+    {
+        if (capacity == 0) {
+            return EMPTY_SLICE; // share the empty instance instead of allocating
+        }
+        return new Slice(capacity);
+    }
+
+    public static Slice wrappedBuffer(byte[] array)
+    {
+        if (array.length == 0) {
+            return EMPTY_SLICE;
+        }
+        return new Slice(array);
+    }
+
+    public static Slice copiedBuffer(ByteBuffer source, int sourceOffset, int length)
+    {
+        Preconditions.checkNotNull(source, "source is null");
+        int newPosition = source.position() + sourceOffset; // sourceOffset is relative to the buffer's current position
+        return copiedBuffer((ByteBuffer) source.duplicate().order(ByteOrder.LITTLE_ENDIAN).clear().limit(newPosition + length).position(newPosition));
+    }
+
+    public static Slice copiedBuffer(ByteBuffer source)
+    {
+        Preconditions.checkNotNull(source, "source is null");
+        Slice copy = allocate(source.limit() - source.position());
+        copy.setBytes(0, source.duplicate().order(ByteOrder.LITTLE_ENDIAN)); // copy via a duplicate so the caller's buffer object is not the one handed to setBytes
+        return copy;
+    }
+
+    public static Slice copiedBuffer(String string, Charset charset)
+    {
+        Preconditions.checkNotNull(string, "string is null");
+        Preconditions.checkNotNull(charset, "charset is null");
+
+        return wrappedBuffer(string.getBytes(charset));
+    }
+
+    public static ByteBuffer encodeString(CharBuffer src, Charset charset)
+    {
+        final CharsetEncoder encoder = getEncoder(charset);
+        final ByteBuffer dst = ByteBuffer.allocate(
+                (int) ((double) src.remaining() * encoder.maxBytesPerChar())); // worst-case output size
+        try {
+            CoderResult cr = encoder.encode(src, dst, true);
+            if (!cr.isUnderflow()) {
+                cr.throwException();
+            }
+            cr = encoder.flush(dst);
+            if (!cr.isUnderflow()) {
+                cr.throwException();
+            }
+        }
+        catch (CharacterCodingException x) {
+            throw new IllegalStateException(x);
+        }
+        dst.flip();
+        return dst;
+    }
+
+    public static String decodeString(ByteBuffer src, Charset charset)
+    {
+        final CharsetDecoder decoder = getDecoder(charset);
+        final CharBuffer dst = CharBuffer.allocate(
+                (int) ((double) src.remaining() * decoder.maxCharsPerByte())); // worst-case output size
+        try {
+            CoderResult cr = decoder.decode(src, dst, true);
+            if (!cr.isUnderflow()) {
+                cr.throwException();
+            }
+            cr = decoder.flush(dst);
+            if (!cr.isUnderflow()) {
+                cr.throwException();
+            }
+        }
+        catch (CharacterCodingException x) {
+            throw new IllegalStateException(x);
+        }
+        return dst.flip().toString();
+    }
+
+    /**
+     * Toggles the endianness of the specified 16-bit short integer.
+     */
+    public static short swapShort(short value)
+    {
+        return (short) (value << 8 | value >>> 8 & 0xff);
+    }
+
+    /**
+     * Toggles the endianness of the specified 32-bit integer.
+     */
+    public static int swapInt(int value)
+    {
+        return swapShort((short) value) << 16 |
+                swapShort((short) (value >>> 16)) & 0xffff;
+    }
+
+    /**
+     * Toggles the endianness of the specified 64-bit long integer.
+     */
+    public static long swapLong(long value)
+    {
+        return (long) swapInt((int) value) << 32 |
+                swapInt((int) (value >>> 32)) & 0xffffffffL;
+    }
+
+    private static final ThreadLocal<Map<Charset, CharsetEncoder>> encoders =
+            new ThreadLocal<Map<Charset, CharsetEncoder>>()
+            {
+                @Override
+                protected Map<Charset, CharsetEncoder> initialValue()
+                {
+                    return new IdentityHashMap<Charset, CharsetEncoder>();
+                }
+            };
+
+    private static final ThreadLocal<Map<Charset, CharsetDecoder>> decoders =
+            new ThreadLocal<Map<Charset, CharsetDecoder>>()
+            {
+                @Override
+                protected Map<Charset, CharsetDecoder> initialValue()
+                {
+                    return new IdentityHashMap<Charset, CharsetDecoder>();
+                }
+            };
+
+    /**
+     * Returns a cached thread-local {@link CharsetEncoder} for the specified
+     * charset. The encoder is reset and set to replace bad input before it is returned.
+     */
+    private static CharsetEncoder getEncoder(Charset charset)
+    {
+        if (charset == null) {
+            throw new NullPointerException("charset");
+        }
+
+        Map<Charset, CharsetEncoder> map = encoders.get();
+        CharsetEncoder e = map.get(charset);
+        if (e != null) {
+            e.reset();
+            e.onMalformedInput(CodingErrorAction.REPLACE);
+            e.onUnmappableCharacter(CodingErrorAction.REPLACE);
+            return e;
+        }
+
+        e = charset.newEncoder();
+        e.onMalformedInput(CodingErrorAction.REPLACE);
+        e.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        map.put(charset, e);
+        return e;
+    }
+
+
+    /**
+     * Returns a cached thread-local {@link CharsetDecoder} for the specified
+     * charset. The decoder is reset and set to replace bad input before it is returned.
+     */
+    private static CharsetDecoder getDecoder(Charset charset)
+    {
+        if (charset == null) {
+            throw new NullPointerException("charset");
+        }
+
+        Map<Charset, CharsetDecoder> map = decoders.get();
+        CharsetDecoder d = map.get(charset);
+        if (d != null) {
+            d.reset();
+            d.onMalformedInput(CodingErrorAction.REPLACE);
+            d.onUnmappableCharacter(CodingErrorAction.REPLACE);
+            return d;
+        }
+
+        d = charset.newDecoder();
+        d.onMalformedInput(CodingErrorAction.REPLACE);
+        d.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        map.put(charset, d);
+        return d;
+    }
+
+}
diff --git a/src/main/java/com/facebook/presto/SumAggregation.java b/src/main/java/com/facebook/presto/SumAggregation.java
index 5d7bf63037dd..e642f9bdb716 100644
--- a/src/main/java/com/facebook/presto/SumAggregation.java
+++ b/src/main/java/com/facebook/presto/SumAggregation.java
@@ -1,21 +1,31 @@
 package com.facebook.presto;
 
+import static com.facebook.presto.SizeOf.SIZE_OF_LONG;
+
 public class SumAggregation
-    implements AggregationFunction
+        implements AggregationFunction
 {
     private long sum;
 
+    @Override
+    public TupleInfo getTupleInfo()
+    {
+        return new TupleInfo(SIZE_OF_LONG);
+    }
+
     @Override
     public void add(ValueBlock values, PositionBlock relevantPositions)
     {
-        for (Object value : values.filter(relevantPositions)) {
-            sum += (Long) value;
+        for (Tuple value : values.filter(relevantPositions)) {
+            sum += value.getLong(0);
         }
     }
 
     @Override
-    public Object evaluate()
+    public Tuple evaluate()
     {
-        return sum;
+        Slice slice = Slices.allocate(SIZE_OF_LONG);
+        slice.setLong(0, sum);
+        return new Tuple(slice, getTupleInfo());
     }
 }
diff --git a/src/main/java/com/facebook/presto/Tuple.java b/src/main/java/com/facebook/presto/Tuple.java
index 88693cbf139a..714e57ac6c47 100644
--- a/src/main/java/com/facebook/presto/Tuple.java
+++ b/src/main/java/com/facebook/presto/Tuple.java
@@ -1,27 +1,57 @@
 package com.facebook.presto;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
 
public class Tuple { - private final List values; + private final Slice slice; + private final TupleInfo tupleInfo; + + public Tuple(Slice slice, TupleInfo tupleInfo) + { + this.slice = slice; + this.tupleInfo = tupleInfo; + } + + public TupleInfo getTupleInfo() + { + return tupleInfo; + } + + public byte getByteValue(int index) + { + checkIndexSize(index, SizeOf.SIZE_OF_BYTE); + return slice.getByte(tupleInfo.getOffset(index)); + } - public Tuple(Object... values) { - this(ImmutableList.copyOf(values)); + public int getInt(int index) + { + checkIndexSize(index, SizeOf.SIZE_OF_BYTE); + + return slice.getInt(tupleInfo.getOffset(index)); + } + + public long getLong(int index) + { + checkIndexSize(index, SizeOf.SIZE_OF_LONG); + return slice.getLong(tupleInfo.getOffset(index)); } - public Tuple(List values) + public Slice getSlice(int index) { - Preconditions.checkNotNull(values, "values is null"); - this.values = ImmutableList.copyOf(values); + Preconditions.checkArgument(index < tupleInfo.size()); + return slice.slice(tupleInfo.getOffset(index), tupleInfo.getLength(index)); } - public List getValues() + public void writeTo(SliceOutput out) { - return values; + out.writeBytes(slice); + } + + private void checkIndexSize(int index, int size) + { + Preconditions.checkArgument(index < tupleInfo.size()); + Preconditions.checkArgument(tupleInfo.getLength(index) == size, "Value %s must be %s bytes wide, but is %s bytes", index, size, tupleInfo.getLength(index)); } @Override @@ -36,7 +66,10 @@ public boolean equals(Object o) Tuple tuple = (Tuple) o; - if (!values.equals(tuple.values)) { + if (!tupleInfo.equals(tuple.tupleInfo)) { + return false; + } + if (!slice.equals(tuple.slice)) { return false; } @@ -46,12 +79,19 @@ public boolean equals(Object o) @Override public int hashCode() { - return values.hashCode(); + int result = slice.hashCode(); + result = 31 * result + tupleInfo.hashCode(); + return result; } @Override public String toString() { - return values.toString(); + 
final StringBuilder sb = new StringBuilder(); + sb.append("Tuple"); + sb.append("{tupleInfo=").append(tupleInfo); + sb.append(", slice=").append(slice); + sb.append('}'); + return sb.toString(); } } diff --git a/src/main/java/com/facebook/presto/TupleInfo.java b/src/main/java/com/facebook/presto/TupleInfo.java new file mode 100644 index 000000000000..19bc21d476f1 --- /dev/null +++ b/src/main/java/com/facebook/presto/TupleInfo.java @@ -0,0 +1,103 @@ +package com.facebook.presto; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; + +import java.util.List; + +public class TupleInfo +{ + private final int size; + private final List lengths; + private final List offsets; + + public TupleInfo(int... lengths) + { + this(Ints.asList(lengths)); + } + + public TupleInfo(List lengths) + { + Preconditions.checkNotNull(lengths, "lengths is null"); + + this.lengths = ImmutableList.copyOf(lengths); + int rowLength = 0; + for (int i = 0; i < lengths.size(); i++) { + int length = lengths.get(i); + Preconditions.checkArgument(length >= 1, "length %s must be at least 1", i); + rowLength += length; + + } + + ImmutableList.Builder offsets = ImmutableList.builder(); + int current = 0; + for (int length : lengths) { + offsets.add(current); + current += length; + } + this.offsets = offsets.build(); + + this.size = rowLength; + } + + public List getLengths() + { + return lengths; + } + + public int getLength(int index) + { + return lengths.get(index); + } + + public List getOffsets() + { + return offsets; + } + + public int getOffset(int index) + { + return offsets.get(index); + } + + public int size() + { + return size; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TupleInfo tupleInfo = (TupleInfo) o; + + if (!lengths.equals(tupleInfo.lengths)) { + return false; + } + + return true; 
+ } + + @Override + public int hashCode() + { + return lengths.hashCode(); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("TupleInfo"); + sb.append("{lengths=").append(lengths); + sb.append('}'); + return sb.toString(); + } +} diff --git a/src/main/java/com/facebook/presto/UncompressedValueBlock.java b/src/main/java/com/facebook/presto/UncompressedValueBlock.java index fe04722693a4..fbe247174a02 100644 --- a/src/main/java/com/facebook/presto/UncompressedValueBlock.java +++ b/src/main/java/com/facebook/presto/UncompressedValueBlock.java @@ -1,90 +1,119 @@ package com.facebook.presto; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.DiscreteDomains; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Range; import com.google.common.collect.Ranges; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import static com.facebook.presto.Pair.positionGetter; -import static com.facebook.presto.Pair.valueGetter; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Arrays.asList; - public class UncompressedValueBlock - implements ValueBlock + implements ValueBlock { private final Range range; - private final List pairs; + private final TupleInfo tupleInfo; + private final Slice slice; - public UncompressedValueBlock(long startPosition, List values) + public UncompressedValueBlock(long startPosition, TupleInfo tupleInfo, Slice slice) { - ImmutableList.Builder builder = ImmutableList.builder(); + 
Preconditions.checkArgument(startPosition >= 0, "startPosition is negative"); + Preconditions.checkNotNull(tupleInfo, "tupleInfo is null"); + Preconditions.checkNotNull(slice, "data is null"); - long index = startPosition; - for (Object value : values) { - if (value != null) { - builder.add(new Pair(index, value)); - } + this.tupleInfo = tupleInfo; + this.slice = slice; - ++index; - } + Preconditions.checkArgument(slice.length() % tupleInfo.size() == 0, "data must be a multiple of tuple length"); - pairs = builder.build(); - range = Ranges.closed(pairs.get(0).getPosition(), pairs.get(pairs.size() - 1).getPosition()); - } - - public UncompressedValueBlock(long startPosition, Object... values) - { - this(startPosition, asList(values)); - } - - public UncompressedValueBlock(List pairs) - { - checkNotNull(pairs, "pairs is null"); - checkArgument(!pairs.isEmpty(), "pairs is empty"); - - this.pairs = pairs; - - range = Ranges.closed(pairs.get(0).getPosition(), pairs.get(pairs.size() - 1).getPosition()); + int rows = slice.length() / tupleInfo.size(); + range = Ranges.closed(startPosition, startPosition + rows - 1); } @Override - public PositionBlock selectPositions(Predicate predicate) + public PositionBlock selectPositions(Predicate predicate) { return null; } @Override - public ValueBlock selectPairs(Predicate predicate) + public ValueBlock selectPairs(Predicate predicate) { return null; } + /** + * Build a new block with only the selected value positions + */ @Override public ValueBlock filter(PositionBlock positions) { - ImmutableList pairs = ImmutableList.copyOf(Iterables.filter(this.pairs, Predicates.compose(positions, positionGetter()))); - - if (pairs.isEmpty()) { + List indexes = new ArrayList<>(); + for (long position : positions.getPositions()) { + if (range.contains(position)) { + indexes.add(position - range.lowerEndpoint()); + } + } + if (indexes.isEmpty()) { return new EmptyValueBlock(); } - return new UncompressedValueBlock(pairs); + Slice newSlice = 
Slices.allocate(indexes.size() * tupleInfo.size()); + SliceOutput sliceOutput = newSlice.output(); + for (long index : indexes) { + sliceOutput.writeBytes(slice, (int) (index * tupleInfo.size()), tupleInfo.size()); + } + + // todo what is the start position + return new UncompressedValueBlock(0, tupleInfo, newSlice); + } + + @Override + public Iterator iterator() + { + return new AbstractIterator() + { + private long index = 0; + + @Override + protected Tuple computeNext() + { + if (index >= getCount()) { + endOfData(); + return null; + } + Slice row = slice.slice((int) (index * tupleInfo.size()), tupleInfo.size()); + index++; + return new Tuple(row, tupleInfo); + } + }; } @Override public PeekingIterator pairIterator() { - return Iterators.peekingIterator(pairs.iterator()); + return Iterators.peekingIterator(new AbstractIterator() + { + private long index = 0; + + @Override + protected Pair computeNext() + { + if (index >= getCount()) { + endOfData(); + return null; + } + Slice row = slice.slice((int) (index * tupleInfo.size()), tupleInfo.size()); + long position = index + range.lowerEndpoint(); + index++; + return new Pair(position, new Tuple(row, tupleInfo)); + } + }); } @Override @@ -96,7 +125,7 @@ public boolean isEmpty() @Override public int getCount() { - return pairs.size(); + return (int) (range.upperEndpoint() - range.lowerEndpoint() + 1); } @Override @@ -108,7 +137,7 @@ public boolean isSorted() @Override public boolean isSingleValue() { - return pairs.size() == 1; + return getCount() == 1; } @Override @@ -120,7 +149,7 @@ public boolean isPositionsContiguous() @Override public Iterable getPositions() { - return Lists.transform(pairs, positionGetter()); + return range.asSet(DiscreteDomains.longs()); } @Override @@ -129,14 +158,15 @@ public Range getRange() return range; } - public String toString() - { - return pairs.toString(); - } - @Override - public Iterator iterator() + public String toString() { - return Lists.transform(pairs, 
valueGetter()).iterator(); + final StringBuilder sb = new StringBuilder(); + sb.append("FixedWidthValueBlock"); + sb.append("{range=").append(range); + sb.append(", tupleInfo=").append(tupleInfo); + sb.append(", slice=").append(slice); + sb.append('}'); + return sb.toString(); } } diff --git a/src/main/java/com/facebook/presto/ValueBlock.java b/src/main/java/com/facebook/presto/ValueBlock.java index 2a7c4ec4886a..f4f7b0548250 100644 --- a/src/main/java/com/facebook/presto/ValueBlock.java +++ b/src/main/java/com/facebook/presto/ValueBlock.java @@ -1,16 +1,15 @@ package com.facebook.presto; -import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.PeekingIterator; -import java.util.Iterator; - public interface ValueBlock - extends Block, Iterable + extends Block, Iterable { - PositionBlock selectPositions(Predicate predicate); - ValueBlock selectPairs(Predicate predicate); + PositionBlock selectPositions(Predicate predicate); + + ValueBlock selectPairs(Predicate predicate); + ValueBlock filter(PositionBlock positions); PeekingIterator pairIterator(); diff --git a/src/main/java/com/facebook/presto/VariableLengthQuantity.java b/src/main/java/com/facebook/presto/VariableLengthQuantity.java new file mode 100644 index 000000000000..00a3c69a1828 --- /dev/null +++ b/src/main/java/com/facebook/presto/VariableLengthQuantity.java @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2011 the original author or authors. + * See the notice.md file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto;
+
+import java.nio.ByteBuffer;
+
+public final class VariableLengthQuantity
+{
+    private VariableLengthQuantity()
+    {
+    }
+
+    public static int variableLengthSize(int value)
+    {
+        int size = 1; // every value takes at least one byte
+        while ((value & (~0x7f)) != 0) { // bits remain above the low 7
+            value >>>= 7; // unsigned shift, so a negative int ends after 5 bytes
+            size++;
+        }
+        return size;
+    }
+
+    public static int variableLengthSize(long value)
+    {
+        int size = 1; // every value takes at least one byte
+        while ((value & (~0x7f)) != 0) { // ~0x7f widens to a long mask of all bits above the low 7
+            value >>>= 7;
+            size++;
+        }
+        return size;
+    }
+
+    public static void writeVariableLengthInt(int value, SliceOutput sliceOutput)
+    {
+        int highBitMask = 0x80; // continuation flag: more bytes follow
+        if (value < (1 << 7) && value >= 0) {
+            sliceOutput.writeByte(value);
+        }
+        else if (value < (1 << 14) && value > 0) {
+            sliceOutput.writeByte(value | highBitMask);
+            sliceOutput.writeByte(value >>> 7);
+        }
+        else if (value < (1 << 21) && value > 0) {
+            sliceOutput.writeByte(value | highBitMask);
+            sliceOutput.writeByte((value >>> 7) | highBitMask);
+            sliceOutput.writeByte(value >>> 14);
+        }
+        else if (value < (1 << 28) && value > 0) {
+            sliceOutput.writeByte(value | highBitMask);
+            sliceOutput.writeByte((value >>> 7) | highBitMask);
+            sliceOutput.writeByte((value >>> 14) | highBitMask);
+            sliceOutput.writeByte(value >>> 21);
+        }
+        else {
+            sliceOutput.writeByte(value | highBitMask); // negative values and values >= 2^28 take all five bytes
+            sliceOutput.writeByte((value >>> 7) | highBitMask);
+            sliceOutput.writeByte((value >>> 14) | highBitMask);
+            sliceOutput.writeByte((value >>> 21) | highBitMask);
+            sliceOutput.writeByte(value >>> 28);
+        }
+    }
+
+    public static void writeVariableLengthLong(long value, SliceOutput sliceOutput)
+    {
+        // while the value has bits set above the low 7, emit 7 bits plus the continuation flag
+        while ((value & (~0x7f)) != 0) {
+            sliceOutput.writeByte((int) ((value & 0x7f) | 0x80));
+            value >>>= 7;
+        }
+        sliceOutput.writeByte((int) value);
+    }
+
+    public static int readVariableLengthInt(SliceInput sliceInput)
+    {
+        int result = 0;
+        for (int shift = 0; shift <= 28; shift += 7) {
+            int b = sliceInput.readUnsignedByte();
+
+            // add the lower 7 bits to the result
+            result |= ((b & 0x7f) << shift);
+
+            // if high bit is not set, this is the last byte in the number
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new NumberFormatException("last byte of variable length int has high bit set");
+    }
+
+    public static int readVariableLengthInt(ByteBuffer sliceInput)
+    {
+        int result = 0;
+        for (int shift = 0; shift <= 28; shift += 7) {
+            int b = sliceInput.get(); // signed byte; the masks below only look at the low 8 bits
+
+            // add the lower 7 bits to the result
+            result |= ((b & 0x7f) << shift);
+
+            // if high bit is not set, this is the last byte in the number
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new NumberFormatException("last byte of variable length int has high bit set");
+    }
+
+    public static long readVariableLengthLong(SliceInput sliceInput)
+    {
+        long result = 0;
+        for (int shift = 0; shift <= 63; shift += 7) {
+            long b = sliceInput.readUnsignedByte(); // long so the shift below is done in 64 bits
+
+            // add the lower 7 bits to the result
+            result |= ((b & 0x7f) << shift);
+
+            // if high bit is not set, this is the last byte in the number
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new NumberFormatException("last byte of variable length long has high bit set");
+    }
+}
diff --git a/src/test/java/com/facebook/presto/CsvFileScannerTest.java b/src/test/java/com/facebook/presto/CsvFileScannerTest.java
deleted file mode 100644
index 82edce087ba4..000000000000
--- a/src/test/java/com/facebook/presto/CsvFileScannerTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.facebook.presto;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.InputSupplier; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.InputStreamReader; - -import static com.google.common.io.Resources.getResource; -import static com.google.common.io.Resources.newReaderSupplier; - -public class CsvFileScannerTest -{ - private final InputSupplier inputSupplier = newReaderSupplier(getResource("data.csv"), Charsets.UTF_8); - - @Test - public void testIterator() - throws Exception - { - CsvFileScanner firstColumn = new CsvFileScanner(inputSupplier, 0, ','); - - Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(firstColumn.iterator())), - ImmutableList.of( - new Pair(0, "0"), - new Pair(1, "1"), - new Pair(2, "2"), - new Pair(3, "3"))); - - CsvFileScanner secondColumn = new CsvFileScanner(inputSupplier, 1, ','); - Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(secondColumn.iterator())), - ImmutableList.of( - new Pair(0, "apple"), - new Pair(1, "banana"), - new Pair(2, "cherry"), - new Pair(3, "date"))); - - CsvFileScanner thirdColumn = new CsvFileScanner(inputSupplier, 2, ','); - Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(thirdColumn.iterator())), - ImmutableList.of( - new Pair(0, "alice"), - new Pair(1, "bob"), - new Pair(2, "charlie"), - new Pair(3, "dave"))); - } -} diff --git a/src/test/java/com/facebook/presto/TestCsvFileScanner.java b/src/test/java/com/facebook/presto/TestCsvFileScanner.java new file mode 100644 index 000000000000..da2871241a73 --- /dev/null +++ b/src/test/java/com/facebook/presto/TestCsvFileScanner.java @@ -0,0 +1,57 @@ +package com.facebook.presto; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.io.InputSupplier; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.InputStreamReader; + +import static com.facebook.presto.SizeOf.SIZE_OF_LONG; +import static com.google.common.io.Resources.getResource; +import static 
com.google.common.io.Resources.newReaderSupplier; + +public class TestCsvFileScanner +{ + private final InputSupplier inputSupplier = newReaderSupplier(getResource("data.csv"), Charsets.UTF_8); + + @Test + public void testIterator() + throws Exception + { + CsvFileScanner firstColumn = new CsvFileScanner(inputSupplier, 0, ',', new TupleInfo(SIZE_OF_LONG)); + + ImmutableList actual = ImmutableList.copyOf(new PairsIterator(firstColumn.iterator())); + Assert.assertEquals(actual, + ImmutableList.of( + new Pair(0, createTuple(0)), + new Pair(1, createTuple(1)), + new Pair(2, createTuple(2)), + new Pair(3, createTuple(3)))); + + // todo add support for variable length columns +// CsvFileScanner secondColumn = new CsvFileScanner(inputSupplier, 1, ','); +// Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(secondColumn.iterator())), +// ImmutableList.of( +// new Pair(0, new Tuple("apple")), +// new Pair(1, new Tuple("banana")), +// new Pair(2, new Tuple("cherry")), +// new Pair(3, new Tuple("date")))); +// +// CsvFileScanner thirdColumn = new CsvFileScanner(inputSupplier, 2, ','); +// Assert.assertEquals(ImmutableList.copyOf(new PairsIterator(thirdColumn.iterator())), +// ImmutableList.of( +// new Pair(0, new Tuple("alice")), +// new Pair(1, new Tuple("bob")), +// new Pair(2, new Tuple("charlie")), +// new Pair(3, new Tuple("dave")))); + } + + private Tuple createTuple(long value) + { + Slice slice = Slices.allocate(SIZE_OF_LONG); + slice.setLong(0, value); + return new Tuple(slice, new TupleInfo(SIZE_OF_LONG)); + } +} diff --git a/src/test/java/com/facebook/presto/TestExample.java b/src/test/java/com/facebook/presto/TestExample.java index 4ee253865114..e8f746c0ffd9 100644 --- a/src/test/java/com/facebook/presto/TestExample.java +++ b/src/test/java/com/facebook/presto/TestExample.java @@ -13,7 +13,7 @@ public static void main(String[] args) DataScan3 scan = newScan(); DataScan3 scan2 = newScan(); - Merge merge = new Merge(ImmutableList.of(scan, scan2)); + Merge 
merge = new Merge(ImmutableList.of(scan, scan2), new TupleInfo(1, 1)); while (merge.hasNext()) { ValueBlock block = merge.next(); @@ -27,9 +27,9 @@ public static void main(String[] args) private static DataScan3 newScan() { Iterator values = ImmutableList.builder() - .add(new UncompressedValueBlock(0, "a", "b", "c", "d", "e", "f")) - .add(new UncompressedValueBlock(20, "h", "i", "j", "k", "l", "m")) - .add(new UncompressedValueBlock(30, "n", "o", "p", "q", "r", "s")) + .add(new UncompressedValueBlock(0, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'a', 'b', 'c', 'd', 'e', 'f'}))) + .add(new UncompressedValueBlock(20, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'h', 'i', 'j', 'k', 'l', 'm'}))) + .add(new UncompressedValueBlock(30, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'n', 'o', 'p', 'q', 'r', 's'}))) .build() .iterator(); diff --git a/src/test/java/com/facebook/presto/TestSumAggregation.java b/src/test/java/com/facebook/presto/TestSumAggregation.java index 13d615919231..ca8bfa517458 100644 --- a/src/test/java/com/facebook/presto/TestSumAggregation.java +++ b/src/test/java/com/facebook/presto/TestSumAggregation.java @@ -1,5 +1,6 @@ package com.facebook.presto; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.PeekingIterator; @@ -13,26 +14,32 @@ import java.util.List; import java.util.Map; +import static com.facebook.presto.SizeOf.SIZE_OF_BYTE; +import static com.facebook.presto.SizeOf.SIZE_OF_LONG; + public class TestSumAggregation { @Test public void testPipelinedAggregation() { GroupBy groupBy = new GroupBy(newGroupColumn()); - PipelinedAggregation aggregation = new PipelinedAggregation(groupBy, new ForwardingSeekableIterator<>(newAggregateColumn()), new Provider() - { - @Override - public AggregationFunction get() - { - return new SumAggregation(); - } - }); + PipelinedAggregation aggregation = new PipelinedAggregation(new 
TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG), + groupBy, + new ForwardingSeekableIterator<>(newAggregateColumn()), + new Provider() + { + @Override + public AggregationFunction get() + { + return new SumAggregation(); + } + }); List expected = ImmutableList.of( - new Pair(0, new Tuple("a", 10L)), - new Pair(4, new Tuple("b", 17L)), - new Pair(23, new Tuple("c", 15L)), - new Pair(30, new Tuple("d", 6L)) + new Pair(0, createTuple("a", 10L)), + new Pair(1, createTuple("b", 17L)), + new Pair(2, createTuple("c", 15L)), + new Pair(3, createTuple("d", 6L)) ); List actual = new ArrayList<>(); @@ -48,24 +55,35 @@ public AggregationFunction get() Assert.assertEquals(actual, expected); } + private Tuple createTuple(String character, long count) + { + Slice slice = Slices.allocate(SIZE_OF_BYTE + SIZE_OF_LONG); + slice.setByte(0, character.charAt(0)); + slice.setLong(1, count); + return new Tuple(slice, new TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG)); + } + @Test public void testHashAggregation() { GroupBy groupBy = new GroupBy(newGroupColumn()); - HashAggregation aggregation = new HashAggregation(groupBy, new ForwardingSeekableIterator<>(newAggregateColumn()), new Provider() - { - @Override - public AggregationFunction get() - { - return new SumAggregation(); - } - }); + HashAggregation aggregation = new HashAggregation(new TupleInfo(SIZE_OF_BYTE, SIZE_OF_LONG), + groupBy, + new ForwardingSeekableIterator<>(newAggregateColumn()), + new Provider() + { + @Override + public AggregationFunction get() + { + return new SumAggregation(); + } + }); Map expected = ImmutableMap.of( - "a", new Tuple("a", 10L), - "b", new Tuple("b", 17L), - "c", new Tuple("c", 15L), - "d", new Tuple("d", 6L) + "a", createTuple("a", 10L), + "b", createTuple("b", 17L), + "c", createTuple("c", 15L), + "d", createTuple("d", 6L) ); Map actual = new HashMap<>(); @@ -74,8 +92,8 @@ public AggregationFunction get() PeekingIterator pairs = block.pairIterator(); while (pairs.hasNext()) { Pair pair = pairs.next(); - Tuple tuple 
= (Tuple) pair.getValue(); - actual.put(tuple.getValues().get(0), tuple); + Tuple tuple = pair.getValue(); + actual.put(tuple.getSlice(0).toString(Charsets.UTF_8), tuple); } } @@ -85,11 +103,11 @@ public AggregationFunction get() public Iterator newGroupColumn() { Iterator values = ImmutableList.builder() - .add(new UncompressedValueBlock(0, "a", "a", "a", "a", "b", "b")) - .add(new UncompressedValueBlock(20, "b", "b", "b", "c", "c", "c")) - .add(new UncompressedValueBlock(30, "d")) - .add(new UncompressedValueBlock(31, "d")) - .add(new UncompressedValueBlock(32, "d")) + .add(new UncompressedValueBlock(0, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'a', 'a', 'a', 'a', 'b', 'b'}))) + .add(new UncompressedValueBlock(20, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'b', 'b', 'b', 'c', 'c', 'c'}))) + .add(new UncompressedValueBlock(30, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) + .add(new UncompressedValueBlock(31, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) + .add(new UncompressedValueBlock(32, new TupleInfo(1), Slices.wrappedBuffer(new byte[]{'d'}))) .build() .iterator(); @@ -99,14 +117,24 @@ public Iterator newGroupColumn() public Iterator newAggregateColumn() { Iterator values = ImmutableList.builder() - .add(new UncompressedValueBlock(0, 1L, 2L, 3L, 4L, 5L, 6L)) - .add(new UncompressedValueBlock(20, 1L, 2L, 3L, 4L, 5L, 6L)) - .add(new UncompressedValueBlock(30, 1L)) - .add(new UncompressedValueBlock(31, 2L)) - .add(new UncompressedValueBlock(32, 3L)) + .add(createBlock(0, 1L, 2L, 3L, 4L, 5L, 6L)) + .add(createBlock(20, 1L, 2L, 3L, 4L, 5L, 6L)) + .add(createBlock(30, 1L)) + .add(createBlock(31, 2L)) + .add(createBlock(32, 3L)) .build() .iterator(); return values; } + + private UncompressedValueBlock createBlock(long position, long... 
values) + { + Slice slice = Slices.allocate(values.length * SIZE_OF_LONG); + SliceOutput output = slice.output(); + for (long value : values) { + output.writeLong(value); + } + return new UncompressedValueBlock(position, new TupleInfo(SIZE_OF_LONG), slice); + } }