ClojureでPostgreSQLのデータをMongoDBに放り込む

細かいことはさておき、とりあえずPostgreSQLのデータをMongoDBにぶち込みたい場合に使うためのClojureスクリプトを書いた。

pg2mongo.clj

(ns pg2mongo
  (:import
    (java.sql Connection DriverManager)
    (java.util ArrayList)
    (com.mongodb Mongo DBCollection BasicDBObject WriteConcern)
  )
)

; PostgreSQL config
(def _dbmsUser "joe")
(def _dbmsPass "joe")
(def _dbmsUrl "jdbc:postgresql:foo")

; MongoDB config
(def _mongoDBName "foo")
(def _mongoHost "127.0.0.1")

(.newInstance (Class/forName "org.postgresql.Driver" ))

(defn _copyData [ _conn _tableName _coll ]
  (let
    [
    _rs (.executeQuery (.prepareStatement _conn (str "select * from " _tableName) ) )
    _md (.getMetaData _rs)
    _columnCount (.getColumnCount _md)
    ]
    (while (true? (.next _rs))
      (let [ _dbObject (new BasicDBObject) ]
        (dotimes [ _i _columnCount ]
          (.put _dbObject (.getColumnName _md (inc _i)) (.getObject _rs (inc _i)))
        )
        (.insert _coll _dbObject (WriteConcern/NORMAL))
      )
    )
  )
)

(with-open
  [
  _mongo (new Mongo _mongoHost)
  _conn (DriverManager/getConnection _dbmsUrl _dbmsUser _dbmsPass)
  _rs (.executeQuery (.prepareStatement _conn "select tablename from pg_tables where schemaname = 'public'"))
  ]
  (let [ _mongoDB (.getDB _mongo _mongoDBName) ]
    (while (true? (.next _rs))
      (let
        [
        _tableName (.getString _rs 1 )
        _coll (.getCollection _mongoDB _tableName)
        ]
        (_copyData _conn _tableName _coll)
      )
    )
  )
)

(System/exit 0)

実行には以下のjarファイルが必要

  • Clojure
  • JDBC Driver
  • MongoDB Driver

  • DoormanがEclipse Marketplaceからインストール可能に


    0x00. Eclipse Marketplaceとは


    Eclipse Market Placeとは、Eclipseプラットフォーム上で動作するプラグインや製品などを検索したりダウンロードしたりすることができる仕組みである。ウェブサイトとしてはhttp://marketplace.eclipse.org/となっており、ブラウザからも使用できるほか、Eclipseの3.7以降からは直接メニュー(Help->Eclipse Marketplace)からアクセスできるようになっている。特にEclipseから直接使う場合、ソフトウェアのダウンロードからインストールまでできて便利な仕組みになっている。開発者は自分の開発したソフトウェアを登録することで、より多くのユーザにアピールすることができるようになる。いわばEclipse版AppStoreである。

    0x01. 登録は簡単


    以前のエントリにまとめたように、以前はスタンドアローンのアプリケーションであったDoorman@JUMPERZ.NETを「Doorman Eclipse Plugin」という名前のEclipseプラグインに移植した(というか、プラットフォームをEclipseに変更した)。そこで、せっかくAppStore的な場所があるのならば使ってみようと思い、手続きを行ってみた。自作アプリケーションを公開する場合、今どきの開発者ならばAppleのAppStoreかGoogleのAndroidマーケットあたりが普通だと思うが、筆者は斜め上のEclipse Marketplaceである。

    Doormanのようなオープンソースのプラグインの場合、Eclipse Marketplaceへの登録にはお金は必要ない(商用のものがどうなのかは要調査)。お金どころか特に身分証明も必要ない。ガイダンスにしたがってアカウントを作成し、ソフトウェアの情報を記入すると、数日の審査期間を経て登録が済んだ。現在無事にこちらで公開されている。分類も、自分が選んだとおりに「Tools」「Network」となっている。

    0x02. アップデートサイトも用意した


    以前のエントリに書いたようにDoormanのアップデートサイトは用意しないつもりだったのだが、Eclipse Marketplaceを眺めていて考えが変わった。意外と多くのソフトウェアがきちんとアップデートサイトを用意しているのだ。もしかしたら今どきのEclipseユーザはjarファイルを直接pluginsフォルダに突っ込むなどということはしなくなっているのかもしれない。Eclipseからのアップデートサイトの使い方も、なれてしまえばURLひとつで行けるため楽な気もする。ということでjumperz.net上に/update/というパスを用意し、そちらでEclipseからのリクエストを受け付けるようにした。現在Doormanだけでなく、ExEditもインストールできるようになっている。

    アップデートサイトURL: http://www.jumperz.net/update/

    Marketplaceに登録されたため、Eclipseのメニューから直接Marketplaceを開き、「Doorman」で検索すればインストールができるようになっている。

    これは案外素晴らしいかもしれない。

    0x03. 新機能JSフック

    今回の登録と時を同じくして、DoormanJSHookという新プラグインを追加した。Doorman上でリクエストやレスポンスをフックし、その際JavaScriptで処理を書けるようにするものである。以前からClojureで処理を書くことができるDoormanHookは存在していたが、さすがにClojureで書くやつはあまりいなさそうなので、Java上でデフォルトで使えるRhinoを使うものを作成した。User-Agentの書き換えを行いたい場合などやCookieのコントロールを行いたい場合などに、if文で条件を付けることが簡単にできるので便利である。

    例えば「Googleにアクセスする際にはCookieを削除し、User-AgentはAnonymousという文字列にする」という場合は以下のように書けばよい。

    var host = request.getHeaderValue( 'Host' )
    if( host.indexOf( 'google' ) > -1 )
    {
    request.removeHeaderValue( 'Cookie' )
    request.setHeaderValue( 'User-Agent', 'Anonymous' )
    }

    書き換えを行う場合にはnet.jumperz.net.MHttpRequestクラスなどのメソッドを呼び出すことになる(上記ソースコードにおけるrequestはこのクラスのインスタンスである)。このあたりのAPIなどはドキュメントすら存在していない(なぜなら仕様がコロコロ変わる可能性を秘めているから)ので、詳しくはソースコードを参照いただければと思う。あるいはtwitter上で@kinyukaで質問していただければお答えできる。まぁ、とにかくマニア向けのソフトウェアということである。

    0x04. SSL証明書の表示機能

    また、順番としてはDoormanJSHookよりも前になるのだが、SSL通信時にどのような証明書チェーンが利用されているのかを簡単に確認できるような機能を追加してある。これは筆者が自分で欲しかったので作った機能だ。

    ウェブサーバの管理者ならばご存じかと思うが、SSL証明書を自分でインストールする場合、中間CA証明書というよくわからないものをインストールしたりすることがある。この際やっかいなのが、「IEではエラーにならないのに、Firefoxではエラーになる」のような場合だ。中間CA証明書がIEにはもともと入っているのに、Firefoxには入っていないような場合で、かつサーバ上にインストールし忘れているような場合にこれが起こる。

    SSLを使用しているウェブサイトを訪問中、現在どのような証明書が使われているのかをウェブブラウザから確認する場合、手順が少し面倒くさい。まず証明書のダイアログを開くまでの手間がけっこうかかる場合がある。そしてさらに、証明書のチェーンを表示するダイアログにおいて、中間CA証明書が果たしてサーバ側にあるものなのか、あるいはローカルにインストールされているものなのかが確認できないことが多い。

    Doormanではこの部分の不満を改善するため、以下のようにCertificate ListとCertificate Detailという2つのViewを用意した(クリックで拡大)。

    Certificate Listの一番左のカラムは証明書がサーバ側なのかローカルなのかを示しており、ここでは2つの証明書(ウェブサイトの証明書とそれに対応する中間CA証明書)がサーバ側にインストールされていることがわかる。また、Expカラムでは期限切れまでの日数を表示している。

    ちなみに証明書の正当性について「Validation: OK」という表示をしているが、これはDoormanの独自基準(証明書のチェーンだけ見る。ウェブサイトのFQDNと証明書のコモンネームは見ない。実行中のJREに含まれるルート証明書でチェック)での判定なので、あまり当てにしない方がよい。基本的にはウェブブラウザで直接アクセスして確認するのがベストだ。ブラウザには入っているのにJREには入っていない証明書などもあるため、DoormanでNGとなってもブラウザでは問題ないケースがある。

    この機能を作ったことで、筆者の業務でのSSL関連の運用作業はずいぶん楽になった。

    0x05. 登録してみての感想

    Eclipseプラグインの開発自体がかなりマニアックな存在となっていることもあり、Eclipse MarketplaceもAppStoreやAndroidマーケットと比べると3桁か4桁くらい登録されているソフトウェアが少ない(反面、ソフトウェアの平均的な質は高い)。Doormanを登録した先のジャンルである「Network」にわずか16個しかソフトウェアが存在しないことからも、Marketplaceの寂れっぷりが孤高の存在であることが伝わってくる。

    ぶっちゃけ登録しても全然ダウンロードされないだろうと思っていたのだが、アクセスログを確認してみたところ意外とそうでもなかった。オランダ・ブラジル・中国など世界各国からそこそこダウンロードされているようだ。(ところで、Eclipseが直接ダウンロードしにくるわけだが、その際のUser-AgentがJakarta Commonsになっていた。Eclipseにしておけばいいのに…)

    ぼちぼちでも利用者が増えてくれるとうれしいので、登録して正解だったと考えている。

    0x06. 孤高のプログラマを目指せ

    ということでClojureやEclipseプラグインの開発などを行っているのだが、国内では書店に行ってもどちらも関連書籍が1冊ずつしかないような有様でかなり孤高な感じである(英語書籍ならそこそこ出版されているのだが…)。Clojureは国内でもじわりとユーザを増やしているような気がするので、Eclipseプラグインの方にも頑張ってもらいたいと考えている日々である。

    0x07. 追記

    JSHookとHookはどちらもBreakさせるために使うことも可能となっている。Hookのコードが戻り値として”break”を返す場合にはBreakを設定したのと同じようにリクエストやレスポンスの編集が行えるようになる。

    例えば「リクエストがPOSTで、かつレスポンスにSet-Cookieヘッダが存在する場合にはBreakする」という場合には以下のようなJSHook(TypeとしてResponseを選ぶことに注意)を記述する。

    if( request.getMethod() == 'POST' && response.headerExists( 'Set-Cookie' ) )
    {
    'break'
    }

    このように、複雑な条件でのみBreakさせたいという(あるのかどうか極めて微妙な)ニーズに対応した。


    ClojureのSTMは使い物にならない

    0x00. Clojureがいけてる件について


    ここ数ヶ月でClojureをどんどん実戦投入してみているが、その成果は素晴らしいの一言に尽きる。Javaでは考えられなかったほどスマートかつ柔軟にデータ処理が可能であり、「あれ、こんなに短い記述でできちゃうのか!」と驚かされることが多い。そんなわけで、何でもかんでもJavaで片付けてきた筆者はここにきてClojureにかなり惚れ込んでおり、電子書籍やらウェブサイトやらで本格的に情報収集を進めているのだが…

    0x01. Clojureの並列プログラミング


    現時点では、Clojureを実戦投入したのは、ちょっとした処理に使うツール的なものだけである。理由は単に、筆者がまだClojureの初心者だからだ。しかしそろそろメインの仕事であるサーバアプリケーションやウェブアプリケーションでも使いたくてウズウズしてきており、そのような視点からさらに調査を進めている。

    サーバアプリケーションやウェブアプリケーションでは並列プログラミング的なことをよくやるのだが、ClojureではこれはSTMを使って処理するのが王道のようだ。Clojureの解説を行う本などでは、STMは従来のJavaのロック地獄から解放してくれる救世主的な扱いをされており、筆者としては非常に期待していた。Javaでのマルチスレッドプログラミングは確かになかなか難しい部分があり、コードレビューなどで問題点を見つけ出す技術には経験に基づく勘が要求される。単に並列プログラミングを行いたいだけなのにメモリバリアーが〜とか言われるため、ある種バッドノウハウ的な側面がある(筆者は個人的に大好きだが)。

    ClojureのSTMはぱっと見た感じでは拍子抜けするほどシンプルであり、「これで本当に動くなら、今までのJavaの並列プログラミングは何だったんだ…」と思う仕上がりになっている。

    0x02. 遅いらしい


    筆者の見落としである可能性もあるが、Clojureの解説を行う本(Programming ClojureとPractical Clojure。ついでに現在The Joy of Clojureを読んでいるところ)では、STMのパフォーマンスに関する記述はなかったように思う。そのため、普通に読んでいる読者は、間違いなく「並列プログラミングが必要とされる場面ではSTMを使えばよい」と考えるだろう。筆者もそう思っていたのだが、ある日Clojureに的を絞っているわけではない書籍である「Programming Concurrency on the JVM」を読んでいたところ、「STMはReadが多く、Writeが少ない場面だけにしておくべし」的な記述があって驚いた。Clojureでは基本的に並列プログラミングにはSTMしか選択肢がない(もちろんClojureからJavaのスレッドを生成したりすることもできるが、それならばsynchronized等の文法がサポートされている生のJavaを使った方が百倍開発しやすいため、却下)のに、書き込み競合が多い場面ではSTMが使えないのでは、開発が成り立たないのではと思ってしまうが…。この書籍内では実際にSTMが遅くて使えないケースをソースコード付きで掲載している。

    そんなわけで、単純な例でベンチマークを取ってみることにした。

    0x03. ベンチマークの内容


    マップをひとつ生成する。このマップに対して多数のスレッドから書き込み競合が多く発生するようなアクセスを行う。具体的には、各スレッド内でランダムにキー(100種類のうちの1つ)を生成し、そのキーに対応する値を取得する。値は数値とする。この数値をひとつインクリメントし、再びマップにセットする。マップ内にキーが存在していない場合には1をセットする。スレッドは300個生成し、上記のインクリメント動作はそれぞれのスレッドで10万回ずつおこなう。これをJavaとClojureでそれぞれ記述したものが以下である。

    Test2.java

    package test;

    import java.util.*;

    public class Test2
    {
    public static final int _keyRange = 100;
    public static final int _threadCount = 300;
    public static final int _repeat = 100000;
    //--------------------------------------------------------------------------------
    public static void main( String[] args )
    throws Exception
    {
    final long start = System.currentTimeMillis();
    final Map map = new HashMap();

    Runtime.getRuntime().addShutdownHook( new Thread( new Runnable()
      {
      public void run()
        {
        System.out.println( System.currentTimeMillis() - start );
        System.out.println( map );
        }
      } ) );

    for( int i = 0; i < _threadCount; ++i )
      {
      new Thread( new Runnable()
        {
        public void run()
          {
          for( int k = 0; k < _repeat; ++k )
            {
            Random random = new Random();
            String key = "key" + random.nextInt( _keyRange );
            
            synchronized( map )
              {
              Integer value = ( Integer )map.get( key );
              if( value != null )
                {
                int oldValue = value.intValue();
                map.put( key, new Integer( oldValue + 1 ) );
                }
              else
                {
                map.put( key, new Integer( 1 ) );
                }
            
              }
            
            }
          }
        } ).start();
      }
    }
    //--------------------------------------------------------------------------------
    }

    bench.clj(注:間違いあり。修正版は0x06項目を参照)

    (ns bench)
    (import '(java.util Random))

    (def _keyRange 100)
    (def _threadCount 300)
    (def _repeat 100000)
    (def mapref1 (ref {}))
    (def _start (System/currentTimeMillis))
    (def _agents (seq (repeat _threadCount (agent nil))))

    (defn fn1 []
      (let
        [
         _random (new Random )
         _key (str "key" (.nextInt _random _keyRange))
        ]
        
        (dosync
          (let [ _oldValue (get @mapref1 _key) ]
            (if (nil? _oldValue)
              (alter mapref1 assoc _key 1)
              (alter mapref1 assoc _key (+ 1 _oldValue))
            )
          )
        )
      )
    )

    (defn fn2 [ dummy ]
      (dotimes [ _index _repeat ]
        (fn1)
      )
    )

    (doseq [ _agent _agents ]
      (send-off _agent fn2)
    )

    (defn fn3 []
      (println (- (System/currentTimeMillis) _start))
      (println @mapref1)
    )

    (doseq [ _agent _agents ]
      (await _agent)
    )

    (fn3)
    (shutdown-agents)
    (System/exit 0)

    0x04. ベンチマーク結果


    この2つのプログラムを実行してみると、Java版は6〜7秒程度で終わるが、Clojure版は60秒程度かかる。Clojure版はざっと10倍程度遅いことになる。(これは書き込み競合が極端に多い例であり、ClojureのSTMが最も苦手とするケースではあることに注意が必要だ。Writeが少なく、Readが多いケースでは、Javaより速い場合もあるかもしれない。)

    0x05. STMは使えないと思う理由


    筆者はこの結果を受けて、「ClojureのSTMは使えない」と判断した。

    「Readが多いケースならば、STMもよいのでは」という意見もあるかもしれない。しかし開発している最中に、いちいち「この部分の競合ではReadが多いか?」などと考えるのはナンセンスだ。

    また、サーバアプリケーションに予想以上のアクセスが集中するなどのケースで、「読み込みが大部分だろう」と思っていた箇所で書き込み競合が多く発生してしまったら悲惨なことになってしまうかもしれない。そして、そもそも書き込み競合が多い場面で使えないのでは、別の技術も勉強する必要があり、無駄である。

    さらにもう一つの理由として、「正しいコードかどうか」の判断を付けることができないという点があげられる。Javaのロックベースのマルチスレッドプログラミングは、確かに落とし穴が多い。しかしFindBugsのようなツールやコードレビューなどによって、デッドロック等の問題が発生する可能性などを見つけることができる。また、実際にデッドロックが起こってしまった場合などにも、問題点がはっきりする。問題点が見つかれば、それを修正していくことで、「正しいコード」に近づく。そしてある時点で、ほぼ確実に正しく動作するコードになった、と自信を持つことができるようになる。そしてそのコードは速い。

    ClojureのSTMでは、実際にアプリケーションを完成させてテストするまで、「性能がでるかどうか」つまり「正しいコードかどうか」を判断することができない。サーバアプリケーションではしばしばテスト時点では想像もできない複雑なパターンのアクセス集中が発生することが予想されるため、いつまでたっても「このコードでいける!」と自信を持つことができなくなるだろう。また、問題があった場合の修正はアプリケーションの作り自体の変更になる可能性があり、Javaでのデッドロックの修正よりもよほど大がかりになる可能性がある。これはかなりの茨の道になるだろう。

    この筆者の判断は現時点でのものなので、数年後にSTMが爆速になっていたりすれば、もちろん使ってみるつもりだ。また、性能が問題にならないようなちょっとした処理であればもちろんSTMは便利に使えるだろう。

    0x06. 追記(コードのミスを修正)


    コメント欄でTakahiro Hozumiさんより非常に有意義なコメントをいただいた(ありがとうございます!)。やはりSTMのパフォーマンスには問題があるらしい。また、上記の筆者のClojureコードにはミスがあり、実際には300スレッドを生成できていないことがわかった。

    修正版を作成したので以下に掲載する。

    bench.fixed.clj

    29行目をコメントアウトすると、確かに300スレッド生成できていることが確認できる(また、ps -eLf等のコマンドでも確認済み)。

    問題のパフォーマンスだが、修正後のコードを使って300スレッド生成してみるとさらに遅くなることがわかり、Javaより10倍どころではなく、恐ろしく遅くなることがわかった。そのため、0x05で示した本稿の結論的なものは変わらない形となる。

    0x07. さらに追記


    Clojureでは(今回のテストのように)単純に多くのスレッドを生成したいだけの場合にはagentの使用は不適切であるとコメント欄で指摘していただいた。このような場合にはagentの使用には大きなオーバーヘッドがあるようだ。The Joy of Clojureの11章がこのあたりに触れた説明となっており、とても参考になる。

    本稿で目的としているテストを正しく行うためのコードはTakahiro Hozumiさんが作成してくれたhttps://gist.github.com/3048d90328d3118583a4の(ref-bench)であり、パフォーマンスはJavaの10倍程度遅いということになるようだ。

    少し話題が反れるが、Programming ClojureやPractical Clojureを読んだ印象はまさに「Clojure=シンプル」であったのだが、The Joy of Clojureの11章の印象は(悪い意味で)かなり違う。現実的にはそれほどシンプルな記述で並行プログラミングを実現できる、というわけではなさそうだ。lockingなどは普通にデッドロックを起こす可能性があるように思えるためClojureらしくないと感じる。

    0x08. またまた追記


    詳しくはコメント欄を参照していただきたいが、最終的には以下のようなことになるようだ。

  • Agentを使ってスレッドを作ること自体のオーバーヘッドはそれほど大きくない
  • (書き込み競合が多い場合の)STMは非常に重く、Javaの10倍どころではない時間を要する