您如何在最新的Akka(2.4.6)中限制Flow?

编程入门 行业动态 更新时间:2024-10-25 06:30:41
本文介绍了您如何在最新的Akka(2.4.6)中限制Flow?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

您如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以将请求数量限制为每秒3个请求。我在网上找到了以下示例,但这是针对旧Akka和Akka-streams API的更改,以至于我不知道如何重写它。

How do you throttle Flow in the latest Akka (2.4.6) ? I'd like to throttle Http client flow to limit number of requests to 3 requests per second. I found following example online but it's for old Akka and akka-streams API changed so much that I can't figure out how to rewrite it.

def throttled[T](rate: FiniteDuration): Flow[T, T] = { val tickSource: Source[Unit] = TickSource(rate, rate, () => ()) val zip = Zip[T, Unit] val in = UndefinedSource[T] val out = UndefinedSink[T] PartialFlowGraph { implicit builder => import FlowGraphImplicits._ in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out tickSource ~> zip.right }.toFlow(in, out) }

此处是迄今为止我最好的尝试

Here is my best attempt so far

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val ticker = Source.tick(rate, rate, Unit) val zip = builder.add(Zip[T, Unit.type]) val map = Flow[(T, Unit.type)].map { case (value, _) => value } val messageExtractor = builder.add(map) val in = Inlet[T]("Req.in") val out = Outlet[T]("Req.out") out ~> zip.in0 ticker ~> zip.in1 zip.out ~> messageExtractor.in FlowShape.of(in, messageExtractor.out) })

虽然它会在我的主要流程中引发异常:)

it throws exception in my main flow though :)

private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure) .via(throttleFlow(rate)) .via(poolClientFlow) .mapAsync(4) { case (util.Success(resp), any) => val strictFut = resp.entity.toStrict(5 seconds) strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any)) case other => Future.successful(other) } .toMat(Sink.foreach({ case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) => p.success(triedResp -> value) case _ => throw new RuntimeException() }))(Keep.left) .run

其中 poolClientFlow 是 Http()(system).cachedHostConnectionPool [Any](baseDomain)

例外是:

Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph. at scala.Predef$.require(Predef.scala:219) at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)

推荐答案

这里尝试使用@Qingwei提到的节流方法。关键是不要使用 bindAndHandle(),而要使用 bind()并限制输入连接的流量处理它们。该代码来自 bindAndHandle() 的实现,但是为简化起见,省略了一些错误处理。

Here is an attempt that uses the throttle method as mentioned by @Qingwei. The key is to not use bindAndHandle(), but to use bind() and throttle the flow of incoming connections before handling them. The code is taken from the implementation of bindAndHandle(), but leaves out some error handling for simplicity. Please don't do that in production.

implicit val system = ActorSystem("test") implicit val mat = ActorMaterializer() import system.dispatcher val maxConcurrentConnections = 4 val handler: Flow[HttpRequest, HttpResponse, NotUsed] = complete(LocalDateTime.now().toString) def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] = incomingConnection.flow .watchTermination()(Keep.right) .joinMat(handler)(Keep.left) .run() Http().bind("127.0.0.1", 8080) .throttle(3, 1.second, 1, ThrottleMode.Shaping) .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection) .to(Sink.ignore) .run()

更多推荐

您如何在最新的Akka(2.4.6)中限制Flow?

本文发布于:2023-11-26 00:49:05,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631993.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:如何在   最新   Akka   Flow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!