python网络编程二
概念
同步、异步与阻塞、非阻塞的区别
在函数或方法调用的过程中,涉及到同步、异步和阻塞、非阻塞的概念,它们之间存在以下区别:
同步调用:
- 调用者发起函数或方法调用,并直接等待结果返回。
- 调用者会阻塞(等待)直到得到最终结果。
异步调用:
- 调用者发起函数或方法调用后,不直接等待结果返回。
- 调用者可以继续执行其他操作,而不必等待结果返回。
- 结果通常在将来的某个时间点通过回调函数或其他方式获取。
阻塞调用:
- 函数或方法调用发起后,调用者会立即等待结果返回。
- 如果调用过程中需要等待某些操作(如网络传输、文件读写等),则会阻塞当前线程,直到操作完成。
非阻塞调用:
- 函数或方法调用发起后,调用者不会立即等待结果返回。
- 调用者可以继续执行其他操作,而不必等待调用完成。
- 如果调用过程中需要等待某些操作,调用者可以轮询或使用回调等方式来检查操作的完成情况,而不会阻塞当前线程。
区别
同步、异步与阻塞、非阻塞虽然相关,但它们的概念并不完全相同:
同步、异步:
- 强调的是是否得到最终结果。
- 同步:调用者等待直到得到最终结果。
- 异步:调用者不直接得到最终结果,而是通过其他方式(如回调函数)获取结果。
阻塞、非阻塞:
- 强调的是在等待过程中是否可以执行其他操作。
- 阻塞:调用者在等待结果返回的过程中无法执行其他操作,会一直等待。
- 非阻塞:调用者在等待过程中可以执行其他操作,不会一直等待。
联系
- 同步阻塞:等待过程中无法执行其他操作,需要一直等待最终结果。
- 同步非阻塞:等待过程中可以执行其他操作,但会反复检查最终结果是否就绪。
- 异步非阻塞:不直接等待最终结果,而是通过回调等方式获取结果,等待过程中可以执行其他操作。
同步IO
IO过程分两阶段:
- 数据准备阶段。从设备读取数据到内核空间的缓冲区(淘米,把米放饭锅里煮饭)
- 内核空间复制回用户空间进程缓冲区阶段(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用一般是 read 函数、recv 函数等。
IO模型
同步IO
同步IO模型包括阻塞IO、非阻塞IO、IO多路复用。
阻塞IO
可以看到,用户进程在进行recv系统调用后会被阻塞,直到数据已经被复制到了用户空间。几乎所有的程序员第一次接触到的网络编程都是从listen()
、send()
、recv()
等接口开始的,这些接口都是阻塞型的。
如果用单进程/单线程去处理所有请求的话,则线程/进程很有可能被阻塞在了对某个网络连接的IO操作中,从而来不及处理另外的网络连接。因此后端业务系统往往在每个网络连接创建时给该连接创建一个单独的线程。在小并发量的情况下,这种一个连接对应一个线程的模式,确实可以很好的处理网络IO操作。然而,由于每个线程都会占用一部分内存,一旦系统并发量变大,一个连接对应一个线程的模式可能会导致系统内存不足;另外,大并发量情况下,一个连接对应一个线程的模式会频繁的创建线程和频繁的线程调度,都会严重的拖累系统性能,从而导致系统不可用。
总的来看,阻塞式IO只适用于小并发量的系统,不适合海量并发的场景。
非阻塞式IO
非阻塞式IO和阻塞式IO的不同,主要体现在数据等待阶段:非阻塞式IO中,用户进程进行了读操作的系统调用后,如果内核中的数据没有准备好的话,系统不会将用户进程阻塞,而是直接返回一个表示数据未准备好的状态码。从用户进程来看,进程发出一个读请求后并不需要等待 ,而是立马得到一个结果。用户进程通过该结果即可知道数据并没有准备好,从而后续会再次发起读请求。一旦系统内核中的数据准备好,用户进程再次发起读请求时,系统会将用户进程阻塞,直到数据从内核复制到用户空间完毕。
我们不难看出,在非阻塞式IO模型情况下,为了读取到数据,用户进程需要主动地不停的发起读请求系统调用,从而一直处于忙等待的状态。
IO多路复用
从IO多路复用的流程中可知,与阻塞式IO类似,用户进程会阻塞在数据准备和数据复制两个环节。但与阻塞式IO不同,在IO多路复用中,一个用户进程通过select
、poll
、epoll
系统调用,可以监视多个网络连接,一旦某个或某几个连接可读(数据已经到达系统内核)时,select
、poll
、epoll
系统调用会立即返回。用户可以通过select
、poll
、epoll
系统调用的参数获取到可读、可写的连接的文件描述符。通过连接的文件描述符,即从连接读取数据或者写入数据。
import socket
import selectors
server = socket.socket()
server.bind(('127.0.0.1',9999))
server.listen()
server.setblocking(False) # 设置为非阻塞
selector = selectors.DefaultSelector() # IO 监控的只有读写事件
key = selector.register(server, selectors.EVENT_READ,12334) # 注册事件, select 请帮我关注server的读事第一阶段就绪
print(key)
while True:
events = selector.select() # 阻塞,直到有事件发生
print(events)
for key,mask in events:
print(key,mask)
if key.data == 12334:
newsock, raddr = key.fileobj.accept() # server.accept() # 第二阶段就绪
newsock.setblocking(False) # 设置为非阻塞
selector.register(newsock, selectors.EVENT_READ,2233)
if key.data == 2233:
data = key.fileobj.recv(1024)
print(data,'++++++')
信号驱动IO
信号驱动式IO就是指进程预先向内核注册一个信号处理函数,然后用户进程返回不阻塞。当内核数据就绪时会发送一个信号给进程,用户进程便在信号处理函数中调用IO读取数据。图中可以看出,数据从内核拷贝到用户进程的过程中,用户进程还是会阻塞的。
异步IO
在异步IO中,用户进程发起read操作之后,立刻就可以开始去做其它的事。而从kernel的角度,当它收到一个read请求后,会立刻返回,不会对用户进程产生任何block。同时,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
下图是基于异步IO构建的Proactor模式的应用示意图。在Proactor中,有一个Initiator模块负责初始化系统,一个异步操作处理器(往往由系统内核提供)负责执行异步操作(如读、写)。异步操作完成后,异步操作处理器将完成事件或者失败事件放入完成事件队列。Proactor模块通过监视器监听完成事件队列并获取到事件,从而调用Completion Handler进行后续的处理。
开发中的选择:
- 完全跨平台,使用 select、poll。但是性能较差。
针对不同操作系统自行选择支持的技术,这样做会提高 IO 处理的性能。假设当前进程监控的很多 IO 的文件描述符为 fds。
select:
- 使用读、写两个位图标记 fd 对应的读写是否就绪,这个位图限制为 1024。每一次都需要遍历 fds,效率低。
poll:
- 使用数组保存结构体,没有了最大限制。依然遍历 fds 查看谁就绪了,效率低。
epoll:
- 内核空间与用户空间共享一段内存,减少数据的复制。事件驱动,每次只返回就绪的 fds。
Selector
这是基类,提供了多个方法用于注册、注销和等待I/O事件的通知。
register(fileobj, events, data=None)
: 注册一个文件对象以监视特定的I/O事件。unregister(fileobj)
: 注销一个已注册的文件对象。modify(fileobj, events, data=None)
: 修改已注册的文件对象的监视事件。select(timeout=None)
: 阻塞等待I/O事件发生,并返回发生事件的文件对象列表。close()
: 关闭选择器。
DefaultSelector
selectors.Selector
的默认实现,根据系统的特性选择最佳的选择器类型。
EpollSelector
基于Linux的 epoll
机制的选择器实现,通常在支持 epoll
的系统上提供最佳的性能。
PollSelector
基于 select.poll
的选择器实现,通常用于不支持 epoll
的系统。
SelectSelector
基于 select.select
的选择器实现,它是最通用的但性能最差的实现,适用于几乎所有的系统。
- 使用IO多路复用再次实现多人会话
import socket
import selectors
import threading
import logging
import termcolor
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
class ChatServer:
def __init__(self, ip='127.0.0.1',port=9999):
self.addr = ip, port
self.server = socket.socket()
self.server.setblocking(False)
self.selector = selectors.DefaultSelector() # 创建一个selector对象,选最好的IO多路复用模型
self.event = threading.Event()
def start(self):
self.server.bind(self.addr)
self.server.listen()
key = self.selector.register(self.server, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='select').start()
def select(self):
while self.selector:
while not self.event.is_set():
events = self.selector.select(0.5) #
for key, mask in events:
key.data(key.fileobj, mask)
def accept(self, server:socket.socket, mask):
conn, raddr = server.accept()
conn.setblocking(False)
key = self.selector.register(conn, selectors.EVENT_READ, self.recv)
def recv(self,conn:socket.socket, mask):
data = conn.recv(1024).strip()
if data == b'' or data == b'quit':
self.selector.unregister(conn)
conn.close()
return
msg = "from {}: {}. data = {}".format(*conn.getpeername(), data)
logging.info(termcolor.colored(msg, 'blue'))
for key in self.selector.get_map().values():
if key.data == self.recv: # 为什么 is 不行,而==可以收到消息? , 因为key.data是一个函数,而recv是一个方法,而方法和函数的区别在于方法有一个self参数,而方法方法是绑定到对象上的,所以key.data == self.recv是False,而key.data == self.recv是True
key.fileobj.send(msg.encode())
def stop(self):
self.event.set()
# self.selector.close()
if __name__ == '__main__':
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
break
print(*cs.selector.get_map().items())
- 例子:实现一个简单的webserver服务器
import socket
import selectors
import threading
import logging
import termcolor
import webob
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
test_html_contet ="""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WWWServer</title>
</head>
<body>
<h1>hello blog</h1>
</body>
</html>
"""
class WWWServer:
def __init__(self, ip='0.0.0.0', port=80):
self.addr = ip, port
self.server = socket.socket()
self.server.setblocking(False)
self.selector = selectors.DefaultSelector()
self.event = threading.Event()
def start(self):
self.server.bind(self.addr)
self.server.listen()
self.selector.register(self.server, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='select').start()
def select(self):
with self.selector:
while not self.event.is_set():
events = self.selector.select(0.5)
for key, mask in events:
key.data(key.fileobj, mask)
def accept(self, server:socket.socket, mask):
conn, raddr = server.accept()
conn.setblocking(False)
self.selector.register(conn, selectors.EVENT_READ, self.recv)
def recv(self, conn:socket.socket, mask):
try:
data = conn.recv(1024)
requset = webob.Request.from_bytes(data) # 用webob模块解析请求,解析为字典并把字典转换为Request对象
print(requset.url) # 可以根据不同的url返回不同的内容结果,URL映射
print(requset.method)
print(requset.user_agent)
print(requset.headers)
response = webob.Response(test_html_contet)
response.headers.add('Server', 'blogserver')
response.headers.add('YourURL', requset.url)
firstline = "HTTP/1.1 {}".format(response.status)
print(response.headers)
print(response.headerlist)
logging.info(termcolor.colored(firstline, 'green'))
header = "\r\n".join(
[firstline] +
["{}: {}".format(k,v) for k,v in response.headerlist] +
["", ""]
).encode()
conn.send(header + response.body)
finally:
self.selector.unregister(conn)
conn.close()
def stop(self):
self.event.set()
if __name__ == '__main__':
cs = WWWServer()
cs.start()