How can a file from HDFS be read inside a Spark function, without using the SparkContext within the function?
Example:
val filedata_rdd = rdd.map { x => ReadFromHDFS(x.getFilePath) }

The question is how ReadFromHDFS can be implemented. Usually, to read from HDFS we would call sc.textFile, but in this case sc cannot be used inside the function (the SparkContext exists only on the driver and cannot be serialized into tasks).
Answer:

You don't necessarily need the SparkContext to interact with HDFS. You can simply broadcast the Hadoop configuration from the driver and use the broadcast configuration value on the executors to construct an org.apache.hadoop.fs.FileSystem. Then the world is yours. :)
Here is the code:
import java.io.StringWriter

import com.sachin.util.SparkIndexJobHelper._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SerializableWritable, SparkConf}

class Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[15]")
      .setAppName("TestJob")
    // createSparkContext comes from com.sachin.util.SparkIndexJobHelper
    val sc = createSparkContext(conf)

    // Hadoop's Configuration is not serializable, so wrap it in
    // SerializableWritable before broadcasting it to the executors.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    val rdd: RDD[String] = ??? // your existing rdd

    val filedata_rdd = rdd.map { x => readFromHDFS(confBroadcast.value.value, x) }
  }

  // Reads the file at the given HDFS path into a String using the
  // broadcast Hadoop configuration instead of the SparkContext.
  def readFromHDFS(configuration: Configuration, path: String): String = {
    val fs: FileSystem = FileSystem.get(configuration)
    val inputStream = fs.open(new Path(path))

    val writer = new StringWriter()
    IOUtils.copy(inputStream, writer, "UTF-8")
    writer.toString
  }
}
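Two caveats with the code above: FileSystem.get is called once per record, and the input stream is never closed. If the RDD contains many paths, a variant using mapPartitions can look up the FileSystem once per partition and close each stream explicitly. The following is a minimal sketch under the same assumptions (the confBroadcast broadcast variable and an RDD[String] of HDFS paths from the example above):

// Variant: resolve the FileSystem once per partition and close each stream.
// Assumes confBroadcast (broadcast SerializableWritable of the Hadoop
// Configuration) and rdd: RDD[String] of HDFS paths, as in the example above.
val filedata_rdd = rdd.mapPartitions { paths =>
  val fs = FileSystem.get(confBroadcast.value.value)
  paths.map { path =>
    val inputStream = fs.open(new Path(path))
    try {
      val writer = new StringWriter()
      IOUtils.copy(inputStream, writer, "UTF-8")
      writer.toString
    } finally {
      inputStream.close() // release the HDFS stream even if the copy fails
    }
  }
}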