Giraphの解剖記録

導入

Giraphのソースコードを読んで見たので、そのメモとして残しておきます。

GiraphというのはHadoop上で動作する大規模グラフ処理システムのことです。 オリジナルはPregelといわれるGoogleが論文を出したBulk synchronous parallel(BSP)を用いたグラフ処理システムのようです。

もう少し詳しい導入が必要な方は、smly先生TokyoWebMiningでの発表を見てください。

さて、あまりに適当な導入を飛ばしてまずはコード例を見ていきます。

コード例を見てみる

Giraph自体は、グラフ処理を行うためのフレームワークですので、具体的なアプリケーションコードは用途に合わせて書いてあげる必要があります。

具体例として、Giraphのソースコードにはたくさんのexampleがあるので、その中で最短経路を求めるプログラムを見てみます。 細かい部分は見ず、今は薄目でざーっと眺めてみます。

public class SimpleShortestPathsVertex extends
    EdgeListVertex {
  public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
  public static final long SOURCE_ID_DEFAULT = 1;
  private static final Logger LOG =
      Logger.getLogger(SimpleShortestPathsVertex.class);

  private boolean isSource() {
    return getVertexId().get() ==
        getContext().getConfiguration().getLong(SOURCE_ID,
            SOURCE_ID_DEFAULT);
  }

  @Override
  public void compute(Iterator msgIterator) {
    if (getSuperstep() == 0) {
      setVertexValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    while (msgIterator.hasNext()) {
      minDist = Math.min(minDist, msgIterator.next().get());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
          " vertex value = " + getVertexValue());
    }
    if (minDist < getVertexValue().get()) {
      setVertexValue(new DoubleWritable(minDist));
      for (LongWritable targetVertexId : this) {
        FloatWritable edgeValue = getEdgeValue(targetVertexId);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex " + getVertexId() + " sent to " +
              targetVertexId + " = " +
              (minDist + edgeValue.get()));
        }
        sendMsg(targetVertexId,
            new DoubleWritable(minDist + edgeValue.get()));
      }
    }
    voteToHalt();
  }
}

どうやらDijkstraを実装しており、グラフの各頂点ごとにcomputeというメソッドが実行されるようです。 処理の主体が頂点であり、各頂点からは、隣接する頂点に対してメッセージを送信したり、受け取ったりすることで最短経路を求めることができるようです。 おそらくメッセージのやりとりがなくなった段階で収束したと見なして処理が終了するのでしょう。

それではこのコードは実際どのように呼ばれるかをジョブの投入から見ていきたいと思います。

ジョブの投入からの処理の流れ

実のところGiraphはHadoopMapReduceを利用しています。 ですが、一般的なMapReduceアプリケーションのように、MapReduceを何度も実行して上記のcompute関数を呼び出している、というわけではありません。

MapReduceもReducerは使わずMapperだけが使用され、たくさんのサーバ上でプログラムを実行するためのディスパッチャとして利用されているだけです。

メッセージのやり取りについても、独自にhadoop.ipc.RPCを利用してネットワーク通信方法を構築しており、また各サーバ間の同期にはzookeeperを利用しています。

それではcomputeがどのように呼ばれるか、処理の流れを追っていきます。

Flow-1
()内は処理が書かれているファイル名及び関数です。
1. main関数からMapReduceジョブを実行(GiraphRunner.java main)
  -> 2. MapReduceジョブの各種設定及びドライバーコードを実行(GiraphJob.java run)
    -> 3. Mapperタスクの並列起動(GraphMapper.java run)
      -> 4. 各Mapperタスクの役割(後述)の決定と初期化(GraphMapper.java prepare)
        -> 5. スーパーステップ(後述)の開始(GraphMapper.java map)
        -> 6. 自分に割り当てられたグラフの頂点取得(GraphMapper.java map)
        -> 7. 各頂点ごとにcomputeメソッドを実行(GraphMapper.java map)
        -> 8. 全処理が終了していなければ次のスーパーステップを実行するため5に戻る(GraphMapper.java map)

Mapperタスクの役割や、スーパーステップという聞きなれない言葉が出てきました。 スーパーステップというのは何でしょうか?

Bulk synchronous parallel(BSP)

最初にも紹介しましたがGiraphはPregelのオープンソース実装です。PregelはBSPを利用しているという話もありました。 BSPというのは並列アルゴリズムを実行するための計算モデルです。

次の画像はBSPの処理を表現したものです。

実際のところそう難しくはなさそうです。下記の処理をスーパーステップという1単位として、何度もスーパーステップを繰り返すことにより、上記の最短経路のような問題を解くことができます。

  1. Concurrent computation: 各サーバがそれぞれローカルの情報を元に計算
  2. Communication: 各サーバがデータを交換
  3. Barrier synchronisation: 全ての計算及びデータ交換が終了するまで待つ

上記のFlow-1では、5-8が1つのスーパーステップになっていることがわかります。「Concurrent Computation」及び「Communication」は最短経路プログラムでのcomputeメソッドが行なっていますね。「Barrier Synchronisation」はどこに相当するかというと、Flow-1 8の時に準備ができるまで待つという処理が含まれており、zookeeperに今回のスーパーステップ終了だよと書きこまれれば各サーバーはその情報を参照して次のステップへ進んでいきます。

それではその「スーパーステップ終了の合図」は誰が書き込むのでしょうか? 次の「各Mapperの役割」で説明します。

各Mapperの役割

スーパーステップを実行をするためには、各頂点における計算が終了したかどうかといった同期情報を管理・監視する処理も必要であることに気づきます。 Giraphはこのような処理を実行するために、下記のような役割があります。

これらの役割は重複して持つことも可能ですし、各サーバが別々に持つこともできます。

  • Master: スーパーステップの管理・監視を行う。1~複数台(パラメータで設定可能)で起動される。(BspServiceMaster.java, MasterThread.java)
  • Zookeeper: Masterノードと同じサーバで実行されることが多い. Zookeeperを起動して分散協調管理を行う。1~複数台(パラメータで設定可能)で起動される。
  • Worker: 「compute」メソッドを実行するMapperタスク(Graph Mapper.java, BspService.java, BspServiceWorker.java)

注意点ですが、どの役割もMapperタスクで実行されるので、HadoopのいわゆるMasterノードと混同しないように気をつけてください。

ネットワーク通信

最後に各サーバ間でどのような通信が行われるかを説明します。

スーパーステップの同期や、グラフの頂点の割り当て情報などはZookeeperを通じてMaster, Worker間でやり取りをします。

各ワーカー間で送られるメッセージについては、毎回メッセージが送信されるわけではありません。 ある程度大きな塊となった段階で送るか、それともスーパーステップの終了段階で送るかの2通りにわかれます。 こちらの処理は、hadoop.ipc.RPCを通じて、各サーバに対して直接メッセージの送受信を行います。

例えば5台、ワーカが起動しているサーバがあれば、ネットワーク通信は最大5x5=25回、特にスーパーステップが完了するあたりで実行されるということになります。 (ソースコード的には、BasicRPCCommunications.java のPeerFlushExecutorやLargeMessageFlushExecutorでproxy.putMsgと自然にリモートメソッド呼び出し(RPC)しているあたりがネットワーク通信に該当します。)

感想

ソースコードを熟読したわけではなくて、さらっと見ただけですので間違えているかもしれません。そもそもSVNのTRUNKですので、公開した段階で既に修正されている部分があるかもしれません。 必ずこちらのメモを鵜呑みにはせずに自分でも調べるようにしてください。

それから、コード読んでてブログにする段階で見つけてしまったんですが、Giraphの内部の仕組みを解説するパワポはこのスライドが詳しいですね。。。なので多分続きは書きません。

あと、どこかで「キリンさんが好きです。でも象さんの方がもっと好きです」というベタベタなネタを入れたかったんですが、入れる場所がありませんでした。

参考URL

Giraph Introducing Apache Giraph for Large Scale Graph Processing グラフ問題とバルク同期並列の常識をGiraphで体得 TokyoWebMiningでの発表 Wikipedia - Bulk synchronous parallel HadoopのIPC/RPC