
Tutorial on Distributed Computing in Julia

✍️ KeishiS
Warning

This article was automatically translated from the Japanese original. Please refer to the Japanese article for accurate information.

1. Introduction

A calculation in my research would have taken far too long without distributed processing, so I looked into how to use Distributed.jl and wrote down what I learned.

2. Basic Usage

The basic model is a single main process plus worker processes: the main process hands tasks to the workers ("do this piece of work") and collects the results. There are several ways to start worker processes.

  1. Specify the number of worker processes to start locally when starting Julia

    • If you run julia -p 2, the main process starts in the foreground and 2 worker processes start in the background.

  2. List the hosts on which to start worker processes in a file like Code 1 and pass it with the command-line option julia --machine-file hosts

    • Each line specifies one host; prefixing the hostname as in count*hostname starts that many processes on it.

  3. Start worker processes with addprocs after Julia is already running (a minimal sketch follows Code 1)

    • This is the most flexible startup method, since options such as the working directory can be set per call.

Code 1. hosts
localhost
2*localhost
remote-server-01
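
The 3rd method is driven entirely from within a running Julia session. Below is a minimal sketch of it; the remote hostname is only a placeholder.

using Distributed

addprocs(2)                          # 2 worker processes on the local machine
addprocs([("remote-server-01", 2)])  # 2 workers on a remote host, connected over SSH

println(workers())                   # ids of the worker processes that were started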

A few points to note, whichever method you use:

  • An SSH connection is required to start processes on remote servers

  • The local environment is not automatically reproduced in the remote processes.

    • If you try to use a package that is installed locally but not in the remote environment, loading it with @everywhere (described later) will fail

  • Unless specified otherwise, a process started on a remote server tries to use the same working directory path as in the local environment (the sketch below shows how to set this explicitly)
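
With addprocs, these last two points can be handled explicitly through keyword arguments such as dir, exename, and exeflags. A sketch with assumed paths (adjust them to the remote machine's layout):

using Distributed

addprocs(
    [("remote-server-01", 1)];
    dir = "/home/me/work",                 # working directory on the remote host
    exename = "/usr/local/bin/julia",      # julia binary on the remote host
    exeflags = "--project=/home/me/work",  # project environment to activate on the worker
)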

Note

Which method to use depends on the situation.

  • If you just want quick distributed processing on the local machine, the 1st is enough (although plain Threads is probably the better fit…)

  • If a compute cluster is already set up (e.g., a shared directory is reachable from all machines via NFS) and no complicated per-host access is needed, the 2nd is the easiest

  • If you want to use multiple remote servers with different environments, only the 3rd lets you set the arguments for each host individually

3. Code Design for Distributed Processing

I have some experience with distributed processing, but not much, so after thinking it through this time I settled on the following two policies as my own best practice.

  1. When "number of distributed tasks" is at most several times the "number of remote processes", or the number of remote processes is larger

    • Throw tasks to processes sequentially while enumerating them

  2. When "number of distributed tasks" is overwhelmingly larger than "number of remote processes"

    • Throw tasks into a queue, and remote processes monitor the queue, process tasks when they come in, and return them to the result queue

This is mainly a result of considering the overhead due to context switching.

Note

Considering the overhead of context switching, it seems good to split the design as follows

  • If the processes outnumber the tasks, enumerate the tasks and dispatch each one to a process directly (a minimal sketch of this follows)

  • If the processes are overwhelmingly outnumbered by the tasks, exchange tasks and results through a queue (Code 2 below)
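
For policy 1 there is no dedicated example later in this post, so here is a minimal sketch of what I mean; heavy_task is just a hypothetical stand-in for the real computation. Each task is sent to a worker with remotecall while enumerating, and the resulting Futures are fetched afterwards.

using Distributed

@everywhere function heavy_task(x)
    sleep(1)             # stand-in for the actual computation
    return (myid(), x^2)
end

tasks = 1:4              # only a few tasks relative to the number of workers

# Dispatch each task to a worker (cycling through the workers if there are
# more tasks than processes), then wait for all the results.
futures = [remotecall(heavy_task, w, x) for (w, x) in zip(Iterators.cycle(workers()), tasks)]
results = fetch.(futures)

For simple cases like this, pmap(heavy_task, tasks) does essentially the same scheduling automatically.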

4. Example Code for Policy 2

For future reference, I put together the minimal example code shown in Code 2 (it assumes worker processes have already been started).

Code 2. example code
using Distributed

buff_size = 100
jobs = RemoteChannel(() -> Channel{Int}(buff_size))
results = RemoteChannel(() -> Channel{Tuple{Int,Int}}(buff_size))

@everywhere begin
    # Worker loop: keep polling the jobs queue while it is open,
    # process each job, and push the result into the results queue.
    function remote_work(jobs, results)
        while isopen(jobs)
            if !isready(jobs)
                # Queue is open but empty: wait a bit, then poll again
                sleep(1)
                continue
            end

            try
                argv = take!(jobs)
                println("[$(myid()), $(gethostname())] argv: $(argv)")
                sleep(5)    # stand-in for the actual computation
                put!(results, (myid(), argv))
            catch
                # take! throws if the queue is closed while we wait; exit cleanly
                println("[SUSPEND] $(myid()), $(gethostname())")
                break
            end
        end
        println("[END] $(myid()), $(gethostname())")
    end
end

# Setup workers: launch the worker loop on every worker process
# (remote_do runs the function asynchronously and discards its return value)
for p in workers()
    remote_do(remote_work, p, jobs, results)
end

# throw jobs
for i in 1:10
    put!(jobs, i)
end

# check results
sleep(5)
for i in 1:10
    wid, ret = take!(results)
    println("wid: $(wid), ret: $(ret)")
end
close(jobs)
close(results)

  • The worker processes run the remote_work function, which keeps working while the jobs queue is open

    • If the jobs queue is open but empty, it waits 1 second and checks again

    • If a job is in the jobs queue, it takes it out, does some processing on it, and puts the result into the results queue

  • The main process does the following

    • Specify the function to be executed in the worker process

    • Stuff jobs into the queue

    • Take out the results and organize them
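
To try Code 2, save it as e.g. example.jl and run it with worker processes attached, for instance julia -p 2 example.jl (the file name and worker count are arbitrary); with julia --machine-file hosts example.jl the same script runs across the machines listed in Code 1.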

Note

I thought through a code design policy for distributed processing in Julia and put together example code for it.