The idiomatic way to use a Spark DStream as a source for an Akka stream

Problem description

I'm building a REST API that starts some calculation in a Spark cluster and responds with a chunked stream of the results. Given the Spark stream with calculation results, I can use

dstream.foreachRDD()

to send the data out of Spark. I'm sending the chunked HTTP response with akka-http:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

For simplicity, I'm trying to get plain text working first, will add JSON marshalling later.

But what is the idiomatic way of using the Spark DStream as a Source for the Akka stream? I figured I should be able to do it via a socket, but since the Spark driver and the REST endpoint are sitting on the same JVM, opening a socket just for this seems like overkill.

Answer

This answer only applies to older versions of Spark and Akka. PH88's answer is the correct approach for recent versions.

You can use an intermediate akka.actor.Actor that feeds a Source (similar to this question). The solution below is not "reactive" because the underlying Actor would need to maintain a buffer of RDD messages that may be dropped if the downstream http client isn't consuming chunks quickly enough. But this problem occurs regardless of the implementation details, since you cannot connect the "throttling" of the akka stream back-pressure to the DStream in order to slow down the data. This is due to the fact that DStream does not implement org.reactivestreams.Publisher.
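The buffering concern above can be modeled without any Akka or Spark dependency. Below is a minimal, self-contained Scala sketch (the class name `DroppingBuffer` and its capacity parameter are illustrative, not part of the linked solution) showing why messages can be lost when the producer side (the DStream) outruns the demand signaled by the downstream HTTP client:

```scala
// Illustrative model of the intermediate Actor's buffer: the DStream side
// calls enqueue() unconditionally, while the Akka-stream side withdraws
// elements only when downstream demand arrives. Once the buffer is full,
// new messages are silently dropped -- there is no way to push back on
// the DStream.
final class DroppingBuffer[A](capacity: Int) {
  private val queue = scala.collection.mutable.Queue.empty[A]
  private var droppedCount = 0

  // Called for every message produced by the DStream (unbounded rate).
  def enqueue(msg: A): Unit =
    if (queue.size < capacity) queue.enqueue(msg)
    else droppedCount += 1 // cannot slow the producer down, so drop

  // Called when the downstream HTTP client signals demand for n chunks.
  def dequeue(n: Int): Seq[A] =
    (1 to math.min(n, queue.size)).map(_ => queue.dequeue())

  def dropped: Int = droppedCount
}

object DroppingBufferDemo extends App {
  val buf = new DroppingBuffer[String](capacity = 3)
  (1 to 5).foreach(i => buf.enqueue(s"rdd-chunk-$i")) // producer outruns consumer
  println(buf.dequeue(2)) // consumer asks for only 2 chunks
  println(buf.dropped)    // prints 2: two messages were lost
}
```

A real implementation backed by back-pressure would instead signal demand upstream, which is exactly what DStream cannot do.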

The basic topology is:

DStream --> Actor with buffer --> Source

To construct this topology you have to create an Actor similar to the implementation here:

// JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props

Create a stream Source of ByteStrings (messages) based on the JobManager. Also, convert the ByteString to HttpEntity.ChunkStreamPart, which is what the HttpResponse requires:

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Flow, Source}
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString

type Message = ByteString

val messageToChunkPart =
  Flow[Message].map(HttpEntity.ChunkStreamPart(_))

// Actor with buffer --> Source
val source: Source[HttpEntity.ChunkStreamPart, Unit] =
  Source(ActorPublisher[Message](actorRef)) via messageToChunkPart

Link the Spark DStream to the Actor so that each incoming RDD is converted to an Iterable of ByteString and then forwarded to the Actor:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.rdd.RDD

val dstream: DStream[String] = ???

// This function converts your RDDs to messages being sent
// via the http response
def rddToMessages[T](rdd: RDD[T]): Iterable[Message] = ???

def sendMessageToActor(message: Message) = actorRef ! message

// DStream --> Actor with buffer
dstream foreachRDD { rddToMessages(_) foreach sendMessageToActor }
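One possible shape for the rddToMessages conversion, sketched without a Spark or Akka dependency so it compiles standalone: a plain Seq stands in for the driver-side rdd.collect(), and String stands in for akka.util.ByteString. Both substitutions, and the helper name `recordsToMessages`, are assumptions for illustration, not part of the original answer.

```scala
// Hypothetical sketch of the record-to-message conversion: each record is
// rendered as one newline-terminated line, matching the text/plain chunked
// response. A Seq stands in for the collected RDD; String stands in for
// akka.util.ByteString.
def recordsToMessages[T](records: Seq[T]): Seq[String] =
  records.map(record => record.toString + "\n")

object RecordsToMessagesDemo extends App {
  val fakeRddContents = Seq(1, 2, 3) // stand-in for rdd.collect()
  recordsToMessages(fakeRddContents).foreach(print)
}
```

With a real RDD you would apply the same mapping after rdd.collect() (or via rdd.toLocalIterator for results too large to collect at once), wrapping each line in ByteString before sending it to the Actor.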

Provide the Source to the HttpResponse:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

Note: there should be very little time/code between the dstream foreachRDD line and the HttpResponse, since the Actor's internal buffer will immediately begin to fill with ByteString messages coming from the DStream once the foreach line is executed.
