diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java
index 860d7db1e7..90e400dfdb 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectReader.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -41,18 +41,22 @@ public class ObjectReader {
     private final S3ObjectMetadata metadata;
     private final String objectKey;
     private final S3Operator s3Operator;
-    private final CompletableFuture<IndexBlock> indexBlockCf;
+    private final CompletableFuture<BasicObjectInfo> basicObjectInfoCf;

     public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
         this.metadata = metadata;
         this.objectKey = metadata.key();
         this.s3Operator = s3Operator;
-        this.indexBlockCf = new CompletableFuture<>();
-        asyncGetIndexBlock();
+        this.basicObjectInfoCf = new CompletableFuture<>();
+        asyncGetBasicObjectInfo();
+    }
+
+    public CompletableFuture<BasicObjectInfo> basicObjectInfo() {
+        return basicObjectInfoCf;
     }

     public CompletableFuture<List<DataBlockIndex>> find(long streamId, long startOffset, long endOffset) {
-        return indexBlockCf.thenApply(indexBlock -> indexBlock.find(streamId, startOffset, endOffset));
+        return basicObjectInfoCf.thenApply(basicObjectInfo -> basicObjectInfo.indexBlock().find(streamId, startOffset, endOffset));
     }

     public CompletableFuture<DataBlock> read(DataBlockIndex block) {
@@ -60,37 +64,53 @@ public CompletableFuture<DataBlock> read(DataBlockIndex block) {
         return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
     }

-    private void asyncGetIndexBlock() {
-        asyncGetIndexBlock0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
+    private void asyncGetBasicObjectInfo() {
+        asyncGetBasicObjectInfo0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
     }

-    private void asyncGetIndexBlock0(long startPosition) {
+    private void asyncGetBasicObjectInfo0(long startPosition) {
         CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize());
         cf.thenAccept(buf -> {
             try {
-                IndexBlock indexBlock = IndexBlock.parse(buf, metadata.getObjectSize());
-                indexBlockCf.complete(indexBlock);
+                BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.getObjectSize());
+                basicObjectInfoCf.complete(basicObjectInfo);
             } catch (IndexBlockParseException ex) {
-                asyncGetIndexBlock0(ex.indexBlockPosition);
+                asyncGetBasicObjectInfo0(ex.indexBlockPosition);
             }
         }).exceptionally(ex -> {
             LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex);
             // TODO: delay retry.
-            asyncGetIndexBlock0(startPosition);
+            asyncGetBasicObjectInfo0(startPosition);
             return null;
         });
     }

-    static class IndexBlock {
-        private final ByteBuf blocks;
-        private final ByteBuf streamRanges;
+    static class BasicObjectInfo {
+        /**
+         * The total size of the data blocks, which equals the index block start position.
+         */
+        private final long dataBlockSize;
+        /**
+         * Raw index data.
+         */
+        private final IndexBlock indexBlock;
+        /**
+         * The number of data blocks in the object.
+         */
+        private final int blockCount;
+        /**
+         * The size of the index block.
+         */
+        private final int indexBlockSize;

-        public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
-            this.blocks = blocks;
-            this.streamRanges = streamRanges;
+        public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
+            this.dataBlockSize = dataBlockSize;
+            this.indexBlock = indexBlock;
+            this.blockCount = blockCount;
+            this.indexBlockSize = indexBlockSize;
         }

-        public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
+        public static BasicObjectInfo parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
             long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
             int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
             if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
@@ -102,10 +122,44 @@ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws In
                 ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
                 indexBlockBuf.skipBytes(blockCount * 16);
                 ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
-                return new IndexBlock(blocks, streamRanges);
+                return new BasicObjectInfo(indexBlockPosition, new IndexBlock(blocks, streamRanges), blockCount, indexBlockSize);
             }
         }

+        public long dataBlockSize() {
+            return dataBlockSize;
+        }
+
+        public IndexBlock indexBlock() {
+            return indexBlock;
+        }
+
+        public int blockCount() {
+            return blockCount;
+        }
+
+        public int indexBlockSize() {
+            return indexBlockSize;
+        }
+    }
+
+    static class IndexBlock {
+        private final ByteBuf blocks;
+        private final ByteBuf streamRanges;
+
+        public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
+            this.blocks = blocks;
+            this.streamRanges = streamRanges;
+        }
+
+        public ByteBuf blocks() {
+            return blocks.slice();
+        }
+
+        public ByteBuf streamRanges() {
+            return streamRanges.slice();
+        }
+
         public List<DataBlockIndex> find(long streamId, long startOffset, long endOffset) {
             // TODO: binary search
             long nextStartOffset = startOffset;
@@ -146,6 +200,15 @@ public IndexBlockParseException(long indexBlockPosition) {

     }

+    static class BasicObjectInfoParseException extends Exception {
+        long indexBlockPosition;
+
+        public BasicObjectInfoParseException(long indexBlockPosition) {
+            this.indexBlockPosition = indexBlockPosition;
+        }
+
+    }
+
     public static class DataBlockIndex {
         public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
index 78dde0287a..b978502d0c 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -51,6 +51,7 @@ public class ObjectWriter {

     public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
         this.objectId = objectId;
+        // TODO: use a better clusterName
         String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
         this.blockSizeThreshold = blockSizeThreshold;
         this.partSizeThreshold = partSizeThreshold;
@@ -109,7 +110,7 @@ public CompletableFuture<Void> close() {
         waitingUploadBlocks.clear();
         indexBlock = new IndexBlock();
         buf.addComponent(true, indexBlock.buffer());
-        Footer footer = new Footer();
+        Footer footer = new Footer(indexBlock.position(), indexBlock.size());
         buf.addComponent(true, footer.buffer());
         writer.write(buf.duplicate());
         size = indexBlock.position() + indexBlock.size() + footer.size();
@@ -237,17 +238,24 @@ public IndexBlock() {
             buf.writeInt(completedBlocks.size()); // block count
             // block index
             for (DataBlock block : completedBlocks) {
+                // start position in the object
                 buf.writeLong(block.position());
+                // byte size of the block
                 buf.writeInt(block.size());
+                // record count of the block
                 buf.writeInt(block.recordCount());
             }
             // object stream range
             for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
                 DataBlock block = completedBlocks.get(blockIndex);
                 for (ObjectStreamRange range : block.getStreamRanges()) {
+                    // stream id of this range
                     buf.writeLong(range.getStreamId());
+                    // start offset of this range in the stream
                     buf.writeLong(range.getStartOffset());
+                    // record count of this range (end offset minus start offset)
                     buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
+                    // index of the data block this range belongs to
                     buf.writeInt(blockIndex);
                 }
             }
@@ -266,15 +274,18 @@ public int size() {
         }
     }

-    class Footer {
+    static class Footer {
         private static final int FOOTER_SIZE = 48;
         private static final long MAGIC = 0x88e241b785f4cff7L;
         private final ByteBuf buf;

-        public Footer() {
+        public Footer(long indexStartPosition, int indexBlockLength) {
             buf = Unpooled.buffer(FOOTER_SIZE);
-            buf.writeLong(indexBlock.position());
-            buf.writeInt(indexBlock.size());
+            // start position of index block
+            buf.writeLong(indexStartPosition);
+            // size of index block
+            buf.writeInt(indexBlockLength);
+            // reserved for future use
             buf.writeZero(40 - 8 - 4);
             buf.writeLong(MAGIC);
         }
diff --git a/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java b/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java
new file mode 100644
index 0000000000..e4a423ecc4
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import kafka.log.s3.operator.S3Operator;
+import kafka.log.s3.operator.Writer;
+import org.apache.kafka.metadata.stream.ObjectUtils;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+
+public class StreamObjectCopyer {
+    private final List<StreamObjectIndexData> completedObjects;
+    private final S3Operator s3Operator;
+    private final Writer writer;
+    private final long objectId;
+    private long nextObjectDataStartPosition;
+    private int blockCount;
+
+    private long size;
+
+    public StreamObjectCopyer(long objectId, S3Operator s3Operator) {
+        // TODO: use a better clusterName
+        this(objectId, s3Operator, s3Operator.writer(ObjectUtils.genKey(0, "todocluster", objectId)));
+    }
+
+    public StreamObjectCopyer(long objectId, S3Operator s3Operator, Writer writer) {
+        this.objectId = objectId;
+        this.s3Operator = s3Operator;
+        this.writer = writer;
+        this.completedObjects = new LinkedList<>();
+        this.nextObjectDataStartPosition = 0;
+        this.blockCount = 0;
+        this.size = 0;
+    }
+
+    public void write(S3ObjectMetadata metadata) {
+        if (metadata.getType() != S3ObjectType.STREAM) {
+            throw new IllegalArgumentException("Only stream objects can be handled.");
+        }
+        ObjectReader reader = new ObjectReader(metadata, s3Operator);
+        ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().join();
+        // Only copy data blocks for now.
+        writer.copyWrite(metadata.key(), 0, basicObjectInfo.dataBlockSize());
+        completedObjects.add(new StreamObjectIndexData(basicObjectInfo.indexBlock(), basicObjectInfo.blockCount(), nextObjectDataStartPosition, blockCount, basicObjectInfo.indexBlockSize()));
+        blockCount += basicObjectInfo.blockCount();
+        nextObjectDataStartPosition += basicObjectInfo.dataBlockSize();
+        size += basicObjectInfo.dataBlockSize();
+    }
+
+    public CompletableFuture<Void> close() {
+        CompositeByteBuf buf = Unpooled.compositeBuffer();
+        IndexBlock indexBlock = new IndexBlock();
+        buf.addComponent(true, indexBlock.buffer());
+        ObjectWriter.Footer footer = new ObjectWriter.Footer(indexBlock.position(), indexBlock.size());
+        buf.addComponent(true, footer.buffer());
+        writer.write(buf.duplicate());
+        size += indexBlock.size() + footer.size();
+        return writer.close();
+    }
+
+    public long size() {
+        return size;
+    }
+
+    private class IndexBlock {
+        private final CompositeByteBuf buf;
+        private final long position;
+
+        public IndexBlock() {
+            position = nextObjectDataStartPosition;
+            buf = Unpooled.compositeBuffer();
+            // block count
+            buf.addComponent(true, Unpooled.buffer(4).writeInt(blockCount));
+            // block index
+            for (StreamObjectIndexData indexData : completedObjects) {
+                buf.addComponent(true, indexData.blockBuf());
+            }
+            // object stream range
+            for (StreamObjectIndexData indexData : completedObjects) {
+                buf.addComponent(true, indexData.rangesBuf());
+            }
+        }
+
+        public ByteBuf buffer() {
+            return buf.duplicate();
+        }
+
+        public long position() {
+            return position;
+        }
+
+        public int size() {
+            return buf.readableBytes();
+        }
+    }
+
+    static class StreamObjectIndexData {
+        private final ByteBuf blockBuf;
+        private final ByteBuf rangesBuf;
+        /**
+         * How many data blocks are in this object.
+         */
+        private final int dataBlockCount;
+        /**
+         * The total length of the block index.
+         */
+        private final int blockIndexTotalLength;
+
+        public StreamObjectIndexData(ObjectReader.IndexBlock indexBlock, int dataBlockCount, long blockStartPosition, int blockStartId, int blockIndexTotalLength) {
+            this.dataBlockCount = dataBlockCount;
+            this.blockIndexTotalLength = blockIndexTotalLength;
+            this.blockBuf = indexBlock.blocks().copy();
+            this.rangesBuf = indexBlock.streamRanges().copy();
+
+            int blockPositionIndex = 0;
+            while (blockPositionIndex < blockBuf.readableBytes()) {
+                // The value is currently the block position relative to the source object.
+                long blockPosition = blockBuf.getLong(blockPositionIndex);
+                // Update the block position with the data start position of that object in the new object.
+                blockBuf.setLong(blockPositionIndex, blockPosition + blockStartPosition);
+                blockPositionIndex += 8 + 4 + 4;
+            }
+
+            int startBlockIdIndex = 8 + 8 + 4;
+            while (startBlockIdIndex < rangesBuf.readableBytes()) {
+                // The value is currently the block id relative to the source object.
+                int blockId = rangesBuf.getInt(startBlockIdIndex);
+                // Update the block id with the start block id.
+                rangesBuf.setInt(startBlockIdIndex, blockId + blockStartId);
+                startBlockIdIndex += 8 + 8 + 4 + 4;
+            }
+        }
+
+        public ByteBuf blockBuf() {
+            return blockBuf.duplicate();
+        }
+
+        public ByteBuf rangesBuf() {
+            return rangesBuf.duplicate();
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java
new file mode 100644
index 0000000000..6271610f62
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.operator.MemoryS3Operator;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.metadata.stream.ObjectUtils;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@Tag("S3Unit")
+public class StreamObjectCopyerTest {
+    @Test
+    public void testCompact() throws ExecutionException, InterruptedException {
+        long targetObjectId = 10;
+        long streamId = 233;
+
+        S3Operator s3Operator = new MemoryS3Operator();
+
+        ObjectWriter objectWriter1 = new ObjectWriter(1, s3Operator, 1024, 1024);
+        StreamRecordBatch r1 = newRecord(streamId, 10, 5, 512);
+        objectWriter1.write(r1);
+        StreamRecordBatch r2 = newRecord(streamId, 15, 10, 512);
+        objectWriter1.write(r2);
+        objectWriter1.close().get();
+
+        ObjectWriter objectWriter2 = new ObjectWriter(2, s3Operator, 1024, 1024);
+        StreamRecordBatch r3 = newRecord(streamId, 25, 8, 512);
+        objectWriter2.write(r3);
+        StreamRecordBatch r4 = newRecord(streamId, 33, 6, 512);
+        objectWriter2.write(r4);
+        objectWriter2.close().get();
+
+        S3ObjectMetadata metadata1 = new S3ObjectMetadata(1, objectWriter1.size(), S3ObjectType.STREAM);
+        S3ObjectMetadata metadata2 = new S3ObjectMetadata(2, objectWriter2.size(), S3ObjectType.STREAM);
+
+        StreamObjectCopyer streamObjectCopyer = new StreamObjectCopyer(targetObjectId,
+            s3Operator,
+            // TODO: use a better clusterName
+            s3Operator.writer(ObjectUtils.genKey(0, "todocluster", targetObjectId))
+        );
+        streamObjectCopyer.write(metadata1);
+        streamObjectCopyer.write(metadata2);
+        streamObjectCopyer.close().get();
+        long targetObjectSize = streamObjectCopyer.size();
+        S3ObjectMetadata metadata = new S3ObjectMetadata(targetObjectId, targetObjectSize, S3ObjectType.STREAM);
+
+
+        int objectSize = s3Operator.rangeRead(metadata.key(), 0L, targetObjectSize).get().readableBytes();
+        assertEquals(targetObjectSize, objectSize);
+
+        ObjectReader objectReader = new ObjectReader(metadata, s3Operator);
+        List<ObjectReader.DataBlockIndex> blockIndexes = objectReader.find(streamId, 10, 40).get();
+        assertEquals(2, blockIndexes.size());
+        {
+            Iterator<StreamRecordBatch> it = objectReader.read(blockIndexes.get(0)).get().iterator();
+            StreamRecordBatch r = it.next();
+            assertEquals(streamId, r.getStreamId());
+            assertEquals(10L, r.getBaseOffset());
+            assertEquals(5L, r.getRecordBatch().count());
+            assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+            r = it.next();
+            assertEquals(streamId, r.getStreamId());
+            assertEquals(15L, r.getBaseOffset());
+            assertEquals(10L, r.getRecordBatch().count());
+            assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+            assertFalse(it.hasNext());
+        }
+
+        {
+            Iterator<StreamRecordBatch> it = objectReader.read(blockIndexes.get(1)).get().iterator();
+            StreamRecordBatch r = it.next();
+            assertEquals(streamId, r.getStreamId());
+            assertEquals(25L, r.getBaseOffset());
+            assertEquals(8L, r.getRecordBatch().count());
+            assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+            r = it.next();
+            assertEquals(streamId, r.getStreamId());
+            assertEquals(33L, r.getBaseOffset());
+            assertEquals(6L, r.getRecordBatch().count());
+            assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+            assertFalse(it.hasNext());
+        }
+    }
+
+    StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
+        return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize));
+    }
+
+}
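Reviewer note, not part of the patch: the object layout this change settles on is [data blocks][index block][48-byte footer], where the footer is an 8-byte index block position, a 4-byte index block size, 28 reserved bytes, and an 8-byte magic. That is why ObjectReader reads the position at readableBytes() - 48 and the size at readableBytes() - 40, and why StreamObjectCopyer can copy only the byte range [0, dataBlockSize()) of each source object and rebuild the index block itself. Below is a minimal sketch of decoding that footer from an object's tail, using the same Netty ByteBuf API the patch already uses; the class and method names are illustrative only, not part of the change.

import io.netty.buffer.ByteBuf;

// Hypothetical helper; mirrors what BasicObjectInfo.parse does with the object tail.
final class FooterSketch {
    static final int FOOTER_SIZE = 48;
    static final long MAGIC = 0x88e241b785f4cff7L;

    // tailBuf is assumed to end at the last byte of the object, like the range read
    // issued by ObjectReader#asyncGetBasicObjectInfo0.
    static long[] readFooter(ByteBuf tailBuf) {
        int tail = tailBuf.readableBytes();
        long indexBlockPosition = tailBuf.getLong(tail - FOOTER_SIZE); // footer bytes [0, 8)
        int indexBlockSize = tailBuf.getInt(tail - FOOTER_SIZE + 8);   // footer bytes [8, 12)
        long magic = tailBuf.getLong(tail - 8);                        // footer bytes [40, 48)
        if (magic != MAGIC) {
            throw new IllegalStateException("unexpected object footer magic");
        }
        return new long[] {indexBlockPosition, indexBlockSize};
    }
}

The real code additionally checks that the tail read actually covers the index block (indexBlockPosition + readableBytes() >= objectSize) and retries the range read from indexBlockPosition when the initial guess of the last 1 MiB does not.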