How do you throttle a Flow in the latest Akka (2.4.6)? I'd like to throttle an HTTP client flow to at most 3 requests per second. I found the following example online, but it targets an old version of Akka, and the akka-streams API has changed so much that I can't figure out how to rewrite it.

    def throttled[T](rate: FiniteDuration): Flow[T, T] = {
      val tickSource: Source[Unit] = TickSource(rate, rate, () => ())
      val zip = Zip[T, Unit]
      val in = UndefinedSource[T]
      val out = UndefinedSink[T]
      PartialFlowGraph { implicit builder =>
        import FlowGraphImplicits._
        in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out
        tickSource ~> zip.right
      }.toFlow(in, out)
    }

Here is my best attempt so far:

    def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val ticker = Source.tick(rate, rate, Unit)
      val zip = builder.add(Zip[T, Unit.type])
      val map = Flow[(T, Unit.type)].map { case (value, _) => value }
      val messageExtractor = builder.add(map)
      val in = Inlet[T]("Req.in")
      val out = Outlet[T]("Req.out")
      out ~> zip.in0
      ticker ~> zip.in1
      zip.out ~> messageExtractor.in
      FlowShape.of(in, messageExtractor.out)
    })

It throws an exception in my main flow, though :)

    private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
      .via(throttleFlow(rate))
      .via(poolClientFlow)
      .mapAsync(4) {
        case (util.Success(resp), any) =>
          val strictFut = resp.entity.toStrict(5 seconds)
          strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any))
        case other => Future.successful(other)
      }
      .toMat(Sink.foreach({
        case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
          p.success(triedResp -> value)
        case _ => throw new RuntimeException()
      }))(Keep.left)
      .run

where poolClientFlow is Http()(system).cachedHostConnectionPool[Any](baseDomain)

The exception is:

    Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
        at scala.Predef$.require(Predef.scala:219)
        at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)

Recommended answer
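On the exception itself: `Req.in` and `Req.out` are created by hand with `Inlet(...)` and `Outlet(...)`, so they never become part of the graph being built, and `out ~> zip.in0` additionally tries to wire an outlet into what should be the flow's own input. A minimal sketch of a corrected version is shown here, assuming the Akka Streams 2.4.x `GraphDSL`. It exposes ports of stages that were actually added to the builder. The name `throttleFlow` is kept from the question; `extract` is an illustrative name.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Source, Zip}
import scala.concurrent.duration.FiniteDuration

// Sketch: lets one element through per tick of `rate` by zipping the
// incoming elements with a tick source and then dropping the tick.
def throttleFlow[T](rate: FiniteDuration): Flow[T, T, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val ticker  = Source.tick(rate, rate, ())
    val zip     = builder.add(Zip[T, Unit]())
    val extract = builder.add(Flow[(T, Unit)].map { case (value, _) => value })

    ticker  ~> zip.in1
    zip.out ~> extract.in

    // The flow's inlet is the zip's first input; its outlet is the extractor's output.
    FlowShape(zip.in0, extract.out)
  })
```

With this shape, a `rate` of roughly `333.millis` would approximate 3 elements per second.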
Here is an attempt that uses the throttle method mentioned by @Qingwei. The key is not to use bindAndHandle(), but to use bind() and throttle the flow of incoming connections before handling them. The code is adapted from the implementation of bindAndHandle(), but omits some error handling for simplicity, so please don't use it as-is in production.
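
    import java.time.LocalDateTime
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.Http.IncomingConnection
    import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
    import akka.http.scaladsl.server.Directives._
    import akka.stream.{ActorMaterializer, ThrottleMode}
    import akka.stream.scaladsl.{Flow, Keep, Sink}

    implicit val system = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    import system.dispatcher

    val maxConcurrentConnections = 4

    // A Route, implicitly converted to a request-handling Flow
    val handler: Flow[HttpRequest, HttpResponse, NotUsed] = complete(LocalDateTime.now().toString)

    // Runs one connection; the Future completes when the connection terminates
    def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
      incomingConnection.flow
        .watchTermination()(Keep.right)
        .joinMat(handler)(Keep.left)
        .run()

    Http().bind("127.0.0.1", 8080)
      .throttle(3, 1.second, 1, ThrottleMode.Shaping) // at most 3 new connections per second
      .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection)
      .to(Sink.ignore)
      .run()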
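For the client side that the question asks about, the same built-in `throttle` stage could stand in for the hand-written graph. The sketch below wraps it in a reusable flow, assuming Akka Streams 2.4.x, where `throttle` is available on `Flow` and `Source`. `threePerSecond` is a hypothetical name, and the burst size of 3 is an assumption rather than something the question specifies.

```scala
import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow
import scala.concurrent.duration._

// Hypothetical helper: passes elements through unchanged, at most 3 per second.
// ThrottleMode.Shaping delays (backpressures) elements instead of failing the stream.
def threePerSecond[T]: Flow[T, T, NotUsed] =
  Flow[T].throttle(3, 1.second, 3, ThrottleMode.Shaping)
```

In the question's pipeline this would replace `.via(throttleFlow(rate))`, that is, `.via(threePerSecond)` placed before `.via(poolClientFlow)`.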