feat: to #66; add stream object copyer (#67)
* feat: to #66; add stream object copyer

Signed-off-by: Curtis Wan <[email protected]>

* fix: add header; roll back asyncGetBasicObjectInfo0

Signed-off-by: Curtis Wan <[email protected]>

---------

Signed-off-by: Curtis Wan <[email protected]>
mooc9988 authored Sep 7, 2023
1 parent 44721e6 commit 5ff7e8c
Showing 4 changed files with 375 additions and 24 deletions.
101 changes: 82 additions & 19 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -41,56 +41,76 @@ public class ObjectReader {
private final S3ObjectMetadata metadata;
private final String objectKey;
private final S3Operator s3Operator;
- private final CompletableFuture<IndexBlock> indexBlockCf;
+ private final CompletableFuture<BasicObjectInfo> basicObjectInfoCf;

public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
this.metadata = metadata;
this.objectKey = metadata.key();
this.s3Operator = s3Operator;
- this.indexBlockCf = new CompletableFuture<>();
- asyncGetIndexBlock();
+ this.basicObjectInfoCf = new CompletableFuture<>();
+ asyncGetBasicObjectInfo();
}

+ public CompletableFuture<BasicObjectInfo> basicObjectInfo() {
+ return basicObjectInfoCf;
+ }

public CompletableFuture<List<DataBlockIndex>> find(long streamId, long startOffset, long endOffset) {
- return indexBlockCf.thenApply(indexBlock -> indexBlock.find(streamId, startOffset, endOffset));
+ return basicObjectInfoCf.thenApply(basicObjectInfo -> basicObjectInfo.indexBlock().find(streamId, startOffset, endOffset));
}

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition());
return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
}

- private void asyncGetIndexBlock() {
- asyncGetIndexBlock0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
+ private void asyncGetBasicObjectInfo() {
+ asyncGetBasicObjectInfo0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
}

- private void asyncGetIndexBlock0(long startPosition) {
+ private void asyncGetBasicObjectInfo0(long startPosition) {
CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize());
cf.thenAccept(buf -> {
try {
- IndexBlock indexBlock = IndexBlock.parse(buf, metadata.getObjectSize());
- indexBlockCf.complete(indexBlock);
+ BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.getObjectSize());
+ basicObjectInfoCf.complete(basicObjectInfo);
} catch (IndexBlockParseException ex) {
- asyncGetIndexBlock0(ex.indexBlockPosition);
+ asyncGetBasicObjectInfo0(ex.indexBlockPosition);
}
}).exceptionally(ex -> {
LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex);
// TODO: delay retry.
- asyncGetIndexBlock0(startPosition);
+ asyncGetBasicObjectInfo0(startPosition);
return null;
});
}
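// Usage sketch (assumed, not part of this commit): the reader is fully
// asynchronous, so a typical read path chains find() and read(); metadata,
// s3Operator and the stream id/offsets are assumed to be in scope:
//   ObjectReader reader = new ObjectReader(metadata, s3Operator);
//   reader.find(streamId, startOffset, endOffset)
//       .thenCompose(blocks -> reader.read(blocks.get(0))) // blocks may be empty in practice
//       .thenAccept(dataBlock -> { /* consume the block's records */ });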

- static class IndexBlock {
- private final ByteBuf blocks;
- private final ByteBuf streamRanges;
+ static class BasicObjectInfo {
+ /**
+ * The total size of the data blocks, which equals the index block's start position.
+ */
+ private final long dataBlockSize;
+ /**
+ * Raw index data.
+ */
+ private final IndexBlock indexBlock;
+ /**
+ * The number of data blocks in the object.
+ */
+ private final int blockCount;
+ /**
+ * The size of the index block.
+ */
+ private final int indexBlockSize;

- public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
- this.blocks = blocks;
- this.streamRanges = streamRanges;
+ public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
+ this.dataBlockSize = dataBlockSize;
+ this.indexBlock = indexBlock;
+ this.blockCount = blockCount;
+ this.indexBlockSize = indexBlockSize;
}

- public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
+ public static BasicObjectInfo parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
@@ -102,10 +122,44 @@ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
indexBlockBuf.skipBytes(blockCount * 16);
ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
- return new IndexBlock(blocks, streamRanges);
+ return new BasicObjectInfo(indexBlockPosition, new IndexBlock(blocks, streamRanges), blockCount, indexBlockSize);
}
}

+ public long dataBlockSize() {
+ return dataBlockSize;
+ }
+
+ public IndexBlock indexBlock() {
+ return indexBlock;
+ }
+
+ public int blockCount() {
+ return blockCount;
+ }
+
+ public int indexBlockSize() {
+ return indexBlockSize;
+ }
+ }
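// Layout sketch (derived from parse() above; not part of this commit): the
// fields imply this overall object layout:
//   [ data blocks: bytes 0..dataBlockSize ) [ index block: indexBlockSize bytes ]
//   [ footer: 48 bytes = index position(8) + index size(4) + reserved(28) + magic(8) ]
// which is why parse() reads the index position at tail-48 and its size at tail-40.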

+ static class IndexBlock {
+ private final ByteBuf blocks;
+ private final ByteBuf streamRanges;
+
+ public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
+ this.blocks = blocks;
+ this.streamRanges = streamRanges;
+ }
+
+ public ByteBuf blocks() {
+ return blocks.slice();
+ }
+
+ public ByteBuf streamRanges() {
+ return streamRanges.slice();
+ }

public List<DataBlockIndex> find(long streamId, long startOffset, long endOffset) {
// TODO: binary search
long nextStartOffset = startOffset;
@@ -146,6 +200,15 @@ public IndexBlockParseException(long indexBlockPosition) {

}

+ static class BasicObjectInfoParseException extends Exception {
+ long indexBlockPosition;
+
+ public BasicObjectInfoParseException(long indexBlockPosition) {
+ this.indexBlockPosition = indexBlockPosition;
+ }
+
+ }


public static class DataBlockIndex {
public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;
21 changes: 16 additions & 5 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -51,6 +51,7 @@ public class ObjectWriter {

public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
this.objectId = objectId;
+ // TODO: use a better clusterName
String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
this.blockSizeThreshold = blockSizeThreshold;
this.partSizeThreshold = partSizeThreshold;
@@ -109,7 +110,7 @@ public CompletableFuture<Void> close() {
waitingUploadBlocks.clear();
indexBlock = new IndexBlock();
buf.addComponent(true, indexBlock.buffer());
- Footer footer = new Footer();
+ Footer footer = new Footer(indexBlock.position(), indexBlock.size());
buf.addComponent(true, footer.buffer());
writer.write(buf.duplicate());
size = indexBlock.position() + indexBlock.size() + footer.size();
@@ -237,17 +238,24 @@ public IndexBlock() {
buf.writeInt(completedBlocks.size()); // block count
// block index
for (DataBlock block : completedBlocks) {
+ // start position in the object
buf.writeLong(block.position());
+ // byte size of the block
buf.writeInt(block.size());
+ // record count of the block
buf.writeInt(block.recordCount());
}
// object stream range
for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
DataBlock block = completedBlocks.get(blockIndex);
for (ObjectStreamRange range : block.getStreamRanges()) {
+ // stream id of this range
buf.writeLong(range.getStreamId());
+ // start offset of the related stream
buf.writeLong(range.getStartOffset());
+ // record count of the related stream in this range
buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
+ // the index of the block this range belongs to
buf.writeInt(blockIndex);
}
}
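// Note (derived from the writes above; not part of this commit): the entries
// are fixed-width, matching the strides used elsewhere in this change:
//   block index entry = position(8) + size(4) + recordCount(4) = 16 bytes
//   stream range entry = streamId(8) + startOffset(8) + recordCount(4) + blockIndex(4) = 24 bytes
// ObjectReader.BasicObjectInfo.parse slices the block index as blockCount * 16.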
@@ -266,15 +274,18 @@ public int size() {
}
}

- class Footer {
+ static class Footer {
private static final int FOOTER_SIZE = 48;
private static final long MAGIC = 0x88e241b785f4cff7L;
private final ByteBuf buf;

- public Footer() {
+ public Footer(long indexStartPosition, int indexBlockLength) {
buf = Unpooled.buffer(FOOTER_SIZE);
- buf.writeLong(indexBlock.position());
- buf.writeInt(indexBlock.size());
+ // start position of index block
+ buf.writeLong(indexStartPosition);
+ // size of index block
+ buf.writeInt(indexBlockLength);
+ // reserved for future
buf.writeZero(40 - 8 - 4);
buf.writeLong(MAGIC);
}
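// Note (derived from the writes above; not part of this commit): the footer is
// a fixed 48 bytes: indexStartPosition(8) + indexBlockLength(4) + zero
// padding(28) + MAGIC(8), so a reader can locate the index block from the
// object tail at offsets tail-48 (position) and tail-40 (size), exactly as
// ObjectReader.BasicObjectInfo.parse does.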
163 changes: 163 additions & 0 deletions core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
import org.apache.kafka.metadata.stream.ObjectUtils;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3ObjectType;

public class StreamObjectCopyer {
private final List<StreamObjectIndexData> completedObjects;
private final S3Operator s3Operator;
private final Writer writer;
private final long objectId;
private long nextObjectDataStartPosition;
private int blockCount;

private long size;

public StreamObjectCopyer(long objectId, S3Operator s3Operator) {
// TODO: use a better clusterName
this(objectId, s3Operator, s3Operator.writer(ObjectUtils.genKey(0, "todocluster", objectId)));
}

public StreamObjectCopyer(long objectId, S3Operator s3Operator, Writer writer) {
this.objectId = objectId;
this.s3Operator = s3Operator;
this.writer = writer;
this.completedObjects = new LinkedList<>();
this.nextObjectDataStartPosition = 0;
this.blockCount = 0;
this.size = 0;
}

public void write(S3ObjectMetadata metadata) {
if (metadata.getType() != S3ObjectType.STREAM) {
throw new IllegalArgumentException("Only stream object can be handled.");
}
ObjectReader reader = new ObjectReader(metadata, s3Operator);
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().join();
// Only copy data blocks for now.
writer.copyWrite(metadata.key(), 0, basicObjectInfo.dataBlockSize());
completedObjects.add(new StreamObjectIndexData(basicObjectInfo.indexBlock(), basicObjectInfo.blockCount(), nextObjectDataStartPosition, blockCount, basicObjectInfo.indexBlockSize()));
blockCount += basicObjectInfo.blockCount();
nextObjectDataStartPosition += basicObjectInfo.dataBlockSize();
size += basicObjectInfo.dataBlockSize();
}

public CompletableFuture<Void> close() {
CompositeByteBuf buf = Unpooled.compositeBuffer();
IndexBlock indexBlock = new IndexBlock();
buf.addComponent(true, indexBlock.buffer());
ObjectWriter.Footer footer = new ObjectWriter.Footer(indexBlock.position(), indexBlock.size());
buf.addComponent(true, footer.buffer());
writer.write(buf.duplicate());
size += indexBlock.size() + footer.size();
return writer.close();
}

public long size() {
return size;
}
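// Usage sketch (assumed, not part of this commit): a hypothetical copy flow;
// the target object id and the source stream-object metadata are assumed to be
// in scope:
//   StreamObjectCopyer copyer = new StreamObjectCopyer(targetObjectId, s3Operator);
//   copyer.write(sourceMetadata1); // range-copies that object's data blocks
//   copyer.write(sourceMetadata2); // note: write() blocks on basicObjectInfo().join()
//   copyer.close().join();         // appends the merged index block and footer
//   long objectSize = copyer.size();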

private class IndexBlock {
private final CompositeByteBuf buf;
private final long position;

public IndexBlock() {
position = nextObjectDataStartPosition;
buf = Unpooled.compositeBuffer();
// block count
buf.addComponent(true, Unpooled.buffer(4).writeInt(blockCount));
// block index
for (StreamObjectIndexData indexData : completedObjects) {
buf.addComponent(true, indexData.blockBuf());
}
// object stream range
for (StreamObjectIndexData indexData : completedObjects) {
buf.addComponent(true, indexData.rangesBuf());
}
}

public ByteBuf buffer() {
return buf.duplicate();
}

public long position() {
return position;
}

public int size() {
return buf.readableBytes();
}
}

static class StreamObjectIndexData {
private final ByteBuf blockBuf;
private final ByteBuf rangesBuf;
/**
* How many data blocks are in this object.
*/
private final int dataBlockCount;
/**
* The total length of the block index.
*/
private final int blockIndexTotalLength;

public StreamObjectIndexData(ObjectReader.IndexBlock indexBlock, int dataBlockCount, long blockStartPosition, int blockStartId, int blockIndexTotalLength) {
this.dataBlockCount = dataBlockCount;
this.blockIndexTotalLength = blockIndexTotalLength;
this.blockBuf = indexBlock.blocks().copy();
this.rangesBuf = indexBlock.streamRanges().copy();

int blockPositionIndex = 0;
while (blockPositionIndex < blockBuf.readableBytes()) {
// The value is now the relative block position.
long blockPosition = blockBuf.getLong(blockPositionIndex);
// update block position with start position.
blockBuf.setLong(blockPositionIndex, blockPosition + blockStartPosition);
blockPositionIndex += 8 + 4 + 4;
}

int startBlockIdIndex = 8 + 8 + 4;
while (startBlockIdIndex < rangesBuf.readableBytes()) {
// The value is now the relative block id.
int blockId = rangesBuf.getInt(startBlockIdIndex);
// update block id with start block id.
rangesBuf.setInt(startBlockIdIndex, blockId + blockStartId);
startBlockIdIndex += 8 + 8 + 4 + 4;
}
}
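// Worked example (hypothetical numbers, not part of this commit): if a
// previous source object contributed 4096 bytes of data and 2 blocks, then
// blockStartPosition = 4096 and blockStartId = 2. A block entry whose relative
// position is 0 is rewritten to 4096 (entries are 16 = 8 + 4 + 4 bytes wide),
// and each range's blockIndex, the int at offset 20 = 8 + 8 + 4 within its
// 24-byte entry, is rewritten from 0 to 2.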

public ByteBuf blockBuf() {
return blockBuf.duplicate();
}

public ByteBuf rangesBuf() {
return rangesBuf.duplicate();
}
}
}
