Skip to content

Commit

Permalink
HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable r…
Browse files Browse the repository at this point in the history
…eadahead

Adds new config option to turn off readahead
* also allows it to be passed in through openFile(),
  which means that if you set it in a mr/spark/hive job conf with
  "mapreduce.job.input.file.option." as the prefix, it will be disabled
  when used through LineRecordReader, FixedLengthRecordReader and
  a few others.
* extends ITestAbfsReadWriteAndSeek to use the option, including one
  replicated test...that shows that turning it off is slower.

Change-Id: Icfc3b6578654aff0b1264d5e116d4bf9eb414881
  • Loading branch information
steveloughran committed Nov 2, 2022
1 parent e62ba16 commit 3db7b36
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -915,6 +920,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
}
}

public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}

@VisibleForTesting
void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
}

public int getReadAheadRange() {
return this.readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;

/**
Expand Down Expand Up @@ -804,10 +805,16 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
// readahead is cluster config unless openfile explicitly sets it.
boolean readahead = abfsConfiguration.isReadAheadEnabled();
boolean readAheadEnabled = options
.map(c -> c.getBoolean(FS_AZURE_ENABLE_READAHEAD, readahead))
.orElse(readahead);
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(readAheadEnabled)
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

/**
* Enable or disable readahead buffer in AbfsInputStream. It is enabled by default.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";

/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public AbfsInputStream(
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = true;
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.bufferedPreadDisabled = abfsInputStreamContext
Expand Down Expand Up @@ -745,6 +745,12 @@ byte[] getBuffer() {
return buffer;
}

@VisibleForTesting
public boolean isReadAheadEnabled() {
return readAheadEnabled;

}

@VisibleForTesting
public int getReadAheadRange() {
return readAheadRange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private boolean isReadAheadEnabled = true;

private boolean alwaysReadBufferSize;

private int readAheadBlockSize;
Expand Down Expand Up @@ -72,6 +74,13 @@ public AbfsInputStreamContext withTolerateOobAppends(
return this;
}

public AbfsInputStreamContext isReadAheadEnabled(
final boolean isReadAheadEnabled) {
this.isReadAheadEnabled = isReadAheadEnabled;
return this;
}


public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
Expand Down Expand Up @@ -141,6 +150,10 @@ public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}

public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}

public int getReadAheadRange() {
return readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;

/**
* Test read, write and seek.
Expand All @@ -50,18 +50,27 @@
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
private static final String TEST_PATH = "/testfile";

@Parameterized.Parameters(name = "Size={0}")
/**
* Parameterize on read buffer size and readahead.
* For test performance, a full x*y test matrix is not used.
* @return the test parameters
*/
@Parameterized.Parameters(name = "Size={0}-readahead={1}")
public static Iterable<Object[]> sizes() {
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
{DEFAULT_READ_BUFFER_SIZE},
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
{MAX_BUFFER_SIZE}});
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true},
{DEFAULT_READ_BUFFER_SIZE, false},
{DEFAULT_READ_BUFFER_SIZE, true},
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false},
{MAX_BUFFER_SIZE, true}});
}

private final int size;
private final boolean readaheadEnabled;

public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
public ITestAbfsReadWriteAndSeek(final int size,
final boolean readaheadEnabled) throws Exception {
this.size = size;
this.readaheadEnabled = readaheadEnabled;
}

@Test
Expand All @@ -74,6 +83,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
abfsConfiguration.setWriteBufferSize(bufferSize);
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);

final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
Expand All @@ -85,7 +95,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
} finally{
stream.close();
}
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);

final byte[] readBuffer = new byte[2 * bufferSize];
int result;
Expand All @@ -109,7 +119,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);

assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
Expand All @@ -121,6 +131,7 @@ public void testReadAheadRequestID() throws java.io.IOException {
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
int bufferSize = MIN_BUFFER_SIZE;
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);

final byte[] b = new byte[bufferSize * 10];
new Random().nextBytes(b);
Expand All @@ -132,8 +143,10 @@ public void testReadAheadRequestID() throws java.io.IOException {
((AbfsOutputStream) stream.getWrappedStream())
.getStreamID()));
stream.write(b);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
}


final byte[] readBuffer = new byte[4 * bufferSize];
int result;
fs.registerListener(
Expand All @@ -146,6 +159,7 @@ public void testReadAheadRequestID() throws java.io.IOException {
((AbfsInputStream) inputStream.getWrappedStream())
.getStreamID()));
result = inputStream.read(readBuffer, 0, bufferSize*4);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream);
}
fs.registerListener(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ public void runCorrelationTestForAllMethods() throws Exception {

testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus
ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath"));
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open,
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open,
// read, write
ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID"));
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead)
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead)
ITestAbfsReadWriteAndSeek.class
.getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek"));
testClasses.put(new ITestAzureBlobFileSystemAppend(), //append
Expand Down

0 comments on commit 3db7b36

Please sign in to comment.