脚本二:Youtube频道监测 (Continued...)
3)多频道轮询
一般在自动化搬运之前,先确定一个主题,然后关注多个该主题下的优秀博主的账号(频道),这样就保证了搬运账号的高频率的持续发布,毕竟脚本发帖不累人,当然每天的搬运视频数是多多益善了。
Log文件创建
上一篇文章里,我们知道了如何获取某个Youtube账号的最新一期视频的方法,这一篇,我们先把Pytube放一放,用一个简单的log文件,来管理监测中的每个博主账号的最新发布。这个log文件的作用有:
1)与当前各个频道的最新视频进行比对,发现新作品发布,触发搬运脚本。
2)保存所有已经搬运成功的视频链接及其参数信息,供一些辅助功能使用。
由于这个文件平时既给Python脚本自动读取,也需要偶尔进行人工阅读,因此它既要结构简单可读性强,又要软件读取效率高并最好有现成的开源代码拿来用,那么JSON就是目前最好的选择。由于这个系列教程也面向零基础,所以JSON的格式这里也快速带过一下,熟悉的读者可以直接略过:
以下是JSON格式介绍
JSON格式的几个特点:
- JSON 是纯文本
- JSON 具有"自我描述性"(人类可读)
- JSON 具有层级结构(值中存在值)
- JSON 可通过 Python 进行解析
JSON 语法其实还是很直观很简单的:
- 数据在名称/值对中 --- 格式为 key: value,如 "channel": "NBAHighlights",value可以是数字,字符串,布尔值,数组,对象和空。value可以嵌套。
- 数据由逗号 , 分隔
- 使用斜杆 \ 来转义字符
- 大括号 {} 保存对象 --- 大括号 {} 保存的对象是一个无序的名称/值对集合。一个对象以左括号 { 开始, 右括号 } 结束。每个"键"后跟一个冒号 :,名称/值对使用逗号 , 分隔。如 {"channel": "NBAHighlights", "video":{"id": "123", "name": "playoff game"}}
- 中括号 [] 保存数组,数组可以包含多个对象 --- 中括号 [] 保存的数组是值(value)的有序集合。一个数组以左中括号 [ 开始, 右中括号 ] 结束,值之间使用逗号 , 分隔。如 {"videos": [{"id": "123", "name": "playoff game"}, {"id": "456", "name": "season game"},{"id": "789", "name": "final game"}]}
了解了JSON语法,接下来就可以设计一下自己的log文件。根据上面提到的用途,我们设计了以下元素:
- 每个频道的URL(pytube目前支持的频道格式)及其名称,类别
- 每个频道下面的所有影片(至少包括最新的若干期影片)
- 每个影片的ID,名称,URL,发布日期(Youtube不提供发布时间),已成功发布平台
其中,“已成功发布平台”这个参数后面会用到,主要是当脚本同时在进行多个短视频平台搬运的时候,需要用它来记录某个视频已经成功发布的平台,来追踪搬运情况。
确定了这些元素和他们之间的关联,以下就是JSON结构:
以下是该Log文件的一个范例,假设监测两个Channel,注意其中的频道和影片信息都是假名,并不是真的油管频道和影片:
uploads.json
{
"uploads": [
{
"channelurl": "https://www.youtube.com/channel/samplechannelid",
"channelname": "sample_channel",
"videos": [
{
"id": "samplevideoid",
"url": "https://www.youtube.com/watch?v=samplevideoid",
"name": "sample video",
"publishdate": "2022-11-24 00:00:00",
"published": [
"DY",
"KS",
"XHS"
]
},
{
"id": "samplevideoid2",
"url": "https://www.youtube.com/watch?v=samplevideoid2",
"name": "sample video 2",
"publishdate": "2022-11-24 00:00:00",
"published": [
"DY",
"KS",
"XHS"
]
}
],
"category": "1"
},
{
"channelurl": "https://www.youtube.com/channel/samplechannelid2",
"channelname": "sample_channel_2",
"videos": [
{
"id": "samplevideoid11",
"url": "https://www.youtube.com/watch?v=samplevideoid11",
"name": "sample video 11",
"publishdate": "2022-11-24 00:00:00",
"published": [
"DY",
"KS",
"XHS"
]
}
],
"category": "1"
}]
}
其中published列表中的DY,KS,XHS分别代指抖音,快手,小红书,这里也只是举例,实际操作中可以加入更多的平台简称。有了这个文件,接下来只要在Python代码中增加轮询文件的逻辑即可。
代码编写
首先,利用在《躺着就能涨粉?Python自动化短视频搬运(三)|频道监测》中我们已经可以得到一个频道的最新一期影片的URL,快速上代码:
from pytube import Channel
channel = Channel("https://www.youtube.com/channel/samplechannelid")
new_video_url = channel.video_urls[0]
注意上面的samplechannelid并不存在,请自行替换真实频道ID,获取方法也请参考《躺着就能涨粉?Python自动化短视频搬运(三)|频道监测》的内容。new_video_url的格式通常是“https://www.youtube.com/watch?v=samplevideoid”,samplevideoid在youtube URL里的定义是11位的字符串,因此我们可以直接截取url的最后11位获得video ID:
new_video_id = new_video_url[-11:]
这样我们就可以拿着这个ID去和Log文件中对应Channel的第一个video ID进行比对,发现新影片上架。
在python里读取JSON文件非常简单,首先需要在脚本里打开我们的log文件uploads.json,把该文件内容读取进文件流uploads:
with open("uploads.json", "r") as json_file:
uploads = json_file.read()
json_file.close()
接着需要导入JSON模块,把uploads文件流转换成字典对象,使我们可以在python里解析和构建JSON:
import json
upload_log = json.loads(uploads)
由于是字典类型的对象,我们就可以用dict的相关方法,按照JSON参数的层级进行解析,如我们可以用一个for循环,读出python.json里的所有频道参数:
for everychannel in upload_log['uploads']:
channel_url = everychannel['channelurl']
channel_name = everychannel['channelname']
category = everychannel['category']
latest_video_id = everychannel['videos'][0]['id']
print("channel url: %s, name: %s, category: %s, latest video: %s" % (channel_url, channel_name, category, latest_video_id))
返回结果:
channel url: https://www.youtube.com/channel/samplechannelid, name: sample_channel, category: 1, latest video: samplevideoid
channel url: https://www.youtube.com/channel/samplechannelid2, name: sample_channel_2, category: 1, latest video: samplevideoid11
结合这篇文章描述的各个过程,我们把轮询Log文件得到的每个Channel的上一次记录video ID即latest_video_id和之前通过Pytube Channel接口获取到的真实video ID即new_video_id进行对比,如果不同,就代表该Channel中有新视频产生:
for everychannel in upload_log['uploads']:
channel_url = everychannel['channelurl']
channel_name = everychannel['channelname']
category = everychannel['category']
latest_video_id = everychannel['videos'][0]['id']
print("channel url: %s, name: %s, category: %s, latest video: %s" % (channel_url, channel_name, category, latest_video_id))
channel = Channel(channel_url)
new_video_url = channel.video_urls[0]
new_video_id = new_video_url[-11:]
if new_video_id != latest_video_id:
print("New video found...id: " + new_video_id)
新影片发现后,我们并不急着马上进入发布流程,因为每个视频要做的处理比较多,发布平台也不少,过程中难免会有些意外情况发生造成流程中断,为了让脚本能够在各种中断恢复后,继续之前未完成的操作,现在首先要做的是把这个新视频的信息写入到Log文件里。写入文件和之前的读取过程类似。
首先先构造一个新的字典条目,把新发现的Video信息填入:
video_log = dict(id=new_video_id, url=new_video_url, name=channel.videos[0].title, publishdate=str(channel.videos[0].publish_date), published=[])
插入到原upload_log中该频道video对象里的第一位:
everychannel['videos'].insert(0, video_log)
把upload_log字典转换成字符流,写回到uploads.json文件中:
with open("uploads.json", "w") as json_file:
uploads = json.dumps(upload_log)
json_file.write(uploads)
json_file.close()
执行完上面的代码后,我们就可以在uploads.json文件里看到新添的一串video信息(假设<new_video_id>是在'samplechannelid'这个Channel找到的):
插入新影片后的uploads.json
{
"uploads": [
{
"channelurl": "https://www.youtube.com/channel/samplechannelid",
"channelname": "sample_channel",
"videos": [
{
"id": <new_video_id>,
"url": "https://www.youtube.com/watch?v=<new_video_id>",
"name": <channel.videos[0].title>,
"publishdate": <channel.videos[0].publish_date>,
"published": []
},
......
创建好Log记录了以后,我们就可以进入视频发布流程了。可以看到目前published参数里是一个空列表,说明还没有发布到任何一个平台,可以先定义一个当前可支持的平台列表,也方便未来扩展它:
supported_target_platform = ['KS', 'XHS', 'DY']
首先获取当前Log记录里的最新Video的'published'字段,并和'supported_target_platform'进行比较,如果'supported_target_platform'里某个平台还未出现在published的列表里,则进行该平台的上传操作:
for everyplatform in supported_target_platform:
if everyplatform not in everychannel['videos'][0]['published']:
video_id = everychannel['videos'][0]['id']
print("Now upload " + video_id + " to " + everyplatform)
#进行视频上传的伪代码
video_publish(...)
关于video_publish()的实现,涉及到视频的下载(已介绍),编辑和发布,将是我们以后文章的介绍重点,这里先用伪代码代指进行视频发布。
执行完video_publish()之后,需要对uploads.json文件进行更新,添加published成员:
everychannel['videos'][0]['published'].append(everyplatform)
with open("uploads.json", "w") as json_file:
uploads = json.dumps(uploadlog)
json_file.write(uploads)
json_file.close()
然后重复“for everyplatform in supported_target_platform:”循环,检查下一个published平台。支持的平台都上传成功后,继续'for everychannel in upload_log['uploads']:'大循环,检查下一个Channel频道。调用太频繁可能会导致pytube异常,所以每次循环我们调用time.sleep(120)等待两分钟,虽然这样会使脚本发现新影片上架的时间略微变长,但这几分钟的延迟对搬运来说,可以忽略不计。
至此,我们做完了频道监测的功能,对于需要进行监测的频道,目前脚本有能力进行实时监控,第一时间发现新影片上架,进入发布流程,并且可以依次对多个自媒体平台进行上传。
后面的教程,我们将对具体上传发布过程进行解析。
进入下一篇:《躺着就能涨粉?Python自动化短视频搬运(五)|视频处理》
返回上一篇:《躺着就能涨粉?Python自动化短视频搬运(三)|频道监测》
本篇用到的代码:
from pytube import Channel
import json
import time
supported_target_platform = ['KS', 'XHS', 'DY']
with open("uploads.json", "r") as json_file:
uploads = json_file.read()
json_file.close()
upload_log = json.loads(uploads)
while True:
for everychannel in upload_log['uploads']:
channel_url = everychannel['channelurl']
channel_name = everychannel['channelname']
category = everychannel['category']
latest_video_id = everychannel['videos'][0]['id']
print("channel url: %s, name: %s, category: %s, latest video: %s" % (channel_url, channel_name, category, latest_video_id))
channel = Channel(channel_url)
new_video_url = channel.video_urls[0]
new_video_id = new_video_url[-11:]
if new_video_id != latest_video_id:
print("New video found...id: " + new_video_id)
video_log = dict(id=new_video_id, url=new_video_url, name=channel.videos[0].title, publishdate=str(channel.videos[0].publish_date), published=[])
everychannel['videos'].insert(0, video_log)
with open("uploads.json", "w") as json_file:
uploads = json.dumps(upload_log)
json_file.write(uploads)
json_file.close()
for everyplatform in supported_target_platform:
if everyplatform not in everychannel['videos'][0]['published']:
video_id = everychannel['videos'][0]['id']
print("Now upload " + video_id + " to " + everyplatform)
#进行视频上传的伪代码
video_publish(...)
everychannel['videos'][0]['published'].append(everyplatform)
with open("uploads.json", "w") as json_file:
uploads = json.dumps(uploadlog)
json_file.write(uploads)
json_file.close()
print("Channel loop...")
time.sleep(120)
channel_scan_loop.py