期货"/>
玩期货
在工作面试中,我们经常给Scala开发人员一个简单的设计任务:对二叉树进行建模。 最简单但不一定最佳的实现涉及Option
习惯用法:
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]])
使用case
类和协方差的不变性加分。 更好但更复杂的实现涉及两个case
类,但至少允许对空树进行建模:
sealed trait Tree[+T]
case object Empty extends Tree[Nothing]
case class Node[+T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]
让我们坚持第一个想法。 现在实现构建具有任意高度的树:
def apply[T](n: Int)(block: => T): Tree[T] = n match {case 1 => Tree(block, None, None)case _ =>Tree(block,Some(Tree(n - 1)(block)),Some(Tree(n - 1)(block)))
}
为了构建一棵具有1024个叶子和所有随机变量的树,可以这样说:
val randomTree: Tree[Double] = Tree(1 + 10)(math.random)
这是一个开放式的问题,下一个要求可能是编写等效于Seq.map()
或Option.map()
的map
方法。 了解这意味着什么是问题的一部分。 该实现非常简单明了:
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) = {def map[R](f: T => R): Tree[R] =Tree(f(value),left.map{_.map(f)},right.map{_.map(f)})
}
好吧…… .map{_.map(f)}
,你在跟我开玩笑吗? 请记住, left
和right
是Option
S和Option.map(f)
打开Option[T]
至Option[R]
因此,第一张map
来自Option
。 第二个_.map(f)
实际上是对Tree.map()
的递归调用。 现在,例如,我们可以创建第二棵树(不变性!),每个树的值增加但保留结构:
val tree: Tree[Int] = //...
val incremented = tree.map(1 +)
…或在每个值上调用toString()
:
val stringified = tree.map(_.toString)
让我们再走一点。 如果f
函数既费时Tree.map()
在执行map()
时经常需要这样做),或者我们的树很大,那么以某种方式并行化Tree.map()
怎样? 实现此目的的方法很少,陷阱也很多。 最简单的方法是使用由ExecutionContext
支持的线程池:
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {def pmap[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[R] = {val transformed: Future[R] = Future { f(value)}val leftFuture: Option[Future[Tree[R]]] = left.map { l => Future { l.pmap(f)}}val rightFuture: Option[Future[Tree[R]]] = right.map { r => Future { r.pmap(f)}}Tree(Await.result(transformed, timeout),leftFuture.map(Await.result(_, timeout)),rightFuture.map(Await.result(_, timeout)))}
}
一旦整理出一些implicits
pmap
使用pmap
(名称不是巧合 )就非常简单:
import scala.concurrent.{Await, Future, ExecutionContext}
import java.util.concurrent.Executors
import scala.concurrent.duration._val pool = Executors newFixedThreadPool 10
implicit val ec: ExecutionContext = ExecutionContext fromExecutor pool
implicit val timeout = 10.secondval tree = Tree("alpha",None,Some(Tree("beta",None,None)))println(tree.pmap{_.toUpperCase})
上面的示例代码将采用一棵简单的树,该树的树根带有“ alpha ”根和“ beta ”右孩子,并且在多个线程中全部使用大写值。 调用Future { ... }
是一个简单的习惯用法,用于将异步任务提交到线程池并获取Future[T]
作为回报。
此代码至少有几个问题。 首先主要是…等待。 几个线程将闲置,仅等待孩子完成。 但这不是最坏的情况。 想象一下,我们的线程池被限制为一个线程(对于较大的线程,问题仍然存在,但仍然有限)。 我们为孩子生成子任务,并等待它们完成。 但是这些子任务永远不会启动,因为它们无法从线程池中获取线程。 为什么? 因为池中只有一个线程,我们已经在使用它! 这个唯一的线程被阻止,等待着永远无法完成的任务。 这叫做死锁 。 实际上,代码将在给定的时间后超时,但不会改变上述实现严重失败的事实。 ForkJoinPool
可以解决此问题,但是有更多高级且有意义的解决方案。
进入
令人惊讶的是,有更好,更实用和更干净的方法。 反应式编程不鼓励等待。 让我们强调一下它,而不是隐藏处理树的异步性质! 由于处理已经基于Future
,因此将它们显式放置在API中:
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {def mapf[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[Future[R]] = {Tree(Future { f(value) },left.map {_.mapf(f)},right.map {_.mapf(f)})}
}
Tree.mapf()
立即返回,但现在不返回Tree[R]
而是返回Tree[Future[R]]
。 因此,我们有一棵树,其中每个节点都包含一个独立的Future
。 我们如何回到熟悉的Tree[R]
? 一种方法使用Tree.map()
,我们已经实现了它:
val treeOfFutures: Tree[Future[R]] = ...val tree = treeOfFutures.map(Await.result(_, 10.seconds))
我敢打赌,目前尚不清楚,但原则上这很简单–每个节点都等待独立的未来对象,直到所有对象都解决为止。 没有死锁的风险,因为期货互不依赖。
将
但是我们想更深入。 如果我们只能用一个期货来统治所有期货,那为什么还要使用一堆期货呢? 考虑一下将Seq[Future[T]]
转换为Future[Seq[T]]
Future.sequence()
。 单独为Tree[Future[T]]
实现这种方法是一项不错的任务。 这个想法是对所有未解决的任务都有一个计数器,一旦所有任务都完成,就可以不加阻塞地取消引用所有期货(因为它们已经完成):
object Tree {def sequence[T](tree: Tree[Future[T]])(implicit ec: ExecutionContext, timeout: Duration): Future[Tree[T]] = {val promise = Promise[Tree[T]]()val pending = new AtomicInteger(tree.size)for {future <- treevalue <- future} if(pending.decrementAndGet() == 0) {promise.success(tree.map(Await.result(_, 0.seconds)) //will never block)}promise.future}
}
上面的代码有点必要,并且不能正确处理异常-但可以作为一个很好的起点。 我们对所有期货进行迭代,并在每个期货完成后递减一个计数器。 如果所有期货都完成,我们将兑现我们的自定义承诺。 上面的代码需要两个额外的方法: Tree.size
和Tree.foreach()
(隐式用于内部理解)–我留给您作为练习。
翻译自: .html
更多推荐
玩期货
发布评论