Practicing Spark and ML



Follow the steps in this blog post; a few notes are listed below. The environment is JDK 1.7, Spark 1.2.0, and Scala 2.10.4.


1. Download joda-convert 1.2 and add the jar to IntelliJ IDEA's Library path.


2. Download joda-time and add the jar to IntelliJ IDEA's Library path. (Alternatively, a build tool can manage these dependencies; see the sketch below.)
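
If the project is built with sbt instead of adding jars in IntelliJ by hand, a minimal build.sbt matching this environment might look as follows. This is a sketch, not the original project file; the joda-time and opencsv versions are assumptions, so adjust them to whatever is actually installed.

// Minimal sbt build for this walkthrough (a sketch; versions for
// joda-time and opencsv are assumed, not taken from the original post).
name := "StudyLab"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-mllib"  % "1.2.0" % "provided",
  "org.joda"          % "joda-convert" % "1.2",
  "joda-time"         % "joda-time"    % "2.3",  // assumed version
  "net.sf.opencsv"    % "opencsv"      % "2.3"   // provides au.com.bytecode.opencsv.CSVReader
)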


3. The test code is as follows.

import org.apache.spark.rdd._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.joda.time._
import org.joda.time.format._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.classification.SVMWithSGD

// One parsed flight record; fields are kept as strings and converted to
// numeric features in gen_features.
case class DelayRec(year: String,
                    month: String,
                    dayOfMonth: String,
                    dayOfWeek: String,
                    crsDepTime: String,
                    depDelay: String,
                    origin: String,
                    distance: String,
                    cancelled: String) {

  // US holidays in 2007/2008, used for the days_from_nearest_holiday feature
  val holidays = List(
    "01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
    "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  // Returns (date string, feature vector); values(0) is the delay in minutes,
  // which later becomes the label, and the rest are the model features.
  def gen_features: (String, Array[Double]) = {
    val values = Array(depDelay.toDouble,
      month.toDouble,
      dayOfMonth.toDouble,
      dayOfWeek.toDouble,
      get_hour(crsDepTime).toDouble,
      distance.toDouble,
      days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt))
    new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
  }

  // e.g. "815" -> "0815" -> "08"
  def get_hour(depTime: String): String = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int): Int = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}

object MyApp {

  // Preprocessing step for a given file: parse the CSV, drop the header row,
  // and keep only non-cancelled flights departing from ORD.
  def prepFlightDelays(sc: SparkContext, infile: String): RDD[DelayRec] = {
    val data = sc.textFile(infile)

    data.map { line =>
      val reader = new CSVReader(new StringReader(line))
      reader.readAll().asScala.toList.map(rec => DelayRec(rec(0), rec(1), rec(2), rec(3), rec(5),
        rec(15), rec(16), rec(18), rec(21)))
    }.map(list => list(0))
      .filter(rec => rec.year != "Year")
      .filter(rec => rec.cancelled == "0")
      .filter(rec => rec.origin == "ORD")
  }

  // Label a flight as delayed (1.0) if the departure delay is 15 minutes or more
  def parseData(vals: Array[Double]): LabeledPoint = {
    LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
  }

  // Function to compute evaluation metrics from (prediction, label) pairs
  def eval_metrics(labelsAndPreds: RDD[(Double, Double)]): Tuple2[Array[Double], Array[Double]] = {
    val tp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 1).count.toDouble
    val tn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 0).count.toDouble
    val fp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 0).count.toDouble
    val fn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 1).count.toDouble

    val precision = tp / (tp + fp)
    val recall = tp / (tp + fn)
    val F_measure = 2 * precision * recall / (precision + recall)
    val accuracy = (tp + tn) / (tp + tn + fp + fn)
    new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp")
      .setMaster("spark://node1:7077")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val data_2007 = prepFlightDelays(sc, "hdfs://node1:9000/airline/delay/2007.csv").map(rec => rec.gen_features._2)
    val data_2008 = prepFlightDelays(sc, "hdfs://node1:9000/airline/delay/2008.csv").map(rec => rec.gen_features._2)
    //data_2007.take(5).map(x => x mkString ",").foreach(println)

    // Prepare training set (2007 data), standardized to zero mean / unit variance
    val parsedTrainData = data_2007.map(parseData)
    parsedTrainData.cache
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
    val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label,
      scaler.transform(Vectors.dense(x.features.toArray))))
    scaledTrainData.cache

    // Prepare test/validation set (2008 data), scaled with the scaler fit on 2007
    val parsedTestData = data_2008.map(parseData)
    parsedTestData.cache
    val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label,
      scaler.transform(Vectors.dense(x.features.toArray))))
    scaledTestData.cache
    //scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)

    // Build the SVM model
    val svmAlg = new SVMWithSGD()
    svmAlg.optimizer
      .setNumIterations(100)
      .setRegParam(1.0)
      .setStepSize(1.0)
    val model_svm = svmAlg.run(scaledTrainData)

    // Predict on the test set and evaluate
    val labelsAndPreds_svm = scaledTestData.map { point =>
      val pred = model_svm.predict(point.features)
      (pred, point.label)
    }
    val m_svm = eval_metrics(labelsAndPreds_svm)._2
    println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))

    sc.stop()
  }
}

4. Run

spark-submit --jars /root/StudyLab.jar --class MyApp hdfs:/airline/program/StudyLab.jar
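
Note that the jars added in steps 1 and 2 only live on the IDE's classpath. If they are not packaged into StudyLab.jar, one way to ship them to the executors is to list them in --jars as a comma-separated list; the jar paths below are placeholders, not paths from the original post:

spark-submit \
  --jars /root/joda-time-2.3.jar,/root/joda-convert-1.2.jar,/root/opencsv-2.3.jar \
  --class MyApp /root/StudyLab.jar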

