package com.topjohnwu.magisk.extensions import androidx.databinding.ObservableField import com.topjohnwu.magisk.utils.KObservableField import io.reactivex.* import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.disposables.Disposables import io.reactivex.functions.BiFunction import io.reactivex.schedulers.Schedulers import androidx.databinding.Observable as BindingObservable fun Observable.applySchedulers( subscribeOn: Scheduler = Schedulers.io(), observeOn: Scheduler = AndroidSchedulers.mainThread() ): Observable = this.subscribeOn(subscribeOn).observeOn(observeOn) fun Flowable.applySchedulers( subscribeOn: Scheduler = Schedulers.io(), observeOn: Scheduler = AndroidSchedulers.mainThread() ): Flowable = this.subscribeOn(subscribeOn).observeOn(observeOn) fun Single.applySchedulers( subscribeOn: Scheduler = Schedulers.io(), observeOn: Scheduler = AndroidSchedulers.mainThread() ): Single = this.subscribeOn(subscribeOn).observeOn(observeOn) fun Maybe.applySchedulers( subscribeOn: Scheduler = Schedulers.io(), observeOn: Scheduler = AndroidSchedulers.mainThread() ): Maybe = this.subscribeOn(subscribeOn).observeOn(observeOn) fun Completable.applySchedulers( subscribeOn: Scheduler = Schedulers.io(), observeOn: Scheduler = AndroidSchedulers.mainThread() ): Completable = this.subscribeOn(subscribeOn).observeOn(observeOn) /*=== ALIASES FOR OBSERVABLES ===*/ typealias OnCompleteListener = () -> Unit typealias OnSuccessListener = (T) -> Unit typealias OnErrorListener = (Throwable) -> Unit /*=== ALIASES FOR OBSERVABLES ===*/ fun Observable.subscribeK( onError: OnErrorListener = { it.printStackTrace() }, onComplete: OnCompleteListener = {}, onNext: OnSuccessListener = {} ) = applySchedulers() .subscribe(onNext, onError, onComplete) fun Single.subscribeK( onError: OnErrorListener = { it.printStackTrace() }, onNext: OnSuccessListener = {} ) = applySchedulers() .subscribe(onNext, onError) fun Maybe.subscribeK( onError: OnErrorListener = { it.printStackTrace() }, onComplete: OnCompleteListener = {}, onNext: OnSuccessListener = {} ) = applySchedulers() .subscribe(onNext, onError, onComplete) fun Flowable.subscribeK( onError: OnErrorListener = { it.printStackTrace() }, onComplete: OnCompleteListener = {}, onNext: OnSuccessListener = {} ) = applySchedulers() .subscribe(onNext, onError, onComplete) fun Completable.subscribeK( onError: OnErrorListener = { it.printStackTrace() }, onComplete: OnCompleteListener = {} ) = applySchedulers() .subscribe(onComplete, onError) fun Observable.updateBy( field: KObservableField ) = doOnNextUi { field.value = it } .doOnErrorUi { field.value = null } fun Single.updateBy( field: KObservableField ) = doOnSuccessUi { field.value = it } .doOnErrorUi { field.value = null } fun Maybe.updateBy( field: KObservableField ) = doOnSuccessUi { field.value = it } .doOnErrorUi { field.value = null } .doOnComplete { field.value = field.value } fun Flowable.updateBy( field: KObservableField ) = doOnNextUi { field.value = it } .doOnErrorUi { field.value = null } fun Completable.updateBy( field: KObservableField ) = doOnCompleteUi { field.value = true } .doOnErrorUi { field.value = false } fun Observable.doOnSubscribeUi(body: () -> Unit) = doOnSubscribe { ui { body() } } fun Single.doOnSubscribeUi(body: () -> Unit) = doOnSubscribe { ui { body() } } fun Maybe.doOnSubscribeUi(body: () -> Unit) = doOnSubscribe { ui { body() } } fun Flowable.doOnSubscribeUi(body: () -> Unit) = doOnSubscribe { ui { body() } } fun Completable.doOnSubscribeUi(body: () -> Unit) = doOnSubscribe { ui { body() } } fun Observable.doOnErrorUi(body: (Throwable) -> Unit) = doOnError { ui { body(it) } } fun Single.doOnErrorUi(body: (Throwable) -> Unit) = doOnError { ui { body(it) } } fun Maybe.doOnErrorUi(body: (Throwable) -> Unit) = doOnError { ui { body(it) } } fun Flowable.doOnErrorUi(body: (Throwable) -> Unit) = doOnError { ui { body(it) } } fun Completable.doOnErrorUi(body: (Throwable) -> Unit) = doOnError { ui { body(it) } } fun Observable.doOnNextUi(body: (T) -> Unit) = doOnNext { ui { body(it) } } fun Flowable.doOnNextUi(body: (T) -> Unit) = doOnNext { ui { body(it) } } fun Single.doOnSuccessUi(body: (T) -> Unit) = doOnSuccess { ui { body(it) } } fun Maybe.doOnSuccessUi(body: (T) -> Unit) = doOnSuccess { ui { body(it) } } fun Maybe.doOnCompleteUi(body: () -> Unit) = doOnComplete { ui { body() } } fun Completable.doOnCompleteUi(body: () -> Unit) = doOnComplete { ui { body() } } fun Observable>.mapList( transformer: (T) -> R ) = flatMapIterable { it } .map(transformer) .toList() fun Single>.mapList( transformer: (T) -> R ) = flattenAsFlowable { it } .map(transformer) .toList() fun Maybe>.mapList( transformer: (T) -> R ) = flattenAsFlowable { it } .map(transformer) .toList() fun Flowable>.mapList( transformer: (T) -> R ) = flatMapIterable { it } .map(transformer) .toList() fun ObservableField.toObservable(): Observable { val observableField = this return Observable.create { emitter -> observableField.get()?.let { emitter.onNext(it) } val callback = object : BindingObservable.OnPropertyChangedCallback() { override fun onPropertyChanged(sender: BindingObservable?, propertyId: Int) { observableField.get()?.let { emitter.onNext(it) } } } observableField.addOnPropertyChangedCallback(callback) emitter.setDisposable(Disposables.fromAction { observableField.removeOnPropertyChangedCallback(callback) }) } } fun T.toSingle() = Single.just(this) inline fun zip( t1: Single, t2: Single, crossinline zipper: (T1, T2) -> R ) = Single.zip(t1, t2, BiFunction { rt1, rt2 -> zipper(rt1, rt2) })