1. Introduction
I had long wanted to crawl the historical articles of WeChat Official Accounts, but Tencent's anti-crawling measures stood in the way: on the PC side only the ten most recent history messages are visible, and short of capturing the app's traffic through a man-in-the-middle proxy there was no real way to get at the data. This week, on Cui's blog, I came across an article by 哎哟卧槽 on fetching all of an Official Account's articles through a new interface. It turns out that on June 6 the WeChat team announced that every Official Account can now insert links to any article already published on the platform when composing a new post. In other words, from the "new post" editor we can look up every article a given Official Account has ever published, which gives us the links to all of them.
2. Workflow
- You need a Subscription Account, Official Account, or Enterprise Account. Just register one yourself on the WeChat Official Accounts Platform; I registered a Subscription Account (the other two should work the same way).
- You need to log in. We use selenium to drive a browser and collect the login cookies, which gives us a logged-in session.
from selenium import webdriver
import time
import json
post = {}
driver = webdriver.Chrome()
driver.get('https://mp.weixin.qq.com/')
time.sleep(2)
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[1]/div[1]/div/span/input').clear()
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[1]/div[1]/div/span/input').send_keys('your account')
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[1]/div[2]/div/span/input').clear()
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[1]/div[2]/div/span/input').send_keys('your password')
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[3]/label/i').click()
driver.find_element_by_xpath('//*[@id="header"]/div[2]/div/div/form/div[4]/a').click()
time.sleep(20)
# scan the QR code on your phone during the 20-second pause, and be quick!
driver.get('https://mp.weixin.qq.com/')
cookie_items = driver.get_cookies()
for cookie_item in cookie_items:
    post[cookie_item['name']] = cookie_item['value']
cookie_str = json.dumps(post)
with open('cookie.txt', 'w+', encoding='utf-8') as f:
    f.write(cookie_str)
print(cookie_str)
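The fixed time.sleep(20) above is fragile: if scanning the QR code takes longer, the script reads cookies from a page that is not logged in yet. A minimal sketch of a more forgiving wait, under the same assumption the crawler below relies on, namely that the admin console URL carries a token= parameter once login has completed:

def wait_for_login(driver, timeout=120):
    # poll the browser until the URL contains a token, i.e. until the QR scan has been confirmed
    deadline = time.time() + timeout
    while time.time() < deadline:
        if 'token=' in driver.current_url:
            return True
        time.sleep(2)        # re-check every two seconds while you scan the QR code
    return False

Calling wait_for_login(driver) in place of the fixed sleep, and only then reading driver.get_cookies(), makes the login step independent of how fast you scan.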
- Get the URLs of all the historical articles
1. Log in with requests carrying the saved cookies and read the token from the redirected URL
So what is this token? It is tied to the Official Account you log in with, and every later request for the detail pages needs it. If you are not crawling much you could simply copy it in by hand; here we follow 哎哟卧槽's approach and fetch it programmatically.
2. Use the token together with the target account's WeChat ID to look up that account's fakeid
And what is the fakeid? It is another identifier of an Official Account. You need the fakeid of every account you want to crawl, because the detail-page requests require it too; of course, if you are only crawling a handful of accounts you could also enter it by hand.
3. Build the index-page URL from the token, the fakeid and so on, request the index pages, and collect the URLs of the historical articles
4. Iterate over all the pages
import requests
import json
import re
import random
gzlist = ['gh_b59aa6364380']   # WeChat IDs (微信号) of the target Official Accounts
header = {
    "HOST": "mp.weixin.qq.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"
}
with open('cookie.txt', 'r', encoding='utf-8') as f:
    cookie = f.read()
cookies = json.loads(cookie)
def get_token():
    # after logging in with the saved cookies, mp.weixin.qq.com redirects to a URL that carries the token
    url = 'https://mp.weixin.qq.com'
    response = requests.get(url=url, cookies=cookies)
    token = re.findall(r'token=(\d+)', str(response.url))[0]
    return token
def get_fakeid(token):
    # look up the target account by its WeChat ID to obtain its fakeid
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    for query in gzlist:
        query_id = {
            'action': 'search_biz',
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'query': query,
            'begin': '0',
            'count': '5',
        }
        search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
        lists = search_response.json().get('list')[0]   # first hit of the search
        fakeid = lists.get('fakeid')
        return fakeid
def query_id_data(token, fakeid, i):
    # query-string parameters for one page of the article list (5 articles per page, starting at offset i)
    data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': i,
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9'
    }
    return data
def write_to_file(content):
    with open('the_urls.text', 'a', encoding='utf-8') as f:
        f.write(json.dumps(content, ensure_ascii=False) + '\n')
def get_the_link(data):
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    # the paging parameters go in as query-string params
    appmsg_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=data)
    fakeid_list = appmsg_response.json().get('app_msg_list')
    for item in fakeid_list:
        content = item.get('link')
        print(content)
        write_to_file(content)
def main():
    token = get_token()
    fakeid = get_fakeid(token)
    for i in range(47):                      # 47 pages of 5 articles each for this particular account
        print('Crawling page %s' % (i + 1))
        data = query_id_data(token, fakeid, 5 * i)
        get_the_link(data)

if __name__ == '__main__':
    main()
5. The resulting URL file (the_urls.text), one JSON-quoted article link per line
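A note on the hard-coded 47 pages in main(): that number only fits this particular account. The list_ex response appears to also report the total number of articles (a field commonly seen as app_msg_cnt); a sketch of deriving the page count from it instead, where the field name is an assumption to verify against a real response:

import math

def get_page_count(token, fakeid):
    # fetch the first page and compute how many pages of 5 articles exist
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    first_page = requests.get(appmsg_url, cookies=cookies, headers=header,
                              params=query_id_data(token, fakeid, 0)).json()
    total = first_page.get('app_msg_cnt', 0)   # assumed field name for the total article count
    return math.ceil(total / 5)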
- Visit each URL obtained above and scrape the "confession wall" (表白墙) posts from the article pages
import requests
import re
from requests.exceptions import RequestException
import json
def get_url():
    # read back the crawled links; each line of the_urls.text is a JSON-quoted URL
    with open('the_urls.text', 'r', encoding='utf-8') as f:
        f = f.read()
    f = f.replace("\n", "")      # strip the newlines
    list = f.split('"')          # split on the quotes that json.dumps() put around each link
    while '' in list:            # drop the empty elements left over from the split
        list.remove('')
    return list
def get_html(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        print('Error while requesting the article page')
        return None
def get_txt(html):
    # pull the Chinese text out of the <section> tags that make up the article body
    pattern = re.compile('<section.*?([\u4e00-\u9fa5].*?)</section>', re.S)
    txt = re.findall(pattern, html)
    pattern = re.compile('[\u4e00-\u9fa5]+', re.S)   # keep Chinese characters only, dropping punctuation, letters and symbols
    a = re.findall(pattern, str(txt))
    print(a)
    write_to_file(a)
def write_to_file(content):
    with open('lixin.text', 'a', encoding='utf-8') as f:
        f.write(json.dumps(content, ensure_ascii=False) + '\n')
def main():
    list = get_url()
    for url in list:
        html = get_html(url)
        if html:                 # skip pages that failed to download
            get_txt(html)

if __name__ == '__main__':
    main()
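A side note on get_url(): splitting the whole file on double quotes only works because write_to_file stored each link as a JSON string, and it breaks as soon as a link ever contains an escaped quote. A sketch of a slightly sturdier reader that simply undoes the json.dumps() line by line:

def get_url():
    urls = []
    with open('the_urls.text', 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:                            # skip blank lines
                urls.append(json.loads(line))   # each line was written with json.dumps()
    return urls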
- Clean the text data
import re
import json
f=open("lixin.text",'r',encoding='utf-8').read()
def write_to_file(content):
    with open('clean.text', 'a', encoding='utf-8') as f:
        f.write(json.dumps(content, ensure_ascii=False) + '\n')
def clean(f):
    pattern = re.compile('[\u4e00-\u9fa5]+', re.S)   # keep Chinese characters only, dropping punctuation, letters and symbols
    a = re.findall(pattern, f)
    # boilerplate phrases (sponsor line, "follow us" banners, font names ...) to strip from the corpus
    b = ['本推送由中博诚通赞助', '长按关注立信微生活', '立信微生活', '公众号', '蓝字', '点击上方', '微软雅黑', '可订阅哦', '宋体', '黑体']
    c = a[:]                     # work on a copy so items can be removed while iterating over a
    for bi in b:
        for i in a:
            if i == bi:
                c.remove(bi)
    a = c
    f = str(a)                   # a is a list; flatten it into a single string
    a = f.replace('[', '')
    a = a.replace("'", '')
    a = a.replace(',', '')
    a = a.replace(']', '')
    a = a.replace(' ', '')
    print(a)
    return a
def main():
    a = clean(f)
    write_to_file(a)

if __name__ == '__main__':
    main()
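The nested removal loop in clean() rescans the whole fragment list once per boilerplate phrase. The same filtering can be done in a single pass with a set lookup; a sketch:

def drop_boilerplate(fragments, phrases):
    # keep only the fragments that are not boilerplate
    blacklist = set(phrases)
    return [frag for frag in fragments if frag not in blacklist]

In clean() this would replace the two nested loops with a = drop_boilerplate(a, b).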
- Data visualization
import jieba
from jieba.analyse import extract_tags
from wordcloud import WordCloud,ImageColorGenerator
from os import path
import matplotlib.pyplot as plt
import numpy as np
from snownlp import SnowNLP
f = open("clean.text", 'r', encoding='utf-8').read()
def GeneratePicture(max_words):
    # pick the top keywords by tf-idf, then count how often each occurs in the corpus
    tags = extract_tags(f, topK=max_words)
    word_freq_dict = dict()
    word_list = jieba.lcut(f)
    for tag in tags:
        word_freq_dict[tag] = word_list.count(tag)
    print(word_freq_dict)
    a = []                                   # x positions of the bars
    b = []                                   # y-axis tick positions
    for i in range(1, 51):
        a.append(i * 2.5)
    for i in range(1, 21):
        b.append(i * 250)
    d1 = list(word_freq_dict.keys())
    d2 = list(word_freq_dict.values())
    fig = plt.figure(figsize=(16, 8), dpi=100)
    ax = fig.add_subplot(1, 1, 1)
    plt.bar(a, d2, 0.4, color="green")
    ax.set_xticks(a)
    ax.set_xticklabels(d1, rotation=45, fontsize='small')
    ax.set_yticks(b)
    plt.savefig('top50.png')                 # save before show(), otherwise the saved file is blank
    plt.show()
def getb():
    g = " ".join(jieba.cut(f))                            # wordcloud wants a space-separated string
    back_coloring = plt.imread(path.join("爱心.jpg"))      # background image that shapes the cloud
    word_cloud = WordCloud(font_path='simsun.ttc',        # a font that can render Chinese
                           mask=back_coloring,            # use the image as the mask
                           background_color="white",      # background colour
                           max_words=900,                 # maximum number of words shown
                           max_font_size=70,              # largest font size
                           random_state=42)
    my_wordcloud = word_cloud.generate(g)                 # build the word cloud
    image_colors = ImageColorGenerator(back_coloring)     # colour palette taken from the background image
    plt.imshow(my_wordcloud)
    plt.axis("off")
    plt.show()
    word_cloud.to_file(path.join("word.png"))             # save the image
def sentiment():
    # sentiment analysis: score every sentence with SnowNLP (0 = negative, 1 = positive)
    f = open("lixin.text", 'r', encoding='utf-8').read()
    f = f.replace(",", '。')                 # turn commas into full stops so SnowNLP splits more sentences
    s = SnowNLP(f)
    a = []                                   # sentiment score of each sentence
    c = []                                   # random vertical jitter so the points do not overlap
    for sentence in s.sentences:
        s1 = SnowNLP(sentence)
        z = s1.sentiments
        print(z)
        a.append(z)
        ci = np.random.rand(1)[0]
        c.append(20 * ci)
    fig = plt.figure(figsize=(16, 8), dpi=100)
    ax = fig.add_subplot(1, 1, 1)
    plt.scatter(a, c, 0.4, color="green")
    ax.set_xlabel('probability of positive sentiment')
    plt.savefig('sentiment.png')             # save before show(), otherwise the saved file is blank
    plt.show()
def main():
    GeneratePicture(50)      # bar chart of the 50 most frequent keywords
    getb()                   # word cloud
    sentiment()              # sentiment scatter plot

if __name__ == '__main__':
    main()
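The random jitter on the y-axis of the sentiment scatter plot is only there to spread the points out. If what you want is the distribution of scores, a histogram says the same thing more directly; a sketch that reuses the score list a built inside sentiment():

def plot_sentiment_hist(scores):
    # histogram of SnowNLP sentiment scores (0 = negative, 1 = positive)
    plt.figure(figsize=(8, 5), dpi=100)
    plt.hist(scores, bins=20, range=(0, 1), color="green")
    plt.xlabel('probability of positive sentiment')
    plt.ylabel('number of sentences')
    plt.savefig('sentiment_hist.png')
    plt.show()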
Additional notes
- The text processing could strip more stopwords (a sketch follows below)
- The code is fairly messy and has not been tidied up much, so please bear with it
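A minimal sketch of the extra stopword filtering mentioned above, assuming a plain-text stopword list with one word per line; the file name stopwords.txt is a placeholder, use whichever list you prefer:

def load_stopwords(path='stopwords.txt'):
    # one stopword per line; the file itself is an assumption
    with open(path, 'r', encoding='utf-8') as fh:
        return {line.strip() for line in fh if line.strip()}

def tokenize_without_stopwords(text, stopwords):
    # cut with jieba, then drop stopwords and single characters
    return [w for w in jieba.lcut(text) if w not in stopwords and len(w) > 1]

Feeding the result of tokenize_without_stopwords() into the word-frequency count and the word cloud would remove much of the remaining noise.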