zzh

zzh

為什麼 Java 線程池的核心線程不會釋放?

線程池的生命周期#

*
* runState 提供主要的生命周期控制,具有以下值:
*
*   RUNNING:接受新任務並處理排隊任務
*   SHUTDOWN:不接受新任務,但處理排隊任務
*   STOP:不接受新任務,不處理排隊任務,
*             並中斷正在進行的任務
*   TIDYING:所有任務已終止,workerCount 為零,
*             線程正在過渡到 TIDYING 狀態
*             將運行 terminated() 鉤子方法
*   TERMINATED:terminated() 已完成
*

源碼解析#

image

當工作線程小於核心線程數時就進入 addWorker 函數執行方法。接下來我們進入 addWorker 函數看看:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
        }
    }
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裡我們刪除了一些不那麼相關的代碼,只保留了核心部分。其中我們可以看到 for (;;) 循環內部先判斷當前線程數是否大於核心線程數,如果大於就返回一個 false(這裡為什麼又要判斷一次,是因為多線程中可能存在多個線程同時提交一個任務競爭最後一個核心線程);然後在 try 部分會新建一個 Worker 對象,然後使用 Worker 對象中的 thread 開始執行。那麼,我們接下來看看 Worker 對象的內部實現:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

首先我們可以看到 Worker 類繼承於 AQS,內部一些代碼會使用 AQS 的 state 來執行加鎖解鎖操作。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker 類的構造函數使用 ThreadFactory 工廠新建一個線程,同時把 Worker 對象自身傳入(還記得前面使用了 Woker 對象中的 thread 運行嗎,這裡就是為了傳入 thread 需要執行的代碼);然後 thread 開啟以後會執行 Worker 對象的 run 方法:

public void run() {
    runWorker(this);
}

然後進入 runWorker 方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    try {
        while (task != null || (task = getTask()) != null) {
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                }finally {
                    afterExecute(task, thrown);
                }
            }finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我們這裡同樣刪除了大部分代碼,只保留核心的部分;可以看到代碼使用了 while 循環不斷執行 task 方法(即 submit 提交的 new Runnable 或者 new Callable),然後設置 task=null 同時使用 task = getTask () 從阻塞隊列中獲取新的方法繼續執行。這裡就是核心線程與非核心線程的關鍵區別所在,我們接著進入 getTask () 方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

從這裡我們可以看出核心線程使用 take () 方法獲取 task 而非核心線程使用 poll (keepAliveTime, TimeUnit.NANOSECONDS) 獲取 task,兩者都會阻塞當前線程,但是 poll (keepAliveTime, TimeUnit.NANOSECONDS) 會存在一個時間退出;退出以後 timeOut 為 true,然後在上面的 if 塊中返回 null;這時就可以退出 runWorker 的 while 循環。而對於核心線程就沒有這個問題。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。