ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

简单的线程池(五)

2021-12-14 13:33:55  阅读:168  来源: 互联网

标签:task workerqueues workers unsigned 线程 简单 workersize


◆ 概要

笔者在 《简单的线程池(四)》 中采用了非阻塞的(nonblocking)线程同步方式,在此文中笔者将采用阻塞的(blocking)线程同步方式(参考 《简单的线程池(二)》) 实现相同特性的线程池。

本文不再赘述与 《简单的线程池(二)》《简单的线程池(四)》 相同的内容。如有不明之处,请参考该博客。

◆ 实现

以下代码给出了此线程池的实现,(blocking_unique_pool.h)

class Thread_Pool {

  private:

    struct Task_Wrapper { ...
    	
    };

    atomic<bool> _suspend_;
    atomic<bool> _done_;
    unsigned _workersize_;
    thread* _workers_;
    Blocking_Queue<Task_Wrapper>* _workerqueues_;            // #1

    void work(unsigned index) {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            _workerqueues_[index].pop(task);
            task();
            while (_suspend_.load(memory_order_acquire))
                std::this_thread::yield();
        }
    }

    void stop() {
        size_t remaining = 0;
        _suspend_.store(true, memory_order_release);
        for (unsigned i = 0; i < _workersize_; ++i)
            remaining += _workerqueues_[i].size();
        _suspend_.store(false, memory_order_release);
        for (unsigned i = 0; i < _workersize_; ++i)
            while (!_workerqueues_[i].empty())
                std::this_thread::yield();
        std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
        _done_.store(true, memory_order_release);
        for (unsigned i = 0; i < _workersize_; ++i)
            _workerqueues_->push([] {});                        // #2
        for (unsigned i = 0; i < _workersize_; ++i)
            if (_workers_[i].joinable())
                _workers_[i].join();
        delete[] _workers_;
        delete[] _workerqueues_;
    }

  public:
    Thread_Pool() : _suspend_(false), _done_(false) {
        try {
            _workersize_ = thread::hardware_concurrency();
            _workers_ = new thread[_workersize_]();
            _workerqueues_ = new Blocking_Queue<Task_Wrapper>[_workersize_]();
            for (unsigned i = 0; i < _workersize_; ++i)
                _workers_[i] = thread(&Thread_Pool::work, this, i);
        } catch (...) {
            stop();
            throw;
        }
    }
    ~Thread_Pool() {
        stop();
    }

    template<class Callable>
    future<typename std::result_of<Callable()>::type> submit(Callable c) {
        typedef typename std::result_of<Callable()>::type R;
        packaged_task<R()> task(c);
        future<R> r = task.get_future();
        _workerqueues_[std::rand() % _workersize_].push(std::move(task));
        return r;
    }

};

此线程池采用阻塞式的任务队列存放线程池用户提交的任务(#1)。为了避免发生 《简单的线程池(二)》 中提及的死锁问题,在主线程调用工作线程的 join() 函数之前,向每个任务队列中放入一个假任务(#2),确保各个工作线程都能退出循环等待。

◆ 逻辑

以下类图展现了此线程池的代码主要逻辑结构。它与 《简单的线程池(二)》 中的线程池的区别在于 Thread_Pool 类到 Blocking_Queue<> 类的多重性由 1 变为 1..* 。

线程池用户提交任务与工作线程执行任务的并发过程与 《简单的线程池(二)》 中的一致,此处略。

◆ 验证

验证过程采用了 《简单的线程池(三)》 中定义的的测试用例,对应的测试结果均保存在 [github] cnblogs/15676560 中。

◆ 最后

完整的代码与测试数据请参考 [github] cnblogs/15676560

标签:task,workerqueues,workers,unsigned,线程,简单,workersize
来源: https://www.cnblogs.com/green-cnblogs/p/15676560.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有