feat: to #66; add stream object copyer #67

Merged · 2 commits · Sep 7, 2023
101 changes: 82 additions & 19 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -41,56 +41,76 @@ public class ObjectReader {
private final S3ObjectMetadata metadata;
private final String objectKey;
private final S3Operator s3Operator;
private final CompletableFuture<IndexBlock> indexBlockCf;
private final CompletableFuture<BasicObjectInfo> basicObjectInfoCf;

public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
this.metadata = metadata;
this.objectKey = metadata.key();
this.s3Operator = s3Operator;
this.indexBlockCf = new CompletableFuture<>();
asyncGetIndexBlock();
this.basicObjectInfoCf = new CompletableFuture<>();
asyncGetBasicObjectInfo();
}

public CompletableFuture<BasicObjectInfo> basicObjectInfo() {
return basicObjectInfoCf;
}

public CompletableFuture<List<DataBlockIndex>> find(long streamId, long startOffset, long endOffset) {
return indexBlockCf.thenApply(indexBlock -> indexBlock.find(streamId, startOffset, endOffset));
return basicObjectInfoCf.thenApply(basicObjectInfo -> basicObjectInfo.indexBlock().find(streamId, startOffset, endOffset));
}

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition());
return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
}

private void asyncGetIndexBlock() {
asyncGetIndexBlock0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
private void asyncGetBasicObjectInfo() {
asyncGetBasicObjectInfo0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
}

private void asyncGetIndexBlock0(long startPosition) {
private void asyncGetBasicObjectInfo0(long startPosition) {
CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize());
cf.thenAccept(buf -> {
try {
IndexBlock indexBlock = IndexBlock.parse(buf, metadata.getObjectSize());
indexBlockCf.complete(indexBlock);
BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.getObjectSize());
basicObjectInfoCf.complete(basicObjectInfo);
} catch (IndexBlockParseException ex) {
asyncGetIndexBlock0(ex.indexBlockPosition);
asyncGetBasicObjectInfo0(ex.indexBlockPosition);
}
}).exceptionally(ex -> {
LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex);
// TODO: delay retry.
asyncGetIndexBlock0(startPosition);
asyncGetBasicObjectInfo0(startPosition);
return null;
});
}

static class IndexBlock {
private final ByteBuf blocks;
private final ByteBuf streamRanges;
static class BasicObjectInfo {
/**
The total size of the data blocks, which equals the index block's start position.
*/
private final long dataBlockSize;
/**
Raw index data.
*/
private final IndexBlock indexBlock;
/**
* The number of data blocks in the object.
*/
private final int blockCount;
/**
The size of the index block.
*/
private final int indexBlockSize;

public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
this.blocks = blocks;
this.streamRanges = streamRanges;
public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
this.dataBlockSize = dataBlockSize;
this.indexBlock = indexBlock;
this.blockCount = blockCount;
this.indexBlockSize = indexBlockSize;
}

public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
public static BasicObjectInfo parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
@@ -102,10 +122,44 @@ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
indexBlockBuf.skipBytes(blockCount * 16);
ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
return new IndexBlock(blocks, streamRanges);
return new BasicObjectInfo(indexBlockPosition, new IndexBlock(blocks, streamRanges), blockCount, indexBlockSize);
}
}

public long dataBlockSize() {
return dataBlockSize;
}

public IndexBlock indexBlock() {
return indexBlock;
}

public int blockCount() {
return blockCount;
}

public int indexBlockSize() {
return indexBlockSize;
}
}

static class IndexBlock {
private final ByteBuf blocks;
private final ByteBuf streamRanges;

public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
this.blocks = blocks;
this.streamRanges = streamRanges;
}

public ByteBuf blocks() {
return blocks.slice();
}

public ByteBuf streamRanges() {
return streamRanges.slice();
}

public List<DataBlockIndex> find(long streamId, long startOffset, long endOffset) {
// TODO: binary search
long nextStartOffset = startOffset;
@@ -146,6 +200,15 @@ public IndexBlockParseException(long indexBlockPosition) {

}

static class BasicObjectInfoParseException extends Exception {
long indexBlockPosition;

public BasicObjectInfoParseException(long indexBlockPosition) {
this.indexBlockPosition = indexBlockPosition;
}

}


public static class DataBlockIndex {
public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;
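For context, a minimal usage sketch of the reader API after this change. This is not code from the PR: `metadata` (S3ObjectMetadata), `s3Operator` (S3Operator), and the stream/offset values are assumed to come from the surrounding object-management code, and record consumption is elided because DataBlock's iteration API is not shown in this diff.

// Sketch only: wiring ObjectReader's async API together.
ObjectReader reader = new ObjectReader(metadata, s3Operator);
// The constructor starts the tail read; basicObjectInfo() completes once the
// footer and index block have been fetched and parsed.
reader.basicObjectInfo().thenAccept(info ->
        System.out.printf("object %s: %d data blocks, %d data bytes%n",
                metadata.key(), info.blockCount(), info.dataBlockSize()));
// find() resolves index entries for [startOffset, endOffset) of a stream;
// read() range-reads one data block from S3.
reader.find(streamId, startOffset, endOffset)
        .thenCompose(indexes -> reader.read(indexes.get(0)))
        .thenAccept(dataBlock -> {
            // consume the block's records here
        });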
21 changes: 16 additions & 5 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -51,6 +51,7 @@ public class ObjectWriter {

public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectId = objectId;
// TODO: use a better clusterName
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
this.blockSizeThreshold = blockSizeThreshold;
this.partSizeThreshold = partSizeThreshold;
@@ -109,7 +110,7 @@ public CompletableFuture<Void> close() {
waitingUploadBlocks.clear();
indexBlock = new IndexBlock();
buf.addComponent(true, indexBlock.buffer());
Footer footer = new Footer();
Footer footer = new Footer(indexBlock.position(), indexBlock.size());
buf.addComponent(true, footer.buffer());
writer.write(buf.duplicate());
size = indexBlock.position() + indexBlock.size() + footer.size();
@@ -237,17 +238,24 @@ public IndexBlock() {
buf.writeInt(completedBlocks.size()); // block count
// block index
for (DataBlock block : completedBlocks) {
// start position in the object
buf.writeLong(block.position());
// byte size of the block
buf.writeInt(block.size());
// number of records in the block
buf.writeInt(block.recordCount());
}
// object stream range
for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
DataBlock block = completedBlocks.get(blockIndex);
for (ObjectStreamRange range : block.getStreamRanges()) {
// stream id of this range
buf.writeLong(range.getStreamId());
// start offset of the related stream
buf.writeLong(range.getStartOffset());
// record count of the related stream in this range
buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
// the index of the block this range belongs to
buf.writeInt(blockIndex);
}
}
@@ -266,15 +274,18 @@ public int size() {
}
}

class Footer {
static class Footer {
private static final int FOOTER_SIZE = 48;
private static final long MAGIC = 0x88e241b785f4cff7L;
private final ByteBuf buf;

public Footer() {
public Footer(long indexStartPosition, int indexBlockLength) {
buf = Unpooled.buffer(FOOTER_SIZE);
buf.writeLong(indexBlock.position());
buf.writeInt(indexBlock.size());
// start position of index block
buf.writeLong(indexStartPosition);
// size of index block
buf.writeInt(indexBlockLength);
// reserved for future use
buf.writeZero(40 - 8 - 4);
buf.writeLong(MAGIC);
}
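Taken together, Footer and BasicObjectInfo.parse fix the object tail format: the footer is exactly FOOTER_SIZE = 48 bytes, laid out as an 8-byte index start position, a 4-byte index block size, 28 reserved zero bytes, and the 8-byte magic. That is why parse() reads the position at readableBytes() - 48 and the size at readableBytes() - 40. A decoding sketch under those assumptions (this helper is hypothetical, not part of the PR):

import io.netty.buffer.ByteBuf;

final class FooterDecoder {
    // Same constant that Footer writes above.
    static final long MAGIC = 0x88e241b785f4cff7L;

    static void decode(ByteBuf objectTailBuf) {
        int len = objectTailBuf.readableBytes();
        long indexStartPosition = objectTailBuf.getLong(len - 48); // footer offset 0
        int indexBlockSize = objectTailBuf.getInt(len - 40);       // footer offset 8
        long magic = objectTailBuf.getLong(len - 8);               // footer offset 40
        // parse() above does not verify the magic yet; a stricter reader could.
        System.out.printf("index @ %d, %d bytes, magic ok: %b%n",
                indexStartPosition, indexBlockSize, magic == MAGIC);
    }
}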
163 changes: 163 additions & 0 deletions core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
import org.apache.kafka.metadata.stream.ObjectUtils;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3ObjectType;

public class StreamObjectCopyer {
private final List<StreamObjectIndexData> completedObjects;
private final S3Operator s3Operator;
private final Writer writer;
private final long objectId;
private long nextObjectDataStartPosition;
private int blockCount;

private long size;

public StreamObjectCopyer(long objectId, S3Operator s3Operator) {
// TODO: use a better clusterName
this(objectId, s3Operator, s3Operator.writer(ObjectUtils.genKey(0, "todocluster", objectId)));
}

public StreamObjectCopyer(long objectId, S3Operator s3Operator, Writer writer) {
this.objectId = objectId;
this.s3Operator = s3Operator;
this.writer = writer;
this.completedObjects = new LinkedList<>();
this.nextObjectDataStartPosition = 0;
this.blockCount = 0;
this.size = 0;
}

public void write(S3ObjectMetadata metadata) {
if (metadata.getType() != S3ObjectType.STREAM) {
throw new IllegalArgumentException("Only stream objects can be handled.");
}
ObjectReader reader = new ObjectReader(metadata, s3Operator);
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().join();
// Only copy data blocks for now.
writer.copyWrite(metadata.key(), 0, basicObjectInfo.dataBlockSize());
completedObjects.add(new StreamObjectIndexData(basicObjectInfo.indexBlock(), basicObjectInfo.blockCount(), nextObjectDataStartPosition, blockCount, basicObjectInfo.indexBlockSize()));
blockCount += basicObjectInfo.blockCount();
nextObjectDataStartPosition += basicObjectInfo.dataBlockSize();
size += basicObjectInfo.dataBlockSize();
}

public CompletableFuture<Void> close() {
CompositeByteBuf buf = Unpooled.compositeBuffer();
IndexBlock indexBlock = new IndexBlock();
buf.addComponent(true, indexBlock.buffer());
ObjectWriter.Footer footer = new ObjectWriter.Footer(indexBlock.position(), indexBlock.size());
buf.addComponent(true, footer.buffer());
writer.write(buf.duplicate());
size += indexBlock.size() + footer.size();
return writer.close();
}

public long size() {
return size;
}

private class IndexBlock {
private final CompositeByteBuf buf;
private final long position;

public IndexBlock() {
position = nextObjectDataStartPosition;
buf = Unpooled.compositeBuffer();
// block count
buf.addComponent(true, Unpooled.buffer(4).writeInt(blockCount));
// block index
for (StreamObjectIndexData indexData : completedObjects) {
buf.addComponent(true, indexData.blockBuf());
}
// object stream range
for (StreamObjectIndexData indexData : completedObjects) {
buf.addComponent(true, indexData.rangesBuf());
}
}

public ByteBuf buffer() {
return buf.duplicate();
}

public long position() {
return position;
}

public int size() {
return buf.readableBytes();
}
}

static class StreamObjectIndexData {
private final ByteBuf blockBuf;
private final ByteBuf rangesBuf;
/**
The number of data blocks in this object.
*/
private final int dataBlockCount;
/**
* The total length of the block index.
*/
private final int blockIndexTotalLength;

public StreamObjectIndexData(ObjectReader.IndexBlock indexBlock, int dataBlockCount, long blockStartPosition, int blockStartId, int blockIndexTotalLength) {
this.dataBlockCount = dataBlockCount;
this.blockIndexTotalLength = blockIndexTotalLength;
this.blockBuf = indexBlock.blocks().copy();
this.rangesBuf = indexBlock.streamRanges().copy();

int blockPositionIndex = 0;
while (blockPositionIndex < blockBuf.readableBytes()) {
// The value is now the relative block position.
long blockPosition = blockBuf.getLong(blockPositionIndex);
// update block position with start position.
blockBuf.setLong(blockPositionIndex, blockPosition + blockStartPosition);
blockPositionIndex += 8 + 4 + 4;
}

int startBlockIdIndex = 8 + 8 + 4;
while (startBlockIdIndex < rangesBuf.readableBytes()) {
// The value is now the relative block id.
int blockId = rangesBuf.getInt(startBlockIdIndex);
// update block id with start block id.
rangesBuf.setInt(startBlockIdIndex, blockId + blockStartId);
startBlockIdIndex += 8 + 8 + 4 + 4;
}
}

public ByteBuf blockBuf() {
return blockBuf.duplicate();
}

public ByteBuf rangesBuf() {
return rangesBuf.duplicate();
}
}
}
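The rebasing loops in StreamObjectIndexData depend on the fixed entry layouts written by ObjectWriter.IndexBlock: a block index entry is 16 bytes (8-byte position, 4-byte size, 4-byte record count, hence the 8 + 4 + 4 stride), and a stream range entry is 24 bytes (8-byte stream id, 8-byte start offset, 4-byte record count, 4-byte block index, hence the block-index field starts at offset 8 + 8 + 4 = 20 within each entry). A minimal usage sketch of the copier, with a placeholder object id and assumed inputs (not code from the PR):

// `s3Operator` and `sourceObjects` (List<S3ObjectMetadata>, all of type
// S3ObjectType.STREAM) are assumed to exist; object id 42 is a placeholder.
StreamObjectCopyer copyer = new StreamObjectCopyer(42L, s3Operator);
for (S3ObjectMetadata source : sourceObjects) {
    copyer.write(source); // copies only the data blocks; indexes are rebuilt
}
copyer.close().thenRun(() ->
        System.out.println("new object size: " + copyer.size()));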