ML Spark PySpark
Word count in practice
Create a basic RDD with sc.parallelize
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)
# map with a lambda function
pluralLambdaRDD = wordsRDD.map(lambda x : x+'s')
print pluralLambdaRDD.collect()
#Out:['cats', 'elephants', 'rats', 'rats', 'cats']
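For comparison (an addition to these notes, not from the lab itself), map can take a named function instead of a lambda; makePlural is just an illustrative helper name:
# A named function can be passed to map just like a lambda
def makePlural(word):
    return word + 's'

pluralRDD = wordsRDD.map(makePlural)
print(pluralRDD.collect())
#Out:['cats', 'elephants', 'rats', 'rats', 'cats']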
# Use map to compute the length of each word
pluralLengths = (pluralLambdaRDD
.map(lambda x: len(x))
.collect())
print pluralLengths
#Out:[4, 9, 4, 4, 4]
Create a pair RDD of (k, v) tuples, where k is the key and v is the value
wordPairs = wordsRDD.map(lambda x : (x,1))
print wordPairs.collect()
#Out: [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
Use reduceByKey to compute the counts
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x,y : x+y)
print wordCounts.collect()
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]
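As a point of comparison (an addition to these notes), groupByKey followed by summing each group gives the same result; reduceByKey is usually preferred because it combines values within each partition before the shuffle:
# Same counts via groupByKey; each key's values become an iterable of 1s that we sum
wordCountsGrouped = (wordPairs
.groupByKey()
.mapValues(sum))
print(wordCountsGrouped.collect())
#Out (order may vary): [('rat', 2), ('elephant', 1), ('cat', 2)]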
word count
# All the steps combined into one statement
wordCountsCollected = (wordsRDD
.map(lambda x : (x,1))
.reduceByKey(lambda x,y: x+y)
.collect())
print wordCountsCollected
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]
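A shortcut worth knowing (an addition to these notes, not part of the original walkthrough): countByValue is an action that returns the counts straight to the driver as a dict, with no explicit pair RDD:
# countByValue counts each distinct element and returns a dict to the driver
print(wordsRDD.countByValue())
#Out (a defaultdict, order may vary): {'rat': 2, 'elephant': 1, 'cat': 2}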
Compute the mean count per word
from operator import add
# Number of distinct words
uniqueWords = wordCounts.count()
# Total number of words (sum of all the counts)
totalCount = (wordCounts
.map(lambda (k, v): v)
.reduce(add))
# Mean count per distinct word
average = totalCount / float(uniqueWords)
print totalCount
#Out: 5
print round(average, 2)
#Out: 1.67
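The same average can also be computed with the built-in values() and mean() RDD methods (a sketch added to these notes, using the wordCounts RDD from above):
# values() drops the keys; mean() is an action over the numeric values
print(round(wordCounts.values().mean(), 2))
#Out: 1.67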
Hands-on exercise
1. Define a wordCount function that takes wordListRDD as input and returns an RDD of (word, count) pairs.
def wordCount(wordListRDD):
"""Creates a pair RDD with word counts from an RDD of words.
Args:
wordListRDD (RDD of str): An RDD consisting of words.
Returns:
RDD of (str, int): An RDD consisting of (word, count) tuples.
"""
return (wordListRDD
.map(lambda x :(x,1))
.reduceByKey(lambda x,y:x+y)
)
print wordCount(wordsRDD).collect()
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]
2. Define a function to handle punctuation and special characters.
import re
def removePunctuation(text):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained. Other characters should be
eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
punctuation is removed.
Args:
text (str): A string.
Returns:
str: The cleaned up string.
"""
return re.sub(r'[^\w\s]','',text).strip().lower()
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(' * Remove punctuation then spaces * ')
#Out:hi you
#Out:no under_score
#Out:remove punctuation then spaces
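Note that \w also matches the underscore, which is why under_score keeps its underscore above. To retain strictly letters, digits, and spaces as the docstring describes, a variant like the sketch below would work (the rest of these notes keep the \w-based version; removePunctuationStrict is just an illustrative name):
# Variant that also drops underscores: keep only a-z, 0-9, and whitespace
def removePunctuationStrict(text):
    return re.sub(r'[^a-z0-9\s]', '', text.lower()).strip()

print(removePunctuationStrict(' No under_score!'))
#Out:no underscore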
3. Load the input file.
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)
# Load the file and strip punctuation from each line
shakespeareRDD = (sc
.textFile(fileName, 8)
.map(removePunctuation))
print '\n'.join(shakespeareRDD
.zipWithIndex() # to (line, lineNum)
.map(lambda (l, num): '{0}: {1}'.format(num, l)) # to 'lineNum: line'
.take(10))
"""
Out:
0: 1609
1:
2: the sonnets
3:
4: by william shakespeare
5:
6:
7:
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
"""
4. Split each line into words.
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount
#Out:[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
#Out:927631
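The count above still includes empty strings: splitting on a single space turns blank lines and runs of spaces into '' entries, which is exactly what step 5 removes. A quick local illustration (plain Python, added to these notes):
# Blank lines and repeated spaces produce empty strings when splitting on ' '
print(''.split(' '))
#Out:['']
print('to  be'.split(' '))
#Out:['to', '', 'be']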
5. Filter out the empty strings.
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != '')
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount
#Out:882996
6. Compute the top 15 words.
top15WordsAndCounts = (wordCount(shakeWordsRDD)
.takeOrdered(15, key=lambda x: -x[1]))
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))
"""
Out:
the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
"""