[原]Python异步IO

李余通 18/04/24 23:53:57

本文是我看了http://python.jobbole.com/88291/后加上自己的一些感想所创

我们都知道,IO比CPU慢很多个数量级,而传统的IO,是阻塞型的,CPU花在等待IO上的时间很多,那么想要提高并发量,选择解决CPU在等待IO上花费的大量时间是一个比较好的出路。
先来看一个阻塞型IO的例子。

同步阻塞下载程序

该脚本的功能是下载10个网页。

import socket
urls = ['/'+str(i) for i in range(10)]
host = 'www.baidu.com'
def fn(url):
    sock = socket.socket()
    sock.connect((host,80)) # 这里会阻塞直到发送成功
    #send不会阻塞
    sock.send(('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url,host)).encode('utf-8'))
    response = b''
    while True:
        chunk = sock.recv(4096) #这里会阻塞直到接收数据
        if chunk:
            response += chunk
        else:
            break
    # print(response.decode('utf-8'))
    sock.close()
if __name__ == '__main__':
    import time
    start = time.time()
    for i in urls:
        fn(i)
    print(time.time()-start)

花费的时间大概是7-10秒。
我们知道,程序会在connect和recv上阻塞,等待数据,这就是阻塞式IO的缺点,时间都花费在这里了
那我们是不是可以通过改变套接字为非阻塞来解决这个问题呢?

非阻塞式下载

import socket
urls = ['/'+str(i) for i in range(10)]
host = 'www.baidu.com'
def fn(url):
    sock = socket.socket()
    sock.setblocking(False) # 设置为非阻塞套接字
    try:
        sock.connect(('www.baidu.com',80))
    except BlockingIOError:
        # connect无法立即完成,会有异常
        pass
    while True:
    # 这个循环是为了确保connect完成了
        try:
            sock.send(('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url,host)).encode('utf-8'))
            break
        except IOError:
            pass
    response = b''
    while True:
        while True:
        # 这个循环确保recv收到信息了
            try:
                chunk = sock.recv(4096)
                break
            except IOError:
                pass
        if chunk:
            response += chunk
        else:
            break
    # print(response.decode('utf-8'))
    sock.close()
if __name__ == '__main__':
    import time
    start = time.time()
    for i in urls:
        fn(i)
    print(time.time()-start)

运行时间实际上和阻塞式IO时间近似,虽然IO 操作不在等待了,但是我们不知道具体何时IO操作会完成,所以还是得不停的去检测到底是不是完成了,故而效率并没有提高。

那么如果我们有办法不用不停的去检测何时IO就绪,有一个东西可以告诉我们,那是不是就可以提高效率呢?对的这个就是IO多路复用

IO多路复用的下载

# coding=utf-8
import socket
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
import time

'''
相比于select模块,官方更推荐使用selectors这个高级IO多路复用库
'''

# 自动选择最高效的IO多路复用,Linux上默认是epoll
selector = DefaultSelector()
stopped = False
urls_todo = {'/'+str(i) for i in range(10)}

class Crawler:
    def __init__(self,url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('flycold.cn',80))
        except BlockingIOError:
            pass
        # 注册sock的可写事件(connect完成时)
        selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

    # 当connect完成时发request
    def connected(self,key,mask):
        # 取消注册
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('utf-8'))
        # 注册可读事件(当服务器给予回应时)
        selector.register(key.fd,EVENT_READ,self.read_response)

    # 当服务器回应后触发
    def read_response(self,key,mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            self.sock.close()
            if not urls_todo:
                stopped = True
        print(chunk.decode('utf-8'))

# 注册事件后,需要获取哪个套接字触发了事件,并去处理
def loop():
    while not stopped:
        # 阻塞,直到一个事件发生
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback(event_key,event_mask)

if __name__ == '__main__':
    start = time.time()
    for i in urls_todo:
        x = Crawler(i)
        x.fetch()
    loop()
    print(time.time()-start)

运行时间提高了很多,在网络情况良好时,只花费了不到0.5秒。
这样虽然提高了效率,但是问题还是蛮多的:

回调层次过多时代码可读性差
破坏代码结构
共享状态管理困难
错误处理困难

所以,我们可以使用协程来优化代码。
我们知道,协程可以暂停去执行别的部分,并且不会忘记刚才的状态

协程版

我们需要几个新的对象

未来对象(Future)

异步执行的结果,放在这个对象里。不用回调,我们不知道何时IO操作完成,那么用这个未来对象,存储未来结果。

# 未来对象,result存放未来的执行结果
class Future:
    # 初始化结果和回调函数为空
    def __init__(self):
        self.result = None
        self._callbacks = []

    # 添加回调函数
    def add_done_callback(self,fn):
        self._callbacks.append(fn)

    # 设置结果并执行回调
    def set_result(self,result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

重构Crawler

# 爬虫类
class Crawler:
    def __init__(self,url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('flycold.cn',80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        epoller.register(sock.fileno(),EVENT_WRITE,on_connected)
        # connect后会跳出协程
        yield f
        epoller.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
        sock.send(get.encode('utf-8'))

        global stopped
        while True:
            f = Future()

            # sock可读时的事件
            def on_readable():
                f.set_result(sock.recv(4096))

            epoller.register(sock.fileno(),EVENT_READ,on_readable)
            # send后跳出协程
            chunk = yield f
            epoller.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break

任务对象Task

我们何时回复爬虫对象继续执行了,需要有人去管理,就是Task

# 任务对象
class Task:
    # 初始化,设置爬虫任务对象,执行step
    # coro是fetch生成器
    def __init__(self,coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    # 单步执行
    def step(self,future):
        try:
            # send 会进入到coro执行,即fetch,直到下次yield
            # next_future 为yield返回的对象
            # 第一次发送None启动生成器,返回空的未来对象
            # 之后
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        # 新的未来对象添加回调
        next_future.add_done_callback(self.step)

循环

def loop():
    while not stopped:
        # 阻塞,直到一个事件发生
        events = epoller.select()
        for fd,mask in events:
            # 收到的是send事件和recv事件
            callback = fd.data
            callback()

总览

# coding=utf-8

import socket
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE

epoller = DefaultSelector()
stopped = False
urls_todo = {'/'+str(i) for i in range(10)}

# 未来对象,result存放未来的执行结果
class Future:
    # 初始化结果和回调函数为空
    def __init__(self):
        self.result = None
        self._callbacks = []

    # 添加回调函数
    def add_done_callback(self,fn):
        self._callbacks.append(fn)

    # 设置结果并执行回调
    def set_result(self,result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

# 爬虫类
class Crawler:
    def __init__(self,url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('flycold.cn',80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        epoller.register(sock.fileno(),EVENT_WRITE,on_connected)
        yield f
        epoller.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
        sock.send(get.encode('utf-8'))

        global stopped
        while True:
            f = Future()

            # sock可读时的事件
            def on_readable():
                f.set_result(sock.recv(4096))

            epoller.register(sock.fileno(),EVENT_READ,on_readable)
            chunk = yield f
            epoller.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break

# 任务对象
class Task:
    # 初始化,设置爬虫任务对象,执行step
    def __init__(self,coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    # 单步执行
    def step(self,future):
        try:
            # send 会进入到coro执行,即fetch,直到下次yield
            # next_future 为yield返回的对象
            # 第一次发送None启动生成器,返回空的未来对象
            # 之后
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        # 新的未来对象添加回调
        next_future.add_done_callback(self.step)

def loop():
    while not stopped:
        # 阻塞,直到一个事件发生
        events = epoller.select()
        for fd,mask in events:
            # 收到的是send事件和recv事件
            callback = fd.data
            callback()

if __name__ == '__main__':
    import time
    start = time.time()

    # 循环创建爬虫任务
    for i in urls_todo:
        x = Crawler(i)
        # 启动爬虫协程,之后全部暂停在connect上
        Task(x.fetch())
    loop()
    print(time.time()-start)
作者:baidu_35085676 发表于 2018/04/24 23:53:57 原文链接 https://blog.csdn.net/baidu_35085676/article/details/80072544
阅读:103