Python 中的并发编程

尽管 Python 当中有着 GIL(Global Interpreter Lock)的限制,在同一时间当中一个进程只能由 GIL 解释单个线程的代码,但是合理地进行并发编程,仍然可以解决不少的性能瓶颈问题,大大提高效率。尤其是在看完 Fluent Python 之后,对于 Python 的并发编程有了更加深厚的了解。 以下介绍几种可用的并发编程方式,以及它们相关的概念和使用的基本方法。

以下只考虑 Python3 的状况。

Threading / Multiprocessing

最简单的并发编程形式,莫过于直接开启新的线程或者进程来执行代码。

  • threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 通过调用该方法可以直接创建线程对象,其中 target 为需要执行的目标函数,args 和 kwargs 为该目标函数的参数,创建后,通过
    • 调用 start 方法以开启线程,进行调度执行 target 函数
    • 线程开启调度后,调用join(timeout=None)方法可以阻塞调用线程,等待线程结束或者超时
    • 使用 threading 进行多线程编程实现并发的话,仍然受到 GIL 的限制,但仍可以避免因等待 IO 完成的阻塞
  • multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 类似于线程的创建,创建进程实现多线程可以避免 GIL 的限制从而利用到 CPU 的多个核心,在 CPU 密集的代码中非常有用。
    • 类似于线程的使用,通过 start 方法开启进程任务调度,调用 join 方法等待完成
    • 除此以外,可以调用进程池 Pool 进行多个任务的调度处理,通过 map 或者 apply 方法,开启多个进程处理多个任务

直接创建线程/进程这种方式简单、直接、粗暴,不过大多时候需要自己进行任务的调度,需要自己安排怎样创建进程,怎样完成任务。在之前尝试 批量下载漫画 的过程中,使用的就是 threading 方法,就需要自己进行任务的安排,进而安排线程的创建和执行。当时还不知道怎么合理地分配调度任务,就只能将任务分为若干子集,每个子集开启独立的线程来完成下载任务,但是仍然无法避免下载过程中网络访问和文件保存的 IO 阻塞。另外,线程创建之后就是独立运行的线程,除了等待结束返回之后,无法传输数据进行额外的控制。

concurrent.futures

它在 Python3.2 中被引入,提供了高层次的异步可调用对象执行的接口。我们可以通过 ThreadPoolExecutor 在线程级别上进行异步执行,或者通过 ProcessPoolExecutor 在不同的进程中实现。两者实现相同的抽象接口类 Excutor,因而具有一样的调用方法,便于进行转换。

Executor

  • submit(fn, *args, **kwargs) 提交任务进行调度,返回 Future 对象
  • map(func, *iterables, timeout=None, chunksize=1) 类似于 map 方法,对于 iter 里面的每一个对象都会异步地调用 func,当任务结束时返回各个函数返回结果的迭代器

Future

在 futures 里面具体的任务类,由 submit 方法创建。

  • result 获取任务的返回结果,若未结束则阻塞至结束或超时
  • cancel 关闭任务
  • canceled 是否关闭
  • done 是否结束

使用方式

  1. 通过 submit 提交任务创建获取任务的 future 对象,再通过 as_completed 等待 future 对象结束,获取结果。as_completed 接收 future 对象的迭代器。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
  1. 通过 map 方法批量执行任务,再获取结果。但是 map 方法返回结果的顺序与调用顺序即传入的迭代对象的顺序一样,若前面的任务耗时过长会阻塞后面执行快的任务返回。
1
2
3
with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

asyncio

asyncio 采用 event loop 的事件驱动型的异步调度执行。

coroutine

coroutine(协程)从定义上来说,指的是包含 yield/yield from 语句的函数(在 Python3.5 引入 async 和 await 之前)。在此之前,我们可以通过生成器的形式来实现,样例如下。首先可以调用函数获得生成器对象,然后调用 next 方法或 send(None) 的方法开启 coroutine,开启后,函数执行到 yield 位置,返回 yield 右侧的表达式后则挂起自己,将控制流交回给主线程。当再次调用 send 方法时,可以传输数据并激活协程,继续执行至结束或者下一个 yield 语句处。该样例则是一个计算累计输入的平均值协程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
"""
# BEGIN CORO_AVERAGER_TEST
    >>> coro_avg = averager()  # <1>
    >>> next(coro_avg)  # <2>
    >>> coro_avg.send(10)  # <3>
    10.0
    >>> coro_avg.send(30)
    20.0
    >>> coro_avg.send(5)
    15.0
# END CORO_AVERAGER_TEST
"""
def averager():
    total = 0.0
    count = 0
    average = None
    while True:  # <1>
        term = yield average  # <2>
        total += term
        count += 1
        average = total/count

当 coroutine 运行结束时,会抛出一个 StopIteration 的异常,告知协程结束。若协程函数有返回值,也会被存在 StopIteration.value 中随着异常返回。如果我们要获得返回值,必须要捕获异常。另外,在使用 coroutine 的过程中,我们还需要手动调用 next 方法来开启 coroutine。这些不方便之处,使我们通常会使用一个 delegating generator 来进行中间的调用操作。在 delegation generator 当中,通过 yield from 来调用 subgenerator,并可以直接获取返回值,且 delegating generator 可以处理 subgenerator 抛出的异常。

使用方式

asyncio 库则是通过协程的方式,引入事件循环(event loop)的方式,通过事件轮询与回调的方式进行异步编程。具体的使用方式通过书中一个具体的例子来说明。这个例子是批量从网上下载不同国家国旗的图片,通过将每一个国家国旗的下载任务封装为一个协程,进行并发编程。在协程中,对于每一个可能会发生阻塞,需要等待的操作,均使用 yield from 进行调用。当运行到此处时,程序会交出当前的控制权,异步调用并执行 coroutine,当运行完返回之后才会将控制权交回给之前的线程。

当编写完协程函数之后,通过 list comprehension 批量生成协程对象,通过 wait 方法开启新的协程,来调用开启所有的任务协程对象。在通过 loop.run_until_complete 等待所有协程完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@asyncio.coroutine  # <3>
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)  # <4>
    image = yield from resp.read()  # <5>
    return image

@asyncio.coroutine
def download_one(cc):  # <6>
    image = yield from get_flag(cc)  # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()  # <8>
    to_do = [download_one(cc) for cc in sorted(cc_list)]  # <9>
    wait_coro = asyncio.wait(to_do)  # <10>
    res, _ = loop.run_until_complete(wait_coro)  # <11>
    loop.close() # <12>

    return len(res)

PS:

  1. 可能会出现的阻塞应该全部使用异步调用。因为这里采用的是 event loop,其实还是在同一个线程,如果使用了线程阻塞的操作,依然会阻塞其余所有的操作。因而应该使用异步调用的方式,如 aiohttp 进行 http 请求,或者 asyncio.sleep 进行休眠操作,当会发生阻塞的时候,抛出控制权给回 event loop
  2. 此处调用 wait 后再调用 run_until_complete,需要等到全部协程完成时才会一并返回结果,不利于判断完成情况。可以利用 asyncio.as_completed 进行代替,可以马上返回以及完成的任务
  3. 只有通过 loop 或 async 中相关的 run 方法,任务才会被执行

update

在 Python3.5 中,引入了关键字 async 和 await,以及 coroutine 类型。可以更加清晰地使用 asyncio 进行并发编程。其中 async 等价于装饰器@asyncio.coroutine,可以将函数声明为 coroutine 对象。await 关键字则可以取代 yield from 关键字,进行异步函数的调用。看了看变化还挺大的,尽管核心的思想没变,还是需要重新看看。具体的文档 见此

后记

在学习了 Fluent Python 中的并发编程之后,突发兴趣打算使用 asyncio 和 concurreny.futures 来重写之前的漫画下载任务,来看看实际的效果提升。尽管在看书的过程中,感觉自己掌握得好像还行,对于样例代码也能一看就可以理解,而且目标任务也比较简单,应该可以比较快就实现目标任务。然而实际操作编写代码就发现并不是这么简单。首先,书中的代码有些已经落后(如 async 和 await 关键字没有引入),越看文档就越能发现新的 features,有些还与书中的样例代码有所出入。其次,现实中代码的实际运行状况远比书中的理想状况复杂。就错误处理而言,一开始并没有做错误处理,导致出错后就静静地挂起,没有提示,也不会结束。后面做了简单的错误处理后发现,异常类型真可谓各式各样。最后,就是书中不能对全部状况状况作出全面的介绍,存在着许多未知的状况。发现很多错误也还好,最怕的是发现不了错误,或者知道有问题但是找不出来,还没有信息。

coding 还是需要多动手实践,从具体的代码编写入手,这样才能真正提高所谓的 coding 能力,否则只看书的话,很容易就会陷入一种我都会了的感觉,但这其实只是我都知道的状态,并不代表我都会用,我都能用,我都可以用好。看书更多的是留下一种对某方面知识或技能的印象,当需要的时候能够根据这些印象进行快速的索引和学习,重新获取知识。两者都很重要,缺一不可。