Skip to content

Commit

Permalink
Adds option to read null map keys from orc file
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Borovsky authored and mbasmanova committed Aug 26, 2020
1 parent 1f0269d commit 1a8987f
Show file tree
Hide file tree
Showing 22 changed files with 428 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import org.joda.time.DateTimeZone;
import org.openjdk.jol.info.ClassLayout;

Expand Down Expand Up @@ -58,11 +57,9 @@ public OrcBatchRecordReader(
Map<Integer, Slice> intermediateKeyMetadata,
int rowsInRowGroup,
DateTimeZone hiveStorageTimeZone,
OrcRecordReaderOptions options,
HiveWriterVersion hiveWriterVersion,
MetadataReader metadataReader,
DataSize maxMergeDistance,
DataSize tinyStripeThreshold,
DataSize maxBlockSize,
Map<String, Slice> userMetadata,
OrcAggregatedMemoryContext systemMemoryUsage,
Optional<OrcWriteValidation> writeValidation,
Expand All @@ -79,7 +76,7 @@ public OrcBatchRecordReader(
// doesn't have a local buffer. All non-leaf level StreamReaders' (e.g. MapStreamReader, LongStreamReader,
// ListStreamReader and StructStreamReader) instance sizes were not counted, because calling setBytes() in
// their constructors is confusing.
createStreamReaders(orcDataSource, types, hiveStorageTimeZone, includedColumns, systemMemoryUsage.newOrcAggregatedMemoryContext()),
createStreamReaders(orcDataSource, types, hiveStorageTimeZone, options, includedColumns, systemMemoryUsage.newOrcAggregatedMemoryContext()),
predicate,
numberOfRows,
fileStripes,
Expand All @@ -97,9 +94,9 @@ public OrcBatchRecordReader(
hiveStorageTimeZone,
hiveWriterVersion,
metadataReader,
maxMergeDistance,
tinyStripeThreshold,
maxBlockSize,
options.getMaxMergeDistance(),
options.getTinyStripeThreshold(),
options.getMaxBlockSize(),
userMetadata,
systemMemoryUsage,
writeValidation,
Expand Down Expand Up @@ -162,6 +159,7 @@ private static BatchStreamReader[] createStreamReaders(
OrcDataSource orcDataSource,
List<OrcType> types,
DateTimeZone hiveStorageTimeZone,
OrcRecordReaderOptions options,
Map<Integer, Type> includedColumns,
OrcAggregatedMemoryContext systemMemoryContext)
throws OrcCorruptionException
Expand All @@ -175,7 +173,7 @@ private static BatchStreamReader[] createStreamReaders(
Type type = includedColumns.get(columnId);
if (type != null) {
StreamDescriptor streamDescriptor = streamDescriptors.get(columnId);
streamReaders[columnId] = BatchStreamReaders.createStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
streamReaders[columnId] = BatchStreamReaders.createStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,9 @@ public OrcBatchRecordReader createBatchRecordReader(
columnsToIntermediateKeys,
footer.getRowsInRowGroup(),
requireNonNull(hiveStorageTimeZone, "hiveStorageTimeZone is null"),
new OrcRecordReaderOptions(orcReaderOptions),
hiveWriterVersion,
metadataReader,
orcReaderOptions.getMaxMergeDistance(),
orcReaderOptions.getTinyStripeThreshold(),
orcReaderOptions.getMaxBlockSize(),
footer.getUserMetadata(),
systemMemoryUsage.newOrcAggregatedMemoryContext(),
writeValidation,
Expand Down Expand Up @@ -312,12 +310,10 @@ public OrcSelectiveRecordReader createSelectiveRecordReader(
columnsToIntermediateKeys,
footer.getRowsInRowGroup(),
hiveStorageTimeZone,
new OrcRecordReaderOptions(orcReaderOptions),
legacyMapSubscript,
hiveWriterVersion,
metadataReader,
orcReaderOptions.getMaxMergeDistance(),
orcReaderOptions.getTinyStripeThreshold(),
orcReaderOptions.getMaxBlockSize(),
footer.getUserMetadata(),
systemMemoryUsage.newOrcAggregatedMemoryContext(),
writeValidation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ public class OrcReaderOptions
private final DataSize tinyStripeThreshold;
private final DataSize maxBlockSize;
private final boolean zstdJniDecompressionEnabled;
private final boolean mapNullKeysEnabled;

/**
 * Creates reader options with {@code mapNullKeysEnabled} switched off.
 *
 * <p>Delegates to the five-argument constructor, passing {@code false} for the
 * {@code mapNullKeysEnabled} flag.
 */
public OrcReaderOptions(DataSize maxMergeDistance, DataSize tinyStripeThreshold, DataSize maxBlockSize, boolean zstdJniDecompressionEnabled)
{
    this(
            maxMergeDistance,
            tinyStripeThreshold,
            maxBlockSize,
            zstdJniDecompressionEnabled,
            false); // mapNullKeysEnabled
}

/**
 * Creates reader options with every setting supplied explicitly.
 *
 * @param maxMergeDistance must not be null
 * @param tinyStripeThreshold must not be null
 * @param maxBlockSize must not be null
 * @param zstdJniDecompressionEnabled stored as given
 * @param mapNullKeysEnabled stored as given
 * @throws NullPointerException if any of the {@code DataSize} arguments is null
 */
public OrcReaderOptions(DataSize maxMergeDistance, DataSize tinyStripeThreshold, DataSize maxBlockSize, boolean zstdJniDecompressionEnabled, boolean mapNullKeysEnabled)
{
    // Primitive flags need no validation.
    this.mapNullKeysEnabled = mapNullKeysEnabled;
    this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;

    // Null checks stay in their original order so the reported message is unchanged
    // when more than one argument is null.
    this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
    this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
    this.tinyStripeThreshold = requireNonNull(tinyStripeThreshold, "tinyStripeThreshold is null");
}

public DataSize getMaxMergeDistance()
Expand All @@ -51,4 +58,9 @@ public DataSize getTinyStripeThreshold()
{
return tinyStripeThreshold;
}

/**
 * Returns the {@code mapNullKeysEnabled} flag exactly as it was passed to the constructor.
 */
public boolean mapNullKeysEnabled()
{
    return this.mapNullKeysEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;

import io.airlift.units.DataSize;

import static java.util.Objects.requireNonNull;

/**
 * Immutable set of options passed to the ORC record readers and on to their stream readers.
 *
 * <p>Carries three {@link DataSize} settings plus the {@code mapNullKeysEnabled} flag.
 * An instance can be built from explicit values or copied from an {@link OrcReaderOptions}.
 */
public class OrcRecordReaderOptions
{
    private final DataSize maxMergeDistance;
    private final DataSize tinyStripeThreshold;
    private final DataSize maxBlockSize;
    private final boolean mapNullKeysEnabled;

    /**
     * Copies the max merge distance, tiny stripe threshold, max block size and
     * {@code mapNullKeysEnabled} flag from {@code options}.
     *
     * @param options source of the settings; must not be null
     * @throws NullPointerException if {@code options} is null
     */
    public OrcRecordReaderOptions(OrcReaderOptions options)
    {
        // The null check is inlined because this(...) must be the first statement;
        // it yields the same "<name> is null" message style used for the other arguments.
        this(
                requireNonNull(options, "options is null").getMaxMergeDistance(),
                options.getTinyStripeThreshold(),
                options.getMaxBlockSize(),
                options.mapNullKeysEnabled());
    }

    /**
     * Creates options from explicit values.
     *
     * @param maxMergeDistance must not be null
     * @param tinyStripeThreshold must not be null
     * @param maxBlockSize must not be null
     * @param mapNullKeysEnabled stored as given
     * @throws NullPointerException if any of the {@code DataSize} arguments is null
     */
    public OrcRecordReaderOptions(DataSize maxMergeDistance, DataSize tinyStripeThreshold, DataSize maxBlockSize, boolean mapNullKeysEnabled)
    {
        this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
        this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
        this.tinyStripeThreshold = requireNonNull(tinyStripeThreshold, "tinyStripeThreshold is null");
        this.mapNullKeysEnabled = mapNullKeysEnabled;
    }

    /** Returns the max merge distance supplied at construction. */
    public DataSize getMaxMergeDistance()
    {
        return maxMergeDistance;
    }

    /** Returns the max block size supplied at construction. */
    public DataSize getMaxBlockSize()
    {
        return maxBlockSize;
    }

    /** Returns the tiny stripe threshold supplied at construction. */
    public DataSize getTinyStripeThreshold()
    {
        return tinyStripeThreshold;
    }

    /** Returns the {@code mapNullKeysEnabled} flag supplied at construction. */
    public boolean mapNullKeysEnabled()
    {
        return mapNullKeysEnabled;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import org.joda.time.DateTimeZone;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -164,12 +163,10 @@ public OrcSelectiveRecordReader(
Map<Integer, Slice> intermediateKeyMetadata,
int rowsInRowGroup,
DateTimeZone hiveStorageTimeZone,
OrcRecordReaderOptions options,
boolean legacyMapSubscript,
PostScript.HiveWriterVersion hiveWriterVersion,
MetadataReader metadataReader,
DataSize maxMergeDistance,
DataSize tinyStripeThreshold,
DataSize maxBlockSize,
Map<String, Slice> userMetadata,
OrcAggregatedMemoryContext systemMemoryUsage,
Optional<OrcWriteValidation> writeValidation,
Expand All @@ -183,6 +180,7 @@ public OrcSelectiveRecordReader(
orcDataSource,
types,
hiveStorageTimeZone,
options,
legacyMapSubscript,
includedColumns,
outputColumns,
Expand All @@ -208,9 +206,9 @@ public OrcSelectiveRecordReader(
hiveStorageTimeZone,
hiveWriterVersion,
metadataReader,
maxMergeDistance,
tinyStripeThreshold,
maxBlockSize,
options.getMaxMergeDistance(),
options.getTinyStripeThreshold(),
options.getMaxBlockSize(),
userMetadata,
systemMemoryUsage,
writeValidation,
Expand Down Expand Up @@ -564,6 +562,7 @@ private static SelectiveStreamReader[] createStreamReaders(
OrcDataSource orcDataSource,
List<OrcType> types,
DateTimeZone hiveStorageTimeZone,
OrcRecordReaderOptions options,
boolean legacyMapSubscript,
Map<Integer, Type> includedColumns,
List<Integer> outputColumns,
Expand Down Expand Up @@ -596,6 +595,7 @@ private static SelectiveStreamReader[] createStreamReaders(
outputRequired ? Optional.of(includedColumns.get(columnId)) : Optional.empty(),
Optional.ofNullable(requiredSubfields.get(columnId)).orElse(ImmutableList.of()),
hiveStorageTimeZone,
options,
legacyMapSubscript,
systemMemoryContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcRecordReaderOptions;
import com.facebook.presto.orc.StreamDescriptor;
import org.joda.time.DateTimeZone;

Expand All @@ -25,7 +26,7 @@ private BatchStreamReaders()
{
}

public static BatchStreamReader createStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcAggregatedMemoryContext systemMemoryContext)
public static BatchStreamReader createStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcRecordReaderOptions options, OrcAggregatedMemoryContext systemMemoryContext)
throws OrcCorruptionException
{
switch (streamDescriptor.getOrcTypeKind()) {
Expand All @@ -50,11 +51,11 @@ public static BatchStreamReader createStreamReader(Type type, StreamDescriptor s
case TIMESTAMP:
return new TimestampBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone);
case LIST:
return new ListBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
return new ListBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
case STRUCT:
return new StructBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
return new StructBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
case MAP:
return new MapBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
return new MapBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
case DECIMAL:
return new DecimalBatchStreamReader(type, streamDescriptor);
case UNION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcRecordReaderOptions;
import com.facebook.presto.orc.StreamDescriptor;
import com.facebook.presto.orc.metadata.ColumnEncoding;
import com.facebook.presto.orc.stream.BooleanInputStream;
Expand Down Expand Up @@ -70,14 +71,14 @@ public class ListBatchStreamReader

private boolean rowGroupOpen;

public ListBatchStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcAggregatedMemoryContext systemMemoryContext)
public ListBatchStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcRecordReaderOptions options, OrcAggregatedMemoryContext systemMemoryContext)
throws OrcCorruptionException
{
requireNonNull(type, "type is null");
verifyStreamType(streamDescriptor, type, ArrayType.class::isInstance);
elementType = ((ArrayType) type).getElementType();
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
this.elementStreamReader = createStreamReader(elementType, streamDescriptor.getNestedStreams().get(0), hiveStorageTimeZone, systemMemoryContext);
this.elementStreamReader = createStreamReader(elementType, streamDescriptor.getNestedStreams().get(0), hiveStorageTimeZone, options, systemMemoryContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
import com.facebook.presto.orc.OrcLocalMemoryContext;
import com.facebook.presto.orc.OrcRecordReaderOptions;
import com.facebook.presto.orc.StreamDescriptor;
import com.facebook.presto.orc.TupleDomainFilter;
import com.facebook.presto.orc.TupleDomainFilter.NullsFilter;
Expand Down Expand Up @@ -119,6 +120,7 @@ public ListSelectiveStreamReader(
int subfieldLevel, // 0 - top level
Optional<Type> outputType,
DateTimeZone hiveStorageTimeZone,
OrcRecordReaderOptions options,
boolean legacyMapSubscript,
OrcAggregatedMemoryContext systemMemoryContext)
{
Expand Down Expand Up @@ -199,7 +201,16 @@ else if (!filters.isEmpty()) {
.collect(toImmutableList());
}

this.elementStreamReader = createNestedStreamReader(elementStreamDescriptor, level + 1, Optional.ofNullable(this.listFilter), elementOutputType, elementSubfields, hiveStorageTimeZone, legacyMapSubscript, systemMemoryContext);
this.elementStreamReader = createNestedStreamReader(
elementStreamDescriptor,
level + 1,
Optional.ofNullable(this.listFilter),
elementOutputType,
elementSubfields,
hiveStorageTimeZone,
options,
legacyMapSubscript,
systemMemoryContext);
this.systemMemoryContext = systemMemoryContext.newOrcLocalMemoryContext(ListSelectiveStreamReader.class.getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcAggregatedMemoryContext;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcRecordReaderOptions;
import com.facebook.presto.orc.StreamDescriptor;
import com.facebook.presto.orc.metadata.ColumnEncoding;
import com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind;
Expand Down Expand Up @@ -46,12 +47,12 @@ public class MapBatchStreamReader
private final MapFlatBatchStreamReader flatReader;
private BatchStreamReader currentReader;

public MapBatchStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcAggregatedMemoryContext systemMemoryContext)
public MapBatchStreamReader(Type type, StreamDescriptor streamDescriptor, DateTimeZone hiveStorageTimeZone, OrcRecordReaderOptions options, OrcAggregatedMemoryContext systemMemoryContext)
throws OrcCorruptionException
{
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
this.directReader = new MapDirectBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
this.flatReader = new MapFlatBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, systemMemoryContext);
this.directReader = new MapDirectBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
this.flatReader = new MapFlatBatchStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
}

@Override
Expand Down
Loading

0 comments on commit 1a8987f

Please sign in to comment.