- 5.1. 他のスレッドの終了を待つ
- 5.2. スレッド間の待ち合わせ
5.2. スレッド間の待ち合わせ
複数のスレッドが協調して処理を行う場合、あるスレッドの部分処理が終了するのを、もう一方のスレッドが待機したい場合があります。そのようなとき、java.lang.Objectクラスのwait/notify/notifyAllメソッドを使用します。
wait()
waitメソッドは、notifyまたはnotifyAllメソッドが呼び出されるまで処理を待機するメソッドです。waitメソッドを呼び出すためには、スレッドは該当するオブジェクトのロックを取得していなければいけません。
notify()/notifyAll()
notifyおよびnotifyAllメソッドは、waitメソッドによって待機状態にあるスレッドの実行を再開させるメソッドです。あるオブジェクトに関して、waitメソッドで待機しているスレッドの集合のことを「ウェイトセット」と呼びます。notifyメソッドは、ウェイトセット内の1つのスレッドの処理を再開させます。再開されるスレッドは、Java仮想マシンによって任意に選ばれ、プログラムで指定することはできません。notifyAllメソッドは、ウェイトセット内の全てのスレッドの処理を再開させます。notifyおよびnotifyAllメソッドを実行するためには、waitメソッドの場合と同じく、該当オブジェクトのロックを取得していなければなりません。
それではwaitを利用したプログラムを見てみましょう。一つのキューのデータを複数のスレッドから入出力するプログラムです。
リスト QueueTest.java
import java.util.LinkedList; import java.util.Iterator; public class QueueTest { public static void main(String[] args) { Queue queue = new Queue(); new Producer(queue).start(); new Consumer(queue).start(); } } class Queue { LinkedList queue; public Queue() { queue = new LinkedList(); } synchronized public void put(Object obj) { while (queue.size() >= 3) { System.out.println(obj + "を追加しようとしましたが," + "キューがいっぱいなので待機します。"); try { wait(); } catch (InterruptedException e) {} } queue.addFirst(obj); printList(); System.out.println(obj + "をキューに追加しました。"); notifyAll(); } synchronized public Object get() { while (queue.size() == 0) { System.out.println("データを取り出そうとしましたが," + "キューが空なので待機します。"); try { wait(); } catch (InterruptedException e) {} } Object obj = queue.removeLast(); printList(); System.out.println(obj + "をキューから取り出しました。"); notifyAll(); return obj; } synchronized public void printList() { System.out.print("[ "); for (Iterator i = queue.iterator(); i.hasNext();) { System.out.print(i.next() + " "); } System.out.print("] "); } } class Producer extends Thread { private Queue queue; public Producer(Queue queue) { this.queue = queue; } public void run() { for (int i = 0; i < 100; i++) { try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) {} queue.put(new Integer(i)); } } } class Consumer extends Thread { private Queue queue; public Consumer(Queue queue) { this.queue = queue; } public void run() { for (int i = 0; i < 100; i++) { try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) {} Object obj = queue.get(); } } }
QueueTest クラス
メインのプログラムです。1つのQueueクラスのインスタンスを作成し、そのインスタンスに対して操作を実行するProducerクラスとConsumerクラスの2つのスレッドを起動します。
Producer クラス
キューにデータを格納するクラスです。適当な時間間隔をおいて、100個のデータをキューに格納します。データを格納する際に、キューの状態確認などは行いません。キューがいっぱいでデータを格納できない状態でも、データを格納しようとします。
Consumer クラス
キューからデータを取り出すクラスです。適当な時間間隔を置いて、100個のデータをキューから取り出します。データを取り出す際に、キューの状態確認は行いません。キューが空のときでも、データを取り出そうとします。
Queue クラス
Queueクラスはキューを表すクラスです。内部的にはLinkedListを用いてデータを表現しています。格納できるデータ数を制限し、最大3つまでしかデータを格納できないことにします。このクラスには、キューにオブジェクトを追加するputメソッドと、キューからデータを取り出すgetメソッドを用意しています。これらのメソッドは、複数のスレッドから同時に要求があっても正常動作するように、同期処理を行っています。
Queueクラスのputメソッドとgetメソッドはsynchronizedメソッドとしていますので、データの格納や取り出しを行うことができるスレッドは同時には1つだけであることを保証します。(サイドコーチ:LinkedListは複数のスレッドから同時アクセスされたときの動作が保証されていません。LinkedListを複数のスレッドで利用する場合、このようにsynchronizedブロックを使用しなければいけません。)しかし、キューがいっぱいの時にputメソッドが呼ばれたり、キューが空の時にgetメソッドが呼ばれたときはどうしましょう。例外を発生させてしまうというのもひとつの解決策ですが、ここでは要求に対応できる状態になるまで待機することにします。例えば、キューがいっぱいでデータを格納できなかった場合、他のスレッドによってデータが格納されるのを待機します。ここで、wait()メソッドを利用します。
putメソッドの初めの部分を見てみましょう。
synchronized public void put(Object obj) { while (queue.size() >= 3) { System.out.println(obj + "を追加しようとしましたが," + "キューがいっぱいなので待機します。"); try { wait(); } catch (InterruptedException e) {} }
キューの上限を3としていますので、データを格納できない状態(キューのサイズが3以上)であれば、メッセージを表示してwait()メソッドを呼び出しています。これで、他のスレッドから通知されるまで待機します。wait()メソッドは獲得しているロックを解放するため、他のスレッドがsynchronizedメソッドであるput()やget()を実行できるようになります。
ここで他のスレッドがget()を呼び出したとしましょう(もしput()が呼び出されたなら、そのスレッドも先ほどと同じようにwait()メソッドで待機します)。get()メソッドも、put()メソッドと同じようにはじめに条件をチェックします。ここではキューにデータがあるので、先に進みます。
synchronized public Object get() { while (queue.size() == 0) { System.out.println("データを取り出そうとしましたが," + "キューが空なので待機します。"); try { wait(); } catch (InterruptedException e) {} }
その先では、まずキューからのデータの取り出しを行い、キューの状態やメッセージを表示しています。printList()メソッドは、キューの内容を表示するためのメソッドで、44〜50行目で定義しています。
Object obj = queue.removeLast(); printList(); System.out.println(obj + "をキューから取り出しました。");
データの取り出し処理が終われば、notifyAll()メソッドでwait()メソッドで待機しているスレッドを再開できる状態にします。
notifyAll();
ただし、wait()で待機していたスレッドは直ちに実行されるわけではありません。スレッドは、wait()メソッドで待機状態に入る際にロックを解放していましたので、ロックを再取得しなければそのスレッドの処理を再開できません。しかしnotifyAll()を呼び出したスレッドは依然としてロックを取得したままの状態です。そのスレッドがsynchronizedブロックを終了し、ロックを解放して、はじめてロック取得待ちのスレッドは処理を再開できます。
プログラムの実行結果は次のようになります。
wait()メソッドで、スレッドが待機していることが確認できます。
スレッドが、wait()メソッドで待機している状態と、ロックが解放されるのを待機している状態は、全く別の状態ですので注意してください。ウェイトセット内のスレッドは、notify/notifyAll/interrupt等のメソッドによってウェイトセットから抜け出します。ウェイトセットから抜けるとき、必ずロックを再取得しなければいけませんので、直ちにロックを取得できない場合、今度はロック解放待ちの状態となります。
スレッドの状態遷移をまとめると、次のようになります。
ところで、putメソッドやgetメソッドでは、なぜnotify()ではなくnotifyAll()を利用しているのでしょうか。実際、このプログラムの場合、notifyAll()をnotify()と書き換えても問題なく動作します。しかし、waitで待機する条件が複数あるとき(ここでも「データを格納したいのに、キューがいっぱいのとき」と「データを取り出したいのに、キューが空のとき」の2つがあります)で、異なる条件でウェイトセットに入っているスレッドが共存しているときに(このサンプルは、同時には1つのスレッドしかウェイトセットに入りません)、問題が発生する可能性があります。
notifyメソッドは、再始動させようとするスレッドを指定することができません。したがって、場合によっては、まだ処理を再開できる条件が整っていないものが指定されてしまうことがあります。そのとき、そのスレッドは、ウェイトセット内の別のスレッドを再始動させるため、またnotifyメソッドを呼び出さなければいけません。notifyを呼び忘れると、別のウェイトセットに入ったスレッドがそのまま放置されてしまうことになります。このような処理は、ウェイトセットに入る条件やスレッド数が増えたときのプログラムの動作予測が非常に難しく、プログラミングの間違いを犯しやすくなります。
notifyAllメソッドを用いると、とりあえずウェイトセット内のすべてのメソッドが再始動しようとします。そこで、それぞれのスレッドに条件を再評価させ、条件を満たしていない場合は再度ウェイトセットに入ってもらうようにします。そのため、条件判断にifではなくwhileを用いています。粗雑なやり方に思えるかもしれませんが、こちらの方がプログラムを書きやすく、意図しない動作も起こりにくいため、一般的にnotifyAllを用いる方法が推奨されています。
(実習課題2)
2節のサンプルプログラムを、簡単な待ち行列をシミュレーションするプログラムに改良しなさい。ATMの数を1つ2つと変化させて、シミュレーションを実行しなさい。
- 待ち行列の長さに上限は無い。つまり待ち行列が一杯なので、待機するという事は無い。
- 顧客は、平均して3秒に1人到着する。到着間隔は、ポアソン分布に従うのが理想だが、どのようなものでも構わない。
- ATMは、平均して2秒で1人分のサービスを完了する。サービス時間は、指数分布に従うのが理想だが、どのようなものでも構わない。