有时我会遇到自己想要合并为Future[Stream[Y]]的Stream[X]和function X => Future Y的情况,但似乎找不到解决方法.例如,我有
I sometimes find myself in a situation where I have some Stream[X], and a function X => Future Y, that I'd like to combine to a Future[Stream[Y]], and I can't seem to find a way to do it. For example, I have
val x = (1 until 10).toStream def toFutureString(value : Integer) = Future(value toString) val result : Future[Stream[String]] = ???我尝试了
val result = Future.Traverse(x, toFutureString)给出正确的结果,但似乎在返回Future之前消耗了整个流,这或多或少地击败了钱包
which gives the correct result, but seems to consume the entire stream before returning the Future, which more or less defeats the purpse
我尝试过
val result = x.flatMap(toFutureString)但是不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]
val result = x.map(toFutureString)返回有些奇怪且无用的Stream[Future[String]]
returns the somewhat odd and useless Stream[Future[String]]
在这里我应该怎么做才能解决问题?
What should I do here to get things fixed?
我不会卡在Stream上,只要在开始处理头部之前不会阻塞评估所有项目,我对Iterator上的相同操作也会同样满意.
I'm not stuck on a Stream, I'd be equally happy with the same operation on an Iterator, as long as it won't block on evaluating all items before starting to process the head
Edit2:我不确定100%确定Future.Traverse构造是否需要在返回Future [Stream]之前遍历整个流,但我认为确实如此.如果不是这样,那么这本身就是一个很好的答案.
I'm not 100% sure that the Future.Traverse construct needs to traverse the entire stream before returning a Future[Stream], but I think it does. If it doesn't, that's a fine answer in itself.
Edit3:我也不需要按顺序排列结果,我可以按返回的流或迭代器顺序排列.
I also don't need the result to be in order, I'm fine with the stream or iterator returned being whatever order.
推荐答案使用traverse的方向正确,但不幸的是,在这种情况下,标准库的定义似乎有些破损,不应返回之前需要消耗流.
You're on the right track with traverse, but unfortunately it looks like the standard library's definition is a little broken in this case—it shouldn't need to consume the stream before returning.
Future.traverse是更为通用的功能的特定版本,该功能可用于以可遍历"类型包装的任何应用函子(请参见这些 论文或我的答案此处).
Future.traverse is a specific version of a much more general function that works on any applicative functor wrapped in a "traversable" type (see these papers or my answer here for more information, for example).
Scalaz 库提供了这个更通用的版本,在这种情况下,它可以按预期工作(请注意我正在从 scalaz-contrib 获取Future的应用仿函数实例;不是但是在Scalaz的稳定版本中,该版本仍与Scala 2.9.2交叉构建,而Scala 2.9.2没有此Future):
The Scalaz library provides this more general version, and it works as expected in this case (note that I'm getting the applicative functor instance for Future from scalaz-contrib; it's not yet in the stable versions of Scalaz, which are still cross-built against Scala 2.9.2, which doesn't have this Future):
import scala.concurrent._ import scalaz._, Scalaz._, scalaz.contrib.std._ import ExecutionContext.Implicits.global def toFutureString(value: Int) = Future(value.toString) val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString这会立即在无限流上返回,因此我们可以确定它并没有首先消耗.
This returns immediately on an infinite stream, so we know for sure that it's not being consuming first.
作为脚注:如果您查看 Future.traverse的源,您会看到它是根据foldLeft来实现的,这很方便,但对于流而言不是必需的或不适当的.
As a footnote: If you look at the source for Future.traverse you'll see that it's implemented in terms of foldLeft, which is convenient, but not necessary or appropriate in the case of streams.
更多推荐
使用返回Future的函数映射Stream
发布评论