ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java线程池之Executors.newSingleThreadExecutor()

2022-06-13 21:35:14  阅读:131  来源: 互联网

标签:Runnable Java newSingleThreadExecutor Worker command task 线程 new


Java线程池Executors.newSingleThreadExecutor()

前言:本文先就Java线程池 ThreadPoolExecutor 进行分析,然后逐步分析单线程池的源码工作流程

ThreadPoolExecutor的工作流程

我们执行以下代码:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run() {
        ...
    }
});

当线程池提交Runnable实现时singleThreadExecutor.execute(Runnable command) ,其大致的工作流程如下:

下面跟随源码进入到ThreadPoolExecutor中进行详细分析

线程池初始化

Executors.newSingleThreadExecutor()执行时,其内部源码如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

继续深入new ThreadPoolExecutor方法可以发现,对应的参数的意义分别是:

public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                          int maximumPoolSize, // 最大线程数
                          long keepAliveTime, // 保活时间(这里是0,单线程池暂时用不上此参数)
                          TimeUnit unit, // 保活时间单位
                          BlockingQueue<Runnable> workQueue // 存放Runnable实现的队列) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

往后执行,也就是线程池执行初始化的一个过程,给线程池对象的重要参数进行赋值。

线程池提交任务

先看看提交时,最外层的代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // 1.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.
    else if (!addWorker(command, false))
        reject(command);
}

翻译翻译Doug Lea大神写的注释:

  1. 首先判断Worker的数量是否小于corePoolSize核心线程池数量,小于则new Worker对象,启动线程并以此次的Runnable任务为第一次任务进行run()执行。后一段则说有原子性的检查机制保证Worker线程不会额外创建,否则会返回false
  2. 上述条件不满足时,会将Runnale任务放入队列,如果放置成功,且会进行双重校验:先判断线程池是否是SHUTDOWN状态,然后尝试撤回Runnable任务,如果失败,则判断Worker的数量是否为0,如果为0,则新增一个非核心线程池的Worker。根据鄙人目前的理解,这里应该是避免线程池执行shutdown()方法,然后导致Runnable任务没有被正确的处理。
  3. 如果Runnable放入队列失败,则添加非核心线程池的Worker。PS:LinkedBlockingQueue无论如何都可以offer()成功,SynchronousQueueoffer()是返回的false。上述两个BlockingQueue分别对应固定线程数的线程池和非固定线程数的线程池,分别对应的典型是Executors.newSingleThreadExecutor()Executors.newCachedThreadPool()非固定线程数的线程池放在下一期讲。

addWorker方法与Worker类

单线程的线程池的提交任务的时候,最重要的就2个步骤:addWorker(command, true)添加核心线程Worker与 workQueue.offer(command)将超出核心线程数的任务放入队列中。

那么addWorker方法里面发生了什么呢,我摘取了我认为最重要的一段:上锁、new Worker对象与启动线程

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 注意这里
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

那么,这个Worker究竟是怎么执行的呢,调用t.start()发生了什么呢。先看看Worker的类结构:

再看看Worker的成员变量有哪些,参考其构造方法:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

接下来就是我认为关于Worker最重要核心的地方了,通过上述代码我们可以发现,Worker实现了Runnable接口,然后在构建Worker对象的时候,初始化了它的成员变量thread,并且是以Woker本身赋值进去的!!!看到这里,我们就知道了,t.start()的时候,是执行的Worker.run()方法。


下面是Worker.run()方法内部的部分源码:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) 
            w.lock();
            // If pool is stopping, ensure thread is interr
            // if not, ensure thread is not interrupted.  T
            // requires a recheck in second case to deal wi
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

也就是说,启动Worker的之后,就是循环getTask()task.run()这个getTask()得到的task才是提交到线程池中的Runnable实现
对于单线程池,getTask()最终执行的是LinkedBlockingQueue.take()方法:

  • 按照队列的FIFO原则(先进先出),取出头部节点
  • 如果没有头部节点,则调用notEmpty.await();方法等待,等待offer()方法时,调用notEmpty.signal();,这里涉及多线程的其它知识了,可以参考并发包中的AbstractQueuedSynchronizer相关知识点。

至此,单线程池的执行过程大概讲解了一通,现在再来看开篇的工作流程图,又有了更深地理解。

标签:Runnable,Java,newSingleThreadExecutor,Worker,command,task,线程,new
来源: https://www.cnblogs.com/lcmlyj/p/16371148.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有