基于 Fabric 部署分布式爬虫的思考

Python:基于 Fabric 部署分布式爬虫的思考

Fabric 本身是一款用于自动化管理,发布任务和布署应用的工具,在自动化运维中比较常见

当然其他的 连接工具同样优秀,比如 paramiko ,只是 fabric 封装的更好,文档更全,使用也更简单

中文文档 https://fabric-chs.readthedocs.io/zh_CN/chs/

Fabric 是一个用 Python 编写的命令行工具库,它可以帮助系统管理员高效地执行某些任务
一个让通过 SSH 执行 Shell 命令更加 容易 、 更符合 Python 风格 的命令库

image

安装

sudo pip3 install fabric3

如何运行

最终这样都可以运行

fab -f REQUESTS_HTML.py host_type # host_type 是任务函数

or

python3 REQUESTS_HTML.py  # 运行整个文件

实战案例

我是一个不怎么爱说话的人(其实是文笔一般),所以直接贴代码恐怕是最好的分享的方式了

第一个 Demo

from fabric.api import run,cd,env,hosts,execute
env.hosts=['root@2.2.2.2:22']
env.password='pwd2'

def host_type():
    with cd('../home'):
        run('ls')
        run('cd youboy_redis')
        run('cd Youboy && cd youboy && ls && python3 run.py')

print(execute(host_type)) # execute 执行任务

可以很清晰的看到函数执行的顺序,进入 爬虫 目录并运行一个爬虫脚本,就像在本地执行命令一样,这里支持 with 上下文

简单做一个类封装

from fabric.api import run,cd,env,hosts,execute
class H():
    def __init__(self,host,pwd):
        env.hosts=list(hosts)
        env.password=pwd

    def host_type(self):
        with cd('../home'):
            run('ls')
    def run(self):
        print(execute(self.host_type))

h = H('root@2.2.2.2:22','pwd2')
h.run()

嗯,没有问题

并行执行

我们在介绍执行远程命令时曾提到过多台机器的任务默认情况下是串行执行的。
Fabric 支持并行任务,当服务器的任务之间没有依赖时,并行可以有效的加快执行速度。
怎么开启并行执行呢?

在执行”fab”命令时加上”-P”参数

$ fab -P host_type

或者
设置 ”env.parallel” 环境参数为True

from fabric.api import env
env.parallel = True

如果,我们只想对某一任务做并行的话,我们可以在任务函数上加上”@parallel”装饰器:

from fabric.api import parallel
 
@parallel
def runs_in_parallel():
    pass
 
def runs_serially():
    pass

这样即便并行未开启,”runs_in_parallel()”任务也会并行执行。
反过来,我们可以在任务函数上加上”@serial”装饰器:

from fabric.api import serial
 
def runs_in_parallel():
    pass
 
@serial
def runs_serially():
    pass

这样即便并行已经开启,”runs_serially()”任务也会串行执行。

试着做一些事情

配置文件

使用 yaml 格式

[mysql]
host = 127.0.0.1
port = 3306
db = python
user = root
passwd = 123456
charset = utf8

[mongodb]
host = ip
port = 27017
db = QXB

[redis]
host = 127.0.0.1
port = 6379
db = 0

[server]
aliyun1_host = ["公网ip", "ssh密码", 22]
aliyun2_host = ["公网ip", "ssh密码", 22]

读取配置

config.py

from configparser import ConfigParser
import json
config = ConfigParser()
config.read('./conf.yml')  # ['conf.ini'] ['conf.cfg]


# 获取所有的section
# print(config.sections())  # ['mysql', redis]

conf_list = list()
for host in config.options('server'):
    str_host = config.get('server', host)
    json_host = json.loads(str_host)
    conf_list.append(json_host)

print(conf_list)

远程连接服务器以及一些常用操作

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 这样写比较痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}

 
        print(self.env["hosts"])

    # def Hide_all(self):
    #     with settings(hide('everything'), warn_only=True):  # 关闭显示
    #         result = run('ls')
    #         print(result)  # 命令执行的结果
    #         print(result.return_code)

    # def Show_all(self):
    #     with settings(show('everything'), warn_only=True):  # 显示所有
    #         result = run('docker')
    #         print(str(result.return_code))  # 返回码,0表示正确执行,1表示错误
    #         print(str(result.failed))

    # @task
    # def Prefix(self): # 前缀,它接受一个命令作为参数,表示在其内部执行的代码块,都要先执行prefix的命令参数。
    #     with cd('../home'):
    #         with prefix('echo 123'):
    #             run('echo caonima')


    # def Shell_env(self): # 设置shell脚本的环境变量 
    #     with shell_env(HTTP_PROXY='1.1.1.1'):
    #         run('echo $HTTP_PROXY')


    # def Path_env(self): # 配置远程服务器PATH环境变量,只对当前会话有效,不会影响远程服务器的其他操作,path的修改支持多种模式
    #     with path('/tmp', 'prepend'):
    #         run("echo $PATH")
    #     run("echo $PATH")


    # def Mongo(self): # 尝试连接mongodb数据库  不知道为什么制定端口就不行了
    #     # with remote_tunnel(27017):
    #     run('mongo')


    # def Mysql(self):  # 尝试连接mysql数据库
    #     with remote_tunnel(3306):
    #         run('mysql -u root -p password')

    '''
    指定host时,可以同时指定用户名和端口号: username@hostname:port
    通过命令行指定要多哪些hosts执行人物:fab mytask:hosts="host1;host2"
    通过hosts装饰器指定要对哪些hosts执行当前task
    通过env.reject_unkown_hosts控制未知host的行为,默认True,类似于SSH的StrictHostKeyChecking的选项设置为no,不进行公钥确认。
    '''

    # @hosts('root@ip:22')
    # @task
    # def Get_Ip(self):
    #     run('ifconfig') 
    #     # return run("ip a")

    # @hosts("root@ip:22")
    # @runs_once
    # def Get_One_Ip(self):
    #     run('ifconfig')

    '''
    role是对服务器进行分类的手段,通过role可以定义服务器的角色,
    以便对不同的服务器执行不同的操作,Role逻辑上将服务器进行了分类,
    分类以后,我们可以对某一类服务器指定一个role名即可。
    进行task任务时,对role进行控制。
    '''

    # @roles('web')  # 只对role为db的主机进行操作
    # @task
    # def Roles_Get_Ip():
    #     run('ifconfig')
        

    # def Confirm(self): # 有时候我们在某一步执行错误,会给用户提示,是否继续执行时,confirm就非常有用了,它包含在 fabric.contrib.console中
    #     result = confirm('Continue Anyway?')
    #     print(result)

    # def run_python(self):
    #     run("python3 trigger.py")

    @task
    @parallel
    def celery_call(): # 执行celery任务
        with cd('../home'):
            warn(yellow('----->Celery'))
            puts(green('----->puts'))
            run('cd ./celery_1 && celery -A Celery worker -l info')
            time.sleep(3)
            run('python3 run_tasks.py')
    

    # @task
    # def update_file(): # 上传文件到服务器
    #     with settings(warn_only=True):
    #         local("tar -czf test.tar.gz config.py")
    #         result = put("test.tar.gz", "/home/test.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put test.tar.gz failed")

    #     with settings(warn_only=True):
    #         local_file_md5 = local("md5sum test.tar.gz",capture=True).split(" ")[0]
    #         remote_file_md5 = run("md5sum /home/test.tar.gz").split(" ")[0]
    #     if local_file_md5 == remote_file_md5:
    #         print(green("local_file == remote_file"))
    #     else:
    #         print(red("local_file != remote"))
    #     run("mkdir /home/test")
    #     run("tar -zxf /home/test.tar.gz -C /home/scp")

    '''
    有一个地方很神奇,self和@task装饰器在类中不能共用,否则会报错
    '''

    # @task
    # def downloads_file(): # get文件到本地
    #     with settings(warn_only=True):
    #         result = get("/home/celery_1", "./")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("get test.tar.gz failed")
    #     local("mkdir ./test")
    #     local("tar zxf ./hh.tar.gz -C ./test")

    # @task
    # @parallel
    # def scp_docker_file():
    #     with settings(warn_only=True):
    #         local("tar -czf docker.tar.gz ../docker")
    #         result = put("docker.tar.gz", "/home/docker.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put dockerfile failed")
    #     run("mkdir /home/docker")
    #     run("tar -zxf /home/docker.tar.gz -C /home")


    def Run(self):
        execute(self.celery_call)
    

h = HA()
h.Run()

我尽可能添加一些代码注释,更多解释还请参考 fabric 文档啊

看看 docker

只要能够连接到服务器,那么在这些服务器上安装服务也就在情理之中了,比如 docker
来看具体的代码实现

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 这样写比较痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}


        print(self.env["hosts"])

    
    
    @task
    def get_docker_v(): # 查看docker版本
        with cd('../home'):
            run('docker version')

    @task
    def pull_images(images_name):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker pull {}".format(images_name))
                except:
                    abort("docker pull failed")

    @task
    def push_images(images_name,username_repository,tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker tag {image_name} {username_repository}:{tag}".format(images_name=images_name,username_repository=username_repository,tag=tag))
                    run("docker push {username_repository}:{tag}".format(username_repository=username_repository,tag=tag))
                except:
                    abort("docker push failed")

    @task
    def run_docker_images(images_name_tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker run -p 4000:80 {}".format(images_name_tag))
                except:
                    abort("docker run failed")


    @task
    @parallel
    def execute_docker_compose():
        with settings(warn_only=True):
            with cd("../home/flask_app"):
                run("docker-compose up")


    @task
    def create_docker_service(service_name,images_name,num=4):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service create --name {service_name} -p 4000:80 {images_name}".format(service_name=service_name,images_name=images_name))
                run("docker service scale {service_name}={num}".format(service_name=service_name,num=num))
    
    
    @task
    def stop_docker_service(service_name):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service rm {}".format(service_name))

    def Run(self):
        # execute(self.create_docker_service,"demo","3417947630/py:hello")
        execute(self.execute_docker_compose)

h = HA()
h.Run()

嘿嘿,挺好

总结

基于python第三方库 fabric 实现远程ssh分布式调度部署应用,是一种很不错的选择,那么如果用于部署 爬虫的应用呢?
如果你是使用 scrapy 框架编写的爬虫(或者是其他框架,各种脚本也是一样),那么可以直接运行文件上传的方法把完整目录拷贝到目标服务器(当然是批量的)
然后键入爬取的命令,记住 fabric 是支持并行的,就能达到多机协作抓取的目的了

其实在 scrapy 中也可能用 scrapyd 来打包部署分布式爬虫,但是打包过程略为繁琐,而 SSH 连接则比较直接,操作简单
然鹅说到底 fabric 也只是一种自动化运维的工具,本质上也只是把代码拷贝到目标服务器和执行相应的命令而已,并没有像 scrapyd 提供爬虫管理的可视化界面

所以这样看来, Fabric 至少算得上是部署分布式爬虫的一种选择,就是因为部署简单

以上就是我对这个 py 库的一些看法,它为我们日后部署应用和服务提供了更多的选择,多多实战吧 !!

欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
个人技术博客:http://www.gzky.live

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容