map 和 mapAsync 的区别

编程入门 行业动态 更新时间:2024-10-22 11:40:52
本文介绍了map 和 mapAsync 的区别的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

谁能解释一下 map 和 mapAsync w.r.t AKKA 流之间的区别?在文档中据说

Can anyone please explain me difference between map and mapAsync w.r.t AKKA stream? In the documentation it is said that

涉及外部非流的流转换和副作用可以使用 mapAsync 或 mapAsyncUnordered 执行基于服务的服务

Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered

为什么我们不能简单地在这里映射?我假设 Flow、Source、Sink 本质上都是 Monadic,因此 map 应该可以正常工作,w.r.t w.r.t w.r.t w.r.t of the nature of these?

Why cant we simply us map here? I assume that Flow, Source, Sink all would be Monadic in nature and thus map should work fine w.r.t the Delay in the nature of these ?

推荐答案

签名

差异在签名:Flow.map 接收一个返回类型 T 的函数,而 Flow.mapAsync 接收一个返回类型 的函数>未来[T].

The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T while Flow.mapAsync takes in a function that returns a type Future[T].

实际例子

举个例子,假设我们有一个函数,它根据用户 id 在数据库中查询用户的全名:

As an example, suppose that we have a function which queries a database for a user's full name based on a user id:

type UserID = String type FullName = String val databaseLookup : UserID => FullName = ??? //implementation unimportant

给定 UserID 值的 akka 流 Source,我们可以在流中使用 Flow.map 来查询数据库并打印全名到控制台:

Given an akka stream Source of UserID values we could use Flow.map within a stream to query the database and print the full names to the console:

val userIDSource : Source[UserID, _] = ??? val stream = userIDSource.via(Flow[UserID].map(databaseLookup)) .to(Sink.foreach[FullName](println)) .run()

此方法的一个限制是此流一次只能进行 1 db 查询.这种串行查询将是一个瓶颈",可能会阻止我们的流中的最大吞吐量.

One limitation of this approach is that this stream will only make 1 db query at a time. This serial querying will be a "bottleneck" and likely prevent maximum throughput in our stream.

我们可以尝试通过使用 Future 的并发查询来提高性能:

We could try to improve performance through concurrent queries using a Future:

def concurrentDBLookup(userID : UserID) : Future[FullName] = Future { databaseLookup(userID) } val concurrentStream = userIDSource.via(Flow[UserID].map(concurrentDBLookup)) .to(Sink.foreach[Future[FullName]](_ foreach println)) .run()

这个简单的附录的问题在于我们已经有效地消除了背压.

The problem with this simplistic addendum is that we have effectively eliminated backpressure.

Sink 只是拉入 Future 并添加一个 foreach println,与数据库查询相比,它相对较快.流将不断地将需求传播到源,并在 Flow.map 内产生更多的期货.因此,同时运行的databaseLookup 的数量没有限制.不受约束的并行查询最终可能会使数据库过载.

The Sink is just pulling in the Future and adding a foreach println, which is relatively fast compared to database queries. The stream will continuously propagate demand to the Source and spawn off more Futures inside of the Flow.map. Therefore, there is no limit to the number of databaseLookup running concurrently. Unfettered parallel querying could eventually overload the database.

Flow.mapAsync 来救援;我们可以同时进行数据库访问,同时限制同时查找的数量:

Flow.mapAsync to the rescue; we can have concurrent db access while at the same time capping the number of simultaneous lookups:

val maxLookupCount = 10 val maxLookupConcurrentStream = userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) .to(Sink.foreach[FullName](println)) .run()

还要注意 Sink.foreach 变得更简单了,它不再需要 Future[FullName] 而只是一个 FullName.

Also notice that the Sink.foreach got simpler, it no longer takes in a Future[FullName] but just a FullName instead.

无序异步映射

如果不需要维护用户 ID 到全名的顺序,那么您可以使用 Flow.mapAsyncUnordered.例如:您只需要将所有名称打印到控制台,而不必关心它们的打印顺序.

If maintaining a sequential ordering of the UserIDs to FullNames is unnecessary then you can use Flow.mapAsyncUnordered. For example: you just need to print all of the names to the console but didn't care about order they were printed.

更多推荐

map 和 mapAsync 的区别

本文发布于:2023-11-26 00:18:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631908.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:区别   map   mapAsync

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!