Skip to content

Commit

Permalink
Merging fail engine
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Jun 29, 2022
1 parent 5f71da7 commit eca2353
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,11 @@ public void onAfterTranslogRecovery() {

@Override
public void onFailure(String reason, Exception ex) {
failEngine(reason, ex);
}

@Override
public void onTragicFailure(AlreadyClosedException ex) {
failOnTragicEvent(ex);
if (ex instanceof AlreadyClosedException) {
failOnTragicEvent((AlreadyClosedException) ex);
} else {
failEngine(reason, ex);
}
}
};
translogManagerRef = new InternalTranslogManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ public void rollTranslogGeneration() throws TranslogException {
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
translogEventListener.onTragicFailure(e);
translogEventListener.onFailure("translog roll generation failed", e);
throw e;
} catch (Exception e) {
try {
translogEventListener.onFailure("translog trimming failed", e);
translogEventListener.onFailure("translog roll generation failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
Expand Down Expand Up @@ -204,15 +204,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
engineLifeCycleAware.ensureOpen();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
translogEventListener.onTragicFailure(e);
translogEventListener.onFailure("translog trimming unreferenced translog failed", e);
throw e;
} catch (Exception e) {
try {
translogEventListener.onFailure("translog trimming failed", e);
translogEventListener.onFailure("translog trimming unreferenced translog failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new TranslogException(shardId, "failed to trim translog", e);
throw new TranslogException(shardId, "failed to trim unreferenced translog translog", e);
}
}

Expand All @@ -237,7 +237,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T
engineLifeCycleAware.ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (AlreadyClosedException e) {
translogEventListener.onTragicFailure(e);
translogEventListener.onFailure("translog operations trimming failed", e);
throw e;
} catch (Exception e) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.TranslogException;
Expand Down Expand Up @@ -98,20 +97,6 @@ public void onFailure(String reason, Exception e) {
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

@Override
public void onTragicFailure(AlreadyClosedException e) {
List<Exception> exceptionList = new ArrayList<>(listeners.size());
for (TranslogEventListener listener : listeners) {
try {
listener.onTragicFailure(e);
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex);
exceptionList.add(ex);
}
}
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

private <T extends Throwable> void maybeThrowTranslogExceptionAndSuppress(List<T> exceptions) {
T main = null;
for (T ex : exceptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.translog.listener;

import org.apache.lucene.store.AlreadyClosedException;

/**
* The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager}
*
Expand All @@ -35,12 +33,6 @@ default void onAfterTranslogRecovery() {}
*/
default void onBeginTranslogRecovery() {}

/**
* Invoked when translog operations run into accessing an already closed resource
* @param ex the exception thrown when accessing a closed resource
*/
default void onTragicFailure(AlreadyClosedException ex) {}

/**
* Invoked when translog operations run into any other failure
* @param reason the failure reason
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.translog.listener;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -24,7 +23,6 @@ public void testCompositeTranslogEventListener() {
AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onFailureInvoked = new AtomicInteger();
AtomicInteger onTragicFailureInvoked = new AtomicInteger();

TranslogEventListener listener = new TranslogEventListener() {
@Override
Expand All @@ -46,11 +44,6 @@ public void onBeginTranslogRecovery() {
public void onFailure(String reason, Exception ex) {
onFailureInvoked.incrementAndGet();
}

@Override
public void onTragicFailure(AlreadyClosedException ex) {
onTragicFailureInvoked.incrementAndGet();
}
};

final List<TranslogEventListener> translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener));
Expand All @@ -63,21 +56,18 @@ public void onTragicFailure(AlreadyClosedException ex) {
compositeListener.onAfterTranslogSync();
compositeListener.onBeginTranslogRecovery();
compositeListener.onFailure("reason", new RuntimeException("reason"));
compositeListener.onTragicFailure(new AlreadyClosedException("reason"));

assertEquals(2, onBeginTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogSyncInvoked.get());
assertEquals(2, onFailureInvoked.get());
assertEquals(2, onTragicFailureInvoked.get());
}

public void testCompositeTranslogEventListenerOnExceptions() {
AtomicInteger onTranslogSyncInvoked = new AtomicInteger();
AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onFailureInvoked = new AtomicInteger();
AtomicInteger onTragicFailureInvoked = new AtomicInteger();

TranslogEventListener listener = new TranslogEventListener() {
@Override
Expand All @@ -99,11 +89,6 @@ public void onBeginTranslogRecovery() {
public void onFailure(String reason, Exception ex) {
onFailureInvoked.incrementAndGet();
}

@Override
public void onTragicFailure(AlreadyClosedException ex) {
onTragicFailureInvoked.incrementAndGet();
}
};

TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance(
Expand All @@ -122,13 +107,10 @@ public void onTragicFailure(AlreadyClosedException ex) {
expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync());
expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery());
expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason")));
expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason")));

assertEquals(2, onBeginTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogSyncInvoked.get());
assertEquals(2, onFailureInvoked.get());
assertEquals(2, onTragicFailureInvoked.get());

}
}

0 comments on commit eca2353

Please sign in to comment.