Log4j 2にも採用されたLMAX Disruptorはなぜ狂ったように速いのか?

dis

LMAXという会社はおそらくFX業者で、筆者はLMAXの開発者の講演を、InfoQの動画で何度か見たことがあった。
彼らは非常に特異な集団で、さしずめ「Javaのスピード狂」という感じだ。
印象的なのは、シングルスレッドで仕事を片付けることを強調している点だ。
「Javaならマルチスレッドで並列処理すれば性能が出ると広く思われているが、我々の仕事においてはシングルスレッドが最速だ」というような主張を何度も見た。
ゴールドマンサックスといいLMAXといい、やはり多額の金が動く会社でガチでJavaをやっている連中はカリカリにチューニングするため、技術的には非常に面白い。
彼らがコアのライブラリをOSS化してくれるというのは、金融業界を否定的な目で見る筆者からすると複雑だが、悔しいことに参考になる。

LMAX DisruptorはJavaのライブラリだ。Producer/Consumerパターンであるスレッドから別のスレッドへメッセージを渡す際のパフォーマンスを追求したものである。Log4j2のAsync実装に採用されているようだ。

Disruptorの速さの秘密はいくつかのブログで詳細に解説されているので、じっくり知りたい人はソースコードを読むとともにそちらを参照するとよいだろう。

  • ロックを極力排除し、Producerからのアクセスの一ヶ所でしか使用していない
  • データ構造としてリングバッファを使い、メッセージが格納されている領域のインデックスを各Consumer等がシーケンス番号として管理することで、ロックを減らしている

何とも、非常に頭が良い解決方法である。

シーケンス番号はどんどん上がっていくので、配列を用意すると無限の大きさの配列が必要となってしまう。しかしリングバッファをぐるぐる廻して、全Consumerが処理済みの領域を上書きして使っていくことでこの問題を解決している。

どんなConsumerが存在するのか?が中央で管理されているのが面白い。オブジェクト指向的にはイケていないが、ロックを排除するための工夫ということだろう。

2015/01/06追記
最後の「オブジェクト指向的にはイケていないが…」の部分に対して「別にイケていなくてもいいのでは」的な反応を頂いたので追記しておく。

まさしくその通りで、Disruptorはパフォーマンスを極めるためのものなので、「オブジェクト指向的にイケて」いなくてもOKである。
しかし実は筆者ではなく、LMAXの中の人が、この点について(かなり強く)言及しており、ここは大きなポイントだ。
オブジェクト指向的に綺麗に設計する(関心を分離する)ことにこだわることがパフォーマンス劣化(正確にはcontention)につながっている
と、http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdfに記述されている。

Further investigation and a focus on the computer science made us realise that the conflation of concerns inherent in
conventional approaches, (e.g. queues and processing nodes) leads to contention in multi-threaded implementations,
suggesting that there may be a better approach.

同様にこちらのブログの以下の部分も同様。

The ConsumerTrackingProducerBarrier has a list of all the Consumers that are accessing the ring buffer. Now to me this seemed a bit odd – I wouldn’t expect the ProducerBarrier to know anything about the consuming side. But wait, there is a reason. Because we don’t want the “conflation of concerns” a queue has (it has to track the head and tail which are sometimes the same point), our consumers are responsible for knowing which sequence number they’re up to, not the ring buffer. So, if we want to make sure we don’t wrap the buffer, we need to check where the consumers have got to.

筆者の書き方が悪かったのだが、つまり「オブジェクト指向的にきれいな設計を捨てることで性能向上を実現している」という、トレードオフの部分が非常に面白いのだ。(そのため「オブジェクト指向的にはイケていない」と書いたことは文脈的におかしくないし、むしろもっと強調してもよい部分だと考えている。)

ちなみに上記の指摘をされた方からは「アルゴリズム的な話に、オブジェクト指向をもってくるのがナンセンスなんです。」とも言われたのだが、上記PDFの図を見てもらえばわかるようにDisruptorは典型的なオブジェクト指向で書かれたソフトウェアであるので、Disruptorについて言及している本エントリにおいて「オブジェクト指向をもってくるのがナンセンス」と言われても困る。おそらく指摘された方はDisruptorのドキュメントやソース等は読まずに、本エントリだけに脊髄反射されただけだろうと思う。

Disruptorはいわば「守破離」的に、全体としてはオブジェクト指向に従いながら、オブジェクト指向で当たり前とされる「関心の分離」の一部だけをわざと崩すことで最高のパフォーマンスを実現しているところが素晴らしい。

並行はモデルで、並列は結果である(Concurrency is a model, parallelism is a result)

マルチスレッド・マルチコアプログラミングでよく話題になる「並行(Concurrency)と並列(Parallelism)」の関係について。
僕が今まで見た中で一番すっきりした説明は、Rob Pike先生の以下の動画。

http://www.infoq.com/interviews/pike-concurrency

この中で先生が一言で説明してくれている。

「並行はモデルで、並列は結果である(Concurrency is a model, parallelism is a result)」

これは次のような意味である。(…たぶん)

あるマルチスレッドな処理を行うアプリケーションを書いたとする。このコードを書いた時点で、(マルチスレッドで処理するようにモデルを決定したので、)既にConcurrentであることは決まった。

実行時に、実行する環境がシングルコアだったら、それは真にParallelには実行されない。
実行時に、実行する環境がマルチコアだったり、Hadoopのような分散環境であれば、真にParallelに実行されるだろう。

つまりConcurrencyとParallelismは相対する概念ではなく、ConcurrentかつParallelの場合もあるし、単にConcurrentの場合もある、ということ。
事前にConcurrentであるかどうかはモデルとして決定していて、実行時の環境によってParallelかどうかが決まる(結果となる)。

Apacheのアクセスログを爆速でLTSVに変換する

Apacheのアクセスログ(commonあるいはcombined形式)をLTSVに変換するツールをJavaで書いてみた。

使い方:
mcp.jarをダウンロードし、

cat access_log | java -cp mcp.jar net.jumperz.app.MCP.MCP net.jumperz.io.multicore.example.MCombined2LTSV

パースするコードそのものはこんな感じで普通。正規表現は使っていない。たぶんJava1.4以上でコンパイル可能。

こういう処理をさくっとマルチコア対応にするためのフレームワーク、MCPというのを作ったので、早速利用してみると、手元の4コアのマシンでは約140万行/秒(common形式のログ)という爆速で変換が終了する。dankogai氏が書かれているperlスクリプトより約20〜30倍速い(正規表現を使うかどうかという点が大きく関係していると思われるが)。

MCPについてはScutumブログに詳しく書いたので、参考まで。『テキストデータ処理を簡単にマルチコア対応にするJavaフレームワーク、MCPの実装

最近Rubyの勉強もしている。Ruby関連の本を読んでいると『使いづらい言語』の代表みたいにDISられ気味なJavaだが、この例のように、速度が簡単に出るあたり、すごく魅力がある言語だと思う。

Clojureでデッドロック


0x00. Clojureでデッドロックは起こるのか?


Clojureの並行プログラミングモデルはSTMとagentを中心とした非常に洗練されたものだとされていることがある。Javaでスレッドを生で扱う場合の危険性と対比され、Clojureは開発効率がよく危険性が少ない、と思っているユーザも多いだろう。しかし実際にはClojureのSTMは書き込み競合が発生した場合に非常に遅く、また実際の並行プログラミングのためにはSTMとagentだけではなく他の方法も覚える必要があり、それほどシンプルなものではない。

安全性についてはどうだろうか。Clojureではデッドロックは起こるのだろうか。結論を書いてしまうと、Clojureで書いたアプリケーションもデッドロックを起こす可能性がある。以下にいくつかの例をあげる。

0x01. lockingを使うパターン


ClojureにはJavaのsynchronizeのほぼ代用となるlockingマクロが存在している。lockingの引数として渡したオブジェクトに対するモニターを取得し、ボディ部の処理を行う。デッドロックの王道パターンの1つとして「複数のスレッドが2つ以上のモニターを逆順で取得しながら処理する」というものがあるが、これをClojureで書くと以下のようになり、デッドロックを起こす。

deadlock1.clj

(ns deadlock1)

(def _mutex1 (new Object))
(def _mutex2 (new Object))

(defn fn1[ _ ]
  (while true
    (locking _mutex1
      (println "sleep 1")
      (Thread/sleep (rand-int 10))
      (locking _mutex2
        (println "sleep 2")
        (Thread/sleep (rand-int 10))
      )
    )
  )
)

(defn fn2[ _ ]
  (while true
    (Thread/sleep (rand-int 10))
    (locking _mutex2
      (println "sleep 3")
      (Thread/sleep (rand-int 10))
      (locking _mutex1
        (println "sleep 4")
        (Thread/sleep (rand-int 10))
      )
    )
  )
)

(send-off (agent nil) fn1)
(send-off (agent nil) fn2)

このときのスレッドダンプは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x0000000040e22000 nid=0x42d6 waiting for monitor entry [0x00007f43ba4c0000..0x00007f43ba4c09f0]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at deadlock1$fn2$fn__12.invoke(deadlock1.clj:24)
	- waiting to lock  (a java.lang.Object)
	- locked  (a java.lang.Object)
	at deadlock1$fn2.invoke(deadlock1.clj:22)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.AFn.applyTo(AFn.java:151)
	at clojure.lang.Agent$Action.doRun(Agent.java:100)
	at clojure.lang.Agent$Action.run(Agent.java:150)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x0000000040b29800 nid=0x42d5 waiting for monitor entry [0x00007f43ba5c1000..0x00007f43ba5c1a70]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at deadlock1$fn1$fn__7.invoke(deadlock1.clj:10)
	- waiting to lock  (a java.lang.Object)
	- locked  (a java.lang.Object)
	at deadlock1$fn1.invoke(deadlock1.clj:8)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.AFn.applyTo(AFn.java:151)
	at clojure.lang.Agent$Action.doRun(Agent.java:100)
	at clojure.lang.Agent$Action.run(Agent.java:150)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

この例はソースコードを見るだけでデッドロックの可能性を発見できるが、実際のケースではこの例ほど単純ではないことが多い。関数呼び出しの階層が深くなる中で、意識せずにロックを取得しながら別の関数を呼び出すことで問題に繋がることが多いだろう(これはClojureだけではなく他のロックを用いる言語でも同様である)。そのため、このパターンのデッドロックは実際に発生することが考えられ、注意が必要となる。lockingを使わずに済めば、それが一番の対策だ。lockingを使う必要がある場合には、取得する順番を決めておく必要がある(この順番を言語で制御できればよいのだが…)。

0x02. awaitを使うパターン


agentの実行を待つためのawaitはブロックする。そのため2つのagentがお互いにawaitするとデッドロックを引き起こすことがある。

deadlock2.clj

(ns deadlock2)

(def _agent1 (agent 1))
(def _agent2 (agent 1))

(defn fn1 [ _ ]
  (Thread/sleep 1000)
  (await _agent2)
  (println 1)
)

(defn fn2 [ _ ]
  (Thread/sleep 500)
  (await _agent1)
  (println 2)
)

(send-off _agent1 fn1)
(send-off _agent2 fn2)

このときのスレッドダンプは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x0000000041bc4c00 nid=0x5b28 waiting on condition [0x00007ffedf571000..0x00007ffedf571a70]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x0000000041bc6400 nid=0x5b27 waiting on condition [0x00007ffedf672000..0x00007ffedf6729f0]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:619)

このようなデッドロックを避けるためにはawait-forを使用しタイムアウトさせればよい。しかしその場合、タイムアウト時のエラー処理を行わなければいけないために、若干シンプルさが欠けてしまうというデメリットが存在する。

0x03. futureを使うパターン


futureのderefはブロックするので、0x02項と同じようにデッドロックを引き起こす可能性がある。

deadlock3.clj

(ns deadlock3)

(def _ref1 (ref 10))
(def _ref2 (ref 20))

(defn fn1 []
  (Thread/sleep 1000)
  (println 1)
  (println (deref (deref _ref2)))
  (println 2)
)

(defn fn2 []
  (Thread/sleep 500)
  (println 3)
  (println (deref (deref _ref1)))
  (println 4)
)

(dosync
  (ref-set _ref1 (future (fn1)))
  (ref-set _ref2 (future (fn2)))
)

このときのスタックトレースは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x00007f09ac38f400 nid=0x7421 waiting on condition [0x00007f09b2c00000..0x00007f09b2c00d70]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.FutureTask$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at clojure.core$future_call$reify__5494.deref(core.clj:5399)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock3$fn2.invoke(deadlock3.clj:16)
	at deadlock3$eval9$fn__10$fn__13.invoke(deadlock3.clj:22)
	at clojure.lang.AFn.call(AFn.java:18)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x00007f09ac390800 nid=0x7420 waiting on condition [0x00007f09b2d01000..0x00007f09b2d01cf0]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.FutureTask$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at clojure.core$future_call$reify__5494.deref(core.clj:5399)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock3$fn1.invoke(deadlock3.clj:9)
	at deadlock3$eval9$fn__10$fn__11.invoke(deadlock3.clj:21)
	at clojure.lang.AFn.call(AFn.java:18)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

このようなデッドロックを避けるための対策として、futureの処理が終了しているのかどうかを調べるfuture-done?を使うことができる。

0x04. promiseを使うパターン


promiseはデッドロック製造装置かと思われるほどの危険な機能である。以下に例を示す。

deadlock4.clj

(ns deadlock4)

(def _promise (promise))
(println @_promise)

このときのスタックトレースは以下のようになる。

"main" prio=10 tid=0x0000000041442800 nid=0x6282 waiting on condition [0x00007fd6f435b000..0x00007fd6f435beb0]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
	at clojure.core$promise$reify__5536.deref(core.clj:5513)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock4$eval7.invoke(deadlock4.clj:4)
	at clojure.lang.Compiler.eval(Compiler.java:5419)
	at clojure.lang.Compiler.load(Compiler.java:5852)
	at clojure.lang.RT.loadResourceScript(RT.java:340)
	at clojure.lang.RT.loadResourceScript(RT.java:327)
	at clojure.lang.RT.loadResourceScript(RT.java:319)
	at clojure.main$load_script.invoke(main.clj:220)
	at clojure.main$script_opt.invoke(main.clj:273)
	at clojure.main$main.doInvoke(main.clj:354)
	at clojure.lang.RestFn.invoke(RestFn.java:409)
	at clojure.lang.Var.invoke(Var.java:365)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.Var.applyTo(Var.java:482)
	at clojure.main.main(main.java:37)

promiseを正しく使うためのコーディングは難しく、危険性はJavaで生のスレッドAPIを使用するのと変わらない(waitとnotifyの関係に似ている)。そのため、特殊な場合にのみ使うようにするのがよいだろう。例えば並列処理を行うための少し低レベルなライブラリを作るなどの場合に、十分なテストとともに使用するようなケースが考えられる。

0x05. まとめ


このように、Clojureでもデッドロックは起こりえる。他にもパターンがあれば追記していきたいので、コメント欄やtwitter(@kinyuka)で教えていただければありがたい。

ClojureのSTMは使い物にならない

 

0x00. Clojureがいけてる件について


ここ数ヶ月でClojureをどんどん実戦投入してみているが、その成果は素晴らしいの一言に尽きる。Javaでは考えられなかったほどスマートかつ柔軟にデータ処理が可能であり、「あれ、こんなに短い記述でできちゃうのか!」と驚かされることが多い。そんなわけで、何でもかんでもJavaで片付けてきた筆者はここにきてClojureにかなり惚れ込んでおり、電子書籍やらウェブサイトやらで本格的に情報収集を進めているのだが…

 

0x01. Clojureの並列プログラミング


現時点では、Clojureを実戦投入したのは、ちょっとした処理に使うツール的なものだけである。理由は単に、筆者がまだClojureの初心者だからだ。しかしそろそろメインの仕事であるサーバアプリケーションやウェブアプリケーションでも使いたくてウズウズしてきており、そのような視点からさらに調査を進めている。

サーバアプリケーションやウェブアプリケーションでは並列プログラミング的なことをよくやるのだが、ClojureではこれはSTMを使って処理するのが王道のようだ。Clojureの解説を行う本などでは、STMは従来のJavaのロック地獄から解放してくれる救世主的な扱いをされており、筆者としては非常に期待していた。Javaでのマルチスレッドプログラミングは確かになかなか難しい部分があり、コードレビューなどで問題点を見つけ出す技術には経験に基づく勘が要求される。単に並列プログラミングを行いたいだけなのにメモリバリアーが〜とか言われるため、ある種バッドノウハウ的な側面がある(筆者は個人的に大好きだが)。

ClojureのSTMはぱっと見た感じでは拍子抜けするほどシンプルであり、「これで本当に動くなら、今までのJavaの並列プログラミングは何だったんだ…」と思う仕上がりになっている。

 

0x02. 遅いらしい


筆者の見落としである可能性もあるが、Clojureの解説を行う本(Programming ClojureとPractical Clojure。ついでに現在The Joy of Clojureを読んでいるところ)では、STMのパフォーマンスに関する記述はなかったように思う。そのため、普通に読んでいる読者は、間違いなく「並列プログラミングが必要とされる場面ではSTMを使えばよい」と考えるだろう。筆者もそう思っていたのだが、ある日Clojureに的を絞っているわけではない書籍である「Programming Concurrency on the JVM」を読んでいたところ、「STMはReadが多く、Writeが少ない場面だけにしておくべし」的な記述があって驚いた。Clojureでは基本的に並列プログラミングにはSTMしか選択肢がない(もちろんClojureからJavaのスレッドを生成したりすることもできるが、それならばsynchronized等の文法がサポートされている生のJavaを使った方が百倍開発しやすいため、却下)のに、書き込み競合が多い場面ではSTMが使えないのでは、開発が成り立たないのではと思ってしまうが…。この書籍内では実際にSTMが遅くて使えないケースをソースコード付きで掲載している。

そんなわけで、単純な例でベンチマークを取ってみることにした。

 

0x03. ベンチマークの内容


マップをひとつ生成する。このマップに対して多数のスレッドから書き込み競合が多く発生するようなアクセスを行う。具体的には、各スレッド内でランダムにキー(100種類のうちの1つ)を生成し、そのキーに対応する値を取得する。値は数値とする。この数値をひとつインクリメントし、再びマップにセットする。マップ内にキーが存在していない場合には1をセットする。スレッドは300個生成し、上記のインクリメント動作はそれぞれのスレッドで10万回ずつおこなう。これをJavaとClojureでそれぞれ記述したものが以下である。

Test2.java

package test;

import java.util.*;

public class Test2
{
public static final int _keyRange = 100;
public static final int _threadCount = 300;
public static final int _repeat = 100000;
//--------------------------------------------------------------------------------
public static void main( String[] args )
throws Exception
{
final long start = System.currentTimeMillis();
final Map map = new HashMap();

Runtime.getRuntime().addShutdownHook( new Thread( new Runnable()
  {
  public void run()
    {
    System.out.println( System.currentTimeMillis() - start );
    System.out.println( map );
    }
  } ) );

for( int i = 0; i < _threadCount; ++i )
  {
  new Thread( new Runnable()
    {
    public void run()
      {
      for( int k = 0; k < _repeat; ++k )
        {
        Random random = new Random();
        String key = "key" + random.nextInt( _keyRange );
        
        synchronized( map )
          {
          Integer value = ( Integer )map.get( key );
          if( value != null )
            {
            int oldValue = value.intValue();
            map.put( key, new Integer( oldValue + 1 ) );
            }
          else
            {
            map.put( key, new Integer( 1 ) );
            }
        
          }
        
        }
      }
    } ).start();
  }
}
//--------------------------------------------------------------------------------
}

bench.clj(注:間違いあり。修正版は0x06項目を参照)

(ns bench)
(import '(java.util Random))

(def _keyRange 100)
(def _threadCount 300)
(def _repeat 100000)
(def mapref1 (ref {}))
(def _start (System/currentTimeMillis))
(def _agents (seq (repeat _threadCount (agent nil))))

(defn fn1 []
  (let
    [
     _random (new Random )
     _key (str "key" (.nextInt _random _keyRange))
    ]
    
    (dosync
      (let [ _oldValue (get @mapref1 _key) ]
        (if (nil? _oldValue)
          (alter mapref1 assoc _key 1)
          (alter mapref1 assoc _key (+ 1 _oldValue))
        )
      )
    )
  )
)

(defn fn2 [ dummy ]
  (dotimes [ _index _repeat ]
    (fn1)
  )
)

(doseq [ _agent _agents ]
  (send-off _agent fn2)
)

(defn fn3 []
  (println (- (System/currentTimeMillis) _start))
  (println @mapref1)
)

(doseq [ _agent _agents ]
  (await _agent)
)

(fn3)
(shutdown-agents)
(System/exit 0)

 

0x04. ベンチマーク結果


この2つのプログラムを実行してみると、Java版は6〜7秒程度で終わるが、Clojure版は60秒程度かかる。Clojure版はざっと10倍程度遅いことになる。(これは書き込み競合が極端に多い例であり、ClojureのSTMが最も苦手とするケースではあることに注意が必要だ。Writeが少なく、Readが多いケースでは、Javaより速い場合もあるかもしれない。)

 

0x05. STMは使えないと思う理由


筆者はこの結果を受けて、「ClojureのSTMは使えない」と判断した。

「Readが多いケースならば、STMもよいのでは」という意見もあるかもしれない。しかし開発している最中に、いちいち「この部分の競合ではReadが多いか?」などと考えるのはナンセンスだ。

また、サーバアプリケーションに予想以上のアクセスが集中するなどのケースで、「読み込みが大部分だろう」と思っていた箇所で書き込み競合が多く発生してしまったら悲惨なことになってしまうかもしれない。そして、そもそも書き込み競合が多い場面で使えないのでは、別の技術も勉強する必要があり、無駄である。

さらにもう一つの理由として、「正しいコードかどうか」の判断を付けることができないという点があげられる。Javaのロックベースのマルチスレッドプログラミングは、確かに落とし穴が多い。しかしFindBugsのようなツールやコードレビューなどによって、デッドロック等の問題が発生する可能性などを見つけることができる。また、実際にデッドロックが起こってしまった場合などにも、問題点がはっきりする。問題点が見つかれば、それを修正していくことで、「正しいコード」に近づく。そしてある時点で、ほぼ確実に正しく動作するコードになった、と自信を持つことができるようになる。そしてそのコードは速い。

ClojureのSTMでは、実際にアプリケーションを完成させてテストするまで、「性能がでるかどうか」つまり「正しいコードかどうか」を判断することができない。サーバアプリケーションではしばしばテスト時点では想像もできない複雑なパターンのアクセス集中が発生することが予想されるため、いつまでたっても「このコードでいける!」と自信を持つことができなくなるだろう。また、問題があった場合の修正はアプリケーションの作り自体の変更になる可能性があり、Javaでのデッドロックの修正よりもよほど大がかりになる可能性がある。これはかなりの茨の道になるだろう。

この筆者の判断は現時点でのものなので、数年後にSTMが爆速になっていたりすれば、もちろん使ってみるつもりだ。また、性能が問題にならないようなちょっとした処理であればもちろんSTMは便利に使えるだろう。

 

0x06. 追記(コードのミスを修正)


コメント欄でTakahiro Hozumiさんより非常に有意義なコメントをいただいた(ありがとうございます!)。やはりSTMのパフォーマンスには問題があるらしい。また、上記の筆者のClojureコードにはミスがあり、実際には300スレッドを生成できていないことがわかった。

修正版を作成したので以下に掲載する。

bench.fixed.clj

29行目をコメントアウトすると、確かに300スレッド生成できていることが確認できる(また、ps -eLf等のコマンドでも確認済み)。

問題のパフォーマンスだが、修正後のコードを使って300スレッド生成してみるとさらに遅くなることがわかり、Javaより10倍どころではなく、恐ろしく遅くなることがわかった。そのため、0x05で示した本稿の結論的なものは変わらない形となる。

 

0x07. さらに追記


Clojureでは(今回のテストのように)単純に多くのスレッドを生成したいだけの場合にはagentの使用は不適切であるとコメント欄で指摘していただいた。このような場合にはagentの使用には大きなオーバーヘッドがあるようだ。The Joy of Clojureの11章がこのあたりに触れた説明となっており、とても参考になる。

本稿で目的としているテストを正しく行うためのコードはTakahiro Hozumiさんが作成してくれたhttps://gist.github.com/3048d90328d3118583a4の(ref-bench)であり、パフォーマンスはJavaの10倍程度遅いということになるようだ。

少し話題が反れるが、Programming ClojureやPractical Clojureを読んだ印象はまさに「Clojure=シンプル」であったのだが、The Joy of Clojureの11章の印象は(悪い意味で)かなり違う。現実的にはそれほどシンプルな記述で並行プログラミングを実現できる、というわけではなさそうだ。lockingなどは普通にデッドロックを起こす可能性があるように思えるためClojureらしくないと感じる。

 

0x08. またまた追記


詳しくはコメント欄を参照していただきたいが、最終的には以下のようなことになるようだ。

  • Agentを使ってスレッドを作ること自体のオーバーヘッドはそれほど大きくない
  • (書き込み競合が多い場合の)STMは非常に重く、Javaの10倍どころではない時間を要する

 

0x09. STMを使わない方法

下記教えて頂きました。参考まで。