Scala Parallels Collections

この記事はScala Advent Calendar jp 2010 : ATNDの24日目です。

概要

Scala2.9ではParallel Collectionsが使えるようになります。
パソコンのCPUがコアいくつもあるんだからCollectionも並列処理したいですよね。
詳しくは、Index of /node/138/140を見ると良いでしょう。

準備

Scala2.9をビルドします

Day17に登場した、エルシャダイにパクr、、、インスパイヤしたち○こで有名な@yuroyoroさんの
Scalaのtrunkをビルドする - ( ꒪⌓꒪) ゆるよろ日記を参考に、githubからScalaのtrunkを落としてビルドします。
面倒なのでREPLで試します。
javaだと、RuntimeのavailableProcessorsでJVMが使えるプロセッサ数が取得できるのですが、scala.collection.parallelのパッケージオブジェクトにavailableProcessorsが定義されていました。

mbp13:bin kiyoshi$ ./scala
Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8
Welcome to Scala version 2.9.0.r23831-b20101223153340 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_22).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.parallel
import collection.parallel

scala> parallel.availableProcessors
res0: Int = 2

うちのMacBook ProCore2 Duoなので2個使えるわけですね。
では、試してみましょう。最初にfor式を使って普通に評価します。
処理時間を計るためにBenchmarkを使って10回計測します。

scala> import testing.Benchmark
import testing.Benchmark

scala> object bench extends Benchmark {
     |   def run = for(i <- 0 to 1000) Thread.sleep(10)
     | }
defined module bench

scala> bench.runBenchmark(10)
res0: List[Long] = List(10166, 10156, 10157, 10155, 10156, 10155, 10167, 10156, 10156, 10157)

大体1回で10秒くらいかかってますね。
次に0 to 1000だったところを(0 to 1000).parと置き換えて実行します。

scala> import collection.parallel
import collection.parallel

scala> object pbench extends Benchmark {
     |   def run = for(i <- (0 to 1000).par) Thread.sleep(10)
     | }
defined module pbench

scala> pbench.runBenchmark(10)
res1: List[Long] = List(5147, 5078, 5078, 5078, 5077, 5078, 5076, 5078, 5078, 5077)

お、大体1回5秒くらいになりました!availableProcessorsが2なので期待通りです。

parってなに?

なんでこうなるのでしょうか?以下のようにすると、0 to 1000はRange.Inclusiveであることがわかります。

scala> 0 to 1000
res2: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, ...

(0 to 1000).parは、ParRangeのインスタンスを返します。

scala> (0 to 1000).par
res3: scala.collection.parallel.immutable.ParRange = ParRange(0, 1, 2, 3, ...

ParRangeがParallel対応のRangeなのですね。他にも

  • ParArray
  • ParSet
  • ParMap

などが追加されています。
ところでparってなんでしょうか?
parはscala.collection.Parallelizableで定義されているメソッドです。

trait Parallelizable[+ParRepr <: Parallel] {

  /** Returns a parallel implementation of a collection.
   */
  def par: ParRepr

}

これを各クラスなどで実装するわけです。たとえばRangeでは

def par = new ParRange(this)

のように実装されています。

まとめ

これだけのことでプロセッサ数に対応した並列処理が出来るのは便利ですね。
for式について書きましたが、もちろんそれ以外のメソッドも並列処理してくれるようになっています。
並列処理するためのコストもありますので、並列処理すれば必ず早くなるわけではないので注意が必要です。