Clojureでデッドロック


0x00. Clojureでデッドロックは起こるのか?


Clojureの並行プログラミングモデルはSTMとagentを中心とした非常に洗練されたものだとされていることがある。Javaでスレッドを生で扱う場合の危険性と対比され、Clojureは開発効率がよく危険性が少ない、と思っているユーザも多いだろう。しかし実際にはClojureのSTMは書き込み競合が発生した場合に非常に遅く、また実際の並行プログラミングのためにはSTMとagentだけではなく他の方法も覚える必要があり、それほどシンプルなものではない。

安全性についてはどうだろうか。Clojureではデッドロックは起こるのだろうか。結論を書いてしまうと、Clojureで書いたアプリケーションもデッドロックを起こす可能性がある。以下にいくつかの例をあげる。

0x01. lockingを使うパターン


ClojureにはJavaのsynchronizeのほぼ代用となるlockingマクロが存在している。lockingの引数として渡したオブジェクトに対するモニターを取得し、ボディ部の処理を行う。デッドロックの王道パターンの1つとして「複数のスレッドが2つ以上のモニターを逆順で取得しながら処理する」というものがあるが、これをClojureで書くと以下のようになり、デッドロックを起こす。

deadlock1.clj

(ns deadlock1)

(def _mutex1 (new Object))
(def _mutex2 (new Object))

(defn fn1[ _ ]
  (while true
    (locking _mutex1
      (println "sleep 1")
      (Thread/sleep (rand-int 10))
      (locking _mutex2
        (println "sleep 2")
        (Thread/sleep (rand-int 10))
      )
    )
  )
)

(defn fn2[ _ ]
  (while true
    (Thread/sleep (rand-int 10))
    (locking _mutex2
      (println "sleep 3")
      (Thread/sleep (rand-int 10))
      (locking _mutex1
        (println "sleep 4")
        (Thread/sleep (rand-int 10))
      )
    )
  )
)

(send-off (agent nil) fn1)
(send-off (agent nil) fn2)

このときのスレッドダンプは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x0000000040e22000 nid=0x42d6 waiting for monitor entry [0x00007f43ba4c0000..0x00007f43ba4c09f0]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at deadlock1$fn2$fn__12.invoke(deadlock1.clj:24)
	- waiting to lock  (a java.lang.Object)
	- locked  (a java.lang.Object)
	at deadlock1$fn2.invoke(deadlock1.clj:22)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.AFn.applyTo(AFn.java:151)
	at clojure.lang.Agent$Action.doRun(Agent.java:100)
	at clojure.lang.Agent$Action.run(Agent.java:150)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x0000000040b29800 nid=0x42d5 waiting for monitor entry [0x00007f43ba5c1000..0x00007f43ba5c1a70]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at deadlock1$fn1$fn__7.invoke(deadlock1.clj:10)
	- waiting to lock  (a java.lang.Object)
	- locked  (a java.lang.Object)
	at deadlock1$fn1.invoke(deadlock1.clj:8)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.AFn.applyTo(AFn.java:151)
	at clojure.lang.Agent$Action.doRun(Agent.java:100)
	at clojure.lang.Agent$Action.run(Agent.java:150)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

この例はソースコードを見るだけでデッドロックの可能性を発見できるが、実際のケースではこの例ほど単純ではないことが多い。関数呼び出しの階層が深くなる中で、意識せずにロックを取得しながら別の関数を呼び出すことで問題に繋がることが多いだろう(これはClojureだけではなく他のロックを用いる言語でも同様である)。そのため、このパターンのデッドロックは実際に発生することが考えられ、注意が必要となる。lockingを使わずに済めば、それが一番の対策だ。lockingを使う必要がある場合には、取得する順番を決めておく必要がある(この順番を言語で制御できればよいのだが…)。

0x02. awaitを使うパターン


agentの実行を待つためのawaitはブロックする。そのため2つのagentがお互いにawaitするとデッドロックを引き起こすことがある。

deadlock2.clj

(ns deadlock2)

(def _agent1 (agent 1))
(def _agent2 (agent 1))

(defn fn1 [ _ ]
  (Thread/sleep 1000)
  (await _agent2)
  (println 1)
)

(defn fn2 [ _ ]
  (Thread/sleep 500)
  (await _agent1)
  (println 2)
)

(send-off _agent1 fn1)
(send-off _agent2 fn2)

このときのスレッドダンプは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x0000000041bc4c00 nid=0x5b28 waiting on condition [0x00007ffedf571000..0x00007ffedf571a70]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x0000000041bc6400 nid=0x5b27 waiting on condition [0x00007ffedf672000..0x00007ffedf6729f0]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:619)

このようなデッドロックを避けるためにはawait-forを使用しタイムアウトさせればよい。しかしその場合、タイムアウト時のエラー処理を行わなければいけないために、若干シンプルさが欠けてしまうというデメリットが存在する。

0x03. futureを使うパターン


futureのderefはブロックするので、0x02項と同じようにデッドロックを引き起こす可能性がある。

deadlock3.clj

(ns deadlock3)

(def _ref1 (ref 10))
(def _ref2 (ref 20))

(defn fn1 []
  (Thread/sleep 1000)
  (println 1)
  (println (deref (deref _ref2)))
  (println 2)
)

(defn fn2 []
  (Thread/sleep 500)
  (println 3)
  (println (deref (deref _ref1)))
  (println 4)
)

(dosync
  (ref-set _ref1 (future (fn1)))
  (ref-set _ref2 (future (fn2)))
)

このときのスタックトレースは以下のようになる。

"pool-2-thread-2" prio=10 tid=0x00007f09ac38f400 nid=0x7421 waiting on condition [0x00007f09b2c00000..0x00007f09b2c00d70]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.FutureTask$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at clojure.core$future_call$reify__5494.deref(core.clj:5399)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock3$fn2.invoke(deadlock3.clj:16)
	at deadlock3$eval9$fn__10$fn__13.invoke(deadlock3.clj:22)
	at clojure.lang.AFn.call(AFn.java:18)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

"pool-2-thread-1" prio=10 tid=0x00007f09ac390800 nid=0x7420 waiting on condition [0x00007f09b2d01000..0x00007f09b2d01cf0]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.FutureTask$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at clojure.core$future_call$reify__5494.deref(core.clj:5399)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock3$fn1.invoke(deadlock3.clj:9)
	at deadlock3$eval9$fn__10$fn__11.invoke(deadlock3.clj:21)
	at clojure.lang.AFn.call(AFn.java:18)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)

このようなデッドロックを避けるための対策として、futureの処理が終了しているのかどうかを調べるfuture-done?を使うことができる。

0x04. promiseを使うパターン


promiseはデッドロック製造装置かと思われるほどの危険な機能である。以下に例を示す。

deadlock4.clj

(ns deadlock4)

(def _promise (promise))
(println @_promise)

このときのスタックトレースは以下のようになる。

"main" prio=10 tid=0x0000000041442800 nid=0x6282 waiting on condition [0x00007fd6f435b000..0x00007fd6f435beb0]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for   (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
	at clojure.core$promise$reify__5536.deref(core.clj:5513)
	at clojure.core$deref.invoke(core.clj:1765)
	at deadlock4$eval7.invoke(deadlock4.clj:4)
	at clojure.lang.Compiler.eval(Compiler.java:5419)
	at clojure.lang.Compiler.load(Compiler.java:5852)
	at clojure.lang.RT.loadResourceScript(RT.java:340)
	at clojure.lang.RT.loadResourceScript(RT.java:327)
	at clojure.lang.RT.loadResourceScript(RT.java:319)
	at clojure.main$load_script.invoke(main.clj:220)
	at clojure.main$script_opt.invoke(main.clj:273)
	at clojure.main$main.doInvoke(main.clj:354)
	at clojure.lang.RestFn.invoke(RestFn.java:409)
	at clojure.lang.Var.invoke(Var.java:365)
	at clojure.lang.AFn.applyToHelper(AFn.java:163)
	at clojure.lang.Var.applyTo(Var.java:482)
	at clojure.main.main(main.java:37)

promiseを正しく使うためのコーディングは難しく、危険性はJavaで生のスレッドAPIを使用するのと変わらない(waitとnotifyの関係に似ている)。そのため、特殊な場合にのみ使うようにするのがよいだろう。例えば並列処理を行うための少し低レベルなライブラリを作るなどの場合に、十分なテストとともに使用するようなケースが考えられる。

0x05. まとめ


このように、Clojureでもデッドロックは起こりえる。他にもパターンがあれば追記していきたいので、コメント欄やtwitter(@kinyuka)で教えていただければありがたい。


異種RDBMS間のレプリケーションを可能にするSymmetric DS

0x00. Symmetric DSとは



Symmetric DSはオープンソースのJava製ソフトウェアで、トリガーベースのRDBMS間レプリケーションを行うソフトウェアである。ライセンスはLGPLだ。先日、Java製の組み込みRDBMSであるH2をレプリケーションする方法がないか探していたところ、H2のサイトからリンクされており見つけた。筆者自身はまだH2/Symmetric DSともに手元で動作確認を行っただけの段階であり、これらがどの程度実用に耐えるクオリティなのかは未知数だが、ドキュメントの整備のされ方やソフトウェアから伝わってくる感触は非常によいものである。

0x01. POSシステムから誕生


RDBMS自身がレプリケーションの機能を持つ場合も多いが、筆者にもっとも馴染みのあるPostgreSQLではレプリケーションのためにさらに別のソフトウェア(pgpool IIなど)を導入するケースがよく見られる。Symmetric DSもそのような「レプリケーション機能を持たないRDBMSでもレプリケーションを可能にする」ものだ。サイトのトップページに書いてあるように、大規模なPOSシステム間でのデータ転送のニーズから開発されたというやや異色なバックグラウンドを持つ。そのため単に2つのRDBMS間のデータを同期するだけではなく、複数の階層にまたがって多数のRDBMS間でのデータ転送も行うことができる。また、遅い回線やたまにオフラインになるような信頼性の低い回線をまたいだデータ転送も考慮されているようだ。

0x02. サポートするRDBMS


ドキュメントによるとMySQL/Oracle/PostgreSQLなどの主要なRDBMSのほか、HSQLDBやDerby,H2などのJava製組み込みRDBMSもサポートしている(ただしSQLiteはサポート外のようだ)。トリガーベースのアプローチを採るため、トリガーをサポートしており、さらにJDBCがサポートされていれば、Symmetric DSが使用できる可能性があるようだ。

非常にユニークなのは、トリガーベースの動作であるため、異なる種類のRDBMS間でのレプリケーションが可能であるということだ。これにより、例えばJavaアプリケーション内で動作しているH2のデータを、ネットワークを通じて、別の場所にある大規模サーバのPostgreSQLに同期しておき、データ解析等の重い処理はサーバ側で行う、ようなことが可能となる。例えば将来、Android端末内のH2をインターネット上のPostgreSQLに同期するようなアプリケーションが出てくるかもしれない。

0x03. 実際にさわった感触


筆者はドキュメントどおりのハンズオンを、以下の2パターンで行ってみた。

  • PostgreSQL – PostgreSQL間
  • PostgreSQL – H2間
  • どちらも特にハマることもなく、20〜50分ほどで動作確認までおこなうことができた。設定に不備がある場合にはエラーが出力されるが、この際わかりやすいエラーメッセージを出してくれることもあれば、Javaの例外のスタックトレースが大量に出て少々まいることもあった。レプリケーション対象とするRDBMSについての経験はもちろん、Javaの経験や、Springに関する知識があれば特に役に立つだろう。また、チュートリアルに取りかかる前に必ずこちらのうち、使うRDBMSの項目を読んでおくことをオススメする。

    H2とPostgreSQLという異なるRDBMS間で、双方向にデータがコピーされるのを確認できたときには、なかなかに新鮮でニヤリとさせられた。異種RDBMS間でもこのようなレプリケーションが可能であるということで、今後のシステム設計の際に柔軟な思考が可能となりそうだ。MongoDBなどもサポートされれば非常に楽しそうなのだが、MongoDBではトリガーがサポートされていないので厳しいかもしれない。

    0x04. Javaでの拡張が可能


    ドキュメントにあるように、Javaを使ってプラグイン的に機能を追加できるような仕組みが用意されている。おそらく実際に現場で使うことを強く想定したものとなっているのだろうと思われる。

    Symmetric DSの内部ではSpringフレームワークが使用されており、また配布アーカイブのlibディレクトリには多くのJDBCコネクタやJetty、JMS、JUnitなどのjarファイルが含まれている。おそらくJavaに非常に詳しい開発チームが存在しているのではないかと思う。

    0x05. おわりに


    個人的な意見として、著者が使い込んでいないソフトウェアの紹介記事はあまり面白くないという考えを持っている。この記事はその趣旨に反するのだが、Symmetric DSは発想からして面白いのにあまり紹介されていなさそうなので、このような記事を書いてみた。

    筆者自身は、RDBMSよりもMongoDBの(柔軟なスキーマ変更を可能とする)方向に向かいたいと考えており、実際の環境でこれからSymmetric DSを使う日が来るかどうかは定かではないが、いつかまたレプリケーションを行う必要があるケースに遭遇した場合には候補として挙げることになるだろう。