- 5.1. SocketChannel/ServerSocketChannel
- 5.2. ノンブロッキング入出力
5.2. ノンブロッキング入出力
次に、ノンブロッキング入出力を利用したサーバアプリケーションの記述方法を説明します。
ノンブロッキングにすることのできるチャネルは SelectableChannel クラスを継承しています。SocketChannel クラスや ServerSocketChannel クラスも SelectableChannel クラスを継承しています。ブロックモードを変更するには configureBlocking メソッドを用います。新規作成された SelectableChannel は必ずブロックモードです。configureBloking(false) を呼び出すことでノンブロッキングなチャネルとなります。
channel.configureBlocking(false);
ノンブロッキングモードのチャネルでは、入出力操作を行っても処理がブロックされません。たとえば ServerSocketChannel で accept() メソッドを呼び出した場合、接続があってもなくてもただちにメソッドの実行が終了します。これではいつ入出力を行うメソッドを呼び出したらよいのかわかりません。利用可能なチャネルを取得するための仕組みとしてセレクタ(Selector)というものを用います。セレクタにチャネルを登録しておき、そこから利用可能となったものを取り出して入出力を行います。セレクタには複数のチャネルを登録することが可能で、一つのスレッドでも見かけ上複数の入出力を同時に行うことができます。
Selector は SelectorProvider により生成されます。SelectorProvider は独自に実装することもできますが、ここではデフォルトで用意されているプロバイダを利用します。デフォルトのプロバイダは SelectorProvider.provider() メソッドで取得し、Selector は SelectorProvider クラスの openSelector() メソッドで取得します。
Selector selector = SelectorProvider.provider().openSelector();
これは次のように記述しても同等の処理が行われます。
Selector selector = Selector.open();
セレクタにチャネルを登録するには、SelectableChannel の register() メソッドを呼び出します(Selectorクラスのメソッドでないことに注意)。
-
SelectionKey register(Selector sel, int ops, Object att)
-
SelectionKey register(Selector sel, int ops)
第一引数に登録先のセレクタ、第二引数にチャネルに期待する操作の種別、第三引数に添付オブジェクトを指定します。チャネルの操作の種別には以下のものがあります。
値 | 説明 |
SelectionKey.OP_ACCEPT | ソケットの接続受け付け操作 |
SelectionKey.OP_CONNECT | ソケットの接続操作 |
SelectionKey.OP_READ | 読み込み操作 |
SelectionKey.OP_WRITE | 書込み操作 |
第三引数の添付オブジェクトには任意のオブジェクトを指定することができます。引数2つの register() メソッドは、添付オブジェクトとして null を指定したことになります。
register() メソッドの戻り値である SelectionKey は、セレクタに登録したチャネルを識別するためのオブジェクトです。登録したチャネルや添付オブジェクトは SelectionKey オブジェクトを利用して取り出します。
セレクタから利用可能となったチャネルを取り出すためには、通常はまず Selector クラスの select() メソッドを呼び出します。select() メソッドは利用可能なチャネルの個数を返します。ここで「利用可能とは」、たとえば OP_ACCEPT を指定して登録した ServerSocketChannel に接続要求がきた場合などを指します。select() メソッドには以下の3つのバリエーションがあります。
-
int select()
-
int select(long timeout)
-
int selectNow()
select() メソッドは利用可能なチャネルが出現する、割り込みが入る、または timeout で指定した時間(ミリ秒)経過するまで処理をブロックします。selectNow() メソッドはブロックしない操作で、利用可能なチャネルが存在しない場合ただちに0を返します。
select() メソッドで利用可能なチャネルがあることが確認できたら、次は selectedKeys() メソッドを呼び出します。selectedKeys() メソッドは利用可能なチャネルを表す SelectionKey の Set を返します。SelectionKey の channel() メソッドや attachment() メソッドで、チャネルや添付オブジェクトを取得することができます。
それでは実際にサンプルのプログラムを見てみましょう。
NonBlockingChannelEchoServer.java
package nio.chapter5; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; public class NonBlockingChannelEchoServer { private static final int ECHO_PORT = 10007; private static final int BUF_SIZE = 1000; private Selector selector; public static void main(String[] args) { new NonBlockingChannelEchoServer().run(); } public void run() { ServerSocketChannel serverChannel = null; try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(ECHO_PORT)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NonBlockingChannelEchoServerが起動しました(port=" + serverChannel.socket().getLocalPort() + ")"); while (selector.select() > 0) { for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();) { SelectionKey key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { doAccept((ServerSocketChannel) key.channel()); } else if (key.isReadable()) { doRead((SocketChannel) key.channel()); } } } } catch (IOException e) { e.printStackTrace(); } finally { if (serverChannel != null && serverChannel.isOpen()) { try { System.out.println("NonBlockingChannelEchoServerを停止します。"); serverChannel.close(); } catch (IOException e) {} } } } private void doAccept(ServerSocketChannel serverChannel) { try { SocketChannel channel = serverChannel.accept(); String remoteAddress = channel.socket() .getRemoteSocketAddress() .toString(); System.out.println(remoteAddress + ":[接続されました]"); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } private void doRead(SocketChannel channel) { ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); Charset charset = Charset.forName("UTF-8"); String remoteAddress = channel.socket() .getRemoteSocketAddress() .toString(); try { if (channel.read(buf) < 0) { return; } buf.flip(); System.out.print(remoteAddress + ":" + charset.decode(buf).toString()); buf.flip(); channel.write(buf); } catch (IOException e) { e.printStackTrace(); } finally { System.out.println(remoteAddress + ":[切断しました]"); try { channel.close(); } catch (IOException e) {} } } }
このプログラムは、上記の ChannelEchoServer をノンブロッキングチャネルを利用するように書き換えたものです。
run() メソッドで主要な処理を行っています。
selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(ECHO_PORT)); serverChannel.register(selector, SelectionKey.OP_ACCEPT);
この部分ではセレクタの初期化、チャネルのブロックモードの設定、チャネルのバインド、セレクタへのチャネルの登録を行っています。このチャネルは新規接続を受け付けるためのチャネルですので、OP_ACCEPT の操作を指定して登録しています。
その次の while ループでセレクタからチャネルを取り出す処理を行っています。
while (selector.select() > 0) { for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();) { SelectionKey key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { doAccept((ServerSocketChannel) key.channel()); } else if (key.isReadable()) { doRead((SocketChannel) key.channel()); } } }
ここでは、利用可能なチャネルがどの操作が可能なチャネルであるかを判定して処理を分岐させています。新規接続受付処理が可能なチャネルであれば最初に登録した受信用のチャネルと判定し、データ受信可能なチャネルであればクライアントからのデータ受信処理を行います。データ受信用のチャネルの作成やセレクタへの登録は新規接続受け付け処理の中で行っています。この部分についてはこのあとで説明します。
取得された SelectionKey からどの処理が実行できるチャネルであるかを判別するためには以下のメソッドを利用します。
メソッド | 説明 |
isAcceptable() | 新規接続が受け付け可能であるかどうかを判定する |
isConnectable() | 接続可能であるかどうかを判定する |
isReadable() | データ受信処理が可能であるかどうかを判定する |
isWritable() | データ送信処理が可能であるかどうかを判定する |
doAccept() メソッドでは新規接続の受付処理を行っています。
SocketChannel channel = serverChannel.accept(); String remoteAddress = channel.socket() .getRemoteSocketAddress() .toString(); System.out.println(remoteAddress + ":[接続されました]"); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ);
59行目の accept() メソッドで新規接続の受付を行っています。このチャネルはノンブロッキングですのでブロックされませんが、すでにクライアントからの接続があることが確認されていますので、ただちに SocketChannel が取得されます。取得された SocketChannel を利用してデータの送受信を行いますが、データの受信処理はデータの到着を待たなければいけませんので、このチャネルもノンブロッキングとしてセレクタに登録します。ここで待ち受ける処理はデータ受信処理ですので OP_READ を指定して登録します。
doRead() メソッドでは、データを受信して返信する処理を行っています。
if (channel.read(buf) < 0) {
この場合でも79行目の read() メソッドはノンブロッキングな処理ですが、データが到着していることが確認できていますので、確実にデータを受信することができます。
このプログラムを実行し複数のクライアントからの接続を行うと、マルチスレッドを利用していないにも関わらず複数接続の同時処理が実現できていることが確認できます。
(実習課題2)
例で示したサーバプログラムを改良し、2つの別のポートで待ち受けるサーバアプリケーションを1スレッドで実現しなさい。1つのポートは例と同じように同じ文字列を返し、もう1つのポートは文字列を逆順に並べ替えたものを返すようにすること。接続があったときにどちらのサーバに接続があったかは、セレクタに登録する添付オブジェクトを利用して識別すること。