From 0cf708256f99843444a93b8de950641fcd74bda7 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sat, 30 May 2015 11:34:58 +1000 Subject: [PATCH] improve Subscriber readability and don't perform unnecessary test in request method --- src/main/java/rx/Subscriber.java | 96 ++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/src/main/java/rx/Subscriber.java b/src/main/java/rx/Subscriber.java index 0c26201552..1767237b25 100644 --- a/src/main/java/rx/Subscriber.java +++ b/src/main/java/rx/Subscriber.java @@ -31,20 +31,23 @@ * the type of items the Subscriber expects to observe */ public abstract class Subscriber implements Observer, Subscription { + + // represents requested not set yet + private static final Long NOT_SET = Long.MIN_VALUE; - private final SubscriptionList cs; - private final Subscriber op; + private final SubscriptionList subscriptions; + private final Subscriber subscriber; /* protected by `this` */ - private Producer p; + private Producer producer; /* protected by `this` */ - private long requested = Long.MIN_VALUE; // default to not set + private long requested = NOT_SET; // default to not set protected Subscriber() { this(null, false); } - protected Subscriber(Subscriber op) { - this(op, true); + protected Subscriber(Subscriber subscriber) { + this(subscriber, true); } /** @@ -53,15 +56,15 @@ protected Subscriber(Subscriber op) { *

* To retain the chaining of subscribers, add the created instance to {@code op} via {@link #add}. * - * @param op + * @param subscriber * the other Subscriber * @param shareSubscriptions * {@code true} to share the subscription list in {@code op} with this instance * @since 1.0.6 */ - protected Subscriber(Subscriber op, boolean shareSubscriptions) { - this.op = op; - this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList(); + protected Subscriber(Subscriber subscriber, boolean shareSubscriptions) { + this.subscriber = subscriber; + this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } /** @@ -73,12 +76,12 @@ protected Subscriber(Subscriber op, boolean shareSubscriptions) { * the {@code Subscription} to add */ public final void add(Subscription s) { - cs.add(s); + subscriptions.add(s); } @Override public final void unsubscribe() { - cs.unsubscribe(); + subscriptions.unsubscribe(); } /** @@ -88,7 +91,7 @@ public final void unsubscribe() { */ @Override public final boolean isUnsubscribed() { - return cs.isUnsubscribed(); + return subscriptions.isUnsubscribed(); } /** @@ -124,57 +127,64 @@ protected final void request(long n) { if (n < 0) { throw new IllegalArgumentException("number requested cannot be negative: " + n); } - Producer shouldRequest = null; + + // if producer is set then we will request from it + // otherwise we increase the requested count by n + Producer producerToRequestFrom = null; synchronized (this) { - if (p != null) { - shouldRequest = p; - } else if (requested == Long.MIN_VALUE) { - requested = n; - } else { - final long total = requested + n; - // check if overflow occurred - if (total < 0) { - requested = Long.MAX_VALUE; - } else { - requested = total; - } + if (producer != null) { + producerToRequestFrom = producer; + } else { + addToRequested(n); + return; } } - // after releasing lock - if (shouldRequest != null) { - shouldRequest.request(n); - } + // after releasing lock (we should not make requests holding a lock) + producerToRequestFrom.request(n); } + private void addToRequested(long n) { + if (requested == NOT_SET) { + requested = n; + } else { + final long total = requested + n; + // check if overflow occurred + if (total < 0) { + requested = Long.MAX_VALUE; + } else { + requested = total; + } + } + } + /** * @warn javadoc description missing * @warn param producer not described - * @param producer + * @param p */ - public void setProducer(Producer producer) { + public void setProducer(Producer p) { long toRequest; - boolean setProducer = false; + boolean passToSubscriber = false; synchronized (this) { toRequest = requested; - p = producer; - if (op != null) { + producer = p; + if (subscriber != null) { // middle operator ... we pass thru unless a request has been made - if (toRequest == Long.MIN_VALUE) { + if (toRequest == NOT_SET) { // we pass-thru to the next producer as nothing has been requested - setProducer = true; + passToSubscriber = true; } - } } // do after releasing lock - if (setProducer) { - op.setProducer(p); + if (passToSubscriber) { + subscriber.setProducer(producer); } else { // we execute the request with whatever has been requested (or Long.MAX_VALUE) - if (toRequest == Long.MIN_VALUE) { - p.request(Long.MAX_VALUE); + if (toRequest == NOT_SET) { + producer.request(Long.MAX_VALUE); } else { - p.request(toRequest); + producer.request(toRequest); } } }