0%

Python中的并发编程

尽管Python当中有着GIL(Global Interpreter Lock)的限制,在同一时间当中一个进程只能由GIL解释单个线程的代码,但是合理地进行并发编程,仍然可以解决不少的性能瓶颈问题,大大提高效率。尤其是在看完Fluent Python之后,对于Python的并发编程有了更加深厚的了解。 以下介绍几种可用的并发编程方式,以及它们相关的概念和使用的基本方法。

以下只考虑Python3的状况。

Threading / Multiprocessing

最简单的并发编程形式,莫过于直接开启新的线程或者进程来执行代码。

  • threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 通过调用该方法可以直接创建线程对象,其中target为需要执行的目标函数,args和kwargs为该目标函数的参数,创建后,通过
    • 调用start方法以开启线程,进行调度执行target函数
    • 线程开启调度后,调用join(timeout=None)方法可以阻塞调用线程,等待线程结束或者超时
    • 使用threading进行多线程编程实现并发的话,仍然受到GIL的限制,但仍可以避免因等待IO完成的阻塞
  • multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 类似于线程的创建,创建进程实现多线程可以避免GIL的限制从而利用到CPU的多个核心,在CPU密集的代码中非常有用。
    • 类似于线程的使用,通过start方法开启进程任务调度,调用join方法等待完成
    • 除此以外,可以调用进程池Pool进行多个任务的调度处理,通过map或者apply方法,开启多个进程处理多个任务

直接创建线程/进程这种方式简单、直接、粗暴,不过大多时候需要自己进行任务的调度,需要自己安排怎样创建进程,怎样完成任务。在之前尝试批量下载漫画的过程中,使用的就是threading方法,就需要自己进行任务的安排,进而安排线程的创建和执行。当时还不知道怎么合理地分配调度任务,就只能将任务分为若干子集,每个子集开启独立的线程来完成下载任务,但是仍然无法避免下载过程中网络访问和文件保存的IO阻塞。另外,线程创建之后就是独立运行的线程,除了等待结束返回之后,无法传输数据进行额外的控制。

concurrent.futures

它在Python3.2中被引入,提供了高层次的异步可调用对象执行的接口。我们可以通过ThreadPoolExecutor在线程级别上进行异步执行,或者通过ProcessPoolExecutor在不同的进程中实现。两者实现相同的抽象接口类Excutor,因而具有一样的调用方法,便于进行转换。

Executor

  • submit(fn, *args, **kwargs) 提交任务进行调度,返回Future对象
  • map(func, *iterables, timeout=None, chunksize=1) 类似于map方法,对于iter里面的每一个对象都会异步地调用func,当任务结束时返回各个函数返回结果的迭代器

Future

在futures里面具体的任务类,由submit方法创建。

  • result 获取任务的返回结果,若未结束则阻塞至结束或超时
  • cancel 关闭任务
  • canceled 是否关闭
  • done 是否结束

使用方式

  1. 通过submit提交任务创建获取任务的future对象,再通过as_completed等待future对象结束,获取结果。as_completed接收future对象的迭代器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
    url = future_to_url[future]
    try:
    data = future.result()
    except Exception as exc:
    print('%r generated an exception: %s' % (url, exc))
    else:
    print('%r page is %d bytes' % (url, len(data)))

  2. 通过map方法批量执行任务,再获取结果。但是map方法返回结果的顺序与调用顺序即传入的迭代对象的顺序一样,若前面的任务耗时过长会阻塞后面执行快的任务返回。

1
2
3
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))

asyncio

asyncio采用event loop的事件驱动型的异步调度执行。

coroutine

coroutine(协程)从定义上来说,指的是包含yield/yield from语句的函数(在Python3.5引入async和await之前)。在此之前,我们可以通过生成器的形式来实现,样例如下。首先可以调用函数获得生成器对象,然后调用next方法或send(None)的方法开启coroutine,开启后,函数执行到yield位置,返回yield右侧的表达式后则挂起自己,将控制流交回给主线程。当再次调用send方法时,可以传输数据并激活协程,继续执行至结束或者下一个yield语句处。该样例则是一个计算累计输入的平均值协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"""
# BEGIN CORO_AVERAGER_TEST
>>> coro_avg = averager() # <1>
>>> next(coro_avg) # <2>
>>> coro_avg.send(10) # <3>
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0
# END CORO_AVERAGER_TEST
"""
def averager():
total = 0.0
count = 0
average = None
while True: # <1>
term = yield average # <2>
total += term
count += 1
average = total/count

当coroutine运行结束时,会抛出一个StopIteration的异常,告知协程结束。若协程函数有返回值,也会被存在StopIteration.value中随着异常返回。如果我们要获得返回值,必须要捕获异常。另外,在使用coroutine的过程中,我们还需要手动调用next方法来开启coroutine。这些不方便之处,使我们通常会使用一个delegating generator来进行中间的调用操作。在delegation generator当中,通过yield from来调用subgenerator,并可以直接获取返回值,且delegating generator可以处理subgenerator抛出的异常。

使用方式

asyncio库则是通过协程的方式,引入事件循环(event loop)的方式,通过事件轮询与回调的方式进行异步编程。具体的使用方式通过书中一个具体的例子来说明。这个例子是批量从网上下载不同国家国旗的图片,通过将每一个国家国旗的下载任务封装为一个协程,进行并发编程。在协程中,对于每一个可能会发生阻塞,需要等待的操作,均使用yield from进行调用。当运行到此处时,程序会交出当前的控制权,异步调用并执行coroutine,当运行完返回之后才会将控制权交回给之前的线程。

当编写完协程函数之后,通过list comprehension批量生成协程对象,通过wait方法开启新的协程,来调用开启所有的任务协程对象。在通过loop.run_until_complete等待所有协程完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@asyncio.coroutine  # <3>
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url) # <4>
image = yield from resp.read() # <5>
return image


@asyncio.coroutine
def download_one(cc): # <6>
image = yield from get_flag(cc) # <7>
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc


def download_many(cc_list):
loop = asyncio.get_event_loop() # <8>
to_do = [download_one(cc) for cc in sorted(cc_list)] # <9>
wait_coro = asyncio.wait(to_do) # <10>
res, _ = loop.run_until_complete(wait_coro) # <11>
loop.close() # <12>

return len(res)

PS: 1. 可能会出现的阻塞应该全部使用异步调用。因为这里采用的是event loop,其实还是在同一个线程,如果使用了线程阻塞的操作,依然会阻塞其余所有的操作。因而应该使用异步调用的方式,如aiohttp进行http请求,或者asyncio.sleep进行休眠操作,当会发生阻塞的时候,抛出控制权给回event loop 2. 此处调用wait后再调用run_until_complete,需要等到全部协程完成时才会一并返回结果,不利于判断完成情况。可以利用asyncio.as_completed进行代替,可以马上返回以及完成的任务 3. 只有通过loop或async中相关的run方法,任务才会被执行

update

在Python3.5中,引入了关键字async和await,以及coroutine类型。可以更加清晰地使用asyncio进行并发编程。其中async等价于装饰器@asyncio.coroutine,可以将函数声明为coroutine对象。await关键字则可以取代yield from关键字,进行异步函数的调用。看了看变化还挺大的,尽管核心的思想没变,还是需要重新看看。具体的文档见此

后记

在学习了Fluent Python中的并发编程之后,突发兴趣打算使用asyncio和concurreny.futures来重写之前的漫画下载任务,来看看实际的效果提升。尽管在看书的过程中,感觉自己掌握得好像还行,对于样例代码也能一看就可以理解,而且目标任务也比较简单,应该可以比较快就实现目标任务。然而实际操作编写代码就发现并不是这么简单。首先,书中的代码有些已经落后(如async和await关键字没有引入),越看文档就越能发现新的features,有些还与书中的样例代码有所出入。其次,现实中代码的实际运行状况远比书中的理想状况复杂。就错误处理而言,一开始并没有做错误处理,导致出错后就静静地挂起,没有提示,也不会结束。后面做了简单的错误处理后发现,异常类型真可谓各式各样。最后,就是书中不能对全部状况状况作出全面的介绍,存在着许多未知的状况。发现很多错误也还好,最怕的是发现不了错误,或者知道有问题但是找不出来,还没有信息。

coding还是需要多动手实践,从具体的代码编写入手,这样才能真正提高所谓的coding 能力,否则只看书的话,很容易就会陷入一种我都会了的感觉,但这其实只是我都知道的状态,并不代表我都会用,我都能用,我都可以用好。看书更多的是留下一种对某方面知识或技能的印象,当需要的时候能够根据这些印象进行快速的索引和学习,重新获取知识。两者都很重要,缺一不可。

佛系求打赏