socketserver 是 python 标准库中的模块,作为一个网络服务器框架(A framework for network servers),其作用是简化编写网络服务器流程,并且担当标准库中其它模块跟第三方模块的底层服务器基础。
socketserver 是 python3 中的新名字,在 python2 中名为 SocketServer,本次阅读的是 0.4 版本。
该框架主要是使用 ServerClass 跟 RequestHandlerClass 两大类。
- ServerClass 处理服务端与客户端的通讯
- RequestHandlerClass 处理数据的解析,接收和发送;主要的业务逻辑
ServerClass
-
BaseServer
抽象基类 -
TCPServer
处理流式套接字 -
UnixStreamServer
处理本地处理流式套接字,只适用UNIX平台 -
UDPServer
处理数据报套接字 -
UnixDatagramServer
处理本地处理数据报套接字,只适用UNIX平台
RequestHandlerClass
-
BaseRequestHandler
处理基类 -
StreamRequestHandler
处理流式套接字 -
DatagramRequestHandler
处理数据报套接字
通过对通讯和数据处理的解耦,可以实现不同组合的服务器用于处理不同的套接字,但这样的服务器,仅仅是同步处理请求,如果要实现异步处理请求,还需使用 MixINClass
-
ForkingMixIn
利用多进程实现异步,接收一个请求后,fork 一个进程进行响应处理 -
ThreadingMixIn
利用多线程实现异常,接收一个请求后,使用一个线程进行响应处理
关于 MixIN,Mixin 扫盲班
Server 类的继承关系
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
UDPServer 继承 TCPServer,TCPServer 继承 BaseServer,UnixStreamServer 继承自 TCPServer,UnixDatagramServer 继承自 UDPServer。
if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX
class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX
从源码看,UnixStreamServer 跟 TCPServer,UnixDatagramServer 跟 UDPServer,仅仅是改变了 address_family
。
关于 socket.AF_UNIX,主要是用于同一台机器上的进程间通信。
AF_INET域与AF_UNIX域socket通信原理对比
if hasattr(os, "fork"):
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
而多进程服务器跟多线程服务器,也仅仅是把 TCPServer、UDPServer 跟 ForkingMixIn、ThreadingMixIn 组合在一起而已。
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
RequestHandler 类的继承关系
+--------------------+ +----------------------+
| BaseRequestHandler |------->| StreamRequestHandler |
+--------------------+ +----------------------+
|
v
+------------------------+
| DatagramRequestHandler |
+------------------------+
StreamRequestHandler 跟 DatagramRequestHandler 是继承 BaseRequestHandler,它们都重写了 setup 跟 finish,本质上是用缓存对套接字提供服务。
调用流程
以 示例 来看调用流程
import socketserver
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("{} wrote:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
if __name__ == "__main__":
HOST, PORT = "localhost", 9999
with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
这里定义了一个继承自 BaseRequestHandler 的 MyTCPHandler,重写了 handle 方法,然后主机、端口一同传入 TCPServer。
class BaseServer:
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
class TCPServer(BaseServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
1.TCPServer 实例化,调用了自身 server_bind()
方法,绑定了传入的 server_address
参数,而 server_activate()
方法是重写自 BaseServer
类,主要是请求队列大小的设置。
2.启动服务 ,调用 BaseServer
类的 serve_forever()
方法,用于不断监听请求,当有请求时调用 _handle_request_noblock()
方法完成请求处理,然后调用 service_actions()
处理后的操作;这个方法在 ForkingMixIn
类会被重写,用于解决完成请求处理的进程。
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
_handle_request_noblock()
方法处理请求时,会调用下列方法
-
get_request()
方法在TCPServer
类中定义,调用socket.accept()
方法,返回request
参数和client_address
参数 -
verify_request(request, client_address)
方法在BaseServer
类中定义,可以在子类中重写做一些验证处理 -
process_request(request, client_address)
方法请求处理函数,用于调用finish_request()
方法,在ForkingMixIn
类和ThreadingMixIn
类中会被重写,用进程或线程发起处理 -
finish_request(request, client_address)
方法具体的请求处理过程,这里是自定义MyTCPHandler
类实例化,该类继承了BaseRequestHandler
类,遵传它的定义,当自己被实例化时,会调用handle()
方法 -
shutdown_request(request)
方法在TCPServer
类中定义,会先request.shutdown(socket.SHUT_WR)
,出错再调用close_request(request)
方法做request.close()
,而在UDPServer
类中,该函数会重写pass
掉
3.关闭服务,要结束服务,可以手动调用 BaseServer
类中的 shutdown()
方法,示例说直接用 Ctrl-C
def shutdown(self):
self.__shutdown_request = True
self.__is_shut_down.wait()
主要是
__shutdown_request
标识更换,并阻塞等待serve_forever()
线程返回
这便是示例中 TCPServer
的生命周期。
些许问题
request.shutdowm(socket.SHUT_WR)
跟 request.close()
区别
1.调用shutdown会马上关闭指定链接, 而close会等到描述符的引用计数器为0时才会开始关闭链接
2.close会同时关闭两个链接, 而shutdown值关闭指定链接
3.close后文件描述符不再可用(引用基数为0,释放资源), shutdown后文件描述符是可用的.
具体
poll、epoll 选择
if hasattr(selectors, 'PollSelector'):
_ServerSelector = selectors.PollSelector
else:
_ServerSelector = selectors.SelectSelector
## Lib/selectors.py
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
socketserver 模块中手动选择了 poll 库,而不是使用模块提供的 DefaultSelector 自适应选择。