Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-11867. Add a high performance vectored read API to file system. #3499

Closed
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
* Update data in child range from combined range.
* @param child child range.
* @param combinedBuffer combined buffer.
* @param combinedFileRange
* @param combinedFileRange combined range.
*/
private void updateOriginalRange(FileRange child,
ByteBuffer combinedBuffer,
Expand All @@ -887,18 +887,22 @@ private void updateOriginalRange(FileRange child,
child.getOffset(), child.getLength(), combinedFileRange.getOffset(), combinedFileRange.getLength());
ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
child.getData().complete(childBuffer);
LOG.trace("End Filling original range ");
LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
child.getOffset(), child.getLength(), combinedFileRange.getOffset(), combinedFileRange.getLength());
}

/**
 * Validates range parameters.
 * Delegates the basic checks to
 * {@code VectoredReadUtils.validateRangeRequest(range)} and then verifies
 * that the range does not extend past {@code contentLength}.
 * @param range requested range.
 * @throws EOFException end of file exception, raised when
 * offset + length of the range is greater than the content length.
 */
private void validateRangeRequest(FileRange range) throws EOFException {
  VectoredReadUtils.validateRangeRequest(range);
  if (range.getOffset() + range.getLength() > contentLength) {
    // Log before throwing so the offending range is recorded even when
    // the caller discards the exception message.
    LOG.error("Requested range [{}, {}) is beyond EOF",
        range.getOffset(), range.getLength());
    throw new EOFException("Requested range [" + range.getOffset() +", "
        + range.getLength() + ") is beyond EOF");
  }
}

Expand All @@ -914,6 +918,7 @@ private void validateRangeRequest(FileRange range) throws EOFException {
* @param buffer buffer to fill.
*/
private void readSingleRange(FileRange range, ByteBuffer buffer) {
LOG.debug("Start reading range {} ", range);
S3Object objectRange = null;
S3ObjectInputStream objectContent = null;
try {
Expand All @@ -922,6 +927,10 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
final String operationName = "readVectored";
objectRange = getS3Object(operationName, position, length);
objectContent = objectRange.getObjectContent();
if (objectContent == null) {
throw new PathIOException(uri,
"Null IO stream received during " + operationName);
}
populateBuffer(length, buffer, objectContent);
range.getData().complete(buffer);
} catch (IOException ex) {
Expand All @@ -930,6 +939,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
} finally {
IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
}
LOG.debug("Finished reading range {} ", range);
}

/**
Expand Down Expand Up @@ -958,31 +968,28 @@ private void populateBuffer(int length,
* This also handles if file has been changed while http call
* is getting executed. If file has been changed RemoteFileChangedException
* is thrown.
* TODO: add retries here.
* @param operationName
* @param position
* @param length
* @return
* @throws IOException
* @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3.
* @return S3Object
* @throws IOException exception if any.
*/
private S3Object getS3Object(String operationName, long position,
int length) throws IOException {
final GetObjectRequest request = client.newGetRequest(key)
.withRange(position, position + length - 1);
String text = String.format("%s %s at %d length %d",
operationName, uri, position, length);
changeTracker.maybeApplyConstraint(request);
DurationTracker tracker = streamStatistics.initiateGetRequest();
S3Object objectRange;
Invoker invoker = context.getReadInvoker();
try {
objectRange = Invoker.once(text, uri,
objectRange = invoker.retry(operationName, uri, true,
() -> client.getObject(request));
} catch (IOException ex) {
tracker.failed();
throw ex;
} finally {
tracker.close();

}
changeTracker.processResponse(objectRange, operationName,
position);
Expand Down