From 8107b75faca4ed6bc5b711a0c3d5c8493ae88bd1 Mon Sep 17 00:00:00 2001 From: frankxieke Date: Wed, 13 Oct 2021 09:05:25 +0800 Subject: [PATCH] [offload] fix FileSystemManagedLedgerOffloader can not cleanup outdated ledger data (#12309) ### Motivation When using FileSystem Offloader, and you set the ledger retention policy, after the offloaded ledger is outdated, however the data can not be deleted from file system. This is because the datafile path is wrong. For example, in fact the datafile path is "file:///Users/pulsar_nfs/public/default/persistent/test_pulsar_delta/449-775f0961-9719-4658-8357-6b0edbdef7a3", but is mistaken formatted as "file:///Users/pulsar_nfs/null/449-775f0961-9719-4658-8357-6b0edbdef7a3". The reason is when format the data path, "ManagedLedgerName" property in "offloadDriverMetadata" is missing. ### Modifications Before format the data path, add "ManagedLedgerName" property in "offloadDriverMetadata" map. --- .../mledger/impl/ManagedLedgerImpl.java | 6 +- .../mledger/impl/OffloadLedgerDeleteTest.java | 149 ++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5dd98cbd684c38..9b005c6424b0ab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3083,9 +3083,13 @@ private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName Map offloadDriverMetadata, String cleanupReason) { log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", name, ledgerId, uuid.toString(), cleanupReason); + Map metadataMap = Maps.newHashMap(); + metadataMap.putAll(offloadDriverMetadata); + metadataMap.put("ManagedLedgerName", name); + Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata), + () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), scheduledExecutor, name).whenComplete((ignored, exception) -> { if (exception != null) { log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index f25332e91703a6..e36835f2efbfc6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -20,11 +20,16 @@ import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -43,6 +48,102 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(OffloadLedgerDeleteTest.class); + + static class MockFileSystemLedgerOffloader implements LedgerOffloader { + interface InjectAfterOffload { + void call(); + } + + private String storageBasePath = "/Users/pulsar_filesystem_offloader"; + + private static String getStoragePath(String storageBasePath, String managedLedgerName) { + return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/"; + } + + private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) { + return storagePath + ledgerId + "-" + uuid.toString(); + } + + ConcurrentHashMap offloads = new ConcurrentHashMap(); + ConcurrentHashMap deletes = new ConcurrentHashMap(); + OffloadPrefixTest.MockLedgerOffloader.InjectAfterOffload inject = null; + + Set offloadedLedgers() { + return offloads.keySet(); + } + + Set deletedOffloads() { + return deletes.keySet(); + } + + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", "", "", "", + null, null, + null, null, + OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, + OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); + + @Override + public String getOffloadDriverName() { + return "mockfilesystem"; + } + + @Override + public CompletableFuture offload(ReadHandle ledger, + UUID uuid, + Map extraMetadata) { + Assert.assertNotNull(extraMetadata.get("ManagedLedgerName")); + String storagePath = getStoragePath(storageBasePath, extraMetadata.get("ManagedLedgerName")); + String dataFilePath = getDataFilePath(storagePath, ledger.getId(), uuid); + CompletableFuture promise = new CompletableFuture<>(); + if (offloads.putIfAbsent(ledger.getId(), dataFilePath) == null) { + promise.complete(null); + } else { + promise.completeExceptionally(new Exception("Already exists exception")); + } + + if (inject != null) { + inject.call(); + } + return promise; + } + + @Override + public CompletableFuture readOffloaded(long ledgerId, UUID uuid, + Map offloadDriverMetadata) { + CompletableFuture promise = new CompletableFuture<>(); + promise.completeExceptionally(new UnsupportedOperationException()); + return promise; + } + + @Override + public CompletableFuture deleteOffloaded(long ledgerId, UUID uuid, + Map offloadDriverMetadata) { + Assert.assertNotNull(offloadDriverMetadata.get("ManagedLedgerName")); + String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get("ManagedLedgerName")); + String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid); + CompletableFuture promise = new CompletableFuture<>(); + if (offloads.remove(ledgerId, dataFilePath)) { + deletes.put(ledgerId, dataFilePath); + promise.complete(null); + } else { + promise.completeExceptionally(new Exception("Not found")); + } + return promise; + }; + + @Override + public OffloadPoliciesImpl getOffloadPolicies() { + return offloadPolicies; + } + + @Override + public void close() { + } + } + @Test public void testLaggedDelete() throws Exception { OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader(); @@ -105,6 +206,54 @@ public void testLaggedDelete() throws Exception { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId)); } + @Test(timeOut = 5000) + public void testFileSystemOffloadDeletePath() throws Exception { + MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader(); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + MockClock clock = new MockClock(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(3, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L); + config.setLedgerOffloader(offloader); + config.setClock(clock); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger_filesystem", config); + int i = 0; + for (; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId)); + + // ledger still exists in list + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + + // move past retention, should be deleted from offloaded also + clock.advance(5, TimeUnit.MINUTES); + CompletableFuture promise3 = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise3); + promise3.join(); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1); + assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId)); + } + @Test public void testLaggedDeleteRetentionSetLower() throws Exception { OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();