Skip to content

Commit

Permalink
ManagedLedger: move to FENCED state in case of BadVersionException (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Sep 22, 2022
1 parent a8b265d commit 63d4cf2
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public ManagedLedgerFencedException() {
super(new Exception("Attempted to use a fenced managed ledger"));
}

public ManagedLedgerFencedException(String message) {
super(message);
}

public ManagedLedgerFencedException(Exception e) {
super(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
handleBadVersion(e);
if (e instanceof MetadataNotFoundException) {
callback.initializeFailed(new ManagedLedgerNotFoundException(e));
} else {
Expand Down Expand Up @@ -481,6 +482,7 @@ public void operationComplete(Void v, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
handleBadVersion(e);
callback.initializeFailed(new ManagedLedgerException(e));
}
};
Expand Down Expand Up @@ -1022,6 +1024,7 @@ public void operationComplete(Void result, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
handleBadVersion(e);
callback.deleteCursorFailed(e, ctx);
}

Expand Down Expand Up @@ -1312,6 +1315,7 @@ public void operationComplete(Void result, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage());
handleBadVersion(e);
callback.terminateFailed(new ManagedLedgerException(e), ctx);
}
});
Expand Down Expand Up @@ -1396,6 +1400,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
State state = STATE_UPDATER.get(this);
if (state == State.Fenced) {
cancelScheduledTasks();
factory.close(this);
callback.closeFailed(new ManagedLedgerFencedException(), ctx);
return;
Expand Down Expand Up @@ -1519,6 +1524,7 @@ public void operationComplete(Void v, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
handleBadVersion(e);
mbean.startDataLedgerDeleteOp();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
Expand All @@ -1527,14 +1533,12 @@ public void operationFailed(MetaStoreException e) {
BKException.getMessage(rc1));
}
}, null);

if (e instanceof BadVersionException) {
synchronized (ManagedLedgerImpl.this) {
log.error(
"[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
name);
lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
// Return ManagedLedgerFencedException to addFailed callback
// to indicate that the ledger is now fenced and topic needs to be closed
clearPendingAddEntries(new ManagedLedgerFencedException(e));
Expand All @@ -1557,6 +1561,12 @@ public void operationFailed(MetaStoreException e) {
updateLedgersListAfterRollover(cb, newLedger);
}
}

private void handleBadVersion(Throwable e) {
if (e instanceof BadVersionException) {
setFenced();
}
}
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
if (!metadataMutex.tryLock()) {
// Defer update for later
Expand Down Expand Up @@ -2463,12 +2473,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
TOTAL_SIZE_UPDATER.get(this));
}
if (STATE_UPDATER.get(this) == State.Closed) {
State currentState = STATE_UPDATER.get(this);
if (currentState == State.Closed) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
return;
}
if (currentState == State.Fenced) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
return;
}

long slowestReaderLedgerId = -1;
if (!cursors.hasDurableCursors()) {
Expand Down Expand Up @@ -2557,7 +2574,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(isTruncate, promise);
trimmerMutex.unlock();
Expand Down Expand Up @@ -2624,6 +2641,7 @@ public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
handleBadVersion(e);

promise.completeExceptionally(e);
}
Expand Down Expand Up @@ -2708,7 +2726,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) {
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
STATE_UPDATER.set(this, State.Fenced);
setFenced();
cancelScheduledTasks();

List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
Expand Down Expand Up @@ -2957,7 +2975,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
promise.whenComplete((result, exception) -> {
offloadMutex.unlock();
if (exception != null) {
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx);
} else {
callback.offloadComplete(result, ctx);
}
Expand All @@ -2971,11 +2989,17 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct

private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
if (getState() == State.Closed) {
State currentState = getState();
if (currentState == State.Closed) {
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
String.format("managed ledger [%s] has already closed", name)));
return;
}
if (currentState == State.Fenced) {
promise.completeExceptionally(new ManagedLedgerFencedException(
String.format("managed ledger [%s] is fenced", name)));
return;
}
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
Expand Down Expand Up @@ -3117,6 +3141,7 @@ public void operationComplete(Void result, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
handleBadVersion(e);
unlockingPromise.completeExceptionally(e);
}
});
Expand Down Expand Up @@ -3639,6 +3664,7 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
}

synchronized void setFenced() {
log.info("{} Moving to Fenced state", name);
STATE_UPDATER.set(this, State.Fenced);
}

Expand Down Expand Up @@ -3842,12 +3868,21 @@ private void scheduleTimeoutTask() {
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
: timeoutSec;
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
checkAddTimeout();
checkReadTimeout();
checkTimeouts();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
}
}

private void checkTimeouts() {
final State state = STATE_UPDATER.get(this);
if (state == State.Closed
|| state == State.Fenced) {
return;
}
checkAddTimeout();
checkReadTimeout();
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down Expand Up @@ -4004,6 +4039,7 @@ public void operationComplete(Void result, Stat version) {
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Update managedLedger's properties failed", name, e);
handleBadVersion(e);
callback.updatePropertiesFailed(e, ctx);
metadataMutex.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -387,6 +391,72 @@ public void recoverAfterZnodeVersionError() throws Exception {
}
}

@Test
public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_trim",
new ManagedLedgerConfig()
.setMaxEntriesPerLedger(2));
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());

metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
path.equals("/managed-ledgers/my_test_ledger_trim")
&& op == FaultInjectionMetadataStore.OperationType.PUT
);

CompletableFuture<?> handle = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(handle);
assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
instanceOf(ManagedLedgerException.BadVersionException.class));

assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());

// if the task started after the ML moved to Fenced state, it must fail
CompletableFuture<?> handleAlreadyFenced = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(handleAlreadyFenced);
assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(),
instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class));

try {
ledger.addEntry("entry".getBytes());
fail("should fail");
} catch (ManagedLedgerFencedException e) {
assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
}

assertFalse(factory.ledgers.isEmpty());
try {
ledger.close();
} catch (ManagedLedgerFencedException e) {
assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
}

// verify that the ManagedLedger has been unregistered even if it was fenced
assertTrue(factory.ledgers.isEmpty());
}

@Test
public void badVersionErrorDuringTruncateLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_trim",
new ManagedLedgerConfig()
.setMaxEntriesPerLedger(2));
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());

metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
path.equals("/managed-ledgers/my_test_ledger_trim")
&& op == FaultInjectionMetadataStore.OperationType.PUT
);

CompletableFuture<?> handle = ledger.asyncTruncate();
assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
instanceOf(ManagedLedgerException.BadVersionException.class));

assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
}

@Test
public void recoverAfterWriteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -125,6 +128,51 @@ public void testOffload() throws Exception {
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());

// ledgers should be marked as offloaded
ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());
}

@Test
public void testOffloadFenced() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);

int i = 0;
for (; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
assertEquals(ledger.getLedgersInfoAsList().size(), 3);

metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
path.equals("/managed-ledgers/my_test_ledger")
&& op == FaultInjectionMetadataStore.OperationType.PUT
);

assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () ->
ledger.offloadPrefix(ledger.getLastConfirmedEntry()));

assertEquals(ledger.getLedgersInfoAsList().size(), 3);

// the offloader actually wrote the data on the storage
assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());

// but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed
ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext());

// check that the ledger is fenced
assertEquals(ManagedLedgerImpl.State.Fenced, ledger.getState());

}

@Test
Expand Down
Loading

0 comments on commit 63d4cf2

Please sign in to comment.