KeishiS.github.io

Tutorial on Distributed Computing in Julia

📅
🔄
✍️ KeishiS

1. 導入

研究で分散処理しないと時間がかかりすぎる計算があり, Distributed.jl の使い方をチェックしたので,調べてわかったことをメモ書きします.

2. 基本的な使い方

メインプロセスワーカープロセス があり,メインプロセスから「この仕事やってねー」とワーカープロセスへタスクを投げるというのが基本的な考え方となる.ワーカープロセスを立ち上げる方法はいくつかある.

  1. Julia起動時にローカルで立ち上げるワーカープロセス数を指定

    • julia -p 2 とすれば,foregroundではメインプロセスが立ち上がり,backgroundでワーカープロセスが2つ立ち上がる.

  2. ワーカープロセスを起動したいホスト一覧をコード 1へ記載して,コマンドライン引数から指定 julia --machine-file hosts

    • 各行が1つのホストを表し,[起動するプロセス数*]hostname の書式で書けば複数のプロセスを起動できる.

  3. Julia起動後, addprocs でワーカープロセスを起動

    • 最も柔軟に設定できる起動方法で working directory 等を指定できる.

コード 1. hosts
localhost
2*localhost
remote-server-01

その際にいくつか注意点がある.

  • リモートサーバでプロセスを起動するにはSSH接続が必要

  • ローカル環境をリモートプロセスで自動的に再現してくれない.

    • ローカルには入っているけどリモート環境では入れてないパッケージを使おうとすると,後述する @everywhere の時に止まる

  • リモートサーバでプロセスを起動する際,指定がなければローカル環境と同じworking directoryのパスで起動しようとする点に留意

Note

状況によって使い分けるのが良い.

  • ローカルでサクッと分散処理したいなら1番目 ( Threads を使ったほうがよいと思うが...)

  • 計算機クラスタ等が整っていて(NFS等を用いて全マシンから共通のディレクトリへアクセスできる等),かつリモートサーバへの複雑なアクセスが不要であれば2番目が楽

  • 環境の異なる複数のリモートサーバを利用したいなら3番目で引数を個別に設定しないと対応できない

3. 分散処理のためのコード設計

分散処理は多少触ってはいるものの経験は乏しいため,今回じっくり考えて,以下2つの方針が自分的にBest Practiceだろうという結論になった.

  1. "分散タスク数" が "リモートプロセス数" の高々数倍程度,またはリモートプロセス数の方が多い場合

    • タスクを列挙しながら逐次プロセスへ投げる

  2. "分散タスク数" が "リモートプロセス数" より圧倒的に多い場合

    • タスクをキューへ投げて,リモートプロセスはキューを監視して,タスクが入ってきたら処理して結果用のキューへ返す

主にコンテキストスイッチによるオーバーヘッドを考慮した結果である.

Note

コンテキストスイッチによるオーバーヘッドを考慮し,以下のように書き分けるのが良さそう

  • プロセス数がタスク数より多いなら,タスク列挙しながら逐次プロセスへ投げる方針

  • プロセス数がタスク数より圧倒的に少ないなら,キューを介してタスクと結果をやり取りする方針

4. 方針2のためのExample Code

後学のために,必要最小限のExample Codeを コード 2 の通りにまとめた.

コード 2. example code
using Distributed

buff_size = 100
jobs = RemoteChannel(() -> Channel{Int}(buff_size))
results = RemoteChannel(() -> Channel{Tuple{Int,Int}}(buff_size))

@everywhere begin
    function remote_work(jobs, results)
        while isopen(jobs)
            if !isready(jobs)
                sleep(1)
                continue
            end

            try
                argv = take!(jobs)
                println("[$(myid()), $(gethostname())] argv: $(argv)")
                sleep(5)
                put!(results, (myid(), argv))
            catch
                println("[SUSPEND] $(myid()), $(gethostname())")
                break
            end
        end
        println("[END] $(myid()), $(gethostname())")
    end
end

# Setup workers
for p in workers()
    remote_do(remote_work, p, jobs, results)
end

# throw jobs
for i in 1:10
    put!(jobs, i)
end

# check results
sleep(5)
for i in 1:10
    wid, ret = take!(results)
    println("wid: $(wid), ret: $(ret)")
end
close(jobs)
close(results)
  • ワーカープロセスでは remote_work 関数を動かして, jobs キューが開いている間動作する

    • jobs キューが開いてても何も入っていなければ1秒待って再確認

    • jobs キューに入っていれば取り出して,それをもとに何らかの処理をして results キューに結果を返す

  • メインプロセスでは以下の処理をする

    • ワーカープロセスで実行する関数の指定

    • ジョブをキューへ詰め込む

    • 結果を取り出して整理

Note

Julia言語での分散処理について,コードの設計方針を考えてExample Codeをまとめた.