Practicing with Python + Scikit-learn
This follows the steps in that blog post. Since Pig is not installed on the system, the training and test data were generated with Spark rather than in the way described in the article. Environment: JDK 1.7, Spark 1.2.0, Scala 2.10.4, Python 2.7. Python is best installed from an integrated distribution such as Anaconda, which bundles most of the common extension packages.
1. Install pydoop
The pydoop library can be used to access HDFS. After downloading and extracting it, run the following in its root directory:
```shell
python setup.py build
python setup.py install --skip-build
```
2. Generate feature data from the raw data
Spark is used here to generate the feature data; see the previous post for how to install the joda packages. Running the following code directly in IntelliJ IDEA writes the generated data to HDFS.
```scala
import org.apache.spark.rdd._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.joda.time._
import org.joda.time.format._

case class DelayRec(year: String,
                    month: String,
                    dayOfMonth: String,
                    dayOfWeek: String,
                    crsDepTime: String,
                    depDelay: String,
                    origin: String,
                    distance: String,
                    cancelled: String) {

  val holidays = List(
    "01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
    "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features: String = {
    "%s,%s,%s,%s,%s,%s,%d".format(depDelay, month, dayOfMonth, dayOfWeek,
      get_hour(crsDepTime), distance,
      days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt))
  }

  def get_hour(depTime: String): String = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int): Int = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}

object MyApp {
  // function to do a preprocessing step for a given file
  def prepFlightDelays(sc: SparkContext, infile: String): RDD[DelayRec] = {
    val data = sc.textFile(infile)
    data.map { line =>
      val reader = new CSVReader(new StringReader(line))
      reader.readAll().asScala.toList.map(rec =>
        DelayRec(rec(0), rec(1), rec(2), rec(3), rec(5), rec(15), rec(16), rec(18), rec(21)))
    }.map(list => list(0))
      .filter(rec => rec.year != "Year")
      .filter(rec => rec.cancelled == "0")
      .filter(rec => rec.origin == "ORD")
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp").setMaster("local")
      .set("spark.executor.memory", "600m")
    val sc = new SparkContext(conf)
    prepFlightDelays(sc, "hdfs://node1:9000/airline/delay/2007.csv")
      .map(rec => rec.gen_features)
      .saveAsTextFile("hdfs://node1:9000/airline/delay/ord_2007_1")
    prepFlightDelays(sc, "hdfs://node1:9000/airline/delay/2008.csv")
      .map(rec => rec.gen_features)
      .saveAsTextFile("hdfs://node1:9000/airline/delay/ord_2008_1")
    sc.stop()
  }
}
```
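The days_from_nearest_holiday feature is the only non-trivial transformation in the code above. As a sanity check, the same logic can be sketched in standalone Python; the function here is an illustrative re-implementation, not part of the pipeline, and the holiday list is abbreviated to three dates:

```python
from datetime import date

# abbreviated holiday list for illustration; the Scala code uses the
# full 2007/2008 US holiday calendar
holidays = [date(2007, 1, 1), date(2007, 7, 4), date(2007, 12, 25)]

def days_from_nearest_holiday(year, month, day):
    # minimum absolute distance, in days, from the given date to any holiday
    sample = date(year, month, day)
    return min(abs((sample - h).days) for h in holidays)

print(days_from_nearest_holiday(2007, 7, 2))  # two days before July 4th
```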
3. Start Spyder, paste the following code into a new .py file, run it, and inspect the results. It uses the logistic regression and random forest classifiers from Scikit-learn.

```python
# Python library imports: numpy, random, sklearn, pandas, etc.
import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np
from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
import matplotlib.pyplot as plt
import pydoop.hdfs as hdfs

# function to read an HDFS directory of CSV part files into a single
# DataFrame using PyDoop
def read_csv_from_hdfs(path, cols, col_types=None):
    files = hdfs.ls(path)
    pieces = []
    for f in files:
        pieces.append(pd.read_csv(hdfs.open(f), names=cols, dtype=col_types))
    return pd.concat(pieces, ignore_index=True)

# read files
cols = ['delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday']
col_types = {'delay': int, 'month': int, 'day': int, 'dow': int, 'hour': int,
             'distance': int, 'days_from_holiday': int}
data_2007 = read_csv_from_hdfs('hdfs://node1:9000/airline/delay/ord_2007_1', cols, col_types)
data_2008 = read_csv_from_hdfs('hdfs://node1:9000/airline/delay/ord_2008_1', cols, col_types)

# a flight counts as delayed if its departure delay is 15 minutes or more
data_2007['DepDelayed'] = data_2007['delay'].apply(lambda x: x >= 15)
print "total flights: " + str(data_2007.shape[0])
print "total delays: " + str(data_2007['DepDelayed'].sum())

# compute the average fraction of delayed flights per month (ORD flights only)
grouped = data_2007[['DepDelayed', 'month']].groupby('month').mean()
# plot average delays by month
grouped.plot(kind='bar')

# compute the average fraction of delayed flights by hour
grouped = data_2007[['DepDelayed', 'hour']].groupby('hour').mean()
# plot average delays by hour of day
grouped.plot(kind='bar')

# create training set (2007 data) and test set (2008 data)
cols = ['month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday']
train_y = data_2007['delay'] >= 15
train_x = data_2007[cols]
test_y = data_2008['delay'] >= 15
test_x = data_2008[cols]

# create logistic regression model with L2 regularization
clf_lr = linear_model.LogisticRegression(penalty='l2', class_weight='auto')
clf_lr.fit(train_x, train_y)

# predict output labels on test set
pr = clf_lr.predict(test_x)

# display evaluation metrics
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_lr = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "\nprecision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
    (report_lr[0], report_lr[1], report_lr[2], accuracy_score(list(test_y), list(pr)))

# create Random Forest classifier with 50 trees
clf_rf = RandomForestClassifier(n_estimators=50, n_jobs=-1)
clf_rf.fit(train_x, train_y)

# evaluate on test set
pr = clf_rf.predict(test_x)

# print results
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_rf = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "\nprecision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
    (report_rf[0], report_rf[1], report_rf[2], accuracy_score(list(test_y), list(pr)))
```
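The precision/recall/F1 lines printed by the script are all derived from the confusion matrix (the script uses scikit-learn's micro-averaged variant). As a quick reference for how to read those numbers, here is a self-contained sketch that computes the positive-class (delayed) metrics from made-up counts:

```python
# toy confusion matrix for the binary "delayed" classifier
# rows = actual, columns = predicted: [[TN, FP], [FN, TP]]
tn, fp = 800, 50
fn, tp = 120, 30

precision = float(tp) / (tp + fp)  # of flights predicted delayed, fraction truly delayed
recall = float(tp) / (tp + fn)     # of truly delayed flights, fraction caught
f1 = 2 * precision * recall / (precision + recall)
accuracy = float(tp + tn) / (tn + fp + fn + tp)

print("precision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f"
      % (precision, recall, f1, accuracy))
```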