-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProfileActionProcessor.kt
72 lines (66 loc) · 3.82 KB
/
ProfileActionProcessor.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.cziyeli.domain.user
import com.cziyeli.data.Repository
import com.cziyeli.domain.stash.SimpleCardActionProcessor
import com.cziyeli.domain.summary.StatsAction
import com.cziyeli.domain.tracks.TrackModel
import com.cziyeli.soundbits.root.RootActionProcessor
import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import lishiyo.kotlin_arch.utils.schedulers.BaseSchedulerProvider
import javax.inject.Inject
import javax.inject.Singleton
/**
* Processes actions for the Profile tab.
*/
@Singleton
class ProfileActionProcessor @Inject constructor(private val repository: Repository,
private val schedulerProvider: BaseSchedulerProvider,
private val userActionProcessor: UserActionProcessor,
private val simpleCardActionProcessor: SimpleCardActionProcessor,
private val rootActionProcessor: RootActionProcessor
) {
private val TAG = ProfileActionProcessor::class.java.simpleName
val combinedProcessor: ObservableTransformer<ProfileActionMarker, ProfileResultMarker> = ObservableTransformer { acts ->
acts.publish { shared ->
Observable.merge<ProfileResultMarker>(
shared.ofType<UserAction.LoadLikedTracks>(UserAction.LoadLikedTracks::class.java)
.compose(rootActionProcessor.likedTracksProcessor),
shared.ofType<StatsAction.FetchFullStats>(StatsAction.FetchFullStats::class.java)
.compose(simpleCardActionProcessor.fetchStatsProcessor),
shared.ofType<ProfileAction.StatChanged>(ProfileAction.StatChanged::class.java)
.compose(fetchStatsProcessor),
shared.ofType<ProfileAction.FetchRecommendedTracks>(ProfileAction.FetchRecommendedTracks::class.java)
.compose(fetchRecommendedTracksProcessor)
).mergeWith(
shared.ofType<ProfileAction.Reset>(ProfileAction.Reset::class.java)
.compose(resetProcessor)
).mergeWith(
shared.ofType<UserAction.ClearUser>(UserAction.ClearUser::class.java).compose(userActionProcessor.clearUserProcessor)
).retry() // don't unsubscribe ever
}
}
private val resetProcessor: ObservableTransformer<ProfileAction.Reset, ProfileResult.Reset> = ObservableTransformer {
action -> action.map { ProfileResult.Reset() }
}
private val fetchStatsProcessor: ObservableTransformer<ProfileAction.StatChanged, ProfileResult.StatChanged> = ObservableTransformer {
action -> action.switchMap { act ->
Observable.just(act)
.map { act.currentMap.updateWithStat(act.stat) }
.subscribeOn(schedulerProvider.io())
.map { ProfileResult.StatChanged(statsMap = it)}
}
}
private val fetchRecommendedTracksProcessor: ObservableTransformer<ProfileAction.FetchRecommendedTracks,
ProfileResult.FetchRecommendedTracks> = ObservableTransformer { action -> action.switchMap { act ->
repository
.fetchRecommendedTracks(limit = act.limit,
seedGenres = act.seedGenres.joinToString(","),
attributes = act.attributes)
.subscribeOn(schedulerProvider.io())
.map { it.tracks.map { TrackModel.create(it) }.filter { it.isSwipeable } }
.map { ProfileResult.FetchRecommendedTracks.createSuccess(it) }
.onErrorReturn { ProfileResult.FetchRecommendedTracks.createError(it) }
.startWith(ProfileResult.FetchRecommendedTracks.createLoading())
}
}
}