Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve exception type on BlockingIterable#forEach variants #3154

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.BlockingIterable;
import io.servicetalk.concurrent.BlockingIterator;
import io.servicetalk.concurrent.internal.DeliberateException;
import io.servicetalk.concurrent.internal.DeliberateIOException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.servicetalk.concurrent.api.Publisher.from;
Expand Down Expand Up @@ -74,6 +79,16 @@ void errorEmittedIsThrown() {
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@ParameterizedTest(name = "{displayName} [{index}] clazz={0}")
@ValueSource(classes = {DeliberateException.class, DeliberateIOException.class})
void forEachThrowsOriginalException(final Class<Exception> clazz) throws Exception {
Exception ex = clazz.getDeclaredConstructor().newInstance();
BlockingIterable<Integer> iterator = Publisher.<Integer>failed(ex).toIterable();
assertSame(ex, assertThrows(clazz, () -> iterator.forEach(c -> { })));
assertSame(ex, assertThrows(clazz, () -> iterator.forEach(c -> { }, 1, TimeUnit.SECONDS)));
assertSame(ex, assertThrows(clazz, () -> iterator.forEach(c -> { }, () -> 1, TimeUnit.SECONDS)));
}

@Test
void doubleHashNextWithError() {
DeliberateException de = new DeliberateException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.internal;

import java.io.IOException;

public final class DeliberateIOException extends IOException {
private static final long serialVersionUID = 3895872333544069787L;

public DeliberateIOException() {
super("Deliberate IO Exception");
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ default void forEach(final Consumer<? super T> action) {
while (iterator.hasNext()) {
action.accept(iterator.next());
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
} catch (Throwable t) {
// we "sneaky throw" here any exception that bubbles up to be backwards compatible with the
// previous implementation.
ThrowableUtils.sneakyThrow(t);
}
}

/**
* Mimics the behavior of {@link #forEach(Consumer)} but uses the {@code timeoutSupplier} to determine the timeout
* value for interactions with the {@link BlockingIterator}.
* <p>
* By default the {@code timeoutSupplier} will be used for each interaction with
* {@link BlockingIterator#hasNext(long, TimeUnit)} and {@link BlockingIterator#next(long, TimeUnit)}. However
* By default, the {@code timeoutSupplier} will be used for each interaction with
* {@link BlockingIterator#hasNext(long, TimeUnit)} and {@link BlockingIterator#next(long, TimeUnit)}. However,
* implementations of {@link BlockingIterable} may decide to only apply the timeout when they are not sure if
* an interaction with the {@link BlockingIterator} will block or not.
* <p>
Expand All @@ -74,17 +74,17 @@ default void forEach(final Consumer<? super T> action) {
* @throws TimeoutException If an individual call to {@link BlockingIterator#hasNext(long, TimeUnit)} takes
* longer than the {@code timeout} duration.
*/
default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, TimeUnit unit)
default void forEach(final Consumer<? super T> action, final LongSupplier timeoutSupplier, final TimeUnit unit)
throws TimeoutException {
requireNonNull(action);
try (BlockingIterator<T> iterator = iterator()) {
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
} catch (Throwable t) {
// we "sneaky throw" here any exception that bubbles up to be backwards compatible with the
// previous implementation.
ThrowableUtils.sneakyThrow(t);
}
}

Expand All @@ -96,9 +96,9 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
* Note that the {@code timeout} duration is an approximation and this duration maybe
* exceeded if data is available without blocking.
* <p>
* By default the {@code timeout} will be used for each interaction with
* {@link BlockingIterator#hasNext(long, TimeUnit)} and {@link BlockingIterator#next(long, TimeUnit)}. However
* implementations of {@link BlockingIterable} may decide to only apply the timeout when they are not be sure if
* By default, the {@code timeout} will be used for each interaction with
* {@link BlockingIterator#hasNext(long, TimeUnit)} and {@link BlockingIterator#next(long, TimeUnit)}. However,
* implementations of {@link BlockingIterable} may decide to only apply the timeout when they are not sure if
* an interaction with the {@link BlockingIterator} will block or not.
* <p>
* Note: This method can sneaky-throw an {@link InterruptedException} when a blocking operation internally does so.
Expand All @@ -112,7 +112,8 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
* @throws TimeoutException If the total iteration time as determined by
* {@link BlockingIterator#hasNext(long, TimeUnit)} takes longer than the {@code timeout} duration.
*/
default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) throws TimeoutException {
default void forEach(final Consumer<? super T> action, final long timeout, final TimeUnit unit)
throws TimeoutException {
requireNonNull(action);
try (BlockingIterator<T> iterator = iterator()) {
long remainingTimeoutNanos = unit.toNanos(timeout);
Expand All @@ -128,10 +129,10 @@ default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) th
timeStampANanos = nanoTime();
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
} catch (Throwable t) {
// we "sneaky throw" here any exception that bubbles up to be backwards compatible with the
// previous implementation.
ThrowableUtils.sneakyThrow(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent;

final class ThrowableUtils {

private ThrowableUtils() {
// singleton
}

/**
* Throws the provided exception using the "sneaky throws" idiom.
*
* @param t The exception to throw.
* @param <E> the expected type of the exception thrown.
* @throws E unconditional throws the provided exception.
*/
@SuppressWarnings("unchecked")
static <E extends Throwable> void sneakyThrow(final Throwable t) throws E {
throw (E) t;
}
}
Loading