かとじゅんの技術日誌

技術の話をするところ

タスクを並行で実行するために必要なこと 〜基礎編〜

スレッドセーフの話も語ればキリがないのですが、そろそろ「タスクを並行に実行する」話題にいってみましょう。この手の記事は結構あるし、書籍の内容をまるまるというわけにいかないので、独断と偏見でポイントを絞って軽く解説する感じで書いてみます。とりあえず、ThreadクラスとExecutorクラスあたりから。

この書籍のP129あたりです。

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

並行処理のアプリケーションには、「良いスループット」と「良い応答性」の両方が必要と書かれています。スループットは、単位時間あたりの処理能力のことです。処理の効率みたいなものといえると思います。応答性については、処理待ちがなく次のタスクを待ち受けることができるのが応答性がよいといえるでしょう。

さて、P130には以下のようなウェブサーバのサンプルが掲載されていますが(というか、Socketとか出てきて面食らっている人も多いかもしれませんが)、簡単に説明するとnew ServerSocket(80)でローカルホスト上で80ポートで通信(TCP)を待受れるようになります。socket.accept()ではクライアントが接続するまでブロックします。接続があるとリターンしクライアントを表現するSocketオブジェクトであるconnectionが取得できます。具体的なコードは示されていませんが、handleRequestメソッドはそのconncetionを使ってクライントを通信を行います。
この処理は逐次処理なので、acceptメソッドでブロック中はhandleRequestメソッドは処理できませんし、その逆もそうです。つまり、複数のクライアントを同時に処理することができません。

class SingleThreadWebServer {

	public static void main(String[] args) throws IOException {
		ServerSocket socket = new ServerSocket(80);
		while( true ) {
			Socket connection = socket.accept();
			handleRequest(connection);
		}
	}

	private static void handleRequest(Socket connection) {
		// ...
	}
}

並行処理の基本はThreadクラス

続いて、P131で紹介されているThreadクラスを使ったサンプル。Runnableのrunメソッドが並行に実行されるので、複数のクライアントを処理することができます。

class ThreadPerTaskWebServer {

	public static void main(String[] args) throws IOException {
		ServerSocket socket = new ServerSocket(80);
		while( true ) {
			final Socket connection = socket.accept();
			Runnable task = new Runnable() {
				public void run() {
					handleRequest(connection);
				}
			}
			new Thread(task).start();
		}
	}
	
	private static void handleRequest(Socket connection) {
		// ...
	}
}

利点としては、mainメソッドのメインループがタスク(クライアントとの通信)を処理しないので、次の接続のための待ち受けが可能になります。その分、応答性がよくなります。また、タスクを並行に実行できるので、同時に複数のクライアントとの通信が可能になります。

欠点としては、スレッドを作成したり破棄したりすることはCPUやメモリを使うためコストがかかります。また、アクティブなスレッドはメモリも消費します。すでにタスクを処理しているスレッドでCPUがビジー状態であれば、新たなスレッドを作成しても、オーバーヘッドが増えて返って処理能力が低下します。スレッドは無限には生成できず、上限が決まっています。その上限に至った場合はOutOfMemoryErorrが発生します。
ということで、たくさん作ればいいという問題ではありません。実行環境に最適なスレッド数というものがあります。

スレッドを再利用できるExecutorフレームワーク

実行効率を考慮した上で、実行環境に最適なスレッド数を実現するには、「スレッドの再利用」が不可欠です。java.util.concurrentパッケージにはExecutorフレームワークがあり、それを可能にします。

それでは、つべこべ言わずに、Executor版のソースコードをみてみましょう。「Threadクラスは、すっこんでろ!」ってことで、主役はExecutorさんです。

class TaskExecutionWebServer {
	private static final int NTHREADS = 100;
	private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

	public static void main(String[] args) throws IOException {
		ServerSocket socket = newServerSocket(80);
		while (true) {
			final Socket connection = socket.accept();
			Runnable task = new Runnable() {
				public void run() {
					handleRequest(connection);
				}
			};
			exec.execute(task); // タスクを実行
		}
	}

	private static void handleRequest(Socket connection) {
		// ...
	}
}

最初に目が行くのは、

	private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

だと思います。
ここでは、ExecutorsのnewFixedThreadPoolというファクトリメソッドでExecutorを生成しています。
Executorオブジェクトであるexecは、NTHREADS個(100個)のスレッドを持つスレッドプールを持つオブジェクトです。100個のスレッドがすべて処理中である場合は、追加されたタスクはスレッドが使用可能になるまでキュー上で待機します。何かリクエストがある毎にスレッドを作成しないため実行効率もよく、NTHREADS値がCPUの処理能力に対して適切であれば、前述の問題も起きません。
詳しくは以下のJavadocですが、ちょっと訳がかなり厳しい感じのようですw

public static ExecutorService newFixedThreadPool(int nThreads)

共有アンバウンド形式のキューなしで動作する、固定数のスレッドを再利用するスレッドプールを作成します。任意のポイントで、最大 nThreads のスレッドがアクティブな処理タスクになります。すべてのスレッドがアクティブな場合に、追加のタスクが送信されると、それらのタスクはスレッドが使用可能になるまでキューで待機します。実行中に発生した障害のために、いずれかのスレッドがシャットダウン前に終了した場合は、必要に応じて新規スレッドが引き継いで後続のタスクを実行します。明示的なシャットダウンが行われるまでは、スレッドはプール内に存在します。

この書籍の訳者の岩谷さんのブログの、Javadocの日本語訳の方がわかりやすいです。
java.util.concurrent.Executors

共有アンバウンド形式のキューなしで動作する、

は、

サイズ制限のない共有キューを使って仕事をする

ということらしい。。これでは全く意味が違うのではw 正確さを求める方は、やはり原文を参照ということで。

次にここ。RunnableのインスタンスをExecutorクラスのexecuteメソッドに渡しています。Javadocではこのように説明があります。

将来のどの時点かで、指定されたコマンドを実行します。

ということなので、タスクが実行されるかどうかはExecutorの実装に依存します。今回はタスクの実行(execute)というよりは送信とか登録に近いです。実行は前述したようにスレッドプールの状況によって決定されます。

exec.execute(task); // タスクを実行(今から将来のどの時点かで実行)

Executors.newFixedThreadPoolメソッドは、ExecutorではなくExecutorServiceを返します。ExecutorはExecutorServiceのスーパーインターフェイスなので代入可能なのですが、タスクを中断させたり、タスクの完了したかどうかを調べることができません。ExecutorServiceの詳しい使い方は別のエントリで紹介しますが、殆どの場合でExecutorのexecuteメソッドではなく、ExecutorServiceのsubmitメソッドを使って「タスクを送信する」ことになると思います。
ExecutorService (Java Platform SE 6)

他のファクトリメソッドもある

Executorsクラスには、他にも使えるファクトリメソッドがあります。これらのファクトリメソッドで、Executor(ExecutorService)の戦略をストラテジパターンのように変更することができます。

newSingleThreadExecutorメソッド 名前の通り、シングルスレッドで実行されるExecutorを生成します。executeを複数回実行してもタスクは逐次的に実行されます。
newCachedThreadPoolメソッド 複数のスレッドで並行処理しますが、終了しているスレッドがあれば再利用されます。60秒使用されないスレッドは破棄されます。短時間で大量に発生するタスクを処理するのに向いています。
newScheduledThreadPoolメソッド タスクの遅延実行と周期実行のスケジューリングをサポートする固定サイズのスレッドプールを生成します。
newSingleThreadScheduledExecutorメソッド こちらもスケジューリングをサポートしますが、スレッドプールではなく1つのスレッドで実行するExecutorが生成されます。

このようなスレッドの再利用機構を利用すれば、タスクの発生毎にスレッドを作成/破棄するコストを抑え、CPUの処理能力を超えるタスクを実行することもなくなります。CPUの処理能力よりタスクが少ない場合はThreadクラスはお手軽でよいのですが、CPUの処理能力よりタスクが多い、もしくは多くなる可能性がある場合は、スレッドプールを利用することをお勧めします。

次はExecutorServiceを使って、ScalaとかErlangのようなアクターを簡易的に実装してみますか。

あわせて読みたい
shameful hiwa(Java並行処理プログラミングの訳者の方のブログ)
マルチスレッドプログラミング-TECHSCORE-
java.util.concurrentパッケージを用いたマルチスレッドプログラミングについて
虎の穴 J2SE 5.0 入門 Concurrency Utilities No.1