消耗TCP流并将其重定向到另一个接收器(使用Akka流)

编程入门 行业动态 更新时间:2024-10-24 10:21:36
本文介绍了消耗TCP流并将其重定向到另一个接收器(使用Akka流)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个接收器。 该程序应打开服务器套接字,侦听传入的连接,然后使用tcp流。我们的发件人不希望收到我们的答复,因此我们从不发回任何东西-我们只消耗流。在对tcp流进行成帧之后,我们需要将字节转换成更有用的内容并将其发送到接收器。

I try to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections and then consume the tcp stream. Our sender does not expect/accept replies from us so we never send back anything - we just consume the stream. After framing the tcp stream we need to transform the bytes into something more useful and send it to the Sink.

到目前为止,我尝试了以下操作,但是我特别努力如何不将tcp数据包发送回发送方并正确连接接收器。

I tried the following so far but i struggle especially with the part how to not sending tcp packets back to the sender and to properly connect the Sink.

import scala.util.Failure import scala.util.Success import akka.actor.ActorSystem import akka.event.Logging import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Tcp import akka.stream.scaladsl.Framing import akka.util.ByteString import java.nio.ByteOrder import akka.stream.scaladsl.Flow object TcpConsumeOnlyStreamToSink { implicit val system = ActorSystem("stream-system") private val log = Logging(system, getClass.getName) //The Sink //In reality this is of course a real Sink doing some useful things :-) //The Sink accept types of "SomethingMySinkUnderstand" val mySink = Sink.ignore; def main(args: Array[String]): Unit = { //our sender is not interested in getting replies from us //so we just want to consume the tcp stream and never send back anything to the sender val (address, port) = ("127.0.0.1", 6000) server(system, address, port) } def server(system: ActorSystem, address: String, port: Int): Unit = { implicit val sys = system import system.dispatcher implicit val materializer = ActorMaterializer() val handler = Sink.foreach[Tcp.IncomingConnection] { conn => println("Client connected from: " + conn.remoteAddress) conn handleWith Flow[ByteString] //this is neccessary since we use a self developed tcp wire protocol .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) //here we want to map the raw bytes into something our Sink understands .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) //here we like to connect our Sink to the Tcp Source .to(mySink) //<------ NOT COMPILING } val tcpSource = Tcp().bind(address, port) val binding = tcpSource.to(handler).run() binding.onComplete { case Success(b) => println("Server started, listening on: " + b.localAddress) case Failure(e) => println(s"Server could not bind to $address:$port: ${e.getMessage}") system.terminate() } } class SomethingMySinkUnderstand(x:String) { } }

更新:将其添加到build.sbt文件中以获得必要的Deps

Update: Add this to your build.sbt file to get necessary deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

推荐答案

handleWith 期望 Flow ,即一个具有未连接的入口和未连接的出口的盒子。您有效地提供了来源,因为您已将 Flow 与 Sink 通过使用至操作。

handleWith expects a Flow, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a Source, because you connected the Flow with a Sink by using the to operation.

我认为您可以执行以下操作:

I think you could do the following:

conn.handleWith( Flow[ByteString] .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) .alsoTo(mySink) .map(_ => ByteString.empty) .filter(_ => false) // Prevents sending anything back )

更多推荐

消耗TCP流并将其重定向到另一个接收器(使用Akka流)

本文发布于:2023-11-26 00:45:19,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631983.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:接收器   消耗   重定向   并将其   Akka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!