こんにちは、寺岡です。
今日はJava8のStreamを使ったお手軽な並列処理と、ちょっとした注意点をご紹介します。
Stream#parallel() で並列処理
Java8 の Stream#parallel() メソッドを使うと、簡単に並列処理を行うことができます。
1 2 3 4 5 6 7 8 9 10 11 12 |
IntStream.rangeClosed(1, 10).parallel().forEach(System.out::println); // マルチスレッドなので順不同で出力される 7 6 1 2 8 10 3 5 4 9 |
Stream#parallel() は要スレッドセーフ
Stream#parallel() で並列化されたストリームはマルチスレッドで処理されるため、map や filter などの中間操作、forEach などの終端操作がスレッドセーフではない場合に問題が発生します。
1 2 3 4 5 6 7 8 9 10 11 12 |
public static void main(String[] a) throws Exception { Set<String> set1 = new ConcurrentSkipListSet<>(); digestStream().parallel().map(String::new).forEach(set1::add); } // 1~10000までの数字文字列のSHA-1ハッシュ値のStreamを返す。 public static Stream<byte[]> digestStream() throws NoSuchAlgorithmException{ // パフォーマンスを稼ぐため、MessageDigestインスタンスを使い回す。 MessageDigest md = MessageDigest.getInstance("SHA-1"); // MessageDigest#digestはスレッドセーフではないので並列実行されるとまずい。 return IntStream.rangeClosed(1, 10000).mapToObj(i -> String.valueOf(i).getBytes()).map(i -> md.digest(i)); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException at java.lang.System.arraycopy(Native Method) at sun.security.provider.DigestBase.engineUpdate(DigestBase.java:114) at sun.security.provider.SHA.implDigest(SHA.java:94) at sun.security.provider.DigestBase.engineDigest(DigestBase.java:181) at sun.security.provider.DigestBase.engineDigest(DigestBase.java:160) at java.security.MessageDigest$Delegate.engineDigest(MessageDigest.java:588) at java.security.MessageDigest.digest(MessageDigest.java:365) at com.techscore.Sample.lambda$1(Sample.java:28) at com.techscore.Sample$$Lambda$2/1406718218.apply(Unknown Source) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) |
Iteratorでも並列処理
Stream#parallel() を利用して、Iterator に対しても並列処理を行えます。
コレクションは Collection#stream() を使って簡単に Stream に変換できますが、Iterator の場合はちょっと手順が複雑です。
Iterator を一旦、Spliterator、あるいは Iterable なインスタンスに変換し、StreamSupport#stream() の第一引数に渡すことで Stream に変換することができます。
1 2 3 4 5 6 7 |
Iterator<Integer> iterator = Arrays.asList(1,2,3).iterator(); Stream<Integer> stream= StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); // こっちでもOK // Iterable<Integer> iterable = () -> iterator; // Stream<Integer> stream= StreamSupport.stream(iterable.spliterator(), false); |
Streamにさえなってしまえば、あとはparallelを呼ぶだけですね。
1 |
stream.parallel().forEach(System.out::println); |
Stream 処理時の効率を上げるために、可能であれば Spliterators.spliterator() によりサイズ指定を行ったほうが良いでしょう。
また、parallel を呼びださずとも、StreamSupport#stream() の第二引数にtrueを渡すだけで並列化可能な Stream を作成することができます。
1 2 3 |
List<Integer> list = Arrays.asList(1,2,3); Iterator<Integer> iterator = list.iterator(); Stream<Integer> stream = StreamSupport.stream(Spliterators.spliterator(iterator, list.size(), 0), false); |
あれ? Iterator ってスレッドセーフじゃないよね……
Iterator は内部に状態を持つためスレッドセーフではない筈。
ではなぜ Stream で並列実行が行えるのでしょうか。
先ほど、Iterator を Stream に変換する際に、一度 Spliterator に変換する方法を紹介しました。
この Spliterator の主な役割は、分割統治法アルゴリズムを利用するためにIteratorを分割することにあります。
Spliteraotr の分割は Spliterator#trySplit で行われますが、Iterator を元にした Spliterator は trySplit メソッドでの分割時にバッファリングされ、分割後は配列を使った Spliterator に変換されます。
そして、Spliterator はスレッドセーフではないため trySplit メソッドが並列実行されることはありません。
並列アルゴリズムでの有用性は明らかですが、スプリッテレータはスレッドセーフとは見なされません。
スプリッテレータを使用して並列アルゴリズムを実装する場合は、一度に1つのスレッドのみがスプリッテレータを使用するようにしてください。
http://docs.oracle.com/javase/jp/8/docs/api/java/util/Spliterator.html
つまり、Iteratorから作成されたStreamの場合、 Stream#parallel() で並列化されたストリームでは、Spliterator#trySplit を使ってIteratorにアクセスする際には並列処理は行われず、
中間操作と終端操作に対して並列に処理が行われることになります。
なお、Iterator から作成した Spliterator を使った Stream は Collection などから作成した Stream と比べて、並列処理の効率はかなり落ちます。
そもそも、ある程度要素数が多くないと並列処理すら行ってくれません。
並列処理されないStreamに変換する
Stream には並列実行を禁止する API は用意されていません。
そのため、サンプルコードにあった digestStream メソッドのように、public に公開するメソッドが非スレッドセーフな中間操作を含んだ Stream を返すのは危険です。
そんな時は、Stream から Spliterator を作成し、それを元に Stream を組み立てることで、並列処理可能な 並列処理が行われない Stream に変換することができます。
(Iteratorから作成した Spliterator と違って、中間操作を含む Stream の Stream#spliterator を元にした Stream は Stream#parallel を呼び出しても並列に処理されることはないようです)
1 2 3 4 5 6 7 8 9 10 11 12 |
public static void main(String[] a) throws Exception { Set<String> set1 = new ConcurrentSkipListSet<>(); digestStream().parallel().map(String::new).forEach(set1::add); } // 1~10000までの数字文字列のSHA-1ハッシュ値のStreamを返す。 public static Stream<byte[]> digestStream() throws NoSuchAlgorithmException { MessageDigest md = MessageDigest.getInstance("SHA-1"); Stream<byte[]> stream = IntStream.rangeClosed(1, 10000).mapToObj(i -> String.valueOf(i).getBytes()).map(i -> md.digest(i)); // map中間操作までを含んだSpliteratorを作成し、それをソースとしたStreamを返す。 return StreamSupport.stream(stream.spliterator(), false); } |
まとめ
Stream#parallel() を使うと、とても簡単に並列処理を行えます。
ただ、マルチスレッドで動作するのでスレッドセーフになるよう十分気をつける必要があります。
また、Stream を返すメソッドを作る場合、メソッド利用者側の呼び出し次第で並列処理が行われる可能性がある点に注意しましょう。