From 80f9f1ef36c0e7953a13ee4b433a6afc623ad4cc Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 15 Feb 2024 15:26:02 -0800 Subject: [PATCH] [HUDI-7410] Use SeekableDataInputStream as the input of native HFile reader (#10673) --- .../bootstrap/index/HFileBootstrapIndex.java | 5 +- .../storage/HoodieNativeAvroHFileReader.java | 11 +++-- .../TestInLineFileSystemWithHFileReader.java | 8 ++-- .../io/ByteArraySeekableDataInputStream.java | 47 +++++++++++++++++++ .../hudi/io/hfile/HFileBlockReader.java | 6 +-- .../apache/hudi/io/hfile/HFileReaderImpl.java | 8 ++-- .../apache/hudi/io/hfile/TestHFileReader.java | 38 +-------------- 7 files changed, 71 insertions(+), 52 deletions(-) create mode 100644 hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java index 989b0ad1e6d03..7a6de5fe994e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java @@ -33,6 +33,8 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.hfile.HFileReader; import org.apache.hudi.io.hfile.HFileReaderImpl; import org.apache.hudi.io.hfile.Key; @@ -41,7 +43,6 @@ import org.apache.hudi.io.util.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparatorImpl; @@ -238,7 +239,7 @@ private static HFileReader createReader(String hFilePath, FileSystem fileSystem) LOG.info("Opening HFile for reading :" + hFilePath); Path path = new Path(hFilePath); long fileSize = fileSystem.getFileStatus(path).getLen(); - FSDataInputStream stream = fileSystem.open(path); + SeekableDataInputStream stream = new HadoopSeekableDataInputStream(fileSystem.open(path)); return new HFileReaderImpl(stream, fileSize); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java index cc3833996b9bd..e760b33b9e2fc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java @@ -28,9 +28,13 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; +import org.apache.hudi.io.ByteArraySeekableDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.hfile.HFileReader; import org.apache.hudi.io.hfile.HFileReaderImpl; import org.apache.hudi.io.hfile.KeyValue; @@ -41,7 +45,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -256,15 +259,15 @@ private synchronized HFileReader getSharedHFileReader() { } private HFileReader newHFileReader() throws IOException { - FSDataInputStream inputStream; + SeekableDataInputStream inputStream; long fileSize; if (path.isPresent()) { FileSystem fs = HadoopFSUtils.getFs(path.get(), conf); fileSize = fs.getFileStatus(path.get()).getLen(); - inputStream = fs.open(path.get()); + inputStream = new HadoopSeekableDataInputStream(fs.open(path.get())); } else { fileSize = bytesContent.get().length; - inputStream = new FSDataInputStream(new SeekableByteArrayInputStream(bytesContent.get())); + inputStream = new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(bytesContent.get())); } return new HFileReaderImpl(inputStream, fileSize); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java index 2ae8fd2f6516d..91649c68bd95b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java @@ -20,7 +20,9 @@ package org.apache.hudi.common.fs.inline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream; import org.apache.hudi.hadoop.fs.inline.InLineFileSystem; +import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.hfile.HFileReader; import org.apache.hudi.io.hfile.HFileReaderImpl; import org.apache.hudi.io.hfile.Key; @@ -28,7 +30,6 @@ import org.apache.hudi.io.hfile.UTF8StringKey; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import java.io.IOException; @@ -51,8 +52,9 @@ protected void validateHFileReading(InLineFileSystem inlineFileSystem, Path inlinePath, int maxRows) throws IOException { long fileSize = inlineFileSystem.getFileStatus(inlinePath).getLen(); - try (FSDataInputStream fin = inlineFileSystem.open(inlinePath)) { - try (HFileReader reader = new HFileReaderImpl(fin, fileSize)) { + try (SeekableDataInputStream stream = + new HadoopSeekableDataInputStream(inlineFileSystem.open(inlinePath))) { + try (HFileReader reader = new HFileReaderImpl(stream, fileSize)) { // Align scanner at start of the file. reader.seekTo(); readAllRecords(reader, maxRows); diff --git a/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java new file mode 100644 index 0000000000000..5ebe3a1729b36 --- /dev/null +++ b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; + +import java.io.IOException; + +/** + * Implementation of {@link SeekableDataInputStream} based on byte array + */ +public class ByteArraySeekableDataInputStream extends SeekableDataInputStream { + + ByteBufferBackedInputStream stream; + + public ByteArraySeekableDataInputStream(ByteBufferBackedInputStream stream) { + super(stream); + this.stream = stream; + } + + @Override + public long getPos() throws IOException { + return stream.getPosition(); + } + + @Override + public void seek(long pos) throws IOException { + stream.seek(pos); + } +} diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java index bcc1afb64cea5..26103a4b391de 100644 --- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java +++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java @@ -19,7 +19,7 @@ package org.apache.hudi.io.hfile; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hudi.io.SeekableDataInputStream; import java.io.EOFException; import java.io.IOException; @@ -30,7 +30,7 @@ public class HFileBlockReader { private final HFileContext context; private final long streamStartOffset; - private final FSDataInputStream stream; + private final SeekableDataInputStream stream; private final byte[] byteBuff; private int offset; private boolean isReadFully = false; @@ -44,7 +44,7 @@ public class HFileBlockReader { * @param endOffset end offset to stop at. */ public HFileBlockReader(HFileContext context, - FSDataInputStream stream, + SeekableDataInputStream stream, long startOffset, long endOffset) { this.context = context; diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java index 87dafc9d88696..564dd98eb640e 100644 --- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java +++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java @@ -20,8 +20,8 @@ package org.apache.hudi.io.hfile; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.SeekableDataInputStream; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.logging.log4j.util.Strings; import java.io.ByteArrayInputStream; @@ -38,7 +38,7 @@ * An implementation a {@link HFileReader}. */ public class HFileReaderImpl implements HFileReader { - private final FSDataInputStream stream; + private final SeekableDataInputStream stream; private final long fileSize; private final HFileCursor cursor; @@ -51,7 +51,7 @@ public class HFileReaderImpl implements HFileReader { private Option currentDataBlockEntry; private Option currentDataBlock; - public HFileReaderImpl(FSDataInputStream stream, long fileSize) { + public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) { this.stream = stream; this.fileSize = fileSize; this.cursor = new HFileCursor(); @@ -255,7 +255,7 @@ public void close() throws IOException { * @return {@link HFileTrailer} instance. * @throws IOException upon error. */ - private static HFileTrailer readTrailer(FSDataInputStream stream, + private static HFileTrailer readTrailer(SeekableDataInputStream stream, long fileSize) throws IOException { int bufferSize = HFileTrailer.getTrailerSize(); long seekPos = fileSize - bufferSize; diff --git a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java index d9a1969c75d4f..ef7d1c3fc7529 100644 --- a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java +++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java @@ -21,10 +21,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; +import org.apache.hudi.io.ByteArraySeekableDataInputStream; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -439,7 +437,7 @@ public static byte[] readHFileFromResources(String filename) throws IOException public static HFileReader getHFileReader(String filename) throws IOException { byte[] content = readHFileFromResources(filename); return new HFileReaderImpl( - new FSDataInputStream(new SeekableByteArrayInputStream(content)), content.length); + new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(content)), content.length); } private static void verifyHFileRead(String filename, @@ -604,36 +602,4 @@ public String getExpectedValue() { return expectedValue; } } - - static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, - PositionedReadable { - public SeekableByteArrayInputStream(byte[] buf) { - super(buf); - } - - @Override - public long getPos() throws IOException { - return getPosition(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - - @Override - public int read(long position, byte[] buffer, int offset, int length) throws IOException { - return copyFrom(position, buffer, offset, length); - } - - @Override - public void readFully(long position, byte[] buffer) throws IOException { - read(position, buffer, 0, buffer.length); - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - read(position, buffer, offset, length); - } - } }