用Akka实现MapReduce(MapReduce implementation with Akka)

系统教程 行业动态 更新时间:2024-06-14 17:03:52
用Akka实现MapReduce(MapReduce implementation with Akka)

我试图在Akka之上实现MapReduce,很幸运找到了Akka Essentials的书。 然而,我发现这个示例实现有两个主要问题,都看起来像是基本的并发设计缺陷,在关于Akka的书中找到这种缺陷非常令人震惊:

完成后,客户端将调用shutdown()但此时不保证消息传递给WCMapReduceServer。 我发现WCMapReduceServer在任何时候都只获得部分客户端消息,然后WCMapReduceServer输出[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552这意味着客户端shutdown()发生在客户端实际管理清除所有未决消息之前。 在客户端代码行41中,我们看到shutdown()发生时不先刷新。 在关闭系统之前,Akka是否有办法强制刷新出站消息?

另一个实际上我已经修复的更大的缺陷是用于向EPR发送信号给MapReduce服务器的方式,假设所有子任务(文件的每一行)都已完成,那么完成主要任务(单词文件)。 他发送一个特殊的String消息DISPLAY_LIST并且这条消息排在最低优先级看代码 。 这里存在的一个大缺陷是,即使DISPLAY_LIST的优先级最低,如果任何Map(或Reduce)任务占用任意长度,那么在所有MapReduce子任务完成之前DISPLAY_LIST消息都会经过,因此此MapReduce示例的结果为non-确定性的,即你可以从每次运行中得到不同的字典。 这个问题可以通过用以下方法替换MapActor#onReceive实现来揭示,即任意长一个Map步骤:

public void onReceive(Object message) { System.out.println("MapActor -> onReceive(" + message + ")"); if (message instanceof String) { String work = (String) message; // ******** BEGIN SLOW DOWN ONE MAP REQUEST if ("Thieves! thieves!".equals(work)) { try { System.out.println("*** sleeping!"); Thread.sleep(5000); System.out.println("*** back!"); } catch (InterruptedException e) { e.printStackTrace(); } } // ******** END SLOW DOWN ONE MAP REQUEST // perform the work List<Result> list = evaluateExpression(work); // reply with the result actor.tell(list); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); }

进一步阅读这本书会发现:

我们有Thread.sleep(),因为不能保证消息的处理顺序。 第一个Thread.sleep()方法确保在我们发送Result消息之前,所有的字符串句子消息都被完全处理。

我很抱歉,但Thread.sleep()从来不是确保任何并发的手段。 因此,毫无疑问,这样的书籍会在他们的例子中充满基本的并发缺陷。

I'm trying to implement MapReduce on top of Akka and was lucky to find the code of the book Akka Essentials. However, I have found two major issues with this example implementation, and both seem like fundamental concurrency design flaws which btw is quite shocking to find in a book about Akka:

Upon completion the Client side will call shutdown() but at that point there is no guarantee that the messages went through to the WCMapReduceServer. I see that the WCMapReduceServer only gets a partial number of Client messages at any time and then WCMapReduceServer outputs [INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552 meaning the Client shutdown() happens before the Client actually manages to flush all pending messages. In the Client code line 41 we see the shutdown() takes place without flushing first. Is there a way in Akka to enforce flushing outbound messages before shutting down the system?

The other actually bigger flaw, which I already fixed, is the way used to signal EOF to the MapReduce server that the main task (file of words) is done given that all subtasks (each line of the file) are done. He sends a special String message DISPLAY_LIST and this message is queued with lowest priority see code. The big flaw here is that even though DISPLAY_LIST has the lowest priority, if any Map (or Reduce) task takes arbitrarily long, the DISPLAY_LIST message will go through before all the MapReduce subtasks have completed and therefore the outcome of this MapReduce example is non-deterministic i.e. you can get different dictionaries out of each run. The issue can be revealed by replacing the MapActor#onReceive implementation with the following i.e. make one Map step arbitrarily long:

public void onReceive(Object message) { System.out.println("MapActor -> onReceive(" + message + ")"); if (message instanceof String) { String work = (String) message; // ******** BEGIN SLOW DOWN ONE MAP REQUEST if ("Thieves! thieves!".equals(work)) { try { System.out.println("*** sleeping!"); Thread.sleep(5000); System.out.println("*** back!"); } catch (InterruptedException e) { e.printStackTrace(); } } // ******** END SLOW DOWN ONE MAP REQUEST // perform the work List<Result> list = evaluateExpression(work); // reply with the result actor.tell(list); } else throw new IllegalArgumentException("Unknown message [" + message + "]"); }

Reading the book a bit further one finds:

We have Thread.sleep() because there is no guarantee in which order the messages are processed. The first Thread.sleep() method ensures that all the string sentence messages are processed completely before we send the Result message.

I'm sorry but Thread.sleep() has never been the means of ensuring anything in concurrency. Therefore no wonder books like this will end up full of fundamental concurrency flaws in their examples.

最满意答案

我解决了这两个问题,并将代码迁移到了最新的Akka 2.2-M3版本。

第一个问题的解决方案是让MapReduce远程MasterActor一收到所有消息发出后从客户端发送的TaskInfo通知就立即发回ShutdownInfo通知。 TaskInfo包含MapReduce任务有多少个子任务的信息,例如在这种情况下,文本文件中有多少行。

解决第二个问题的方法是向TaskInfo发送子任务总数。 在这里,AggregatorActor计算它已处理的子任务的数量,并将其与TaskInfo进行比较,并表示作业在匹配时完成(当前仅打印消息)。

输出中显示了有趣且正确的行为:

ClientActor发送一堆“子任务”的消息。 请注意,身份请求模式用于访问远程MapReduce MasterActor的ActorRef。 ClientActor最后发送TaskInfo消息,说明以前发送了多少个子任务。 MasterActor将字符串消息转发给MapActor,然后再转发给ReduceActor 一个MapActor是一个漫长的,即内容“贼!小偷!” 这有点让MapReduce计算速度变慢。 与此同时,MasterActor收到TaskInfo的最后一条消息,并将其返回给ClientActor ShudownInfo ClientActor运行system.shutdown()并终止客户端。 请注意,MapReduce仍处于处理过程中,客户端关闭不会产生干扰。 冗长的MapActor返回并继续处理消息。 AggregatorActor收到TaskInfo并通过计算子任务确认子任务总数已完成并发出完成信号。

该代码可能从我的存储库中获取: https : //github.com/bravegag/akka-mapreduce-example

反馈总是欢迎。

I have solved both problems, and also migrated the code to the latest Akka version 2.2-M3.

The solution to the first issue is to have the MapReduce remote MasterActor send back a ShutdownInfo notification as soon as it gets the TaskInfo notification which is sent from the Client once all messages have been sent. The TaskInfo contains the information of how many subtasks a MapReduce task has e.g. in this case how many lines in the text file.

The solution to the second problem is sending the TaskInfo with the total number of subtasks. Here the AggregatorActor counts the number of subtasks it has processed, compares it to the TaskInfo and signals that the job is done when they match (currently just print a message).

The interesting and correct behavior is shown in the output:

ClientActor sends a bunch of messages which are "subtasks". Note that the Identity request pattern is used to gain access to the ActorRef of the remote MapReduce MasterActor. ClientActor sends last the TaskInfo message saying how many subtasks were previously sent. MasterActor forwards String messages to MapActor which in turns forwards to ReduceActor One MapActor is a lengthy one namely the one with content "Thieves! thieves!" this slows the MapReduce computation a bit. Meanwhile MasterActor receives the TaskInfo last message and sends back to ClientActor the ShudownInfo ClientActor runs system.shutdown() and Client terminates. Note that the MapReduce is still in the middle of the processing and the Client shutdown does not interfere. The lengthy MapActor comes back and the message processing continues. AggregatorActor receives the TaskInfo and by counting the subtasks confirms that the total number of substasks have been completed and signals completion.

The code may be fetch from my repository: https://github.com/bravegag/akka-mapreduce-example

Feedback always welcome.

更多推荐

本文发布于:2023-04-24 12:18:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/dzcp/3d14e22c4e852b42047459d1435e244b.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Akka   MapReduce   implementation

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!