ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

C++多生产者多消费者模型

2022-06-12 23:35:18  阅读:165  来源: 互联网

标签:count std producer 生产者 模型 Factory C++ int line


// 多生产者多消费者模型
// 需要了解以下概念
// thread 线程
// mutex 互斥锁
// atomic 原子操作
// condition_variable 条件变量

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <queue>
#include <random>
#include <ctime>
#include <vector>

// 随机数生成,用于模拟生产消费的耗时
std::default_random_engine e(time(0));
std::uniform_int_distribution<int> distribution(3000, 7000);

class Factory
{
public:
    Factory(int production_target, int producer_count,
        int consumer_count, int line_capacity);

    void produce(int tag);
    void consume(int tag);
    void start_working();

private:
    bool do_produce(int tag);
    bool do_consume(int tag);

private:
    const int production_target; // 生产目标,计划生产的数目
    const int producer_count; // 生产者数量
    const int consumer_count; // 消费者数量
    const int line_capacity; // 产品暂存区最大存放量
    std::atomic<int> produced_count = 0; // 已生产产品数量
    std::atomic<int> consumed_count = 0; // 已消费产品数量
    std::atomic_flag produce_done = ATOMIC_FLAG_INIT; // 产品已经生产完毕
    std::atomic_flag consume_done = ATOMIC_FLAG_INIT; // 产品已经消费完毕
    std::queue<int> product_line; // 已生产未消费产品存放区
    std::mutex mtx; // 互斥锁
    std::condition_variable line_not_full; // 消费者消耗了一个产品
    std::condition_variable line_not_empty; // 生产者产出了一个产品
};

Factory::Factory(int production_target, int producer_count,
    int consumer_count, int line_capacity) : 
    production_target(production_target),
    producer_count(producer_count),
    consumer_count(consumer_count),
    line_capacity(line_capacity)
{
    // do nothing
}

bool Factory::do_produce(int tag)
{
    // 假设生产总是成功的
    std::this_thread::sleep_for(std::chrono::duration(
        std::chrono::milliseconds(distribution(e))));
    return true;
}

bool Factory::do_consume(int tag)
{
    // 假设消费总是成功的
    std::this_thread::sleep_for(std::chrono::duration(
        std::chrono::milliseconds(distribution(e))));
    return true;
}

void Factory::produce(int tag)
{
    while(produced_count < production_target)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(produced_count < production_target && product_line.size() < line_capacity)
        {
            int id = ++produced_count; // 提前将计数器加上,防止生产超额
            std::cout << "[P" << tag << "] prodecing " << id << "\n";
            lock.unlock(); // 具体的生产过程会比较耗时,将锁释放,使用其它线程生产/消费
            do_produce(tag); // 如果生产可能失败,则需要重新设计此处逻辑
            lock.lock();
            product_line.push(id);
            std::cout << "[P" << tag << "] prodeced " << id << "\n";
            line_not_empty.notify_one(); // 只生产了一个产品
            if(product_line.size() == line_capacity && produced_count < production_target)
            {
                std::cout << "[" << tag << "] product line full\n";
                line_not_full.wait(lock);
            }
        }
    }
    if(produce_done.test_and_set()) // 生产结束,需要唤醒所有消费者
        line_not_empty.notify_all(); // 消费者在生产线为空后一直等待
    std::cout << "producer " << tag << " done\n";
}

void Factory::consume(int tag)
{
    while(consumed_count < production_target)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(consumed_count < production_target && !product_line.empty())
        {
            int id = product_line.front();
            ++consumed_count;
            product_line.pop();
            std::cout << "[C" << tag << "] consuming " << id << "\n";
            lock.unlock();
            do_consume(tag);
            lock.lock();
            std::cout << "[C" << tag << "] consumed " << id << "\n";
            line_not_full.notify_one();
            if(product_line.empty() && consumed_count < production_target)
            {
                std::cout << "[" << tag << "] product line empty\n";
                line_not_empty.wait(lock);
            }
        }
    }
    if(consume_done.test_and_set())
        line_not_full.notify_all();
    std::cout << "consumer " << tag << " done\n";
}

void Factory::start_working()
{
    std::vector<std::thread> threads;
    for(int i = 0; i < producer_count; ++i)
        threads.emplace_back(&Factory::produce, this, i); // 使用类成员函数时需要加上this指针
    for(int i = producer_count; i < producer_count + consumer_count; ++i)
        threads.emplace_back(&Factory::consume, this, i);

    for(int i = 0; i < producer_count + consumer_count; ++i)
        threads[i].join();
}

int main()
{
    Factory factory(10, 3, 3, 3);
    factory.start_working();
    std::cout << "done\n";
}


// TODO
// 1. 为每条打印信息加上时间戳
//        chrono打印时间戳有点麻烦
// 2. 处理生产/消费失败的情形
//        使用两个变量,一个记录已生产的数量,一个记录正在生产的数量(包含已生产数量)

标签:count,std,producer,生产者,模型,Factory,C++,int,line
来源: https://www.cnblogs.com/xunxunxun/p/16369233.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有