Python并行计算

笔记python并行

多线程、多进程、协程

1.多进程

每一个逻辑控制流都是一个进程,由内核来调度和维护。有独立的虚拟地址空间,数据是分开的,共享复杂,需要用IPC进行通信。占用内存较多,可靠性高

2.多线程

运行在一个单一进程上下文的逻辑流,由内核调度,多线程共享进程数据,共享简单,占用内存低,可靠性较低

3.协程

一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,先保存再恢复。
直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。
1
2
3
    loop = asyncio.get_event_loop() 事件
    tasks = [loop.create_task(func(*arg)) for arg in ____] 任务
    loop.run_until_complete(asyncio.wait(tasks)) 执行
通过遍历tasks 后可获得返回值 task.result() asyncio.gather()方法也可以 * 自动将函数列表封装成了协程任务
done 为已完成的协程任务列表,pending 为超时未完成的协程任务类别,需通过task.result()方法可以获取每个协程任务返回的结果;
而asyncio.gather 返回的是所有已完成协程任务的 result,不需要再进行调用或其他操作,就可以得到全部结果。
在python3.7之后 利用asyncio.run可以代替上面3步

python中的多线程

1.GIL

GIL(全局解释锁)不是python的特性和Lock不是一个层面的概念,在实现Cpython引入的一个概念。具备一定的优势:
1.在单线程任务中更快 
2.在多线程任务中,对于I/O密集型程序运行更快。 
一定的缺陷:1. 对于CPU密集型任务,需要进行用C语言包来实现。2、使python的多线程并不是真正意义的多线程
补充:
计算密集型:要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。
IO密集型:涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

2.GIL的运行过程

  1. 线程拿到GIL
  2. 执行逻辑代码,达到check_interval*
  3. 释放当前线程的GIL
  4. 所有线程竞争GIL
  5. 竞争到的线程又从第一步开始执行 补充:对于GIL的释放情景。 1.当前线程执行超时,自动释放。 2.当前线程执行阻塞操作时会自动释放。 3.执行完成时释放

3.GIL存在的问题

    看似并行,实则不然,根据上述的运行过程可以看出,虽然是多线程但是线程之间的运行仍然是具有先后顺序的,并不是同时进行的。(一个进程中有一把全局解释锁,锁住线程)
    可以利用多进程 + 协程来完成相应业务场景

python中的多进程

1. multiprocessing

    multiprocessing 模块在python中的多进程模块 
    1.多进程使用独立的内存空间相比于线程,代码更加直观    
    2.能够使用多个CPU/多核   
    3.避免GIL   
    4.子进程可以被kill(和thread不同)
    5.multiprocessing有和threading.Thread类似的接口
        
    基本模块的使用主要为Process类和Pool进程线程的创建和使用
    close()    关闭pool,使其不在接受新的任务。
    terminate() 结束工作进程,不在处理未完成的任务。
    join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
        
    Pool可调用的方法:
    apply_async()异步
    map_async生成子进程时使用的是list,
    imap和imap_unordered则是Iterable,
    在速度上,map_async效率略高,而imap和 imap_unordered内存消耗显著的小。
    在处理结果上,imap 和 imap_unordered 可以尽快返回一个Iterable的结果,而map_async则需要等待全部Task执行完毕,返回list。
    而imap和imap_unordered 的区别是:imap 和 map_async一样,都按顺序等待Task的执行结果,而imap_unordered则不必。 
    imap_unordered返回的Iterable,会优先迭代到先执行完成的Task。 

    在对于多进程的实现当中还有两个模块,pp模块和pathos模块
    在实现中主要关注异步并行;
    异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成。
    包括 pp.Server() submit方法、concurrent.futures.ProcessPoolExecutor() map方法。pathos.multiprocessing.ProcessPool() imap方法等
    对于不同的参数也有相对应的方法
    1.单任务,多参数 基本包的Process() future中的进程池submit()、multiprocessing.Pool(),apply_async方法、pp.Server(),submit方法
    2.多任务,单参数 multiprocessing.Pool(),map_async,imap,imap_unordered方法
    3.多任务,多参数 concurrent.futures.ProcessPoolExecutor() map方法、pathos.multiprocessing.ProcessPool(),map方法等
    对于返回值也有一定分类
    1.返回list
    2.返回封装好的MapResult和ApplyResult,通过get()方法获取值
    3.返回迭代器,通过循环遍历获取值(按顺序不按顺序) 内存占用较小

2.多进程和多线程基本模块代码比较

在对于池化技术上,推荐使用进程池 进程池中包含两种方法,map()和 submit()
map内部调用了submit,通过对submit封装使用起来更方便。map提交任务返回的是返回值的迭代器,submit提交任务返回的是Future类的实例。
需要提交任务并获得返回值时使用map。在需要提交任务并使用回调等功能的使用submit
使用map时,对于map传参的处理上。对于非迭代参数,需要使用partial偏函数进行处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
 from multiprocessing import Pool
 from functools import partial
 import numpy as np

def adjust(mean, number):
  if number > mean:
      return 1
  else:
      return 0
if __name__ == "__main__":
  num_list = [12, 45, 67, 88, 99, 62]
  num_mean = np.mean(num_list)
  pool = Pool(4)
  pfunc = partial(adjust, num_mean)
  res_list = pool.map(pfunc, num_list)

3.多进程之间的通信

1.对于进程之间的通信,是没有线程之间简单的,在进程之间的通信中,有常用的两种Queue和pipe通信,对于进程池的使用上,运用队列通信的时候,我们要首先进行队列的创建

(1)队列
        queue = multiprocessing.Manager().Queue()
        pool.apply_async(func1,args=(queue,))
        pool.apply_async(func2, args=(queue,))
        完成进程之间的数据通信,通过队列中的基本方法,get()和 put()

(2)pipe
        Pipe返回接收端和发送端,pipe[0]是接收,pipe[1]是发送
        duplex=False表示半双工,发送端只能发送,接收端只能接受
        duplex=True表示全双工,两端既能发送又能接收
        parent_conn, child_conn = multiprocessing.Pipe() 创建管道 
        利用send()和 recv()方法进行传递和接收
Licensed under CC BY-NC-SA 4.0