我正在使用Akka 2.4.4,并尝试从Apache HttpAsyncClient迁移(未成功)。
I'm using Akka 2.4.4 and trying to move from Apache HttpAsyncClient (unsuccessfully).
下面是我在项目中使用的简化代码。
Below is simplified version of code that I use in my project.
问题是,如果我向该流程发送了1-3个以上的请求,它将挂起。经过六个小时的调试,到目前为止,我什至无法找到问题所在。我没有在 Decider 中看到异常,错误日志和事件。什么都没有:)
The problem is that it hangs if I send more than 1-3 requests to the flow. So far after 6 hours of debugging I couldn't even locate the problem. I don't see exceptions, error logs, events in Decider. NOTHING :)
我试图将 connection-timeout 的设置减小为1s,以为可能是在等待服务器的响应
I tried reducing connection-timeout setting to 1s thinking that maybe it's waiting for response from the server but it didn't help.
我在做什么错了?
import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.Referer import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.Supervision.Decider import akka.stream.scaladsl.{Sink, Source} import akka.stream.{ActorAttributes, Supervision} import com.typesafe.config.ConfigFactory import scala.collection.immutable.{Seq => imSeq} import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration import scala.util.Try object Main { implicit val system = ActorSystem("root") implicit val executor = system.dispatcher val config = ConfigFactory.load() private val baseDomain = "www.google" private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) private val decider: Decider = { case ex => ex.printStackTrace() Supervision.Stop } private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = Source.fromIterator(() => items.toIterator) .via(poolClientFlow) .log("Logger")(log = myAdapter) .recoverWith { case ex => println(ex) null } .withAttributes(ActorAttributes.supervisionStrategy(decider)) .runWith(Sink.seq) .map { v => println(s"Got ${v.length} responses in Flow") v.asInstanceOf[Seq[(Try[HttpResponse], T)]] } def main(args: Array[String]) { val headers = imSeq(Referer("www.google/")) val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" val requests = List.fill(10)(reqPair) val qwe = sendMultipleRequests(requests).map { case responses => println(s"Got ${responses.length} responses") system.terminate() } Await.ready(system.whenTerminated, Duration.Inf) } }代理支持?似乎也不适合我。
推荐答案您需要完全消耗响应的主体,以便连接可用于后续请求。如果您根本不关心响应实体,则可以将其排放到 Sink.ignore ,如下所示:
You need to consume the body of the response fully so that the connection is made available for subsequent requests. If you don't care about the response entity at all, then you can just drain it to a Sink.ignore, something like this:
resp.entity.dataBytes.runWith(Sink.ignore)通过默认配置,使用主机连接池时,最大连接数设置为4。每个池都有其自己的队列,请求等待直到打开的连接之一变为可用为止。如果该队列超过32(默认配置,可以更改,必须为2的幂),那么您将开始看到故障。在您的情况下,您只执行10个请求,所以您没有达到该限制。但是,通过不使用响应实体,您不会释放连接,其他所有东西都只是排在后面,等待连接释放。
By the default config, when using a host connection pool, the max connections is set to 4. Each pool has it's own queue where requests wait until one of the open connections becomes available. If that queue ever goes over 32 (default config, can be changed, must be a power of 2) then yo will start seeing failures. In your case, you only do 10 requests, so you don't hit that limit. But by not consuming the response entity you don't free up the connection and everything else just queues in behind, waiting for the connections to free up.
更多推荐
通过连接池发出HTTP请求时Akka Flow挂起
发布评论