zzh

zzh

java 线程池的核心线程为什么不会释放

线程池的生命周期#

*
* The runState provides the main lifecycle control, taking on values:
*
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*

源码解析#

image

当工作线程小于核心线程数的时候就进入 addWorker 函数执行方法。接下来我们进入 addWorker 函数看看:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
        }
    }
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里我们删除了一些不那么相关的代码,只保留了核心部分。其中我们可以看到 for (;;) 循环内部先判断当前线程数是否大于核心线程数,如果大于就返回一个 false(这里为什么又要判断一次,是因为多线程中可能存在多个线程同时提交一个任务竞争最后一个核心线程);然后在 try 部分会新建一个 Worker 对象,然后使用 Worker 对象中的 thread 开始执行。那么,我们接下来看看 Worker 对象的内部实现:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

首先我们可以看到 Worker 类继承于 AQS,内部一些代码会使用 AQS 的 state 来执行加锁解锁操作。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker 类的构造函数使用 ThreadFactory 工厂新建一个线程,同时把 Worker 对象自身传入(还记得前面使用了 Woker 对象中的 thread 运行吗,这里就是为了传入 thread 需要执行的代码);然后 thread 开启以后会执行 Worker 对象的 run 方法:

public void run() {
    runWorker(this);
}

然后进入 runWorker 方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    try {
        while (task != null || (task = getTask()) != null) {
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                }finally {
                    afterExecute(task, thrown);
                }
            }finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们这里同样删除了大部分代码,只保留核心的部分;可以看到代码使用了 while 循环不断执行 task 方法(即 submit 提交的 new Runnable 或者 new Callable),然后设置 task=null 同时使用 task = getTask () 从阻塞队列中获取新的方法继续执行。这里就是核心线程与非核心线程的关键区别所在,我们接着进入 getTask () 方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从这里我们可以看出核心线程使用 take () 方法获取 task 而非核心线程使用 poll (keepAliveTime, TimeUnit.NANOSECONDS) 获取 task,两者都会阻塞当前线程,但是 poll (keepAliveTime, TimeUnit.NANOSECONDS) 会存在一个时间退出;退出以后 timeOut 为 true,然后在上面的 if 块中返回 null;这时就可以退出 runWorker 的 while 循环。而对于核心线程就没有这个问题。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。