Skip to content

Commit

Permalink
[HUDI-7410] Use SeekableDataInputStream as the input of native HFile …
Browse files Browse the repository at this point in the history
…reader (#10673)
  • Loading branch information
yihua authored Feb 15, 2024
1 parent 6e6d66a commit 80f9f1e
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.Key;
Expand All @@ -41,7 +43,6 @@
import org.apache.hudi.io.util.IOUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
Expand Down Expand Up @@ -238,7 +239,7 @@ private static HFileReader createReader(String hFilePath, FileSystem fileSystem)
LOG.info("Opening HFile for reading :" + hFilePath);
Path path = new Path(hFilePath);
long fileSize = fileSystem.getFileStatus(path).getLen();
FSDataInputStream stream = fileSystem.open(path);
SeekableDataInputStream stream = new HadoopSeekableDataInputStream(fileSystem.open(path));
return new HFileReaderImpl(stream, fileSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.io.ByteArraySeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.KeyValue;
Expand All @@ -41,7 +45,6 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
Expand Down Expand Up @@ -256,15 +259,15 @@ private synchronized HFileReader getSharedHFileReader() {
}

/**
 * Opens a fresh {@link HFileReader} over this reader's content source.
 *
 * <p>When a file path is present, the HFile is read through the Hadoop file
 * system; otherwise the in-memory byte content is served through an
 * in-memory seekable stream.
 *
 * @return a new, independent {@link HFileReader} instance.
 * @throws IOException if the file system access or stream creation fails.
 */
private HFileReader newHFileReader() throws IOException {
  if (path.isPresent()) {
    FileSystem fs = HadoopFSUtils.getFs(path.get(), conf);
    long fileSize = fs.getFileStatus(path.get()).getLen();
    return new HFileReaderImpl(
        new HadoopSeekableDataInputStream(fs.open(path.get())), fileSize);
  }
  byte[] content = bytesContent.get();
  return new HFileReaderImpl(
      new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(content)),
      content.length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
package org.apache.hudi.common.fs.inline;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.Key;
import org.apache.hudi.io.hfile.KeyValue;
import org.apache.hudi.io.hfile.UTF8StringKey;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
Expand All @@ -51,8 +52,9 @@ protected void validateHFileReading(InLineFileSystem inlineFileSystem,
Path inlinePath,
int maxRows) throws IOException {
long fileSize = inlineFileSystem.getFileStatus(inlinePath).getLen();
try (FSDataInputStream fin = inlineFileSystem.open(inlinePath)) {
try (HFileReader reader = new HFileReaderImpl(fin, fileSize)) {
try (SeekableDataInputStream stream =
new HadoopSeekableDataInputStream(inlineFileSystem.open(inlinePath))) {
try (HFileReader reader = new HFileReaderImpl(stream, fileSize)) {
// Align scanner at start of the file.
reader.seekTo();
readAllRecords(reader, maxRows);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.io;

import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;

import java.io.IOException;

/**
 * Implementation of {@link SeekableDataInputStream} backed by an in-memory byte array,
 * exposed through a {@link ByteBufferBackedInputStream}.
 */
public class ByteArraySeekableDataInputStream extends SeekableDataInputStream {

  // Typed reference to the wrapped stream so seek/getPos can delegate without
  // casting the stream held by the superclass. Kept private and final: nothing
  // outside this class needs it, and it never changes after construction.
  private final ByteBufferBackedInputStream stream;

  /**
   * @param stream in-memory stream to read from; also registered as the
   *               underlying stream of the parent {@link SeekableDataInputStream}.
   */
  public ByteArraySeekableDataInputStream(ByteBufferBackedInputStream stream) {
    super(stream);
    this.stream = stream;
  }

  @Override
  public long getPos() throws IOException {
    return stream.getPosition();
  }

  @Override
  public void seek(long pos) throws IOException {
    stream.seek(pos);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.hudi.io.hfile;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -30,7 +30,7 @@
public class HFileBlockReader {
private final HFileContext context;
private final long streamStartOffset;
private final FSDataInputStream stream;
private final SeekableDataInputStream stream;
private final byte[] byteBuff;
private int offset;
private boolean isReadFully = false;
Expand All @@ -44,7 +44,7 @@ public class HFileBlockReader {
* @param endOffset end offset to stop at.
*/
public HFileBlockReader(HFileContext context,
FSDataInputStream stream,
SeekableDataInputStream stream,
long startOffset,
long endOffset) {
this.context = context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.hudi.io.hfile;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.SeekableDataInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.logging.log4j.util.Strings;

import java.io.ByteArrayInputStream;
Expand All @@ -38,7 +38,7 @@
* An implementation of a {@link HFileReader}.
*/
public class HFileReaderImpl implements HFileReader {
private final FSDataInputStream stream;
private final SeekableDataInputStream stream;
private final long fileSize;

private final HFileCursor cursor;
Expand All @@ -51,7 +51,7 @@ public class HFileReaderImpl implements HFileReader {
private Option<BlockIndexEntry> currentDataBlockEntry;
private Option<HFileDataBlock> currentDataBlock;

public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
this.stream = stream;
this.fileSize = fileSize;
this.cursor = new HFileCursor();
Expand Down Expand Up @@ -255,7 +255,7 @@ public void close() throws IOException {
* @return {@link HFileTrailer} instance.
* @throws IOException upon error.
*/
private static HFileTrailer readTrailer(FSDataInputStream stream,
private static HFileTrailer readTrailer(SeekableDataInputStream stream,
long fileSize) throws IOException {
int bufferSize = HFileTrailer.getTrailerSize();
long seekPos = fileSize - bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.io.ByteArraySeekableDataInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -439,7 +437,7 @@ public static byte[] readHFileFromResources(String filename) throws IOException
public static HFileReader getHFileReader(String filename) throws IOException {
byte[] content = readHFileFromResources(filename);
return new HFileReaderImpl(
new FSDataInputStream(new SeekableByteArrayInputStream(content)), content.length);
new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(content)), content.length);
}

private static void verifyHFileRead(String filename,
Expand Down Expand Up @@ -604,36 +602,4 @@ public String getExpectedValue() {
return expectedValue;
}
}

/**
 * In-memory {@link ByteBufferBackedInputStream} adapted to Hadoop's stream
 * contracts ({@link Seekable} and {@link PositionedReadable}) so it can be
 * wrapped in a Hadoop {@code FSDataInputStream} for tests.
 */
static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable,
PositionedReadable {
public SeekableByteArrayInputStream(byte[] buf) {
super(buf);
}

// Seekable: report the current read offset of the underlying buffer.
@Override
public long getPos() throws IOException {
return getPosition();
}

// Seekable: a byte array has no alternate replicas, so there is never a new source.
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}

// PositionedReadable: positional read delegated to the parent's copyFrom.
// NOTE(review): assumes copyFrom reads at `position` without advancing the
// stream cursor — confirm against ByteBufferBackedInputStream.
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
return copyFrom(position, buffer, offset, length);
}

// PositionedReadable: fill the whole buffer starting at `position`.
// NOTE(review): the count returned by read(...) is ignored; if copyFrom can
// return a short count this would silently under-fill the buffer — verify.
@Override
public void readFully(long position, byte[] buffer) throws IOException {
read(position, buffer, 0, buffer.length);
}

// PositionedReadable: fill buffer[offset..offset+length) starting at `position`.
// NOTE(review): same short-read caveat as above.
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
read(position, buffer, offset, length);
}
}
}

0 comments on commit 80f9f1e

Please sign in to comment.