最近在项目中遇到一个需求,前端发来一个命令,这个命令是去执行传递过来的一个脚本(shell 或者python),并返回脚本的标准输出和标准出错,如果执行超过设定时间还没结束就超时,然后终止脚本的执行。实现这个功能,自然而然先想到的是subprocess这个库了。
因此,在后端的一个脚本中调用python的subprocess去执行传递过来的脚本,通常情况下subprocess都能运行的很好,完成脚本的执行并返回。最初的实现代码如下:
run_cmd.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os
class test(object):
def __init__(self):
self.stdout = []
self.stderr = []
self.timeout = 10
self.is_timeout = False
pass
def timeout_callback(self, p):
print 'exe time out call back'
print p.pid
try:
p.kill()
except Exception as error:
print error
def run(self):
cmd = ['bash', '/home/XXXX/test.sh']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
my_timer = Timer(self.timeout, self.timeout_callback, [p])
my_timer.start()
try:
print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
stdout, stderr = p.communicate()
exit_code = p.returncode
print exit_code
print type(stdout), type(stderr)
print stdout
print stderr
finally:
my_timer.cancel()
但是偶然间测试一个shell脚本,这个shell脚本中有一行ping www.baidu.com &,shell脚本如下:
test.sh
#!/bin/bash
ping www.baidu.com (&) #加不加&都没区别
echo $$
python(父进程)用subprocess.Popen新建一个进程(子进程)去开启一个shell, shell新开一个子进程(孙进程)去执行ping www.baidu.com的命令。由于孙进程ping www.baidu.com一直在执行,就类似于一个daemon程序,一直在运行。在超时时间后,父进程杀掉了shell子进程,但是父进程阻塞在了p.communicate函数了,是阻塞在了调用wait()函数之前,感兴趣的朋友可以看一下源码_communicate函数,linux系统重点看_communicate_with_poll和_communicate_with_select函数,你会发现是阻塞在了while循环里面,因为父进程一直在获取输出,而孙进程一直像一个daemon程序一样,一直在往子进程的输出写东西,而子进程的文件句柄继承自父进程。虽然shell子进程被杀掉了,但是父进程里面的逻辑并没有因为子进程被意外的干掉而终止,(因为孙进程一直有输出到子进程的stdout,导致子进程的stdout一直有输出,也就是父进程的stdout也有输出),所以while循环一直成立,就导致了阻塞,进而导致wait()没有被调用,所以子进程没有被回收,就成了僵尸进程。
要完美的解决这个问题就是即要能获取到subprocess.Popen的进程的输出,在超时又要能杀掉子进程,让主进程不被阻塞。
一开始比较急,也对subprocess.Popen没有深入的去用过,尝试了一个low B的办法,就是不用subprocess.Popen.communicate()去获取输出,而是直接去读文件,然后超时后不去读文件。代码如下:
run_cmd.py第一个改版
#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os
class test(object):
def __init__(self):
self.stdout = []
self.stderr = []
self.timeout = 10
self.is_timeout = False
pass
def timeout_callback(self, p):
self.is_timeout = True
print "time out"
def run(self):
cmd = ['bash', '/home/zhangxin/work/baofabu/while_test.sh']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
my_timer = Timer(self.timeout, self.timeout_callback, [p])
my_timer.start()
try:
print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
for line in iter(p.stdout.readline, b''):
print line
if self.is_timeout:
break
for line in iter(p.stderr.readline, b''):
print line
if self.is_timeout:
break
finally:
my_timer.cancel()
p.stdout.close()
p.stderr.close()
p.kill()
p.wait()
这样虽然能获取输出,在超时后也不再阻塞,写完过后返回来再看时发现,其实在最开始的那一版代码中,只要在超时的回调函数中加上p.stdout.close()和p.stderr.clode(), p.communicate就不再阻塞了,其实问题也就解决了。 但是还会存在一个潜在的问题,父进程结束了,没有其他进程去读取PIPE,daemon孙进程一直往PIPE写,最后导致PIPE填满,孙进程也被阻塞。
所以这样处理其实没任何意义,因为孙进程没有被终止掉,只是简单的关闭了管道。 所以在假期,我仔细的在网上找了找,看了看subprocess,发现subprocess.Popen有一个参数preexec_fn,调用subprocess.Popen时传递preexec_fn=os.setsid或者preexec_fn=os.setpgrp,然后在超时的时候执行os.killpg(p.pid, signal.SIGKILL)就可以杀掉子进程以及在同一个会话的所有进程。所以将run函数的subprocess.Popen改为
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
同时将timeout_callback函数改成如下就可以了:
def timeout_callback(self, p):
self.is_timeout = True
print 'exe time out call back'
print p.pid
try:
os.killpg(p.pid, signal.SIGKILL)
except Exception as error:
print error
关于preexec_fn=os.setsid的作用,以下摘自https://blog.tonyseek.com/post/kill-the-descendants-of-subprocess/
运行程序 fork 以后,产生的子进程都享有独立的进程空间和 pid,也就是它超出了我们触碰的范围。好在 subprocess.Popen 有个 preexec_fn 参数,它接受一个回调函数,并在 fork 之后 exec 之前的间隙中执行它。我们可以利用这个特性对被运行的子进程做出一些修改,比如执行 setsid() 成立一个独立的进程组。
Linux 的进程组是一个进程的集合,任何进程用系统调用 setsid 可以创建一个新的进程组,并让自己成为首领进程。首领进程的子子孙孙只要没有再调用 setsid 成立自己的独立进程组,那么它都将成为这个进程组的成员。 之后进程组内只要还有一个存活的进程,那么这个进程组就还是存在的,即使首领进程已经死亡也不例外。 而这个存在的意义在于,我们只要知道了首领进程的 pid (同时也是进程组的 pgid), 那么可以给整个进程组发送 signal,组内的所有进程都会收到。
因此利用这个特性,就可以通过 preexec_fn 参数让 Popen 成立自己的进程组, 然后再向进程组发送 SIGTERM 或 SIGKILL,中止 subprocess.Popen 所启动进程的子子孙孙。当然,前提是这些子子孙孙中没有进程再调用 setsid 分裂自立门户。
至于setsid和setpgrp有什么区别,看了各自的man page,还不是很明白,如果有大兄弟知道,并且不吝留言分享告知,感激涕零!
subprocess.Popen只能运行命令或者脚本,而不能像threading的thread库一样运行函数,那么如何在一个只有.py文件的情况下像thread一样运行subprocess.Popen呢? 在调用subprocess.Popen的py我们可以把要执行的脚本内容写到一个临时文件,也即是类似于thread的target函数,然后用subprocess.Popen执行这个临时脚本,这样就可以不用预先存在多个脚本。。如下面的例子:
import os
import signal
import subprocess
import tempfile
import time
import sys
def show_setting_prgrp():
print('Calling os.setpgrp() from {}'.format(os.getpid()))
os.setpgrp()
print('Process group is now {}'.format(
os.getpid(), os.getpgrp()))
sys.stdout.flush()
# 这次的重点关注是这里
script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()
proc = subprocess.Popen(
['sh', script_file.name],
preexec_fn=show_setting_prgrp,
)
print('PARENT : Pausing before signaling {}...'.format(
proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT : Signaling process group {}'.format(
proc.pid))
sys.stdout.flush()
os.killpg(proc.pid, signal.SIGUSR1)
time.sleep(3)
当然也可以在shell脚本里面用exec来运行命令,那么就只有父进程和子进程,没有孙进程的概念了。
其实关于阻塞问题,也可以将subprocess.Popen的输出重定向到文件。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os
import time
import signal
class test(object):
def __init__(self):
self.stdout = []
self.stderr = []
self.timeout = 6
self.is_timeout = False
pass
def timeout_callback(self, p):
print 'exe time out call back'
try:
p.kill()
# os.killpg(p.pid, signal.SIGKILL)
except Exception as error:
print error
def run(self):
stdout = open('/tmp/subprocess_stdout', 'wb')
stderr = open('/tmp/subprocess_stderr', 'wb')
cmd = ['bash', '/home/xxx/while_test.sh']
p = subprocess.Popen(cmd, stdout=stdout.fileno(), stderr=stderr.fileno())
my_timer = Timer(self.timeout, self.timeout_callback, [p])
my_timer.start()
print p.pid
try:
print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
p.wait()
finally:
my_timer.cancel()
stdout.flush()
stderr.flush()
stdout.close()
stderr.close()
写在最后,关于p = subprocess.Popen,最好用p.communicate.而不是直接用p.wait(), 因为p.wait()有可能因为子进程往PIPE写的时候写满了,但是子进程还没有结束,导致子进程阻塞,而父进程一直在wait(),导致父进程阻塞。而且p.wait()和p.communicate不能一起用,因为p.communicate里面也会去调用wait()。
在linux平台下,p.wait()其实最后调用的是os.waitpid(), 我们自己用的时候,也尽量用waitpid,而不是wait(),因为多次调用waitpid去wait同一个进程不会导致阻塞,但是程序中多次调用wait就很有可能会被阻塞,详见wait函数的作用。
其实阻塞的根本原因还是因为PIPE满了,所以用PIPE的时候,最好和select或者poll模型一起使用,防止读、写阻塞。 PIPE管道是系统调用,os.pipe产生的一个文件,只不过他有两个fd,一个用于读,一个用于写,当读写端都被关闭后,内核会自动回收。你可以理解内核在内存中开辟了一个队列,一端读,一端写。
管道在进程间通信(IPC)使用很广泛,shell命令就使用的很广泛。比如:
ps –aux | grep mysqld
上述命令表示获取mysqld进程相关的信息。这里ps和grep两个命令通信就采用了管道。管道有几个特点:
管道是半双工的,数据只能单向流动,ps命令的输出是grep的输出
只能用于父子进程或兄弟进程通信,这里可以认为ps和grep命令都是shell(bash/pdksh/ash/dash)命令的子进程,两者是兄弟关系。
管道相对于管道两端的进程而言就是一个文件,并且只存在于内存中。
-
到这里大家可能会有一个疑问,管道两端的进程,写入进程不断的写,读取进程不断的读,那么什么时候结束呢?比如我们刚刚这个命令很快就结束了,它的原理是怎么样的呢?对于管道,这里有两个基本原则:写入端不断往管道写,并且每次写到管道末尾;读取端则不断从管道读,每次从头部读取。
1.当读一个写端已经关闭的管道时,在所有数据被读取后,read返回0,以指示达到文件结束处。
2.当写一个读端已经关闭的管道时,会产生sigpipe信息。
结合这个例子,当ps写管道结束后,就会自动关闭,此时grep进程read就会返回0,然后自动结束。
具体pipe可以参见http://man7.org/linux/man-pages/man7/pipe.7.html
最近有发现了一个有趣的shell命令timeout,结合python 2.7的subprocess.Popen(python3的subprocess.Popen自带timeout参数),可以做到超时后终止进程。
cmd = ['timeout', 'bash', 'xxxxx']
subprocess.Popen(cmd)
参考来源:
https://pymotw.com/3/subprocess/#process-groups-sessions
http://www.cnblogs.com/cchust/p/3899832.html