Spark Scala UDP: receiving on a listening port

Updated: 2024-10-27 23:25:47
Problem description

The example in the Spark Streaming programming guide (spark.apache.org/docs/latest/streaming-programming-guide.html) lets me receive data packets over a TCP stream on port 9999:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

    // Create a local StreamingContext with two working threads and a batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate

I am able to send data over TCP by starting a data server on my Linux system with $ nc -lk 9999

Question: I need to receive a stream sent from an Android phone over UDP using Scala and Spark, but val lines = ssc.socketTextStream("localhost", 9999) receives ONLY TCP streams.

How can I receive UDP streams in a similarly simple manner using Scala and Spark, and create a Spark DStream from them?

Recommended answer

There is nothing built in, but it is not too much work to do it yourself. Here is a simple solution I made based on a custom UdpSocketInputDStream[T]:

    import java.io._
    import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    import scala.reflect.ClassTag
    import scala.util.control.NonFatal

    class UdpSocketInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
    ) extends ReceiverInputDStream[T](_ssc) {

      def getReceiver(): Receiver[T] =
        new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
    }

    class UdpSocketReceiver[T: ClassTag](
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
    ) extends Receiver[T](storageLevel) {

      var udpSocket: DatagramSocket = _

      override def onStart(): Unit = {
        try {
          // Bind the local UDP socket that packets will arrive on
          udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
        } catch {
          // Binding fails with a SocketException (for example, port already in use)
          case e: SocketException =>
            restart(s"Error binding to $host:$port", e)
            return
        }

        // Start the thread that receives data from the socket
        new Thread("Udp Socket Receiver") {
          setDaemon(true)
          override def run(): Unit = receive()
        }.start()
      }

      /** Receive packets and store their contents until the receiver is stopped */
      def receive(): Unit = {
        try {
          val buffer = new Array[Byte](2048)
          // Loop until stopped, waiting for packets and storing the decoded records
          while (!isStopped()) {
            // Create a packet to receive data into the buffer
            val packet = new DatagramPacket(buffer, buffer.length)
            udpSocket.receive(packet)
            val iterator = bytesToObjects(
              new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
            while (!isStopped() && iterator.hasNext) {
              store(iterator.next())
            }
          }
        } catch {
          // Closing the socket in onStop() makes receive() throw; restart only on real errors
          case NonFatal(e) => if (!isStopped()) restart("Error receiving data", e)
        }
      }

      override def onStop(): Unit = synchronized {
        if (udpSocket != null) {
          udpSocket.close()
          udpSocket = null
        }
      }
    }

To make this available as a method on StreamingContext itself, we enrich it with an implicit class:

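    import java.io.InputStream

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream

    import scala.reflect.ClassTag

    object Implicits {
      implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
        def udpSocketStream[T: ClassTag](
            host: String,
            port: Int,
            converter: InputStream => Iterator[T],
            storageLevel: StorageLevel): InputDStream[T] =
          new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
      }
    }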

This is how you call it:

    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.reflect.ClassTag

    object TestRunner {
      import Implicits._

      def main(args: Array[String]): Unit = {
        val sparkContext = new SparkContext("local[*]", "udpTest")
        val ssc = new StreamingContext(sparkContext, Seconds(4))
        val stream = ssc.udpSocketStream("localhost", 3003, bytesToLines, StorageLevel.MEMORY_AND_DISK_SER_2)
        stream.print()
        ssc.start()
        ssc.awaitTermination()
      }

      // Decode a packet's bytes as UTF-8 text and iterate over its lines
      def bytesToLines(inputStream: InputStream): Iterator[String] = {
        val dataInputStream = new BufferedReader(
          new InputStreamReader(inputStream, StandardCharsets.UTF_8))
        new NextIterator[String] {
          protected override def getNext(): String = {
            val nextValue = dataInputStream.readLine()
            if (nextValue == null) {
              finished = true
            }
            nextValue
          }

          protected override def close(): Unit = dataInputStream.close()
        }
      }

      abstract class NextIterator[U] extends Iterator[U] {
        protected var finished = false
        private var gotNext = false
        private var nextValue: U = _
        private var closed = false

        override def next(): U = {
          if (!hasNext) {
            throw new NoSuchElementException("End of stream")
          }
          gotNext = false
          nextValue
        }

        override def hasNext: Boolean = {
          if (!finished) {
            if (!gotNext) {
              nextValue = getNext()
              if (finished) {
                closeIfNeeded()
              }
              gotNext = true
            }
          }
          !finished
        }

        def closeIfNeeded(): Unit = {
          if (!closed) {
            closed = true
            close()
          }
        }

        protected def getNext(): U
        protected def close(): Unit
      }
    }

Most of this code is taken from the SocketInputDStream[T] that ships with Spark; I simply reused it. I also took the code for NextIterator, which bytesToLines uses; all it does is read the lines from a packet and turn each one into a String. If you have more complex logic, you can supply it by passing your own implementation as converter: InputStream => Iterator[T].
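As an illustration of supplying your own converter, here is a minimal sketch that decodes each packet as a sequence of big-endian 32-bit integers instead of text lines. The names Converters and bytesToInts, and the wire format itself, are assumptions made up for this example; they are not part of the answer's code or of Spark.

```scala
import java.io.{DataInputStream, InputStream}

object Converters {
  // Hypothetical converter: treats the datagram payload as consecutive
  // big-endian 32-bit integers. Trailing bytes that do not fill a whole
  // integer are ignored.
  def bytesToInts(inputStream: InputStream): Iterator[Int] = {
    val in = new DataInputStream(inputStream)
    new Iterator[Int] {
      // available() is exact for the in-memory stream the receiver passes in
      override def hasNext: Boolean = in.available() >= 4

      override def next(): Int = {
        if (!hasNext) throw new NoSuchElementException("End of packet")
        in.readInt()
      }
    }
  }
}
```

Assuming the classes above are on the classpath, it would be passed in place of bytesToLines, for example ssc.udpSocketStream("localhost", 3003, Converters.bytesToInts, StorageLevel.MEMORY_AND_DISK_SER_2), which yields a stream of Int rather than String.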

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003
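The /dev/udp redirection above is a bash feature, so it is not available in every shell or on every platform. A rough equivalent on the JVM, which is also close to what a sender on an Android phone might do, could look like the sketch below. The object name UdpSend is made up for this example; the host and port are the ones used in the test above.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpSend {
  // Sends `message` as a single UTF-8 encoded datagram to host:port.
  def send(host: String, port: Int, message: String): Unit = {
    val bytes = message.getBytes(StandardCharsets.UTF_8)
    val socket = new DatagramSocket() // binds to any free local port
    try {
      val packet = new DatagramPacket(bytes, bytes.length, InetAddress.getByName(host), port)
      socket.send(packet)
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit =
    send("localhost", 3003, "hello hello hello!")
}
```

UDP is connectionless, so send returns without error even if nothing is listening on the target port; run the Spark job first if you want to see the message arrive.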

This yields:

    -------------------------------------------
    Time: 1482676728000 ms
    -------------------------------------------
    hello hello hello!

Of course, this needs further testing. There is also a hidden assumption that each datagram fits in the 2048-byte buffer handed to DatagramPacket, which is something you may want to change.
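To see why the buffer size matters: when a datagram is longer than the receiving buffer, DatagramSocket.receive truncates it and the excess bytes are lost without any error. The sketch below shows this over the loopback interface using only the JDK; TruncationDemo and receivedLength are names invented for this example.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object TruncationDemo {
  // Sends `payloadSize` bytes over loopback to a receiver whose buffer holds
  // `bufferSize` bytes, and returns how many bytes the receiver actually got.
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0: any free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of blocking forever
      val payload = new Array[Byte](payloadSize)
      sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort))
      val buffer = new Array[Byte](bufferSize)
      val packet = new DatagramPacket(buffer, buffer.length)
      receiver.receive(packet)
      packet.getLength
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```

If the phone may send larger messages, one option is to size the buffer for the largest datagram you expect; the upper bound for a UDP payload over IPv4 is 65,507 bytes.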

Published: 2023-08-07 03:52:08
Article link: https://www.elefans.com/category/jswz/34/1316818.html