Kotlin的协程:flow"/>
Kotlin的协程:flow
flow 介绍
之前介绍的启动协程方法,比如 launch、async 都是协程的单次启动。如果有复杂场景,比如发送多个数据,就需要使用 flow 数据流。在 flow 中,数据如水流一样经过上游发送,中间站处理,下游接收。
创建 flow
创建 flow 有 3 种方式:
- flow{}
- flowOf()
- asFlow()
flow
flow{} 中使用 emit 发送数据。
fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}
flowOf
flowOf() 可以将指定的一串数据转换为 flow,接收可变参数。
fun flowOfFun() = runBlocking {flowOf(1, 2, 3, 4, 5).filter { it > 2 }.map { it * 2 }.take(2).collect {// 6 8println(it)}listOf(1, 2, 3, 4, 5).filter { it > 2 }.map { it * 2 }.take(2).forEach {// 6 8println(it)}
}
asFlow
asFlow() 可以将 List 集合转换为 flow。toList() 可以将 flow 转换为 List 集合。
fun flow2list() = runBlocking {flowOf(1, 2, 3, 4, 5)// flow to list.toList().filter { it > 2 }.map { it * 2 }.take(2).forEach {println(it)}listOf(1, 2, 3, 4, 5)// list as flow.asFlow().filter { it > 2 }.map { it * 2 }.take(2).collect {println(it)}
}
中间操作符
创建 flow 之后使用中间操作符处理 flow 的每一个数据。flow 的中间操作符和 list 集合的操作符非常类似。
常用中间操作符:
- filter
- map
- take
filter
filter 传入判断条件,条件满足时过滤数据,否则不将数据流向下游。
map
map 传入映射函数,将每个数据传入映射函数,得到结果继续传入下游。
take
take 传入非负整数 n,取前 n 个数据传入下游。
fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}
终止操作符
collect 是 flow 的终止操作符,收集每一个数据经过中间操作符后的最终结果,表示 flow 流的终止,后面不能再调用中间操作符。
除了 collect,还有一些其他的终止操作符,first、single、fold、reduce。
collect
返回所有元素,结束 flow。
fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}
first
返回第一个元素,结束 flow。
fun flowFirst() = runBlocking {val first = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).first()// 6println(first)
}
single
返回唯一元素,结束flow。不能多于一个,也不能一个没有。
fun flowSingle() = runBlocking {val single = flow {emit(3)}.filter {it > 2}.map {it * 2}.take(2).single()// 6println(single)
}
fold
折叠所有元素。指定一个函数和初始值,对每一个元素反复执行函数,返回最后的结果。
fun flowFold() = runBlocking {val fold = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).fold(0) { acc, value ->acc + value}// 14println(fold)
}
reduce
reduce 和 fold 很类似,reduce 没有初始值。
fun flowReduce() = runBlocking {val reduce = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).reduce { acc, value ->acc + value}// 14println(reduce)
}
first、single、fold、reduce 本质都是封装了 collect ,因此它们都是终止操作符。
生命周期
onStart
onStart 是 flow 的开始生命周期回调。onStart 的执行时机和它在 flow 位置无关。
对比下面两个方法,onStart 都会在第一时间回调。
fun onStartFun() = runBlocking {flowOf(1, 2, 3, 4, 5).filter {println("filter: $it")it > 2}.map {println("map: $it")it * 2}.take(2).onStart {println("onStart")}.collect {println("collect: $it")}
}fun onStartFun2() = runBlocking {flowOf(1, 2, 3, 4, 5).take(2).filter {println("filter: $it")it > 2}.map {println("map: $it")it * 2}.onStart {println("onStart")}.collect {println("collect: $it")}
}
onComplete
flow 执行完后回调 onComplete。onComplete 的执行时机和它在 flow 的位置无关。
flow 正常执行完回调 onComplete。
fun onCompleteFun() = runBlocking {flowOf(1, 2, 3, 4, 5).onCompletion {println("onCompletion")}.filter {println("filter: $it")it > 2}.take(2).collect {println("collect: $it")}
}
flow 执行被取消,或者 flow 执行中出现异常。
fun cancelOnCompleteFun() = runBlocking {launch {flow {emit(1)emit(2)emit(3)}// collect: 1//collect: 2//cancel//onCompletion first: kotlinx.coroutines.JobCancellationException.onCompletion {println("onCompletion first: $it")}.collect {println("collect: $it")if (it == 2) {// cancel flowcancel()println("cancel")}}}delay(1000)flowOf(4, 5, 6)
// collect: 4
// onCompletion second: java.lang.IllegalStateException.onCompletion {println("onCompletion second: $it")}.collect {println("collect: $it")throw IllegalStateException()}}
异常处理
flow 的异常处理可以分为上游异常和下游异常。上游异常指创建 flow 或者中间操作符发生的异常。下游异常指终止操作符 collect 发生的异常。
上游异常
上游异常可以用 catch 函数捕获异常。catch 函数和它的位置相关,只能捕获 catch 上游的异常。
fun flowCatch() = runBlocking {val flow = flow {emit(1)emit(2)throw IllegalStateException()emit(3)}flow.map {it * 2}.catch {println("catch: $it")}.collect {println(it)}// 2// 4// catch: java.lang.IllegalStateException
}
下游异常
下游异常不能用 catch 函数,需要在 collect 的作用域用 try-catch 捕获。
catch 函数无法捕获下游的 filter 除 0 异常。
fun flowCatchDownStream() = runBlocking {val flow = flow {emit(1)emit(2)emit(3)}flow.map {it * 2}.catch {println("catch: $it")}.filter {it / 0 > 1}.collect {println(it)}// Exception in thread "main" java.lang.ArithmeticException: / by zero
}
使用 try-catch 捕获 collect 的异常。
fun flowTryCatch() = runBlocking {flowOf(4, 5, 6).onCompletion {println("onCompletion second: $it")}.collect {try {println("collect: $it")throw IllegalStateException()} catch (e: Exception) {println("catch $e")}}
// collect: 4
// catch java.lang.IllegalStateException
// collect: 5
// catch java.lang.IllegalStateException
// collect: 6
// catch java.lang.IllegalStateException
// onCompletion second: null
}
线程切换
flowOn
flowOn 可以指定上游所有操作符运行的线程,和它的位置相关。
collect 运行在 main 线程,上游运行在 IO 线程,指定 DefaultDispatcher。
fun flowOn() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")emit(2)logX("Emit: 2")emit(3)logX("Emit: 3")}flow.filter {logX("Filter: $it")it > 2}.flowOn(Dispatchers.IO).collect {logX("Collect: $it")}
// ================================
// Start
// Thread:DefaultDispatcher-worker-1, time:1666096501866
// ================================
// ================================
// Filter: 1
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Emit: 1
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Filter: 2
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Emit: 2
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Filter: 3
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Emit: 3
// Thread:DefaultDispatcher-worker-1, time:1666096501917
// ================================
// ================================
// Collect: 3
// Thread:main, time:1666096501917
// ================================
}
flowOn 在 filter 之前,emit 执行在 IO 线程,filter 和 collect 执行在 main 线程。
fun flowOnIO() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}flow.flowOn(Dispatchers.IO).filter {logX("Filter: $it")it > 0}.collect {logX("Collect: $it")}// ================================
// Start
// Thread:DefaultDispatcher-worker-1, time:1666165816908
// ================================
// ================================
// Emit: 1
// Thread:DefaultDispatcher-worker-1, time:1666165816942
// ================================
// ================================
// Filter: 1
// Thread:main, time:1666165816944
// ================================
// ================================
// Collect: 1
// Thread:main, time:1666165816944
// ================================
}
因为 flowOn 只能用于上游,在 collect 可以用 withContext 切换线程,但不建议这么用。
collect 运行在 DefaultDispatcher,其他运行在 main 线程。
fun flowWithContext() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}flow.filter {logX("Filter: $it")it > 0}.collect {// 不建议withContext(Dispatchers.IO) {logX("Collect: $it")}}// ================================
// Start
// Thread:main, time:1666167319244
// ================================
// ================================
// Filter: 1
// Thread:main, time:1666167319297
// ================================
// ================================
// Collect: 1
// Thread:DefaultDispatcher-worker-2, time:1666167319311
// ================================
// ================================
// Emit: 1
// Thread:main, time:1666167319312
// ================================
}
flow 的 emit、filter、collect 都运行在 DefaultDispatcher。
fun flowWithContextAll() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}// 不建议withContext(Dispatchers.IO) {flow.filter {logX("Filter: $it")it > 0}.collect {logX("Collect: $it")}}
// ================================
// Start
// Thread:DefaultDispatcher-worker-1, time:1666167769589
// ================================
// ================================
// Filter: 1
// Thread:DefaultDispatcher-worker-1, time:1666167769645
// ================================
// ================================
// Collect: 1
// Thread:DefaultDispatcher-worker-1, time:1666167769645
// ================================
// ================================
// Emit: 1
// Thread:DefaultDispatcher-worker-1, time:1666167769645
// ================================
}
launchIn
flow 提供了 launchIn 函数指定在哪个线程执行。launchIn 运行在指定的 CoroutineScope。
flowOn 之前的运行在 Dispatchers.IO,下游运行在 launchIn 指定的 scope。
fun flowLaunchIn() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}val scope = CoroutineScope(mySingleDispatcher)flow.flowOn(Dispatchers.IO).filter {logX("Filter: $it")it > 0}.onEach {logX("Collect: $it")}.launchIn(scope)delay(100L)// ================================
// Start
// Thread:DefaultDispatcher-worker-1, time:1666168824669
// ================================
// ================================
// Emit: 1
// Thread:DefaultDispatcher-worker-1, time:1666168824704
// ================================
// ================================
// Filter: 1
// Thread:mySingleThread, time:1666168824706
// ================================
// ================================
// Collect: 1
// Thread:mySingleThread, time:1666168824706
// ================================}
launchIn 调用了 scope 的 launch,然后执行 collect。相当于终止操作符。
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {collect() // tail-call
}
flow 是冷的
flow 是冷的,只有接收者存在的情况下才会发送数据。如果不调用 collect,emit 不会执行。相反 channel 是热的,不管有没有接收者都会发送。
flow 的 emit 未执行。
fun flowCold() = runBlocking {val flow = flow {(1..3).forEach {println("Before send $it")emit(it)println("Send $it")}}val channel = produce<Int>(capacity = 0) {(1..3).forEach {println("Before send $it")send(it)println("Send $it")}}println("end")
// end
// Before send 1}
总结
flow 是 kotlin 提供的解决复杂异步场景的方案。
- flow 由创建、中间操作符、终止操作符三个部分组成。
- flow 的生命周期可以分为 onStart 和 onComplete,与它们在 flow 的位置无关。
- flow 的异常处理使用 catch。catch 与位置相关。
- flow 的线程切换使用 flowOn 和 launchIn。flowOn 控制上游,launchIn 控制全局。
- flow 是冷的,只有存在接收者它才会开始执行。
更多推荐
Kotlin的协程:flow
发布评论