线程プールのライフサイクル#
*
* runStateは、以下の値を取ります。
*
* RUNNING: 新しいタスクを受け入れ、キューにあるタスクを処理します
* SHUTDOWN: 新しいタスクを受け入れませんが、キューにあるタスクを処理します
* STOP: 新しいタスクを受け入れず、キューにあるタスクを処理せず、
* 進行中のタスクを中断します
* TIDYING: すべてのタスクが終了し、workerCountがゼロになり、
* 状態がTIDYINGに遷移するスレッドは、terminated()フックメソッドを実行します
* TERMINATED: terminated()が完了しました
*
ソースコードの解析#
ワーカースレッドがコアスレッド数よりも少ない場合、addWorker 関数が実行されます。次に、addWorker 関数に移動してみましょう:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
}
}
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
ここでは、関係のないコードを削除し、コアの部分のみを保持しました。ここで、for (;;) ループ内で現在のスレッド数がコアスレッド数を超えているかどうかを最初に確認し、超えている場合は false を返します(なぜ再度確認する必要があるのかというと、マルチスレッド環境では、複数のスレッドが同時に最後のコアスレッドを競合させる可能性があるためです);その後、try ブロックでは新しい Worker オブジェクトを作成し、Worker オブジェクトの thread を使用して実行を開始します。それでは、次に Worker オブジェクトの内部実装を見てみましょう:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
まず、Worker クラスが AQS を継承していることに注意してください。いくつかのコードでは、AQS の state を使用してロックの取得と解放を実行します。
Worker(Runnable firstTask) {
setState(-1); // runWorkerまで割り込みを禁止する
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker クラスのコンストラクタでは、ThreadFactory ファクトリを使用して新しいスレッドを作成し、Worker オブジェクト自体を渡します(前述のように、Woker オブジェクトの thread を使用して実行する必要があるためです);その後、スレッドが開始されると、Worker オブジェクトの run メソッドが実行されます:
public void run() {
runWorker(this);
}
そして、runWorker メソッドに入ります:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
try {
while (task != null || (task = getTask()) != null) {
try {
beforeExecute(wt, task);
try {
task.run();
}finally {
afterExecute(task, thrown);
}
}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
ここでも、大部分のコードは削除し、核心部分のみを保持しました。コードは while ループを使用して task メソッド(つまり、submit で提出された new Runnable または new Callable)を継続的に実行し、task = getTask () を使用してブロッキングキューから新しいメソッドを取得して継続します。これがコアスレッドと非コアスレッドの主な違いです。次に、getTask () メソッドに入りましょう:
private Runnable getTask() {
boolean timedOut = false; // 最後のpoll()がタイムアウトしたかどうか
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
ここからわかるように、コアスレッドは take () メソッドを使用して task を取得し、非コアスレッドは poll (keepAliveTime, TimeUnit.NANOSECONDS) を使用して task を取得します。両方のメソッドは現在のスレッドをブロックしますが、poll (keepAliveTime, TimeUnit.NANOSECONDS) は一定時間経過すると終了します。終了後、timeOut が true になり、上記の if ブロックで null を返すことができます。これにより、runWorker の while ループを終了することができます。コアスレッドにはこの問題はありません。