Socket connection and ActorSystem

Updated: 2024-10-08 03:25:57
Problem description

I have an application that uses Akka, and now I want to connect to it via a socket connection. Therefore I use a mechanism similar to the one from the Scala page. But if I try to tell while I have an open OutputStream, no message is received by the target.

Here is my source code:

    object Connector {
      def main(args: Array[String]) {
        val port = 1337
        val conf = ConfigFactory.load
        val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
        val master = system.actorOf(Props[TestActor])
        master ! "a"
        try {
          val listener = new ServerSocket(port)
          println("listening on port: " + port)
          while (true)
            new ConnectionThread(listener accept, master).start
          listener close
        } catch {
          case e: IOException =>
            System.err.println("Could not listen on port: " + port + ".")
            System.exit(-1)
        } finally {
          system.shutdown
        }
      }
    }

    case class ConnectionThread(socket: Socket, master: ActorRef) extends Thread("ConnectionThread") {
      private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
      private implicit var id = 0L
      private implicit val timeout = Timeout(25.0 seconds)
      master ! "b"

      override def run {
        master ! "c"
        try {
          master ! "d"
          val in = new ObjectInputStream(socket getInputStream)
          master ! "e"
          val out = new ObjectOutputStream(socket getOutputStream)
          out writeObject("listening")
          out flush

          master ! "f"
          val command = in.readObject.asInstanceOf[String]
          println("client sent: '" + command + "'")
          // process the command
          master ! "g"
          out.writeObject("EOF")
          out.flush
          out.close
          in.close
          socket.close
        } catch {
          case e: SocketException =>
          case e: IOException => e printStackTrace
        }
      }
    }

    class TestActor extends Actor with ActorLogging {
      log info("TestActor running")

      def receive = {
        case s: String => log info("received: " + s)
      }
    }

I get the output:

    listening on port: 1337
    [INFO] TestActor running
    [INFO] received: a
    [INFO] received: b
    [INFO] received: c
    [INFO] received: d

Now I expected it to go on until g, but instead I get:

client sent: 'select content from testdata on 2012-07-06'

I figured out that it works until I open a stream on the socket, probably because tell and ask are socket-based as well and use the output stream of the socket that the thread runs in. Afterwards the socket connection works, but I am not able to send any message to the actor system. There is no way for me to drop the Connector and the ConnectionThread. How can I fix it?
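
As an aside, the `// process the command` step in the code above is left open, and the `Select_*` pattern is declared but never used. The following is only a minimal sketch of how that pattern could be applied to the command shown in the output, assuming the goal is to extract the column, table, and date fields. The names `SelectCommandSketch`, `Query`, and `parse` are hypothetical and not part of the original code.

```scala
object SelectCommandSketch {
  // Same pattern as Select_* in the code above.
  private val Select = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r

  // Hypothetical container for the parsed fields (not in the original code).
  final case class Query(column: String, table: String, year: Int, month: Int, day: Int)

  // Returns Some(Query) when the whole command matches the pattern, None otherwise.
  def parse(command: String): Option[Query] = command match {
    case Select(column, table, year, month, day) =>
      Some(Query(column, table, year.toInt, month.toInt, day.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("select content from testdata on 2012-07-06"))
    println(parse("drop everything"))
  }
}
```

Returning an `Option` keeps the decision about malformed commands with the caller, which could then reply to the client with an error instead of throwing a `MatchError`.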

Solution

I must admit that I did not completely understand the example from the documentation. But I figured out that using a ConnectionHelper instead of addressing the ActorRef directly works quite well. I changed my code to the following:

    object Connector {
      def main(args: Array[String]) {
        val port = 1337
        val conf = ConfigFactory.load
        val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
        // val master = system.actorOf(Props[TestActor], "master")
        // master ! "a"
        try {
          val listener = new ServerSocket(port)
          println("listening on port: " + port)
          while (true)
            // new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
            new ConnectionThread(listener accept, system).start
          listener close
        } catch {
          case e: IOException =>
            System.err.println("Could not listen on port: " + port + ".")
            System.exit(-1)
        } finally {
          // master ! PoisonPill
          system.shutdown
        }
      }
    }

    case class ConnectionThread(socket: Socket, sys: ActorSystem) extends Thread("ConnectionThread") {
      private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
      private implicit var id = 0L
      private implicit val timeout = Timeout(25.0 seconds)
      private val conHelper = new ConnectionHelper

      override def run {
        try {
          val out = new ObjectOutputStream(socket getOutputStream)
          val in = new ObjectInputStream(socket getInputStream)
          conHelper tell "funzt"
          out writeObject ("Hi")
          out.flush
          val command = in.readObject.asInstanceOf[String]
          println("received: " + command)
          out writeObject ("test")
          out.flush
          out writeObject ("EOF")
          out.flush
          out.close
          in.close
          socket.close
        }
      }

      private class ConnectionHelper {
        val tester = sys.actorOf(Props[TestActor])
        def tell(s: String) { tester ! s }
      }
    }

I don't really understand why this works and the code from my question does not. I welcome all explanations.
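
One difference between the two versions may be relevant, although it is not certain that it fully explains the missing messages: the question's code constructs the ObjectInputStream before the ObjectOutputStream, while the code above constructs the ObjectOutputStream first. The ObjectInputStream constructor is documented to block until the peer's ObjectOutputStream has written and flushed its stream header, so the thread can stall at that line depending on what the client does. The sketch below shows the output-first order on both ends over a loopback socket. It uses only the JDK and no actors, and the names `StreamOrderSketch` and `roundTrip` are hypothetical.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}
import java.util.concurrent.atomic.AtomicReference

object StreamOrderSketch {
  // Runs one exchange over the loopback interface and returns
  // (greeting seen by the client, command seen by the server).
  def roundTrip(command: String): (String, String) = {
    // Port 0 asks the OS for any free port (an assumption made for this sketch).
    val listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
    val received = new AtomicReference[String]()

    val server = new Thread("ConnectionThread") {
      override def run(): Unit = {
        val socket = listener.accept()
        try {
          // Output stream first, then flush, so that the peer's
          // ObjectInputStream constructor can read the stream header.
          val out = new ObjectOutputStream(socket.getOutputStream)
          out.flush()
          val in = new ObjectInputStream(socket.getInputStream)
          out.writeObject("listening")
          out.flush()
          received.set(in.readObject().asInstanceOf[String])
          out.writeObject("EOF")
          out.flush()
        } finally socket.close()
      }
    }
    server.start()

    val client = new Socket(InetAddress.getLoopbackAddress, listener.getLocalPort)
    try {
      // The client uses the same order: output stream, flush, input stream.
      val out = new ObjectOutputStream(client.getOutputStream)
      out.flush()
      val in = new ObjectInputStream(client.getInputStream)
      val greeting = in.readObject().asInstanceOf[String]
      out.writeObject(command)
      out.flush()
      val eof = in.readObject().asInstanceOf[String] // consumes the "EOF" marker
      server.join()
      (greeting, received.get)
    } finally {
      client.close()
      listener.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (greeting, command) = roundTrip("select content from testdata on 2012-07-06")
    println("greeting: " + greeting)
    println("client sent: '" + command + "'")
  }
}
```

If both ends instead construct their ObjectInputStream first, each waits for a header the other has not yet sent, and the connection hangs. Keeping the order output, flush, input on both sides avoids that particular stall.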

Published: 2023-11-25 18:01:58
Article link: https://www.elefans.com/category/jswz/34/1630769.html