如何使用Akka HTTP从多个actor / web处理程序正确调用单个服务器?

编程入门 行业动态 更新时间:2024-10-25 14:33:37
本文介绍了如何使用Akka HTTP从多个actor / web处理程序正确调用单个服务器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个服务(让我们称之为服务A),它使用Akka Server HTTP来处理传入的请求。 此外,我还有第三方应用程序(服务B),它提供了多种Web服务。 服务A的目的是转换客户端请求,调用服务B的一个或多个Web服务,合并/转换结果并将其提供给客户端。

I have a service (let's call it Service A) which uses Akka Server HTTP to handle incoming requests. Also I have 3rd party application (Service B) which provides several web services. The purpose of service A is to transform client requests, call one or multiple web services of service B, merge/transform results and serve it back to a client.

我在某些部分使用Actors,而在其他部分使用Future。 要拨打服务B,我使用Akka HTTP客户端。

I am using Actors for some parts, and just Future for other. To make a call to Service B, I use Akka HTTP client.

Http.get(actorSystem).singleRequest(HttpRequest.create() .withUri("127.0.0.1:8082/test"), materializer) .onComplete(...)

问题是,每个Service A请求都会创建一个新流,如果有多个并发连接,则会产生 akka.stream.OverflowStrategy $ Fail $ BufferOverflowException:超出配置的max-open-请求值[32]错误

The issue is, a new flow is created per each Service A request, and if there are multiple concurrent connections, it results in akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

我已经问过这个问题,并建议使用单个流如何正确调用Akka HTTP客户端多个(10k - 100k)请求?

I already asked this question and got a suggestion to use a single Flow How to properly call Akka HTTP client for multiple (10k - 100k) requests?

虽然它适用于来自一个地方的一批请求,但我不知道如何使用来自我所有并发的单个流请求处理程序。

While it works for a batch of requests coming from a single place, I don't know how to use a single Flow from all my concurrent request handlers.

这样做的正确Akka-way是什么?

What is the correct "Akka-way" to do it?

推荐答案

我认为您可以使用 Source.queue 来缓冲您的请求。下面的代码假设您需要从第三方服务获得答案,因此非常欢迎拥有 Future [HttpResponse] 。这样你也可以提供溢出策略来防止资源匮乏。

I think you could use Source.queue to buffer your requests. The code below assume that you need to get the answer from 3rd party service, so having a Future[HttpResponse] is very welcomed. This way you could also provide an overflow strategy to prevent resource starvation.

import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy} import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} import scala.util.{Failure, Success} import scala.concurrent.ExecutionContext.Implicits.global implicit val system = ActorSystem("main") implicit val materializer = ActorMaterializer() val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google", port = 80) val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew) .via(pool) .toMat(Sink.foreach({ case ((Success(resp), p)) => p.success(resp) case ((Failure(e), p)) => p.failure(e) }))(Keep.left) .run val promise = Promise[HttpResponse] val request = HttpRequest(uri = "/") -> promise val response = queue.offer(request).flatMap(buffered => { if (buffered) promise.future else Future.failed(new RuntimeException()) }) Await.ready(response, 3 seconds)

(从我的博客文章)

更多推荐

如何使用Akka HTTP从多个actor / web处理程序正确调用单个服务器?

本文发布于:2023-11-25 22:31:13,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631612.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多个   如何使用   正确   服务器   程序

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!