The previous post covered scraping news from Sina Finance and NBD (每经网); this one walks through how the data gets analyzed. For tokenization I use the jieba library, but jieba's built-in dictionary alone did not meet expectations, so custom dictionaries have to be loaded: a stop-word list and a new-word list. A Chinese stop-word list is freely available at https://github.com/dongxiexidian/Chinese/find/master; what belongs in the new-word list depends on what you are analyzing. Stock names are the obvious case here: "ST保千里" gets cut into "ST", "保" and "千里", and "尔康制药" into "尔康" and "制药", which is useless for matching.
So, to be safe, I loaded all 3,000-plus stock names into the new-word list, and the improvement is obvious. Beyond that, some words are new only to the dictionary: "区块链" (blockchain) is hardly new, but jieba may still cut it into "区" and "块链", so add whatever words you think necessary before loading the new-word list, as in the sketch below.
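A minimal sketch of the effect (the file name finance_dict.txt is a placeholder here, and the exact default cuts vary by jieba version):

# -*- coding: UTF-8 -*-
import jieba

sentence = '尔康制药布局区块链'
print('/'.join(jieba.cut(sentence)))      # without the custom dict, e.g. 尔康/制药/布局/区块/链
jieba.add_word('尔康制药')                 # add a single word programmatically ...
jieba.load_userdict('finance_dict.txt')   # ... or load a file with one word per line
print('/'.join(jieba.cut(sentence)))      # now e.g. 尔康制药/布局/区块链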
The tokenized output forms the feature set, and a feature set this large calls for dimensionality reduction. jieba's keyword-extraction algorithms and Gensim's LDA topic model (Latent Dirichlet Allocation) are both, in essence, dimensionality-reduction operations. Running LDA on the example from the earlier post on Gensim-based text similarity yields a topic distribution for each news item, as follows.
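Here is a minimal, self-contained version of that LDA step (toy headlines in place of the real news corpus; num_topics is set far lower than in CalTopic below):

# -*- coding: UTF-8 -*-
import jieba
from gensim import corpora, models

docs = ['央行宣布降准释放流动性', '区块链技术应用加速落地', '央行货币政策保持稳健']
texts = [list(jieba.cut(d)) for d in docs]         # tokenize each document
dictionary = corpora.Dictionary(texts)             # word <-> id mapping
corpus = [dictionary.doc2bow(t) for t in texts]    # bag-of-words vectors
tfidf = models.TfidfModel(corpus)                  # weight the bow vectors by TF-IDF
lda = models.LdaModel(tfidf[corpus], id2word=dictionary, num_topics=2)
for bow in corpus:
    print(lda[tfidf[bow]])                         # sparse (topic id, weight) pairs per document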
Now for putting the scraped data to use. My sources are, for the moment, only Sina Finance and NBD. On Sina Finance pages the stock code is easy to pick out, but NBD articles often carry only the stock name, so a table of basic information on the 3,000-plus stocks is needed as a mapping. I pull it straight from Tushare into the database. After tokenization, the program must recognize that a few characters denote a stock rather than filler; that requires a set of stock names to check tokens against, and mapping each matched name to its code supports the later analysis, as shown in the figure below.
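Building that mapping might look like the following sketch (assuming the legacy tushare interface with get_stock_basics(); the Stock/Basic_Info names match the collections used in the code below):

# -*- coding: UTF-8 -*-
import tushare as ts
from pymongo import MongoClient

basics = ts.get_stock_basics()   # DataFrame indexed by 6-digit code; the 'name' column holds the stock name
collection = MongoClient('localhost', 27017)['Stock'].get_collection('Basic_Info')
for code, row in basics.iterrows():
    collection.update_one({'code': code},
                          {'$set': {'code': code, 'name': row['name']}},
                          upsert=True)   # upsert keeps the mapping table idempotent across re-runs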
Nearly every scraped article now maps to one or more stock codes. Suppose we want all the news related to a single stock, exported to a CSV file or stored in a new collection: that is just a query-and-aggregation job. Next we want to judge whether an article is bullish (利好), bearish (利空) or neutral (中立) for the stock. Doing this purely with natural-language processing is still a long way off, but tying the news to the stock price works nicely. Before this step I had also planned to pull Tushare's tick and daily-bar data, but the tick download kept breaking off after only a few days' worth and the daily data was thin, so I ended up downloading daily bars from 通达信 into MongoDB instead. Back to the point: align each news timestamp with the price series; if the price rises within a set number of days after publication (some papers use 3), label the article bullish for that stock, if it falls, bearish, and if it stays flat, neutral. For example, extracting the historical news of 山西焦化 (600740), tagging each item and saving it to a new collection gives the result in the figure below. The rule itself fits in a few lines, sketched next.
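A pandas sketch of that labeling rule (hypothetical close-price Series indexed by ascending 'YYYYMMDD' strings; the MongoDB version is judgeGoodOrBadNews in the code below):

# -*- coding: UTF-8 -*-
import pandas as pd

def label_news(close, news_date, horizon=3):
    window = close[close.index >= news_date][:horizon + 1]  # the news day plus `horizon` trading days
    if len(window) < 2:
        return '中立'                        # not enough trading days to judge
    if window.iloc[-1] > window.iloc[0]:
        return '利好'                        # price rose: bullish
    if window.iloc[-1] < window.iloc[0]:
        return '利空'                        # price fell: bearish
    return '中立'                            # flat: neutral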
Related code:
# -*- coding: UTF-8 -*-
from gensim import corpora, similarities, models
from collections import defaultdict
import jieba
class GensimExp(object):
    def __init__(self, chnSTWPath, finance_dict):
        self.chnSTWPath = chnSTWPath      # path to the Chinese stop-word list
        self.finance_dict = finance_dict  # path to the custom finance dictionary

    def renewFinanceDict(self, new_Word_list):
        # e.g. new_Word_list = ['区块链','余额宝','佣金宝','前海']
        with open(self.finance_dict, 'a', encoding='utf-8') as file:
            for word in new_Word_list:
                file.write(word + '\n')

    def getchnSTW(self):
        with open(self.chnSTWPath, 'r', encoding='utf-8') as f:
            stopwords = [line.strip() for line in f.readlines()]
        return stopwords
    def jieba_tokenize(self):
        chnSTW = self.getchnSTW()
        corpora_documents = []
        jieba.load_userdict(self.finance_dict)  # load the custom finance dictionary
        for item_text in self._raw_documents:
            outstr = []
            sentence_seged = list(jieba.cut(item_text))
            for word in sentence_seged:
                # drop stop words and whitespace tokens
                if word not in chnSTW and word != '\t' and word != ' ':
                    outstr.append(word)
            corpora_documents.append(outstr)
        return corpora_documents
    def RemoveWordAppearOnce(self, corpora_documents):
        # drop tokens that occur only once in the whole corpus
        frequency = defaultdict(int)
        for text in corpora_documents:
            for token in text:
                frequency[token] += 1
        corpora_documents = [[token for token in text if frequency[token] > 1]
                             for text in corpora_documents]
        return corpora_documents
    def genDictionary(self, documents):
        self._raw_documents = documents
        corpora_documents = self.jieba_tokenize()  # tokenize
        # corpora_documents = self.RemoveWordAppearOnce(corpora_documents)  # drop words appearing only once in the corpus
        self._dictionary = corpora.Dictionary(corpora_documents)  # build the dictionary
        # bag-of-words vector for each news item
        self._corpus = [self._dictionary.doc2bow(text) for text in corpora_documents]
        return corpora_documents, self._dictionary, self._corpus
    def CalSim(self, test_document, Type, best_num):
        if Type == 'Similarity-tfidf-index':
            # compute the IDF of every feature that appears in the corpus
            tfidf_model = models.TfidfModel(self._corpus)
            corpus_tfidf = tfidf_model[self._corpus]
            self._num_features = len(self._dictionary.token2id.keys())
            self._similarity = similarities.Similarity(Type, corpus_tfidf,
                num_features=self._num_features, num_best=best_num)
            test_cut_raw = list(jieba.cut(test_document))
            test_corpus = self._dictionary.doc2bow(test_cut_raw)
            # generate the TF-IDF vector of the query from the trained model, then compute similarity
            self._test_corpus = tfidf_model[test_corpus]
        elif Type == 'Similarity-LSI-index':
            lsi_model = models.LsiModel(self._corpus)
            corpus_lsi = lsi_model[self._corpus]
            self._num_features = len(self._dictionary.token2id.keys())
            self._similarity = similarities.Similarity(Type, corpus_lsi,
                num_features=self._num_features, num_best=best_num)
            test_cut_raw = list(jieba.cut(test_document))
            test_corpus = self._dictionary.doc2bow(test_cut_raw)
            self._test_corpus = lsi_model[test_corpus]
        self.Print_CalSim()
        IdLst = []
        SimRltLst = []
        SimTxLst = []
        for Id, Sim in self._similarity[self._test_corpus]:
            IdLst.append(Id)
            SimRltLst.append(Sim)
            SimTxLst.append(self._raw_documents[Id])
        return IdLst, SimTxLst, SimRltLst
    def CalTopic(self):
        tfidf_model = models.TfidfModel(self._corpus)
        self._corpus_tfidf = tfidf_model[self._corpus]  # TF-IDF vector of each document (sparse)
        self._lda = models.LdaModel(self._corpus_tfidf, id2word=self._dictionary, num_topics=200)
        corpus_lda = self._lda[self._corpus_tfidf]  # LDA vector per document: sparse (topic id, weight) pairs
        for i in range(len(self._corpus_tfidf)):
            self.Print_CalTopic(i)
        return corpus_lda
    def PrintWordCloud(self):
        import matplotlib.pyplot as plt
        from wordcloud import WordCloud
        corpora_documents = self.jieba_tokenize()  # tokenize
        for k in range(len(corpora_documents)):
            corpora_documents[k] = ' '.join(corpora_documents[k])
        corpora_documents = ' '.join(corpora_documents)
        # scipy.misc.imread was removed in SciPy 1.2; matplotlib's imread reads the mask just as well
        color_mask = plt.imread("C:\\Users\\lenovo\\Desktop\\Text_Mining\\3.jpg")
        cloud = WordCloud(font_path="C:\\Windows\\Fonts\\simhei.ttf", mask=color_mask,
                          background_color='white', max_words=2000, max_font_size=40)
        word_cloud = cloud.generate(corpora_documents)
        plt.imshow(word_cloud, interpolation='bilinear')
        plt.axis("off")
        plt.show()
    def Print_CalTopic(self, idx):
        # print the topics of document `idx`, strongest first
        for index, score in sorted(self._lda[self._corpus_tfidf[idx]], key=lambda tup: -1 * tup[1]):
            print("Score: {}\t Topic: {}".format(score, self._lda.print_topic(index, 5)))
        self._lda.print_topics(30)
    def Print_CalSim(self):
        sims = self._similarity[self._test_corpus]  # list of (doc id, similarity) pairs
        string = 'The Most Similar material is '
        for k, (Id, Sim) in enumerate(sims):
            string += str(Id) + '(' + str(Sim) + ')'
            if k != len(sims) - 1:
                string += ','
            # print('<' + str(k + 1) + '>' + '\n' + self._raw_documents[Id])
            # print('-' * 50)
        print(string)
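The class above lives in gensim_example_2.py. A quick, hypothetical way to exercise it (made-up headlines; the dictionary file names follow __init__):

exp = GensimExp('Chinese_Stop_Words.txt', 'finance_dict.txt')
docs = ['央行宣布降准释放流动性', '区块链技术应用加速落地', '央行货币政策保持稳健']
exp.genDictionary(docs)   # tokenize, build the dictionary and the bow corpus
ids, texts, sims = exp.CalSim('央行降准利好银行股', 'Similarity-tfidf-index', 2)
print(ids, sims)          # indices and scores of the two most similar stored items

The second script wires GensimExp into the MongoDB news collections: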
import time
from pymongo import MongoClient
from sys import path
path.append('C:\\Users\\lenovo\\Desktop\\Text_Mining')  # directory holding gensim_example_2.py
import gensim_example_2 as ge2
import pandas as pd
import numpy as np
from bson.objectid import ObjectId
import csv
class TextMining(object):
    def __init__(self, **kwarg):
        self.IP = kwarg['IP']
        self.PORT = kwarg['PORT']
        self.ConnDB()
        self.ge2 = ge2.GensimExp('C:\\Users\\lenovo\\Desktop\\Text_Mining\\Chinese_Stop_Words.txt',
                                 'C:\\Users\\lenovo\\Desktop\\Text_Mining\\finance_dict.txt')

    def ConnDB(self):
        self._Conn = MongoClient(self.IP, self.PORT)

    def extractData(self, dbName, colName, tag_list):
        db = self._Conn[dbName]
        collection = db.get_collection(colName)
        # one column of distinct values per tag; the original exec-based variable
        # juggling is unreliable inside Python 3 functions, so build the dict directly
        Dict = {}
        for tag in tag_list:
            Dict[tag] = np.array(collection.distinct(tag))
        dataFrame = pd.DataFrame(Dict, columns=tag_list)
        return dataFrame
    def extractStockCodeFromArticle(self, dbName, colName):
        db = self._Conn[dbName]
        collection = db.get_collection(colName)
        idLst = self.extractData(dbName, colName, ['_id'])._id
        data = self.extractData("Stock", "Basic_Info", ['name', 'code'])
        articles = []
        for _id in idLst:
            article = collection.find_one({'_id': ObjectId(_id)})['Article']
            articles.append(article)
        token, _, _ = self.ge2.genDictionary(articles)
        j = 0
        for tk in token:
            relevantStockName = []
            relevantStockCode = []
            for k in range(len(tk)):
                # only tokens of three or more characters can match a stock name
                if len(tk[k]) >= 3 and tk[k] in list(data.name):
                    relevantStockName.append(tk[k])  # e.g. ['农业银行', '建设银行', '中信银行']
                    relevantStockCode.append(list(data[(data.name == tk[k])].code)[0])  # e.g. ['601288', '601939', '601998']
            if len(relevantStockCode) != 0:
                # update() is deprecated in pymongo 3; update_one() does the same job here
                collection.update_one({"_id": idLst[j]}, {"$set": {"relevantStock": ' '.join(relevantStockCode)}})
                # print(' [*] finished ' + str(j+1) + ' ... ')
            j += 1
    def judgeGoodOrBadNews(self, stockCode, date, judgeTerm):
        db = self._Conn['Stock']
        collection = db.get_collection(stockCode)
        dateLst = self.extractData("Stock", stockCode, ['date']).date
        days = 0
        CloseLst = []
        for dt in dateLst:
            if dt >= date:
                CloseLst.append(float(collection.find_one({'date': dt})['close']))
                if days >= judgeTerm:
                    break
                days += 1  # only count trading days on or after the news date
        if not CloseLst:
            return '中立'       # no price data after the news date; treat as neutral
        if CloseLst[-1] > CloseLst[0]:
            character = '利好'  # bullish: the price rose over the judging window
        elif CloseLst[-1] < CloseLst[0]:
            character = '利空'  # bearish: the price fell
        else:
            character = '中立'  # neutral: the price was flat
        return character
    def getNewsOfSpecificStock(self, dbColLst, stockCode, **kwarg):
        if kwarg['export'][0] == 'csv':
            with open(kwarg['export'][1] + '\\' + stockCode + '.csv', 'a+', newline='', encoding='utf-8') as file:
                fieldnames = ['date', 'address', 'title', 'article']
                writer = csv.DictWriter(file, fieldnames=fieldnames)
                writer.writeheader()
                for dbName, colName in dbColLst:
                    db = self._Conn[dbName]
                    collection = db.get_collection(colName)
                    idLst = self.extractData(dbName, colName, ['_id'])._id
                    if dbName == 'Sina_Stock':
                        for _id in idLst:
                            # fetch the document once instead of one find_one() per field
                            doc = collection.find_one({'_id': ObjectId(_id)})
                            if 'RelevantStock' in doc and doc['RelevantStock'].find(stockCode) != -1:
                                print(' ' + doc['Title'])
                                writer.writerow({'date': doc['Date'], 'address': doc['Address'],
                                                 'title': doc['Title'], 'article': doc['Article']})
                    elif dbName == 'NBD':
                        for _id in idLst:
                            # NBD documents use lower-case keys except 'Article'
                            doc = collection.find_one({'_id': ObjectId(_id)})
                            if 'relevantStock' in doc and doc['relevantStock'].find(stockCode) != -1:
                                print(' ' + doc['title'])
                                writer.writerow({'date': doc['date'], 'address': doc['address'],
                                                 'title': doc['title'], 'article': doc['Article']})
                    print(' [*] extracting ' + stockCode + ' news from ' + dbName + ' database to CSV file successfully ... ')
        elif kwarg['export'][0] == 'database':  # export into a new database
            for dbName, colName in dbColLst:
                db = self._Conn[dbName]
                collection = db.get_collection(colName)
                idLst = self.extractData(dbName, colName, ['_id'])._id
                newdb = self._Conn[kwarg['export'][1]]
                newcollection = newdb.get_collection(kwarg['export'][2])
                if dbName == 'Sina_Stock':
                    for _id in idLst:
                        doc = collection.find_one({'_id': ObjectId(_id)})
                        if 'RelevantStock' in doc and doc['RelevantStock'].find(stockCode) != -1:
                            character = self.judgeGoodOrBadNews(stockCode,
                                doc['Date'].split(' ')[0].replace('-', ''), kwarg['judgeTerm'])
                            print(' ' + doc['Title'] + '(' + character + ')')
                            data = {'Date': doc['Date'], 'Address': doc['Address'],
                                    'Title': doc['Title'], 'Article': doc['Article'],
                                    'Character': character}
                            newcollection.insert_one(data)
                elif dbName == 'NBD':
                    for _id in idLst:
                        doc = collection.find_one({'_id': ObjectId(_id)})
                        if 'relevantStock' in doc and doc['relevantStock'].find(stockCode) != -1:
                            character = self.judgeGoodOrBadNews(stockCode,
                                doc['date'].split(' ')[0].replace('-', ''), kwarg['judgeTerm'])
                            print(' ' + doc['title'] + '(' + character + ')')
                            data = {'Date': doc['date'], 'Address': doc['address'],
                                    'Title': doc['title'], 'Article': doc['Article'],
                                    'Character': character}
                            newcollection.insert_one(data)
if __name__ == '__main__':
    t1 = time.time()
    Obj = TextMining(IP="localhost", PORT=27017)
    Obj.extractStockCodeFromArticle("NBD", "nbd_news_company")  # extract the related stock codes from NBD news
    Obj.getNewsOfSpecificStock([("NBD", "nbd_news_company"), ("Sina_Stock", "sina_news_company")],
        "000402", export=['csv', 'C:\\Users\\lenovo\\Desktop\\Text_Mining'])  # export one stock's news to CSV
    Obj.getNewsOfSpecificStock([("NBD", "nbd_news_company"), ("Sina_Stock", "sina_news_company")],
        "600740", export=['database', 'Stock_News', '600740'], judgeTerm=5)  # save news to a new database with bullish/bearish labels
    t2 = time.time()
    print(' running time:', t2 - t1)
Related links:
http://blog.csdn.net/tianbwin2995/article/details/51768574
http://blog.csdn.net/u012052268/article/details/77825981
http://www.mamicode.com/info-detail-1425363.html