Skip to content

Commit

Permalink
Added mark/reset functionality to BufferInputStream. (#2447)
Browse files Browse the repository at this point in the history
Motivation:
BufferInputStream can support the mark/reset functionality with not too much additional logic like ByteArrayInputStream and ByteBufInputStream (from netty).

Modifications:
- Implemented the mark, reset and markSupported methods of BufferInputStream
- Added test cases

Result:
Addresses #1562
  • Loading branch information
bobbyowolabi authored Dec 2, 2022
1 parent 8b414d8 commit 080d6a0
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
final class BufferInputStream extends InputStream {
private final Buffer buffer;

private int mark;

BufferInputStream(Buffer buffer) {
this.buffer = requireNonNull(buffer);
this.mark = buffer.readerIndex();
}

@Override
Expand Down Expand Up @@ -61,4 +64,19 @@ public long skip(long n) {
public int available() {
return buffer.readableBytes();
}

@Override
public boolean markSupported() {
return true;
}

@Override
public void mark(final int readlimit) {
mark = buffer.readerIndex();
}

@Override
public void reset() {
buffer.readerIndex(mark);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Scanner;

import static io.servicetalk.buffer.api.ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

class BufferInputStreamTest {

private static final String DATA = "12345";
private static final int BYTE_COUNT = getBytes(DATA).length;
private final InputStream is = new BufferInputStream(DEFAULT_RO_ALLOCATOR.fromAscii("12345"));

@Test
Expand Down Expand Up @@ -71,6 +75,66 @@ private void testSkip(long n, long skipped, String expected) throws Exception {
assertThat(remaining(is), is(expected));
}

@Test
void markSupported() {
assertThat(is.markSupported(), is(true));
}

@Test
void ignoreResetCalledBeforeMark() {
assertDoesNotThrow(is::reset);
}

@Test
void readAndResetToLastMark() throws IOException {
testReadAndReset(2, 6, 2);
}

@Test
void skipAndResetToLastMark() throws IOException {
testSkipAndReset(2, 6, 2L, getBytes("5"));
}

@Test
void markAndResetToStartOfStream() throws IOException {
testReadAndReset(0, 6, BYTE_COUNT);
}

@Test
void markAndResetToEndOfStream() throws IOException {
testReadAndReset(BYTE_COUNT, 6, 1);
assertThat(is.read(), is(-1));
}

@Test
void ignoreReadlimit() {
assertDoesNotThrow(() -> testReadAndReset(2, 1, 2));
}

@Test
void ignoreSkipExceedsReadlimit() {
assertDoesNotThrow(() -> testSkipAndReset(2, 1, 2, "".getBytes(UTF_8)));
}

@Test
void markClosedStream() throws IOException {
try {
is.close();
} finally {
is.mark(6);
}
}

@Test
void resetClosedStream() throws IOException {
try {
is.mark(6);
is.close();
} finally {
is.reset();
}
}

private static String remaining(InputStream is) {
try (Scanner scanner = new Scanner(is, US_ASCII.name())) {
if (!scanner.hasNext()) {
Expand All @@ -79,4 +143,55 @@ private static String remaining(InputStream is) {
return scanner.next();
}
}

private static byte[] getBytes(final String value) {
return value.getBytes(UTF_8);
}

private static byte[] readBytes(int count, final InputStream is) throws IOException {
final byte[] b = new byte[count];
int totalBytesRead = 0;
while (totalBytesRead < count) {
final int bytesRead = is.read(b, totalBytesRead, count);
if (bytesRead == -1) {
return b;
} else {
totalBytesRead += bytesRead;
}
}
return b;
}

private void testReadAndReset(final int initialReadCount, final int readLimit, final int readCount) throws
IOException {
readBytes(initialReadCount, is);
is.mark(readLimit);
final byte[] firstRead = readBytes(readCount, is);
is.reset();
final byte[] secondRead = readBytes(readCount, is);
assertThat(firstRead, is(secondRead));
}

private void testSkipAndReset(final long initialSkipCount, final int readLimit, final long skipCount,
final byte[] expected) throws IOException {
skipBytes(initialSkipCount, is);
is.mark(readLimit);
skipBytes(skipCount, is);
is.reset();
skipBytes(skipCount, is);
final byte[] actual = readBytes(expected.length, is);
assertThat(actual, is(expected));
}

private static void skipBytes(final long n, final InputStream inputStream) throws IOException {
long totalBytesSkipped = 0;
while (totalBytesSkipped < n) {
final long bytesSkipped = inputStream.skip(n);
if (bytesSkipped == 0) {
return;
} else {
totalBytesSkipped += bytesSkipped;
}
}
}
}

0 comments on commit 080d6a0

Please sign in to comment.