I am trying to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections, and then consume the TCP stream. Our sender does not expect or accept replies from us, so we never send anything back; we only consume the stream. After framing the TCP stream, we need to transform the bytes into something more useful and send it to the Sink.
This is what I have tried so far. I am struggling especially with how to avoid sending TCP packets back to the sender and how to connect the Sink properly.

    import scala.util.Failure
    import scala.util.Success
    import akka.actor.ActorSystem
    import akka.event.Logging
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import akka.stream.scaladsl.Tcp
    import akka.stream.scaladsl.Framing
    import akka.util.ByteString
    import java.nio.ByteOrder
    import akka.stream.scaladsl.Flow

    object TcpConsumeOnlyStreamToSink {
      implicit val system = ActorSystem("stream-system")
      private val log = Logging(system, getClass.getName)

      //The Sink
      //In reality this is of course a real Sink doing some useful things :-)
      //The Sink accept types of "SomethingMySinkUnderstand"
      val mySink = Sink.ignore;

      def main(args: Array[String]): Unit = {
        //our sender is not interested in getting replies from us
        //so we just want to consume the tcp stream and never send back anything to the sender
        val (address, port) = ("127.0.0.1", 6000)
        server(system, address, port)
      }

      def server(system: ActorSystem, address: String, port: Int): Unit = {
        implicit val sys = system
        import system.dispatcher
        implicit val materializer = ActorMaterializer()
        val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
          println("Client connected from: " + conn.remoteAddress)

          conn handleWith Flow[ByteString]
            //this is necessary since we use a self developed tcp wire protocol
            .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
            //here we want to map the raw bytes into something our Sink understands
            .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
            //here we like to connect our Sink to the Tcp Source
            .to(mySink) //<------ NOT COMPILING
        }

        val tcpSource = Tcp().bind(address, port)
        val binding = tcpSource.to(handler).run()

        binding.onComplete {
          case Success(b) =>
            println("Server started, listening on: " + b.localAddress)
          case Failure(e) =>
            println(s"Server could not bind to $address:$port: ${e.getMessage}")
            system.terminate()
        }
      }

      class SomethingMySinkUnderstand(x:String) { }
    }

Update: add this to your build.sbt file to get the necessary dependencies:

    libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

Recommended answer
handleWith expects a Flow, i.e. a box with one unconnected inlet and one unconnected outlet. You effectively provide a Sink, because you connected the Flow's outlet to a Sink with the to operation, which leaves no outlet for the outgoing side of the connection.
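To make the type difference concrete, here is a minimal sketch. The object name `HandleWithTypes` and the value names `open` and `closed` are made up for illustration, and it assumes the akka-stream 2.4.x scaladsl from the question is on the classpath.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

object HandleWithTypes {
  // Still a Flow: one open inlet (ByteString) and one open outlet (String).
  val open: Flow[ByteString, String, NotUsed] =
    Flow[ByteString].map(_.utf8String)

  // `to` plugs the outlet into a Sink, so only the inlet remains open.
  // The result is a Sink, which is not what handleWith accepts.
  val closed: Sink[ByteString, NotUsed] =
    open.to(Sink.ignore)
}
```

Since `conn.handleWith` needs a `Flow[ByteString, ByteString, _]`, the handler has to keep an outlet of type `ByteString` even if nothing is ever pushed through it.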
I think you could do the following:

    conn.handleWith(
      Flow[ByteString]
        .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
        .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
        .alsoTo(mySink)
        .map(_ => ByteString.empty)
        .filter(_ => false) // Prevents sending anything back
    )

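As a possible alternative (not part of the original answer), the handler could be assembled from a separate inbound Sink and a Source that never emits, using `Flow.fromSinkAndSource` and `Source.maybe`. This is only a sketch under assumptions: the object name `ConsumeOnly`, the helper `handler`, and the case class standing in for the question's `SomethingMySinkUnderstand` are all hypothetical. I have not checked how the connection shuts down with this wiring when the client disconnects, so test that against your Akka version.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString
import java.nio.ByteOrder

object ConsumeOnly {
  // Hypothetical stand-in for the question's message type.
  final case class SomethingMySinkUnderstand(text: String)

  // Builds a handler whose inlet feeds `target` and whose outlet stays silent.
  def handler(
      target: Sink[SomethingMySinkUnderstand, Any]
  ): Flow[ByteString, ByteString, NotUsed] = {
    // Inbound side: frame, convert, and hand every message to the target Sink.
    val inbound: Sink[ByteString, NotUsed] =
      Flow[ByteString]
        .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
        .map(bytes => SomethingMySinkUnderstand(bytes.utf8String))
        .to(target)

    // Outbound side: emits nothing and stays open until its promise is completed.
    val outbound: Source[ByteString, Any] = Source.maybe[ByteString]

    // The two sides are independent; nothing read from the client is echoed back.
    Flow.fromSinkAndSource(inbound, outbound)
  }
}
```

With this in place the connection handler would read `conn.handleWith(ConsumeOnly.handler(mySink))`. `Source.maybe` is used rather than `Source.empty` on the assumption that completing the outgoing side immediately could close the connection before the client has finished writing.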