Skip to content

Commit

Permalink
MSQ: Properly report errors that occur when starting up RunWorkOrder. (
Browse files Browse the repository at this point in the history
…#17069)

* MSQ: Properly report errors that occur when starting up RunWorkOrder.

In #17046, an exception thrown by RunWorkOrder#startAsync would be ignored
and replaced with a generic CanceledFault. This patch fixes it by retaining
the original error.
  • Loading branch information
gianm authored Sep 17, 2024
1 parent 307b8e3 commit 50503fe
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ enum State
STARTED,

/**
* State entered upon calling {@link #stop()}.
* State entered upon calling {@link #stop(Throwable)}.
*/
STOPPING,

/**
* State entered when a call to {@link #stop()} concludes.
* State entered when a call to {@link #stop(Throwable)} concludes.
*/
STOPPED
}
Expand Down Expand Up @@ -232,7 +232,7 @@ public void startAsync()
setUpCompletionCallbacks();
}
catch (Throwable t) {
stopUnchecked();
stopUnchecked(t);
}
}

Expand All @@ -242,64 +242,72 @@ public void startAsync()
* are all properly cleaned up.
*
* Blocks until execution is fully stopped.
*
* @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent.
* Will also be thrown at the end of this method.
*/
public void stop() throws InterruptedException
public void stop(@Nullable Throwable t) throws InterruptedException
{
if (state.compareAndSet(State.INIT, State.STOPPING)
|| state.compareAndSet(State.STARTED, State.STOPPING)) {
// Initiate stopping.
Throwable e = null;

try {
exec.cancel(cancellationId);
}
catch (Throwable e2) {
e = e2;
if (t == null) {
t = e2;
} else {
t.addSuppressed(e2);
}
}

try {
frameContext.close();
}
catch (Throwable e2) {
if (e == null) {
e = e2;
if (t == null) {
t = e2;
} else {
e.addSuppressed(e2);
t.addSuppressed(e2);
}
}

try {
// notifyListener will ignore this cancellation error if work has already succeeded.
notifyListener(Either.error(new MSQException(CanceledFault.instance())));
// notifyListener will ignore this error if work has already succeeded.
notifyListener(Either.error(t != null ? t : new MSQException(CanceledFault.instance())));
}
catch (Throwable e2) {
if (e == null) {
e = e2;
if (t == null) {
t = e2;
} else {
e.addSuppressed(e2);
t.addSuppressed(e2);
}
}

stopLatch.countDown();

if (e != null) {
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}

stopLatch.await();

if (t != null) {
Throwables.throwIfInstanceOf(t, InterruptedException.class);
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
}
}

/**
* Calls {@link #stop()}. If the call to {@link #stop()} throws {@link InterruptedException}, this method sets
* the interrupt flag and throws an unchecked exception.
* Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)} throws {@link InterruptedException},
* this method sets the interrupt flag and throws an unchecked exception.
*
* @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent.
* Will also be thrown at the end of this method.
*/
public void stopUnchecked()
public void stopUnchecked(@Nullable final Throwable t)
{
try {
stop();
stop(t);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private void handleNewWorkOrder(
);

// Set up processorCloser (called when processing is done).
kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
kernelHolder.processorCloser.register(() -> runWorkOrder.stopUnchecked(null));

// Start working on this stage immediately.
kernel.startReading();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.exec;

import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.kernel.FrameContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class RunWorkOrderTest
{
private static final String CANCELLATION_ID = "my-cancellation-id";

@Test
public void test_stopUnchecked() throws InterruptedException
{
final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
final FrameContext frameContext = Mockito.mock(FrameContext.class);
final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class);
final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class);

Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);

final RunWorkOrder runWorkOrder =
new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false);

runWorkOrder.stopUnchecked(null);

// Calling a second time doesn't do anything special.
runWorkOrder.stopUnchecked(null);

Mockito.verify(exec).cancel(CANCELLATION_ID);
Mockito.verify(frameContext).close();
Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class));
}

@Test
public void test_stopUnchecked_error() throws InterruptedException
{
final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
final FrameContext frameContext = Mockito.mock(FrameContext.class);
final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class);
final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class);

Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);

final RunWorkOrder runWorkOrder =
new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false);

final ISE exception = new ISE("oops");

Assert.assertThrows(
IllegalStateException.class,
() -> runWorkOrder.stopUnchecked(exception)
);

// Calling a second time doesn't do anything special. We already tried our best.
runWorkOrder.stopUnchecked(null);

Mockito.verify(exec).cancel(CANCELLATION_ID);
Mockito.verify(frameContext).close();
Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
}

@Test
public void test_stopUnchecked_errorDuringExecCancel() throws InterruptedException
{
final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
final FrameContext frameContext = Mockito.mock(FrameContext.class);
final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class);
final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class);

final ISE exception = new ISE("oops");
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID);

final RunWorkOrder runWorkOrder =
new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false);

Assert.assertThrows(
IllegalStateException.class,
() -> runWorkOrder.stopUnchecked(null)
);

Mockito.verify(exec).cancel(CANCELLATION_ID);
Mockito.verify(frameContext).close();
Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
}

@Test
public void test_stopUnchecked_errorDuringFrameContextClose() throws InterruptedException
{
final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
final FrameContext frameContext = Mockito.mock(FrameContext.class);
final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class);
final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class);

final ISE exception = new ISE("oops");
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
Mockito.doThrow(exception).when(frameContext).close();

final RunWorkOrder runWorkOrder =
new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false);

Assert.assertThrows(
IllegalStateException.class,
() -> runWorkOrder.stopUnchecked(null)
);

Mockito.verify(exec).cancel(CANCELLATION_ID);
Mockito.verify(frameContext).close();
Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
}
}

0 comments on commit 50503fe

Please sign in to comment.