diff --git a/rxjava-contrib/rxjava-javafx/build.gradle b/rxjava-contrib/rxjava-javafx/build.gradle new file mode 100644 index 0000000000..2b08a63150 --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/build.gradle @@ -0,0 +1,30 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +javadoc { + options { + doclet = "org.benjchristensen.doclet.DocletExclude" + docletpath = [rootProject.file('./gradle/doclet-exclude.jar')] + stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css') + windowTitle = "RxJava Javadoc ${project.version}" + } + options.addStringOption('top').value = '

RxJava

' +} + +jar { + manifest { + name = 'rxjava-javafx' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + instruction 'Fragment-Host', 'com.netflix.rxjava.core' + } +} diff --git a/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/NodeEventSource.java b/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/NodeEventSource.java new file mode 100644 index 0000000000..d51fe4c7f4 --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/NodeEventSource.java @@ -0,0 +1,41 @@ +package rx.javafx.sources; + +import javafx.event.Event; +import javafx.event.EventHandler; +import javafx.event.EventType; +import javafx.scene.Node; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action0; +import rx.schedulers.JavaFxScheduler; +import rx.subscriptions.JavaFxSubscriptions; + +public class NodeEventSource { + /** + * @see rx.observables.JavaFxObservable#fromNodeEvents + */ + public static Observable fromNodeEvents(final Node source, final EventType eventType) { + + return Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + final EventHandler handler = new EventHandler() { + @Override + public void handle(T t) { + subscriber.onNext(t); + } + }; + + source.addEventHandler(eventType, handler); + + subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(new Action0() { + @Override + public void call() { + source.removeEventHandler(eventType, handler); + } + })); + } + + }).subscribeOn(JavaFxScheduler.getInstance()); + } +} diff --git a/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/ObservableValueSource.java b/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/ObservableValueSource.java new file mode 100644 index 0000000000..faac9c0e6e --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/main/java/rx/javafx/sources/ObservableValueSource.java @@ -0,0 +1,42 @@ +package rx.javafx.sources; + +import javafx.beans.value.ChangeListener; +import javafx.beans.value.ObservableValue; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action0; +import rx.subscriptions.JavaFxSubscriptions; + +public class ObservableValueSource { + + /** + * @see rx.observables.JavaFxObservable#fromObservableValue + */ + public static Observable fromObservableValue(final ObservableValue fxObservable) { + return Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.onNext(fxObservable.getValue()); + + final ChangeListener listener = new ChangeListener() { + @Override + public void changed(final ObservableValue observableValue, final T prev, final T current) { + subscriber.onNext(current); + } + }; + + fxObservable.addListener(listener); + + subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(new Action0() { + @Override + public void call() { + fxObservable.removeListener(listener); + } + })); + + } + }); + } + + +} diff --git a/rxjava-contrib/rxjava-javafx/src/main/java/rx/observables/JavaFxObservable.java b/rxjava-contrib/rxjava-javafx/src/main/java/rx/observables/JavaFxObservable.java new file mode 100644 index 0000000000..4beba5b55c --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/main/java/rx/observables/JavaFxObservable.java @@ -0,0 +1,53 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + + +import javafx.beans.value.ObservableValue; +import javafx.event.Event; +import javafx.event.EventType; +import javafx.scene.Node; +import rx.Observable; +import rx.javafx.sources.NodeEventSource; +import rx.javafx.sources.ObservableValueSource; + + +public enum JavaFxObservable { + ; // no instances + + + /** + * Creates an observable corresponding to javafx ui events. + * + * @param node The target of the UI events. + * @param eventType The type of the observed UI events + * @return An Observable of UI events, appropriately typed + */ + public static Observable fromNodeEvents(final Node node, final EventType eventType) { + return NodeEventSource.fromNodeEvents(node, eventType); + } + + /** + * Create an rx Observable from a javafx ObservableValue + * + * @param fxObservable the observed ObservableValue + * @param the type of the observed value + * @return an Observable emitting values as the wrapped ObservableValue changes + */ + public static Observable fromObservableValue(final ObservableValue fxObservable) { + return ObservableValueSource.fromObservableValue(fxObservable); + } +} diff --git a/rxjava-contrib/rxjava-javafx/src/main/java/rx/schedulers/JavaFxScheduler.java b/rxjava-contrib/rxjava-javafx/src/main/java/rx/schedulers/JavaFxScheduler.java new file mode 100644 index 0000000000..9196c5f0e7 --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/main/java/rx/schedulers/JavaFxScheduler.java @@ -0,0 +1,136 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import javafx.animation.KeyFrame; +import javafx.animation.Timeline; +import javafx.application.Platform; +import javafx.event.ActionEvent; +import javafx.event.EventHandler; +import javafx.util.Duration; +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +import java.util.concurrent.TimeUnit; + +/** + * Executes work on the JavaFx UI thread. + * This scheduler should only be used with actions that execute quickly. + */ +public final class JavaFxScheduler extends Scheduler { + private static final JavaFxScheduler INSTANCE = new JavaFxScheduler(); + + /* package for unit test */JavaFxScheduler() { + } + + public static JavaFxScheduler getInstance() { + return INSTANCE; + } + + private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) { + if (delay < 0 || delay > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE)); + } + } + + @Override + public Worker createWorker() { + return new InnerJavaFxScheduler(); + } + + private static class InnerJavaFxScheduler extends Worker { + + private final CompositeSubscription innerSubscription = new CompositeSubscription(); + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + + @Override + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + long delay = unit.toMillis(delayTime); + assertThatTheDelayIsValidForTheJavaFxTimer(delay); + final BooleanSubscription s = BooleanSubscription.create(); + + final Timeline timeline = new Timeline(new KeyFrame(Duration.millis(delay), new EventHandler() { + + @Override + public void handle(ActionEvent event) { + if (innerSubscription.isUnsubscribed() || s.isUnsubscribed()) { + return; + } + action.call(); + innerSubscription.remove(s); + } + })); + + timeline.setCycleCount(1); + timeline.play(); + + innerSubscription.add(s); + + // wrap for returning so it also removes it from the 'innerSubscription' + return Subscriptions.create(new Action0() { + + @Override + public void call() { + timeline.stop(); + s.unsubscribe(); + innerSubscription.remove(s); + } + + }); + } + + @Override + public Subscription schedule(final Action0 action) { + final BooleanSubscription s = BooleanSubscription.create(); + Platform.runLater(new Runnable() { + @Override + public void run() { + if (innerSubscription.isUnsubscribed() || s.isUnsubscribed()) { + return; + } + action.call(); + innerSubscription.remove(s); + } + }); + + innerSubscription.add(s); + // wrap for returning so it also removes it from the 'innerSubscription' + return Subscriptions.create(new Action0() { + + @Override + public void call() { + s.unsubscribe(); + innerSubscription.remove(s); + } + + }); + } + + } +} diff --git a/rxjava-contrib/rxjava-javafx/src/main/java/rx/subscriptions/JavaFxSubscriptions.java b/rxjava-contrib/rxjava-javafx/src/main/java/rx/subscriptions/JavaFxSubscriptions.java new file mode 100644 index 0000000000..3767e39bd2 --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/main/java/rx/subscriptions/JavaFxSubscriptions.java @@ -0,0 +1,56 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + + +import javafx.application.Platform; +import rx.Scheduler.Worker; +import rx.Subscription; +import rx.functions.Action0; +import rx.schedulers.JavaFxScheduler; + +public final class JavaFxSubscriptions { + + private JavaFxSubscriptions() { + // no instance + } + + /** + * Create an Subscription that always runs unsubscribe in the event dispatch thread. + * + * @param unsubscribe the action to be performed in the ui thread at un-subscription + * @return an Subscription that always runs unsubscribe in the event dispatch thread. + */ + public static Subscription unsubscribeInEventDispatchThread(final Action0 unsubscribe) { + return Subscriptions.create(new Action0() { + @Override + public void call() { + if (Platform.isFxApplicationThread()) { + unsubscribe.call(); + } else { + final Worker inner = JavaFxScheduler.getInstance().createWorker(); + inner.schedule(new Action0() { + @Override + public void call() { + unsubscribe.call(); + inner.unsubscribe(); + } + }); + } + } + }); + } +} diff --git a/rxjava-contrib/rxjava-javafx/src/test/java/rx/schedulers/JavaFxSchedulerTest.java b/rxjava-contrib/rxjava-javafx/src/test/java/rx/schedulers/JavaFxSchedulerTest.java new file mode 100644 index 0000000000..5f7e10fd38 --- /dev/null +++ b/rxjava-contrib/rxjava-javafx/src/test/java/rx/schedulers/JavaFxSchedulerTest.java @@ -0,0 +1,244 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import javafx.application.Application; +import javafx.application.Platform; +import javafx.stage.Stage; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.InOrder; +import rx.Scheduler.Worker; +import rx.functions.Action0; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +/** + * Executes work on the JavaFx UI thread. + * This scheduler should only be used with actions that execute quickly. + */ +public final class JavaFxSchedulerTest { + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private static void waitForEmptyEventQueue() throws Exception { + FXUtilities.runAndWait(new Runnable() { + @Override + public void run() { + // nothing to do, we're just waiting here for the event queue to be emptied + } + }); + } + + public static class AsNonApp extends Application { + @Override + public void start(Stage primaryStage) throws Exception { + // noop + } + } + + @BeforeClass + public static void initJFX() { + Thread t = new Thread("JavaFX Init Thread") { + public void run() { + Application.launch(AsNonApp.class, new String[0]); + } + }; + t.setDaemon(true); + t.start(); + } + + @Test + public void testInvalidDelayValues() { + final JavaFxScheduler scheduler = new JavaFxScheduler(); + final Worker inner = scheduler.createWorker(); + final Action0 action = mock(Action0.class); + + exception.expect(IllegalArgumentException.class); + inner.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS); + + exception.expect(IllegalArgumentException.class); + inner.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS); + + exception.expect(IllegalArgumentException.class); + inner.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS); + + exception.expect(IllegalArgumentException.class); + inner.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS); + } + + @Test + public void testPeriodicScheduling() throws Exception { + final JavaFxScheduler scheduler = new JavaFxScheduler(); + final Worker inner = scheduler.createWorker(); + + final CountDownLatch latch = new CountDownLatch(4); + + final Action0 innerAction = mock(Action0.class); + final Action0 action = new Action0() { + @Override + public void call() { + try { + innerAction.call(); + assertTrue(Platform.isFxApplicationThread()); + } finally { + latch.countDown(); + } + } + }; + + inner.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting for tasks to execute"); + } + + inner.unsubscribe(); + waitForEmptyEventQueue(); + verify(innerAction, times(4)).call(); + } + + @Test + public void testNestedActions() throws Exception { + final JavaFxScheduler scheduler = new JavaFxScheduler(); + final Worker inner = scheduler.createWorker(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + assertTrue(Platform.isFxApplicationThread()); + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + assertTrue(Platform.isFxApplicationThread()); + secondStepStart.call(); + inner.schedule(firstAction); + secondStepEnd.call(); + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + assertTrue(Platform.isFxApplicationThread()); + thirdStepStart.call(); + inner.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + inner.schedule(thirdAction); + waitForEmptyEventQueue(); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + /* + * based on http://www.guigarage.com/2013/01/invokeandwait-for-javafx/ + * by hendrikebbers + */ + static public class FXUtilities { + + /** + * Simple helper class. + * + * @author hendrikebbers + */ + private static class ThrowableWrapper { + Throwable t; + } + + /** + * Invokes a Runnable in JFX Thread and waits while it's finished. Like + * SwingUtilities.invokeAndWait does for EDT. + * + * @param run The Runnable that has to be called on JFX thread. + * @throws InterruptedException f the execution is interrupted. + * @throws ExecutionException If a exception is occurred in the run method of the Runnable + */ + public static void runAndWait( final Runnable run) throws InterruptedException, ExecutionException { + if (Platform.isFxApplicationThread()) { + try { + run.run(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } else { + final Lock lock = new ReentrantLock(); + final Condition condition = lock.newCondition(); + final ThrowableWrapper throwableWrapper = new ThrowableWrapper(); + lock.lock(); + try { + Platform.runLater(new Runnable() { + + @Override + public void run() { + lock.lock(); + try { + run.run(); + } catch (Throwable e) { + throwableWrapper.t = e; + } finally { + try { + condition.signal(); + } finally { + lock.unlock(); + } + } + } + }); + condition.await(); + if (throwableWrapper.t != null) { + throw new ExecutionException(throwableWrapper.t); + } + } finally { + lock.unlock(); + } + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 913516f575..48c277fbf6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,6 +6,7 @@ include 'rxjava-core', \ 'language-adaptors:rxjava-scala', \ 'language-adaptors:rxjava-kotlin', \ 'rxjava-contrib:rxjava-swing', \ +'rxjava-contrib:rxjava-javafx', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-android-samples-build-wrapper', \ 'rxjava-contrib:rxjava-apache-http', \