Apache Pulsar is a distributed messaging system that supports multiple messaging protocols and storage methods. Pulsar Topic Compaction provides a key-based data retention mechanism: it keeps only the most recent message for each key, which reduces storage space and improves system efficiency.
An internal Pulsar use case, the topic compaction used by the new load balancer, changes the compaction strategy: it keeps only the first value for each key. For more details, see PIP-215.
More topic compaction details can be found in Pulsar Topic Compaction.
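The two retention strategies described above can be illustrated with a toy model. The sketch below is not Pulsar's compactor: the class name `CompactionSemanticsSketch` and both methods are hypothetical, messages are modeled as plain key/value pairs, and only the retention rule is shown (not ledger layout or ordering).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of key-based retention; a hypothetical sketch, not Pulsar's compactor. */
final class CompactionSemanticsSketch {
    private CompactionSemanticsSketch() {}

    /** Regular topic compaction: keep only the most recent value for each key. */
    static Map<String, String> keepLatest(List<Map.Entry<String, String>> log) {
        Map<String, String> compacted = new LinkedHashMap<>();
        for (Map.Entry<String, String> message : log) {
            // A later message with the same key overwrites the earlier value.
            compacted.put(message.getKey(), message.getValue());
        }
        return compacted;
    }

    /** Strategy described for the load balancer use case: keep only the first value for each key. */
    static Map<String, String> keepFirst(List<Map.Entry<String, String>> log) {
        Map<String, String> compacted = new LinkedHashMap<>();
        for (Map.Entry<String, String> message : log) {
            // Once a key has a value, later messages for that key are ignored.
            compacted.putIfAbsent(message.getKey(), message.getValue());
        }
        return compacted;
    }
}
```

For a log containing `k1=a`, `k2=b`, `k1=c` in that order, `keepLatest` retains `c` for `k1`, while `keepFirst` retains `a`. Both retain `b` for `k2`.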
Currently, the implementation of Pulsar Topic Compaction is fixed and does not support customized strategies, which prevents users from applying other compaction policies in their applications.
For example, the current topic compaction works with Pulsar-format data in KoP, but not with Kafka-format data, because the data written to the entry is in Kafka format and the Pulsar compactor is not aware of that format. It doesn't make sense to support Kafka-format data handling in Pulsar itself, so we need a pluggable compactor to handle Kafka-format data in KoP.
Another long-term consideration is that we may need to support writing the compacted data anywhere, for example to S3, in a columnar format, or even with partitioning.
So we need to make the whole topic compaction service (including the write API and the read API) pluggable to support more customized compaction service implementations.
- Abstract a topic compaction service interface and make the topic compaction service pluggable.
- Migrate the current implementation to the new interface.
- Make existing tests compatible with the new implementation.

- For CompactorMetrics, keep the current implementation and don't define related methods in the topic compaction service interface. In the future, it will use the Otel interface or other metrics API instead.
- StrategicTwoPhaseCompactor is out of scope for regular compaction. It is only used by the load balancer, so it won't change.
To make the whole topic compaction service pluggable, we need to abstract a TopicCompactionService interface that provides the capabilities of the compactor as well as a read API for reading entries from compacted data.
We should combine CompactedTopicImpl and TwoPhaseCompactor into the Pulsar implementation of the topic compaction service and keep its behavior consistent with the current implementation.
Class diagram of the core classes:
classDiagram
direction BT
class CompactedTopic {
<<Interface>>
+ deleteCompactedLedger(long) CompletableFuture~Void~
+ getCompactionHorizon() Optional~Position~
+ newCompactedLedger(Position, long) CompletableFuture~CompactedTopicContext~
+ asyncReadEntriesOrWait(ManagedCursor, int, boolean, ReadEntriesCallback, Consumer) void
+ readLastEntryOfCompactedLedger() CompletableFuture~Entry~
}
class CompactedTopicImpl {
+ newCompactedLedger(Position, long) CompletableFuture~CompactedTopicContext~
+ getCompactedTopicContext() Optional~CompactedTopicContext~
+ asyncReadEntriesOrWait(ManagedCursor, int, boolean, ReadEntriesCallback, Consumer) void
+ getCompactionHorizon() Optional~Position~
+ deleteCompactedLedger(long) CompletableFuture~Void~
+ getCompactedTopicContextFuture() CompletableFuture~CompactedTopicContext~
+ readLastEntryOfCompactedLedger() CompletableFuture~Entry~
}
class CompactionServiceFactory {
<<Interface>>
+ newTopicCompactionService(String) CompletableFuture~TopicCompactionService~
+ initialize(PulsarService) CompletableFuture~Void~
}
class Compactor {
+ getStats() CompactorMXBean
+ compact(String) CompletableFuture~Long~
}
class PulsarCompactionServiceFactory {
+ getNullableCompactor() Compactor?
+ getCompactor() Compactor
+ newTopicCompactionService(String) CompletableFuture~TopicCompactionService~
+ initialize(PulsarService) CompletableFuture~Void~
+ close() void
}
class PulsarCompactorSubscription {
+ acknowledgeMessage(List~Position~, AckType, Map<String, Long>) void
}
class PulsarTopicCompactionService {
+ compact() CompletableFuture~Void~
+ readCompactedEntries(Position, int) CompletableFuture~List~Entry~~
+ getLastCompactedPosition() CompletableFuture~Position~
+ readLastCompactedEntry() CompletableFuture~Entry~
+ getCompactedTopic() CompactedTopicImpl
+ close() void
}
class TopicCompactionService {
<<Interface>>
+ compact() CompletableFuture~Void~
+ readCompactedEntries(Position, int) CompletableFuture~List~Entry~~
+ getLastCompactedPosition() CompletableFuture~Position~
+ readLastCompactedEntry() CompletableFuture~Entry~
}
class TwoPhaseCompactor
CompactedTopicImpl ..> CompactedTopic
PulsarCompactionServiceFactory ..> CompactionServiceFactory
PulsarCompactionServiceFactory "1" *--> "compactor 1" Compactor
PulsarCompactionServiceFactory ..> PulsarTopicCompactionService : «create»
PulsarCompactionServiceFactory ..> TwoPhaseCompactor : «create»
PulsarCompactorSubscription "1" *--> "compactedTopic 1" CompactedTopic
PulsarTopicCompactionService ..> CompactedTopicImpl : «create»
PulsarTopicCompactionService "1" *--> "compactedTopic 1" CompactedTopicImpl
PulsarTopicCompactionService ..> TopicCompactionService
TwoPhaseCompactor --> Compactor
- Define a standard TopicCompactionService interface.

      import java.util.List;
      import java.util.concurrent.CompletableFuture;
      import javax.annotation.Nonnull;
      import org.apache.bookkeeper.mledger.Entry;
      import org.apache.bookkeeper.mledger.Position;

      public interface TopicCompactionService extends AutoCloseable {
          /**
           * Compact the topic.
           * Topic Compaction is a key-based retention mechanism. It keeps the most recent value for a given key and
           * users read compacted data from the TopicCompactionService.
           *
           * @return a future that will be completed when the compaction is done.
           */
          CompletableFuture<Void> compact();

          /**
           * Read the compacted entries from the TopicCompactionService.
           *
           * @param startPosition         the position to start reading from.
           * @param numberOfEntriesToRead the maximum number of entries to read.
           * @return a future that will be completed with the list of entries, this list can be null.
           */
          CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position startPosition, int numberOfEntriesToRead);

          /**
           * Read the last compacted entry from the TopicCompactionService.
           *
           * @return a future that will be completed with the compacted last entry, this entry can be null.
           */
          CompletableFuture<Entry> readLastCompactedEntry();

          /**
           * Get the last compacted position from the TopicCompactionService.
           *
           * @return a future that will be completed with the last compacted position, this position can be null.
           */
          CompletableFuture<Position> getLastCompactedPosition();
      }
- Define a standard CompactionServiceFactory interface to manage TopicCompactionService.

      import java.util.concurrent.CompletableFuture;
      import org.apache.pulsar.broker.PulsarService;

      public interface CompactionServiceFactory extends AutoCloseable {
          /**
           * Initialize the compaction service factory.
           *
           * @param pulsarService the pulsar service instance
           * @return a future that represents the initialization result
           */
          CompletableFuture<Void> initialize(PulsarService pulsarService);

          /**
           * Create a new topic compaction service for a topic.
           *
           * @param topic the topic name
           * @return a future that represents the topic compaction service
           */
          CompletableFuture<TopicCompactionService> newTopicCompactionService(String topic);
      }
- Implement PulsarCompactionServiceFactory and PulsarTopicCompactionService.
- Combine CompactedTopicImpl and TwoPhaseCompactor into PulsarTopicCompactionService.
- Rename CompactorSubscription to PulsarCompactorSubscription, since it is only applicable to the Pulsar implementation.
- For CompactorMetrics, keep the current implementation. Currently, it only supports PulsarTopicCompactionService. In the future, it will use the Otel API or other metrics API instead, and a customized TopicCompactionService should implement the Otel API or other metrics API.
- Fix tests and make them compatible with the new implementation.
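To show what a custom implementation of the interface above might look like, here is a minimal in-memory sketch. It is an illustration under stated assumptions, not part of this proposal's implementation. So that it compiles without Pulsar on the classpath, it uses a hypothetical stand-in interface `SketchTopicCompactionService`, with `String` payloads in place of `Entry` and `long` offsets in place of `Position`. The class `InMemoryTopicCompactionService` is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for TopicCompactionService (String payloads, long positions). */
interface SketchTopicCompactionService extends AutoCloseable {
    CompletableFuture<Void> compact();
    CompletableFuture<List<String>> readCompactedEntries(long startPosition, int numberOfEntriesToRead);
    CompletableFuture<String> readLastCompactedEntry();
    CompletableFuture<Long> getLastCompactedPosition();
}

/** Hypothetical custom implementation that keeps the compacted view in memory. */
final class InMemoryTopicCompactionService implements SketchTopicCompactionService {
    // Raw topic data as (key, payload) pairs; the list index plays the role of the position.
    private final List<Map.Entry<String, String>> rawLog;
    // Position -> payload for the messages retained by the last compaction run.
    private volatile NavigableMap<Long, String> compacted = new TreeMap<>();

    InMemoryTopicCompactionService(List<Map.Entry<String, String>> rawLog) {
        this.rawLog = List.copyOf(rawLog);
    }

    @Override
    public CompletableFuture<Void> compact() {
        // Pass 1: remember the position of the most recent message for each key.
        Map<String, Long> latestPositionByKey = new HashMap<>();
        for (int i = 0; i < rawLog.size(); i++) {
            latestPositionByKey.put(rawLog.get(i).getKey(), (long) i);
        }
        // Pass 2: retain only those messages, ordered by position.
        NavigableMap<Long, String> result = new TreeMap<>();
        for (long position : latestPositionByKey.values()) {
            result.put(position, rawLog.get((int) position).getValue());
        }
        compacted = result;
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<List<String>> readCompactedEntries(long startPosition, int numberOfEntriesToRead) {
        List<String> entries = new ArrayList<>();
        for (String payload : compacted.tailMap(startPosition, true).values()) {
            if (entries.size() >= numberOfEntriesToRead) {
                break;
            }
            entries.add(payload);
        }
        return CompletableFuture.completedFuture(entries);
    }

    @Override
    public CompletableFuture<String> readLastCompactedEntry() {
        Map.Entry<Long, String> last = compacted.lastEntry();
        return CompletableFuture.completedFuture(last == null ? null : last.getValue());
    }

    @Override
    public CompletableFuture<Long> getLastCompactedPosition() {
        NavigableMap<Long, String> snapshot = compacted;
        return CompletableFuture.completedFuture(snapshot.isEmpty() ? null : snapshot.lastKey());
    }

    @Override
    public void close() {
        // Nothing to release in this in-memory sketch.
    }
}
```

A real implementation would work with `Entry` and `Position`, and could write the compacted data to any backing store. The point of the sketch is only that the write path (`compact`) and the read path (the three read methods) sit behind one interface.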
broker.conf
compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
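A class-name setting like the one above is typically consumed by instantiating the named class reflectively. This proposal does not specify how the broker loads the factory, so the following is only a sketch of one plausible approach. `SketchCompactionServiceFactory`, `DefaultSketchCompactionServiceFactory`, and `CompactionFactoryLoaderSketch` are hypothetical names, and the stand-in interface omits the real `initialize` and `newTopicCompactionService` methods.

```java
/** Hypothetical stand-in for CompactionServiceFactory, reduced to one method for illustration. */
interface SketchCompactionServiceFactory {
    String describe();
}

/** Hypothetical default factory, playing the role of PulsarCompactionServiceFactory. */
final class DefaultSketchCompactionServiceFactory implements SketchCompactionServiceFactory {
    @Override
    public String describe() {
        return "default";
    }
}

/** Loads a factory from a configured class name; assumes a no-argument constructor. */
final class CompactionFactoryLoaderSketch {
    private CompactionFactoryLoaderSketch() {}

    static SketchCompactionServiceFactory load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            // Fails with ClassCastException if the class does not implement the factory interface.
            return SketchCompactionServiceFactory.class.cast(instance);
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot load compaction service factory: " + className, e);
        }
    }
}
```

Failing fast on a missing or incompatible class surfaces a misconfigured value at startup rather than on the first compaction.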
- Only make the compactor pluggable
- Make the compaction data serializer and deserializer pluggable in the current Pulsar implementation.
However, these alternatives would introduce short-term configurations and interfaces, so they do not fit the long-term direction of Pulsar. For a discussion of the alternatives, see PIP-274.
- Mailing List discussion thread: https://lists.apache.org/thread/ox2bot3p9j9fydqkw3v5gt5twc8jslvd
- Mailing List voting thread: https://lists.apache.org/thread/1pcsmn1osdkz04dtgy3fchgmzoko5jnf