Skip to content

Commit

Permalink
feat(stream-client): optimize getting objects from `StreamMetadataMan…
Browse files Browse the repository at this point in the history
…ager` (#64)

* refactor(s3): remove inflight wal objects

1. remove inflight wal objects
2. delete redundant classes

Signed-off-by: TheR1sing3un <[email protected]>

* feat(s3): support blocking `getObjects` in `StreamMetadataManager`

1. support blocking `getObjects` in `StreamMetadataManager`
2. refactor
`getObjects`
3. change the unit of `s3.cache.size` from `MB`
to `B`

Signed-off-by: TheR1sing3un <[email protected]>

* feat(s3): more suitable log level

1. more suitable log level

Signed-off-by: TheR1sing3un <[email protected]>

* fix(s3): add more concurrent protection

1. add more concurrent protection

Signed-off-by: TheR1sing3un <[email protected]>

---------

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un authored Sep 7, 2023
1 parent 5ff7e8c commit 0d1a8ff
Show file tree
Hide file tree
Showing 31 changed files with 653 additions and 701 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.raft" />
</subpackage>

<subpackage name="testkit">
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.automq.elasticstream.client.api.StreamClient;
import kafka.log.s3.cache.DefaultS3BlockCache;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.metadata.StreamMetadataManager;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.objects.ControllerObjectManager;
import kafka.log.s3.objects.ObjectManager;
Expand Down Expand Up @@ -60,7 +61,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
this.requestSender = new ControllerRequestSender(brokerServer);
this.streamManager = new ControllerStreamManager(this.requestSender, config);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024L * 1024, objectManager, operator);
this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator);
this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
this.streamClient = new S3StreamClient(this.streamManager, this.storage);
this.kvClient = new ControllerKVClient(this.requestSender);
Expand Down
340 changes: 0 additions & 340 deletions core/src/main/scala/kafka/log/s3/StreamMetadataManager.java

This file was deleted.

Loading

0 comments on commit 0d1a8ff

Please sign in to comment.