Kotlin的协程:flow

编程入门 行业动态 更新时间:2024-10-07 14:28:45

<a href=https://www.elefans.com/category/jswz/34/1766123.html style=Kotlin的协程:flow"/>

Kotlin的协程:flow

flow 介绍

之前介绍的启动协程方法,比如 launch、async 都是协程的单次启动。如果有复杂场景,比如发送多个数据,就需要使用 flow 数据流。在 flow 中,数据如水流一样经过上游发送,中间站处理,下游接收。

创建 flow

创建 flow 有 3 种方式:

  1. flow{}
  2. flowOf()
  3. asFlow()

flow

flow{} 中使用 emit 发送数据。

fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}

flowOf

flowOf() 可以将指定的一串数据转换为 flow,接收可变参数。

fun flowOfFun() = runBlocking {flowOf(1, 2, 3, 4, 5).filter { it > 2 }.map { it * 2 }.take(2).collect {// 6 8println(it)}listOf(1, 2, 3, 4, 5).filter { it > 2 }.map { it * 2 }.take(2).forEach {// 6 8println(it)}
}

asFlow

asFlow() 可以将 List 集合转换为 flow。toList() 可以将 flow 转换为 List 集合。

fun flow2list() = runBlocking {flowOf(1, 2, 3, 4, 5)// flow to list.toList().filter { it > 2 }.map { it * 2 }.take(2).forEach {println(it)}listOf(1, 2, 3, 4, 5)// list as flow.asFlow().filter { it > 2 }.map { it * 2 }.take(2).collect {println(it)}
}

中间操作符

创建 flow 之后使用中间操作符处理 flow 的每一个数据。flow 的中间操作符和 list 集合的操作符非常类似。
常用中间操作符:

  1. filter
  2. map
  3. take

filter

filter 传入判断条件,条件满足时过滤数据,否则不将数据流向下游。

map

map 传入映射函数,将每个数据传入映射函数,得到结果继续传入下游。

take

take 传入非负整数 n,取前 n 个数据传入下游。

fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}

终止操作符

collect 是 flow 的终止操作符,收集每一个数据经过中间操作符后的最终结果,表示 flow 流的终止,后面不能再调用中间操作符。

除了 collect,还有一些其他的终止操作符,first、single、fold、reduce。

collect

返回所有元素,结束 flow。

fun flowEmit() = runBlocking {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).collect {// 6 8println(it)}
}

first

返回第一个元素,结束 flow。

fun flowFirst() = runBlocking {val first = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).first()// 6println(first)
}

single

返回唯一元素,结束flow。不能多于一个,也不能一个没有。

fun flowSingle() = runBlocking {val single = flow {emit(3)}.filter {it > 2}.map {it * 2}.take(2).single()// 6println(single)
}

fold

折叠所有元素。指定一个函数和初始值,对每一个元素反复执行函数,返回最后的结果。

fun flowFold() = runBlocking {val fold = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).fold(0) { acc, value ->acc + value}// 14println(fold)
}

reduce

reduce 和 fold 很类似,reduce 没有初始值。

fun flowReduce() = runBlocking {val reduce = flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.filter {it > 2}.map {it * 2}.take(2).reduce { acc, value ->acc + value}// 14println(reduce)
}

first、single、fold、reduce 本质都是封装了 collect ,因此它们都是终止操作符。

生命周期

onStart

onStart 是 flow 的开始生命周期回调。onStart 的执行时机和它在 flow 位置无关。

对比下面两个方法,onStart 都会在第一时间回调。

fun onStartFun() = runBlocking {flowOf(1, 2, 3, 4, 5).filter {println("filter: $it")it > 2}.map {println("map: $it")it * 2}.take(2).onStart {println("onStart")}.collect {println("collect: $it")}
}fun onStartFun2() = runBlocking {flowOf(1, 2, 3, 4, 5).take(2).filter {println("filter: $it")it > 2}.map {println("map: $it")it * 2}.onStart {println("onStart")}.collect {println("collect: $it")}
}

onComplete

flow 执行完后回调 onComplete。onComplete 的执行时机和它在 flow 的位置无关。

flow 正常执行完回调 onComplete。

fun onCompleteFun() = runBlocking {flowOf(1, 2, 3, 4, 5).onCompletion {println("onCompletion")}.filter {println("filter: $it")it > 2}.take(2).collect {println("collect: $it")}
}

flow 执行被取消,或者 flow 执行中出现异常。

fun cancelOnCompleteFun() = runBlocking {launch {flow {emit(1)emit(2)emit(3)}// collect: 1//collect: 2//cancel//onCompletion first: kotlinx.coroutines.JobCancellationException.onCompletion {println("onCompletion first: $it")}.collect {println("collect: $it")if (it == 2) {// cancel flowcancel()println("cancel")}}}delay(1000)flowOf(4, 5, 6)
//    collect: 4
//    onCompletion second: java.lang.IllegalStateException.onCompletion {println("onCompletion second: $it")}.collect {println("collect: $it")throw IllegalStateException()}}

异常处理

flow 的异常处理可以分为上游异常和下游异常。上游异常指创建 flow 或者中间操作符发生的异常。下游异常指终止操作符 collect 发生的异常。

上游异常

上游异常可以用 catch 函数捕获异常。catch 函数和它的位置相关,只能捕获 catch 上游的异常。

fun flowCatch() = runBlocking {val flow = flow {emit(1)emit(2)throw IllegalStateException()emit(3)}flow.map {it * 2}.catch {println("catch: $it")}.collect {println(it)}//    2//    4//    catch: java.lang.IllegalStateException
}

下游异常

下游异常不能用 catch 函数,需要在 collect 的作用域用 try-catch 捕获。

catch 函数无法捕获下游的 filter 除 0 异常。

fun flowCatchDownStream() = runBlocking {val flow = flow {emit(1)emit(2)emit(3)}flow.map {it * 2}.catch {println("catch: $it")}.filter {it / 0 > 1}.collect {println(it)}// Exception in thread "main" java.lang.ArithmeticException: / by zero
}

使用 try-catch 捕获 collect 的异常。

fun flowTryCatch() = runBlocking {flowOf(4, 5, 6).onCompletion {println("onCompletion second: $it")}.collect {try {println("collect: $it")throw IllegalStateException()} catch (e: Exception) {println("catch $e")}}
//    collect: 4
//    catch java.lang.IllegalStateException
//            collect: 5
//    catch java.lang.IllegalStateException
//            collect: 6
//    catch java.lang.IllegalStateException
//            onCompletion second: null
}

线程切换

flowOn

flowOn 可以指定上游所有操作符运行的线程,和它的位置相关。

collect 运行在 main 线程,上游运行在 IO 线程,指定 DefaultDispatcher。

fun flowOn() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")emit(2)logX("Emit: 2")emit(3)logX("Emit: 3")}flow.filter {logX("Filter: $it")it > 2}.flowOn(Dispatchers.IO).collect {logX("Collect: $it")}
//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666096501866
//    ================================
//    ================================
//    Filter: 1
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Filter: 2
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 2
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Filter: 3
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 3
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Collect: 3
//    Thread:main, time:1666096501917
//    ================================
}

flowOn 在 filter 之前,emit 执行在 IO 线程,filter 和 collect 执行在 main 线程。

fun flowOnIO() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}flow.flowOn(Dispatchers.IO).filter {logX("Filter: $it")it > 0}.collect {logX("Collect: $it")}//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666165816908
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666165816942
//    ================================
//    ================================
//    Filter: 1
//    Thread:main, time:1666165816944
//    ================================
//    ================================
//    Collect: 1
//    Thread:main, time:1666165816944
//    ================================
}

因为 flowOn 只能用于上游,在 collect 可以用 withContext 切换线程,但不建议这么用。

collect 运行在 DefaultDispatcher,其他运行在 main 线程。

fun flowWithContext() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}flow.filter {logX("Filter: $it")it > 0}.collect {// 不建议withContext(Dispatchers.IO) {logX("Collect: $it")}}//    ================================
//    Start
//    Thread:main, time:1666167319244
//    ================================
//    ================================
//    Filter: 1
//    Thread:main, time:1666167319297
//    ================================
//    ================================
//    Collect: 1
//    Thread:DefaultDispatcher-worker-2, time:1666167319311
//    ================================
//    ================================
//    Emit: 1
//    Thread:main, time:1666167319312
//    ================================
}

flow 的 emit、filter、collect 都运行在 DefaultDispatcher。

fun flowWithContextAll() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}// 不建议withContext(Dispatchers.IO) {flow.filter {logX("Filter: $it")it > 0}.collect {logX("Collect: $it")}}
//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666167769589
//    ================================
//    ================================
//    Filter: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
//    ================================
//    Collect: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
}

launchIn

flow 提供了 launchIn 函数指定在哪个线程执行。launchIn 运行在指定的 CoroutineScope。

flowOn 之前的运行在 Dispatchers.IO,下游运行在 launchIn 指定的 scope。

fun flowLaunchIn() = runBlocking {val flow = flow {logX("Start")emit(1)logX("Emit: 1")}val scope = CoroutineScope(mySingleDispatcher)flow.flowOn(Dispatchers.IO).filter {logX("Filter: $it")it > 0}.onEach {logX("Collect: $it")}.launchIn(scope)delay(100L)//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666168824669
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666168824704
//    ================================
//    ================================
//    Filter: 1
//    Thread:mySingleThread, time:1666168824706
//    ================================
//    ================================
//    Collect: 1
//    Thread:mySingleThread, time:1666168824706
//    ================================}

launchIn 调用了 scope 的 launch,然后执行 collect。相当于终止操作符。

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {collect() // tail-call
}

flow 是冷的

flow 是冷的,只有接收者存在的情况下才会发送数据。如果不调用 collect,emit 不会执行。相反 channel 是热的,不管有没有接收者都会发送。

flow 的 emit 未执行。

fun flowCold() = runBlocking {val flow = flow {(1..3).forEach {println("Before send $it")emit(it)println("Send $it")}}val channel = produce<Int>(capacity = 0) {(1..3).forEach {println("Before send $it")send(it)println("Send $it")}}println("end")
//    end
//    Before send 1}

总结

flow 是 kotlin 提供的解决复杂异步场景的方案。

  1. flow 由创建、中间操作符、终止操作符三个部分组成。
  2. flow 的生命周期可以分为 onStart 和 onComplete,与它们在 flow 的位置无关。
  3. flow 的异常处理使用 catch。catch 与位置相关。
  4. flow 的线程切换使用 flowOn 和 launchIn。flowOn 控制上游,launchIn 控制全局。
  5. flow 是冷的,只有存在接收者它才会开始执行。

更多推荐

Kotlin的协程:flow

本文发布于:2024-02-07 10:27:54,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1756373.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Kotlin   flow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!