Skip to content

Commit

Permalink
Update S3ASeekableStream Implementation to throw when stream is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatbasik committed Nov 27, 2024
1 parent 2d23c5b commit 693fe9e
Showing 1 changed file with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -14,48 +15,46 @@
public class S3ASeekableStream extends FSInputStream {

private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private final String key;

public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);


public S3ASeekableStream(String bucket, String key, S3SeekableInputStreamFactory s3SeekableInputStreamFactory)
throws IOException {
public S3ASeekableStream(String bucket, String key, S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
this.key = key;
}

@Override
public int read() throws IOException {
throwIfClosed();
return inputStream.read();
}

@Override
public void seek(long pos) throws IOException {
throwIfClosed();
inputStream.seek(pos);
}

@Override
public long getPos() throws IOException {
return inputStream.getPos();
}

@Override
public void close() throws IOException {
if (inputStream != null) {
inputStream.close();
inputStream = null;
super.close();
public synchronized long getPos() {
if (!isClosed()) {
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
}


public void readTail(byte[] buf, int off, int n) throws IOException {
throwIfClosed();
inputStream.readTail(buf, off, n);
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
return inputStream.read(buf, off, len);
}

Expand All @@ -65,4 +64,22 @@ public boolean seekToNewSource(long l) throws IOException {
return false;
}

@Override
public void close() throws IOException {
if (inputStream != null) {
inputStream.close();
inputStream = null;
super.close();
}
}

protected void throwIfClosed() throws IOException {
if (isClosed()) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}

protected boolean isClosed() {
return inputStream == null;
}
}

0 comments on commit 693fe9e

Please sign in to comment.