ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

【进程间通讯与进程池】 -- 2019-08-16 20:44:08

2019-08-16 20:56:42  阅读:71  来源: 互联网

标签:__ con1 20 队列 08 queue 进程 obj



原文: http://blog.gqylpy.com/gqy/230

"

目录

一、队列

二、管道

三、进程间数据共享

四、进程池


进程间通讯:IPC(Inter-Process Communication)

一、队列:

队列:先进先出(First In First Out)简称 FIFO

栈:先进后出(First In Last Out)简称 FILO

1. multiprocessing.Queue模块

用于创建共享的进程队列,Queue是多进程安全的队列,可以实现对进程之间的数据传递,队列底层是使用管道和锁定实现的. 另外,还需要运行支持的线程以便队列中的数据传输到底层管道中.

  • 方法

obj = Queue(maxsize=-1):创建共享的进程队列,maxsize是队列中允许的最大项数,默认-1 无大小限制。

obj.get(block=None, timeout=None):返回obj队列中的一个项目(遵循FIFO),如果队列为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True,如果设置为False,无项目可用将引发_queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,若在指定的超时时间内没有项目变为可用,同样引发_queue.Empty异常。obj.get(False)等价于obj.get_nowait()方法。

obj.put(obj, block=True, timeout=None):将对象放入队列,如果队列已满将阻塞至有空间可用为止。block控制阻塞行为,默认为True,如果设置为False,无空间可用将引发queue.Full异常(定义在Queue库模块中),timeout是可选超时时间,如在指定的超时时间内没有空间变为可用,将引发queue.Full异常。

obj.qsize():返回队列中当前的项目数量,此方法不可靠:因为在返回结果和程序使用结果之间的时间段内,队列中可能会有项目增加或删除。本人实测在macOS系统上,此方法直接引发NotImplementedError异常,无法使用。

obj.empty():队列为空时返回True,否则返回False,此方法不可靠,本人实测返回的值不一定。

obj.full():如果队列已满返回True,由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

obj.close():关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已加入队列但尚未写入的数据,待数据写入完成后立即关闭。如果队列被回收,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被get()阻塞,关闭生产者中的队列不会导致get()方法返回错误

obj.cancel_join_thread():不会在进程退出时自动连接后台线程,这可以防止join_thread()方法阻塞。

obj.join_thread():连接队列的后台线程,此方法用于在调用obj.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是obj的原始创建者的所有进程调用。调用cancel_join_thread()方法可以禁止这种行为。

  • 实例:生产者消费者模型
  1. # 生产者消费者模型1
  2. import multiprocessing
  3. import time
  4. import os
  5. # 向queue中添加项目的函数
  6. def inputQ(queue):
  7. info = str(os.getppid()) + '(put):' + str(time.asctime())
  8. queue.put(info)
  9. # 从queue中输出项目的函数
  10. def outputQ(queue):
  11. info = queue.get()
  12. print('%s%s\033[32m%s\033[0m' %(str(os.getpid()), '(get):', info))
  13. # Main
  14. if __name__ == '__main__':
  15. # multiprocessing.freeze_support() # 冻结的支持
  16. recored1, recored2 = [], [] # 用于存放子进程对象
  17. queue = multiprocessing.Queue(3) # 实例化一个Queue的对象,队列中最大3个项目
  18. # 输入进程
  19. for i in range(10):
  20. process = multiprocessing.Process(target=inputQ, args=(queue,))
  21. process.start()
  22. recored1.append(process)
  23. time.sleep(0.3) # 暂停一下,便于错开时间
  24. # 输出进程
  25. for i in range(10):
  26. process = multiprocessing.Process(target=outputQ, args=(queue,))
  27. process.start()
  28. recored2.append(process)
  29. # 阻塞父进程,等待所有子进程运行完毕
  30. [p.join() for p in recored1]
  31. [p.join() for p in recored2]
  1. # 生产者消费者模型2
  2. from multiprocessing import Queue, Process
  3. import time
  4. # 从队列中取数据
  5. def consumer(queue, name, color):
  6. while 1:
  7. time.sleep(0.1)
  8. info =queue.get() # 取数据,若队列已空则阻塞
  9. if not info:break # 对应最后两行
  10. print("%s%s 拿走了%s\033[0m" %(color, name, info))
  11. # 往队列中放数据
  12. def producer(queue, product):
  13. for i in range(10): # 模拟生产10个娃娃
  14. info = product + "版本的%s号娃娃" %(i+1)
  15. queue.put(info) # 放数据,若队列已满,便阻塞
  16. if __name__ == '__main__':
  17. queue = Queue(10) # 实例化一个Queue的对象,队列中最大10个项目
  18. # 创建3个生产者子进程
  19. p_pro1 = Process(target=producer, args=(queue,"A",))
  20. p_pro2 = Process(target=producer, args=(queue, "B",))
  21. p_pro3 = Process(target=producer, args=(queue, "C",))
  22. # 创建2个消费者子进程
  23. p_con1 = Process(target=consumer, args=(queue, 'zyk', '\033[32m'))
  24. p_con2 = Process(target=consumer, args=(queue, 'kyz', '\033[34m'))
  25. p_lst = [p_pro1, p_pro2, p_pro3, p_con1, p_con2]
  26. [p.start() for p in p_lst]
  27. [p.join() for p in p_lst[:-2]] # 阻塞父进程,等待所有生产者子进程执行完毕
  28. [queue.put(i) for i in (None, None)] # 用于终止消费者进程
  • 关于生产者消费者模型

在并发编程中使用消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

  • 为什么要使用生产者和消费者模式

在线程世界中,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,将导致生产者必须等待消费者处理完数据才能继续生产数据。反之,消费者就必须等待生产者。正是为了解决这种问题,才引入了生产者和消费者模式。

  • 什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过一个阻塞队列进行通讯,所以生产者生产完数据之后不用等待消费者处理,而是直接往入阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。


2. multiprocessing.JoinableQueue模块

创建可连接的共享进程队列,它就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理,通知进程是使用共享的信号和条件变量来实现的。相较于Queue模块使用起来更方便些.

  • 方法
    JoinableQueue除了有与Queue对象有相同的方法之外,还具有以下必备方法:

obj = JoinableQueue([maxsize]):实例化一个对象,maxsize可指定队列中最大的项目数

ob j.task_done():反馈信号,(即告诉生产者最近一次取出来的数据已经被处理,可以放入下一个数据了),如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

obj.join():阻塞进程,将持续到队列中的每个项目均调用task_done()方法为止。(即生产者使用此方法暂停生产,等待队列中所有的数据均被消费者处理完毕后,再进行生产)

  • 实例:生产者消费者模型
  1. # JoinableQueue模块实现生产者消费者模型
  2. from multiprocessing import Process, JoinableQueue
  3. from time import sleep
  4. def consumer(jqueue, name, color):
  5. while 1:
  6. info = jqueue.get()
  7. jqueue.task_done() # 每消费一个,反馈一个
  8. print("%s%s拿走了%s\033[0m" %(color, name, info))
  9. def producer(jqueue, product):
  10. for i in range(20):
  11. info = product + "的娃娃%s" %str(i)
  12. jqueue.put(info)
  13. jqueue.join() # 阻塞,等待消费者消费完队列中的所有数据
  14. if __name__ == '__main__':
  15. jqueue = JoinableQueue(10)
  16. p_pro1 = Process(target=producer, args=(jqueue, "岛国米饭保你爱"))
  17. p_con1= Process(target= consumer, args=(jqueue, 'zyk', '\033[34m'))
  18. p_con1.daemon = True # 保证消费者子进程终止
  19. p_pro1.start()
  20. p_con1.start()
  21. p_pro1.join() # 阻塞,等待生产者结束

二、管道:multiprocessing.Pipe

  • 介绍

Pipe(deplex=True):在进程之间创建一条管道,并返回元组(con1, con2)。其中con1, con2表示管道两端的连接对象,若con1发,则con2收;若con2发,则con1收。强调:必须在产生Process对象之前创建管道。参数deplex默认是全双工的,如果将其设为False,con1将只能用于接收,con2只能发送。

  • 主要方法

 con1.recv():接收con2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直被阻塞;如果连接的另一端(con2)被关闭,则抛出EOFError异常。同理con2.recv()。

con1.send(obj):通过连接发送对象,obj是与序列化兼容的任意对象。同理con2.send(obj)。

con1.close():关闭连接,对于不使用的接口应该及时关闭。垃圾处理机制会自动调用此方法。

  • 其它方法

con1.fileno():返回连接使用的整数文件描述符。实测未返回任何值。

con1.poll(timeout=None):如果连接上的数据可用,则返回True。timeout可指定等待的最长时限。如果省略此参数,将立即返回结果。如果设为None,操作将无期限等待数据到达。

con1.recv_bytes(maxlength):加收con2.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个值,将引发IOError异常,并且在连接上无法进行进一步读取;如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

con1.send_bytes(buffer, offset=-1, size=-1):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送的字节数。数据以单条消息的形式发出,另一段调用con2.recv_bytes()方法接收。

con1.recv_bytes_into(buffer, offset=-1):接收一条完整的字节消息,并把它保存到buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移,返回值是接收的字节数,如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

  • 实例:生产者消费者模型
  1. # 管道实现生产者消费者模型
  2. from multiprocessing import Pipe, Process
  3. from time import sleep
  4. def func(con):
  5. con1, con2 = con
  6. con1.close() #
  7. while 1:
  8. sleep(0.3)
  9. try:
  10. print(con2.recv())
  11. except EOFError:
  12. con2.close()
  13. break
  14. if __name__ == '__main__':
  15. con1, con2 = Pipe() # 要写在创建Process对象之前
  16. p_func = Process(target=func, args=((con1, con2),))
  17. p_func.start()
  18. con2.close() # 关闭父进程的con2接口,要写在开启子进程之后
  19. [con1.send(i) for i in range(10)] # 生产数据
  20. con1.close()

需要注意的是管道两端的正确管理问题。如果生产者或消费者中都没有使用管道的某个端口,就应将其关闭。如果忘记执行这些步骤,程序可能会消费者中的recv()操作上刮起,管道是由操作系统引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端口。


三、进程间数据共享:multiprocessing.Manager

展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推获做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样可极大地减少对使用锁定和其它同步手段的需求,还可以扩展到分布式系统中。

但是,进程间应尽量避免通信,即便是需要通信,也应该选择进程安全的工具来避免加锁带来的问题。以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的,虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此。

  1. # Manager模块实现进程间数据共享
  2. from multiprocessing import Process, Manager, Lock
  3. def work(lock, dct):
  4. with lock: # 加锁
  5. dct['count'] +=1 # 如果不加锁,极有可能会引发数据错乱
  6. if __name__ == '__main__':
  7. lock = Lock()
  8. with Manager() as m:
  9. # lst = m.list([1, 2, 3]) # 生成list
  10. dct = m.dict({'count': 10}) # 生成字典
  11. p_lst = []
  12. for i in range(10):
  13. p = Process(target=work, args=(lock, dct))
  14. p_lst.append(p)
  15. p.start()
  16. [p.join() for p in p_lst]
  17. print(dct)

四、进程池:multiprocess.Pool

在程序实例处理问题的过程中,忙时会有上百千万的任务需要被执行,闲时可能只有零星任务,那么在有上百千万个任务需要被执行的时候,我们就需要去创建上百千万个进程么?答案是否定的,首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二,即便是开启了上百千万个进程,操作系统也不能让这些进程同时执行。这么做反而会影响程序的效率,因此我们不能无限制的根据任务开启或者结束进程,那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面开启固定数量的进程,有需求来了,就拿一个进程去处理,处理完毕后再放入池中。如果有很多任务需要执行,池中的进程数量不够,就要等待之前的任务执行完毕归还进程,待有空闲的进程时才继续执行。也就是说,池中进程的数量时固定的,同一时间最多运行的进程数量固定,这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也能在一定程度上实现并发效果.

  • 模块介绍

obj = Pool(os.cpu_count()+1):创建进程池,os.cpu_count()是获取cpu核心数,核心数+1是开启进程数量的一个统一标准。

  • 主要参数

process:要创建的进程数,默认使用os._count()的值。

initializer:指定每个工作进程启动时要执行的可调用对象,默认为None

initargs:是要传给initializer的参数组

  • 主要方法

obj.apply(func, args=(), kwds={}):同步执行,池中的进程串行的执行,强调:此方法不会在所有池工作中并发执行func函数,如果要通过不同参数并发地执行func函数,必须从不同线程调用obj.apply()方法,或者使用obj.apply_async()方法。

obj.apply_async(func, args=(), kwds={}, callback=None):异步执行,池中的进程并行或并发执行func函数。callback是回调函数,会将func返回的结果交给指定的函数,指定的函数必须要有一个形参来接收fnc的返回值。调用此方法时,进程池中的所有进程都会是守护进程,必须同时加上close()和join()方法。

obj.close():关闭进程池,防止进一步操作,如果所有操作持续挂起,它们将在工作进程终止前完成。

obj.join():等待所有工作进程退出,此方法只能在close()或ti=eminate()之后调用

  • 其它方法

obj.wait(timeout):等待结果变为可用

obj.ready():如果调用完成,返回True

obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作,如果obj被垃圾回收,将自动调用此方法。

obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的,如果在指定时间内还没有到达,将引发异常,如果远程操作中引发了异常,它将在调用此方法时再次被引发。

obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常

 

  • 实例一:同步与异步爬取网页源码效率对比
  1. # 同步与异步, 效率对比
  2. from multiprocessing import Pool
  3. from time import time
  4. from requests import get
  5. from os import cpu_count
  6. def func(url):
  7. if get(url).status_code == 200:
  8. print("正在爬取: ", url)
  9. if __name__ == '__main__':
  10. p = Pool((cpu_count() or 1) +1) # 实例化一个进程池对象, 内有cpu核心数+1个进程
  11. url_lst = ['https://www.baidu.com',
  12. 'http://www.jd.com',
  13. 'http://www.taobao.com',
  14. 'http://www.mi.com',
  15. 'http://www.cnblogs.com',
  16. ]
  17. print("同步执行>>> ")
  18. start = time()
  19. [p.apply(func, args=(i,)) for i in url_lst] # 同步(串行)执行
  20. apply_stop = time() - start
  21. print("\n异步执行>>> ")
  22. start = time()
  23. [p.apply_async(func, args=(i,)) for i in url_lst] # 异步(并发/并行)执行
  24. # 异步执行必须写close()与join()
  25. p.close()
  26. p.join()
  27. apply_async_stop = time() - start
  28. print("\n同步用时: %s\t异步用时: %s" %(apply_stop, apply_async_stop))
  29. # 同步用时: 1.5356788635253906 异步用时: 0.6395771503448486
  • 实例二:回调函数的基本使用
  1. # 回调函数(callback=func)
  2. from multiprocessing import Pool
  3. from time import sleep, time
  4. from requests import get
  5. from os import getpid, getppid
  6. # 执行函数
  7. def func(url):
  8. print("子进程:%s\t父进程:%s" %(getpid(), getppid()))
  9. res = get(url)
  10. if res.status_code == 200:
  11. return url, res.text
  12. # 回调函数
  13. def cal_back(sta):
  14. url, text = sta
  15. print("url: %s\t回调函数:%s" %(url, getpid()))
  16. with open('网页源码', 'w', encoding='utf-8') as f:
  17. f.write(text)
  18. if __name__ == '__main__':
  19. p = Pool(3)
  20. url_lst = ['https://www.baidu.com',
  21. 'http://www.jd.com',
  22. 'http://www.taobao.com',
  23. 'http://www.mi.com',
  24. 'http://www.cnblogs.com',
  25. ]
  26. start = time()
  27. # callback指定回调函数,回调函数接收func函数的返回值
  28. [p.apply_async(func, args=(i,), callback=cal_back) for i in url_lst]
  29. # 异步执行必须写close()与join()
  30. p.close()
  31. p.join()
  32. apply_async_stop = time() - start

需要回调函数的场景:进程池中任何一个任务处理完后,就会立即告知主进程。主程序收到结果后便去调用指定的函数处理该结果。我们可以把耗时间(阻塞)的任务放到进程池中,然后制定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的就是任务结果。

完结



标签:__,con1,20,队列,08,queue,进程,obj

专注分享技术,共同学习,共同进步。侵权联系[admin#icode9.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有