Filtering an RDD[T] to a subclass of type T



I am using Spark to read in a text file. Each line can belong to a different case class. Once I have converted the lines to objects described by the case classes, I will convert them to a DataFrame and write them to HDFS as Parquet. The problem I have is that I end up with an RDD of an abstract type, and I need to constrain it to a specific case class type to apply the toDF function.

So far I've defined my log events as follows:

abstract class LogEvent
final case class Web(datetime: String, ... ) extends LogEvent
final case class OtherEvent(datetime: String ...) extends LogEvent

I am reading in my text file, then mapping lines against a pattern match function to create an RDD[LogEvent]:

def convertToCase(e: List[String]): LogEvent = e match {
  case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
  case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}

At this point I wish to constrain to a given case class and convert to Spark dataframe. Something like:

val events = spark.read.textFile(...)
  .map(_.split(',').toList)
  .map(convertToCase)

I then want to reduce the RDD[LogEvent] down to an RDD of type T, where T is one of {Web, OtherEvent}. This is what I'm struggling with. Applying a filter with a predicate to constrain to a case class doesn't change the element type from LogEvent, which means I cannot call toDF(): it must be called on an RDD[T] where T is a specific case class, not on an RDD[LogEvent] of the abstract class.

val webEvents = events.filter(someLogic).toDF()

I'm looking for a way that I can reduce the generic RDD down to an RDD of a specific case class. I'm trying to achieve this whilst maintaining type safety by not using isInstanceOf or asInstanceOf.
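The typing problem can be seen on plain Scala collections just as well as on RDDs (a minimal sketch; the class and field names here are placeholders, not the original log schema, and `RDD.filter` behaves the same way with respect to types):

```scala
sealed trait LogEvent
final case class Web(datetime: String) extends LogEvent
final case class OtherEvent(datetime: String) extends LogEvent

val events: List[LogEvent] = List(Web("d1"), OtherEvent("d2"), Web("d3"))

// The predicate keeps only Web values at runtime, but the compiler
// still sees the element type as LogEvent, so operations that need
// a concrete case class (such as toDF on an RDD) cannot be applied.
val stillAbstract: List[LogEvent] = events.filter {
  case _: Web => true
  case _      => false
}
```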

Is there a simple solution to this? Or am I approaching the problem in the wrong way?

Thanks in advance.

Accepted answer


You should use the collect(f: PartialFunction[T, U]): RDD[U] method (not to be confused with collect(): Array[T], which sends the results to the driver as an array):

val webEvents = events.collect{ case w: Web => w }.toDF()

collect is a mix between map and filter: if the input matches one of the cases given in the pattern match, it outputs the value given by the partial function. Otherwise, it simply ignores (i.e. filters out) the input.
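The same semantics can be demonstrated on plain Scala collections, whose collect also takes a PartialFunction (a minimal sketch with placeholder class names):

```scala
sealed trait LogEvent
final case class Web(datetime: String) extends LogEvent
final case class OtherEvent(datetime: String) extends LogEvent

val events: List[LogEvent] =
  List(Web("2017-01-01"), OtherEvent("2017-01-02"), Web("2017-01-03"))

// collect both filters (non-Web values are dropped) and maps
// (the result element type is narrowed from LogEvent to Web).
val webEvents: List[Web] = events.collect { case w: Web => w }
```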

Note that you should probably also do this for your convertToCase, since the pattern match you defined is not exhaustive, and you would get a MatchError at runtime if you encountered an unexpected event or a corrupted row. The correct way to do this would be to define:

// A partial-function literal has no named parameter, so the matched
// list must be bound with "e @" before it can be used on the right-hand side.
val convertToCase: PartialFunction[List[String], LogEvent] = {
  case e @ List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
  case e @ List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}

Then replace map(convertToCase) with collect(convertToCase).
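Put together, a minimal end-to-end sketch (plain collections standing in for the RDD, and a shortened three-field row format instead of the original thirteen-field one):

```scala
sealed trait LogEvent
final case class Web(datetime: String, payload: String) extends LogEvent
final case class OtherEvent(datetime: String, payload: String) extends LogEvent

val convertToCase: PartialFunction[List[String], LogEvent] = {
  case e @ List(_, "WEB", _)        => Web(e.head, e(2))
  case e @ List(_, "OTHEREVENT", _) => OtherEvent(e.head, e(2))
}

val lines = List("2017-01-01,WEB,a", "2017-01-02,OTHEREVENT,b", "corrupted row")

// collect silently drops the corrupted row instead of throwing a MatchError.
val parsed: List[LogEvent] = lines.map(_.split(',').toList).collect(convertToCase)
```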

Published: 2023-07-16 22:52:00
Link: https://www.elefans.com/category/jswz/34/1135401.html