Make max slice size in ORC slice reader configurable (#24202)
Summary:

Make the max slice size in the ORC slice reader configurable, so that the threshold can be increased in Spark for failing Data Mine jobs.

Differential Revision: D66800897
sdruzkin authored and facebook-github-bot committed Dec 5, 2024
1 parent e416ff1 commit 7d41968
Showing 10 changed files with 85 additions and 27 deletions.
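
Before the file-by-file diff, a minimal sketch of how a caller could raise the threshold once this lands. It assumes the existing static OrcReaderOptions.builder() factory (not shown in this diff); the 1.5GB figure is purely illustrative.

import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class MaxSliceSizeExample
{
    public static void main(String[] args)
    {
        // Raise the ceiling above the new 1GB default. Guards added downstream in
        // this commit require the value to be positive and below Integer.MAX_VALUE bytes.
        OrcReaderOptions options = OrcReaderOptions.builder()
                .withMaxSliceSize(new DataSize(1536, MEGABYTE)) // ~1.5GB, illustrative
                .build();
        System.out.println(options); // toString() now reports maxSliceSize
    }
}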
OrcReaderOptions.java
@@ -16,17 +16,21 @@
import io.airlift.units.DataSize;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.Objects.requireNonNull;

public class OrcReaderOptions
{
private static final DataSize DEFAULT_MAX_SLICE_SIZE = new DataSize(1, GIGABYTE);
private final DataSize maxMergeDistance;
private final DataSize tinyStripeThreshold;
private final DataSize maxBlockSize;
private final boolean zstdJniDecompressionEnabled;
private final boolean mapNullKeysEnabled;
// if the option is set to true, OrcSelectiveReader will append a row number block at the end of the page
private final boolean appendRowNumber;
// slice reader will throw if the slice size is larger than this value
private final DataSize maxSliceSize;

/**
* Read column statistics for flat map columns. Usually there are quite a
@@ -41,7 +45,8 @@ private OrcReaderOptions(
boolean zstdJniDecompressionEnabled,
boolean mapNullKeysEnabled,
boolean appendRowNumber,
boolean readMapStatistics)
boolean readMapStatistics,
DataSize maxSliceSize)
{
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
@@ -50,6 +55,7 @@ private OrcReaderOptions(
this.mapNullKeysEnabled = mapNullKeysEnabled;
this.appendRowNumber = appendRowNumber;
this.readMapStatistics = readMapStatistics;
this.maxSliceSize = maxSliceSize;
}

public DataSize getMaxMergeDistance()
@@ -87,6 +93,11 @@ public boolean readMapStatistics()
return readMapStatistics;
}

public DataSize getMaxSliceSize()
{
return maxSliceSize;
}

@Override
public String toString()
{
@@ -98,6 +109,7 @@ public String toString()
.add("mapNullKeysEnabled", mapNullKeysEnabled)
.add("appendRowNumber", appendRowNumber)
.add("readMapStatistics", readMapStatistics)
.add("maxSliceSize", maxSliceSize)
.toString();
}

@@ -115,6 +127,7 @@ public static final class Builder
private boolean mapNullKeysEnabled;
private boolean appendRowNumber;
private boolean readMapStatistics;
private DataSize maxSliceSize = DEFAULT_MAX_SLICE_SIZE;

private Builder() {}

@@ -160,6 +173,12 @@ public Builder withReadMapStatistics(boolean readMapStatistics)
return this;
}

public Builder withMaxSliceSize(DataSize maxSliceSize)
{
this.maxSliceSize = maxSliceSize;
return this;
}

public OrcReaderOptions build()
{
return new OrcReaderOptions(
@@ -169,7 +188,8 @@ public OrcReaderOptions build()
zstdJniDecompressionEnabled,
mapNullKeysEnabled,
appendRowNumber,
readMapStatistics);
readMapStatistics,
maxSliceSize);
}
}
}
OrcRecordReaderOptions.java
@@ -15,6 +15,7 @@

import io.airlift.units.DataSize;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class OrcRecordReaderOptions
@@ -24,24 +25,34 @@ public class OrcRecordReaderOptions
private final DataSize maxBlockSize;
private final boolean mapNullKeysEnabled;
private final boolean appendRowNumber;
private final long maxSliceSize;

public OrcRecordReaderOptions(OrcReaderOptions options)
{
this(options.getMaxMergeDistance(), options.getTinyStripeThreshold(), options.getMaxBlockSize(), options.mapNullKeysEnabled(), options.appendRowNumber());
this(options.getMaxMergeDistance(),
options.getTinyStripeThreshold(),
options.getMaxBlockSize(),
options.mapNullKeysEnabled(),
options.appendRowNumber(),
options.getMaxSliceSize());
}

public OrcRecordReaderOptions(
DataSize maxMergeDistance,
DataSize tinyStripeThreshold,
DataSize maxBlockSize,
boolean mapNullKeysEnabled,
boolean appendRowNumber)
boolean appendRowNumber,
DataSize maxSliceSize)
{
this.maxMergeDistance = requireNonNull(maxMergeDistance, "maxMergeDistance is null");
this.maxBlockSize = requireNonNull(maxBlockSize, "maxBlockSize is null");
this.tinyStripeThreshold = requireNonNull(tinyStripeThreshold, "tinyStripeThreshold is null");
this.mapNullKeysEnabled = mapNullKeysEnabled;
this.appendRowNumber = appendRowNumber;
checkArgument(maxSliceSize.toBytes() < Integer.MAX_VALUE, "maxSliceSize cannot be larger than Integer.MAX_VALUE");
checkArgument(maxSliceSize.toBytes() > 0, "maxSliceSize must be positive");
this.maxSliceSize = maxSliceSize.toBytes();
}

public DataSize getMaxMergeDistance()
@@ -68,4 +79,9 @@ public boolean appendRowNumber()
{
return appendRowNumber;
}

public long getMaxSliceSize()
{
return maxSliceSize;
}
}
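
The new guards above pin maxSliceSize to the open interval (0, Integer.MAX_VALUE) bytes, since the readers below compare it against int-addressed slice lengths. A hedged sketch of the failure mode; every other constructor argument is chosen purely for illustration.

import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class MaxSliceSizeGuardExample
{
    public static void main(String[] args)
    {
        try {
            // 4GB is 4294967296 bytes, which exceeds Integer.MAX_VALUE (2147483647),
            // so the checkArgument in the constructor above rejects it.
            new OrcRecordReaderOptions(
                    new DataSize(16, MEGABYTE), // maxMergeDistance, illustrative
                    new DataSize(8, MEGABYTE),  // tinyStripeThreshold, illustrative
                    new DataSize(16, MEGABYTE), // maxBlockSize, illustrative
                    false,                      // mapNullKeysEnabled
                    false,                      // appendRowNumber
                    new DataSize(4, GIGABYTE)); // maxSliceSize, out of range
        }
        catch (IllegalArgumentException e) {
            // Guava's checkArgument throws IllegalArgumentException:
            // "maxSliceSize cannot be larger than Integer.MAX_VALUE"
            System.out.println(e.getMessage());
        }
    }
}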
BatchStreamReaders.java
@@ -49,7 +49,7 @@ public static BatchStreamReader createStreamReader(Type type, StreamDescriptor s
case STRING:
case VARCHAR:
case CHAR:
return new SliceBatchStreamReader(type, streamDescriptor, systemMemoryContext);
return new SliceBatchStreamReader(type, streamDescriptor, systemMemoryContext, options.getMaxSliceSize());
case TIMESTAMP:
case TIMESTAMP_MICROSECONDS:
boolean enableMicroPrecision = type == TIMESTAMP_MICROSECONDS;
LongSelectiveStreamReader.java
@@ -51,9 +51,10 @@ public LongSelectiveStreamReader(
Optional<TupleDomainFilter> filter,
Optional<Type> outputType,
OrcAggregatedMemoryContext systemMemoryContext,
boolean isLowMemory)
boolean isLowMemory,
long maxSliceSize)
{
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory);
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize);
}

@Override
SelectiveReaderContext.java
@@ -40,13 +40,15 @@ public class SelectiveReaderContext

private final OrcAggregatedMemoryContext systemMemoryContext;
private final boolean isLowMemory;
private final long maxSliceSize;

public SelectiveReaderContext(
StreamDescriptor streamDescriptor,
Optional<Type> outputType,
Optional<TupleDomainFilter> filter,
OrcAggregatedMemoryContext systemMemoryContext,
boolean isLowMemory)
boolean isLowMemory,
long maxSliceSize)
{
this.filter = requireNonNull(filter, "filter is null").orElse(null);
this.streamDescriptor = requireNonNull(streamDescriptor, "streamDescriptor is null");
@@ -57,6 +59,9 @@
this.isLowMemory = isLowMemory;
this.nonDeterministicFilter = this.filter != null && !this.filter.isDeterministic();
this.nullsAllowed = this.filter == null || nonDeterministicFilter || this.filter.testNull();
checkArgument(maxSliceSize < Integer.MAX_VALUE, "maxSliceSize cannot be larger than Integer.MAX_VALUE");
checkArgument(maxSliceSize > 0, "maxSliceSize must be positive");
this.maxSliceSize = maxSliceSize;
}

public StreamDescriptor getStreamDescriptor()
@@ -106,4 +111,9 @@ public boolean isNullsAllowed()
{
return nullsAllowed;
}

public long getMaxSliceSize()
{
return maxSliceSize;
}
}
SelectiveStreamReaders.java
@@ -83,7 +83,7 @@ public static SelectiveStreamReader createStreamReader(
case DATE: {
checkArgument(requiredSubfields.isEmpty(), "Primitive type stream reader doesn't support subfields");
verifyStreamType(streamDescriptor, outputType, t -> t instanceof BigintType || t instanceof IntegerType || t instanceof SmallintType || t instanceof DateType);
return new LongSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory);
return new LongSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize());
}
case FLOAT: {
checkArgument(requiredSubfields.isEmpty(), "Float type stream reader doesn't support subfields");
@@ -100,7 +100,7 @@
case CHAR:
checkArgument(requiredSubfields.isEmpty(), "Primitive stream reader doesn't support subfields");
verifyStreamType(streamDescriptor, outputType, t -> t instanceof VarcharType || t instanceof CharType || t instanceof VarbinaryType);
return new SliceSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory);
return new SliceSelectiveStreamReader(streamDescriptor, getOptionalOnlyFilter(type, filters), outputType, systemMemoryContext, isLowMemory, options.getMaxSliceSize());
case TIMESTAMP:
case TIMESTAMP_MICROSECONDS: {
boolean enableMicroPrecision = outputType.isPresent() && outputType.get() == TIMESTAMP_MICROSECONDS;
SliceBatchStreamReader.java
@@ -55,13 +55,13 @@ public class SliceBatchStreamReader
private final SliceDictionaryBatchStreamReader dictionaryReader;
private BatchStreamReader currentReader;

public SliceBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext)
public SliceBatchStreamReader(Type type, StreamDescriptor streamDescriptor, OrcAggregatedMemoryContext systemMemoryContext, long maxSliceSize)
throws OrcCorruptionException
{
requireNonNull(type, "type is null");
verifyStreamType(streamDescriptor, type, t -> t instanceof VarcharType || t instanceof CharType || t instanceof VarbinaryType);
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
this.directReader = new SliceDirectBatchStreamReader(streamDescriptor, getMaxCodePointCount(type), isCharType(type));
this.directReader = new SliceDirectBatchStreamReader(streamDescriptor, getMaxCodePointCount(type), isCharType(type), maxSliceSize);
this.dictionaryReader = new SliceDictionaryBatchStreamReader(streamDescriptor, getMaxCodePointCount(type), isCharType(type), systemMemoryContext.newOrcLocalMemoryContext(SliceBatchStreamReader.class.getSimpleName()));
}

SliceDirectBatchStreamReader.java
@@ -27,7 +27,6 @@
import com.facebook.presto.orc.stream.LongInputStream;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
@@ -45,10 +44,10 @@
import static com.facebook.presto.orc.stream.MissingInputStreamSource.getByteArrayMissingStreamSource;
import static com.facebook.presto.orc.stream.MissingInputStreamSource.getLongMissingStreamSource;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -57,11 +56,11 @@ public class SliceDirectBatchStreamReader
implements BatchStreamReader
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SliceDirectBatchStreamReader.class).instanceSize();
private static final int ONE_GIGABYTE = toIntExact(new DataSize(1, GIGABYTE).toBytes());

private final StreamDescriptor streamDescriptor;
private final int maxCodePointCount;
private final boolean isCharType;
private final long maxSliceSize;

private int readOffset;
private int nextBatchSize;
@@ -80,11 +79,14 @@ public class SliceDirectBatchStreamReader

private boolean rowGroupOpen;

public SliceDirectBatchStreamReader(StreamDescriptor streamDescriptor, int maxCodePointCount, boolean isCharType)
public SliceDirectBatchStreamReader(StreamDescriptor streamDescriptor, int maxCodePointCount, boolean isCharType, long maxSliceSize)
{
this.maxCodePointCount = maxCodePointCount;
this.isCharType = isCharType;
this.streamDescriptor = requireNonNull(streamDescriptor, "stream is null");
checkArgument(maxSliceSize < Integer.MAX_VALUE, "maxSliceSize cannot be larger than Integer.MAX_VALUE");
checkArgument(maxSliceSize > 0, "maxSliceSize must be positive");
this.maxSliceSize = maxSliceSize;
}

@Override
@@ -176,8 +178,13 @@ public Block readBlock()
if (totalLength == 0) {
return new VariableWidthBlock(currentBatchSize, EMPTY_SLICE, offsetVector, Optional.ofNullable(isNullVector));
}
if (totalLength > ONE_GIGABYTE) {
throw new GenericInternalException(format("Values in column \"%s\" are too large to process for Presto. %s column values are larger than 1GB [%s]", streamDescriptor.getFieldName(), currentBatchSize, streamDescriptor.getOrcDataSourceId()));
if (totalLength > maxSliceSize) {
throw new GenericInternalException(
format("Values in column \"%s\" are too large to process for Presto. Requested to read [%s] bytes, when max allowed is [%s] bytes [%s]",
streamDescriptor.getFieldName(),
totalLength,
maxSliceSize,
streamDescriptor.getOrcDataSourceId()));
}
if (dataStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is missing");
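
One point worth making explicit about the guard above: totalLength is the combined byte length of every value in the batch, because the direct reader materializes the whole batch into a single contiguous Slice backing a VariableWidthBlock, and Slice addressing is int-based, hence the Integer.MAX_VALUE ceiling. A standalone sketch of the arithmetic, with hypothetical names:

// Hypothetical helper mirroring the per-batch accounting in readBlock() above.
final class BatchLengthCheck
{
    private BatchLengthCheck() {}

    static long totalBatchLength(int[] lengthVector, int batchSize)
    {
        long total = 0;
        for (int i = 0; i < batchSize; i++) {
            total += lengthVector[i]; // summed across the batch, not capped per value
        }
        return total;
    }

    static void checkBatchLength(long totalLength, long maxSliceSize)
    {
        // Mirrors the new guard: many medium-sized values can trip the limit
        // together even though no single value is anywhere near maxSliceSize.
        if (totalLength > maxSliceSize) {
            throw new IllegalStateException(
                    "batch of " + totalLength + " bytes exceeds maxSliceSize " + maxSliceSize);
        }
    }
}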
SliceDirectSelectiveStreamReader.java
@@ -32,7 +32,6 @@
import com.google.common.annotations.VisibleForTesting;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
@@ -57,16 +56,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class SliceDirectSelectiveStreamReader
implements SelectiveStreamReader
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SliceDirectSelectiveStreamReader.class).instanceSize();
private static final int ONE_GIGABYTE = toIntExact(new DataSize(1, GIGABYTE).toBytes());

private final SelectiveReaderContext context;
private final boolean isCharType;
@@ -721,10 +717,12 @@ else if (isNotNull) {
}

// TODO Do not throw if outputRequired == false
if (totalLength > ONE_GIGABYTE) {
if (totalLength > context.getMaxSliceSize()) {
throw new GenericInternalException(
format("Values in column \"%s\" are too large to process for Presto. %s column values are larger than 1GB [%s]",
context.getStreamDescriptor().getFieldName(), positionCount,
format("Values in column \"%s\" are too large to process for Presto. Requested to read [%s] bytes, when max allowed is [%s] bytes [%s]",
context.getStreamDescriptor().getFieldName(),
totalLength,
context.getMaxSliceSize(),
context.getStreamDescriptor().getOrcDataSourceId()));
}
}
SliceSelectiveStreamReader.java
@@ -50,9 +50,15 @@ public class SliceSelectiveStreamReader
private SliceDictionarySelectiveReader dictionaryReader;
private SelectiveStreamReader currentReader;

public SliceSelectiveStreamReader(StreamDescriptor streamDescriptor, Optional<TupleDomainFilter> filter, Optional<Type> outputType, OrcAggregatedMemoryContext systemMemoryContext, boolean isLowMemory)
public SliceSelectiveStreamReader(
StreamDescriptor streamDescriptor,
Optional<TupleDomainFilter> filter,
Optional<Type> outputType,
OrcAggregatedMemoryContext systemMemoryContext,
boolean isLowMemory,
long maxSliceSize)
{
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory);
this.context = new SelectiveReaderContext(streamDescriptor, outputType, filter, systemMemoryContext, isLowMemory, maxSliceSize);
}

public static int computeTruncatedLength(Slice slice, int offset, int length, int maxCodePointCount, boolean isCharType)
