Kotlin Coroutine (1)
使用Kotlin已经有一段时间了,之前是断断续续的用一点,现在的主要语言已经切过来了。但是一直对Kotlin没有一个整体的把握,所以准备通过这一系列的文章来边学边总结。
第一部分主要写的是协程(Coroutine)的基础部分,第二部分准备写一写其相关的底层实现。
因为是基础部分,所以这一篇主要讲Coroutine的概念,其优势和适用场景,组成部分以及如何使用.
概念
Coroutine是一个非抢占式多任务的程序组件,支持暂停和恢复. Coroutine是一个并发设计模式,可以简化异步操作.而Kotlin Coroutine则是Jetbrains对coroutine的一套实现,是一个标准库.
特点
- 轻量级
- 更少的内存泄漏
- 良好的“取消”支持
- 从Android角度来说,Jetpack库中很多都支持
因为可以暂停,我们可以在一个线程中,运行很多的Coroutine,同时也不会阻塞线程.暂停比阻塞节约内存,也没有线程的上下文切换的消耗.实测跑10万个Coroutine很轻松,如果跑这么的线程,基本就OOM了.
暂停通过suspending function来实现,Kotlin中提供了suspend关键字,
suspend fun name() {
//empty
}
Coroutine通过结构化并发(Structure Concurrency)来减少内存泄漏,它保证了所有的子任务结束后,父任务才退出.
Coroutine通过CancellationExeption来处理取消,通常Cancel会逐级上抛,直至root.
和其他类似技术的对比
因为Coroutine是用来实现异步多任务的,在日常开发中,我们也会接触到许多类似的技术,比如常见的线程、回调还有JS中的promise、async和await,Java中的Future,以及现在用的很多的React.之前Kotlin的网站上有对比这些技术的说明,不知道为什么现在没有了.
其他技术的不足,主要包括资源占用,异常处理,学习难度等方面
- 成本高,占用过多系统资源-线程
- 回调地狱,会导致代码可读性降低,同时不易处理异常
- 其他的也都存在不同的代码结构,异常处理困难等问题
总而言之,Coroutine是“最好”的异步处理技术.
核心组成部分
主要包括CoroutineScope和CoroutineContext,CoroutineContext又包括Job和CoroutineDispatcher.
CoroutineScope
其为新的coroutine提供一个作用域,这是结构化并发的基础,将所有的任务都限定在一个Scope中.构建CoroutineScope有多种方式,包括
- coroutine builder
- runBlocking
- launch
- async etc.
- scoping function
- withContext
- coroutineScope
- supervisorScope
- factory function
- CoroutineScope()
- MainScope()
先说一下 factory function,他们主要用来创建独立的scope,并且需要自己在任务结束时,将scope主动取消. CoroutineScope和MainScope唯一的差别在于Dispatcher,这个后面会说到.
一般最常用的就是coroutine builder,有的是suspending function,只能在scope中调用,而runBlocking比较特殊,不是suspending function,所以可以在主线程中调用,用于连接coroutine和非coroutine. launch是返回一个Job,而async则返回一个Defered对象,需要和await配对使用.另外他们都是CoroutineScope的extension方法.
withContext则用于给定/切换Context.
coroutineScope继承了父scope的context,但覆写了父scope的Job,它主要的用处在于并行处理子任务.如果某个任务失败了,所有的子任务都会被取消,当前scope也会失败.
supervisorScope并不会因为子任务的失败而取消,其他子任务也不会相互干扰.
CoroutineContext
像Android程序的Context一样,Coroutine也有一个Context,表示的是Coroutine运行的环境,用CoroutineContext来表示的,其主要元素是Job和CoroutineDispatcher.
Job
Job是Coroutine中执行的任务, 它是可取消的,并且有生命周期的. Job 可以被用在parent-child层级中,父Scope取消,会导致所有的子Job立即全部取消,与之对应的,子Job有一个失败(不是CancellationException)也会导致父Scope取消,以及所有其他的子Coroutine取消.supervisorJob提供了不同的形式,适用于其他情形.
最常用的是使用launch来创建一个Job,其随代码执行完成而终止;另外可以用构造方法Job(),来创建CompletableJob,它需要通过调用complete手动完成.
Job有以下几种状态,包括:
- New
- Active
- Completed
- Cancelled
- Completing
- Cancelling
Active 是默认的初始状态.
Job作为一个接口,自然有很多方法,所有的方法都是线程安全的,下面介绍一下常用的方法:
- start()
- cancel()
- join() suspend the coroutine until the job is complete.
- cancelAndJoin()
CoroutineDispatcher
决定Coroutine执行在某个或某些线程中,并可以将限定在某个特殊的线程或不限定. CoroutineDispatcher有多种类型,包括
- Default, 共享一个线程池,如果CoroutineScope创建时未指定,则默认使用该选项
- newSingleThreadContext, 新建一个thread来执行coroutine,但是当不用时,需要调用cancel回收,或者作为全局变量复用
- newFixedThreadContext, 新建一个线程池,用来执行coroutine
- Unconfined, 在调用者线程中开始创建,知道下一个suspension 方法调用,便转移到该方法的Dispatcher中.
默认情况下,dispatcher继承自外层CoroutineScope,而runBlocking创建的coroutine,限定于调用者线程.
withContext可以用来在同一个coroutine中切换context.
默认情况下,当在一个CoroutineScope中创建一个新的CoroutineScope,其继承了外层的CoroutineContext,它的Job也成了外层CoroutineContext的子Job.如果外层CoroutineScope取消了,则所有的子Job都会取消.
有两种方式可以改变这种行为:
- GlobalScope.launch 直接在顶层Scope创建CoroutineScope,从而不继承任何外层CoroutineScope
- 创建Coroutine时,传入特定的Job,见如下代码示例
launch(Job()) {
}
其他重要问题点
异常处理
我们已经知道,Coroutine被取消时,会抛出CancellationException,但是它会被忽略,被当作是正常情况.
这里主要关注两个问题:
- 正在取消的过程中发生的异常,如何处理
- 同一个Coroutine中的多个任务抛出的异常,是怎么处理的
对于Corooutine builder来说,异常的上抛有两种类型,一种是自动处理的,比如launch和actor, 另外一种需要调用者处理的,包括async和producer.
CoroutineExceptionHandler用来在Root Coroutine中处理uncaught异常,并且子Coroutine会将异常逐级上抛,最终到达Root,所以子Coroutine中的ExceptionHandler默认情况下,是用不到的.
而且我们不在CoroutineExceptionHandler中处理异常,仅仅用于输入日志,重启程序等,和Java中的Thread.uncaughtExceptionHandler很像.
如果子Coroutine通过调用cancel()来抛出CancellationException,父Coroutine是不会被取消的,如果是其他异常,则会导致父Coroutine被取消.
如果是在Android开发中,我们并不希望子Coroutine失败导致父Coroutine被取消,那么我们就需要Supervision,即只有当父Coroutine取消时,所有的子Coroutine才会取消.它可以通过两种方式来实现:
- SupervisorJob 它和普通Job只有一点差异,就是它的异常只向下传递
- supervisorScope 和上述类似
他们和普通的Coroutine还有一点差异在于,他们的异常是不会上抛的.
并发
如果Coroutine的Dispatcher是多线程的,那么其中的Job也会并发执行,从而也会有Java多线程并发的各种问题.
这里先提一下并发和并行的区别(Concurrency vs. Parallelism):
- 并发(Concurrency) 至少两个任务在推进,但并不一定同时发生,例如通过时间分片在单核机器上执行多任务
- 并行(Parallelism) 至少两个线程同时执行,这个必须在多核机器上才可以
Coroutine有些解决方案和传统的Thread方案一样,有一些则是专属的. 下面用一段Kotlin官方文档中的代码说明一下这个问题
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
通过运行结果可以知道,结果不是预期的.那么怎么做能够让结果等于10000呢?
可以通过给counter加volatile关键字吗?
这是一个典型的并发方面的错误,我们需要的是原子化操作(Atomic),但是volatile能做的非常有限,在原子化操作方面,仅适用于读和写变量,包括long和double的变量.
如果没有该关键字,原子化操作仅包含读写引用变量和基础数据类型(不包括long和double).
volatile还有一个作用,就是减少了内存不一致的错误,保证被volatile修饰的变量,如果发生变化,其他线程也可见.
而自增并不是一个原子化操作,所以并不能达到要求.
可以达到的要求的方式如下:
- 线程安全数据结构 AtomicInteger
- 线程限定
- 精细的
仅对自增操作,限定在单线程中,此方案效率较低,主要是因为对每一次自增都需要有线程切换 - 粗糙的
将massiveRun整个方法限定在单线程中,比上述方法要快
- 精细的
- 互斥
在Java中,我们会使用synchronized或者ReentrantLock,在Coroutine中我们使用Mutex,可以lock和unlock. - Actor
是一个集Coroutine和Channel的实体,它比加锁更高效,因为没有线程切换.
总结
这篇文章,主要是对官方文档的一些总结,在后续的实际使用过程中,总结的经验等,会逐步更新上来.
引用
https://en.wikipedia.org/wiki/Coroutine
https://developer.android.com/kotlin/coroutines
https://kotlinlang.org/docs/coroutines-overview.html
https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html
https://en.wikipedia.org/wiki/Linearizability