Skip to content

Commit

Permalink
Fix StyxObservable type parameters (#219)
Browse files Browse the repository at this point in the history
The type parameters were overly strict. They have been made more nuanced.
  • Loading branch information
kvosper authored Jul 26, 2018
1 parent 6fbade5 commit 64e137c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* An observable that underpins the StyxObservable interface.
*
* @param <T>
* @param <T> type of object published
*/
class StyxCoreObservable<T> implements StyxObservable<T> {
private final Observable<T> delegate;
Expand All @@ -38,18 +38,6 @@ public StyxCoreObservable(CompletionStage<T> future) {
this.delegate = toObservable(future);
}

private static <T> Observable<T> toObservable(CompletionStage<T> future) {
return Observable.create(subscriber ->
future.whenComplete((result, error) -> {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onNext(result);
subscriber.onCompleted();
}
}));
}

public static <T> StyxObservable<T> empty() {
return new StyxCoreObservable<>(Observable.empty());
}
Expand All @@ -62,31 +50,23 @@ public static <T> StyxObservable<T> error(Throwable cause) {
return new StyxCoreObservable<T>(Observable.error(cause));
}

public <U> StyxObservable<U> map(Function<T, U> transformation) {
public <R> StyxObservable<R> map(Function<? super T, ? extends R> transformation) {
return new StyxCoreObservable<>(delegate.map(transformation::apply));
}

public <U> StyxObservable<U> flatMap(Function<T, StyxObservable<U>> transformation) {
public <R> StyxObservable<R> flatMap(Function<? super T, ? extends StyxObservable<? extends R>> transformation) {
return new StyxCoreObservable<>(delegate.flatMap(response ->
toObservable(transformation.apply(response))));
}

private static <U> Observable<? extends U> toObservable(StyxObservable<U> styxObservable) {
return styxObservable instanceof StyxCoreObservable
? ((StyxCoreObservable<U>) styxObservable).delegate
: toObservable(styxObservable.asCompletableFuture());
}

public <U> StyxObservable<U> reduce(BiFunction<T, U, U> accumulator, U seed) {
public <R> StyxObservable<R> reduce(BiFunction<? super T, R, R> accumulator, R seed) {
return new StyxCoreObservable<>(delegate.reduce(seed, (result, element) -> accumulator.apply(element, result)));
}

@Override
public StyxObservable<T> onError(Function<Throwable, StyxObservable<T>> errorHandler) {
return new StyxCoreObservable<>(delegate.onErrorResumeNext(cause -> {
StyxCoreObservable<T> result = (StyxCoreObservable<T>) errorHandler.apply(cause);
return result.delegate();
}));
public StyxObservable<T> onError(Function<Throwable, ? extends StyxObservable<? extends T>> errorHandler) {
return new StyxCoreObservable<>(delegate.onErrorResumeNext(cause ->
toObservable(errorHandler.apply(cause))));
}

public Observable<T> delegate() {
Expand All @@ -103,4 +83,21 @@ private static <T> CompletableFuture<T> fromSingleObservable(Observable<T> obser
return future;
}

private static <T> Observable<T> toObservable(StyxObservable<T> styxObservable) {
return styxObservable instanceof StyxCoreObservable
? ((StyxCoreObservable<T>) styxObservable).delegate
: toObservable(styxObservable.asCompletableFuture());
}

private static <T> Observable<T> toObservable(CompletionStage<T> future) {
return Observable.create(subscriber ->
future.whenComplete((result, error) -> {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onNext(result);
subscriber.onCompleted();
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
* <p>
* This interface provides a is *not* intended for plugins to extend.
*
* @param <T>
* @param <T> type of object published
*/
public interface StyxObservable<T> {
<U> StyxObservable<U> map(Function<T, U> transformation);
<R> StyxObservable<R> map(Function<? super T, ? extends R> transformation);

<U> StyxObservable<U> flatMap(Function<T, StyxObservable<U>> transformation);
<R> StyxObservable<R> flatMap(Function<? super T, ? extends StyxObservable<? extends R>> transformation);

<U> StyxObservable<U> reduce(BiFunction<T, U, U> accumulator, U initialValue);
<R> StyxObservable<R> reduce(BiFunction<? super T, R, R> accumulator, R initialValue);

StyxObservable<T> onError(Function<Throwable, StyxObservable<T>> errorHandler);
StyxObservable<T> onError(Function<Throwable, ? extends StyxObservable<? extends T>> errorHandler);

/**
* Converts this observable to a completable future. Note that in order to do this, it must
Expand All @@ -62,7 +62,7 @@ static <T> StyxObservable<T> of(T value) {
return new StyxCoreObservable<>(Observable.just(value));
}

static <T> StyxObservable<T> from(Iterable<T> values) {
static <T> StyxObservable<T> from(Iterable<? extends T> values) {
return new StyxCoreObservable<>(Observable.from(values));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ class AsyncRequestContentDelayPlugin extends PluginAdapter {
Thread.sleep(1000)
Observable.just(byteBuf)
})
StyxObservable.of(request)

// This was split apart as it no longer compiles without the type annotation StyxObservable[HttpRequest]
val mapped: StyxObservable[HttpRequest] = StyxObservable.of(request)
.map(asJavaFunction((request: HttpRequest) => request.newBuilder().body(fromRxObservable(contentTransformation)).build()))

mapped
.flatMap(asJavaFunction((request: HttpRequest) => chain.proceed(request)))
}
}

0 comments on commit 64e137c

Please sign in to comment.