为什么要引入线程调度框架
Java本身是一个多线程语言,作为一个拥有UI部分内容的平台,Android在采用Java开发的过程中,设定了UI线程和IO线程两个环境,页面的渲染动作放在了UI线程,而常见的数据请求则放在IO线程,否则会出现数据请求占用页面渲染造成卡顿的可能。这个时候,合理调度线程就显得极为重要了。
最开始的时候,Android官方并没有推出线程调度框架,使得国内外一众开发者必须依赖Java自带的线程池去自己封装一个线程调度工具,这就很考验不同团队对于线程池的认识了,这是一个很硬核的东西,难度系数非常高,很多搞Java开发的人包括SpringBoot、Android等等面对着它,都一脸茫然。
因此,引入一个易用的线程调度框架,对于Android来说,是一件很重要的事情。
AsyncTask
- 为了便于开发者去调度UI和IO这两个线程,Google官方推出过AsyncTask,这个工具是一个抽象类,使用起来非常简单。
- 但是这个东西用到后面,会发现它并不强制与生命周期所绑定,在前端开发中,生命周期是一个很重要的概念,这个就会出现资源泄漏等等问题了。所以Google官方团队在Android SDK 30之后,已经是将这个类进行弃用了。
RxJava
RxJava是一个很常用的异步线程调度框架,这个东西比较强大的是,它不仅拥有线程调度能力,它还引入了链式调用(可以理解为建造者模式的一种表现形式)。通过这个链式调用再结合Java的lambda表达式,整体代码会变得非常简洁。而且,RxJava在思想上,它把获取数据这个动作可能发生的情况通过状态在监听状态中表现出来。
这样的话,回到Presenter或者ViewModel层,用户只需要关心的是,如何将各种状态下的数据传递出去给到View层即可,无需再过度关注数据层的内容,就能够实现整条数据请求责任链下的一数据源模式。
For Example: 串行任务的计算
interface Service {
/**
* - delay(1000L)
* - return 5
*/
fun fetchDataElementLength(): Int
/**
* - delay(3000L)
* - return list<String>
*/
fun fetchDataList(length: Int): List<String>
}
fun fetchABWithSerialOperation(): Single<String> {
return currentService.fetchDataElementLength()
.flatMap { length -> currentService.fetchDataList(length) }
.map { it.joinToString(", ") }
}
For Example: 并行任务的计算
interface Service {
/**
* - delay(1000L)
* - return "A"
*/
fun fetchDataA(): Single<String>
/**
* - delay(3000L)
* - return "B"
*/
fun fetchDataB(): Single<String>
}
fun fetchABWithParallelOperation(): Single<String> {
return Single.zip(
currentService.fetchDataA(),
currentService.fetchDataB()
) { dataA, dataB -> (dataA + dataB) }
}
实际线程调度动作:
val currentViewModel = ViewProvider(this or parentFragment or activity)[CurrentViewModel::class.java]
val startTimeWithSerialOperation = System.currentTimeMillis()
currentViewModel.fetchABWithSerialOperation()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
// TODO
}
override fun onError(e: Throwable) {
// TODO
}
override fun onSuccess(t: String) {
println("fetchABWithSerialRelation is $t")
println("---Spend Time is ${(System.currentTimeMillis() - startTimeWithSerialRelation) / 1000}s")
}
})
当然这里是Sample,使用的时候,订阅是放在ViewModel/Presenter中去执行的,然后再通过LiveData把结果返回给到View。
有一个地方要注意,我看到过有些项目在使用RxJava的时候,并没有注意到这个点,即要在ViewModel生命周期借宿的时候去取消请求。
class CurrentViewModel(
private val currentRepository: CurrentRepository,
): ViewModel() {
private val compositeDisposable = CompositeDisposable()
private val _error = MutableLiveData<Throwable>()
val error: LiveData<Throwable> = _error
private val _success = MutableLiveData<String>()
val success: LiveData<String> = _success
fun fetchABWithSerialRelation() {
currentRepository.fetchABWithParallelRelation()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.blockingSubscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
compositeDisposable.add(d)
}
override fun onError(e: Throwable) {
_error.value = e
}
override fun onSuccess(t: String) {
_success.value = t
}
})
}
override fun onCleared() {
compositeDisposable.clear()
}
}
当然,RxJava还有很多其他的操作符,这个具体需要使用的时候再去翻阅即可,操作符太多背会其实也没有什么用处,这个得根据业务按照文档做选择。
被乱用的Observable
我看过很多的网络请求,不知道开发者出于何种目的,把这些请求都去使用了Observable接收请求结果,就像下面这样写的
kotlininterface RetrofitService { @GET("airport/fetchAll") fun fetchAllAirports(): Observable<List<AirportDetails>> }
但这个为什么不使用Single呢?拿一个Observable去接收这个返回值,它的目的是什么呢?
Observable的意义在于,数据一旦发生变化,立刻就会有一个新的通知发送过来,常用于ViewPager等等多页面的情况下,比如说你在A页面修改了本地数据库的内容,而B页面是一个实时监听整个数据库所有信息的页面,那这个时候在B页面的ViewModel使用一个Observable不断刷新整个UI,这个是有必要的。
但请求网络URL就是一个fetch和一个receive的动作,数据本身就是一次性的,按理来说使用Single就是足够了的。
kotlininterface RetrofitService { @GET("airport/fetchAll") fun fetchAllAirports(): Single<List<AirportDetails>> }
Kotlin Coroutines
- 因为Kotlin是多平台语言,它可以编译在JS、JVM等等多个平台,但在JVM平台下,Kotlin的这种所谓协程,就是一套由Kotlin封装的线程调度框架。作为Android开发者,我只针对JVM这个平台谈谈我对这部分内容的理解。
- 从思路上来讲,这个是Kotlin官方基于JavaScript和Dart的Future的那一套思路,通过withContext调度改进的一种同步的方式写非阻塞式的异步代码的框架。但是回到异步调度的角度来理解这一个问题,其实只是用suspend标记这里为一个子任务体。
- 如果两个请求有相应的前后顺序关系(完成A请求后,根据A的返回值去调用B请求,即串行关系)的话。在RxJava中,人们往往是用flatMap去实现这种请求的,而来到Kotlin Coroutines,则使用等号将该子任务体内在父任务体中予以阻塞性的等待,擅长起名字的Kotlin官方称之为挂起函数。
interface Service {
/**
* - delay(1000L)
* - return 5
*/
suspend fun fetchDataElementLength(): Int
/**
* - delay(3000L)
* - return list<String>
*/
suspend fun fetchDataList(length: Int): List<String>
}
suspend fun fetchABWithSerialRelation(): String = withContext {
// 如果有异步切换线程的动作,放在具体的位置里面全方法调度线程位置,不要在一个方法中来来去去地调度
val length = currentService.fetchDataElementLength()
val data = currentService.fetchDataList(length)
return data.joinToString(", ")
}
如果要两个请求并没有先后顺序关系(A请求和B请求没有先后关系,但是需要拿到A的结果和B的结果去map成为一个返回值,即并行关系),那么RxJava中人们往往使用zip去实现这个操作。来到Kotlin Coroutines,与之相对应的是使用async + await。
interface Service {
/**
* - delay(1000L)
* - return listOf("A1", "A2", "A3")
*/
suspend fun fetchDataA(): List<String>
/**
* - delay(2000L)
* - return listOf("B1", "B2", "B3")
*/
suspend fun fetchDataB(): List<String>
}
suspend fun fetchABWithParallelRelation(): String = withContext {
val dataA = async { currentService.fetchDataA() }
val dataB = async { currentService.fetchDataB() }
return@withContext (dataA.await() + dataB.await()).joinToString(", ")
}
我对线程调度框架的认知
- 作为大前端开发者,我觉得多线程的Rx和单线程的Future都是值得一学习的,因为他们背后的思想是不一样的,而使用何种方式去实现异步线程的调度,这是UI前端开发很重要的一个知识。
- 而Kotlin的Coroutine,它可以作为一个进阶的框架来学习,它基于了Future的任务体阻塞,又参考了Rx的状态思路,从使用的角度也是一个很不错的框架,从思想上来说,它以一个结构化的方式掌控异步。
- 实际上,不学习Kotlin Coroutines也是可以的,以RxJava拥有的技术广度,完全可以面对所有的技术实现,这个得看团队对技术的追求。
- 要改变的是,RxJava之前的状态,在Kotlin Coroutines上是通过异常来捕获,以及写UnitTest的时候,需要调整一下。
- 对于开发者来讲,Kotlin Coroutine可能成为适应下一个团队的技术栈的一个点。而对于一个团队来讲,就需要团队内部所有人去考量,Kotlin Coroutine值不值得去花成本去适应。
- 当然对于我个人来讲,我就很喜欢项目中混杂着各种各样的技术,我不希望技术单一性的,我在确保两样技术的混合编程是绝对可靠的基础上,我是更愿意混合编程的。比如说我喜欢同时维护Java与Kotlin并存的项目、JavaScript和TypeScript并存的项目。