因此,我试图在Play/Scala项目中使用Spark从MySQL数据库获取数据.由于我要接收的行数很大,因此我的目标是从spark rdd获取一个Iterator.这是Spark上下文和配置...
So I am trying to get data from a MySQL database using Spark within a Play/Scala project. Since the amount of rows I am trying to receive is huge, my aim is to get an Iterator from the spark rdd. Here is the Spark context and configuration...
private val configuration = new SparkConf() .setAppName("Reporting") .setMaster("local[*]") .set("spark.executor.memory", "2g") .set("spark.akka.timeout", "5") .set("spark.driver.allowMultipleContexts", "true") val sparkContext = new SparkContext(configuration)JDBCRDD以及sql查询如下
The JDBCRDD is as follows along with the sql query
val query = """ |SELECT id, date |FROM itembid |WHERE date BETWEEN ? AND ? """.stripMargin val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext, driverFactory, query, rangeMinValue.get, rangeMaxValue.get, partitionCount, rowMapper) .persist(StorageLevel.MEMORY_AND_DISK)数据太多,无法立即获取.最初,使用较小的数据集可以从rdd.toLocalIterator获得一个迭代器.但是,在这种特定情况下,它无法计算迭代器.因此,我的目标是部分地具有多个分区和接收数据.我不断收到错误消息.正确的方法是什么?
The data is too much to get it at once. At the beginning with smaller data sets it was possible the get an iterator from rdd.toLocalIterator. However in this specific case it can not compute an iterator. So my aim is to have multiple partitions and recevie data part by part. I keep getting errors. What is the correct way of doing this ?
推荐答案我相信您在阅读MySQL表时遇到堆问题.
I believe that you are facing a heap problem read your MySQL table.
在您的情况下,我要做的是将MySQL的数据提取到存储系统(HDFS,本地)文件中,然后使用spark的上下文textFile进行提取!
What I'll do in your case is to fetch the data from MySQL into the storage system (HDFS, local) files and then I'll use spark's context textFile to fetch it!
示例:
object JDBCExample { def main(args: Array[String]) { val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://localhost/database" val username = "user" val password = "pass" var connection: Connection = null try { Class.forName(driver) connection = DriverManager.getConnection(url, username, password) // This is the tricky part of reading a huge MySQL table you'll need to set your sql statement as following : val statement = connection.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY) statement.setMaxRows(0) statement.setFetchSize(Integer.MIN_VALUE) val resultSet = statement.executeQuery("select * from ex_table") val fileWriter = new FileWriter("output.csv") val writer = new CSVWriter(fileWriter, '\t'); while (resultSet.next()) { val entries = List(... // process result here //...) writer.writeNext(entries.toArray) } writer.close(); } catch { case e: Throwable => e.printStackTrace } connection.close() } }一旦存储了数据,您就可以读取它:
Once your data is stored you can read it:
val data = sc.textFile("output.csv")PS:我在代码中使用了一些快捷方式(每个示例均使用CSVWriter),但是您可以将其用作要执行的操作的框架!
PS: I've used some shortcuts (CSVWriter per example) in the code but you can use it as a skeleton to what you are intending to do!
更多推荐
Spark RDD中的多个分区
发布评论