Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bind Operator #770

Merged
merged 23 commits into from
Jan 22, 2014
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1d1066b
Added Observable.bind
benjchristensen Jan 16, 2014
2ac7ec4
Re-implemented Take Operator with Bind
benjchristensen Jan 16, 2014
71baf0e
Bind implementation of fromIterable, toList, toSortedList
benjchristensen Jan 16, 2014
60ac55f
Bind implementation of Map, Cast, Timestamp
benjchristensen Jan 16, 2014
b5e4933
Take: Fix Terminal State Handling
benjchristensen Jan 16, 2014
723d935
Bind implementation of Merge
benjchristensen Jan 17, 2014
02ccc4d
New Bind Signature and GroupBy Operator
benjchristensen Jan 17, 2014
06f5d83
Bind implementation of Parallel
benjchristensen Jan 20, 2014
c601c1a
Bugfix: GroupBy Completion
benjchristensen Jan 20, 2014
0bb6666
Operator Class
benjchristensen Jan 20, 2014
bcf9807
BugFix: Another GroupBy use case found and fixed.
benjchristensen Jan 21, 2014
00f4488
Get OperationReplay working via OnSubscribeFunc to Action1 bridge
benjchristensen Jan 21, 2014
4964d5d
Deprecate the Old Create Method
benjchristensen Jan 21, 2014
a225cff
Groovy: MetaMethod to disambiguate the create method during deprecati…
benjchristensen Jan 21, 2014
8b97295
Temporarily disable Scala/Kotlin builds until they are fixed
benjchristensen Jan 21, 2014
4571e92
Updates while validating no major performance and memory regressions
benjchristensen Jan 21, 2014
1f2a92a
Cleanup after rebase.
benjchristensen Jan 21, 2014
a283deb
Remove use of @SafeVarargs (breaking Java6 build)
benjchristensen Jan 21, 2014
f90ba89
Organize and Import
benjchristensen Jan 21, 2014
2765090
FromIterable: Remove superfluous onComplete
benjchristensen Jan 21, 2014
fcefa21
GroupBy: No need to close over parentSubscription
benjchristensen Jan 21, 2014
f7e34eb
Simple Javadoc for Bind
benjchristensen Jan 21, 2014
47c20f8
Fixed ZipWithIndex using mutable state (not pretty)
AppliedDuality Jan 21, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.groovy;

import groovy.lang.Closure;
import rx.Operator;
import rx.Subscription;
import rx.util.functions.Action1;

public class GroovyCreateWrapper<T> implements Action1<Operator<? super T>> {

private final Closure<Void> closure;

public GroovyCreateWrapper(Closure<Void> closure) {
this.closure = closure;
}

@Override
public void call(Operator<? super T> op) {
Object o = closure.call(op);
/*
* If the new signature is being used, we will get NULL back.
* If the old is being used we will get a Subscription back.
*/
if (o != null) {
op.add((Subscription) o);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@
package rx.lang.groovy;

import groovy.lang.Closure;
import groovy.lang.GroovySystem;
import groovy.lang.MetaMethod;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.codehaus.groovy.reflection.CachedClass;
import org.codehaus.groovy.reflection.ReflectionCache;
import org.codehaus.groovy.runtime.m12n.ExtensionModule;
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
Expand Down Expand Up @@ -75,6 +70,9 @@ public List<MetaMethod> getMetaMethods() {
}

private MetaMethod createMetaMethod(final Method m) {
if (m.getDeclaringClass().equals(Observable.class) && m.getName().equals("create")) {
return specialCasedOverrideForCreate(m);
}
return new MetaMethod() {

@Override
Expand Down Expand Up @@ -109,12 +107,11 @@ public Object invoke(Object object, Object[] arguments) {
if (o instanceof Closure) {
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyActionWrapper((Closure) o);
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
} else if (OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
} else {
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
}

} else {
newArgs[i] = o;
}
Expand Down Expand Up @@ -152,4 +149,55 @@ public CachedClass[] getParameterTypes() {
}
};
}

/**
* Special case until we finish migrating off the deprecated 'create' method signature
*/
private MetaMethod specialCasedOverrideForCreate(final Method m) {
return new MetaMethod() {

@Override
public int getModifiers() {
return m.getModifiers();
}

@Override
public String getName() {
return m.getName();
}

@Override
public Class getReturnType() {
return m.getReturnType();
}

@Override
public CachedClass getDeclaringClass() {
return ReflectionCache.getCachedClass(m.getDeclaringClass());
}

@Override
public Object invoke(Object object, final Object[] arguments) {
return Observable.create(new GroovyCreateWrapper((Closure) arguments[0]));
}

@SuppressWarnings("rawtypes")
@Override
public CachedClass[] getParameterTypes() {
Class[] pts = m.getParameterTypes();
CachedClass[] cc = new CachedClass[pts.length];
for (int i = 0; i < pts.length; i++) {
if (Function.class.isAssignableFrom(pts[i])) {
// function type to be replaced by closure
cc[i] = ReflectionCache.getCachedClass(Closure.class);
} else {
// non-function type
cc[i] = ReflectionCache.getCachedClass(pts[i]);
}
}
return cc;
}
};
}

}
Loading