我正在使用Akka群集,以便在两个pahase中执行分布式计算。首先是 phaseA ,然后是 phaseB 。为了处理阶段,我使用Akka的FSM。
没有硬同步,因此其中一个节点可能达到 phaseB 而其他人仍处于阶段A 。
问题是,一个处于阶段B 向其他人发送与 phaseB相关的消息(它们处于 phaseA ),这导致它们失去了 phaseB相关的消息。
现在,我使用简单的技巧来推迟未知消息:
任意==>自我!任何但是IMO这不是正确的方法。我知道我也可以使用akka调度程序来调度任何,但是我也不喜欢这样。
这里是简化代码:
package不论 import akka.actor._ 对象测试扩展了App { 案例对象PhaseA 案例对象PhaseB 类任何扩展Actor { def phaseA:接收= { case PhaseA => { context.become(phaseB) println( in phaseB now)} case any =>自我!任何} def phaseB:接收= {情况PhaseB => println( got phaseB message!)} def接收= phaseA } val system = ActorSystem( MySystem ) val any = system.actorOf(Props(new Any),name = any) any! PhaseB 任何! PhaseA }在这种情况下推迟消息的正确方法是什么?
解决方案您可以隐藏消息以供以后处理。将 akka.actor.Stash 混合到您的actor中,然后将 stash()您的 phaseB 消息以供日后使用。
当您的FSM处于 phaseA 并收到 phaseB 消息,调用 stash()。当该参与者随后转换为 phaseB 状态时,调用 unstashAll(),所有隐藏的消息将重新传递。 / p>
I'm using akka cluster in order to perform distributed computations in two pahses. First phaseA then phaseB. To handle phases I use akka's FSM.
There is no hard synchronization so one of the nodes may reach phaseB while others are still in phaseA.
The problem is, one in phaseB sends phaseB-related messages to others (they are in phaseA yet) what causes them to loose phaseB-related messages.
For now I use simple trick to postpone unknown messages:
case any => self ! anyBut IMO this is not proper way to do that. I know I can also schedule any using akka scheduler, but I don't like this either.
Here is simplified code:
package whatever import akka.actor._ object Test extends App { case object PhaseA case object PhaseB class Any extends Actor { def phaseA: Receive = { case PhaseA => { context.become(phaseB) println("in phaseB now") } case any => self ! any } def phaseB: Receive = { case PhaseB => println("got phaseB message !") } def receive = phaseA } val system = ActorSystem("MySystem") val any = system.actorOf(Props(new Any), name = "any") any ! PhaseB any ! PhaseA }What is the correct way to postpone messages in such a situation?
解决方案You can stash messages for later processing. Mix akka.actor.Stash into your actors and stash() your phaseB messages for later.
When your FSM is in phaseA and receives a phaseB message, call stash(). When that actor then transitions into the phaseB state, call unstashAll() and all the stashed messages will be redelivered.
更多推荐
在Akka中推迟邮件的正确方法
发布评论