zzh

Why won't the core threads of the Java thread pool be released?

Lifecycle of Thread Pool

*
* The runState provides the main lifecycle control, taking on values:
*
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*
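The transitions listed above can be observed from outside through the public ExecutorService API. The following is a minimal sketch, not part of the JDK source; the class name LifecycleDemo and the method trace() are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    /** Records what the public API reports before and after shutdown(). */
    static String trace() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        StringBuilder sb = new StringBuilder();

        // RUNNING: new tasks are accepted.
        pool.execute(() -> { });
        sb.append("shutdown=").append(pool.isShutdown());

        // SHUTDOWN: queued tasks still run, but new tasks are rejected.
        pool.shutdown();
        sb.append(" shutdown=").append(pool.isShutdown());
        try {
            pool.execute(() -> { });
            sb.append(" accepted");
        } catch (RejectedExecutionException e) {
            sb.append(" rejected");
        }

        try {
            // The pool passes through TIDYING to TERMINATED once all workers have exited.
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            sb.append(" terminated=").append(done && pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The sketch relies on the default rejection policy (AbortPolicy), which throws RejectedExecutionException once the pool has left the RUNNING state.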

Source Code Analysis

When the number of worker threads is less than corePoolSize, execute() calls the addWorker method. Next, let's take a look at addWorker:

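private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // ... run-state check elided ...
        for (;;) {
            int wc = workerCountOf(c);
            // Refuse to add a worker if the pool has reached the relevant limit.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Reserve a slot atomically; of several competing threads only one wins.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // CAS failed: re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // ... elided: under mainLock, workers.add(w) and workerAdded = true ...
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}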
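Seen from outside, the limit check in this method means that a pool grows to corePoolSize and then starts queuing. The following is a minimal sketch under assumed settings (corePoolSize 2, maximumPoolSize 4, an unbounded LinkedBlockingQueue); the class name CoreGrowthDemo and its method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreGrowthDemo {
    /** Submits five blocking tasks and returns {poolSize, queueSize}. */
    static int[] poolAndQueueSize() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                // Each task blocks until the latch opens, so no worker becomes idle.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first two execute() calls each start a core thread;
            // the remaining three tasks go to the queue.
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = poolAndQueueSize();
        System.out.println("poolSize=" + r[0] + " queueSize=" + r[1]);
    }
}
```

Because the queue is unbounded, the pool never grows beyond corePoolSize here; maximumPoolSize only matters once the queue refuses a task.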

In this addWorker excerpt we have removed some irrelevant code and kept only the core part. Inside the for (;;) loop, addWorker first checks whether the current worker count has already reached the limit: corePoolSize when core is true, maximumPoolSize otherwise. If it has, the method returns false. Why check again here, when execute() has already compared the worker count with corePoolSize? Because several threads may submit tasks at the same time and compete for the last core slot; the check combined with compareAndIncrementWorkerCount lets only one of them win. Then, in the try block, a new Worker object is created and the thread held by that Worker is started. So, let's take a look at the implementation of the Worker object next:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

First, we can see that the Worker class extends AbstractQueuedSynchronizer (AQS); its internal code uses the AQS state to implement simple lock and unlock operations.
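To make the locking idea concrete, here is a minimal sketch of a non-reentrant lock built on the AQS state, in the same spirit as Worker (state 0 means unlocked, 1 means locked). The class name SimpleMutex and the demo() method are invented for illustration, and the sketch leaves out the initial state of -1 that Worker sets in its constructor.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only if the state moves from 0 (unlocked) to 1 (locked).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** Returns {tryLock while already held, tryLock after unlock}. */
    static boolean[] demo() {
        SimpleMutex m = new SimpleMutex();
        m.lock();
        boolean whileHeld = m.tryLock();   // fails: the lock is not reentrant
        m.unlock();
        boolean afterUnlock = m.tryLock(); // succeeds: the state is back to 0
        m.unlock();
        return new boolean[] { whileHeld, afterUnlock };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```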

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

The constructor of the Worker class uses the ThreadFactory to create a new thread and passes the Worker object itself as that thread's Runnable. (Remember that addWorker started the thread held by the Worker object? Passing this is what tells the thread which code to execute.) After the thread starts, it therefore executes the run method of the Worker object:

public void run() {
    runWorker(this);
}
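The pattern of an object handing itself to a ThreadFactory can be tried in isolation. The following is a small sketch with hypothetical names (SelfRunningWorker, ranOn, runsOnOwnThread), not the JDK's Worker: the object creates its own thread from a factory, and starting that thread runs the object's run() method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class SelfRunningWorker implements Runnable {
    final Thread thread;
    volatile String ranOn; // name of the thread that executed run()

    SelfRunningWorker(ThreadFactory factory) {
        // Pass this as the Runnable, so the new thread will execute run() below.
        this.thread = factory.newThread(this);
    }

    @Override
    public void run() {
        ranOn = Thread.currentThread().getName();
    }

    /** Starts the worker's thread and checks that run() executed on it. */
    static boolean runsOnOwnThread() {
        SelfRunningWorker w = new SelfRunningWorker(Executors.defaultThreadFactory());
        w.thread.start();
        try {
            w.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return w.thread.getName().equals(w.ranOn);
    }

    public static void main(String[] args) {
        System.out.println(runsOnOwnThread());
    }
}
```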

Then enter the runWorker method:

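final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task, then keep fetching tasks until getTask() returns null.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // ... interrupt handling elided ...
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                    // ... catch blocks that record the exception in thrown elided ...
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}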
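The shape of this loop can be reproduced in a few lines. The sketch below is a simplification with made-up names (MiniWorkerLoop, runLoop, demo); it uses the non-blocking queue.poll() as a stand-in for getTask(), so the loop ends as soon as the queue is empty instead of waiting for more work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniWorkerLoop {
    /** Runs firstTask, then queued tasks, until the fetch returns null. */
    static void runLoop(Runnable firstTask, BlockingQueue<Runnable> queue) {
        Runnable task = firstTask;
        // queue.poll() stands in for getTask(); null ends the loop.
        while (task != null || (task = queue.poll()) != null) {
            try {
                task.run();
            } finally {
                task = null; // forces the next iteration to fetch from the queue
            }
        }
    }

    /** Returns the order in which the tasks ran. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> log.add("queued-1"));
        queue.add(() -> log.add("queued-2"));
        runLoop(() -> log.add("first"), queue);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real pool the fetch blocks, so whether the loop ever ends depends entirely on how the task is fetched.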

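In this runWorker excerpt we have also removed most of the code and kept only the core part. The while loop first runs the worker's initial task (the Runnable passed to execute, or the FutureTask that submit wraps around a Runnable or Callable). After each task it sets task = null, so the next iteration calls getTask() to fetch another task from the blocking queue. The worker thread stays alive as long as getTask() returns a task; once it returns null, the loop ends and processWorkerExit retires the worker. Whether a thread is kept or released is therefore decided in getTask(), and this is where core and non-core threads differ. Next, let's enter the getTask() method: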

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        // ... run-state check elided ...
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // Timed wait if this worker may be culled, otherwise wait indefinitely.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

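From here, we can see that when timed is false (the worker count is at most corePoolSize and allowCoreThreadTimeOut is false), the worker fetches tasks with take(); when timed is true, it uses poll(keepAliveTime, TimeUnit.NANOSECONDS). Both block the current thread while the queue is empty, but poll gives up after keepAliveTime and returns null. timedOut then becomes true, the next loop iteration enters the if block above and returns null, and the while loop in runWorker exits, so the thread terminates. take(), by contrast, waits indefinitely, so an idle worker simply stays blocked until a new task arrives. Note that the pool does not mark individual threads as core or non-core; the decision depends only on the current worker count. This is why core threads are not released by default. Calling allowCoreThreadTimeOut(true) makes timed true for every worker, so idle core threads are released as well.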
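The behavior can be checked with a small experiment. This is a sketch under assumed settings (one core thread, a 50 ms keepAliveTime, a wait of up to one second); the class name CoreTimeoutDemo and the method poolSizeAfterIdle are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    /** Runs one task, lets the pool sit idle, and returns the remaining pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.submit(() -> { }).get(); // wait until the task has finished

            // Wait until the idle worker has exited, or give up after one second.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts a worker still blocked in take()
        }
    }

    public static void main(String[] args) {
        System.out.println("default:                " + poolSizeAfterIdle(false));
        System.out.println("allowCoreThreadTimeOut: " + poolSizeAfterIdle(true));
    }
}
```

With the default setting the single core thread is still counted after the wait, because it is blocked in take(); with allowCoreThreadTimeOut(true) it times out in poll() and the pool size drops to zero.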
