English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Javaのスレッドプールの詳細と実例コード

スレッドプールの技術背景

オブジェクトの作成と破棄は、特に面向オブジェクトプログラミングでは時間がかかることです。なぜなら、オブジェクトを作成するにはメモリリソースや他の多くのリソースを取得する必要があるからです。Javaでは特にそのようにします。仮想機械は、オブジェクトが破棄された後にゴミ収集を行うように、すべてのオブジェクトを追跡しようとします。

したがって、サービスプログラムの効率を向上させる手段の一つは、可能な限りオブジェクトの作成と破棄の回数を減らすことです。特に多くのリソースを消費するオブジェクトの作成と破棄です。既存のオブジェクトをどのようにしてサービスするかが解決すべき鍵問題であり、これが「プール化リソース」技術が生まれる原因です。

例えば、Androidでよく見られる多くの一般的なコンポーネントは「プール」の概念を離れてはならないことが多いです。例えば、さまざまな画像ロードライブラリ、ネットワークリクエストライブラリ、Androidのメッセージ伝達メカニズムでのMessageがMessage.obtain()を使用する場合でも、これはMessageプールのオブジェクトを使用しています。したがって、この概念は非常に重要です。この記事で紹介するスレッドプール技術もこの思想に適合しています。

スレッドプールの利点:

1.スレッドプール内のスレッドを再利用し、オブジェクトの作成、破棄によるパフォーマンスコストを削減します;

2.最大の並行スレッド数を効果的に制御し、システムリソースの利用効率を向上させ、過度なリソース競争を避け、ブロックを避けます;

3.複数のスレッドで簡単な管理ができ、スレッドの使用を簡単で効率的にします。

スレッドプールフレームワークExecutor

javaでのスレッドプールはExecutorフレームワークによって実装されており、Executorフレームワークには以下のクラスが含まれています:Executor、Executors、ExecutorService、ThreadPoolExecutor、CallableおよびFuture、FutureTaskの使用など。

Executor: すべてのスレッドプールのインターフェースであり、メソッドは一つだけです。

public interface Executor {  
 void execute(Runnable command);  
}

ExecutorService: Executorの行動を追加し、Executor実装クラスの最も直接的なインターフェースです。

Executors:スレッドプールの作成に使用される一連のファクトリメソッドを提供しており、返されるスレッドプールはExecutorServiceインターフェースを実装しています。

ThreadPoolExecutor:スレッドプールの具体的な実装クラスであり、一般的に使用されるさまざまなスレッドプールはこのクラスに基づいて実装されています。コンストラクタは以下の通りです:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:スレッドプールの核心スレッド数で、スレッドプールで実行中のスレッド数もcorePoolSizeを超えません。デフォルトでは、allowCoreThreadTimeOutをTrueに設定することで、核心スレッド数は0に設定され、この場合、keepAliveTimeはすべてのスレッドの超時時間を制御します。

maximumPoolSize:スレッドプールが許可する最大スレッド数です。

keepAliveTime:空きスレッドの終了超時時間を指します。

unit :keepAliveTimeの単位を表すエンüm

workQueue:タスクを保存するBlockingQueue<Runnable>キューを表します。

BlockingQueue(ブロッキングキュー)は、java.util.concurrentの下で主にスレッドの同期を制御するために使用されるツールです。BlockQueueが空の場合、BlockingQueueから要素を取得する操作はブロックされ、待機状態に入ります。BlockingQueueに要素が入ると、操作が唤醒されます。同様に、BlockingQueueが満杯の場合、要素を追加しようとする操作もブロックされ、待機状態に入ります。BlockingQueueには空きスペースがあれば、操作が唤醒され続けます。ブロッキングキューは、プロデューサーとコンシューマーのシナリオでよく使用されます。プロデューサーはキューに要素を追加するスレッドであり、コンシューマーはキューから要素を取り出すスレッドです。ブロッキングキューは、プロデューサーが要素を保存するコンテナであり、コンシューマーもコンテナから要素を取り出すだけです。具体的な実装クラスにはLinkedBlockingQueue、ArrayBlockingQueueなどがあります。一般的には、内部ではLockとCondition(表示锁(Lock)およびConditionの学習と使用)を使用してブロッキングと唤醒を実現しています。

スレッドプールの動作プロセスは以下の通りです:

スレッドプールが作成された時点では、スレッドは一切存在しません。タスクキューはパラメータとして渡されます。ただし、キューにタスクが含まれていても、スレッドプールはすぐにそれらを実行しません。

execute() メソッドを呼び出してタスクを追加する際には、スレッドプールが以下のような判断を行います:

もし、実行中のスレッドの数が corePoolSize 未満であれば、すぐにスレッドを作成してこのタスクを実行します;

もし、実行中のスレッドの数が corePoolSize 以上であれば、このタスクをクエueueに追加します;

もし、この時クエueueが満員で、実行中のスレッドの数が maximumPoolSize 未満であれば、非コアスレッドを即座にこのタスクを実行するために作成します;

キューがいっぱいで、実行中のスレッドの数がmaximumPoolSize以上である場合、スレッドプールはRejectExecutionExceptionをスローします。

スレッドがタスクを完了した場合、次のタスクをキューから取得して実行します。

スレッドが何もするべきことがない場合、一定の時間(keepAliveTime)が経過すると、スレッドプールが判断し、現在の実行中のスレッド数が corePoolSize より多い場合、そのスレッドは停止されます。したがって、スレッドプールのすべてのタスクが完了すると、最終的には corePoolSize の大きさに縮小します。

スレッドプールの作成と使用

スレッドプールの生成には、ツールクラスExecutorsの静的メソッドを使用します。以下は一般的なスレッドプールの種類です。

SingleThreadExecutor:単一のバックグラウンドスレッド(バッファキューは無制限)

public static ExecutorService newSingleThreadExecutor() {  
 return new FinalizableDelegatedExecutorService (
  new ThreadPoolExecutor(1, 1,         
  0L, TimeUnit.MILLISECONDS,         
  new LinkedBlockingQueue<Runnable>())); 
}

単一スレッドのスレッドプールを作成します。このスレッドプールには1つの核心スレッドのみが動作しており、すべてのタスクが単一スレッドシーケンシャルに実行されることに等しいです。もし、この唯一のスレッドが例外で終了すると、その代わりに新しいスレッドがそれを代替します。このスレッドプールは、すべてのタスクの実行順序がタスクの提出順序に従って実行されることを保証します。

FixedThreadPool:コアスレッドのみのスレッドプールで、サイズが固定(バッファキューは無限大です) 。

public static ExecutorService newFixedThreadPool(int nThreads) {        
        return new ThreadPoolExecutor(nThreads, nThreads,                                      
            0L, TimeUnit.MILLISECONDS,                                        
            new LinkedBlockingQueue<Runnable>());    
}
固定サイズのスレッドプールを作成します。タスクを提出するたびに新しいスレッドが作成され、スレッドがスレッドプールの最大サイズに達すると、そのサイズは変更されません。スレッドが実行中の例外で終了すると、スレッドプールは新しいスレッドを追加します。

CachedThreadPool:無限大のスレッドプールで、自動的なスレッドリサイクルが可能です。

public static ExecutorService newCachedThreadPool() {   
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,           
   60L, TimeUnit.SECONDS,          
   new SynchronousQueue<Runnable>());  
}

スレッドプールのサイズがタスク処理に必要なスレッドのサイズを超えた場合、一部の空きスレッドをリサイクルします(60秒でタスクを実行しないスレッドが、タスク数が増加した場合、このスレッドプールはまた新しいスレッドを追加してタスクを処理するようにスマートに動作します。このスレッドプールはスレッドプールのサイズに制限を設けず、スレッドプールのサイズはオペレーティングシステム(またはJVM)が作成できる最大スレッドサイズに完全に依存します。SynchronousQueueはバッファが1のブロッキングキュー。

ScheduledThreadPool:コアスレッドが固定で、無限大のスレッドプール。このスレッドプールはタスクの定期的および周期的な実行をサポートしています。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {   
 return new ScheduledThreadPool(corePoolSize, 
    Integer.MAX_VALUE,             
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             
    new DelayedWorkQueue()); 
}

周期性にタスクを実行するスレッドプールを作成します。非核心スレッドプールが空である場合、DEFAULT_KEEPALIVEMILLIS時間内に回收されます。

スレッドプールで最もよく使用されるタスクの提出方法は2種類あります:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable, T Result);

FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

submit(Callable callable)の実装、submit(Runnable runnable)も同様です。

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

submitが開始するのは返り値があるタスクであり、FutureTaskオブジェクトが返されます。そのため、get()メソッドを使用して結果を取得できます。submitは最終的にexecute(Runnable runable)を呼び出しますが、submitはCallableオブジェクトやRunnableをFutureTaskオブジェクトに包装するだけであり、FutureTaskがRunnableであるため、executeメソッド内で実行できます。CallableオブジェクトやRunnableがFutureTaskオブジェクトにどう包装されるかについては、CallableとFuture、FutureTaskの使用法を参照してください。

スレッドプールの実装原理

スレッドプールの使用方法についてだけ話すと、このブログには大きな価値はありません。最大でもExecutor関連APIを理解する過程に過ぎません。スレッドプールの実装プロセスではSynchronizedキーワードを使用せず、Volatile、Lock、シンクロナイズ(ブロッキング)キュー、Atomic関連クラス、FutureTaskなどを使用しています。なぜなら、後者のパフォーマンスが優れているからです。理解する過程でソースコード内の並行制御の考え方を学ぶことができます。

冒頭で述べたように、スレッドプールの利点は以下の3点にまとめられます:

スレッドの再利用

最大並行数の制御

スレッドの管理

1.スレッドの再利用プロセス

スレッドの再利用の原理を理解するには、まずスレッドのライフサイクルを理解する必要があります。

スレッドのライフサイクルでは、新しい(New)、就寝(Runnable)、実行中(Running)、ブロック(Blocked)、死亡(Dead)の5つの状態を経ています。5状態。

Threadはnewを使って新しいスレッドを作成します。このプロセスは、スレッド情報(スレッド名、ID、スレッドが属するグループなど)を初期化するプロセスであり、普通のオブジェクトと考えられます。Threadのstart()を呼び出すと、Java仮想機がそのためのメソッド呼び出しスタックとプログラムカウンタを生成し、hasBeenStartedをtrueにし、startメソッドを呼び出すと例外が発生します。

この状態にあるスレッドはまだ実行を開始していない。ただし、このスレッドがいつ実行を開始するかは、JVM内のスレッドスケジューラのスケジューリングに依存します。スレッドがCPUを取得すると、run()メソッドが呼び出されます。Threadのrun()メソッドを自分で呼び出す必要はありません。その後、CPUのスケジューリングに基づいて、就寝 - 実行中 - ブロックの間で切り替えが行われ、run()メソッドが終了するか、他の方法でスレッドが停止し、dead状態に入ります。

したがって、スレッドの再利用を実現する原理は、スレッドが生存状態(就寝、実行中、ブロック)を維持することです。次に、ThreadPoolExecutorがどのようにスレッドの再利用を実現するかを見てみましょう。

ThreadPoolExecutorの主要なWorkerクラスがスレッドの再利用を制御しています。Workerクラスの簡略化されたコードを見て、理解しやすくなります:

private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}

WorkerはRunnableであり、threadも持ちます。このthreadは開始する必要があるスレッドです。Workerオブジェクトを新しいオブジェクトとして作成する際に、Threadオブジェクトも同時に作成され、Worker自体がTThreadにパラメータとして渡されます。したがって、Threadのstart()メソッドが呼び出されると、実際にはWorkerのrun()メソッドが実行されます。その後、runWorker()に到達し、whileループがあり、getTask()からRunnableオブジェクトを順序に取得し実行します。getTask()はRunnableオブジェクトをどのように取得するのでしょうか?

これは簡略化されたコードです:

private Runnable getTask() {
 if(一部の特殊な状況) {
  return null;
 }
Runnable r = workQueue.take();
return r;
}

このworkQueueは、ThreadPoolExecutorを初期化する際にタスクを格納するBlockingQueueキューです。このキューには、実行されるRunnableタスクがすべて格納されています。BlockingQueueはブロッキングキューであり、BlockingQueue.take()が空の場合は、新しいオブジェクトがキューに追加されるまで待機状態に入り、ブロッキングされたスレッドが唤醒されます。したがって、一般的にはThreadのrun()メソッドは終了しません。代わりに、workQueueからRunnableタスクを不断に実行し、これによりスレッドの再利用の原理を実現します。

2.最大並行数を制御

RunnableはいつworkQueueに格納されるのでしょうか?Workerはいつ作成され、Worker内のThreadはいつstart()メソッドを呼び出して新しいスレッドを開始し、Workerのrun()メソッドを実行するのでしょうか?上記の分析から、Worker内のrunWorker()がタスクを実行する際には一つずつ、シリアルに進行することが分かります。それでは、並行処理はどのように表現されるのでしょうか?

簡単に思い浮かぶのは、execute(Runnable runnable)が上記のタスクを実行する際にやることです。execute内でどうやっているのかを見てみましょう。

execute:

簡略化されたコード

public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
int c = ctl.get();
// 現在のスレッド数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 新しいスレッドを直接起動。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活動スレッド数 >= corePoolSize
// runStateがRUNNINGでクエueueが満員でない
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再びRUNNING状態を確認
// .RUNNING状態でない場合、workQueueからタスクを削除して拒否
if (!isRunning(recheck) && remove(command))
reject(command);// スレッドプールが指定するポリシーに従ってタスクを拒否
// 2つの状況:
// 1.RUNNING状態でない場合、新しいタスクを拒否
// 2.クエueueが満員で新しいスレッドの起動に失敗(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}

addWorker:

簡略化されたコード

private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core63; corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}

コードに基づいて、先ほど述べたスレッドプールのタスク追加プロセスを見てみましょう:

* もし、実行中のスレッドの数が corePoolSize 未満であれば、すぐにスレッドを作成してこのタスクを実行します;  
* もし、実行中のスレッドの数が corePoolSize 以上であれば、このタスクをクエueueに追加します;
* もし、この時クエueueが満員で、実行中のスレッドの数が maximumPoolSize 未満であれば、非コアスレッドを即座にこのタスクを実行するために作成します;
* キューがいっぱいで、実行中のスレッドの数がmaximumPoolSize以上である場合、スレッドプールはRejectExecutionExceptionをスローします。

これがAndroidのAsyncTaskが並行実行中に最大タスク数を超えた場合にRejectExecutionExceptionが投げられる理由です。詳細は最新バージョンのAsyncTaskのソースコードの解説およびAsyncTaskの暗部をご参照ください。

addWorkerを通じて新しいスレッドが成功して作成された場合、start()を通じて新しいスレッドを開始し、firstTaskをこのWorkerのrun()内で最初に実行するタスクとして設定します。

各Workerのタスクはシリアルで処理されますが、複数のWorkerが作成された場合、共通のworkQueueを使用するため、並行処理が行われます。

したがって、corePoolSizeとmaximumPoolSizeに基づいて最大並行数を制御します。大まかなプロセスは以下の図で示されます。

上記の説明と図を通じて、このプロセスを非常によく理解できます。

Android開発をしている場合で、Handlerの原理に非常に詳しいと感じるかもしれません。この図は非常に馴染みがあるかもしれません。中にはHandler、Looper、Messageの使用中に非常に似たプロセスがあります。Handler.send(Message)はexecute(Runnable)に相当し、Looperが維持するMessageキューはBlockingQueueに相当しますが、このキューは自分でシンクロナイズして維持する必要があります。Looperのloop()関数は、MeaasgeキューからMeaasgeを取り出すためにループし、WorkerのrunWork()はBlockingQueueからRunnableを取り出すために同様の理由でループします。

3.スレッド管理

スレッドプールを使用して、スレッドの再利用、並行数の制御、および破棄などのプロセスを非常に良い方法で管理できます。スレッドの再利用と並行数の制御については既に説明しましたが、スレッドの管理プロセスはそれに組み込まれており、非常に理解しやすいです。

ThreadPoolExecutorにはctlというAtomicInteger変数があります。この変数は以下の2つの内容を保存しています:

全てのスレッドの数 各スレッドの状態 そして低位29ビットフィールドがスレッド数を保存し、上位3ビットフィールドがrunStateを保存し、ビット演算を通じて異なる値を取得します。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//スレッドの状態を取得します
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//Workerの数を取得
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// スレッドが動作しているかどうかを判断
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

ここでは、shutdownとshutdownNow()を通じてスレッドプールの終了プロセスを分析します。まず、スレッドプールにはタスクの追加と実行を制御するための5つの状態があります。以下の3つの状態を紹介します:

RUNNING状態:スレッドプールが通常動作し、新しいタスクを受け付け、キューのタスクを処理します;

SHUTDOWN状態:新しいタスクを受け付けませんが、キューのタスクを実行します;

STOP状態:新しいタスクを受け付けません。キューのタスクを処理しません。shutdownメソッドはrunStateをSHUTDOWNに設定します。すべての空闲スレッドを終了しますが、まだ動作しているスレッドには影響を受けません。したがって、キューのタスクは実行されます。

shutdownNowメソッドはrunStateをSTOPに設定します。shutdownメソッドとの違いは、このメソッドはすべてのスレッドを終了し、キューのタスクも実行されません。

まとめ
ThreadPoolExecutorのソースコードの分析を通じて、スレッドプールの作成、タスクの追加、実行などのプロセスを全体的に理解し、これらのプロセスに慣れると、スレッドプールの使用がより簡単になります。

そして学んだことが、並行制御やプロデューサー-コンサーマー・モデルのタスク処理について、今後の他の問題の理解や解決に大きな助けとなります。例えば、AndroidのHandlerメカニズム、LooperのMessagerキューはBlookQueueで処理することもできます。これらはソースコードを読む収穫です。

これでJava スレッドプールの情報を整理しました。今後も関連する情報を追加していきます。皆様のこのサイトへのサポートに感謝します!

基礎教程
おすすめ