This page looks best with JavaScript enabled

Julia 并行计算

 ·  ☕ 12 min read  ·  🔮 Yu · 👀... views

Julia并行基础-并发、管道1

Julia进程(Worker)

有关并行化与分布式的功能一般均在 Julia 的模块 Distributed 中提供。在 Julia 的并行环境中,将进程称为工作者(Worker)。默认情况下,Julia 启动时只会开启一个 Worker,其 PID 默认为1。运行中可以通过 addprocs() 和 rmprocs() 两个函数随时增加或减少 Worker 的数量,此时其他 Worker 的PID会依次从2开始,并被视为远程 Worker。

也可以在启动时以 julia-pn 的方式启动多进程的 REPL,其中 n 是进程数,通常是 CPU 核数。我们可以随时调用 procs() 函数获得所有 Worker 的 PID,包括本地默认的 Worker;也可以通过 workers() 查看所有远程 Worker 的 PID。

julia> using Distributed    #启用Distributed

julia> procs()  #获取所有WorkerPID
1-element Array{Int64,1}:
 1

 julia> addprocs()  #不写参数调用貌似会曾在CPU的逻辑处理器数量的Worker,我这是开了超线程的I7-8700ES
12-element Array{Int64,1}:
  2
  3
  ⋮
 12
 13

julia> procs()  #获取所有WorkID
13-element Array{Int64,1}:
  1
  2
  ⋮
 12
 13

julia> rmprocs(13)  #删掉ID = 13的Worker
Task (done) @0x000000001252f3d0

julia> procs()  #再次查看,ID = 13的Work已经消失
12-element Array{Int64,1}:
  1
  2
  ⋮
 11
 12

julia> rmprocs(2:12)    #删掉ID为2~12的Worker
Task (done) @0x00000000125248b0

julia> procs()  #获取所有Worker的ID已经只有ID = 1的Worker了
1-element Array{Int64,1}:
 1

julia> addprocs(2)  #增加两个Worker,看来因为之前创建过Worker,即使删除还是会从14开始
2-element Array{Int64,1}:
 14
 15

julia> procs()  #查看所有Worker的ID
3-element Array{Int64,1}:
  1
 14
 15

julia> workers()    #查看所有远程Worker的ID,此时不会输出ID = 1的Worker
2-element Array{Int64,1}:
 14
 15

协程

协程并不是进程或者线程这种系统级别的并发设施,可以看作是用户态的轻量级线程,所以在切换过程中的消耗要比进程、线程上下文切换时要小得多。更为吸引人的是,协程不会涉及让人头疼的同步或锁的问题,因为它不像进程、线程那样是抢占式多任务机制,而是协作式多任务机制。理论上讲,协程比线程的执行效率更高,而且因为占用的额外消耗极少,所以在一个系统中完全可能出现成千上万的协程,远远超出进程或线程的并发程度。

创建协程

在 Julia 中创建协程 Task 非常简单。构造方法Task(f::Function)@task宏可分别将一个函数对象与一个表达式封装为协程 Task 对象;但是函数参数f必须能够进行无参调用(没有参数或所有参数都有默认值)。

例如,分别以这两种方式创建 Task 对象,用于将数组的所有元素修改为单一的值,如下:

julia> a = zeros(1,5)   #创建一个元素值全为0.0的1x5数组
1×5 Array{Float64,2}:
 0.0  0.0  0.0  0.0  0.0

julia> f() = fill!(a, 1.0)  #定义一个无参函数f(),该函数将数组a所有值均填为1.0
f (generic function with 1 method)

julia> tsk1 = Task(f)   #将f()封装为Task对象,命名为tsk1
Task (runnable) @0x00000000314382f0

julia> tsk2 = @task fill!(a, 2.0)   #使用宏直接将表达式fill!(a, 2.0)封装为Task对象,并命名为tsk2,执行该表达式时将a中元素全填为2.0
Task (runnable) @0x0000000031439150

julia> a    #可以看出,f()和表达式fill!(a, 2.0)都未执行,a中元素还是全为0.0
1×5 Array{Float64,2}:
 0.0  0.0  0.0  0.0  0.0
对比以下错误/正确的代码:

  • × tskerr = Task(fill!(a, 3.0))
  • tskrig = @task (fill!(a, 4.0))

因为 fill!(a,3.0) 会在作为构造函数 Task() 的参数时被执行,所以传入 Task()构造方法的实际并不是函数对象,而是数组。虽然上例中数组 a 的内容被更新,但起作用的并非是创建的 Task 对象,而是因参数表达式的执行造成的。

julia> tskerr = Task(fill!(a, 3.0)) #错误示范
Task (runnable) @0x0000000011485710

julia> a    #表达式被执行
1×5 Array{Float64,2}:
 3.0  3.0  3.0  3.0  3.0

julia> schedule(tskerr) #不能将tskerr加入调度队列,并提示该对象为Array类型
Task (failed) @0x0000000011485710
MethodError: objects of type Array{Float64,2} are not callable
Use square brackets [] for indexing an Array.

julia> tskerr   #查看tskerr,虽为Task类型,但标记为failed状态(执行完了却发生了异常)
Task (failed) @0x0000000011485710
MethodError: objects of type Array{Float64,2} are not callable
Use square brackets [] for indexing an Array.

协程的状态

我们可以使用下面两个函数查看Task对象的状态(返回为turefalse

  • istaskstarted():是否启动
  • istaskdone():是否完成

显然其将全部输出false,既没启动也没完成

julia> istaskstarted(tsk1)
false

julia> istaskstarted(tsk2)
false

julia> istaskdone(tsk1)
false

julia> istaskdone(tsk2)
false

或者我们可以直接输入Task对象的名字:


julia> tsk1
Task (runnable) @0x0000000011eddfb0

julia> tsk2
Task (runnable) @0x0000000011edf990

Task对象状态表:

符号 含义
:runnable 可运行的(可切换到此 Task)
:waiting 阻塞等待中
:quened 调度中,等待启动或恢复
:done 成功结束执行
:failed 以一个没被捕获的异常结束

将Task加入调度队列(执行协程)

将 Task 对象加入调度队列的方法,需要调用 schedule() 函数即可。

启动创建的 Task 对象,需要将它们加入到 Julia 的调度器(Scheduler)中。调度器会在系统内部维护可运行 Task 的队列,并能在多个 Task 之间进行自动的切换。如果Task中的处理过程存在 fetch()wait() 操作(后文介绍),该 Task 会被挂起,同时另外一个 Task 会被切换执行。当导致挂起的操作事件结束,该 Task 便会重启并继续运行。一般情况下,这种切换调用的过程不需要我们关心,Julia 内部会自动处理。例如:

julia> schedule(tsk1)   #将tsk1加入到调度队列
Task (done) @0x0000000011c64e70

julia> a    #可以看出a的值已被更改
1×5 Array{Float64,2}:
 1.0  1.0  1.0  1.0  1.0

julia> schedule(tsk2)
Task (done) @0x0000000011c66570

julia> a
1×5 Array{Float64,2}:
 2.0  2.0  2.0  2.0  2.0

julia> tsk1 #Task对象状态由runnable变为done
Task (done) @0x0000000011c64e70

julia> tsk2
Task (done) @0x0000000011c66570

再用istaskstartedistaskdone查看Task对象状态:

julia> istaskstarted(tsk1)
true

julia> istaskstarted(tsk2)
true

julia> istaskdone(tsk1)
true

julia> istaskdone(tsk2)
true

为了便于使用,Julia提供了宏@async可以直接将表达式或无参函数创建为Task对象并加入调度队列:

julia> @async fill!(a, 5.0)
Task (done) @0x0000000011c66290

julia> a
1×5 Array{Float64,2}:
 5.0  5.0  5.0  5.0  5.0

julia> @async f()
Task (done) @0x0000000011eddcd0

julia> a
1×5 Array{Float64,2}:
 1.0  1.0  1.0  1.0  1.0

demo_sleep.jl

接下来在这个稍微复杂的例子中,定义了两个函数,分别会 sleep 两次且时间间隔不相同,将它们封装为 Task 对象并加入到调度队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
function f()    # 函数 f,期间会 sleep 两次
    println("f starting...")
    println("f sleep 1...")
    sleep(1)
    println("f sleep 2...")
    sleep(2)
    println("f wake up 2.")
    println("f finished.")
end

 function g()   # 函数 g,也会 sleep 两次,但周期与 f 不一样
    println("g starting...")
    println("g sleep 1...")
    sleep(2)
    println("g sleep 2...")
    sleep(0.3)
    println("g wake up 2.")
    println("g finished.")
end

 t1 = Task(f)   # 将两个函数封装为 Task 对象
 t2 = Task(g)

 schedule(t1)   # 加入到调度队列中
 schedule(t2)

 # yield()

 read(stdin, Char)  # 与演示功能无关,仅用于阻塞脚本避免上述异步操作过早退出

运行以上脚本结果如下:

f starting...
f sleep 1...
g starting...
g sleep 1...
f sleep 2...
g sleep 2...
g wake up 2.
g finished.
f wake up 2.
f finished.

从运行的信息可见,在 f()g() 进入 sleep 时,队列会自动切换到另外一个 Task 执行,不会让整个队列停顿阻塞,高效地利用了计算资源。

demo_yield.jl

在 Julia 的协程机制中,除了在 sleep() 等事件时让调度系统自动切换 Task 外,也可以主动进行切换。例如在脚本 demo_yield.jl 中有如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function f()
    println("f starting...")
    println("f pause 1...")
    yield()                       # 要求切换到别的Task
    println("f back 1 and pause 2...")
    yield()                       # 要求切换到别的Task
    println("f back.")
    println("f finished.")
end

function g()
    println("g starting...")
    println("g pause 1...")
    yield()                      # 要求切换到别的Task
    println("g back 1 and pause 2...")
    yield()                      # 要求切换到别的Task
    println("g back.")
    println("g finished.")
end

t1 = Task(f)
t2 = Task(g)

schedule(t1)
schedule(t2)

read(stdin, Char)   # 与演示功能无关,仅用于阻塞脚本避免上述的异步操作过早退出

运行结果:

f starting...
f pause 1...
g starting...
g pause 1...
f back 1 and pause 2...
g back 1 and pause 2...
f back.
f finished.
g back.
g finished.

可见,在 yield() 调用处会切换到另外一个 Task 执行,然后在另外的 Task 要求切换时,会再次恢复到 yield() 处继续执行后续的语句。

上例中的 yield() 函数,是调度系统提供的较为底层的功能,用于在 Task 中主动要求于当前位置切换到其他任务中。不过在未提供参数时,调用方不会限定待切换的是哪个 Task。如果暂时没有其他可切换的 Task,当前 Task 会立即重新启动,继续运行。

另外还有一个 yieldto() 函数,能够显式地指定要切入哪个 Task。但官方不建议使用该函数,因为其过于底层,在执行切换动作时不会考虑状态及队列秩序(Scheduling)问题,所以也不作过多的介绍。

数据通道

协程是一种轻量而高效的并发机制,但一般只能在本机上实现多逻辑路线的切换,无法跨机器或在集群中实现并行化。所以,在 Julia 的分布式框架中,多进程是主要的并行方案。

但在这种机制中,进程之间的协同工作难免会涉及消息传递与数据通信。为此,Julia 在其并行框架中引入数据通道(Channel)这一基础设施。通道机制不但能够用于跨进程的通信,同样适用于上文所介绍的 Task 之间的协同。为了便于解释,本节以 Task 为例说明数据通道的原理以及用法。

Channel 对象

Julia 中的 Channel 对象其实是一种带有阻塞特性的先进先出(first-infirst-out,FIFO)队列,可以理解为一种管道(Pipe),可同时被多个任务并发地、安全地读写。其原型为:Channel{T}(sz::Int)

若类型 T 未指定,则可容纳任意的类型;若参数 sz>0,则会建立 sz 大小的缓冲区,且 sz 会限定可以容纳的最大元素个数,但如果 sz 为 0,则不会建立缓冲区。

例如, Channel(32) 会创建一个最大容纳 32 个任意类型的对象,而 Channel{Int64}(128) 则创建一个对象,其最多容纳128个类型为 Int64 的元素。在 Channel 对象创建后,便可分别通过 take!()put!() 函数对其进行读写操作。例如:

julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)

julia> x1 = put!(c,1)
1

julia> x2 = put!(c,2)
2

julia> y1 = take!(c)
1

julia> y2 = take!(c)
2

可见, put!() 操作在放入元素时同时会返回该元素, take!() 操作会按放入的先后顺序逐个提取。

如果在元素提取完再次执行 take!(),或者放满元素后再次执行 put!() 操作,会发现这两个操作都没有立即返回,而是出现了阻塞等待状态。事实上,每次 take!() 函数被调用时,只会从 Channel 中提取一个元素同时移除该元素;但如果 Channel 为空,该函数阻塞直到有新的元素加入其中。相对地,函数 put!() 会将一个元素放入 Channel 对象中,但如果队列已满,该函数会一直阻塞直到 Channel 中某个元素被移除并有空余的空间。

如果 Channel 没有缓冲区, put!() 的阻塞行为会在 take()! 后立即被释放,而 take!() 的阻塞会在 put!() 执行后立即被释放。

demo_ch1.jl

我们在脚本 demo_ch1.jl 中写入如下的代码,演示这两个操作的用法以及阻塞时的行为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
using Distributed

function putter(c::Channel) # 往通道 c 填数据
    println("putting...")
    for i = 1:5
        put!(c, i)  # 执行填入操作
        t = rand()
        println("Put ", i, "\t sleep ", t)
        sleep(t)    # sleep 随机的 t 秒
    end
end

function taker(c::Channel)
    println("taking...")
    while true
        x = 0
        t = @elapsed x = take!(c)   # 提取元素并记录操作耗时
        println("Take ", x, "\t elapsed ", t)
    end
end


c = Channel(2)  # 创建一个大小为 2 的缓冲区

@async taker(c)    # 异步地执行提取操作

putter(c)   # 为避免 Task 之间在 sleep 等处的自动切换,故同步地执行放入操作

read(stdin, Char)  # 与演示功能无关,仅用于阻塞脚本避免上述的异步操作过早退出

其中, putter() 会间隔随机的秒数在 Channel 中放入元素,而 taker() 则会持续地提取内容。在命令行执行该脚本后,会输出类似如下的结果(因随机数的不同输出每次都会不同):

putting...
Put 1    sleep 0.642530296202334
taking...
Take 1   elapsed 1.301e-6
Put 2    sleep 0.7874780797463183
Take 2   elapsed 0.643385999
Put 3    sleep 0.7070721084935319
Take 3   elapsed 0.788968601
Put 4    sleep 0.6766404851472756
Take 4   elapsed 0.707095801
Put 5    sleep 0.8021641046321559
Take 5   elapsed 0.677384001

可见,每次 take!() 操作并非是立即返回的,而是都有耗时。这是因为元素是逐一放入的,而且相隔了一定时间,导致 take!() 操作总会阻塞等待。
并且输出的Take n timePut n time 相比只增加了约毫秒量级,可以看出线程由阻塞到执行的调度还是很快的。

fetch()

在操作 Channel 对象时,如果不希望删除取得的元素,可使用 fetch() 函数,例如:

julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)

julia> put!(c,1)
1

julia> put!(c,2)
2

julia> fetch(c)
1

julia> fetch(c)
1

可见 fetch() 总会取得队首(尾)的元素,但因未删除所以重复的调用都会返回同一结果。 fetch() 操作同样会在 Channel 为空时发生阻塞,而且不适用于无缓冲的 Channel 对象。

判断Channel状态

如果我们不希望在提取元素时发生阻塞,可以利用 Julia 提供的 isready() 函数,先判断 Channel 是否有数据存在,因为 isready() 不会阻塞,因此会立即返回 Channel 的状态。例如:

julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)

julia> isready(c)   #未放入任何元素
false

julia> put!(c,1)
1

julia> isready(c)   #有元素返回true
true

我们可以在 Channel 没有准备好时继续执行其他操作,待其有元素时再执行提取操作;或者可通过函数 wait() 等待 Channel 中有可用的元素加入。

如果通过 close() 函数关闭了 Channel,如果Channel中剩余有元素的话,该函数仍会返回 true,即:
julia> c = Channel(2)   #无元素始终为false
Channel{Any}(sz_max:2,sz_curr:0)

julia> isready(c)
false

julia> close(c)

julia> isready(c)
false

julia> c = Channel(2)   #有元是是无论是否使用过close()都返回true
Channel{Any}(sz_max:2,sz_curr:0)

julia> put!(c,1)
1

julia> isready(c)
true

julia> close(c)

julia> isready(c)
true

此时如果进行 put!() 操作会抛出异常,例如:

julia> put!(c,2)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
 [1] check_channel_state at .\channels.jl:167 [inlined]
 [2] put!(::Channel{Any}, ::Int64) at .\channels.jl:323
 [3] top-level scope at REPL[20]:1

但对 fetch()take!() 有些特别,即:

julia> fetch(c)
1

julia> take!(c)
1

julia> take!(c)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
 [1] check_channel_state at .\channels.jl:167 [inlined]
 [2] take_buffered(::Channel{Any}) at .\channels.jl:399
 [3] take!(::Channel{Any}) at .\channels.jl:394
 [4] top-level scope at REPL[22]:1

可见,对关闭的 Channel 仍可调用 fetch()take()! 取得其中余留的数据,但在全部取完后再取数据时会报错。可以使用 isopen() 函数判断 Channel 的状态,避免出现上述的错误。

julia> isopen(c)
false

demo_ch2.jl

除了上述的特点外,Channel 还是可迭代的,可用 for 循环结构遍历其中的元素。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)

julia> put!(c,2019)
2019

julia> put!(c,2020)
2020

julia> for x in c
           print(x," ")
       end
2019 2020   #阻塞等待

需要注意的是,这种遍历相当于执行了 take!() 操作,会“消费” Channel 中的元素。一旦其中的元素被取完,for 结构便会阻塞,等待新元素的加入。
要结束上述代码,在REPL中键入Ctrl+c即可


我们采用 for 循环方式修改脚本 demo_ch1.jl 后,在 demo_ch2.jl 中写入新的代码,即:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
using Distributed

function putter(c::Channel)
    println("putting...")
    # 放入5个元素
    for i = 1:5
        put!(c, i)
    end
    println("closing...")
    sleep(1)
    close(c)
    println("put over.")
end

function taker(c::Channel)
    println("taking...")
    for x in c
      print(x, " ")
    end
    println("all is taken.")
end


c = Channel(2)

 @async taker(c)
 @async putter(c)

 read(stdin, Char)

putter() 放完5个元素后,通过 close() 函数关闭 Channel 对象;而 for 循环会持续读取内容。执行该脚本,结果为:

taking...
putting...
1 2 3 4 5 closing...
put over.
all is taken.

仔细对照实现的代码可见,在最后一个元素5打印后,for 循环之后的println("all is taken.")并没有立即执行,因为 for 结构阻塞了。在 putter() 关闭 Channel 对象并输出put over.之后,for 循环才释放阻塞并打印之后的信息all is taken.从而结束整个元素提取过程。其中, close() 函数将 Channel 对象关闭的同时,也会中断 put!()take!() 操作的阻塞等待状态。

demo_mconsumer.jl

基于Channel,我们很容易编写出一个生产者多个消费者的应用。为了便于演示,在脚本 demo_mconsumer.jl 中输入如下的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
using Distributed

const jobs = Channel{Int}(32);
const results = Channel{Tuple}(32);

function do_work()  # 消费者
    for job_id in jobs  # 用for循环从输入通道jobs中读取任务数据
        exec_time = rand()
        sleep(exec_time)    # 用sleep模拟处理过程
        put!(results, (job_id, exec_time))  # 将结果放入输出通道results中
    end
end

function make_jobs(n)   # 生产者
    for i in 1:n                        
        put!(jobs, i)   # 往任务通道jobs中放入数据
    end
end

n = 12;

@async make_jobs(n);    # 将生产者加入任务调度中

for i in 1:4                           
    @async do_work()    # 异步地启动4个消费者,实现并行消费
end

@elapsed while n > 0    # 打印消费者执行后的结果
            job_id, exec_time = take!(results); # 从结果通道中提取数据
            println("$job_id finished in $(round(exec_time,digits=2)) seconds")
            global n = n - 1
        end

其中,有4个消费者会并行地消费 jobs 通道提供的任务数据,然后将结果并发地输出到结果通道 results 中。执行后会得到类似如下的结果:

3 finished in 0.27 seconds
4 finished in 0.4 seconds
6 finished in 0.22 seconds
5 finished in 0.38 seconds
2 finished in 0.91 seconds
1 finished in 0.99 seconds
10 finished in 0.05 seconds
9 finished in 0.22 seconds
11 finished in 0.16 seconds
8 finished in 0.7 seconds
7 finished in 0.79 seconds
12 finished in 0.65 seconds

Plan

wait()尝试

参考文献:

点击链接末尾的回车符可以跳转回引用处~


Yu
WRITTEN BY
Yu
🎓 College Students 📐Physics 💾 Programmer