かとじゅんの技術日誌

技術の話をするところ

JavaでActorっぽいものを作ってみる

前回 JavaScalaの"アクターのようなもの"を作ろうということだったので、早速 作ってみました。目的は、Actorの概念に触れることで、並行処理プログラミングの勘所を学ぶことなので、その前提で読んでいただければと思います。

リソース共有モデルには限界がある

オブジェクト指向プログラマが次に読む本 Scalaで学ぶ関数脳入門」には、複数のスレッド間でリソースを共有する「リソース共有モデル」の限界について触れています。

「リソース共有」モデルを前提としている限り、プログラムの規模が大きくなるに従って、並行処理にまつわる複雑さや問題に対処することが困難になってきます。
これに対して、もしスレッド間で同一リソースを共有しないで、協調処理を行うとしたらどうでしょうか。リソースを共有しなければ、データ不整合やデッドロックなどの、並行処理で問題とされていることを回避できるのです。メッセージパッシングは、このようなモデルの代表的なものです

スレッドセーフにするために考えること 〜応用編 その1〜では、複数のスレッドから可変オブジェクトであるリポジトリにアクセスを受け付け、スレッド間で同期化を行う機構を導入しスレッドセーフになるようにしていました。同期化は計算コストがかかります。そもそも共有しないのであれば、このようなコストは払う必要がありません。
ここで紹介するアクターのモデルでは、「メッセージパッシング」という機構を使って、スレッドに対して複製された「メッセージ」を送受信することで、リソースの共有を避けることを可能にしています。

オブジェクト指向プログラマが次に読む本 ?Scalaで学ぶ関数脳入門

オブジェクト指向プログラマが次に読む本 ?Scalaで学ぶ関数脳入門

メッセージボックス+スレッドがアクター

アクターにはスレッドと、「メッセージパッシング」を行うためのメッセージボックスのようなオブジェクトが含まれます。簡単に説明すると、メッセージボックスは、メッセージを格納できるキューみたいなものです。スレッドに処理を依頼するクライアント側は、そのメッセージボックスにメッセージを登録して、メッセージをスレッドが読み取って動作するような仕組みです。

Actorっぽいものを実装してみる

前述したように、アクターの外部からメッセージを送信するときは、そのキューにメッセージを登録することにして、アクターが受信するときは、メッセージを取得して削除するようにします。FIFO*1ですね。
まず、アクターに送信するためのメッセージを表すインターフェイスは以下のような感じ。メッセージは可変オブジェクトの可能性があるので、複製ができるようにCloneableを継承して、cloneメソッドを実装するようにしています。

package actor;

/**
 * アクターに送信するためのメッセージを表すインターフェイス。
 */
public interface Message extends Cloneable {

	Message clone();

}

メッセージの抽象クラスは、面倒なんでcommons-langの力を借りてざっくり作ってます。アクターに何かのメッセージを送信したい場合はこの抽象クラスを継承することにします。

package actor;

import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;

/**
 * メッセージの抽象クラス。
 */
public abstract class AbstractMessage implements Message {

	@Override
	public int hashCode() {
		return HashCodeBuilder.reflectionHashCode(this);
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (getClass() != obj.getClass()) {
			return false;
		}
		return EqualsBuilder.reflectionEquals(this, obj);
	}

	@Override
	public AbstractMessage clone() {
		try {
			return (AbstractMessage) super.clone();
		} catch (CloneNotSupportedException e) {
			throw new Error(e);
		}
	}

}

そして、アクターっぽいものを使うサンプル。ActorLikeというクラス名にしました。mainメソッドを見てください。
ActorLikeを無名クラスでインスタンス化します。receiveMessageメソッドはActorLikeがメッセージを受信した時に何を実行するか処理を定義したメソッドです。この例では、Messageを実装したStringMessageクラスと、EndMessageクラスを処理しています。
その下には、ActorLikeを開始した後に、sendMessageメソッドを使い、メッセージを送信しています。StringMessageは文字列を表現したメッセージ、EndMessageはアクターを終了させるためのメッセージを意味しています。

package actor;

import org.apache.commons.lang.Validate;

public class App {

	// アクターを終了させるメッセージ
	static class EndMessage extends AbstractMessage {

	}

	// アクターに文字列を送るメッセージ
	static class StringMessage extends AbstractMessage {
		private final String message;

		public StringMessage(String message) {
			Validate.notNull(message);
			this.message = message;
		}

		@Override
		public String toString() {
			return message;
		}

	}

	public static void main(String[] args) {

		ActorLike actorLike = new ActorLike() {

			@Override
			protected void receiveMessage(Message message) {
				// StringMessageなら画面に表示
				if (message instanceof StringMessage) {
					String messageText = message.toString();
					System.out.println(messageText);
				} else
				// EndMessageの時はアクターを終了
				if (message instanceof EndMessage) {
					// 本当は何かの終了処理を行う
					shutdown(); // アクターのシャットダウンを要求
				}
			}

		};

		// アクターを開始
		actorLike.start();

		try {
			// アクターにメッセージを送信
			actorLike.sendMessage(new StringMessage("Hello World1"));
			actorLike.sendMessage(new StringMessage("Hello World2"));
			actorLike.sendMessage(new StringMessage("Hello World3"));
			actorLike.sendMessage(new StringMessage("Hello World4"));
			actorLike.sendMessage(new StringMessage("Hello World5"));
			actorLike.sendMessage(new StringMessage("Hello World6"));
			actorLike.sendMessage(new StringMessage("Hello World7"));
			actorLike.sendMessage(new StringMessage("Hello World8"));
			actorLike.sendMessage(new StringMessage("Hello World9"));
			actorLike.sendMessage(new StringMessage("Hello World10"));
			actorLike.sendMessage(new EndMessage());
			// アクターの終了を待機
			actorLike.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

実行結果の標準出力はこんな感じ。

Hello World1
Hello World2
(中略)
Hello World9
Hello World10
canceled

で、本題のActorLikeのソースです。
このActorLikeが開始(start)すると、Threadを使って自身のrunメソッドをスレッドで実行します。

package actor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang.Validate;

/**
 * アクターっぽいもの
 * 
 * @author j5ik2o
 */
public abstract class ActorLike implements Runnable {

	public static final int DEFAULT_PROCESS_INTERVAL = 50;
	public static final int DEFAULT_MESSAGE_QUEUE_SIZE = 5;

	private final Thread thread = new Thread(this);

	private final BlockingQueue<Message> messageQueue;

	private final int processInterval;
	private volatile boolean canceled = false;

	/**
	 * インスタンスを生成する。
	 */
	public ActorLike() {
		this(DEFAULT_MESSAGE_QUEUE_SIZE, DEFAULT_PROCESS_INTERVAL);
	}

	/**
	 * インスタンスを生成する。
	 * 
	 * @param messageQueueSize
	 *            メッセージキューのサイズ
	 * @param processInterval
	 *            アクターの待機時間間隔
	 */
	public ActorLike(int messageQueueSize, int processInterval) {
		Validate.isTrue(processInterval != 0);
		messageQueue = new LinkedBlockingQueue<Message>(messageQueueSize);
		this.processInterval = processInterval;
	}

	/**
	 * アクターを開始する。
	 */
	public void start() {
		thread.start();
	}

	/**
	 * アクターの終了を待機する。
	 * 
	 * @throws InterruptedException
	 */
	public void join() throws InterruptedException {
		thread.join();
	}

	/**
	 * アクターを中断する。
	 * 
	 * @param interrupt
	 *            インターラプトする場合はtrue
	 */
	public synchronized void cancel(boolean interrupt) {
		canceled = true;
		if (interrupt) {
			thread.interrupt();
		}
	}

	/**
	 * アクターをシャットダウンする。
	 */
	public void shutdown() {
		cancel(true);
	}

	/**
	 * アクターにメッセージを送る。
	 * 
	 * @param message
	 *            {@link Message}
	 * @throws InterruptedException
	 */
	public void sendMessage(Message message) throws InterruptedException {
		Validate.notNull(message);
		messageQueue.put(message.clone());
	}

	@Override
	public void run() {
		while (canceled == false) {
			try {
				final Message message = messageQueue.take();
				if (message != null) {
					Runnable runnable = new Runnable() {
						@Override
						public void run() {
							receiveMessage(message);
						}
					};
					new Thread(runnable).start();
				}
				Thread.sleep(processInterval);
			} catch (InterruptedException e) {
				if (canceled == false) { // キャンセルの指示が無いときのInterruptedExceptionを再生する。
					Thread.currentThread().interrupt();
				} else {
					System.out.println("canceled");
				}
			}
		}
	}

	/**
	 * メッセージを受信した際に呼ばれるメソッド。
	 * 
	 * @param message
	 *            {@link Message}
	 */
	protected abstract void receiveMessage(Message message);

}

開始するとcanceledフラグがfalseの間、メッセージを読み取ってそのメッセージに対応する処理を行います。
そのメッセージを格納するメッセージボックスは、BlockingQueue型のmessageQueueです。そのBlockingQueueインタフェースとは、以下のことです。名前のままですが、要素の操作時にブロックすることがあるキューです。

要素の取得時にキューが空でなくなるまで待機したり、要素の格納時にキュー内に空きが生じるまで待機する操作を追加でサポートする Queue です。

実装には、LinkedBlockingQueueクラスを利用しています。

リンクノードに基づく、任意のバウンド形式のブロッキングキューです。このキューは FIFO (先入れ先出し) で要素を順序付けします。このキューの「先頭」は、キューに入っていた時間がもっとも長い要素です。このキューの「末尾」は、キューに入っていた時間がもっとも短い要素です。新しい要素はキューの末尾に挿入され、キュー取得オペレーションにより、キューの先頭の要素が取得されます。

BlockingQueueのtakeメソッドは、メッセージが読み取れるようになるまで、待機(ブロック)します。main側では、ActorLikeのsendMessageメソッドを使って、メッセージを送信します。sendMessageメソッド内部では、BlockingQueueのputメソッドを使ってメッセージを登録します。この際、Messageの複製をcloneメソッドを使って作り、キューに登録します。BlockingQueueのtakeメソッドでMessageオブジェクトと取得できたら、ActorLikeのreceiveMessageをスレッドで実行します。メッセージは複製されて登録されるので、リソースの共有はありません。
この例では、actorLike.sendMessage(new StringMessage("Hello World6"));以降の命令を実行する際に、キューにまだ5個のメッセージがある場合はブロックします。

receiveMessageメソッドの実装では、Messageの実装型をinstanceofで判定(これがダサい...Scalaならパターンマッチですね。後述します)して、StringMessageなら標準出力に表示、EndMessageなら、shutdownメソッドを呼び出します。

ActorLike actorLike = new ActorLike() {

	@Override
	protected void receiveMessage(Message message) {
		// StringMessageなら画面に表示
		if (message instanceof StringMessage) {
			String messageText = message.toString();
			System.out.println(messageText);
		} else
		// EndMessageの時はアクターを終了
		if (message instanceof EndMessage) {
			// 何かの終了処理
			shutdown();
		}
	}
};

shutdownメソッドではcanceledフラグをtrueにセットし、Threadのinterruptメソッドを使い、runメソッドのスレッドにインターラプト(中断要求の割り込み)を発生させています。
というのも、BlockingQueueのtakeメソッドや、Thread.sleepなどの時間がかかる、もしくはブロックするメソッドで待機している場合は、フラグ値を更新しても終了することができないからです。
ちなみに、runメソッドのcatch (InterruptedException e)で、こんなことをやっていますが、InterruptedExceptionは握りつぶしちゃいけません。。

	} catch (InterruptedException e) {
		if (canceled == false) { // キャンセルの指示が無いときのInterruptedExceptionを再生する。
			Thread.currentThread().interrupt();
		} else {
			System.out.println("canceled");
		}
	}

本書にも書いている通り、インターラプトは「今の処理を中断してください」というリクエストだということがわかります。

スレッドのインタラプトは協力のための仕組みです。インタラプトするスレッドが目的スレッドを強制的に停止してほかのことをやらせることはできません。スレッドAがスレッドBにインタラプトすると、それはAからBに対する「あなたのご都合のよいときに、今やってることを止めてください」という単なるリクエストであり希望です。

なので、キャンセルの命令を指定していない時のインターラプトのInterruptedExceptionは握りつぶしてはいけないのです。Runnableではキャッチ例外であるInterruptedExceptionを再スローできないので、Thread.currentThread().interrupt(); を呼び出してインターラプトを再生します。

以下の、volatileはこれまでに触れていないキーワードですが、これをちゃんと説明しようとする超重量級になるので別のエントリで説明しますが、簡単にいうとと固有ロックを使わずに、フィールドを読み込むスレッドが、最後に書きこまれた値を見えるようにするためのvolatile修飾子です。

// フィールドを読み込むスレッドが、最後に書きこまれて値を見えるようにするためのvolatileで宣言
private volatile boolean canceled = false;

あと、問題点としては、ActorLikeのreceiveMessageを実行するスレッドを毎回、

new Thread(runnable).start();

で起動しているので、以下のようにExecutorServiceを使って、スレッドプールを使って実行するとよりよいと思います。

public abstract class ActorLike implements Runnable {
// ...
private final ExecutorService executorService = Executors
			.newCachedThreadPool();
// ...
@Override
	public void run() {
		while (canceled == false) {
			try {
// ...
					Runnable runnable = new Runnable() {
						@Override
						public void run() {
							receiveMessage(message);
						}
					};
					// スレッドプールで実行する
					executorService.submit(runnable);
				}
				Thread.sleep(processInterval);
			} catch (InterruptedException e) {
// ...
	}

}

ScalaのActorで書いてみよう

上記で記述したアクターっぽいものを、ScalaのActorで書き換えてみました。以下、参照。

  // ケースクラスでEndを定義。EndMessage相当のクラスが定義されると考えてください。
  case class End

  // アクターの定義と生成
  val testActor = actor {
    loop{ // ループ
      react{ // メッセージの受信
        case message: String => println(message) // Stringのメッセージを受信した場合に表示する
        case End => System.exit(0) // End型のメッセージを受信した場合に終了
      }
    }
  }
  // アクターの開始
  testActor.start
  // メッセージの送信
  testActor ! "Hello World1" // !はメソッドです。 キモイならtestActor.!("Hello World1")と読み替えてみて。
  testActor ! "Hello World2"
  testActor ! "Hello World3"
  testActor ! "Hello World4"
  testActor ! "Hello World5"
  testActor ! "Hello World6"
  testActor ! "Hello World7"
  testActor ! "Hello World8"
  testActor ! "Hello World9"
  testActor ! "Hello World10"
  // アクター終了させる
  testActor ! End

言語やライブラリが違うので比較しようがないのですが、ライブラリとしてActorが提供されているだけあって簡単に実装できます。
メッセージの型はどんな型でも利用でき、そのメッセージをパターンマッチをかけるだけですね。

前述のサンプルは送信のみでしたが、ScalaのActorではActorとの送受信が同期/非同期で行えます。
たとえば、フューチャーによる非同期/同期処理というのができます。上記のアクターではアクターにメッセージを送信するだけで、返信は取得できません。つまり、クライアント側がメッセージを送信し、その処理結果を受信することです。
ScalaのActorを使うと以下のように実現できます。クライアント側は、!!を使ってメッセージを送信しフューチャーを取得します。この処理はActor側の処理の完了を待たずに非同期に行います。取得したフューチャーを関数として呼び出し、結果を取得します。この時にまだActor側の処理が完了していなければ、完了するまで待機し結果を取得する同期処理を行います。

// ...
  val testActor = actor{
    loop{
      react{
        case message: String => {
          println(message)
          reply("echo:"+message) 
        }
        case End => System.exit(0)
      }
    }
  }
// ...
  val future = testActor !! "Hello World1" // 終了した時に値を取得できるフューチャーを取得する。
  Thread.sleep(5000);
  val result = future(); // フィーチャーから結果を取得する。
  println(result); // echo:Hello World1 と表示される。

他にもいろいろ機能があるので触ってみると面白いかもしれません。
ScalaのActorについては、プログラミングScala「9 章アクターによる堅牢でスケーラブルな並行処理 P209」がステップバイステップで分かりやすい解説なんで理解しやすいと思います。

プログラミングScala

プログラミングScala

追記:
id:makotanより@もらったので、以下を紹介。
Akka Projectの成果物を使うと、JavaでもActorが使えます。ScalaAPIも提供しているぽいです。知っていたけどまだ触っていない。。
以下のサンプルはUntypedActorです。受信するメッセージがObjectで辛いところですねw あと、TypedActorというのもあるらしいです。興味あれば、ぜひどうぞw

public class SampleUntypedActor extends UntypedActor {
 
  public void onReceive(Object message) throws Exception {
    if (message instanceof String) log.info("Received String message: %s", message);
    else throw new IllegalArgumentException("Unknown message: " + message);
  }
}

*1:ファイフォという呼び方だと思っていたが、"フィフォ"、"フィーフォー"とも呼ぶのかなんか違和感あるな。。どうでもいいけど。