Julia并行基础-并发、管道1
Julia进程(Worker)
有关并行化与分布式的功能一般均在 Julia 的模块 Distributed 中提供。在 Julia 的并行环境中,将进程称为工作者(Worker)。默认情况下,Julia 启动时只会开启一个 Worker,其 PID 默认为1。运行中可以通过 addprocs() 和 rmprocs() 两个函数随时增加或减少 Worker 的数量,此时其他 Worker 的PID会依次从2开始,并被视为远程 Worker。
也可以在启动时以 julia-pn 的方式启动多进程的 REPL,其中 n 是进程数,通常是 CPU 核数。我们可以随时调用 procs() 函数获得所有 Worker 的 PID,包括本地默认的 Worker;也可以通过 workers() 查看所有远程 Worker 的 PID。
julia> using Distributed #启用Distributed
julia> procs() #获取所有WorkerPID
1-element Array{Int64,1}:
1
julia> addprocs() #不写参数调用貌似会曾在CPU的逻辑处理器数量的Worker,我这是开了超线程的I7-8700ES
12-element Array{Int64,1}:
2
3
⋮
12
13
julia> procs() #获取所有WorkID
13-element Array{Int64,1}:
1
2
⋮
12
13
julia> rmprocs(13) #删掉ID = 13的Worker
Task (done) @0x000000001252f3d0
julia> procs() #再次查看,ID = 13的Work已经消失
12-element Array{Int64,1}:
1
2
⋮
11
12
julia> rmprocs(2:12) #删掉ID为2~12的Worker
Task (done) @0x00000000125248b0
julia> procs() #获取所有Worker的ID已经只有ID = 1的Worker了
1-element Array{Int64,1}:
1
julia> addprocs(2) #增加两个Worker,看来因为之前创建过Worker,即使删除还是会从14开始
2-element Array{Int64,1}:
14
15
julia> procs() #查看所有Worker的ID
3-element Array{Int64,1}:
1
14
15
julia> workers() #查看所有远程Worker的ID,此时不会输出ID = 1的Worker
2-element Array{Int64,1}:
14
15
协程
协程并不是进程或者线程这种系统级别的并发设施,可以看作是用户态的轻量级线程,所以在切换过程中的消耗要比进程、线程上下文切换时要小得多。更为吸引人的是,协程不会涉及让人头疼的同步或锁的问题,因为它不像进程、线程那样是抢占式多任务机制,而是协作式多任务机制。理论上讲,协程比线程的执行效率更高,而且因为占用的额外消耗极少,所以在一个系统中完全可能出现成千上万的协程,远远超出进程或线程的并发程度。
创建协程
在 Julia 中创建协程 Task 非常简单。构造方法Task(f::Function)
与@task
宏可分别将一个函数对象与一个表达式封装为协程 Task 对象;但是函数参数f
必须能够进行无参调用(没有参数或所有参数都有默认值)。
例如,分别以这两种方式创建 Task 对象,用于将数组的所有元素修改为单一的值,如下:
julia> a = zeros(1,5) #创建一个元素值全为0.0的1x5数组
1×5 Array{Float64,2}:
0.0 0.0 0.0 0.0 0.0
julia> f() = fill!(a, 1.0) #定义一个无参函数f(),该函数将数组a所有值均填为1.0
f (generic function with 1 method)
julia> tsk1 = Task(f) #将f()封装为Task对象,命名为tsk1
Task (runnable) @0x00000000314382f0
julia> tsk2 = @task fill!(a, 2.0) #使用宏直接将表达式fill!(a, 2.0)封装为Task对象,并命名为tsk2,执行该表达式时将a中元素全填为2.0
Task (runnable) @0x0000000031439150
julia> a #可以看出,f()和表达式fill!(a, 2.0)都未执行,a中元素还是全为0.0
1×5 Array{Float64,2}:
0.0 0.0 0.0 0.0 0.0
对比以下错误/正确的代码:
- ×
tskerr = Task(fill!(a, 3.0))
- √
tskrig = @task (fill!(a, 4.0))
因为 fill!(a,3.0) 会在作为构造函数 Task() 的参数时被执行,所以传入 Task()构造方法的实际并不是函数对象,而是数组。虽然上例中数组 a 的内容被更新,但起作用的并非是创建的 Task 对象,而是因参数表达式的执行造成的。
julia> tskerr = Task(fill!(a, 3.0)) #错误示范
Task (runnable) @0x0000000011485710
julia> a #表达式被执行
1×5 Array{Float64,2}:
3.0 3.0 3.0 3.0 3.0
julia> schedule(tskerr) #不能将tskerr加入调度队列,并提示该对象为Array类型
Task (failed) @0x0000000011485710
MethodError: objects of type Array{Float64,2} are not callable
Use square brackets [] for indexing an Array.
julia> tskerr #查看tskerr,虽为Task类型,但标记为failed状态(执行完了却发生了异常)
Task (failed) @0x0000000011485710
MethodError: objects of type Array{Float64,2} are not callable
Use square brackets [] for indexing an Array.
协程的状态
我们可以使用下面两个函数查看Task对象的状态(返回为ture
或false
)
istaskstarted()
:是否启动istaskdone()
:是否完成
显然其将全部输出false,既没启动也没完成
julia> istaskstarted(tsk1)
false
julia> istaskstarted(tsk2)
false
julia> istaskdone(tsk1)
false
julia> istaskdone(tsk2)
false
或者我们可以直接输入Task对象的名字:
julia> tsk1
Task (runnable) @0x0000000011eddfb0
julia> tsk2
Task (runnable) @0x0000000011edf990
Task对象状态表:
符号 | 含义 |
---|---|
:runnable | 可运行的(可切换到此 Task) |
:waiting | 阻塞等待中 |
:quened | 调度中,等待启动或恢复 |
:done | 成功结束执行 |
:failed | 以一个没被捕获的异常结束 |
将Task加入调度队列(执行协程)
将 Task 对象加入调度队列的方法,需要调用 schedule()
函数即可。
启动创建的 Task 对象,需要将它们加入到 Julia 的调度器(Scheduler)中。调度器会在系统内部维护可运行 Task 的队列,并能在多个 Task 之间进行自动的切换。如果Task中的处理过程存在 fetch()
或 wait()
操作(后文介绍),该 Task 会被挂起,同时另外一个 Task 会被切换执行。当导致挂起的操作事件结束,该 Task 便会重启并继续运行。一般情况下,这种切换调用的过程不需要我们关心,Julia 内部会自动处理。例如:
julia> schedule(tsk1) #将tsk1加入到调度队列
Task (done) @0x0000000011c64e70
julia> a #可以看出a的值已被更改
1×5 Array{Float64,2}:
1.0 1.0 1.0 1.0 1.0
julia> schedule(tsk2)
Task (done) @0x0000000011c66570
julia> a
1×5 Array{Float64,2}:
2.0 2.0 2.0 2.0 2.0
julia> tsk1 #Task对象状态由runnable变为done
Task (done) @0x0000000011c64e70
julia> tsk2
Task (done) @0x0000000011c66570
再用istaskstarted
和istaskdone
查看Task对象状态:
julia> istaskstarted(tsk1)
true
julia> istaskstarted(tsk2)
true
julia> istaskdone(tsk1)
true
julia> istaskdone(tsk2)
true
为了便于使用,Julia提供了宏@async
可以直接将表达式或无参函数创建为Task对象并加入调度队列:
julia> @async fill!(a, 5.0)
Task (done) @0x0000000011c66290
julia> a
1×5 Array{Float64,2}:
5.0 5.0 5.0 5.0 5.0
julia> @async f()
Task (done) @0x0000000011eddcd0
julia> a
1×5 Array{Float64,2}:
1.0 1.0 1.0 1.0 1.0
demo_sleep.jl
接下来在这个稍微复杂的例子中,定义了两个函数,分别会 sleep 两次且时间间隔不相同,将它们封装为 Task 对象并加入到调度队列
|
|
运行以上脚本结果如下:
f starting...
f sleep 1...
g starting...
g sleep 1...
f sleep 2...
g sleep 2...
g wake up 2.
g finished.
f wake up 2.
f finished.
从运行的信息可见,在 f()
或 g()
进入 sleep 时,队列会自动切换到另外一个 Task 执行,不会让整个队列停顿阻塞,高效地利用了计算资源。
demo_yield.jl
在 Julia 的协程机制中,除了在 sleep()
等事件时让调度系统自动切换 Task 外,也可以主动进行切换。例如在脚本 demo_yield.jl
中有如下代码:
|
|
运行结果:
f starting...
f pause 1...
g starting...
g pause 1...
f back 1 and pause 2...
g back 1 and pause 2...
f back.
f finished.
g back.
g finished.
可见,在 yield()
调用处会切换到另外一个 Task 执行,然后在另外的 Task 要求切换时,会再次恢复到 yield()
处继续执行后续的语句。
上例中的 yield()
函数,是调度系统提供的较为底层的功能,用于在 Task 中主动要求于当前位置切换到其他任务中。不过在未提供参数时,调用方不会限定待切换的是哪个 Task。如果暂时没有其他可切换的 Task,当前 Task 会立即重新启动,继续运行。
另外还有一个 yieldto()
函数,能够显式地指定要切入哪个 Task。但官方不建议使用该函数,因为其过于底层,在执行切换动作时不会考虑状态及队列秩序(Scheduling)问题,所以也不作过多的介绍。
数据通道
协程是一种轻量而高效的并发机制,但一般只能在本机上实现多逻辑路线的切换,无法跨机器或在集群中实现并行化。所以,在 Julia 的分布式框架中,多进程是主要的并行方案。
但在这种机制中,进程之间的协同工作难免会涉及消息传递与数据通信。为此,Julia 在其并行框架中引入数据通道(Channel)这一基础设施。通道机制不但能够用于跨进程的通信,同样适用于上文所介绍的 Task 之间的协同。为了便于解释,本节以 Task 为例说明数据通道的原理以及用法。
Channel 对象
Julia 中的 Channel 对象其实是一种带有阻塞特性的先进先出(first-infirst-out,FIFO)队列,可以理解为一种管道(Pipe),可同时被多个任务并发地、安全地读写。其原型为:Channel{T}(sz::Int)
若类型 T
未指定,则可容纳任意的类型;若参数 sz>0
,则会建立 sz
大小的缓冲区,且 sz
会限定可以容纳的最大元素个数,但如果 sz
为 0,则不会建立缓冲区。
例如, Channel(32)
会创建一个最大容纳 32 个任意类型的对象,而 Channel{Int64}(128)
则创建一个对象,其最多容纳128个类型为 Int64
的元素。在 Channel 对象创建后,便可分别通过 take!()
和 put!()
函数对其进行读写操作。例如:
julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)
julia> x1 = put!(c,1)
1
julia> x2 = put!(c,2)
2
julia> y1 = take!(c)
1
julia> y2 = take!(c)
2
可见, put!()
操作在放入元素时同时会返回该元素, take!()
操作会按放入的先后顺序逐个提取。
如果在元素提取完再次执行 take!()
,或者放满元素后再次执行 put!()
操作,会发现这两个操作都没有立即返回,而是出现了阻塞等待状态。事实上,每次 take!()
函数被调用时,只会从 Channel 中提取一个元素同时移除该元素;但如果 Channel 为空,该函数阻塞直到有新的元素加入其中。相对地,函数 put!()
会将一个元素放入 Channel 对象中,但如果队列已满,该函数会一直阻塞直到 Channel 中某个元素被移除并有空余的空间。
如果 Channel 没有缓冲区, put!()
的阻塞行为会在 take()!
后立即被释放,而 take!()
的阻塞会在 put!()
执行后立即被释放。
demo_ch1.jl
我们在脚本 demo_ch1.jl 中写入如下的代码,演示这两个操作的用法以及阻塞时的行为:
|
|
其中, putter()
会间隔随机的秒数在 Channel
中放入元素,而 taker()
则会持续地提取内容。在命令行执行该脚本后,会输出类似如下的结果(因随机数的不同输出每次都会不同):
putting...
Put 1 sleep 0.642530296202334
taking...
Take 1 elapsed 1.301e-6
Put 2 sleep 0.7874780797463183
Take 2 elapsed 0.643385999
Put 3 sleep 0.7070721084935319
Take 3 elapsed 0.788968601
Put 4 sleep 0.6766404851472756
Take 4 elapsed 0.707095801
Put 5 sleep 0.8021641046321559
Take 5 elapsed 0.677384001
可见,每次 take!()
操作并非是立即返回的,而是都有耗时。这是因为元素是逐一放入的,而且相隔了一定时间,导致 take!()
操作总会阻塞等待。
并且输出的Take n time
与Put n time
相比只增加了约毫秒量级,可以看出线程由阻塞到执行的调度还是很快的。
fetch()
在操作 Channel 对象时,如果不希望删除取得的元素,可使用 fetch()
函数,例如:
julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)
julia> put!(c,1)
1
julia> put!(c,2)
2
julia> fetch(c)
1
julia> fetch(c)
1
可见 fetch()
总会取得队首(尾)的元素,但因未删除所以重复的调用都会返回同一结果。 fetch()
操作同样会在 Channel 为空时发生阻塞,而且不适用于无缓冲的 Channel 对象。
判断Channel状态
如果我们不希望在提取元素时发生阻塞,可以利用 Julia 提供的 isready()
函数,先判断 Channel 是否有数据存在,因为 isready()
不会阻塞,因此会立即返回 Channel 的状态。例如:
julia> c = Channel(2)
Channel{Any}(sz_max:2,sz_curr:0)
julia> isready(c) #未放入任何元素
false
julia> put!(c,1)
1
julia> isready(c) #有元素返回true
true
我们可以在 Channel 没有准备好时继续执行其他操作,待其有元素时再执行提取操作;或者可通过函数 wait()
等待 Channel 中有可用的元素加入。
close()
函数关闭了 Channel,如果Channel中剩余有元素的话,该函数仍会返回 true
,即:
julia> c = Channel(2) #无元素始终为false
Channel{Any}(sz_max:2,sz_curr:0)
julia> isready(c)
false
julia> close(c)
julia> isready(c)
false
julia> c = Channel(2) #有元是是无论是否使用过close()都返回true
Channel{Any}(sz_max:2,sz_curr:0)
julia> put!(c,1)
1
julia> isready(c)
true
julia> close(c)
julia> isready(c)
true
此时如果进行 put!()
操作会抛出异常,例如:
julia> put!(c,2)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
[1] check_channel_state at .\channels.jl:167 [inlined]
[2] put!(::Channel{Any}, ::Int64) at .\channels.jl:323
[3] top-level scope at REPL[20]:1
但对 fetch()
或 take!()
有些特别,即:
julia> fetch(c)
1
julia> take!(c)
1
julia> take!(c)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
[1] check_channel_state at .\channels.jl:167 [inlined]
[2] take_buffered(::Channel{Any}) at .\channels.jl:399
[3] take!(::Channel{Any}) at .\channels.jl:394
[4] top-level scope at REPL[22]:1
可见,对关闭的 Channel 仍可调用 fetch()
或 take()!
取得其中余留的数据,但在全部取完后再取数据时会报错。可以使用 isopen()
函数判断 Channel 的状态,避免出现上述的错误。
julia> isopen(c)
false
demo_ch2.jl
除了上述的特点外,Channel 还是可迭代的,可用 for 循环结构遍历其中的元素。例如:
|
|
需要注意的是,这种遍历相当于执行了 take!()
操作,会“消费” Channel 中的元素。一旦其中的元素被取完,for 结构便会阻塞,等待新元素的加入。
要结束上述代码,在REPL中键入Ctrl
+c
即可
我们采用 for 循环方式修改脚本 demo_ch1.jl
后,在 demo_ch2.jl
中写入新的代码,即:
|
|
在 putter()
放完5个元素后,通过 close()
函数关闭 Channel 对象;而 for 循环会持续读取内容。执行该脚本,结果为:
taking...
putting...
1 2 3 4 5 closing...
put over.
all is taken.
仔细对照实现的代码可见,在最后一个元素5打印后,for 循环之后的println("all is taken.")
并没有立即执行,因为 for 结构阻塞了。在 putter()
关闭 Channel 对象并输出put over.
之后,for 循环才释放阻塞并打印之后的信息all is taken.
从而结束整个元素提取过程。其中, close()
函数将 Channel 对象关闭的同时,也会中断 put!()
与 take!()
操作的阻塞等待状态。
demo_mconsumer.jl
基于Channel,我们很容易编写出一个生产者多个消费者的应用。为了便于演示,在脚本 demo_mconsumer.jl
中输入如下的代码:
|
|
其中,有4个消费者会并行地消费 jobs 通道提供的任务数据,然后将结果并发地输出到结果通道 results 中。执行后会得到类似如下的结果:
3 finished in 0.27 seconds
4 finished in 0.4 seconds
6 finished in 0.22 seconds
5 finished in 0.38 seconds
2 finished in 0.91 seconds
1 finished in 0.99 seconds
10 finished in 0.05 seconds
9 finished in 0.22 seconds
11 finished in 0.16 seconds
8 finished in 0.7 seconds
7 finished in 0.79 seconds
12 finished in 0.65 seconds
Plan
wait()
尝试
有参考:
点击链接末尾的回车符可以跳转回引用处~
-
Julia | 并行计算(上)
https://mp.weixin.qq.com/s/p8S9Qon3C-qwaT0EzNNLew ↩︎