我有一个期货池,每个期货都使用相同的akka Actor系统-系统中的某些Actor应该是全局的,有些仅在一个Future中使用。
I have a futures pool , and each future works with the same akka Actor System - some Actors in system should be global, some are used only in one future.
val longFutures = for (i <- 0 until 2 ) yield Future { val p:Page = PhantomExecutor(isDebug=true) Await.result( p.open("www.stackoverflow/") ,timeout = 10.seconds) }PhantomExecutor尝试通过 system.actorSelection
PhantomExecutor tryes to use one shared global actor (simple increment counter) using system.actorSelection
def selectActor[T <: Actor : ClassTag](system:ActorSystem,name:String) = { val timeout = Timeout(0.1 seconds) val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name) val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity], 0.1 seconds) aid.ref match { case Some(cacher) => cacher case None => system.actorOf(Props[T],name) } }但是在并发环境中,由于竞争条件,这种方法行不通。
But in concurrent environment this approach does not work because of race condition.
我对此问题只有一个解决方案-在拆分成期货之前创建全球参与者。但这意味着我无法封装顶级库用户的大量隐藏工作。
I know only one solution for this problem - create global actors before splitting to futures. But this means that I can't encapsulate alot of hidden work from top library user.
推荐答案确保首先初始化全局参与者是正确的方法。您不能将它们绑定到同伴对象并从那里引用它们,以便知道它们只会被初始化一次吗?如果您真的不能采用这种方法,则可以尝试使用类似方法来查找或创建演员。它与您的代码相似,但是它包含逻辑,如果达到竞争条件(最多最多次数),则可以(递归地)返回查找/创建逻辑:
You're right in that making sure the global actors are initialized first is the right approach. Can't you tie them to a companion object and reference them from there so you know they will only ever be initialized one time? If you really can't go with such an approach then you could try something like this to lookup or create the actor. It is similar to your code but it include logic to go back through the lookup/create logic (recursively) if the race condition is hit (only up to a max number of times):
def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = { import system.dispatcher val timeout = 0.1 seconds def doFindOrCreate(depth:Int = 0):ActorRef = { if (depth >= maxAttempts) throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") val selection = system.actorSelection(s"/user/$name") val fut = selection.resolveOne(timeout).map(Some(_)).recover{ case ex:ActorNotFound => None } val refOpt = Await.result(fut, timeout) refOpt match { case Some(ref) => ref case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1)) } } doFindOrCreate() }现在,创建actor时,重试逻辑会针对任何异常触发,因此您可能想要进一步指定(可能通过另一个 recover 组合器)仅在它得到 InvalidActorNameException 时递归,但是您知道了
Now the retry logic would fire for any exception when creating the actor, so you might want to further specify that (probably via another recover combinator) to only recurse when it gets an InvalidActorNameException, but you get the idea.
更多推荐
无种族条件的akka演员选择
发布评论