Commit c517b08

HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445)
Part of HADOOP-18103.
Handles memory fragmentation in the S3A vectored IO implementation by
allocating smaller buffers of the sizes requested by the user ranges,
filling them directly from the remote S3 stream, and skipping the
undesired data in between ranges.
This patch also aborts active vectored reads when the stream is
closed or unbuffer() is called.

Contributed By: Mukund Thakur

 Conflicts:
	hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
mukund-thakur committed Jun 23, 2022
1 parent bfb7d02 commit c517b08
Showing 17 changed files with 609 additions and 257 deletions.
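
The approach described in the commit message — allocate a buffer of exactly the size each user range asked for, fill it straight from the stream, and skip the unwanted bytes between consecutive ranges — can be sketched roughly as follows. This is a hypothetical simplification for illustration, not the actual S3AInputStream code; the helper names `readMergedRangeDirect` and `drain` are invented here.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.impl.CombinedFileRange;

public final class VectoredReadSketch {

  /**
   * Fill one small buffer per child range directly from a stream that is
   * positioned at the merged range's start offset, skipping the unwanted
   * bytes between consecutive children. Assumes each child's future was
   * already created with setData() before the read began.
   */
  static void readMergedRangeDirect(InputStream in,
      CombinedFileRange merged,
      IntFunction<ByteBuffer> allocate) throws IOException {
    long position = merged.getOffset();
    for (FileRange child : merged.getUnderlying()) {
      drain(in, child.getOffset() - position);           // skip the gap bytes
      ByteBuffer buffer = allocate.apply(child.getLength());
      byte[] chunk = new byte[child.getLength()];
      int read = 0;
      while (read < chunk.length) {                      // fill the range fully
        int n = in.read(chunk, read, chunk.length - read);
        if (n < 0) {
          throw new EOFException("Stream ended before range was filled");
        }
        read += n;
      }
      buffer.put(chunk);
      buffer.flip();
      child.getData().complete(buffer);
      position = child.getOffset() + child.getLength();
    }
  }

  /** Skip over {@code bytes} bytes of undesired data between ranges. */
  private static void drain(InputStream in, long bytes) throws IOException {
    long remaining = bytes;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (in.read() >= 0) {
        remaining--;
      } else {
        throw new EOFException("Stream ended while skipping " + remaining + " bytes");
      }
    }
  }
}
```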
@@ -39,7 +39,6 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
@@ -55,6 +54,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;

/****************************************************************
* Abstract Checksumed FileSystem.
@@ -166,7 +166,7 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource {
IOStatisticsSource, StreamCapabilities {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
@@ -408,7 +408,7 @@ public void readVectored(List<? extends FileRange> ranges,
int minSeek = minSeekForVectorReads();
int maxSize = maxReadSizeForVectorReads();
List<CombinedFileRange> dataRanges =
VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum,
VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
minSeek, maxReadSizeForVectorReads());
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
@@ -435,6 +435,11 @@ public void readVectored(List<? extends FileRange> ranges,
}
}
}

@Override
public boolean hasCapability(String capability) {
return datas.hasCapability(capability);
}
}

private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.impl.FileRangeImpl;

/**
* A byte range of a file.
* This is used for the asynchronous gather read API of
@@ -52,4 +54,14 @@ public interface FileRange {
* @param data the future of the ByteBuffer that will have the data
*/
void setData(CompletableFuture<ByteBuffer> data);

/**
* Factory method to create a FileRange object.
* @param offset starting offset of the range.
* @param length length of the range.
* @return a new instance of FileRangeImpl.
*/
static FileRange createFileRange(long offset, int length) {
return new FileRangeImpl(offset, length);
}
}
@@ -25,7 +25,6 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.impl.VectoredReadUtils;

/**
* Stream that permits positional reading.
@@ -121,7 +120,6 @@ default int maxReadSizeForVectorReads() {
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(),
maxReadSizeForVectorReads());
VectoredReadUtils.readVectored(this, ranges, allocate);
}
}
@@ -20,7 +20,6 @@
package org.apache.hadoop.fs;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.impl.VectoredReadUtils;

import java.io.BufferedOutputStream;
import java.io.DataOutput;
@@ -68,6 +67,7 @@
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
@@ -278,6 +278,7 @@ public boolean hasCapability(String capability) {
// new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.VECTOREDIO:
return true;
default:
return false;
@@ -303,23 +304,24 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {

List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: ranges) {
for(FileRange range: sortedRanges) {
VectoredReadUtils.validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[ranges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers);
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
for(FileRange range: ranges) {
for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe);
}
}
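
The `AsyncHandler` used above is defined elsewhere in RawLocalFileSystem and is not part of this hunk. As a rough sketch of the callback pattern only — not the actual class — a `CompletionHandler` for `AsynchronousFileChannel.read()` has roughly this shape; a real handler also has to re-issue reads for partial completions, as hinted in the comments.

```java
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.List;

import org.apache.hadoop.fs.FileRange;

/**
 * Hypothetical, simplified completion handler: completes each range's
 * future once its buffer has been fully read.
 */
class SimpleVectoredReadHandler implements CompletionHandler<Integer, Integer> {
  private final AsynchronousFileChannel channel;
  private final List<? extends FileRange> ranges;
  private final ByteBuffer[] buffers;

  SimpleVectoredReadHandler(AsynchronousFileChannel channel,
      List<? extends FileRange> ranges,
      ByteBuffer[] buffers) {
    this.channel = channel;
    this.ranges = ranges;
    this.buffers = buffers;
  }

  @Override
  public void completed(Integer bytesRead, Integer index) {
    FileRange range = ranges.get(index);
    ByteBuffer buffer = buffers[index];
    if (bytesRead < 0) {
      range.getData().completeExceptionally(
          new EOFException("EOF before range was fully read"));
    } else if (buffer.hasRemaining()) {
      // partial read: continue from where the previous read stopped
      channel.read(buffer, range.getOffset() + buffer.position(), index, this);
    } else {
      buffer.flip();
      range.getData().complete(buffer);
    }
  }

  @Override
  public void failed(Throwable exc, Integer index) {
    ranges.get(index).getData().completeExceptionally(exc);
  }
}
```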
@@ -80,6 +80,12 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS = "iostatistics";

/**
* Support for vectored IO api.
* See {@code PositionedReadable#readVectored(List, IntFunction)}.
*/
String VECTOREDIO = "readvectored";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
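
A caller can probe the new capability before reading. A minimal sketch, with an invented class name and illustrative offsets and lengths:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class VectoredCapabilityProbe {
  static void readRanges(FileSystem fs, Path path) throws Exception {
    try (FSDataInputStream in = fs.open(path)) {
      List<FileRange> ranges = new ArrayList<>();
      ranges.add(FileRange.createFileRange(0, 4096));
      ranges.add(FileRange.createFileRange(1_048_576, 4096));

      // probe the new capability; readVectored() works either way, since the
      // default PositionedReadable implementation reads each range in turn
      boolean nativeVectoredIO = in.hasCapability(StreamCapabilities.VECTOREDIO);
      System.out.println("native vectored IO: " + nativeVectoredIO);
      in.readVectored(ranges, ByteBuffer::allocate);

      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();  // block until this range completes
        // ... consume data ...
      }
    }
  }
}
```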
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;
package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.IOException;
@@ -28,9 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.util.Preconditions;

/**
@@ -68,35 +66,19 @@ public static void validateVectoredReadRanges(List<? extends FileRange> ranges)


/**
* Read fully a list of file ranges asynchronously from this file.
* The default iterates through the ranges to read each synchronously, but
* the intent is that subclasses can make more efficient readers.
* This is the default implementation which iterates through the ranges
* to read each synchronously, but the intent is that subclasses
* can make more efficient readers.
* The data or exceptions are pushed into {@link FileRange#getData()}.
* @param stream the stream to read the data from
* @param ranges the byte ranges to read
* @param allocate the byte buffer allocation
* @param minimumSeek the minimum number of bytes to seek over
* @param maximumRead the largest number of bytes to combine into a single read
*/
public static void readVectored(PositionedReadable stream,
List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
int minimumSeek,
int maximumRead) {
if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
for(FileRange range: ranges) {
range.setData(readRangeFrom(stream, range, allocate));
}
} else {
for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
maximumRead)) {
CompletableFuture<ByteBuffer> read =
readRangeFrom(stream, range, allocate);
for(FileRange child: range.getUnderlying()) {
child.setData(read.thenApply(
(b) -> sliceTo(b, range.getOffset(), child)));
}
}
IntFunction<ByteBuffer> allocate) {
for (FileRange range: ranges) {
range.setData(readRangeFrom(stream, range, allocate));
}
}

@@ -166,7 +148,7 @@ public static boolean isOrderedDisjoint(List<? extends FileRange> input,
int chunkSize,
int minimumSeek) {
long previous = -minimumSeek;
for(FileRange range: input) {
for (FileRange range: input) {
long offset = range.getOffset();
long end = range.getOffset() + range.getLength();
if (offset % chunkSize != 0 ||
@@ -209,7 +191,42 @@ public static long roundUp(long offset, int chunkSize) {
}

/**
* Sort and merge ranges to optimize the access from the underlying file
* Check that the input ranges do not overlap.
* Two ranges overlap when the start offset of the second is less than
* the end offset of the first, where the end offset is calculated as
* start offset + length.
* @param input list of input ranges.
* @return the ranges sorted by offset, if none of them overlap.
* @throws UnsupportedOperationException if any ranges overlap.
*/
public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
List<? extends FileRange> input) {

if (input.size() <= 1) {
return input;
}
FileRange[] sortedRanges = sortRanges(input);
FileRange prev = sortedRanges[0];
for (int i = 1; i < sortedRanges.length; i++) {
if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
throw new UnsupportedOperationException("Overlapping ranges are not supported");
}
// advance so each range is compared against its immediate predecessor
prev = sortedRanges[i];
}
return Arrays.asList(sortedRanges);
}

/**
* Sort the input ranges by offset.
* @param input input ranges.
* @return sorted ranges.
*/
public static FileRange[] sortRanges(List<? extends FileRange> input) {
FileRange[] sortedRanges = input.toArray(new FileRange[0]);
Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset));
return sortedRanges;
}

/**
* Merge sorted ranges to optimize the access from the underlying file
* system.
* The motivations are that:
* <ul>
@@ -219,24 +236,22 @@ public static long roundUp(long offset, int chunkSize) {
* <li>Some file systems want to round ranges to be at checksum boundaries.</li>
* </ul>
*
* @param input the list of input ranges
* @param sortedRanges already sorted list of ranges based on offset.
* @param chunkSize round the start and end points to multiples of chunkSize
* @param minimumSeek the smallest gap that we should seek over in bytes
* @param maxSize the largest combined file range in bytes
* @return the list of sorted CombinedFileRanges that cover the input
*/
public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input,
int chunkSize,
int minimumSeek,
int maxSize) {
// sort the ranges by offset
FileRange[] ranges = input.toArray(new FileRange[0]);
Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
public static List<CombinedFileRange> mergeSortedRanges(List<? extends FileRange> sortedRanges,
int chunkSize,
int minimumSeek,
int maxSize) {

CombinedFileRange current = null;
List<CombinedFileRange> result = new ArrayList<>(ranges.length);
List<CombinedFileRange> result = new ArrayList<>(sortedRanges.size());

// now merge together the ones that merge
for(FileRange range: ranges) {
for (FileRange range: sortedRanges) {
long start = roundDown(range.getOffset(), chunkSize);
long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) {
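
With `sortAndMergeRanges` split into `sortRanges` and `mergeSortedRanges`, an implementation that still wants range coalescing composes the helpers itself — sort, merge, read each combined range once, then slice the buffer back to the user ranges — along the lines of the code removed from `readVectored()` above. A minimal sketch (the class name is invented; it assumes `readRangeFrom` and `sliceTo` from this utility class are accessible):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;

public final class CoalescingReadSketch {
  static void readCoalesced(PositionedReadable stream,
      List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate,
      int minSeek,
      int maxReadSize) {
    // sort by offset, then merge ranges that are close together
    List<CombinedFileRange> merged = VectoredReadUtils.mergeSortedRanges(
        Arrays.asList(VectoredReadUtils.sortRanges(ranges)),
        1, minSeek, maxReadSize);
    for (CombinedFileRange combined : merged) {
      // one read per combined range...
      CompletableFuture<ByteBuffer> read =
          VectoredReadUtils.readRangeFrom(stream, combined, allocate);
      // ...then slice the big buffer back to each original request
      for (FileRange child : combined.getUnderlying()) {
        child.setData(read.thenApply(
            b -> VectoredReadUtils.sliceTo(b, combined.getOffset(), child)));
      }
    }
  }
}
```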
@@ -19,7 +19,6 @@
package org.apache.hadoop.fs.impl;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;

import java.util.ArrayList;
import java.util.List;
@@ -15,15 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
package org.apache.hadoop.fs.impl;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileRange;

/**
* A range of bytes from a file with an optional buffer to read those bytes
* for zero copy.
* for zero copy. This shouldn't be created directly via constructor rather
* factory defined in {@code FileRange#createFileRange} should be used.
*/
@InterfaceAudience.Private
public class FileRangeImpl implements FileRange {
private long offset;
private int length;
@@ -449,7 +449,14 @@
Read fully data for a list of ranges asynchronously. The default implementation
iterates through the ranges, tries to coalesce them based on the values of
`minSeekForVectorReads` and `maxReadSizeForVectorReads`, and then reads each merged
range synchronously, but the intent is that subclasses can provide more efficient
implementations.
implementations. Reading into both direct and heap byte buffers is supported.
Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
allocating buffers so that even direct buffers are garbage collected when
they are no longer referenced.

Note: Don't use direct buffers when reading from ChecksumFileSystem, as that may
lead to the memory fragmentation explained in HADOOP-18296.
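
A minimal usage sketch for the buffer-pool recommendation above, assuming the `ByteBufferPool` interface of `getBuffer(direct, length)` / `putBuffer(buffer)`; the path, offsets and lengths are illustrative:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;

public class VectoredReadWithPool {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path(args[0]);

    // pooled buffers; unreferenced direct buffers remain garbage collectable
    ByteBufferPool pool = new WeakReferencedElasticByteBufferPool();

    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0, 8192));
    ranges.add(FileRange.createFileRange(1_048_576, 8192));

    try (FSDataInputStream in = fs.open(path)) {
      // direct buffers here; pass false instead when reading through
      // ChecksumFileSystem, per the note above
      in.readVectored(ranges, length -> pool.getBuffer(true, length));
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();   // wait for this range
        // ... process data ...
        pool.putBuffer(data);                      // return the buffer to the pool
      }
    }
  }
}
```

Returning each buffer with `putBuffer()` once its data has been consumed keeps the pool small; buffers that are never returned can still be reclaimed, since the pool holds them only weakly.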


#### Preconditions

@@ -467,7 +474,7 @@ For each requested range:

### `minSeekForVectorReads()`

Smallest reasonable seek. Two ranges won't be merged together if the difference between
The smallest reasonable seek. Two ranges won't be merged together if the difference between
the end of the first range and the start of the next is more than this value.

### `maxReadSizeForVectorReads()`