Kotlin协程:Flow 异步流

编程入门 行业动态 更新时间:2024-10-06 10:25:54

<a href=https://www.elefans.com/category/jswz/34/1766123.html style=Kotlin协程:Flow 异步流"/>

Kotlin协程:Flow 异步流

异步流

    • 1、Flow 介绍
    • 2、Flow 特性
    • 3、冷流、热流
    • 4、Flow构建器
      • ①、flow{}
      • ②、flowOf() 帮助可变数组生成 Flow 实例
      • ③、asFlow 面向数组、列表等集合
    • 5、Flow取消
      • ①、withTimeoutOrNull
      • ②、cancel
      • ③、cancellable
    • 6、Flow上下文
      • ①、上下文保存属性
      • ②、flowOn函数
      • ③、launchIn函数
    • 7、Flow背压
      • ①、buffer
      • ②、flowOn
      • ③、conflate
      • ④、collectLatest
    • 8、Flow异常处理
      • ①、发射端异常处理(上游)
      • ②、收集端异常处理(下游)
    • 9、Flow的完成
      • ①、命令式处理finally
      • ②、onCompletion声明式处理
    • 10、Flow操作符
      • ①、transfrom 转换
      • ②、take 限长
      • ③、zip 组合
      • ④、flattenMerge
      • ⑤、flattenConcat
      • ⑥、flatMapConcat
      • ⑦、flatMapMerge
      • ⑧、flatMapLatest
      • ⑨、reduce
      • ⑩、fold

1、Flow 介绍

Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。

emit生产,collect消费

2、Flow 特性

  • flow{}构建块中的代码可以使用挂起函数
  • Flow构建器函数可以不用supend修饰符
  • 流的每次单独收集都是按顺序执行的,除非使用特殊操作符
  • Flow是一种类似序列的冷流,flow构建器中代码直到流被收集的时候才运行
//不需要用挂起函数修饰符
fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}
}//flowOf 不需要挂起函数修饰符 flowOf自动实现发射元素
fun flowOfBuilder() = flowOf(1, 2, 3, 4, 5)//asFlow 不需要挂起函数修饰符 flowOf自动实现发射元素
fun asFlowBuilder() = (5..10).asFlow()

3、冷流、热流

Flow冷流:主动需要即是主动收集才会提供发射数据
Channel热流:不管你需不需要一上来数据全都发射给你

4、Flow构建器

①、flow{}

fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}
}@Test
fun flowStudy(): Unit = runBlocking {//collect 需要在协程作用域中调用flowBuilder().collect(::println)
}//执行输出
1
2
3

②、flowOf() 帮助可变数组生成 Flow 实例

fun flowOfBuilder() = flowOf(1, 2, 3, 4, 5)@Test
fun flowStudy(): Unit = runBlocking {//collect 需要在协程作用域中调用flowOfBuilder().collect(::println)
}//执行输出
1
2
3
4
5

③、asFlow 面向数组、列表等集合

fun asFlowBuilder() = (5..10).asFlow()@Test
fun flowStudy(): Unit = runBlocking {asFlowBuilder().collect(::println)
}//执行输出
5
6
7
8
9
10

5、Flow取消

①、withTimeoutOrNull

//不需要用挂起函数修饰符
fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素println("emit is send $i")}
}@Test
fun flowStudy(): Unit = runBlocking {withTimeoutOrNull(2500){flowBuilder().collect(::println)}println("collect finish")
}//执行输出
1
emit is send 1
2
emit is send 2
collect finish

②、cancel

流构建器对每个发射值执行附加的ensureActive检查以进行取消,这意味着从flow{…} 发出的繁忙循环是可以取消的。

fun flowCancelBuilder() = flow {for (i in 1..5) {delay(1000)emit(i)println("emit is send $i")}
}@Test
fun flowCancelStudy(): Unit = runBlocking {flowCancelBuilder().collect { value -> if (value == 3) cancel() }println("collect finish")
}//执行输出
emit is send 1
emit is send 2
emit is send 3BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@710636b0at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)

③、cancellable

大多数其他流操作不会自行执行其他取消检查,在协程处于繁忙循环的情况下,必须通过 cancellable 明确检测是否取消

@Test
fun flowCancellableStudy(): Unit = runBlocking {(1..5).asFlow().collect { value ->println(value)if (value == 3) cancel()}
}//执行输出
1
2
3
4
5BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@1e0b4072at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)

上面这个示例这种一次性发射的繁忙循环并没有取消,还是收集完成了,要解决这个问题需要明确取消检查

使用 cancellable 就可以取消

@Test
fun flowCancellableStudy(): Unit = runBlocking {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()}
}//执行输出
1
2
3BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@1a6d8329at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)

6、Flow上下文

  • 流的收集总是在调用协程的上下文中发生,流的这个属性称为上下文保存
  • flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射emit
  • 上游:flowOn函数,该函数用于更改流发射的协程
  • 下游:launchIn函数,改函数可以指定流收集处理协程,同时可以替换collect单独在协程中启动流的收集

①、上下文保存属性

示例:说明下游的上下文会传给上游上下文进行保存

fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect{value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行输出
value=1 Thread=Test worker @coroutine#1
emit send 1 Thread=Test worker @coroutine#1
value=2 Thread=Test worker @coroutine#1
emit send 2 Thread=Test worker @coroutine#1
value=3 Thread=Test worker @coroutine#1
emit send 3 Thread=Test worker @coroutine#1

示例:如果将上游使用withContext调整协程,那么执行会出现异常,因为没有遵循上下文保存属性

fun flowContext() = flow {withContext(Dispatchers.IO) {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect { value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行输出
Flow invariant is violated:Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5d113a91, BlockingEventLoop@7eb7a30],but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@3347186a, Dispatchers.IO].Please refer to 'flow' documentation or use 'flowOn' instead

②、flowOn函数

如果想切换上游的协程,需要使用flowOn 函数

fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}.flowOn(Dispatchers.IO)@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect { value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行结果
emit send 1 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=1 Thread=Test worker @coroutine#1
emit send 2 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=2 Thread=Test worker @coroutine#1
emit send 3 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=3 Thread=Test worker @coroutine#1

③、launchIn函数

launchIn 是Flow的扩展函数,接收参数CoroutineScope,返回Job对象

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {collect() // tail-call
}
fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().onEach { println("collect $it Thread=${Thread.currentThread().name}") }.launchIn(CoroutineScope(Dispatchers.Default)) //返回Job对象.join() //跟runBlocking  不是父子关系,所以要 join
}//执行结果
collect 1 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 1 Thread=DefaultDispatcher-worker-2 @coroutine#2
collect 2 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 2 Thread=DefaultDispatcher-worker-2 @coroutine#2
collect 3 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 3 Thread=DefaultDispatcher-worker-2 @coroutine#2

7、Flow背压

当发射效率大于收集效率时候会出现背压

fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
collect value=1
send emit=1
collect value=2
send emit=2
collect value=3
send emit=3
collect end time=1898

如上示例背压需要 1898ms 如何进一步优化时长?

①、buffer

buffer相当于加长了管道,让收集有更多时间执行

使用buffer,将时长优化到1501 ms

fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().buffer(100).collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1501

②、flowOn

flowOn 改变发射段的线程,时间优化到1505 ms

fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}.flowOn(Dispatchers.IO)@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1504

③、conflate

合并发射项,不对每个值进行处理,就是放弃中间一些值的处理

示例:这个例子发射值太少,都收集到了,但是还是有优化效果 1500ms ,可以增加发射值来验证,收集的数据肯定会缺少

fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().conflate().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1500

④、collectLatest

取消并重新发射最后一个值,就是只收集最后一个值

fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collectLatest { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
send emit=3
collect value=3
collect end time=1174

8、Flow异常处理

①、发射端异常处理(上游)

发射端异常处理一般使用 catch { } 处理

fun flowException() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("发射端异常")}
}.catch { println("捕捉到发射端异常") }@Test
fun flowExceptionStudy(): Unit = runBlocking {val time = measureTimeMillis {flowException().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
collect value=1
send emit=1
捕捉到发射端异常
collect end time=664

②、收集端异常处理(下游)

收集端异常一般使用 try{ } catch{ } 处理

fun flowException() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowExceptionStudy(): Unit = runBlocking {val time = measureTimeMillis {try {flowException().collect { value ->delay(400)println("collect value=$value")throw NoSuchMethodException("收集端异常")}}catch (e:Exception){println("捕捉到收集端异常")}}println("collect end time=$time")
}//执行输出
collect value=1
捕捉到收集端异常
collect end time=648

9、Flow的完成

①、命令式处理finally

fun flowCompletion() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("收集端异常")}
}@Test
fun flowCompletionStudy(): Unit = runBlocking {try {flowCompletion().collect { value ->delay(400)println("collect value=$value")}}finally {println("collect onCompletion")}
}//执行输出
collect value=1
send emit=1
collect onCompletion收集端异常
java.lang.NoSuchMethodException: 收集端异常at com.example.mycoroutine.Flow1$flowCompletion$1.invokeSuspend(Flow1.kt:87)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

②、onCompletion声明式处理

onCompletion 只是会收集异常信息,但是不会捕捉异常

fun flowCompletion() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("收集端异常")}
}@Test
fun flowCompletionStudy(): Unit = runBlocking {flowCompletion().onCompletion { exp ->println("collect onCompletion exp=$exp")}.collect { value ->delay(400)println("collect value=$value")}
}//执行输出
collect value=1
send emit=1
collect onCompletion exp=java.lang.NoSuchMethodException: 收集端异常收集端异常
java.lang.NoSuchMethodException: 收集端异常at com.example.mycoroutine.Flow1$flowCompletion$1.invokeSuspend(Flow1.kt:87)

10、Flow操作符

①、transfrom 转换

可以任意多次调用emit

@Test
fun flowTransformStudy(): Unit = runBlocking {(1..3).asFlow().transform {emit(it * it)emit("add it$it")}.collect { value ->println("collect value=$value")}
}//执行输出
collect value=1
collect value=add it1
collect value=4
collect value=add it2
collect value=9
collect value=add it3

②、take 限长

现在收集长度

@Test
fun flowTakeStudy(): Unit = runBlocking {(1..3).asFlow().take(2).collect{value ->println("collect value=$value")}
}//执行输出
collect value=1
collect value=2

③、zip 组合

组合两个流数据

@Test
fun flowZipStudy(): Unit = runBlocking {val asFlow = (1..3).asFlow()val flowOf = flowOf("kotlin", "java", "c++")asFlow.zip(flowOf) { flowOne, flowTwo ->"$flowOne->$flowTwo"}.collect(::println)
}//执行输出
1->kotlin
2->java
3->c++

④、flattenMerge

flattenMerge不会组合多个flow,而是将它们作为单个流执行。
flattenMerge和flattenConcat类似,但是可以设置并发收集的流的数量,如果设置成1,那么就等同于 flattenConcat

@Test
fun flowFlattenMergeStudy(): Unit = runBlocking {val flowA = (1..5).asFlow()val flowB = flowOf("one", "two", "three", "four", "five")flowOf(flowA, flowB).flattenMerge(2).collect {println("flattenMerge:$it")}
}
//执行输出
flattenMerge:1
flattenMerge:2
flattenMerge:3
flattenMerge:4
flattenMerge:5
flattenMerge:one
flattenMerge:two
flattenMerge:three
flattenMerge:four
flattenMerge:five

⑤、flattenConcat

@Test
fun flowFlattenConcatStudy(): Unit = runBlocking {val flowA = (1..5).asFlow()val flowB = flowOf("one", "two", "three", "four", "five")flowOf(flowA, flowB).flattenConcat().collect { println("flattenConcat:$it") }
}
//执行输出
flattenConcat:1
flattenConcat:2
flattenConcat:3
flattenConcat:4
flattenConcat:5
flattenConcat:one
flattenConcat:two
flattenConcat:three
flattenConcat:four
flattenConcat:five

⑥、flatMapConcat

在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成

fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapConcatStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start=currTime()}.flatMapConcat {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis()-start}")}
}// 执行输出
collect 1 kotlin at 7
collect 1 java at 126
collect 2 kotlin at 126
collect 2 java at 236
collect 3 kotlin at 236
collect 3 java at 345
collect 4 kotlin at 346
collect 4 java at 454
collect 5 kotlin at 455
collect 5 java at 567

⑦、flatMapMerge

并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理

fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapMergeStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start = currTime()}.flatMapMerge {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis() - start}")}}// 执行输出 可以看出来flatMapMerge并发特性
collect 1 kotlin at 13
collect 2 kotlin at 14
collect 3 kotlin at 15
collect 4 kotlin at 15
collect 5 kotlin at 15
collect 1 java at 119
collect 2 java at 119
collect 3 java at 119
collect 4 java at 119
collect 5 java at 120

⑧、flatMapLatest

flatMapLatest和collectLatest操作符很像,只要新flow发射了新值,那么上个flow就会被取消。

fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapLatestStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start = currTime()}.flatMapLatest {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis() - start}")}}// 执行输出
collect 1 kotlin at 12
collect 2 kotlin at 41
collect 3 kotlin at 41
collect 4 kotlin at 42
collect 5 kotlin at 43
collect 5 java at 148

⑨、reduce

就是两个元素操作之后拿到的值跟后面的元素进行操作,用于把flow 简化合并为一个值

@Test
fun flowReduceStudy(): Unit = runBlocking {val reduceResult = (1..5).asFlow().reduce { valueA, valueB ->println("reduce valueA=$valueA,valueB=$valueB")valueA + valueB}println("reduceResult=$reduceResult")
}//执行输出
reduce valueA=1,valueB=2
reduce valueA=3,valueB=3
reduce valueA=6,valueB=4
reduce valueA=10,valueB=5
reduceResult=15

⑩、fold

@Test
fun flowFoldStudy(): Unit = runBlocking {val reduceResult = (1..5).asFlow().fold(2) { valueA, valueB ->println("fold valueA=$valueA,valueB=$valueB")valueA * valueB}println("reduceResult=$reduceResult")
}//执行输出
fold valueA=2,valueB=1
fold valueA=2,valueB=2
fold valueA=4,valueB=3
fold valueA=12,valueB=4
fold valueA=48,valueB=5
reduceResult=240

更多推荐

Kotlin协程:Flow 异步流

本文发布于:2024-02-07 10:29:06,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1756096.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Kotlin   协程   Flow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!