笔记python并行
多线程、多进程、协程
1.多进程
每一个逻辑控制流都是一个进程,由内核来调度和维护。有独立的虚拟地址空间,数据是分开的,共享复杂,需要用IPC进行通信。占用内存较多,可靠性高
2.多线程
运行在一个单一进程上下文的逻辑流,由内核调度,多线程共享进程数据,共享简单,占用内存低,可靠性较低
3.协程
一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,先保存再恢复。
直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。
1
2
3
|
loop = asyncio.get_event_loop() 事件
tasks = [loop.create_task(func(*arg)) for arg in ____] 任务
loop.run_until_complete(asyncio.wait(tasks)) 执行
|
通过遍历tasks 后可获得返回值 task.result() asyncio.gather()方法也可以 * 自动将函数列表封装成了协程任务
done 为已完成的协程任务列表,pending 为超时未完成的协程任务类别,需通过task.result()方法可以获取每个协程任务返回的结果;
而asyncio.gather 返回的是所有已完成协程任务的 result,不需要再进行调用或其他操作,就可以得到全部结果。
在python3.7之后 利用asyncio.run可以代替上面3步
python中的多线程
1.GIL
GIL(全局解释锁)不是python的特性和Lock不是一个层面的概念,在实现Cpython引入的一个概念。具备一定的优势:
1.在单线程任务中更快
2.在多线程任务中,对于I/O密集型程序运行更快。
一定的缺陷:1. 对于CPU密集型任务,需要进行用C语言包来实现。2、使python的多线程并不是真正意义的多线程
补充:
计算密集型:要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。
IO密集型:涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。
2.GIL的运行过程
- 线程拿到GIL
- 执行逻辑代码,达到check_interval*
- 释放当前线程的GIL
- 所有线程竞争GIL
- 竞争到的线程又从第一步开始执行
补充:对于GIL的释放情景。 1.当前线程执行超时,自动释放。 2.当前线程执行阻塞操作时会自动释放。 3.执行完成时释放
3.GIL存在的问题
看似并行,实则不然,根据上述的运行过程可以看出,虽然是多线程但是线程之间的运行仍然是具有先后顺序的,并不是同时进行的。(一个进程中有一把全局解释锁,锁住线程)
可以利用多进程 + 协程来完成相应业务场景
python中的多进程
1. multiprocessing
multiprocessing 模块在python中的多进程模块
1.多进程使用独立的内存空间相比于线程,代码更加直观
2.能够使用多个CPU/多核
3.避免GIL
4.子进程可以被kill(和thread不同)
5.multiprocessing有和threading.Thread类似的接口
基本模块的使用主要为Process类和Pool进程线程的创建和使用
close() 关闭pool,使其不在接受新的任务。
terminate() 结束工作进程,不在处理未完成的任务。
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
Pool可调用的方法:
apply_async()异步
map_async生成子进程时使用的是list,
imap和imap_unordered则是Iterable,
在速度上,map_async效率略高,而imap和 imap_unordered内存消耗显著的小。
在处理结果上,imap 和 imap_unordered 可以尽快返回一个Iterable的结果,而map_async则需要等待全部Task执行完毕,返回list。
而imap和imap_unordered 的区别是:imap 和 map_async一样,都按顺序等待Task的执行结果,而imap_unordered则不必。
imap_unordered返回的Iterable,会优先迭代到先执行完成的Task。
在对于多进程的实现当中还有两个模块,pp模块和pathos模块
在实现中主要关注异步并行;
异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成。
包括 pp.Server() submit方法、concurrent.futures.ProcessPoolExecutor() map方法。pathos.multiprocessing.ProcessPool() imap方法等
对于不同的参数也有相对应的方法
1.单任务,多参数 基本包的Process() future中的进程池submit()、multiprocessing.Pool(),apply_async方法、pp.Server(),submit方法
2.多任务,单参数 multiprocessing.Pool(),map_async,imap,imap_unordered方法
3.多任务,多参数 concurrent.futures.ProcessPoolExecutor() map方法、pathos.multiprocessing.ProcessPool(),map方法等
对于返回值也有一定分类
1.返回list
2.返回封装好的MapResult和ApplyResult,通过get()方法获取值
3.返回迭代器,通过循环遍历获取值(按顺序不按顺序) 内存占用较小
2.多进程和多线程基本模块代码比较
在对于池化技术上,推荐使用进程池 进程池中包含两种方法,map()和 submit()
map内部调用了submit,通过对submit封装使用起来更方便。map提交任务返回的是返回值的迭代器,submit提交任务返回的是Future类的实例。
需要提交任务并获得返回值时使用map。在需要提交任务并使用回调等功能的使用submit
使用map时,对于map传参的处理上。对于非迭代参数,需要使用partial偏函数进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from multiprocessing import Pool
from functools import partial
import numpy as np
def adjust(mean, number):
if number > mean:
return 1
else:
return 0
if __name__ == "__main__":
num_list = [12, 45, 67, 88, 99, 62]
num_mean = np.mean(num_list)
pool = Pool(4)
pfunc = partial(adjust, num_mean)
res_list = pool.map(pfunc, num_list)
|
3.多进程之间的通信
1.对于进程之间的通信,是没有线程之间简单的,在进程之间的通信中,有常用的两种Queue和pipe通信,对于进程池的使用上,运用队列通信的时候,我们要首先进行队列的创建
(1)队列
queue = multiprocessing.Manager().Queue()
pool.apply_async(func1,args=(queue,))
pool.apply_async(func2, args=(queue,))
完成进程之间的数据通信,通过队列中的基本方法,get()和 put()
(2)pipe
Pipe返回接收端和发送端,pipe[0]是接收,pipe[1]是发送
duplex=False表示半双工,发送端只能发送,接收端只能接受
duplex=True表示全双工,两端既能发送又能接收
parent_conn, child_conn = multiprocessing.Pipe() 创建管道
利用send()和 recv()方法进行传递和接收