ええっと、リクエストされたので、今回は並行処理ネタにいってみます。
並行処理といえば、この本は必読書ですね。でも、書いてることは、かなりムズカシイデス(´・ω・`)
Java並行処理プログラミング ―その「基盤」と「最新API」を究める―
- 作者: Brian Goetz,Joshua Bloch,Doug Lea
- 出版社/メーカー: ソフトバンククリエイティブ
- 発売日: 2006/11/22
- メディア: 単行本
- 購入: 24人 クリック: 419回
- この商品を含むブログ (163件) を見る
ということで、並行処理について、もうちょっと優しく分かりやすく解説できないか考えて書いてみます。
スレッドセーフにするには
今回は、プログラムをスレッドセーフにするためにどんなことを考えればよいか。そういうテーマです。
アンスレッドセーフなコードから
この本の最初の方に、"スレッドのリスク"という章で、スレッドセーフでないコード例が紹介されています。
そのコードは一部だけだったので、実行できるコードを書いてみました。*1
Sequenceクラスは順序(シーケンス)を表すモデルでvalueプロパティを持ちます。getAndIncrementメソッドを呼ぶと現在値を返しインクリメントできます。
これを並行に実行したした場合どうなるのかという実験をしてみました。
public class Sequence { private int value = 1; public int getValue() { return value - 1; } public int getAndIncrement() { return value++; } }
以下のMainクラスのソースをみてください。
まず、複数のスレッドで共有するSequenceオブジェクトをひとつだけ作ります。
CountDownLatchは複数のスレッドを、同時にスタートさせるための同期オブジェクトです。各スレッドが開始するとstartLatch.await()で待機状態になり、mainメソッドでstartLatch.countDown()を呼ぶと一斉に開始します。(スレッド数が多いとどうしても生成している途中で最初の方のスレッドが終わってしまうのでCountDownLatchで一斉に開始できるようにしています。)
次にTHREAD_COUNT分のスレッドを生成して開始します。
ThreadにはRunnableを実装したThreadAccessクラスのインスタンスを生成して渡します。ThreadAccessオブジェクトにはSequenceオブジェクトと、ThreadAccess内のループ回数 THREAD_LOOPが渡されます。実際のスレッドの処理はThreadAccess#runを見てください。
最後はすべてのスレッドの終了を待って、Sequenceオブジェクトの値を答え合わせします。
public class Main { private static final int THREAD_LOOP = 3; private static final int THREAD_COUNT = 1000; public static void main(String[] args) { Sequence sequence = new Sequence(); CountDownLatch startLatch = new CountDownLatch(1); Collection<Thread> threads = new HashSet<Thread>(); // スレッドの準備 for (int i = 1; i <= THREAD_COUNT; i++) { Thread thread = new Thread(new ThreadAccess(startLatch, sequence, THREAD_LOOP)); threads.add(thread); thread.start(); } // 足並み揃えてゴー。 startLatch.countDown(); try { // みんなが終わるのを待つ for (Thread thread : threads) { thread.join(); } } catch (InterruptedException e) { e.printStackTrace(); } // 答え合わせ if ( THREAD_LOOP * THREAD_COUNT != sequence.getValue() ){ System.out.println(String.format("NG !!!, total counter = %d",sequence.getValue())); }else{ System.out.println("OK !!!"); } } // スレッドの処理 private static class ThreadAccess implements Runnable { private Sequence sequence; private CountDownLatch startLatch; private int loopCount; public ThreadAccess(CountDownLatch startLatch, Sequence sequence, int loopCount) { this.startLatch = startLatch; this.sequence = sequence; this.loopCount = loopCount; } @Override public void run() { try { long threadId = Thread.currentThread().getId(); startLatch.await(); for (int i = 1; i <= loopCount; i++) { int counter = sequence.getAndIncrement(); System.out.println(String.format("%04d:%05d", threadId, counter)); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
これを実行すると3000という値になっているはずですが、OK!!!となったり、NG!!!となったり期待どおり動作しません。
OK !!!
NG !!!, total counter = 2999
アトミックな操作とは
value++のインクリメントはひとつの操作ではなく、実際は以下の3つの操作から成り立っています。*2
- 現在の値を読む -> この値がリターンされる
- その値に1を加える
- その新しい値を書きこむ
value++のような操作は、複数のスレッドが割り込むと正しい動作が保証できません。スレッドセーフにするには、この操作をアトミックにしなければなりません。アトミックとは単一不可分という意味です。ここでは、並行処理環境において、これ以上分けることができない操作として、アトミックな操作(不可分操作)と表現しています。つまり、あるスレッドがある操作を実行中の場合は、他のスレッドには割り込みさせずに、その操作を実行できないようにしなければなりません。
競り合い状態とは
アトミックではない操作を複数のスレッドで並行に実行した場合どのようになるか見てみましょう。下の表は、2つのスレッドがvalue++を並行に実行する様子を擬似的にモデル化したものです。
スレッドA | 1 = get | → | 1+1 | → | set(2) | → | 2 = get | → |
スレッドB | → | 1 = get | → | 1+1 | → | set(2) | → | 2 = get |
スレッドAが1を返したら、スレッドBでは2を返すと考えてしまうのですが、実際はそうではなく、二つのスレッドが同じ値を取得することが可能なのです。そして、取得したその値に対して1を加え、新しい値を書きこむが可能になります。つまり、二つのスレッドで、同じ値が戻り値として繰り返し返されてしまうわけです。
このような状態を、競り合い状態(レースコンディション=race condition)と呼びます。こういう状態になるかどうかは、JVMのスレッドのスケジューリングやタイミングなどに依存してしまうので、計算がうまくいったり、いかなかったりするわけです。上記のコードのように、複数のスレッドで並行に実行した場合は、このような競り合い状態になってしまう可能性があるのです。
どのような操作をアトミックにすればよいか
value++のような操作をリード・モディファイ・ライト操作と言います。リード・モディファイ・ライトは、複数のスレッドが同時に割り込むと上記のような競り合い状態を起こす可能性が高いわけです。このような操作はアトミックに実行されなければなりません。
「リード・モディファイ・ライト(read-modify-write,読んで/変更して/書き戻す)操作」と言います。それは、結果のステートが前のステートに基づいて得られる操作です。
他にも、この競り合い状態が起きやすいのは、チェック・ゼン・アクト操作です。
競り合い状態のいちばん多いタイプは、チェック・ゼン・アクト(check-then-act ステートをチェックしてからその結果に基づいて行為する)です。競り合い状態により、陳腐化した観察(ステートの本来の最新更新値ではない一瞬前の古い値)に基づいて次の行動を決めてしまいます。
チェック・ゼン・アクト操作の典型的な例は、以下のような遅延初期化の処理です。
public class Cache { private static Cache instance; public static Cache getInstance() { if ( instance == null ) { instance = new Cache(); } return instance; } private Cache(){ } }
例えば、二つのスレッドがgetInstanceを呼び出した場合は、スレッドAがinstanceがnullなのを見て新たなCacheオブジェクトを生成します。Bもinstanceがnullなのを見て、スレッドAが作ったCacheオブジェクトとは別のCacheオブジェクトを生成する可能性があります。これはスレッドのスケジューリングに依存するので、運任せです。
チェック・ゼン・アクト操作(遅延初期化のような) とリード・モディファイ・ライト操作をあわせて、複合アクションと言います。
スレッドセーフであるためには、チェック・ゼン・アクト操作(遅延初期化のような) とリード・モディファイ・ライト操作(インクリメントのような)はつねに必ずアトミックであるべきです。チェック・ゼン・アクト操作とリード・モディファイ・ライト操作をまとめて、複合アクション(compound actions) と呼びます。複数の操作の集まりという意味です。複合アクションは、スレッドセーフであるためにはアトミックに実行されなければなりません。
スレッドセーフにするには、これらの操作をアトミックに実行できるようにする必要があります。
アトミックにするための条件
アトミックである条件は以下の通りです。リード・モディファイ・ライトの例で、簡単にいうと、スレッドAが一連の操作を実行中は、スレッドBの一連の操作は、完全に終了しているか、まったく未実行であるか、でなければなりません。これがアトミックの条件です。
操作AとBは、次の条件を満たすときお互いにアトミックです:Aをこれから実行しようとするスレッドから見て、そのときBを実行する別のスレッドが必ずBの実行を完全に終えているか、またはまったくBに未着手である。アトミックな操作は、それ自身も含めて、同じステー卜を操作するすべての操作に対してアトミックな操作です。
固有ロックを使ってアトミック性を保証する
では、アトミック性を保証するにはどうすればよいでしょうか。
SequenceのgetAndIncrementメソッドにsynchronizedを付加することです。*3
public synchronized int getAndIncrement() { return value++; }
上記のコードをもっと分かりやすく書くと、以下のコードと同じ意味になります。
public int getAndIncrement() { synchronized(this){ return value++; } }
synchronized構文は、以下のような構造になっています。synchronizedに渡しているthisは、鍵(ロック)の役割をするオブジェクトで、ロックする処理ブロックを複数のスレッドからロックするための構文です。この処理ブロックを実行できるスレッドは、一度にロックを取得した1つのスレッドだけです。このロックのことを固有ロックと呼びます。
synchronized(ロックオブジェクト){ ロックされる処理ブロック(ロックを取得した1つのスレッドしか実行できない) }
ロックを取得中のスレッドがあれば、ロックを取得しようしたスレッドはブロックされます。ロックが開放されたら、後続のスレッドがロックを取得できます。つまり、ロックされる処理ブロックでは、スレッドの実行が同期化することを意味します。
上記の例では、thisがロックオブジェクトなので、インスタンス単位のロックを行います。例えば、Sequence.classをロックオブジェクトにした場合は、すべてのインスタンスに適用されるジャイアントロックになります。
この固有ロックを使えば、指定した処理ブロックのアトミック性を保証することができます。
固有ロックは、再入可能なロックなので、以下のようなコードで同じスレッドが再びロックを取得することができます。これはデッドロックしません。仮に同じスレッドでデッドロックするようであれば、そのような再入性を考慮するために、コードが複雑化する可能性があります。また、プログラマへの負担も増加させてしまいます。しかし、Javaの固有ロックではこのような心配事はありません。プログラマに優しいのです。
ロックはスレッドが取得するもので、ロックを取得するごとにカウントが増えます。ブロックを抜けてロックを開放すれば、そのカウントは減ります。ロックを取得するスレッドが同じであれば、再入しカウントが増減するということです。(当然、スレッドが異なる場合はこの限りではありません)
public abstract class SequenceBase { public synchronized int getAndIncrement() { return value++; } } public class Sequence extends SequenceBase { @Override public synchronized int getAndIncrement() { // 前処理 int result = super.getAndIncrement(); // デッドロックはしない // 後処理 return result; } }
ReentrantLockはよく考えて使う
Java5以降から含まれているconcurrentパッケージには、再入可能なロック ReentrantLockクラスがあります。このクラスを利用すると以下のように固有ロックを用いずにロックを実現できます。固有ロックは、ロック範囲が明確でわかりやすい反面、そのコードブロック単位でしかロックが実現できません。しかし、ReentrantLockはそのような制限を受けません。ロックの制御に自由度があるものの、ロックの制御はプログラマ任せになり、デッドロックを起こすリスクも高くなります。このようなリスクを承知した上で、ロックの制御に自由度が求められる場合は有効でしょう。
また、性能面ではJava5環境では、固有ロックよりReentrantLockは有利ですが、Java6の固有ロックの性能がよくなり性能差があまりありません。concurrentパッケージのAPIだということで新しくて性能がよいはずだと誤解しがちなのですが、Java5とJava6では事情が異なるわけです。何も考えずにReentrantLockを乱用してしまうとスレッドセーフを実現する初期コストや維持コストが上がってしまう可能性があります。Java6の場合は、コードブロックを股がるロックの要求がなければ、コードの表現としてシンプルでかつ、安全な固有ロックを使うことをお勧めします。
Lock lock = new ReentrantLock(); try { lock.lock(); // ロック // ロックしたい処理 } finally { lock.unlock(); // アンロック }
ちょっと長くなったので、ここまでにします。次のエントリでもう少し具体例をあげてスレッドセーフについて考えたいと思います。
あわせて読みたい
スレッドセーフにするために考えること 〜応用編 その1〜 - じゅんいち☆かとうの技術日誌
スレッドセーフにするために考えること 〜応用編 その2〜 - じゅんいち☆かとうの技術日誌