feat(stream-client): add confirm offset to stream #21

Merged · 2 commits · Aug 25, 2023
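In short: S3Stream now tracks a confirm offset — the highest offset the WAL has acknowledged — and bounds fetches by it, so readers never observe records that could still be lost if a WAL write fails. A minimal, self-contained sketch of the mechanism (hypothetical class and method names, not the code in this diff; the real implementation below additionally fences the stream on append failure and checks the stream's start offset):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the confirm-offset idea, independent of S3Stream.
public class ConfirmOffsetSketch {
    private final AtomicLong confirmOffset = new AtomicLong(0);

    // Called when the WAL acknowledges everything below newOffset.
    // The CAS loop lets concurrent acks race while only moving forward.
    void advance(long newOffset) {
        for (; ; ) {
            long old = confirmOffset.get();
            if (old >= newOffset) {
                return; // a later ack already confirmed further
            }
            if (confirmOffset.compareAndSet(old, newOffset)) {
                return;
            }
        }
    }

    // Reads past the confirm offset are rejected: those records are not yet durable.
    boolean isFetchable(long endOffset) {
        return endOffset <= confirmOffset.get();
    }

    public static void main(String[] args) {
        ConfirmOffsetSketch sketch = new ConfirmOffsetSketch();
        sketch.advance(10);
        sketch.advance(5); // stale ack, ignored
        System.out.println(sketch.isFetchable(10)); // true
        System.out.println(sketch.isFetchable(11)); // false
    }
}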
91 changes: 86 additions & 5 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -19,35 +19,48 @@

import com.automq.elasticstream.client.DefaultAppendResult;
import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import kafka.log.es.FutureUtil;
import kafka.log.es.RecordBatchWithContextWrapper;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.streams.StreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class S3Stream implements Stream {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class);
private final StreamMetadata metadata;
private final String logIdent;
private final long streamId;
private final long epoch;
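    // Offset up to which the WAL has acknowledged appends; fetches must not read past it.
    // Package-private so tests can seed it directly.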
final AtomicLong confirmOffset;
private final AtomicLong nextOffset;
private final Wal wal;
private final S3BlockCache blockCache;
private final StreamManager streamManager;
private final Status status;

public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
this.metadata = metadata;
        this.logIdent = "[Stream id=" + metadata.getStreamId() + " epoch=" + metadata.getEpoch() + "]";
this.streamId = metadata.getStreamId();
this.epoch = metadata.getEpoch();
this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset());
this.confirmOffset = new AtomicLong(nextOffset.get());
this.status = new Status();
this.wal = wal;
this.blockCache = blockCache;
this.streamManager = streamManager;
@@ -70,14 +83,42 @@ public long nextOffset() {

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
long offset = nextOffset.getAndIncrement();
if (!status.isWritable()) {
return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
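        // A batch of count() records occupies offsets [offset, offset + count), so reserve them all at once.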
long offset = nextOffset.getAndAdd(recordBatch.count());
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch);
return wal.append(streamRecordBatch).thenApply(nil -> new DefaultAppendResult(offset));
CompletableFuture<AppendResult> cf = wal.append(streamRecordBatch).thenApply(nil -> {
updateConfirmOffset(offset + recordBatch.count());
return new DefaultAppendResult(offset);
});
return cf.whenComplete((rst, ex) -> {
if (ex == null) {
return;
}
            // The WAL should keep retrying the append until the stream is fenced or the WAL is closed.
status.markFenced();
if (ex instanceof ElasticStreamClientException && ((ElasticStreamClientException) ex).getCode() == ErrorCode.EXPIRED_STREAM_EPOCH) {
                LOGGER.info("{} stream append failed: stream is fenced", logIdent);
} else {
                LOGGER.warn("{} stream append failed", logIdent, ex);
}
});
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
//TODO: bound check
if (status.isClosed()) {
return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
}
long confirmOffset = this.confirmOffset.get();
if (startOffset < metadata.getStartOffset() || endOffset > confirmOffset) {
return FutureUtil.failedFuture(
new ElasticStreamClientException(
ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS,
String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, metadata.getStartOffset(), confirmOffset)
));
}
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
List<RecordBatchWithContext> records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
return new DefaultFetchResult(records);
@@ -96,16 +137,29 @@ public CompletableFuture<Void> trim(long newStartOffset) {

@Override
public CompletableFuture<Void> close() {
// TODO: add stream status to fence future access.
status.markClosed();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> destroy() {
// TODO: add stream status to fence future access.
status.markDestroy();
metadata.setStartOffset(this.confirmOffset.get());
return streamManager.deleteStream(streamId, epoch);
}

private void updateConfirmOffset(long newOffset) {
        // CAS loop: the confirm offset only advances; a smaller offset from a
        // stale completion loses the race and is ignored.
        for (; ; ) {
            long oldConfirmOffset = confirmOffset.get();
            if (oldConfirmOffset >= newOffset) {
break;
}
if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) {
break;
}
}
}

static class DefaultFetchResult implements FetchResult {
private final List<RecordBatchWithContext> records;

@@ -118,4 +172,31 @@ public List<RecordBatchWithContext> recordBatchList() {
return records;
}
}

static class Status {
private static final int CLOSED_MARK = 1;
private static final int FENCED_MARK = 1 << 1;
private static final int DESTROY_MARK = 1 << 2;
private final AtomicInteger status = new AtomicInteger();

public void markFenced() {
status.getAndUpdate(operand -> operand | FENCED_MARK);
}

public void markClosed() {
status.getAndUpdate(operand -> operand | CLOSED_MARK);
}

public void markDestroy() {
status.getAndUpdate(operand -> operand | DESTROY_MARK);
}

public boolean isClosed() {
return (status.get() & CLOSED_MARK) != 0;
}

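        // Writable only while no closed/fenced/destroy flag has been set.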
public boolean isWritable() {
return status.get() == 0;
}
}
}
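
Side note on the new Status class above: each lifecycle transition sets its own bit, so isWritable() reduces to "no flag set" and marks compose safely under concurrent updates. A standalone illustration of the flag arithmetic (hypothetical demo class, not part of this diff):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the bit-flag lifecycle used by S3Stream.Status.
public class StatusFlagsDemo {
    static final int CLOSED_MARK = 1;
    static final int FENCED_MARK = 1 << 1;
    static final int DESTROY_MARK = 1 << 2;

    public static void main(String[] args) {
        AtomicInteger status = new AtomicInteger();
        System.out.println("writable: " + (status.get() == 0));                 // true: no flags set
        status.getAndUpdate(s -> s | FENCED_MARK);                              // fenced on append failure
        status.getAndUpdate(s -> s | CLOSED_MARK);                              // then closed
        System.out.println("closed:   " + ((status.get() & CLOSED_MARK) != 0)); // true: isClosed()
        System.out.println("writable: " + (status.get() == 0));                 // false: both bits set
    }
}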
@@ -17,7 +17,8 @@

package kafka.log.s3;

import kafka.log.s3.exception.StreamFencedException;
import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import kafka.log.s3.objects.CommitWalObjectRequest;
import kafka.log.s3.objects.CommitWalObjectResponse;
import kafka.log.s3.objects.ObjectManager;
@@ -87,7 +88,7 @@ public void ack() {
for (WalWriteRequest request : requests) {
long streamId = request.record.getStreamId();
if (failedStreamId.contains(streamId)) {
request.cf.completeExceptionally(new StreamFencedException());
request.cf.completeExceptionally(new ElasticStreamClientException(ErrorCode.EXPIRED_STREAM_EPOCH, "Stream " + streamId + " epoch expired"));
} else {
request.cf.complete(null);
}

This file was deleted.
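
The hunk above swaps the dedicated StreamFencedException (presumably the deleted file) for the shared ElasticStreamClientException carrying ErrorCode.EXPIRED_STREAM_EPOCH. A hedged sketch of caller-side detection, assuming only the getCode() accessor this diff already uses:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.flatc.header.ErrorCode;

// Sketch: distinguish an epoch fence from other append failures.
public class FenceHandlingSketch {
    static boolean isFenced(Throwable cause) {
        return cause instanceof ElasticStreamClientException
                && ((ElasticStreamClientException) cause).getCode() == ErrorCode.EXPIRED_STREAM_EPOCH;
    }

    static void await(CompletableFuture<Void> ack) throws Exception {
        try {
            ack.get();
        } catch (ExecutionException e) {
            if (isFenced(e.getCause())) {
                // Another writer holds a newer epoch; stop writing to this stream.
                return;
            }
            throw e;
        }
    }
}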

14 changes: 14 additions & 0 deletions core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -17,6 +17,7 @@

package kafka.log.s3;

import com.automq.elasticstream.client.api.ElasticStreamClientException;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import kafka.log.s3.cache.ReadDataBlock;
@@ -31,9 +32,11 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -60,12 +63,23 @@ public void setup() {

@Test
public void testFetch() throws Throwable {
stream.confirmOffset.set(120L);
when(blockCache.read(eq(233L), eq(110L), eq(120L), eq(100)))
.thenReturn(CompletableFuture.completedFuture(newReadDataBlock(110, 115, 110)));
FetchResult rst = stream.fetch(110, 120, 100).get(1, TimeUnit.SECONDS);
assertEquals(1, rst.recordBatchList().size());
assertEquals(110, rst.recordBatchList().get(0).baseOffset());
assertEquals(115, rst.recordBatchList().get(0).lastOffset());

boolean isException = false;
try {
stream.fetch(120, 140, 100).get();
        } catch (ExecutionException e) {
if (e.getCause() instanceof ElasticStreamClientException) {
isException = true;
}
}
assertTrue(isException);
}

ReadDataBlock newReadDataBlock(long start, long end, int size) {
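
One test-style note: the flag-plus-assertTrue pattern in testFetch works, but JUnit 5's assertThrows states the expectation more directly. A possible equivalent, assuming a static import of Assertions.assertThrows:

        // Same negative-path check, letting JUnit assert that the fetch fails.
        ExecutionException e = assertThrows(ExecutionException.class,
                () -> stream.fetch(120, 140, 100).get());
        assertTrue(e.getCause() instanceof ElasticStreamClientException);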