在python3中变成了_thread
方法
error
lockType
start_new_thread
interrupt_main
中断主线程
exit
当前线程退出
allocate_lock
创建一个lock并返回
get_indent
当前线程标识
stack_size
lock
acquire
release
locked
dummy threading
try:
import threading as _threading
except ImportError:
import dummy_threading as _threading
dummy thread
在python3中重命名为_dummy_thread,
try:
import thread as _thread
except ImportError:
import dummy_thread as _thread
dummy_thread代码
# coding: utf-8
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
'interrupt_main', 'LockType']
import traceback as _traceback
class error(Exception):
def __init__(self, *args):
self.args = args
def start_new_thread(function, args, kwargs={}):
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs) # 调用 算是执行线程
except SystemExit:
pass
except:
_traceback.print_exc()
_main = True # 回到主线程
# 如果调用了interrupt_main
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt # 抛出异常
def exit():
raise SystemExit
def get_ident():
return -1
def allocate_lock():
return LockType()
def stack_size(size=None):
if size is not None:
raise error("setting thread stack size not supported")
return 0
class LockType(object):
def __init__(self):
self.locked_status = False # 起始状态下的锁是未锁定状态
def acquire(self, waitflag=None):
# waitflag没什么影响
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
# 用于指示是否调用了interrupt_main
_interrupt = False
# 用于指示是否在主线程中运行
_main = True
def interrupt_main():
if _main:
raise KeyboardInterrupt # 主线程抛出异常直接抛
else:
global _interrupt
_interrupt = True # 线程中抛出异常则得在线程结束时调用
dummy_threading代码
# coding: utf-8
from sys import modules as sys_modules
import dummy_thread
holding_thread = False
holding_threading = False
holding__threading_local = False
try:
# 系统中有thread的情况
if 'thread' in sys_modules:
held_thread = sys_modules['thread']
holding_thread = True
# 否则设置为dummy_thread
sys_modules['thread'] = sys_modules['dummy_thread']
# 系统中有threading的情况
if 'threading' in sys_modules:
held_threading = sys_modules['threading']
holding_threading = True
del sys_modules['threading']
if '_threading_local' in sys_modules:
held__threading_local = sys_modules['_threading_local']
holding__threading_local = True
del sys_modules['_threading_local']
import threading
sys_modules['_dummy_threading'] = sys_modules['threading']
del sys_modules['threading']
sys_modules['_dummy__threading_local'] = sys_modules['_threading_local']
del sys_modules['_threading_local']
from _dummy_threading import *
from _dummy_threading import __all__
finally:
if holding_threading:
sys_modules['threading'] = held_threading
del held_threading
del holding_threading
if holding__threading_local:
sys_modules['_threading_local'] = held__threading_local
del held__threading_local
del holding__threading_local
if holding_thread:
sys_modules['thread'] = held_thread
del held_thread
else:
del sys_modules['thread']
del holding_thread
del dummy_thread
del sys_modules