Kotlin协程:Flow 异步流"/>
Kotlin协程:Flow 异步流
异步流
- 1、Flow 介绍
- 2、Flow 特性
- 3、冷流、热流
- 4、Flow构建器
- ①、flow{}
- ②、flowOf() 帮助可变数组生成 Flow 实例
- ③、asFlow 面向数组、列表等集合
- 5、Flow取消
- ①、withTimeoutOrNull
- ②、cancel
- ③、cancellable
- 6、Flow上下文
- ①、上下文保存属性
- ②、flowOn函数
- ③、launchIn函数
- 7、Flow背压
- ①、buffer
- ②、flowOn
- ③、conflate
- ④、collectLatest
- 8、Flow异常处理
- ①、发射端异常处理(上游)
- ②、收集端异常处理(下游)
- 9、Flow的完成
- ①、命令式处理finally
- ②、onCompletion声明式处理
- 10、Flow操作符
- ①、transfrom 转换
- ②、take 限长
- ③、zip 组合
- ④、flattenMerge
- ⑤、flattenConcat
- ⑥、flatMapConcat
- ⑦、flatMapMerge
- ⑧、flatMapLatest
- ⑨、reduce
- ⑩、fold
1、Flow 介绍
Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。
emit生产,collect消费
2、Flow 特性
- flow{}构建块中的代码可以使用挂起函数
- Flow构建器函数可以不用supend修饰符
- 流的每次单独收集都是按顺序执行的,除非使用特殊操作符
- Flow是一种类似序列的冷流,flow构建器中代码直到流被收集的时候才运行
//不需要用挂起函数修饰符
fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}
}//flowOf 不需要挂起函数修饰符 flowOf自动实现发射元素
fun flowOfBuilder() = flowOf(1, 2, 3, 4, 5)//asFlow 不需要挂起函数修饰符 flowOf自动实现发射元素
fun asFlowBuilder() = (5..10).asFlow()
3、冷流、热流
Flow冷流:主动需要即是主动收集才会提供发射数据
Channel热流:不管你需不需要一上来数据全都发射给你
4、Flow构建器
①、flow{}
fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}
}@Test
fun flowStudy(): Unit = runBlocking {//collect 需要在协程作用域中调用flowBuilder().collect(::println)
}//执行输出
1
2
3
②、flowOf() 帮助可变数组生成 Flow 实例
fun flowOfBuilder() = flowOf(1, 2, 3, 4, 5)@Test
fun flowStudy(): Unit = runBlocking {//collect 需要在协程作用域中调用flowOfBuilder().collect(::println)
}//执行输出
1
2
3
4
5
③、asFlow 面向数组、列表等集合
fun asFlowBuilder() = (5..10).asFlow()@Test
fun flowStudy(): Unit = runBlocking {asFlowBuilder().collect(::println)
}//执行输出
5
6
7
8
9
10
5、Flow取消
①、withTimeoutOrNull
//不需要用挂起函数修饰符
fun flowBuilder() = flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素println("emit is send $i")}
}@Test
fun flowStudy(): Unit = runBlocking {withTimeoutOrNull(2500){flowBuilder().collect(::println)}println("collect finish")
}//执行输出
1
emit is send 1
2
emit is send 2
collect finish
②、cancel
流构建器对每个发射值执行附加的ensureActive检查以进行取消,这意味着从flow{…} 发出的繁忙循环是可以取消的。
fun flowCancelBuilder() = flow {for (i in 1..5) {delay(1000)emit(i)println("emit is send $i")}
}@Test
fun flowCancelStudy(): Unit = runBlocking {flowCancelBuilder().collect { value -> if (value == 3) cancel() }println("collect finish")
}//执行输出
emit is send 1
emit is send 2
emit is send 3BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@710636b0at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)
③、cancellable
大多数其他流操作不会自行执行其他取消检查,在协程处于繁忙循环的情况下,必须通过 cancellable 明确检测是否取消
@Test
fun flowCancellableStudy(): Unit = runBlocking {(1..5).asFlow().collect { value ->println(value)if (value == 3) cancel()}
}//执行输出
1
2
3
4
5BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@1e0b4072at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)
上面这个示例这种一次性发射的繁忙循环并没有取消,还是收集完成了,要解决这个问题需要明确取消检查
使用 cancellable 就可以取消
@Test
fun flowCancellableStudy(): Unit = runBlocking {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()}
}//执行输出
1
2
3BlockingCoroutine was cancelled
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@1a6d8329at app//kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1578)
6、Flow上下文
- 流的收集总是在调用协程的上下文中发生,流的这个属性称为上下文保存
- flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射emit
- 上游:flowOn函数,该函数用于更改流发射的协程
- 下游:launchIn函数,改函数可以指定流收集处理协程,同时可以替换collect单独在协程中启动流的收集
①、上下文保存属性
示例:说明下游的上下文会传给上游上下文进行保存
fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect{value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行输出
value=1 Thread=Test worker @coroutine#1
emit send 1 Thread=Test worker @coroutine#1
value=2 Thread=Test worker @coroutine#1
emit send 2 Thread=Test worker @coroutine#1
value=3 Thread=Test worker @coroutine#1
emit send 3 Thread=Test worker @coroutine#1
示例:如果将上游使用withContext调整协程,那么执行会出现异常,因为没有遵循上下文保存属性
fun flowContext() = flow {withContext(Dispatchers.IO) {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect { value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行输出
Flow invariant is violated:Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5d113a91, BlockingEventLoop@7eb7a30],but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@3347186a, Dispatchers.IO].Please refer to 'flow' documentation or use 'flowOn' instead
②、flowOn函数
如果想切换上游的协程,需要使用flowOn 函数
fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}.flowOn(Dispatchers.IO)@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().collect { value ->println("value=$value Thread=${Thread.currentThread().name}")}
}//执行结果
emit send 1 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=1 Thread=Test worker @coroutine#1
emit send 2 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=2 Thread=Test worker @coroutine#1
emit send 3 Thread=DefaultDispatcher-worker-1 @coroutine#2
value=3 Thread=Test worker @coroutine#1
③、launchIn函数
launchIn 是Flow的扩展函数,接收参数CoroutineScope,返回Job对象
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {collect() // tail-call
}
fun flowContext() = flow {for (i in 1..3) {delay(1000)emit(i)println("emit send $i Thread=${Thread.currentThread().name}")}
}@Test
fun flowContextStudy(): Unit = runBlocking {flowContext().onEach { println("collect $it Thread=${Thread.currentThread().name}") }.launchIn(CoroutineScope(Dispatchers.Default)) //返回Job对象.join() //跟runBlocking 不是父子关系,所以要 join
}//执行结果
collect 1 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 1 Thread=DefaultDispatcher-worker-2 @coroutine#2
collect 2 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 2 Thread=DefaultDispatcher-worker-2 @coroutine#2
collect 3 Thread=DefaultDispatcher-worker-2 @coroutine#2
emit send 3 Thread=DefaultDispatcher-worker-2 @coroutine#2
7、Flow背压
当发射效率大于收集效率时候会出现背压
fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
collect value=1
send emit=1
collect value=2
send emit=2
collect value=3
send emit=3
collect end time=1898
如上示例背压需要 1898ms 如何进一步优化时长?
①、buffer
buffer相当于加长了管道,让收集有更多时间执行
使用buffer,将时长优化到1501 ms
fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().buffer(100).collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1501
②、flowOn
flowOn 改变发射段的线程,时间优化到1505 ms
fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}.flowOn(Dispatchers.IO)@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1504
③、conflate
合并发射项,不对每个值进行处理,就是放弃中间一些值的处理
示例:这个例子发射值太少,都收集到了,但是还是有优化效果 1500ms ,可以增加发射值来验证,收集的数据肯定会缺少
fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().conflate().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
collect value=1
send emit=3
collect value=2
collect value=3
collect end time=1500
④、collectLatest
取消并重新发射最后一个值,就是只收集最后一个值
fun flowPressure() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowPressureStudy(): Unit = runBlocking {val time = measureTimeMillis {flowPressure().collectLatest { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
send emit=1
send emit=2
send emit=3
collect value=3
collect end time=1174
8、Flow异常处理
①、发射端异常处理(上游)
发射端异常处理一般使用 catch { } 处理
fun flowException() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("发射端异常")}
}.catch { println("捕捉到发射端异常") }@Test
fun flowExceptionStudy(): Unit = runBlocking {val time = measureTimeMillis {flowException().collect { value ->delay(400)println("collect value=$value")}}println("collect end time=$time")
}//执行输出
collect value=1
send emit=1
捕捉到发射端异常
collect end time=664
②、收集端异常处理(下游)
收集端异常一般使用 try{ } catch{ } 处理
fun flowException() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")}
}@Test
fun flowExceptionStudy(): Unit = runBlocking {val time = measureTimeMillis {try {flowException().collect { value ->delay(400)println("collect value=$value")throw NoSuchMethodException("收集端异常")}}catch (e:Exception){println("捕捉到收集端异常")}}println("collect end time=$time")
}//执行输出
collect value=1
捕捉到收集端异常
collect end time=648
9、Flow的完成
①、命令式处理finally
fun flowCompletion() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("收集端异常")}
}@Test
fun flowCompletionStudy(): Unit = runBlocking {try {flowCompletion().collect { value ->delay(400)println("collect value=$value")}}finally {println("collect onCompletion")}
}//执行输出
collect value=1
send emit=1
collect onCompletion收集端异常
java.lang.NoSuchMethodException: 收集端异常at com.example.mycoroutine.Flow1$flowCompletion$1.invokeSuspend(Flow1.kt:87)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
②、onCompletion声明式处理
onCompletion 只是会收集异常信息,但是不会捕捉异常
fun flowCompletion() = flow {for (i in 1..3) {delay(200)emit(i)println("send emit=$i")throw NoSuchMethodException("收集端异常")}
}@Test
fun flowCompletionStudy(): Unit = runBlocking {flowCompletion().onCompletion { exp ->println("collect onCompletion exp=$exp")}.collect { value ->delay(400)println("collect value=$value")}
}//执行输出
collect value=1
send emit=1
collect onCompletion exp=java.lang.NoSuchMethodException: 收集端异常收集端异常
java.lang.NoSuchMethodException: 收集端异常at com.example.mycoroutine.Flow1$flowCompletion$1.invokeSuspend(Flow1.kt:87)
10、Flow操作符
①、transfrom 转换
可以任意多次调用emit
@Test
fun flowTransformStudy(): Unit = runBlocking {(1..3).asFlow().transform {emit(it * it)emit("add it$it")}.collect { value ->println("collect value=$value")}
}//执行输出
collect value=1
collect value=add it1
collect value=4
collect value=add it2
collect value=9
collect value=add it3
②、take 限长
现在收集长度
@Test
fun flowTakeStudy(): Unit = runBlocking {(1..3).asFlow().take(2).collect{value ->println("collect value=$value")}
}//执行输出
collect value=1
collect value=2
③、zip 组合
组合两个流数据
@Test
fun flowZipStudy(): Unit = runBlocking {val asFlow = (1..3).asFlow()val flowOf = flowOf("kotlin", "java", "c++")asFlow.zip(flowOf) { flowOne, flowTwo ->"$flowOne->$flowTwo"}.collect(::println)
}//执行输出
1->kotlin
2->java
3->c++
④、flattenMerge
flattenMerge不会组合多个flow,而是将它们作为单个流执行。
flattenMerge和flattenConcat类似,但是可以设置并发收集的流的数量,如果设置成1,那么就等同于 flattenConcat
@Test
fun flowFlattenMergeStudy(): Unit = runBlocking {val flowA = (1..5).asFlow()val flowB = flowOf("one", "two", "three", "four", "five")flowOf(flowA, flowB).flattenMerge(2).collect {println("flattenMerge:$it")}
}
//执行输出
flattenMerge:1
flattenMerge:2
flattenMerge:3
flattenMerge:4
flattenMerge:5
flattenMerge:one
flattenMerge:two
flattenMerge:three
flattenMerge:four
flattenMerge:five
⑤、flattenConcat
@Test
fun flowFlattenConcatStudy(): Unit = runBlocking {val flowA = (1..5).asFlow()val flowB = flowOf("one", "two", "three", "four", "five")flowOf(flowA, flowB).flattenConcat().collect { println("flattenConcat:$it") }
}
//执行输出
flattenConcat:1
flattenConcat:2
flattenConcat:3
flattenConcat:4
flattenConcat:5
flattenConcat:one
flattenConcat:two
flattenConcat:three
flattenConcat:four
flattenConcat:five
⑥、flatMapConcat
在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成
fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapConcatStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start=currTime()}.flatMapConcat {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis()-start}")}
}// 执行输出
collect 1 kotlin at 7
collect 1 java at 126
collect 2 kotlin at 126
collect 2 java at 236
collect 3 kotlin at 236
collect 3 java at 345
collect 4 kotlin at 346
collect 4 java at 454
collect 5 kotlin at 455
collect 5 java at 567
⑦、flatMapMerge
并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理
fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapMergeStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start = currTime()}.flatMapMerge {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis() - start}")}}// 执行输出 可以看出来flatMapMerge并发特性
collect 1 kotlin at 13
collect 2 kotlin at 14
collect 3 kotlin at 15
collect 4 kotlin at 15
collect 5 kotlin at 15
collect 1 java at 119
collect 2 java at 119
collect 3 java at 119
collect 4 java at 119
collect 5 java at 120
⑧、flatMapLatest
flatMapLatest和collectLatest操作符很像,只要新flow发射了新值,那么上个flow就会被取消。
fun currTime() = System.currentTimeMillis()
var start:Long=0@Test
fun flowFlatMapLatestStudy(): Unit = runBlocking {(1..5).asFlow().onStart {start = currTime()}.flatMapLatest {flow {emit("$it kotlin")delay(100)emit("$it java")}}.collect {println("collect $it at ${System.currentTimeMillis() - start}")}}// 执行输出
collect 1 kotlin at 12
collect 2 kotlin at 41
collect 3 kotlin at 41
collect 4 kotlin at 42
collect 5 kotlin at 43
collect 5 java at 148
⑨、reduce
就是两个元素操作之后拿到的值跟后面的元素进行操作,用于把flow 简化合并为一个值
@Test
fun flowReduceStudy(): Unit = runBlocking {val reduceResult = (1..5).asFlow().reduce { valueA, valueB ->println("reduce valueA=$valueA,valueB=$valueB")valueA + valueB}println("reduceResult=$reduceResult")
}//执行输出
reduce valueA=1,valueB=2
reduce valueA=3,valueB=3
reduce valueA=6,valueB=4
reduce valueA=10,valueB=5
reduceResult=15
⑩、fold
@Test
fun flowFoldStudy(): Unit = runBlocking {val reduceResult = (1..5).asFlow().fold(2) { valueA, valueB ->println("fold valueA=$valueA,valueB=$valueB")valueA * valueB}println("reduceResult=$reduceResult")
}//执行输出
fold valueA=2,valueB=1
fold valueA=2,valueB=2
fold valueA=4,valueB=3
fold valueA=12,valueB=4
fold valueA=48,valueB=5
reduceResult=240
更多推荐
Kotlin协程:Flow 异步流
发布评论