问题描述
限时送ChatGPT账号..我有一个文件 .gz 我需要读取这个文件并将时间和文件名添加到这个文件我有一些问题需要你的帮助来推荐一个方法来解决这个问题.
I have a file .gz I need to read this file and add the time and file name to this file I have some problems and need your help to recommend a way for this points.
因为文件被压缩,第一行读取的格式不正确我认为由于编码问题我尝试了下面的代码但没有工作
Because the file is compressed the first line is reading with not the proper format I think due to encoding problem I tried the below code but not working
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
文件具有特殊格式,我需要使用正则表达式将其读入 datafame ==> 我发现的唯一方法是使用 RDD 读取它并将其映射到正则表达式是否有任何方法可以直接读取它DF 并传递正则表达式?
File has special format and I need to read it using Regex into a datafame ==> the only way i found is to read it using RDD and map it to the regex is there any way to read it direct to DF and pass the regex?
val Test_special_format_RawData = sc.textFile("file://"+filename.toString())
.map(line ⇒ line.replace("||", "|NA|NA"))
.map(line ⇒ if (line.takeRight(1) == "|") line+"NA" else line)
.map { x ⇒ regex_var.findAllIn(x).toArray }
import hiveSqlContext.implicits._
val Test_special_format_DF = Test_special_format_RawData.filter { x⇒x.length==30 }
.filter { x⇒x(0) !=header(0) }
.map { x⇒ (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7),
x(8), x(9), x(10), x(11), x(12), x(13), x(14),
x(15),x(16), x(17), x(18), x(19))}.toDF()
val Test_special_format_Tranformed_Data = Test_special_format_DF.withColumn("FileName", lit(filename.getName))
.withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
我可以忽略任何特殊字符之间的任何分隔符,例如|"^~ ^~ 之间的管道忽略它吗?
Can I ignore any delimiter between any special charachter for example if "|" pipe coming between ^~ ^~ ignore it?
有时数据帧列类型被错误的数据类型接收.我们如何处理这个问题以应用数据质量检查?
Some times the dataframe columns types received by wrong data types. How can we handle this problem to apply data quality checks?
当我尝试使用 Dataframe 从 Spark 插入 hive 时.我可以指定拒绝目录用于取消处理行错误下面是我使用的代码吗?
When I tried to insert into hive from the Spark using Dataframe. Can I specify the rejection Directory for un handle rows error below is the code I used?
Test_special_format_Tranformed_Data.write.partitionBy("rtm_insertion_date")
.mode(SaveMode.Append).insertInto("dpi_Test_special_format_source")
文件示例在这里
推荐答案
我将回答有关文件格式问题的问题.解决方案是覆盖 gzib 的默认扩展格式.
I will answer my question regarding the file format issue. The solution is to override the default extension format for the gzib.
import org.apache.hadoop.iopress.GzipCodec
class TmpGzipCodec extends GzipCodec {
override def getDefaultExtension(): String = ".gz.tmp"
}
现在我们刚刚注册了这个编解码器,在 SparkConf 上设置 spark.hadoop.iopression.codecs:
Now we just registered this codec, setting spark.hadoop.iopression.codecs on SparkConf:
val conf = new SparkConf()
// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.iopression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")
val sc = new SparkContext(conf)
val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
我发现这个解决方案是链接
I found this solution is this link
对于格式错误的记录,有以下两种解决方案:
Regarding the malformed records, There are two solutions as follow:
案例作为案例类,然后检查它的模式是否与此案例类匹配.逐行解析 RDD,但需要在 spark.csv 库中进行更新.关于分隔符分隔符问题,需要使用RDD和regex.
Regarding delimiter delimiter issue, it required to use RDD with regex.
这篇关于使用特殊格式压缩的 Spark 阅读的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论