From cd517eddea59e536fca695911c28e9b0f064629e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 8 Nov 2022 11:43:04 +0000 Subject: [PATCH] HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103) * HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead Adds new config option to turn off readahead * also allows it to be passed in through openFile(), * extends ITestAbfsReadWriteAndSeek to use the option, including one replicated test...that shows that turning it off is slower. Important: this does not address the critical data corruption issue HADOOP-18521. ABFS ReadBufferManager buffer sharing across concurrent HTTP requests What is does do is provide a way to completely bypass the ReadBufferManager. To mitigate the problem, either fs.azure.enable.readahead needs to be set to false, or set "fs.azure.readaheadqueue.depth" to 0 -this still goes near the (broken) ReadBufferManager code, but does't trigger the bug. For safe reading of files through the ABFS connector, readahead MUST be disabled or the followup fix to HADOOP-18521 applied Contributed by Steve Loughran --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 ++++++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 1 + .../azurebfs/constants/ConfigurationKeys.java | 7 ++++ .../constants/FileSystemConfigurations.java | 1 + .../fs/azurebfs/services/AbfsInputStream.java | 7 +++- .../services/AbfsInputStreamContext.java | 12 +++++++ .../azurebfs/ITestAbfsReadWriteAndSeek.java | 32 +++++++++++++------ .../fs/azurebfs/TestTracingContext.java | 4 +-- 8 files changed, 66 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 6462cc960097e..4c77d2e136dee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -302,6 +302,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ABFS_LATENCY_TRACK) private boolean trackLatency; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD, + DefaultValue = DEFAULT_ENABLE_READAHEAD) + private boolean enabledReadAhead; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -915,6 +920,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio } } + public boolean isReadAheadEnabled() { + return this.enabledReadAhead; + } + + @VisibleForTesting + void setReadAheadEnabled(final boolean enabledReadAhead) { + this.enabledReadAhead = enabledReadAhead; + } + public int getReadAheadRange() { return this.readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 28c3ef25e94b8..3941f6e421a51 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -808,6 +808,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) .withReadAheadRange(abfsConfiguration.getReadAheadRange()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 9d3b2d5e82c6e..0353f3e01ffb1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -186,6 +186,13 @@ public final class ConfigurationKeys { public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement"; public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + + /** + * Enable or disable readahead buffer in AbfsInputStream. + * Value: {@value}. + */ + public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** Server side encryption key */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 63d62a33b1819..42f3b7503e03d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -106,6 +106,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; + public static final boolean DEFAULT_ENABLE_READAHEAD = true; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 7033ae9a4a039..e7ddffe99fde3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -137,7 +137,7 @@ public AbfsInputStream( this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); - this.readAheadEnabled = true; + this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.bufferedPreadDisabled = abfsInputStreamContext @@ -745,6 +745,11 @@ byte[] getBuffer() { return buffer; } + @VisibleForTesting + public boolean isReadAheadEnabled() { + return readAheadEnabled; + } + @VisibleForTesting public int getReadAheadRange() { return readAheadRange; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index 55f01bf15bcf7..05afc7b9858da 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private boolean isReadAheadEnabled = true; + private boolean alwaysReadBufferSize; private int readAheadBlockSize; @@ -72,6 +74,12 @@ public AbfsInputStreamContext withTolerateOobAppends( return this; } + public AbfsInputStreamContext isReadAheadEnabled( + final boolean isReadAheadEnabled) { + this.isReadAheadEnabled = isReadAheadEnabled; + return this; + } + public AbfsInputStreamContext withReadAheadRange( final int readAheadRange) { this.readAheadRange = readAheadRange; @@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() { return tolerateOobAppends; } + public boolean isReadAheadEnabled() { + return isReadAheadEnabled; + } + public int getReadAheadRange() { return readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 5bd6eaff42e84..beada775ae87b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; -import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; @@ -40,6 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; /** * Test read, write and seek. @@ -50,18 +50,27 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { private static final String TEST_PATH = "/testfile"; - @Parameterized.Parameters(name = "Size={0}") + /** + * Parameterize on read buffer size and readahead. + * For test performance, a full x*y test matrix is not used. + * @return the test parameters + */ + @Parameterized.Parameters(name = "Size={0}-readahead={1}") public static Iterable sizes() { - return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, - {DEFAULT_READ_BUFFER_SIZE}, - {APPENDBLOB_MAX_WRITE_BUFFER_SIZE}, - {MAX_BUFFER_SIZE}}); + return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true}, + {DEFAULT_READ_BUFFER_SIZE, false}, + {DEFAULT_READ_BUFFER_SIZE, true}, + {APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false}, + {MAX_BUFFER_SIZE, true}}); } private final int size; + private final boolean readaheadEnabled; - public ITestAbfsReadWriteAndSeek(final int size) throws Exception { + public ITestAbfsReadWriteAndSeek(final int size, + final boolean readaheadEnabled) throws Exception { this.size = size; + this.readaheadEnabled = readaheadEnabled; } @Test @@ -74,6 +83,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); abfsConfiguration.setWriteBufferSize(bufferSize); abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); @@ -85,7 +95,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { } finally{ stream.close(); } - IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); final byte[] readBuffer = new byte[2 * bufferSize]; int result; @@ -109,7 +119,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); } - IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); @@ -121,6 +131,7 @@ public void testReadAheadRequestID() throws java.io.IOException { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); int bufferSize = MIN_BUFFER_SIZE; abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[bufferSize * 10]; new Random().nextBytes(b); @@ -132,8 +143,10 @@ public void testReadAheadRequestID() throws java.io.IOException { ((AbfsOutputStream) stream.getWrappedStream()) .getStreamID())); stream.write(b); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); } + final byte[] readBuffer = new byte[4 * bufferSize]; int result; fs.registerListener( @@ -146,6 +159,7 @@ public void testReadAheadRequestID() throws java.io.IOException { ((AbfsInputStream) inputStream.getWrappedStream()) .getStreamID())); result = inputStream.read(readBuffer, 0, bufferSize*4); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream); } fs.registerListener(null); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index 006004850d0df..0e7c70e91a9f8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -130,10 +130,10 @@ public void runCorrelationTestForAllMethods() throws Exception { testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open, + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open, // read, write ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead) + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead) ITestAbfsReadWriteAndSeek.class .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek")); testClasses.put(new ITestAzureBlobFileSystemAppend(), //append