我很好奇递归构建将顺序运行的Akka期货链的最佳方法,如果将来的 doWork 调用失败,则未来应该是重试最多3次,如果链用尽重试次数,则链条将失败。假设所有 doWork 调用均通过返回的将来 futChain 调用,则该操作只能完成。
I'm curious about the best way to recursively build a chain of Akka futures which will run sequentially, if a doWork call in a future fails, the future should be retried up to 3 times, the chain should fail if it runs out of retry attempts. Assuming all doWork calls pass the returned future futChain should only complete.
object Main extends App { val futChain = recurse(2) def recurse(param: Int, retries: Int = 3): Future[String] { Future { doWorkThatMayFailReturningString(param...) } recoverWith { case e => if (retries > 0) recurse(param, retries -1) else Future.failed(e) } flatMap { strRes => recurse(nextParam) //how should the res from the previous fut be passed? } } futChain onComplete { case res => println(res) //should print all the strings } }
推荐答案
您可以实现可重试的未来像这样:
You can implement a retryable Future like this:
def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = { n match { case i if (i > 1) => f.recoverWith{ case t: Throwable => retry(f)(n - 1)} case _ => f } }这不是针对尾递归优化的,但是如果您只打算重试几次,就不会出现堆栈溢出的情况(而且我想如果它在前几次失败了,无论如何都会继续失败)。
This isn't optimized for tail recursion, but if you only intend on retrying a few times, you won't get a stack overflow (and I imagine if it's failed the first few, it's going to keep failing, anyway).
然后我将单独进行链接。如果您要链接的函数数量有限,则每个函数都取决于前一个函数(出于某种原因,您希望汇总结果),您可以使用来进行理解(语法糖对于 flatMap ):
Then I would do the chaining separately. If you have a finite number of functions to chain together, each depending on the previous (and for some reason you want to aggregate the results) you can use for comprehensions (syntactic sugar for flatMap):
for { firstResult <- retry(Future(doWork(param)))(3) secondResult <- retry(Future(doWork(firstResult)))(3) thirdResult <- retry(Future(doWork(secondResult)))(3) } yield List(firstResult, secondResult, thirdResult)任意长的链,可以使用 Future.sequence (在Akka库中为 Futures )并行进行: / p>
For an arbitrarily long chains, you can do them in parallel using Future.sequence (Futures in the Akka library):
def doWork(param: String): String = ... val parameters: List[String] = List(...) val results: Future[List[String]] = Future.sequence(parameters.map(doWork(_)))这将解开 List [Future [String] ] 到 Future [List [String]] 。
这是一种方法
def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = { seq.foldLeft(Future.successful(List[B]())) { case (left, next) => left.flatMap(list => f(next).map(_ :: list)) } } def doWork(param: String): String = ... val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param)))这些功能的实现对您的用例非常敏感。如果链中的任何期货失败,上述两个函数将返回失败的期货。有时您会想要这样做,而其他时候则不需要。如果您只想收集成功的期货,并丢弃失败的期货而又不使整个结果失败,则可以添加一个额外的步骤来恢复失败。
The implementation of these functions is very sensitive to your use case. The two above functions will return failed futures if any of the futures in the chain failed. Sometimes you'll want this, other times not. If you want to collect only the successful futures, and discard failed ones without failing the entire result, you can add an extra step to recover the failures.
此外,两者之间的区别 recover 和 recoverWith 之间是它接受的 PartialFunction 的类型。 恢复用默认值替换失败的期货,而 recoverWith 则使用另一个 Future 。对于我的重试, recoverWith 更合适,因为我试图恢复失败的未来本身。
Additionally, the difference between recover and recoverWith is the type of PartialFunction it accepts. recover replaces failed futures with default values, while recoverWith does so using another Future. In the case of my retry, recoverWith is more appropriate because I'm trying to recover the failed Future with itself.
更多推荐
任意长度的未来递归模式/未来链
发布评论