ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

Python 初学笔记 - 第五章-多进程

2020-11-23 02:01:29  阅读:186  来源: 互联网

标签:__ ... name start Python Process 厨师 初学 第五章


目录

概念

Python 提供了 multiprocessing,multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块
threading 的编程接口类似,multiprocessing 模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了 Process 、 Queue 、Pipe 、 Lock 等组件。

注意:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

开启多进程

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程。

强调:

  • 需要使用关键字的方式来指定参数。
  • args 指定的为传给 target 函数的位置参数,是一个元组形式,必须有逗号。

参数介绍:

  • group,参数未使用,值始终为 None。
  • target,表示调用对象,即子进程要执行的任务。
  • args,表示调用对象的位置参数元组,args=(1,2,'abc',)。
  • kwargs,表示调用对象的字典,kwargs={'name':'egon','age':18}。
  • name,为子进程的名称。

使用方式一

# 直接调用Process
import time
from multiprocessing import Process


def foo(t):
    time.sleep(t)
    print(f'subprocess{t} running')
    

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=foo, args=(i,))
        p.start()
    print('main')
main
subprocess0 running
subprocess1 running
subprocess2 running

使用方式二

# 自定义一个Process
import time
from multiprocessing import Process


class MyProcess(Process):

    def __init__(self, name, t):
        super().__init__()
        self.name = name
        self.t = t

    # 一定要把运行的子进程写为run,因为p.start 会调用note方法
    def note(self):
        print(f'SubProcess {self.name} note ...')
        time.sleep(self.t)
        print(f'{self.name} done')


if __name__ == '__main__':
    p1 = MyProcess('info1', 4)
    p2 = MyProcess('info2', 3)
    p3 = MyProcess('info3', 2)
    p4 = MyProcess('info4', 1)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('test')
test
SubProcess info1 note ...
SubProcess info3 note ...
SubProcess info4 note ...
SubProcess info2 note ...
info4 done
info3 done
info2 done
info1 done

函数属性

  • p.start():启动进程,并调用该子进程中的 p.run()。
  • p.run():进程启动时运行的方法,向操作系统发送请求,正是它去调用 target 指定的函数。
  • p.join([timeout]):主线程等待 p 终止(主线程处于等的状态,而p是处于运行的状态),timeout 是可选的超时时间,timeout 不写代表等待子进程运行至结束。
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is run ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p2 = Process(target=work, args=('p2', 1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('Process done')
p1 Subprocess is run ...
p2 Subprocess is run ...
p1 Subprocess done ...
p2 Subprocess done ...
Process done
  • p.terminate():向系统发出强制终止进程 p 信号,不会进行任何清理操作,如果 p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要
    特别小心这种情况,如果 p 还保存了一个锁那么也将不会被释放, 进而导致死锁。
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is run ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p2 = Process(target=work, args=('p2', 1))
    p1.start()
    p2.start()
    # time.sleep(2)  # 这句执行p1就有时间打印语句
    p1.terminate()
    p1.join()
    p2.join()
    print('Process done')
# p1 刚请求开启就马上就被强制终止,所以没有任何输出
p2 Subprocess is run ...
p2 Subprocess done ...
Process done
  • p.is_alive():如果 p 仍然运行,返回 True。
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is note ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p1.start()
    p1.terminate()
    time.sleep(2)
    print(p1.is_alive())
    print('Process done')
False
Process done

数据属性

  • p.daemon,默认值为 False,如果设为 True,代表 p 为后台运行的守护进程,当 p 的父进程终止时,p 也随之终止,并且设定为 True 后,p 不能创建自己的子进程,必须在 p.start() 之前设置。
  • p.name,进程的名称,可在实例化 Process 时指定。
  • p.pid,进程的 pid。
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is note ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1), name='test')
    p1.start()
    p1.terminate()
    time.sleep(2)
    print(p1.is_alive())

    print(p1.name, p1.pid, p1.daemon)
    print('Process done')
False
test 13536 False
Process done

守护进程

  • 守护进程会在主进程代码执行结束后就终止。
  • 守护进程内无法再开启子进程,否则抛出异常。
from multiprocessing import Process
import time


def subprocess(name):
    print(f'{name} start ...')
    time.sleep(2)
    print(f'{name} done ...')


if __name__ == '__main__':
    p1 = Process(target=subprocess, args=('subprocess 1',), name='process', daemon=True)
    # p.daemon = True  # 与上面开启的方式效果相同
    p1.start()
    print('main process note ...')

#  会发现因为主进程运行得很快就结束了,作为守护进程的 p 都来不及执行
main process note ...

竞争问题

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端 是有问题的,而共享带来的是竞争,竞争带来的结果就是错乱。

from multiprocessing import Process, Lock
import time


def subprocess(name):
    print(f'{name} start ...')
    time.sleep(1)
    print(f'{name} done ...')


if __name__ == '__main__':
    p1 = Process(target=subprocess, args=('subprocess 1',), name='process 1')
    p2 = Process(target=subprocess, args=('subprocess 2',), name='process 2')
    p3 = Process(target=subprocess, args=('subprocess 3',), name='process 3')
    p1.start()
    p2.start()
    p3.start()
    print('main process note ...')

# 会发现并发出现了竞争的问题,我们想要的是一个子进程竞争成功输出 start 后其它进程就不要输出了,等这个进程
# done 后,剩余的进程再继续竞争,但是结果显示三个进程都输出了 start
main process note ...
subprocess 3 start ...
subprocess 2 start ...
subprocess 1 start ...
subprocess 3 done ...
subprocess 2 done ...
subprocess 1 done ...

互斥锁

互斥锁的作用也就是一个子程序竞争成功后,其它子进程等待下一次的竞争。

from multiprocessing import Process, Lock


def subprocess(name, lock):
    lock.acquire()
    print(f'{name} start ...')
    print(f'{name} done ...')
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    p1 = Process(target=subprocess, args=('subprocess 1', lock), name='process 1')
    p2 = Process(target=subprocess, args=('subprocess 2', lock), name='process 2')
    p3 = Process(target=subprocess, args=('subprocess 3', lock), name='process 3')
    p1.start()
    p2.start()
    p3.start()
    print('main process note ...')
# 下面可以看到竞争成功的其它子进程将等待
main process note ...
subprocess 1 start ...
subprocess 1 done ...
subprocess 3 start ...
subprocess 3 done ...
subprocess 2 start ...
subprocess 2 done ...

模拟抢票

在实际中,抢票也是一个竞争问题,当我们查询剩余的票的时候每个人看到剩余的票数是一样的没有竞争问题,但是在抢票的时候,大家同时抢,一张票肯定只能给一个人,这个时候需要考虑竞争。

下面是模拟 5 个人抢 2 张票的情况。

#  db模拟数据库剩余票数, db文件内容如下(json格式):
# {"ticket": 2}

from multiprocessing import Process, Lock
import time
import json


# 模拟抢票之前先查询剩余票
def search(name):
    db = json.load(open('db'))
    ticket = db['ticket']
    print(f'{name} 查询剩余车票, 车票剩余 : {ticket}')


# 抢票
def get_ticket(name):
    db = json.load(open('db'))
    if db['ticket'] > 0:  # 如果数据库中票数大于0,则抢票成功
        db['ticket'] -= 1
        with open('db', 'w') as f:
            json.dump(db, f)
        print(f'{name} 抢票成功')
    else:
        print(f'{name} 抢票失败, 票已抢完')


# 真正执行的抢票流程
def task(name, lock):
    search(name)  # 先查询票,看到票还剩多少张
    time.sleep(1) # 模拟查询用的时间

    lock.acquire()  # 加锁
    get_ticket(name)  # 抢票
    time.sleep(1)  # 模拟抢票用的时间
    lock.release()  # 释放锁


if __name__ == '__main__':
    lock = Lock()
    # 5 个用户
    p1 = Process(target=task, args=('user1', lock), name='process 1')
    p2 = Process(target=task, args=('user2', lock), name='process 2')
    p3 = Process(target=task, args=('user3', lock), name='process 3')
    p4 = Process(target=task, args=('user4', lock), name='process 4')
    p5 = Process(target=task, args=('user5', lock), name='process 5')
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
user3 查询剩余车票, 车票剩余 : 2
user1 查询剩余车票, 车票剩余 : 2
user5 查询剩余车票, 车票剩余 : 2
user2 查询剩余车票, 车票剩余 : 2
user4 查询剩余车票, 车票剩余 : 2
user3 抢票成功
user1 抢票成功
user5 抢票失败, 票已抢完
user2 抢票失败, 票已抢完
user4 抢票失败, 票已抢完

互斥锁与 join 的区别

互斥锁与 join 都可以用于抢票问题的解决,但是使用 join 的话,在一个用户使用的时候可以查票和购票而其它用户什么都做不了,使用互斥锁我们可以针对的使用限制。

总结:加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

  • 效率低(共享数据基于文件,而文件是硬盘上的数据)
  • 需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

  • 效率高(多个进程共享一块内存的数据)
  • 帮我们处理好锁问题。

这就是 multiprocessing 模块为我们提供的基于消息的 IPC 通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 因而队列才是进程间通信的最佳选择。

队列(Queue)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing 模块支持两种形式,队列和管道,这两种方式都是使用消息传递的。

创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。maxsize 是队列中允许最大项数,省略则无大小限制。

注意:

  • 队列内存放的是消息而非大数据
  • 队列占用的是内存空间,因而 maxsize 即便是无大小限制也受限于内存大小

主要方法介绍:

  • q.put 方法用以插入数据到队列中。
  • q.get 方法可以从队列读取并且删除一个元素。

使用

from multiprocessing import Queue

# 生成队列,最大为3个
q = Queue(3)

# 向队列添加数据
q.put('str')
q.put([1, 2, 3])
q.put({'dict': 1})
# q.put((1,2))  # 再添加将会阻塞,因为超出最大个数

# 查看队列是否满了
print(q.full())

# 取出队列消息
res1 = q.get()
res2 = q.get()
print('res1:', res1)
print('res2:', res2)

# 判断队列是否是空的
print(q.empty())

# 判断队列是否是满的
print(q.full())

# 关闭队列
q.close()
True
res1: str
res2: [1, 2, 3]
False
False

生产者消费者模型

当有多个生产者或者多个消费者时,消费者想要获取数据就需要去问每个生产者是否有产生数据,当生产者数量很多时效率就会低,而这个时候我们就
可建立一个临时的仓库,只要生产者产生一个数据就放入仓库中,而消费者就不需要去问没有生产者获取数据,只要不停的查看仓库就可以了。

实现

下面是模拟生产者与消费者通过队列进行信息通信的过程,需要注意的是当生产者生产完数据后没有告诉消费者生产数据已经结束了,消费者就会卡住,
所以下面的处理办法是等待生产者执行完毕后传递一个 None 信息给消费者告诉执行完毕,不用等待了。

from multiprocessing import Process, Queue
import time


def producer(name, n, q, t):
    """
    生产者
    :param name: 生产者名字
    :param n: 生产者生产个数
    :param q: 队列
    :param t: 生产时间
    :return: None
    """
    for i in range(n):
        time.sleep(t)
        res = f'{name}的第{i+1}个包子'
        print(f'{name} 生产了 {res}...')
        q.put(res)


def consumer(name, q, t):
    """
    消费者
    :param name: 消费者名字
    :param q: 队列
    :param t: 消费时间
    :return: None
    """
    while True:
        time.sleep(t)
        res = q.get()
        if res is None:
            break
        print(f'{name} 吃掉了 {res}')


if __name__ == '__main__':
    q = Queue()
    # 生成3个生产者
    p1 = Process(target=producer, args=('厨师1', 3, q, 1))
    p2 = Process(target=producer, args=('厨师2', 4, q, 2))
    p3 = Process(target=producer, args=('厨师3', 2, q, 3))
    # 生成2个消费者
    c1 = Process(target=consumer, args=('食客1', q, 2))
    c2 = Process(target=consumer, args=('食客2', q, 2))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    
    # 等待生产者生产完毕发送结束消息给消费者,不加下面的代码,程序会在consumer这里卡住
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
厨师1 生产了 厨师1的第1个包子...
厨师2 生产了 厨师2的第1个包子...
厨师1 生产了 厨师1的第2个包子...
食客2 吃掉了 厨师1的第1个包子
食客1 吃掉了 厨师2的第1个包子
厨师1 生产了 厨师1的第3个包子...
厨师3 生产了 厨师3的第1个包子...
厨师2 生产了 厨师2的第2个包子...
食客2 吃掉了 厨师1的第2个包子
食客1 吃掉了 厨师1的第3个包子
厨师2 生产了 厨师2的第3个包子...
食客2 吃掉了 厨师3的第1个包子
食客1 吃掉了 厨师2的第2个包子
厨师3 生产了 厨师3的第2个包子...
厨师2 生产了 厨师2的第4个包子...
食客2 吃掉了 厨师2的第3个包子
食客1 吃掉了 厨师3的第2个包子
食客2 吃掉了 厨师2的第4个包子

JoinableQueue 实现

上面的实现无非是发送结束信号而已,有另外一种队列提供了这种机制。

JoinableQueue([maxsize])
这就像是一个 Queue 对象,但队列允许项目的使用者通知生成者项目已经被成功处理,通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue 的实例 p 除了与 Queue 对象相同的方法之外还具有:

  • q.task_done(),使用者使用此方法发出信号,表示 q.get() 的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发 ValueError 异常。
  • q.join(): 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理,阻塞将持续到队列中的每个项目均调用q.task_done()。
from multiprocessing import Process, JoinableQueue
import time


def producer(name, n, q, t):
    """
    生产者
    :param name: 生产者名字
    :param n: 生产者生产个数
    :param q: 队列
    :param t: 生产时间
    :return: None
    """
    for i in range(n):
        time.sleep(t)
        res = f'{name}的第{i+1}个包子'
        print(f'{name} 生产了 {res}...')
        q.put(res)

    # 等待消费者处理数据发送q.task_done()信号,确保每个数据都被处理
    q.join()


def consumer(name, q, t):
    """
    消费者
    :param name: 消费者名字
    :param q: 队列
    :param t: 消费时间
    :return: None
    """
    while True:
        time.sleep(t)
        res = q.get()
        print(f'{name} 吃掉了 {res}')

        # 每处理一个数据发送一次已处理信号给q.join()
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()  # 使用JoinableQueue

    p1 = Process(target=producer, args=('厨师1', 3, q, 1))
    p2 = Process(target=producer, args=('厨师2', 4, q, 2))
    p3 = Process(target=producer, args=('厨师3', 2, q, 3))

    # 生成2个消费者, 设为守护进程, 当消费者处理完数据就和主程序一起结束
    c1 = Process(target=consumer, args=('食客1', q, 2), daemon=True)
    c2 = Process(target=consumer, args=('食客2', q, 2), daemon=True)

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    # 等待生产者生产完毕后结束主程序, 而生产者是等待消费者把数据处理完才会结束.
    # 也就是说生产者等待消费者处理完数据就结束, 主程序等待生产者结束才结束, 主程序结束带动消费者结束
    p1.join()
    p2.join()
    p3.join()
厨师1 生产了 厨师1的第1个包子...
食客1 吃掉了 厨师1的第1个包子
厨师1 生产了 厨师1的第2个包子...
食客2 吃掉了 厨师1的第2个包子
厨师2 生产了 厨师2的第1个包子...
厨师1 生产了 厨师1的第3个包子...
厨师3 生产了 厨师3的第1个包子...
食客1 吃掉了 厨师2的第1个包子
食客2 吃掉了 厨师1的第3个包子
厨师2 生产了 厨师2的第2个包子...
食客1 吃掉了 厨师3的第1个包子
食客2 吃掉了 厨师2的第2个包子
厨师3 生产了 厨师3的第2个包子...
厨师2 生产了 厨师2的第3个包子...
食客1 吃掉了 厨师3的第2个包子
食客2 吃掉了 厨师2的第3个包子
厨师2 生产了 厨师2的第4个包子...
食客1 吃掉了 厨师2的第4个包子

标签:__,...,name,start,Python,Process,厨师,初学,第五章
来源: https://www.cnblogs.com/sugarq/p/14022557.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有