【翻译】深入 Kotlin 协程

2017-10-26 by Liuqingwen | Tags: Kotlin 翻译

一、前言

翻译好的文章也是一种学习方法! smiley
原文标题:Diving deep into Kotlin Coroutines
原文地址:https://www.kotlindevelopment.com/deep-dive-coroutines/
原文作者:Adrian Bukros

二、正文

编写 Kotlin 协程相关的文章是当下比较流行的一个话题,这当然是有极好的理由支撑着的。JetBrains 公司的伙伴们创造了一门非常实用的计算机语言,赢得了成千上万开发者们的心,还介绍了一个稳定(目前仍在实验阶段)的特性,承诺在避免冗繁的前提下编写出优雅的异步代码。我将会指导你使用协程相关的基本示例,并观察背后到底发生了什么。

为什么像协程这种解决方案非常有必要?

在现代应用程序开发中,处理多线程任务是不可避免的工作。一边在 UI 上显示花哨的加载动画一边等待网络请求就是一个简单的例子,这不得不使用异步代码来解决。在手机操作系统上,比如 Android 的 UI 线程用于处理用户交互,像那些点击按钮和手势操作,以及显示层的渲染工作等。这些都是最基本的,让我们来看一个例子吧!点击一个按钮之后,应用程序开始从网络上下载 JSON 数据,然后反序列化,最后把结果更新在显示层上。(反序列化和网络交互通常可以用一些聪明的类库来处理,以降低程序员的开发复杂度,但是为了展示例子的目的,还是让我们看这个例子吧。)那么你会如何写代码呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun fetchUserString(userId: String): String {
// request user from network
// return user String
}
fun deserializeUser(userString: String): User {
// deserialize
// return User
}
button.setOnClickListener {
val userString = fetchUserString("1")
val user = deserializeUser(userString)
showUserData(user)
}

我觉得这是最直接(也很幼稚)的一种方式,提供了解决上述问题的方案。但是这里面有很大的问题。

  1. 在 UI 线程上启动一个网络请求是不明智的做法(在安卓上这甚至不可能,因为你的应用程序会抛出一个丑陋的 NetworkOnMainThreadException 异常),由于网络请求经常会花费至少半秒的时长,而此期间 UI 更新将会被暂停。没有花哨的加载动画,也不能滚动,等等。
  2. 反序列化是一个很消耗 CPU 资源的操作,这会吃掉帧渲染所需的大量资源。本质上,这个结果和前面一点是一样的。

那么我们怎么解决这些问题呢?我们需要把网络请求和反序列化分派到一个后台进程中执行。听起来很简单,但是实现它的最好方式是什么呢?

解决方案 1 :Callbacks 回调

假设现在重构我们的 fetchUserString 方法和 deserializeUser 方法来使它在后台的线程中工作。程序仍然需要等待后台任务完成后在 UI 上执行返回结果的显示。我们可以通过给运行函数提供回调来达到目的,就像这样:

1
2
3
4
5
6
7
button.setOnClickListener {
fetchUserString("1") { userString ->
deserializeUser(userString) { user ->
showUserData(user)
}
}
}

但这个解决方案导致的问题是随着回调函数调用次数的增加,代码开始变得不那么易读了。这种现象叫做:回调的地狱。我们必须避免这种情况发生。

解决方案 2 : Reactive 反应链方法

这种反应链的方法提供了一种更加有效的代码编写方式,它能组合方法的回调以避免函数方法的多重嵌套:

1
2
3
4
5
6
7
8
9
button.setOnClickListener {
fetchUserString("1")
.flatMap { userString ->
deserializeUser(userString)
}
.subscribe { user ->
showUserData(user)
}
}

大部分人对这种反应链解决方式感到非常满意(包括我自己),但是我们必须不得不承认的是,这个代码仍然离我们的刚开始的写法相差甚远 —— 对于使用者来说还是有那么一点复杂度的。那么还有其他的更加有效的方式来实现程序并行计算运行吗?有的。

解决方案 3 : Coroutines 协程

这是我们使用协程来实现的代码的模样:

1
2
3
4
5
6
7
button.setOnClickListener {
launch(UI){
val userString = fetchUserString("1").await()
val user = deserializeUser(userString).await()
showUserData(user)
}
}

这个代码看起来基本上和我们一开始写的代码是一样的,并且效果和预期的一致(不会阻塞 UI 线程)。那么让我们来看看具体怎样使用命令式的风格写出类似的异步代码吧!

如何使用协程?

协程基于一种新的函数类型,叫做挂起函数。我们可以在函数名称前使用一种新的语言关键字 suspend 来标记。用这个关键字标记的函数能够暂停一个协程的执行,且不会阻塞当前线程。

1
2
3
suspend fun fetchUserData(userId: String): String {
// return user String
}

一个挂起的函数只能被协程或者其他挂起的函数调用。通常我们会在协程库中 lambda 函数表达式的参数中看到他们,比如 async 函数:

1
2
3
public fun <T> async( … , … , block: suspend CoroutineScope.() -> T): Deferred<T> {
}

launch{}

如果你不考虑返回值,那么 launch 函数是使用协程的最简单工具。

1
2
3
4
5
val job = launch {
val userString = fetchUserString("1")
val user = deserializeUser(userString)
log(user.name)
}

被函数包裹的代码会分派到一个后台的线程中,而这个函数自己则返回一个 Job 实例,它可以在其他的协程中使用并被控制执行。调用 Job 类的 join() 方法将暂停它所包含的协程的运行。

async{}

通过使用 async 函数你可以达到和 launch 一样的效果,唯一一个非常重要的差别是:它有返回值。

1
2
3
4
5
val user = async {
val userString = fetchUserString("1")
val user = deserializeUser(userString)
user
}.await()

这个异步方法返回一个 Defered<T> 实例,访问它的 await() 方法你能够得到最后计算的实际结果。

重构我们的代码,这个问题的解决方案如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun fetchUserString(userId: String) = async {
// request user from network
// return user String
}
fun deserializeUser(userString: String) = async {
// deserialize
// return User
}
launch(UI) {
val userString = fetchUserString("1").await()
val user = deserializeUser(userString).await()
showUserData(user)
}

现在我们知道了如何获取协程执行后的返回值,但是如果我们需要多个返回值呢?那就是 Channel 要做的事情了 —— 你可以在这里阅读了解更多关于它的资料。

在深入协程之前,让我们来看看其他编程语言关于异步编程模式的不同实现方式!

光天化日之下无新鲜感……或者还是有的?

在 C# 5.0 中,追溯到 2012 年,关于异步函数微软提出了一个特性,非常类似 Kotlin 中的协程:

1
2
3
4
5
public async Task<T> doWork()
{
// do some work
}
var result = await doWork()

他们的实现方式在关于异步代码应该写成什么样的看法上拥有一致的基本理念,但是有几个主要的区别。

  1. 在 C# 中 asyncawait 都是关键字
  2. 在 C# 中 async 函数只能返回一个 Task 实例或者返回空

如果你仔细观察协程的这个例子,你会看到在 Kotlin 中, launch{}async{} , 以及 await() 函数都是普普通通的的函数(它们有些采用 suspend 关键字标记了)。这在命名和返回值类型方面给开发者手头留了一个最灵活的工具。

线程 vs 协程

唤醒一个线程需要巨大的资源开销。在一个现代化系统上,一个线程非常容易就能吃掉 1M 多的内存。在当前的上下文中,我们可以通过调用协程(根据文档)来作为“轻量级”的线程。通常,一个协程坐落在一个实际的线程池当中,专门用于后台任务的执行操作,这也就是协程为什么如此高效的原因。它只会在需要的时候才会使用系统资源。参考如下代码:

1
2
3
4
5
6
7
val jobs = List(100_000) {
launch {
delay(1000)
print(".")
}
}
jobs.forEach { it.join() }

如果你不使用协程而是使用实际的线程,那么程序将会非常耗时,甚至有可能被系统终结运行。一个 OutOfMemoryException 内存溢出的异常在没有使用协程而是大量线程运行下很容易发生,这并不是什么新鲜事了。

原理是什么?

让我们看一下 async{} 函数的签名是什么样子的!

1
2
3
4
5
public fun <T> async(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>

第一个参数是 CoroutineContext ,它定义了线程执行的位置。它有个默认值,最终指向一个定义好了的线程池。当然这完全可以使用其他实现方式。在上面那个例子中,我是在 UI 这个协程的上下文中使用 launch 函数,来自于 Anko 库。在那种特定上下文的情形下,调用一个协程可以保证与 UI 相关连的代码(比如设置标签的文本)都能在 UI 线程中被调用。

第二个参数 start 是指明协程何时应该开始执行。默认情况下,当运行到 async{} 或者 launch{} 函数的时候,协程内部任务就开始执行。通过使用 CoroutineStart.LAZY 这个值,可以让协程只在开发者显式调用返回的 Deferred<T> 实例或者 Job 实例的 await() 方法或者 join() 方法才开始运行。

神奇的状态机

协程的内部实现机制涉及到一些关于编译器危险魔法的黑色艺术。作为简化描述,这里发生的事情包括:

每一个挂起的函数和 lambda 表达式都有一个隐藏的参数,它在函数唤醒的时候会被隐式地传入到函数中。这个参数就是一个 Continuation 实例,它代表着一段应该在某个协程挂起后才执行的代码块。

让我们看看这个 await 函数,就像下面的代码:

1
suspend fun <T> Deferred<T>.await(): T

然而,实际上它更像这样:

1
fun <T> Deferred<T>.await(continuation: Continuation<T>): Any?

这个 await 函数的返回值类型参数 T 现在是 continuation 里的类型参数了。结尾返回值的签名 Any 是用于控制协程运行的流程。如果它被挂起,那么这个 await 的结果将会是一个特殊的值: COROUTINE_SUSPENDED 。如果没有被挂起,它会返回这个函数的类型参数 T 的结果。

对应于每一个协程的 lambda 表达式,都将会在编译期间创建一个新的类。这个类本质上就是一个状态机。编译器会分析这些代码并查找那些挂起的函数 —— 这些函数的位置点就对应这个状态机的当前状态(因为这是程序能被挂起暂停的地方)。

根据我们的代码示例如下:

1
2
3
4
5
launch(UI){
val userString = fetchUserString("1").await() // suspension point #1
val user = deserializeUser(userString).await() // suspension point #2
showUserData(user)
}

在这个生成的类中,当运行代码所在的线程没有被阻塞时,程序的执行将从某个状态跳到另一个状态。在某个挂起的函数执行结束后,同时函数返回结果也处于可用状态,那么它的 Continuation 参数将会触发这个状态机,使其跳转到下一个状态中。

异常处理

异常处理和平常没两样。想象一下之前所描述的状态机被嵌入到一个巨大的 try/catch 代码块中!如果某个地方抛出了一个异常,它将会被捕获并传播下去,意味着你不需要做任何的改变。工作照常进行。

1
2
3
4
5
6
7
8
9
10
11
12
launch(UI) {
progressBar.visibility = View.VISIBLE
try {
val userString = fetchUserString("1").await()
val user = deserializeUser(userString).await()
showUserData(user)
} catch (ex: Exception) {
log(ex)
} finally {
progressBar.visibility = View.GONE
}
}

可取消作业

取消一个协程是可行的,类似取消线程的运行:

1
2
3
4
val job = launch {
// do work
}
job.cancel()

调用 Job 或者 Deferred<T> 实例的 cancel() 方法将会终止协程的内部运行,前提是处理 isActive 标志被正确实现。

1
2
3
4
5
val job = launch {
while (isActive){
//do work
}
}

同时, isActive 的值在标准库中会在子协程的挂起点之间被检查,所以你仅仅只需检查你自己的处于长时间计算运行代码块中的 isActive 值就可以了。

总结

协程依然还处在实验阶段,意味着 API 会发生变化,但是这些特性确实已经很稳定 —— 它会一直在这里的。 JetBrains 公司在开发者背后全力以赴地开发,我已经迫不及待地想看到并行编程的将来会是怎样了。值得一试,这是肯定的!

资源

Andrey Breslav - Kotlin Coroutines
MCE 2017: Svetlana Isakova, Coroutines in Kotlin
Kotlinlang.org Coroutines
Github.com Kotlin coroutines


Comments: