Skip to content

Commit

Permalink
update to rxjava3
Browse files Browse the repository at this point in the history
  • Loading branch information
springeye committed Oct 20, 2020
1 parent 66133ca commit e10c717
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 32 deletions.
4 changes: 4 additions & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ android {
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}

dependencies {
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ ext.deps = [
androidannotations: 'androidx.annotation:annotation:1.0.0',
design : 'com.google.android.material:material:1.0.0',

rxandroid : 'io.reactivex:rxandroid:1.2.1',
rxjava : 'io.reactivex:rxjava:1.3.4',
rxandroid : 'io.reactivex.rxjava3:rxandroid:3.0.0',
rxjava : 'io.reactivex.rxjava3:rxjava:3.0.7',
timber : 'com.jakewharton.timber:timber:4.6.0',
]
6 changes: 3 additions & 3 deletions rxbus/src/main/java/com/hwangjr/rxbus/Bus.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import rx.functions.Action1;
import io.reactivex.rxjava3.functions.Consumer;


/**
Expand Down Expand Up @@ -230,9 +230,9 @@ public void register(Object object) {
}

private void dispatchProducerResult(final SubscriberEvent subscriberEvent, ProducerEvent producer) {
producer.produce().subscribe(new Action1<Object>() {
producer.produce().subscribe(new Consumer() {
@Override
public void call(Object event) {
public void accept(Object event) throws Throwable {
if (event != null) {
dispatch(event, subscriberEvent);
}
Expand Down
10 changes: 4 additions & 6 deletions rxbus/src/main/java/com/hwangjr/rxbus/RxBusOlder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;
import timber.log.Timber;

@Deprecated
Expand All @@ -36,8 +35,7 @@ public <T> Observable<T> register(@NonNull Object tag, @NonNull Class<T> clazz)
subjectList = new ArrayList<>();
mSubjectsMapper.put(tag, subjectList);
}

Subject<T, T> subject = new SerializedSubject<>(PublishSubject.<T>create());
Subject<T> subject = PublishSubject.<T>create().toSerialized();
subjectList.add(subject);
if (DEBUG) {
Timber.d("[register] mSubjectsMapper: " + mSubjectsMapper);
Expand Down
17 changes: 11 additions & 6 deletions rxbus/src/main/java/com/hwangjr/rxbus/entity/ProducerEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import com.hwangjr.rxbus.thread.EventThread;

import org.reactivestreams.Subscriber;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import rx.Observable;
import rx.Subscriber;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;


/**
* Wraps a 'producer' method on a specific object.
Expand Down Expand Up @@ -73,12 +78,12 @@ public void invalidate() {
* Invokes the wrapped producer method and produce a {@link Observable}.
*/
public Observable produce() {
return Observable.create(new Observable.OnSubscribe<Object>() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void call(Subscriber<? super Object> subscriber) {
public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
try {
subscriber.onNext(produceEvent());
subscriber.onCompleted();
emitter.onNext(produceEvent());
emitter.onComplete();
} catch (InvocationTargetException e) {
throwRuntimeException("Producer " + ProducerEvent.this + " threw an exception.", e);
}
Expand Down
17 changes: 10 additions & 7 deletions rxbus/src/main/java/com/hwangjr/rxbus/entity/SubscriberEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;


/**
* Wraps a single-argument 'subscriber' method on a specific object.
Expand Down Expand Up @@ -69,11 +71,12 @@ public SubscriberEvent(Object target, Method method, EventThread thread) {
}

private void initObservable() {
subject = PublishSubject.create();
subject.onBackpressureBuffer().observeOn(EventThread.getScheduler(thread))
.subscribe(new Action1<Object>() {
subject = PublishSubject.create().toSerialized();
subject.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(EventThread.getScheduler(thread))
.subscribe(new Consumer() {
@Override
public void call(Object event) {
public void accept(Object event) throws Throwable {
try {
if (valid) {
handleEvent(event);
Expand Down
15 changes: 7 additions & 8 deletions rxbus/src/main/java/com/hwangjr/rxbus/thread/EventThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import java.util.concurrent.Executor;

import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.android.schedulers.HandlerScheduler;
import rx.schedulers.Schedulers;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;


public enum EventThread {
/**
Expand Down Expand Up @@ -55,6 +55,7 @@ public enum EventThread {
/**
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
*/
@Deprecated
IMMEDIATE,

/**
Expand Down Expand Up @@ -82,17 +83,15 @@ public static Scheduler getScheduler(EventThread thread) {
case COMPUTATION:
scheduler = Schedulers.computation();
break;
case IMMEDIATE:
case TRAMPOLINE:
scheduler = Schedulers.trampoline();
break;
case IMMEDIATE:
scheduler = Schedulers.immediate();
break;
case EXECUTOR:
scheduler = Schedulers.from(ThreadHandler.DEFAULT.getExecutor());
break;
case HANDLER:
scheduler = HandlerScheduler.from(ThreadHandler.DEFAULT.getHandler());
scheduler =AndroidSchedulers.from(ThreadHandler.DEFAULT.getHandler().getLooper());
break;
default:
scheduler = AndroidSchedulers.mainThread();
Expand Down

0 comments on commit e10c717

Please sign in to comment.