Skip to content

Commit

Permalink
feat(store): Mock StoreMetadataService and StreamStore (#41)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Mar 14, 2024
1 parent 6f87276 commit 17a8e0f
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
*/
public class MemoryStreamClient implements StreamClient {
private final AtomicLong streamIdAlloc = new AtomicLong();

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
return CompletableFuture.completedFuture(new MemoryStream(streamIdAlloc.getAndIncrement()));
Expand All @@ -55,6 +56,7 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions opt
public void shutdown() {

}

static class MemoryStream implements Stream {
private final AtomicLong nextOffsetAlloc = new AtomicLong();
private NavigableMap<Long, RecordBatchWithContext> recordMap = new ConcurrentSkipListMap<>();
Expand Down Expand Up @@ -83,7 +85,8 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
long baseOffset = nextOffsetAlloc.getAndAdd(recordBatch.count());
recordMap.put(baseOffset, new RecordBatchWithContextWrapper(recordBatch, baseOffset));
return CompletableFuture.completedFuture(() -> baseOffset); }
return CompletableFuture.completedFuture(() -> baseOffset);
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
Expand Down Expand Up @@ -129,7 +132,7 @@ public long baseOffset() {

@Override
public long lastOffset() {
return baseOffset + recordBatch.count();
return baseOffset + recordBatch.count() - 1;
}

@Override
Expand Down

0 comments on commit 17a8e0f

Please sign in to comment.