Basically, I have to analyze some complex JSON documents on HDFS with Spark.
I use for comprehensions to (pre)filter the JSON documents and the `extract` method of json4s to wrap them into a case class.
This one works fine:
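```scala
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods._ // json4s-jackson assumed; json4s-native also provides parse

def foo(rdd: RDD[String]) = {
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    // Keep only the values of the fields inside the "v" object
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(x => x._2)
    } yield normalized
    listsOfView
  }
}
```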
So far so good!
When I try to extract the (pre)filtered JSON into my case class, I get this:
```
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.json4s.DefaultFormats$
```
Here is the code with extraction:
```scala
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.jackson.JsonMethods._ // json4s-jackson assumed; json4s-native also provides parse

def foo(rdd: RDD[String]) = {
  case class View(C: String, b: Option[Array[List[String]]], t: Time)
  case class Time($numberLong: String)
  implicit val formats = DefaultFormats

  rdd.map { jsonString =>
    val jsonObj = parse(jsonString)
    val listsOfView = for {
      JObject(value) <- jsonObj
      JField("v", JObject(views)) <- value
      normalized <- views.map(x => x._2)
    } yield normalized.extract[View] // extract needs the implicit formats, so the closure now captures it
    listsOfView
  }
}
```
I have already tried my code in a Scala worksheet, and it works! I'm really new to HDFS and Spark, so I would appreciate a hint.
Recommended answer
Spark serializes the closures passed to RDD transformations and "ships" them to the workers for distributed execution. This requires that all code within the closure (and often also the containing object) be serializable.
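To make the capture rule concrete without a cluster, here is a minimal sketch that assumes Scala 2.12 or later (where function literals are serializable) and plain Java serialization. `ClosureShippingDemo`, `NonSerializableHelper`, `canShip` and `capturing` are hypothetical names; `canShip` only approximates the check Spark performs, since Spark's closure handling does more than a single `writeObject` call.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureShippingDemo {
  // Hypothetical stand-in for DefaultFormats$: it does NOT extend Serializable.
  class NonSerializableHelper {
    def shout(s: String): String = s.toUpperCase
  }

  // Java-serializes an object; roughly the check applied to a task closure
  // before it is shipped. Returns false on NotSerializableException.
  def canShip(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(closure)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures nothing, so only the function itself has to be serialized.
  val selfContained: String => Int = s => s.length

  // Captures the local `helper`, which then has to be serialized with the function.
  def capturing(): String => String = {
    val helper = new NonSerializableHelper
    s => helper.shout(s)
  }

  def main(args: Array[String]): Unit = {
    println(canShip(selfContained)) // true
    println(canShip(capturing()))   // false
  }
}
```

This mirrors the question: the first snippet's closure never touches `formats`, while `extract[View]` pulls the implicit `formats` (and with it `DefaultFormats$`) into the closure.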
Looking at the implementation of `org.json4s.DefaultFormats$` (the companion object of the `DefaultFormats` trait):
```scala
object DefaultFormats extends DefaultFormats {
  val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
  val UTC = TimeZone.getTimeZone("UTC")
}
```
It's clear that this object is not serializable and cannot be made so (`ThreadLocal` is inherently non-serializable).
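The effect of a `ThreadLocal` field can be reproduced with a hypothetical class, `FormatsLike`, loosely modeled on the snippet above (it is not json4s's actual source). Even though the class itself is declared `Serializable`, Java serialization still fails on its `java.lang.ThreadLocal` field; `ThreadLocalDemo` and `serializationFailure` are also illustrative names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.text.SimpleDateFormat

object ThreadLocalDemo {
  // Hypothetical class: declared Serializable, but it holds a ThreadLocal,
  // and java.lang.ThreadLocal does not implement Serializable.
  class FormatsLike extends Serializable {
    val dateFormat: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat]
  }

  // Returns the NotSerializableException message (the offending class name)
  // if serialization fails, or None if it succeeds.
  def serializationFailure(obj: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit =
    println(serializationFailure(new FormatsLike)) // prints Some(java.lang.ThreadLocal)
}
```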
You don't seem to be using `Date` types in your code, so could you get rid of `implicit val formats = DefaultFormats`, or replace `DefaultFormats` with something serializable?
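The second option can be illustrated the same way. In this sketch, a hypothetical `SerializableHelper` takes the place of the non-serializable dependency; because it extends `Serializable` and carries no `ThreadLocal` state, a closure that captures it passes the same Java-serialization check. Whether a suitable serializable `Formats` exists depends on your json4s version, which this sketch does not assume; `SerializableReplacementDemo`, `canShip` and `capturing` are illustrative names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableReplacementDemo {
  // Hypothetical replacement: same job as the original helper, but it is
  // Serializable and holds no ThreadLocal state.
  class SerializableHelper extends Serializable {
    def shout(s: String): String = s.toUpperCase
  }

  // Java-serializes an object; false on NotSerializableException.
  def canShip(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(closure)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The closure still captures a helper, but now the helper can travel with it.
  def capturing(): String => String = {
    val helper = new SerializableHelper
    s => helper.shout(s)
  }

  def main(args: Array[String]): Unit = {
    val f = capturing()
    println(canShip(f)) // true
    println(f("spark")) // SPARK
  }
}
```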