diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java index e33746f95db05..0c2aecec4abc6 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.orc.metadata; +import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; @@ -20,7 +21,8 @@ public class Stream { - public enum StreamArea { + public enum StreamArea + { INDEX, DATA, } @@ -130,4 +132,28 @@ public Stream withOffset(long offset) this.sequence, Optional.of(offset)); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Stream stream = (Stream) o; + return column == stream.column + && length == stream.length + && useVInts == stream.useVInts + && sequence == stream.sequence + && streamKind == stream.streamKind + && Objects.equals(offset, stream.offset); + } + + @Override + public int hashCode() + { + return Objects.hash(column, streamKind, length, useVInts, sequence, offset); + } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/StreamDataOutput.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/StreamDataOutput.java index b09c0e6c0f4fb..f72438f111afd 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/StreamDataOutput.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/StreamDataOutput.java @@ -20,6 +20,7 @@ import java.util.function.ToLongFunction; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; @@ -68,4 +69,12 @@ public void writeData(SliceOutput sliceOutput) long size = writer.applyAsLong(sliceOutput); verify(stream.getLength() == size, "Data stream did not write expected size"); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("stream", stream) + .toString(); + } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSequenceKey.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSequenceKey.java new file mode 100644 index 0000000000000..e605b71f6598d --- /dev/null +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSequenceKey.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.orc.writer; + +import java.util.Objects; + +class ColumnSequenceKey +{ + final int column; + final int sequence; + + public ColumnSequenceKey(int column, int sequence) + { + this.column = column; + this.sequence = sequence; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnSequenceKey that = (ColumnSequenceKey) o; + return column == that.column && sequence == that.sequence; + } + + @Override + public int hashCode() + { + return Objects.hash(column, sequence); + } +} diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSizeLayout.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSizeLayout.java new file mode 100644 index 0000000000000..397d9f8164825 --- /dev/null +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnSizeLayout.java @@ -0,0 +1,164 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.orc.writer; + +import com.facebook.presto.orc.metadata.ColumnEncoding; +import com.facebook.presto.orc.metadata.Stream; +import com.facebook.presto.orc.stream.StreamDataOutput; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DWRF_MAP_FLAT; +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * Order streams by total column size in the desc order preserving the type-tree + * iteration order for complex types. + *

+ * For flatmaps consider streams for the same flatmap key as a pseudo-column: do + * additional grouping by sequence to keep streams for the same key together, + * and then sort streams belonging to the same flatmap key by the total size in + * desc order. + */ +public class ColumnSizeLayout + implements StreamLayout +{ + @Override + public void reorder(List dataStreams, Map nodeToColumn, Map nodeIdToColumnEncodings) + { + requireNonNull(dataStreams, "dataStreams is null"); + requireNonNull(nodeToColumn, "nodeToColumn is null"); + requireNonNull(nodeIdToColumnEncodings, "nodeIdToColumnEncodings is null"); + + if (dataStreams.isEmpty()) { + return; + } + + Set flatMapColumns = getFlatMapColumns(nodeToColumn, nodeIdToColumnEncodings); + + // gather column sizes on the column and column+seq levels + Map columnSize = new HashMap<>(); + Map flatMapColumnSize = new HashMap<>(); + + for (StreamDataOutput dataStream : dataStreams) { + Stream stream = dataStream.getStream(); + int node = stream.getColumn(); + Integer column = nodeToColumn.get(node); + + long[] storedColumnSize = columnSize.computeIfAbsent(column, (n) -> new long[] {0}); + storedColumnSize[0] += dataStream.size(); + + if (flatMapColumns.contains(column)) { + ColumnSequenceKey key = new ColumnSequenceKey(column, stream.getSequence()); + long[] storedFlatMapColumnSize = flatMapColumnSize.computeIfAbsent(key, (n) -> new long[] {0}); + storedFlatMapColumnSize[0] += dataStream.size(); + } + } + + // do the ordering + dataStreams.sort((streamDataA, streamDataB) -> { + Stream streamA = streamDataA.getStream(); + Stream streamB = streamDataB.getStream(); + + int nodeA = streamA.getColumn(); + int nodeB = streamB.getColumn(); + + int columnA = nodeToColumn.get(nodeA); + int columnB = nodeToColumn.get(nodeB); + + boolean isFlatMapA = flatMapColumns.contains(columnA); + boolean isFlatMapB = flatMapColumns.contains(columnB); + + // split non-flatmap and flatmap columns into separate groups + if (isFlatMapA != isFlatMapB) { + return Boolean.compare(isFlatMapA, isFlatMapB); + } + + long columnSizeA = columnSize.get(columnA)[0]; + long columnSizeB = columnSize.get(columnB)[0]; + + // order columns by total column size in desc order + if (columnSizeA != columnSizeB) { + return Long.compare(columnSizeB, columnSizeA); + } + + // group streams by the column + if (columnA != columnB) { + return Integer.compare(columnA, columnB); + } + + if (isFlatMapA) { + int sequenceA = streamA.getSequence(); + int sequenceB = streamB.getSequence(); + + // special handling for seq 0 before sorting by the col+seq size + // to keep it on top of the group + if (sequenceA != sequenceB) { + if (sequenceA == 0) { + return -1; + } + if (sequenceB == 0) { + return 1; + } + } + + long columnSeqSizeA = flatMapColumnSize.get(new ColumnSequenceKey(columnA, sequenceA))[0]; + long columnSeqSizeB = flatMapColumnSize.get(new ColumnSequenceKey(columnB, sequenceB))[0]; + + // order sequences by total column+seq size in desc order + if (columnSeqSizeA != columnSeqSizeB) { + return Long.compare(columnSeqSizeB, columnSeqSizeA); + } + + // group by the sequence + if (sequenceA != sequenceB) { + return Integer.compare(sequenceA, sequenceB); + } + } + + // order by the node in asc order + if (nodeA != nodeB) { + return Integer.compare(nodeA, nodeB); + } + + // sort by the stream kind, we don't really need it, but it makes testing easier + return Integer.compare(streamA.getStreamKind().ordinal(), streamB.getStreamKind().ordinal()); + }); + } + + @Override + public String toString() + { + return toStringHelper(this) + .toString(); + } + + private static Set getFlatMapColumns(Map nodeIdToColumn, Map nodeIdToColumnEncodings) + { + Set flatMapColumns = new HashSet<>(); + for (Map.Entry e : nodeIdToColumnEncodings.entrySet()) { + Integer node = e.getKey(); + ColumnEncoding encoding = e.getValue(); + if (encoding.getColumnEncodingKind() == DWRF_MAP_FLAT) { + flatMapColumns.add(nodeIdToColumn.get(node)); + } + } + return flatMapColumns; + } +} diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayout.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayout.java index 727cc525edb6a..81bda845ccb56 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayout.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayout.java @@ -14,14 +14,12 @@ package com.facebook.presto.orc.writer; import com.facebook.presto.orc.metadata.ColumnEncoding; -import com.facebook.presto.orc.metadata.Stream; import com.facebook.presto.orc.stream.StreamDataOutput; import java.util.Collections; import java.util.List; import java.util.Map; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.Objects.requireNonNull; /** @@ -63,62 +61,4 @@ public String toString() return "ByStreamSize{}"; } } - - /** - * Streams are ordered by Column Size. If two columns have same size then - * columnId, stream size and stream kind are used for ordering. It orders them - * by ascending order of column size to read multiple small columns in one IO. - * All streams for a column are stored together to read one column in one IO. - */ - class ByColumnSize - implements StreamLayout - { - public void reorder(List dataStreams) - { - requireNonNull(dataStreams, "dataStreams is null"); - if (dataStreams.isEmpty()) { - return; - } - - Map columnSizes = dataStreams.stream() - .collect(toImmutableMap( - s -> s.getStream().getColumn(), - s -> (long) s.getStream().getLength(), - Long::sum)); - - dataStreams.sort((left, right) -> { - Stream leftStream = left.getStream(); - Stream rightStream = right.getStream(); - - long sizeDelta = columnSizes.get(leftStream.getColumn()) - columnSizes.get(rightStream.getColumn()); - if (sizeDelta != 0) { - return sizeDelta < 0 ? -1 : 1; - } - - int columnDelta = leftStream.getColumn() - rightStream.getColumn(); - if (columnDelta != 0) { - return columnDelta; - } - - sizeDelta = leftStream.getLength() - rightStream.getLength(); - if (sizeDelta != 0) { - return sizeDelta < 0 ? -1 : 1; - } - - return leftStream.getStreamKind().compareTo(rightStream.getStreamKind()); - }); - } - - @Override - public void reorder(List dataStreams, Map nodeIdToColumn, Map nodeIdToColumnEncodings) - { - reorder(dataStreams); - } - - @Override - public String toString() - { - return "ByColumnSize{}"; - } - } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayoutFactory.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayoutFactory.java index db49315a0dd17..35c5a0f5e7cfe 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayoutFactory.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamLayoutFactory.java @@ -23,7 +23,7 @@ class ColumnSizeLayoutFactory @Override public StreamLayout create() { - return new StreamLayout.ByColumnSize(); + return new ColumnSizeLayout(); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamOrderingLayout.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamOrderingLayout.java index 100f317aa379c..f0f81937bdad9 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamOrderingLayout.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StreamOrderingLayout.java @@ -16,23 +16,41 @@ import com.facebook.presto.orc.DwrfStreamOrderingConfig; import com.facebook.presto.orc.metadata.ColumnEncoding; import com.facebook.presto.orc.metadata.DwrfSequenceEncoding; +import com.facebook.presto.orc.metadata.Stream; import com.facebook.presto.orc.proto.DwrfProto; import com.facebook.presto.orc.stream.StreamDataOutput; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; public class StreamOrderingLayout implements StreamLayout { + private static final Comparator IN_GROUP_COMPARATOR = (streamDataA, streamDataB) -> { + Stream streamA = streamDataA.getStream(); + Stream streamB = streamDataB.getStream(); + int nodeA = streamA.getColumn(); + int nodeB = streamB.getColumn(); + + // order by the node in asc order + if (nodeA != nodeB) { + return Integer.compare(nodeA, nodeB); + } + + // order streams of the same node by the stream kind + return Integer.compare(streamA.getStreamKind().ordinal(), streamB.getStreamKind().ordinal()); + }; + private final DwrfStreamOrderingConfig config; private final StreamLayout nonStreamOrderingLayout; @@ -47,11 +65,11 @@ public StreamOrderingLayout( private static class StreamMetadata { // -> List> - private final Map> sequenceToStreams; + private final Map> sequenceToStreams; // -> SequenceId private final Map keyToSequence; - public StreamMetadata(Map> sequenceToStreams, Map keyToSequence) + public StreamMetadata(Map> sequenceToStreams, Map keyToSequence) { this.sequenceToStreams = requireNonNull(sequenceToStreams, "sequenceToStreams cannot be null"); this.keyToSequence = requireNonNull(keyToSequence, "keyToSequence cannot be null"); @@ -89,44 +107,13 @@ public int hashCode() } } - private static class ColumnSequenceInfo - { - private final int column; - private final int sequence; - - public ColumnSequenceInfo(int column, int sequence) - { - this.column = column; - this.sequence = sequence; - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (!(obj instanceof ColumnSequenceInfo)) { - return false; - } - ColumnSequenceInfo input = (ColumnSequenceInfo) obj; - return this.column == input.column && this.sequence == input.sequence; - } - - @Override - public int hashCode() - { - return Objects.hash(column, sequence); - } - } - private StreamMetadata getStreamMetadata( Map nodeIdToColumn, Map nodeIdToColumnEncodings, DwrfStreamOrderingConfig config) { ImmutableMap.Builder keyToSequenceBuilder = ImmutableMap.builder(); - ImmutableMap.Builder> sequenceToStreamsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder> sequenceToStreamsBuilder = ImmutableMap.builder(); Map> columnToKeySet = config.getStreamOrdering(); // Adding a set to track which of the columns in the reorder list are already visited // For complex maps (complex values for the value) @@ -157,7 +144,7 @@ private StreamMetadata getStreamMetadata( // add the stream only if it is present in the stream ordering config if (keysPerColumn.contains(key)) { keyToSequenceBuilder.put(new ColumnKeyInfo(column, key), sequence); - sequenceToStreamsBuilder.put(new ColumnSequenceInfo(column, sequence), new ArrayList<>()); + sequenceToStreamsBuilder.put(new ColumnSequenceKey(column, sequence), new ArrayList<>()); } } columnsVisited.add(column); @@ -174,14 +161,14 @@ public void reorder( { List nonReorderStreams = new ArrayList<>(); StreamMetadata metadata = getStreamMetadata(nodeIdToColumn, nodeIdToColumnEncodings, config); - Map> sequenceToStreams = metadata.sequenceToStreams; + Map> sequenceToStreams = metadata.sequenceToStreams; for (StreamDataOutput dataOutput : dataStreams) { int nodeId = dataOutput.getStream().getColumn(); int sequence = dataOutput.getStream().getSequence(); int column = nodeIdToColumn.get(nodeId); // only if sequence ID > 0, we do a look up in sequenceToStreams if (sequence > 0) { - List streams = sequenceToStreams.get(new ColumnSequenceInfo(column, sequence)); + List streams = sequenceToStreams.get(new ColumnSequenceKey(column, sequence)); if (streams == null) { nonReorderStreams.add(dataOutput); } @@ -203,21 +190,24 @@ public void reorder( ColumnKeyInfo columnKeyInfo = new ColumnKeyInfo(column, key); Integer sequence = keyToSequence.get(columnKeyInfo); if (sequence != null) { - ColumnSequenceInfo columnSequenceInfo = new ColumnSequenceInfo(column, sequence); + ColumnSequenceKey columnSequenceInfo = new ColumnSequenceKey(column, sequence); List groupedDataStreams = sequenceToStreams.get(columnSequenceInfo); checkState(groupedDataStreams != null, "list of streams for a sequence cannot be null"); checkState(groupedDataStreams.size() > 0, "There should be at least one stream for a sequence"); + + // order grouped streams + groupedDataStreams.sort(IN_GROUP_COMPARATOR); orderedStreams.addAll(groupedDataStreams); } } } // do actual reordering - nonStreamOrderingLayout.reorder(nonReorderStreams, ImmutableMap.of(), ImmutableMap.of()); + nonStreamOrderingLayout.reorder(nonReorderStreams, nodeIdToColumn, nodeIdToColumnEncodings); // add all the streams checkState(orderedStreams.size() + nonReorderStreams.size() == dataStreams.size(), - "Number ordered + non ordered streams should be equal to total number of data streams " + + "Number of ordered + non ordered streams should be equal to total number of data streams " + "orderedStreams: %s, nonReorderStreams: %s, dataStreams: %s", orderedStreams.size(), nonReorderStreams.size(), @@ -226,4 +216,13 @@ public void reorder( dataStreams.addAll(orderedStreams); dataStreams.addAll(nonReorderStreams); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("config", config) + .add("nonStreamOrderingLayout", nonStreamOrderingLayout) + .toString(); + } } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriter.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriter.java index 1faea658e12f1..366c0eb96b211 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriter.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriter.java @@ -35,11 +35,15 @@ import java.io.FileOutputStream; import java.io.IOException; +import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import static com.facebook.airlift.testing.Assertions.assertGreaterThanOrEqual; import static com.facebook.presto.common.type.VarcharType.VARCHAR; @@ -55,6 +59,7 @@ import static com.facebook.presto.orc.metadata.CompressionKind.ZLIB; import static com.facebook.presto.orc.metadata.CompressionKind.ZSTD; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; public class TestOrcWriter @@ -94,29 +99,18 @@ public void accept(Stream stream) public void testOutputStreamsByColumnSize(OrcEncoding encoding, CompressionKind kind, OptionalInt level) throws IOException { - testStreamOrder(encoding, kind, level, new ColumnSizeLayoutFactory(), () -> new Consumer() - { - int previousColumnSize; - int currentColumnSize; - int currentColumnId = -1; - - @Override - public void accept(Stream stream) - { - if (!isIndexStream(stream)) { - if (stream.getColumn() == currentColumnId) { - currentColumnSize += stream.getLength(); - } - else { - assertGreaterThanOrEqual(currentColumnSize, previousColumnSize, stream.toString()); - previousColumnSize = currentColumnSize; - - currentColumnSize = stream.getLength(); - currentColumnId = stream.getColumn(); - } - } + Map nodeSizes = new LinkedHashMap<>(); + testStreamOrder(encoding, kind, level, new ColumnSizeLayoutFactory(), () -> stream -> { + if (!isIndexStream(stream)) { + int node = stream.getColumn(); + int oldSize = nodeSizes.computeIfAbsent(node, (c) -> 0); + nodeSizes.put(node, oldSize + stream.getLength()); } }); + + List actual = ImmutableList.copyOf(nodeSizes.values()); + List expected = actual.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList()); + assertEquals(actual, expected); } private void testStreamOrder(OrcEncoding encoding, CompressionKind kind, OptionalInt level, StreamLayoutFactory streamLayoutFactory, Supplier> streamConsumerFactory) diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java index 8edfebeb8ddcf..d148898233ed4 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java @@ -28,7 +28,7 @@ import com.facebook.presto.orc.metadata.StripeFooter; import com.facebook.presto.orc.proto.DwrfProto; import com.facebook.presto.orc.stream.StreamDataOutput; -import com.facebook.presto.orc.writer.StreamLayout.ByColumnSize; +import com.facebook.presto.orc.writer.ColumnSizeLayout; import com.facebook.presto.orc.writer.StreamLayout.ByStreamSize; import com.facebook.presto.orc.writer.StreamLayoutFactory; import com.facebook.presto.orc.writer.StreamOrderingLayout; @@ -43,6 +43,8 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.time.LocalDate; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -51,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.SortedMap; import static com.facebook.presto.common.type.IntegerType.INTEGER; @@ -62,8 +65,13 @@ import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT; import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DWRF_MAP_FLAT; import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; +import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA; +import static com.facebook.presto.orc.metadata.Stream.StreamKind.IN_MAP; +import static com.facebook.presto.orc.metadata.Stream.StreamKind.LENGTH; +import static com.facebook.presto.orc.metadata.Stream.StreamKind.PRESENT; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.Collections.shuffle; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -88,12 +96,10 @@ private static void verifyStream(Stream stream, int nodeId, StreamKind streamKin assertEquals(stream.getStreamKind(), streamKind); } - private static void verifyStream(Stream stream, int nodeId, int seqId, StreamKind streamKind, int length) + private static void verifyStream(Stream actual, int nodeId, int seqId, StreamKind streamKind, int length) { - assertEquals(stream.getColumn(), nodeId); - assertEquals(stream.getSequence(), seqId); - assertEquals(stream.getLength(), length); - assertEquals(stream.getStreamKind(), streamKind); + Stream expected = new Stream(nodeId, seqId, streamKind, length, true); + assertEquals(actual, expected); } @Test @@ -102,62 +108,231 @@ public void testByStreamSize() List streams = new ArrayList<>(); int length = 10_000; for (int i = 0; i < 10; i++) { - streams.add(createStream(i, StreamKind.PRESENT, length - i)); - streams.add(createStream(i, StreamKind.DATA, length - 100 - i)); + streams.add(createStream(i, PRESENT, length - i)); + streams.add(createStream(i, DATA, length - 100 - i)); } - Collections.shuffle(streams); + shuffle(streams); new ByStreamSize().reorder(streams); assertEquals(streams.size(), 20); Iterator iterator = streams.iterator(); for (int i = 9; i >= 0; i--) { - verifyStream(iterator.next().getStream(), i, StreamKind.DATA, length - 100 - i); + verifyStream(iterator.next().getStream(), i, DATA, length - 100 - i); } for (int i = 9; i >= 0; i--) { - verifyStream(iterator.next().getStream(), i, StreamKind.PRESENT, length - i); + verifyStream(iterator.next().getStream(), i, PRESENT, length - i); } assertFalse(iterator.hasNext()); } - @Test - public void testByColumnSize() + @DataProvider + public static Object[][] testByColumnSizeDataProvider() { - // Assume the file has 3 streams - // 1st Column( 1010), Data(1000), Present(10) - // 2nd column (1010), Dictionary (300), Present (10), Data(600), Length(100) - // 3rd Column > 2GB - - List streams = new ArrayList<>(); - streams.add(createStream(1, StreamKind.DATA, 1_000)); - streams.add(createStream(1, StreamKind.PRESENT, 10)); - - streams.add(createStream(2, StreamKind.DICTIONARY_DATA, 300)); - streams.add(createStream(2, StreamKind.PRESENT, 10)); - streams.add(createStream(2, StreamKind.DATA, 600)); - streams.add(createStream(2, StreamKind.LENGTH, 100)); + // Assume the following schema: + // Node 1, Column 0, Type: MAP (map1) + // Node 2, Column 0, Type: INT + // Node 3, Column 0, Type: LIST + // Node 4, Column 0, Type: INT + + // Node 5, Column 1, Type: MAP (FLAT flatMap1) + // Node 6, Column 1, Type: INT (absent in flat maps) + // Node 7, Column 1, Type: LIST + // Node 8, Column 1, Type: INT + + // Node 9, Column 2, Type: LIST (list1) + // Node 10, Column 2, Type: INT + + // Node 11, Column 3, Name: m2, Type: MAP (FLAT flatMap2) + // Node 12, Column 3, Name: key, Type: INT (absent in flat maps) + // Node 13, Column 3, Name: item, Type: INT + + // Node 14, Column 4, Type: INT (regular1) + // Node 15, Column 5, Type: INT (regular2) + // Node 16, Column 6, Type: INT (regular3) + final int map1 = 1; + final int map1Key = 2; + final int map1Val = 3; + final int map1ValElem = 4; + + final int flatMap1 = 5; + final int flatMap1Val = 7; + final int flatMap1ValElem = 8; + + final int list1 = 9; + final int list1Elem = 10; + + final int flatMap2 = 11; + final int flatMap2Val = 13; + + final int regular1 = 14; + final int regular2 = 15; + final int regular3 = 16; + + // supply streams in the expected order, test will perform several reorder + // iterations with shuffling + return new Object[][] { + { + "split non-flatmap and flatmap columns into separate groups", + new StreamDataOutput[] { + createStream(map1, PRESENT, 0), + createStream(map1Key, PRESENT, 0), + createStream(list1, PRESENT, 0), + createStream(regular1, PRESENT, 0), + createStream(regular2, PRESENT, 0), + + createStream(flatMap1, PRESENT, 0), + createStream(flatMap1Val, PRESENT, 0), + createStream(flatMap1ValElem, 1, PRESENT, 0), + createStream(flatMap1ValElem, 2, PRESENT, 0), + createStream(flatMap1ValElem, 3, PRESENT, 0), + createStream(flatMap2, PRESENT, 0), + createStream(flatMap2Val, 1, PRESENT, 0), + createStream(flatMap2Val, 2, PRESENT, 0) + } + }, + { + "order columns by total column size in desc order", + new StreamDataOutput[] { + createStream(regular1, PRESENT, 5_000_000), + createStream(regular2, PRESENT, 4_000_000), + + createStream(list1, PRESENT, 3_000_000), + createStream(list1Elem, PRESENT, 200), + + createStream(map1, PRESENT, 10), + createStream(map1Key, PRESENT, 3_000_000), + + createStream(flatMap2, PRESENT, 1), + createStream(flatMap2Val, 1, PRESENT, 5), + createStream(flatMap2Val, 1, DATA, 5), + createStream(flatMap2Val, 2, DATA, 5), + createStream(flatMap2Val, 3, PRESENT, 5), + + createStream(flatMap1, PRESENT, 1), + createStream(flatMap1Val, PRESENT, 1), + createStream(flatMap1ValElem, 1, PRESENT, 1), + createStream(flatMap1ValElem, 1, DATA, 1), + createStream(flatMap1ValElem, 1, LENGTH, 1), + } + }, + + { + "group by sequence", + new StreamDataOutput[] { + createStream(flatMap1, PRESENT, 1), + createStream(flatMap1Val, 1, PRESENT, 1), + createStream(flatMap1ValElem, 1, PRESENT, 0), + createStream(flatMap1ValElem, 1, DATA, 0), + createStream(flatMap1ValElem, 1, LENGTH, 0), + createStream(flatMap1Val, 2, PRESENT, 1), + createStream(flatMap1ValElem, 2, PRESENT, 0), + createStream(flatMap1ValElem, 2, DATA, 0), + createStream(flatMap1ValElem, 2, LENGTH, 0), + + createStream(flatMap2, PRESENT, 0), + createStream(flatMap2Val, 1, PRESENT, 0), + createStream(flatMap2Val, 1, DATA, 0), + createStream(flatMap2Val, 1, LENGTH, 0), + createStream(flatMap2Val, 2, PRESENT, 0), + createStream(flatMap2Val, 2, DATA, 0), + createStream(flatMap2Val, 2, LENGTH, 0), + } + }, + { + "order sequence streams by column+sequence size in desc order", + new StreamDataOutput[] { + createStream(flatMap1, PRESENT, 1000), + // seq 2 + createStream(flatMap1Val, 2, PRESENT, 20), + createStream(flatMap1ValElem, 2, PRESENT, 20), + createStream(flatMap1ValElem, 2, DATA, 20), + createStream(flatMap1ValElem, 2, LENGTH, 20), + // seq 1 + createStream(flatMap1Val, 1, PRESENT, 10), + createStream(flatMap1ValElem, 1, PRESENT, 10), + createStream(flatMap1ValElem, 1, DATA, 10), + createStream(flatMap1ValElem, 1, LENGTH, 10), +// + createStream(flatMap2, PRESENT, 10), + // seq 1 + createStream(flatMap2Val, 1, PRESENT, 30), + createStream(flatMap2Val, 1, DATA, 30), + createStream(flatMap2Val, 1, LENGTH, 30), + // seq 2 + createStream(flatMap2Val, 2, PRESENT, 10), + createStream(flatMap2Val, 2, DATA, 10), + createStream(flatMap2Val, 2, LENGTH, 10), + } + }, + { + "order by the node in asc order", + new StreamDataOutput[] { + createStream(list1, PRESENT, 5), + createStream(list1Elem, PRESENT, 5), + createStream(regular1, PRESENT, 10), + createStream(regular2, DATA, 10), + + createStream(flatMap1, PRESENT, 5), + createStream(flatMap1Val, 1, DATA, 5), + createStream(flatMap2, PRESENT, 5), + createStream(flatMap2Val, 1, DATA, 5), + } + }, + { + "order by stream kind", + new StreamDataOutput[] { + createStream(list1, PRESENT, 0), + createStream(list1Elem, PRESENT, 0), + createStream(list1Elem, DATA, 0), + createStream(list1Elem, LENGTH, 0), + } + }, + }; + } - streams.add(createStream(3, StreamKind.DATA, Integer.MAX_VALUE)); - streams.add(createStream(3, StreamKind.PRESENT, Integer.MAX_VALUE)); + @Test(dataProvider = "testByColumnSizeDataProvider") + public void testByColumnSize(String testName, StreamDataOutput[] streams) + { + List expectedStreams = ImmutableList.copyOf(streams); + List testStreams = new ArrayList<>(ImmutableList.copyOf(streams)); - Collections.shuffle(streams); - new ByColumnSize().reorder(streams); + Map nodeToColumn = ImmutableMap.builder() + .put(1, 0) + .put(2, 0) + .put(3, 0) + .put(4, 0) + .put(5, 1) + .put(6, 1) + .put(7, 1) + .put(8, 1) + .put(9, 2) + .put(10, 2) + .put(11, 3) + .put(12, 3) + .put(13, 3) + .put(14, 4) + .put(15, 5) + .put(16, 6) + .build(); - Iterator iterator = streams.iterator(); - verifyStream(iterator.next().getStream(), 1, StreamKind.PRESENT, 10); - verifyStream(iterator.next().getStream(), 1, StreamKind.DATA, 1000); + Map nodeIdToColumnEncodings = ImmutableMap.builder() + .put(5, new ColumnEncoding(DWRF_MAP_FLAT, 0)) + .put(11, new ColumnEncoding(DWRF_MAP_FLAT, 0)) + .build(); - verifyStream(iterator.next().getStream(), 2, StreamKind.PRESENT, 10); - verifyStream(iterator.next().getStream(), 2, StreamKind.LENGTH, 100); - verifyStream(iterator.next().getStream(), 2, StreamKind.DICTIONARY_DATA, 300); - verifyStream(iterator.next().getStream(), 2, StreamKind.DATA, 600); + ColumnSizeLayout layout = new ColumnSizeLayout(); - verifyStream(iterator.next().getStream(), 3, StreamKind.PRESENT, Integer.MAX_VALUE); - verifyStream(iterator.next().getStream(), 3, StreamKind.DATA, Integer.MAX_VALUE); + int seed = LocalDate.now(ZoneId.of("America/Los_Angeles")).getDayOfYear(); + Random rnd = new Random(seed); - assertFalse(iterator.hasNext()); + for (int i = 0; i < 25; i++) { + shuffle(testStreams, rnd); + layout.reorder(testStreams, nodeToColumn, nodeIdToColumnEncodings); + assertEquals(testStreams, expectedStreams); + } } @DataProvider(name = "testParams") @@ -179,17 +354,17 @@ public void testByStreamSizeStreamOrdering(boolean isEmptyMap) verifyFlatMapColumns(iterator); } // non flat map columns - verifyStream(iterator.next().getStream(), 5, 0, StreamKind.DATA, 1); - verifyStream(iterator.next().getStream(), 2, 0, StreamKind.LENGTH, 2); - verifyStream(iterator.next().getStream(), 1, 0, StreamKind.DATA, 3); - verifyStream(iterator.next().getStream(), 4, 0, StreamKind.LENGTH, 5); - verifyStream(iterator.next().getStream(), 3, 0, StreamKind.DATA, 8); - verifyStream(iterator.next().getStream(), 1, 0, StreamKind.PRESENT, 12); + verifyStream(iterator.next().getStream(), 5, 0, DATA, 1); + verifyStream(iterator.next().getStream(), 2, 0, LENGTH, 2); + verifyStream(iterator.next().getStream(), 1, 0, DATA, 3); + verifyStream(iterator.next().getStream(), 4, 0, LENGTH, 5); + verifyStream(iterator.next().getStream(), 3, 0, DATA, 8); + verifyStream(iterator.next().getStream(), 1, 0, PRESENT, 12); if (!isEmptyMap) { // flat map stream not reordered - verifyStream(iterator.next().getStream(), 11, 5, StreamKind.IN_MAP, 13); - verifyStream(iterator.next().getStream(), 11, 5, StreamKind.LENGTH, 14); - verifyStream(iterator.next().getStream(), 12, 5, StreamKind.DATA, 15); + verifyStream(iterator.next().getStream(), 11, 5, IN_MAP, 13); + verifyStream(iterator.next().getStream(), 11, 5, LENGTH, 14); + verifyStream(iterator.next().getStream(), 12, 5, DATA, 15); } assertFalse(iterator.hasNext()); } @@ -198,26 +373,31 @@ public void testByStreamSizeStreamOrdering(boolean isEmptyMap) public void testByColumnSizeStreamOrdering(boolean isEmptyMap) { List streams = createStreams(isEmptyMap); - ByColumnSize streamLayout = new ByColumnSize(); - StreamOrderingLayout streamOrderingLayout = new StreamOrderingLayout(createStreamReorderingInput(), streamLayout); + ColumnSizeLayout layout = new ColumnSizeLayout(); + StreamOrderingLayout streamOrderingLayout = new StreamOrderingLayout(createStreamReorderingInput(), layout); streamOrderingLayout.reorder(streams, createNodeIdToColumnId(), createColumnEncodings(isEmptyMap)); Iterator iterator = streams.iterator(); if (!isEmptyMap) { verifyFlatMapColumns(iterator); } - // non flat map columns - verifyStream(iterator.next().getStream(), 5, 0, StreamKind.DATA, 1); - verifyStream(iterator.next().getStream(), 2, 0, StreamKind.LENGTH, 2); - verifyStream(iterator.next().getStream(), 4, 0, StreamKind.LENGTH, 5); - verifyStream(iterator.next().getStream(), 3, 0, StreamKind.DATA, 8); - verifyStream(iterator.next().getStream(), 1, 0, StreamKind.DATA, 3); - verifyStream(iterator.next().getStream(), 1, 0, StreamKind.PRESENT, 12); + + // regular columns + // column 1 with total size 16, ordered by nodes + verifyStream(iterator.next().getStream(), 2, 0, LENGTH, 2); + verifyStream(iterator.next().getStream(), 3, 0, DATA, 8); + verifyStream(iterator.next().getStream(), 4, 0, LENGTH, 5); + verifyStream(iterator.next().getStream(), 5, 0, DATA, 1); + + // column 0 with total size 15, ordered by stream kind + verifyStream(iterator.next().getStream(), 1, 0, PRESENT, 12); + verifyStream(iterator.next().getStream(), 1, 0, DATA, 3); + if (!isEmptyMap) { - // flat map stream not reordered - verifyStream(iterator.next().getStream(), 12, 5, StreamKind.DATA, 15); - verifyStream(iterator.next().getStream(), 11, 5, StreamKind.IN_MAP, 13); - verifyStream(iterator.next().getStream(), 11, 5, StreamKind.LENGTH, 14); + // flat map stream are also ordered by node and kind + verifyStream(iterator.next().getStream(), 11, 5, LENGTH, 14); + verifyStream(iterator.next().getStream(), 11, 5, IN_MAP, 13); + verifyStream(iterator.next().getStream(), 12, 5, DATA, 15); } assertFalse(iterator.hasNext()); } @@ -315,80 +495,82 @@ private static DwrfStreamOrderingConfig createStreamReorderingInput() private static void verifyFlatMapColumns(Iterator iterator) { + // flat map stream are ordered by node and kind + // Kind order: DATA:1, LENGTH:2, IN_MAP:12 // column 2 - verifyStream(iterator.next().getStream(), 8, 3, StreamKind.IN_MAP, 6); - verifyStream(iterator.next().getStream(), 8, 3, StreamKind.DATA, 7); - verifyStream(iterator.next().getStream(), 8, 2, StreamKind.IN_MAP, 4); - verifyStream(iterator.next().getStream(), 8, 2, StreamKind.DATA, 5); - verifyStream(iterator.next().getStream(), 8, 1, StreamKind.IN_MAP, 2); - verifyStream(iterator.next().getStream(), 8, 1, StreamKind.DATA, 3); + verifyStream(iterator.next().getStream(), 8, 3, DATA, 7); + verifyStream(iterator.next().getStream(), 8, 3, IN_MAP, 6); + verifyStream(iterator.next().getStream(), 8, 2, DATA, 5); + verifyStream(iterator.next().getStream(), 8, 2, IN_MAP, 4); + verifyStream(iterator.next().getStream(), 8, 1, DATA, 3); + verifyStream(iterator.next().getStream(), 8, 1, IN_MAP, 2); // column 3 - verifyStream(iterator.next().getStream(), 11, 1, StreamKind.IN_MAP, 1); - verifyStream(iterator.next().getStream(), 11, 1, StreamKind.LENGTH, 2); - verifyStream(iterator.next().getStream(), 12, 1, StreamKind.DATA, 3); + verifyStream(iterator.next().getStream(), 11, 1, LENGTH, 2); + verifyStream(iterator.next().getStream(), 11, 1, IN_MAP, 1); + verifyStream(iterator.next().getStream(), 12, 1, DATA, 3); - verifyStream(iterator.next().getStream(), 11, 2, StreamKind.IN_MAP, 4); - verifyStream(iterator.next().getStream(), 11, 2, StreamKind.LENGTH, 5); - verifyStream(iterator.next().getStream(), 12, 2, StreamKind.DATA, 6); + verifyStream(iterator.next().getStream(), 11, 2, LENGTH, 5); + verifyStream(iterator.next().getStream(), 11, 2, IN_MAP, 4); + verifyStream(iterator.next().getStream(), 12, 2, DATA, 6); - verifyStream(iterator.next().getStream(), 11, 4, StreamKind.IN_MAP, 10); - verifyStream(iterator.next().getStream(), 11, 4, StreamKind.LENGTH, 11); - verifyStream(iterator.next().getStream(), 12, 4, StreamKind.DATA, 12); + verifyStream(iterator.next().getStream(), 11, 4, LENGTH, 11); + verifyStream(iterator.next().getStream(), 11, 4, IN_MAP, 10); + verifyStream(iterator.next().getStream(), 12, 4, DATA, 12); - verifyStream(iterator.next().getStream(), 11, 3, StreamKind.IN_MAP, 7); - verifyStream(iterator.next().getStream(), 11, 3, StreamKind.LENGTH, 8); - verifyStream(iterator.next().getStream(), 12, 3, StreamKind.DATA, 9); + verifyStream(iterator.next().getStream(), 11, 3, LENGTH, 8); + verifyStream(iterator.next().getStream(), 11, 3, IN_MAP, 7); + verifyStream(iterator.next().getStream(), 12, 3, DATA, 9); } private static List createStreams(boolean isEmptyMap) { - // Assume the file has the following schema - // column 0: INT (Node 0) - // column 1: MAP> // non flat map - // column 2: MAP // flat map - // column 3: MAP // flat map + // Assume the file has the following schema: + // column 0: 1INT + // column 1: 2MAP<3INT, 4LIST<5INT>> // non flat map + // column 2: 6MAP<7INT, 8FLOAT> // flat map + // column 3: 9MAP<10INT, 11LIST<12INT> // flat map List streams = new ArrayList<>(); // column 0 - streams.add(createStream(1, StreamKind.DATA, 3)); - streams.add(createStream(1, StreamKind.PRESENT, 12)); + streams.add(createStream(1, DATA, 3)); + streams.add(createStream(1, PRESENT, 12)); // column 1 MAP> <2, <3, 4<5>>>> - streams.add(createStream(2, StreamKind.LENGTH, 2)); // MAP - streams.add(createStream(3, StreamKind.DATA, 8)); // INT - streams.add(createStream(4, StreamKind.LENGTH, 5)); // LIST - streams.add(createStream(5, StreamKind.DATA, 1)); // INT + streams.add(createStream(2, LENGTH, 2)); // MAP + streams.add(createStream(3, DATA, 8)); // INT + streams.add(createStream(4, LENGTH, 5)); // LIST + streams.add(createStream(5, DATA, 1)); // INT if (!isEmptyMap) { // column 2 MAP <6 <7, 8>> - streams.add(createStream(8, 1, StreamKind.IN_MAP, 2)); - streams.add(createStream(8, 1, StreamKind.DATA, 3)); - streams.add(createStream(8, 2, StreamKind.IN_MAP, 4)); - streams.add(createStream(8, 2, StreamKind.DATA, 5)); - streams.add(createStream(8, 3, StreamKind.IN_MAP, 6)); - streams.add(createStream(8, 3, StreamKind.DATA, 7)); + streams.add(createStream(8, 1, IN_MAP, 2)); + streams.add(createStream(8, 1, DATA, 3)); + streams.add(createStream(8, 2, IN_MAP, 4)); + streams.add(createStream(8, 2, DATA, 5)); + streams.add(createStream(8, 3, IN_MAP, 6)); + streams.add(createStream(8, 3, DATA, 7)); // column 3 MAP <9 <10, 11<12>>> - streams.add(createStream(11, 1, StreamKind.IN_MAP, 1)); - streams.add(createStream(11, 1, StreamKind.LENGTH, 2)); - streams.add(createStream(12, 1, StreamKind.DATA, 3)); + streams.add(createStream(11, 1, IN_MAP, 1)); + streams.add(createStream(11, 1, LENGTH, 2)); + streams.add(createStream(12, 1, DATA, 3)); - streams.add(createStream(11, 2, StreamKind.IN_MAP, 4)); - streams.add(createStream(11, 2, StreamKind.LENGTH, 5)); - streams.add(createStream(12, 2, StreamKind.DATA, 6)); + streams.add(createStream(11, 2, IN_MAP, 4)); + streams.add(createStream(11, 2, LENGTH, 5)); + streams.add(createStream(12, 2, DATA, 6)); - streams.add(createStream(11, 3, StreamKind.IN_MAP, 7)); - streams.add(createStream(11, 3, StreamKind.LENGTH, 8)); - streams.add(createStream(12, 3, StreamKind.DATA, 9)); + streams.add(createStream(11, 3, IN_MAP, 7)); + streams.add(createStream(11, 3, LENGTH, 8)); + streams.add(createStream(12, 3, DATA, 9)); - streams.add(createStream(11, 4, StreamKind.IN_MAP, 10)); - streams.add(createStream(11, 4, StreamKind.LENGTH, 11)); - streams.add(createStream(12, 4, StreamKind.DATA, 12)); + streams.add(createStream(11, 4, IN_MAP, 10)); + streams.add(createStream(11, 4, LENGTH, 11)); + streams.add(createStream(12, 4, DATA, 12)); - streams.add(createStream(11, 5, StreamKind.IN_MAP, 13)); - streams.add(createStream(11, 5, StreamKind.LENGTH, 14)); - streams.add(createStream(12, 5, StreamKind.DATA, 15)); + streams.add(createStream(11, 5, IN_MAP, 13)); + streams.add(createStream(11, 5, LENGTH, 14)); + streams.add(createStream(12, 5, DATA, 15)); } return streams; }