Multiple partitions in a Spark RDD

Problem description

So I am trying to get data from a MySQL database using Spark within a Play/Scala project. Since the number of rows I am trying to receive is huge, my aim is to get an Iterator from the Spark RDD. Here is the Spark context and configuration:

private val configuration = new SparkConf()
  .setAppName("Reporting")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")
  .set("spark.akka.timeout", "5")
  .set("spark.driver.allowMultipleContexts", "true")

val sparkContext = new SparkContext(configuration)

The JdbcRDD is as follows, along with the SQL query:

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin

val rdd = new JdbcRDD[ItemLeadReportOutput](
    SparkProcessor.sparkContext,
    driverFactory,
    query,
    rangeMinValue.get,
    rangeMaxValue.get,
    partitionCount,
    rowMapper)
  .persist(StorageLevel.MEMORY_AND_DISK)

The data is too much to fetch at once. At the beginning, with smaller data sets, it was possible to get an iterator from rdd.toLocalIterator. However, in this specific case it cannot compute an iterator. So my aim is to have multiple partitions and receive the data part by part. I keep getting errors. What is the correct way of doing this?
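For context on what the partition count does here: JdbcRDD splits the [lowerBound, upperBound] range into numPartitions sub-ranges and binds each one to the query's two "?" placeholders, so every partition runs its own bounded query. Below is a minimal sketch of that setup, with a hypothetical id-based range and row mapper (the URL, credentials, and bounds are placeholders, not the original project's values):

import java.sql.DriverManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

// Sketch: JdbcRDD turns [lowerBound, upperBound] into numPartitions
// sub-ranges, each bound to the two "?" placeholders, so each
// partition issues its own bounded query against MySQL.
def bidsRdd(sc: SparkContext): JdbcRDD[(Long, String)] =
  new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:mysql://localhost/database", "user", "pass"),
    "SELECT id, date FROM itembid WHERE id >= ? AND id <= ?",
    1L,        // lowerBound (assumed)
    1000000L,  // upperBound (assumed)
    100,       // numPartitions: 100 queries of roughly 10,000 rows each
    rs => (rs.getLong("id"), rs.getString("date")))

// toLocalIterator then computes one partition at a time, so the driver
// only ever holds a single partition's rows in memory:
// bidsRdd(sc).toLocalIterator.foreach(println)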

Recommended answer

I believe that you are facing a heap problem reading your MySQL table.

What I'd do in your case is fetch the data from MySQL into files on a storage system (HDFS, local) and then use the Spark context's textFile to read it back!

Example:

import java.io.FileWriter
import java.sql.{Connection, DriverManager, ResultSet}

import com.opencsv.CSVWriter // from the opencsv library

object JDBCExample {
  def main(args: Array[String]) {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"
    var connection: Connection = null
    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // This is the tricky part of reading a huge MySQL table: a
      // forward-only, read-only statement with no row limit and a fetch
      // size of Integer.MIN_VALUE makes the MySQL driver stream rows one
      // by one instead of buffering the whole result set in memory.
      val statement = connection.createStatement(
        ResultSet.TYPE_FORWARD_ONLY,
        ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0)
      statement.setFetchSize(Integer.MIN_VALUE)

      val resultSet = statement.executeQuery("select * from ex_table")

      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, '\t')
      while (resultSet.next()) {
        val entries = List(
          // ... process the result row here ...
        )
        writer.writeNext(entries.toArray)
      }
      writer.close()
    } catch {
      case e: Throwable => e.printStackTrace()
    }
    connection.close()
  }
}

Once your data is stored, you can read it:

val data = sc.textFile("output.csv")
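
From there the part-by-part iteration asked about above falls out naturally: map each line back into a row object and walk the RDD with toLocalIterator. A minimal sketch, assuming the file holds the tab-separated id and date columns written by the CSVWriter above (the ItemBid case class is a hypothetical stand-in for the project's row type):

// Hypothetical row type matching the two columns written above.
case class ItemBid(id: Long, date: String)

val bids = sc.textFile("output.csv")
  .map(_.split('\t'))
  .map(fields => ItemBid(fields(0).toLong, fields(1)))

// toLocalIterator streams one partition at a time to the driver,
// so memory use stays bounded by the largest partition.
bids.toLocalIterator.foreach(println)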

PS: I've used some shortcuts in the code (CSVWriter, for example), but you can use it as a skeleton for what you are intending to do!
