Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19389: Optimize shell -text command I/O with multi-byte read. #7291

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ protected void processPath(PathData item) throws IOException {
}

protected class TextRecordInputStream extends InputStream {
SequenceFile.Reader r;
Object key;
Object val;
private final SequenceFile.Reader r;
private Object key;
private Object val;

DataInputBuffer inbuf;
DataOutputBuffer outbuf;
private final DataInputBuffer inbuf;
private final DataOutputBuffer outbuf;

public TextRecordInputStream(FileStatus f) throws IOException {
final Path fpath = f.getPath();
Expand All @@ -237,30 +237,67 @@ public TextRecordInputStream(FileStatus f) throws IOException {
public int read() throws IOException {
int ret;
if (null == inbuf || -1 == (ret = inbuf.read())) {
key = r.next(key);
if (key == null) {
return -1;
if (!readNextFromSequenceFile()) {
ret = -1;
} else {
val = r.getCurrentValue(val);
ret = inbuf.read();
}
byte[] tmp = key.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\t');
tmp = val.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\n');
inbuf.reset(outbuf.getData(), outbuf.getLength());
outbuf.reset();
ret = inbuf.read();
}
return ret;
}

@Override
public int read(byte[] dest, int destPos, int destLen) throws IOException {
validateInputStreamReadArguments(dest, destPos, destLen);

if (destLen == 0) {
return 0;
}

int bytesRead = 0;
while (destLen > 0) {
// Attempt to copy buffered data.
int copyLen = inbuf.read(dest, destPos, destLen);
if (-1 == copyLen) {
// There was no buffered data.
if (!readNextFromSequenceFile()) {
// There is also no data remaining in the file.
break;
}
// Reattempt copy now that we have buffered data.
copyLen = inbuf.read(dest, destPos, destLen);
}
bytesRead += copyLen;
destPos += copyLen;
destLen -= copyLen;
}

return bytesRead > 0 ? bytesRead : -1;
}

@Override
public void close() throws IOException {
r.close();
super.close();
}

private boolean readNextFromSequenceFile() throws IOException {
key = r.next(key);
if (key == null) {
return false;
} else {
val = r.getCurrentValue(val);
}
byte[] tmp = key.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\t');
tmp = val.toString().getBytes(StandardCharsets.UTF_8);
outbuf.write(tmp, 0, tmp.length);
outbuf.write('\n');
inbuf.reset(outbuf.getData(), outbuf.getLength());
outbuf.reset();
return true;
}
}

/**
Expand All @@ -270,10 +307,11 @@ public void close() throws IOException {
protected static class AvroFileInputStream extends InputStream {
private int pos;
private byte[] buffer;
private ByteArrayOutputStream output;
private FileReader<?> fileReader;
private DatumWriter<Object> writer;
private JsonEncoder encoder;
private final ByteArrayOutputStream output;
private final FileReader<?> fileReader;
private final DatumWriter<Object> writer;
private final JsonEncoder encoder;
private final byte[] finalSeparator;

public AvroFileInputStream(FileStatus status) throws IOException {
pos = 0;
Expand All @@ -286,31 +324,96 @@ public AvroFileInputStream(FileStatus status) throws IOException {
writer = new GenericDatumWriter<Object>(schema);
output = new ByteArrayOutputStream();
encoder = EncoderFactory.get().jsonEncoder(schema, output);
finalSeparator = System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);
}

/**
* Read a single byte from the stream.
*/
@Override
public int read() throws IOException {
if (buffer == null) {
return -1;
}

if (pos < buffer.length) {
return buffer[pos++];
}

if (!fileReader.hasNext()) {
// Unset buffer to signal EOF on future calls.
buffer = null;
return -1;
}

writer.write(fileReader.next(), encoder);
encoder.flush();

if (!fileReader.hasNext()) {
// Write a new line after the last Avro record.
output.write(System.getProperty("line.separator")
.getBytes(StandardCharsets.UTF_8));
output.flush();
if (buffer.length > 0) {
// Write a new line after the last Avro record.
output.write(finalSeparator);
output.flush();
}
}

swapBuffer();
return read();
}

@Override
public int read(byte[] dest, int destPos, int destLen) throws IOException {
validateInputStreamReadArguments(dest, destPos, destLen);

if (destLen == 0) {
return 0;
}

if (buffer == null) {
return -1;
}

int bytesRead = 0;
while (destLen > 0 && buffer != null) {
if (pos < buffer.length) {
// We have buffered data available, either from the Avro file or the final separator.
int copyLen = Math.min(buffer.length - pos, destLen);
System.arraycopy(buffer, pos, dest, destPos, copyLen);
pos += copyLen;
bytesRead += copyLen;
destPos += copyLen;
destLen -= copyLen;
} else if (buffer == finalSeparator) {
// There is no buffered data, and the last buffer processed was the final separator.
// Unset buffer to signal EOF on future calls.
buffer = null;
} else if (!fileReader.hasNext()) {
if (buffer.length > 0) {
// There is no data remaining in the file. Get ready to write the final separator on
// the next iteration.
buffer = finalSeparator;
pos = 0;
} else {
// We never read data into the buffer. This must be an empty file.
// Immediate EOF, no separator needed.
buffer = null;
return -1;
}
} else {
// Read the next data from the file into the buffer.
writer.write(fileReader.next(), encoder);
encoder.flush();
swapBuffer();
}
}

return bytesRead;
}

private void swapBuffer() {
pos = 0;
buffer = output.toByteArray();
cnauroth marked this conversation as resolved.
Show resolved Hide resolved
output.reset();
return read();
}

/**
Expand All @@ -323,4 +426,14 @@ public void close() throws IOException {
super.close();
}
}

private static void validateInputStreamReadArguments(byte[] dest, int destPos, int destLen)
throws IOException {
if (dest == null) {
throw new NullPointerException("null destination buffer");
} else if (destPos < 0 || destLen < 0 || destLen > dest.length - destPos) {
throw new IndexOutOfBoundsException(String.format(
"invalid destination buffer range: destPos = %d, destLen = %d", destPos, destLen));
}
}
}
Loading
Loading