To illustrate some of the concepts in this chapter, we use the bike sharing dataset. It records the number of bikes rented per hour in a bike sharing system, along with related information such as date, time, weather, season, and holidays. Download the archive, strip the header line from hour.csv, and put the resulting file into HDFS:

[hadoop@master spark]$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/00275/Bike-Sharing-Dataset.zip
[hadoop@master spark]$ unzip Bike-Sharing-Dataset.zip
[hadoop@master spark]$ sed 1d hour.csv > hour_noheader.csv
[hadoop@master spark]$ hdfs dfs -put hour_noheader.csv ML/
[hadoop@master spark]$ cat Readme.txt
Dataset characteristics
=========================================
Both hour.csv and day.csv have the following fields, except hr which is not available in day.csv

- instant: record index
- dteday : date
- season : season (1:springer, 2:summer, 3:fall, 4:winter)
- yr : year (0: 2011, 1:2012)
- mnth : month (1 to 12)
- hr : hour (0 to 23)
- holiday : weather day is holiday or not (extracted from http://dchr.dc.gov/page/holiday-schedule)
- weekday : day of the week
- workingday : if day is neither weekend nor holiday is 1, otherwise is 0.
- weathersit :
    - 1: Clear, Few clouds, Partly cloudy, Partly cloudy
    - 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
    - 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
    - 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog
- temp : Normalized temperature in Celsius. The values are divided to 41 (max)
- atemp: Normalized feeling temperature in Celsius. The values are divided to 50 (max)
- hum: Normalized humidity. The values are divided to 100 (max)
- windspeed: Normalized wind speed. The values are divided to 67 (max)
- casual: count of casual users
- registered: count of registered users
- cnt: count of total rental bikes including both casual and registered
=========================================
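The readme notes that temp, atemp, hum, and windspeed are normalized by their maxima (41, 50, 100, and 67). A minimal sketch of recovering the raw values, using the divisors quoted in the readme (not part of the original session):

# Undo the normalisation described in Readme.txt; the divisors come from the readme above.
def denormalize(temp, atemp, hum, windspeed):
    return temp * 41.0, atemp * 50.0, hum * 100.0, windspeed * 67.0

print denormalize(0.24, 0.2879, 0.81, 0.0)   # first record -> (9.84, 14.395, 81.0, 0.0)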
Loading and inspecting the dataset
export PYSPARK_DRIVER_PYTHON=/usr/local/program/python2.7/bin/python
export PYSPARK_PYTHON=/usr/local/program/python2.7/bin/python

[hadoop@master ~]$ pyspark --master yarn --driver-memory 4G
>>> raw_data = sc.textFile("ML/hour_noheader.csv")
>>> num_data = raw_data.count()
>>> records = raw_data.map(lambda x: x.split(","))
>>> first = records.first()
>>> print first
[u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
>>> print num_data
17379
>>> records.cache()
PythonRDD[4] at RDD at PythonRDD.scala:48
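Before building features it is worth a quick look at the target variable. A minimal sketch (not part of the original session) that computes summary statistics for the rental count cnt, assuming the records RDD defined above:

# Summary statistics for the target variable (cnt, the last column of each record).
counts = records.map(lambda r: float(r[-1]))
print counts.stats()   # a StatCounter with count, mean, stdev, min and max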
To represent each categorical feature in binary (1-of-k) form, we map each feature value to the index of the non-zero entry in a binary vector. The get_mapping function below computes this mapping for a given column:
def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

>>> print "Mapping of first categorical feature column: %s" % get_mapping(records, 2)
Mapping of first categorical feature column: {u'1': 0, u'3': 1, u'2': 2, u'4': 3}

# apply the mapping to each of the 8 categorical variables (columns 2 to 9)
mappings = [get_mapping(records, i) for i in range(2,10)]
>>> print mappings
[{u'1': 0, u'3': 1, u'2': 2, u'4': 3}, {u'1': 0, u'0': 1}, {u'11': 0, u'10': 6, u'12': 7, u'1': 1, u'3': 2, u'2': 8, u'5': 3, u'4': 9, u'7': 4, u'6': 10, u'9': 5, u'8': 11}, {u'20': 2, u'21': 14, u'22': 4, u'23': 15, u'1': 6, u'0': 18, u'3': 7, u'2': 19, u'5': 8, u'4': 20, u'7': 9, u'6': 21, u'9': 10, u'8': 22, u'11': 0, u'10': 12, u'13': 1, u'12': 13, u'15': 11, u'14': 23, u'17': 3, u'16': 17, u'19': 5, u'18': 16}, {u'1': 0, u'0': 1}, {u'1': 0, u'0': 3, u'3': 1, u'2': 4, u'5': 2, u'4': 5, u'6': 6}, {u'1': 0, u'0': 1}, {u'1': 0, u'3': 1, u'2': 2, u'4': 3}]

# once the mapping for each variable is known, compute the total length of the binary vector
# (len here is the built-in len function)
cat_len = sum(map(len, mappings))
num_len = len(records.first()[10:14])   # the four numerical columns: temp, atemp, hum, windspeed
total_len = num_len + cat_len
>>> print "Feature vector length for categorical features: %d" % cat_len
Feature vector length for categorical features: 57
>>> print "Feature vector length for numerical features: %d" % num_len
Feature vector length for numerical features: 4
>>> print "Total feature vector length: %d" % total_len
Total feature vector length: 61
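To make the 1-of-k encoding concrete, here is a minimal sketch (not part of the original session) of how a single season value is turned into a binary vector, using the mapping printed above:

import numpy as np

# The season mapping computed above: value -> position of the 1 in the binary vector.
season_mapping = {u'1': 0, u'3': 1, u'2': 2, u'4': 3}
vec = np.zeros(len(season_mapping))
vec[season_mapping[u'3']] = 1.0   # season '3' (fall)
print vec                         # [ 0.  1.  0.  0.]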
Creating feature vectors for the linear model
Using the mapping function above, we convert all categorical features into binary-encoded features. To make it convenient to extract the feature vector and label from each record, we define two helper functions, extract_features and extract_label:

from pyspark.mllib.regression import LabeledPoint
import numpy as np

def extract_features(record):
    cat_vec = np.zeros(cat_len)
    i = 0
    step = 0
    # note: record[2:9] covers the first 7 categorical columns, so the last
    # block (weathersit, column 9) of cat_vec stays all zeros
    for field in record[2:9]:
        # m is the mapping dict for the current categorical column
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        i = i + 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))

def extract_label(record):
    # negative indices count from the end, so record[-1] is the last field (cnt)
    return float(record[-1])

data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
first_point = data.first()
>>> print "Raw data: " + str(first[2:])
Raw data: [u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
>>> print "Label: " + str(first_point.label)
Label: 16.0
>>> print "Linear Model feature vector:\n" + str(first_point.features)
Linear Model feature vector:
[1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]
>>> print "Linear Model feature vector length: " + str(len(first_point.features))
Linear Model feature vector length: 61
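The LabeledPoint RDD built here can be fed directly into MLlib's linear regression. A minimal training sketch (not part of the original session; the iteration count and step size are illustrative only):

from pyspark.mllib.regression import LinearRegressionWithSGD

# Train a linear regression model with SGD on the binary-encoded features.
linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept=False)
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print true_vs_predicted.take(5)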
Creating feature vectors for the decision tree
A decision tree model can consume the raw feature values directly (there is no need to encode the categorical variables as binary vectors). So we only need a simple extraction function that converts every value to a float and wraps the result in a numpy array:

def extract_features_dt(record):
    return np.array(map(float, record[2:14]))

data_dt = records.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt(r)))
first_point_dt = data_dt.first()
>>> print "Decision Tree feature vector: " + str(first_point_dt.features)
Decision Tree feature vector: [1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]
>>> print "Decision Tree feature vector length: " + str(len(first_point_dt.features))
Decision Tree feature vector length: 12
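These raw-valued vectors can be passed to MLlib's decision tree trainer. A minimal sketch (not part of the original session; an empty categoricalFeaturesInfo treats every feature as numerical, and maxDepth is illustrative):

from pyspark.mllib.tree import DecisionTree

# Train a regression tree on the raw-valued features and compare predictions with the labels.
dt_model = DecisionTree.trainRegressor(data_dt, categoricalFeaturesInfo={}, maxDepth=5)
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data_dt.map(lambda p: p.label)
print actual.zip(preds).take(5)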