第九章 平行程式設計

在本章中,我們將探討 OCaml 中的平行程式設計功能。OCaml 標準函式庫公開了用於平行程式設計的底層原語。我們建議使用者利用高階的平行程式設計函式庫,例如 domainslib。本教學將首先介紹使用 domainslib 的高階平行程式設計,然後介紹編譯器公開的底層原語。

OCaml 區分並行(concurrency)和平行(parallelism),並為它們提供不同的表達機制。並行是指任務的重疊執行(章節 12.24.2),而平行是指任務的同時執行。特別是,平行任務在時間上重疊,但並行任務可能在時間上重疊,也可能不重疊。任務可以透過相互讓出控制權來並行執行。雖然並行是一種程式結構機制,但平行是一種讓您的程式執行得更快的機制。如果您對 OCaml 中的並行程式設計機制感興趣,請參閱關於效果處理器的章節 12.24 和關於 threads 函式庫的章節 34

1 網域

網域是 OCaml 中平行的單位。Domain 模組提供了建立和管理網域的原語。可以使用 spawn 函式產生新的網域。

Domain.spawn (fun _ -> print_endline "我平行執行了")
平行執行 - : unit Domain.t = <abstr>

spawn 函式與呼叫網域平行執行給定的計算。

網域是重量級實體。每個網域都以 1:1 的方式對應到一個作業系統執行緒。每個網域也有自己的執行階段狀態,其中包括用於分配記憶體的網域本機結構。因此,它們的建立和拆除成本相對較高。

建議程式產生的網域數量不要超過可用的核心數量.

在本教學中,我們將實作、執行和測量平行程式的效能。觀察到的結果取決於目標機器上可用的核心數量。本教學是在具有 4 個核心和 8 個硬體執行緒的 2.3 GHz 四核心 Intel Core i7 MacBook Pro 上撰寫的。對於網域之間幾乎沒有協調的平行程式,並且當機器未處於負載狀態時,預期在 4 個網域上可以達到大約 4 倍的效能。超過 4 個網域後,加速可能小於線性。我們還將使用命令列基準測試工具 hyperfine 來為我們的程式進行基準測試。

1.1 連接網域

我們將使用程式以遞迴方式計算第 n 個費波那契數作為執行範例。用於計算第 n 個費波那契數的循序程式如下。

(* fib.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 1 let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2) let main () = let r = fib n in Printf.printf "fib(%d) = %d\n%!" n r let _ = main ()

程式可以如下進行編譯和基準測試。

$ ocamlopt -o fib.exe fib.ml
$ ./fib.exe 42
fib(42) = 433494437
$ hyperfine './fib.exe 42' # Benchmarking
Benchmark 1: ./fib.exe 42
  Time (mean ± sd):     1.193 s ±  0.006 s    [User: 1.186 s, System: 0.003 s]
  Range (min … max):    1.181 s …  1.202 s    10 runs

我們看到計算第 42 個費波那契數大約需要 1.2 秒。

可以使用 join 函式連接產生的網域,以取得其結果。join 函式會等待目標網域終止。以下程式平行計算第 n 個費波那契數兩次。

(* fib_twice.ml *) let n = int_of_string Sys.argv.(1) let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2) let main () = let d1 = Domain.spawn (fun _ -> fib n) in let d2 = Domain.spawn (fun _ -> fib n) in let r1 = Domain.join d1 in Printf.printf "fib(%d) = %d\n%!" n r1; let r2 = Domain.join d2 in Printf.printf "fib(%d) = %d\n%!" n r2 let _ = main ()

該程式會產生兩個網域來計算第 n 個費波那契數。spawn 函式會傳回一個 Domain.t 值,該值可以連接以取得平行計算的結果。join 函式會封鎖,直到計算執行完成。

$ ocamlopt -o fib_twice.exe fib_twice.ml
$ ./fib_twice.exe 42
fib(42) = 433494437
fib(42) = 433494437
$ hyperfine './fib_twice.exe 42'
Benchmark 1: ./fib_twice.exe 42
  Time (mean ± sd):     1.249 s ±  0.025 s    [User: 2.451 s, System: 0.012 s]
  Range (min … max):    1.221 s …  1.290 s    10 runs

可以看出,由於平行化,計算第 n 個費波那契數兩次所花費的時間幾乎與計算一次所花費的時間相同。

2 Domainslib:用於巢狀平行程式設計的函式庫

讓我們嘗試平行化費波那契函式。兩個遞迴呼叫可以平行執行。但是,透過為每個遞迴呼叫產生網域來天真地平行化它們是行不通的,因為它會產生太多的網域。

(* fib_par1.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 1 let rec fib n = if n < 2 then 1 else begin let d1 = Domain.spawn (fun _ -> fib (n - 1)) in let d2 = Domain.spawn (fun _ -> fib (n - 2)) in Domain.join d1 + Domain.join d2 end let main () = let r = fib n in Printf.printf "fib(%d) = %d\n%!" n r let _ = main ()
fib(1) = 1 val n : int = 1 val fib : int -> int = <fun> val main : unit -> unit = <fun>
$ ocamlopt -o fib_par1.exe fib_par1.ml
$ ./fib_par1.exe 42
Fatal error: exception Failure("failed to allocate domain")

OCaml 同時可以啟用的網域數限制為 128 個。嘗試產生更多網域將引發例外。那麼我們該如何平行化費波那契函式?

2.1 使用 domainslib 平行化費波那契

OCaml 標準函式庫僅提供用於並行和平行程式設計的底層原語,將高階程式設計函式庫留給核心編譯器發行版之外進行開發和發行。Domainslib 是一個用於巢狀平行程式設計的函式庫,以遞迴費波那契計算中可用的平行性為例。讓我們使用 domainslib 來平行化遞迴費波那契程式。建議您使用 opam 套件管理員安裝 domainslib。本教學使用 domainslib 版本 0.5.0。

Domainslib 為產生平行任務和等待其結果提供了 async/await 機制。在此機制的基礎上,domainslib 提供了平行迭代器。核心方面,domainslib 具有工作竊取佇列的有效實作,以便有效地與其他網域共用任務。下面給出費波那契程式的平行實作。

(* fib_par2.ml *)
let num_domains = int_of_string Sys.argv.(1)
let n = int_of_string Sys.argv.(2)

let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

module T = Domainslib.Task

let rec fib_par pool n =
  if n > 20 then begin
    let a = T.async pool (fun _ -> fib_par pool (n-1)) in
    let b = T.async pool (fun _ -> fib_par pool (n-2)) in
    T.await pool a + T.await pool b
  end else fib n

let main () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let res = T.run pool (fun _ -> fib_par pool n) in
  T.teardown_pool pool;
  Printf.printf "fib(%d) = %d\n" n res

let _ = main ()

該程式將網域數量和費波那契函式的輸入分別作為第一個和第二個命令列引數。

讓我們從 main 函式開始。首先,我們設定一個網域池,巢狀平行任務將在該網域池上執行。呼叫 run 函式的網域也將參與執行提交到池的任務。我們在 run 函式中呼叫平行費波那契函式 fib_par。最後,我們拆除池並列印結果。

對於足夠大的輸入 (n > 20),fib_par 函式使用 async 函式在池中非同步產生左側和右側遞迴呼叫。async 函式會傳回結果的承諾。非同步計算的結果是透過使用 await 函式等待承諾來獲得的。await 函式呼叫會封鎖,直到承諾得到解析。

對於小型輸入,fib_par 函式只會呼叫循序費波那契函式 fib。對於小型問題規模,切換到循序模式非常重要。否則,平行化的成本將超過可用的工作。

為了簡單起見,我們使用 ocamlfind 來編譯此程式。建議使用者使用 dune 來建置使用透過 opam 安裝的函式庫的程式。

$ ocamlfind ocamlopt -package domainslib -linkpkg -o fib_par2.exe fib_par2.ml
$ ./fib_par2.exe 1 42
fib(42) = 433494437
$ hyperfine './fib.exe 42' './fib_par2.exe 2 42' \
            './fib_par2.exe 4 42' './fib_par2.exe 8 42'
Benchmark 1: ./fib.exe 42
  Time (mean ± sd):     1.217 s ±  0.018 s    [User: 1.203 s, System: 0.004 s]
  Range (min … max):    1.202 s …  1.261 s    10 runs

Benchmark 2: ./fib_par2.exe 2 42
  Time (mean ± sd):    628.2 ms ±   2.9 ms    [User: 1243.1 ms, System: 4.9 ms]
  Range (min … max):   625.7 ms … 634.5 ms    10 runs

Benchmark 3: ./fib_par2.exe 4 42
  Time (mean ± sd):    337.6 ms ±  23.4 ms    [User: 1321.8 ms, System: 8.4 ms]
  Range (min … max):   318.5 ms … 377.6 ms    10 runs

Benchmark 4: ./fib_par2.exe 8 42
  Time (mean ± sd):    250.0 ms ±   9.4 ms    [User: 1877.1 ms, System: 12.6 ms]
  Range (min … max):   242.5 ms … 277.3 ms    11 runs

Summary
  './fib_par2.exe 8 42' ran
    1.35 ± 0.11 times faster than './fib_par2.exe 4 42'
    2.51 ± 0.10 times faster than './fib_par2.exe 2 42'
    4.87 ± 0.20 times faster than './fib.exe 42'

結果顯示,在 8 個網域的情況下,平行費波那契程式的執行速度比循序版本快 4.87 倍。

2.2 平行迭代建構

許多數值演算法都使用 for 迴圈。平行 for 原語提供了一種平行化此類程式碼的直接方法。讓我們從電腦語言基準測試遊戲中取得 spectral-norm 基準測試並將其平行化。下面給出程式的循序版本。

(* spectralnorm.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 32 let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1) let eval_A_times_u u v = let n = Array.length v - 1 in for i = 0 to n do let vi = ref 0. in for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done; v.(i) <- !vi done let eval_At_times_u u v = let n = Array.length v - 1 in for i = 0 to n do let vi = ref 0. in for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done; v.(i) <- !vi done let eval_AtA_times_u u v = let w = Array.make (Array.length u) 0.0 in eval_A_times_u u w; eval_At_times_u w v let () = let u = Array.make n 1.0 and v = Array.make n 0.0 in for _i = 0 to 9 do eval_AtA_times_u u v; eval_AtA_times_u v u done; let vv = ref 0.0 and vBv = ref 0.0 in for i=0 to n-1 do vv := !vv +. v.(i) *. v.(i); vBv := !vBv +. u.(i) *. v.(i) done; Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))

請注意,程式在 eval_A_times_ueval_At_times_u 中有巢狀迴圈。外層迴圈主體的每次迭代都會從 u 讀取,但會寫入 v 中不相交的記憶體位置。因此,外層迴圈的迭代之間沒有相互依賴性,可以平行執行。

以下顯示光譜範數的平行版本。

(* spectralnorm_par.ml *)
let num_domains = try int_of_string Sys.argv.(1) with _ -> 1
let n = try int_of_string Sys.argv.(2) with _ -> 32

let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1)

module T = Domainslib.Task

let eval_A_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
    v.(i) <- !vi
  )

let eval_At_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done;
    v.(i) <- !vi
  )

let eval_AtA_times_u pool u v =
  let w = Array.make (Array.length u) 0.0 in
  eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let u = Array.make n 1.0  and  v = Array.make n 0.0 in
  T.run pool (fun _ ->
  for _i = 0 to 9 do
    eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
  done);

  let vv = ref 0.0  and  vBv = ref 0.0 in
  for i=0 to n-1 do
    vv := !vv +. v.(i) *. v.(i);
    vBv := !vBv +. u.(i) *. v.(i)
  done;
  T.teardown_pool pool;
  Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))

請注意,parallel_for 函式與循序版本中的 for 迴圈同構。除了設定和拆解池的樣板程式碼之外,不需要其他變更。

$ ocamlopt -o spectralnorm.exe spectralnorm.ml
$ ocamlfind ocamlopt -package domainslib -linkpkg -o spectralnorm_par.exe \
  spectralnorm_par.ml
$ hyperfine './spectralnorm.exe 4096' './spectralnorm_par.exe 2 4096' \
            './spectralnorm_par.exe 4 4096' './spectralnorm_par.exe 8 4096'
Benchmark 1: ./spectralnorm.exe 4096
  Time (mean ± sd):     1.989 s ±  0.013 s    [User: 1.972 s, System: 0.007 s]
  Range (min … max):    1.975 s …  2.018 s    10 runs

Benchmark 2: ./spectralnorm_par.exe 2 4096
  Time (mean ± sd):     1.083 s ±  0.015 s    [User: 2.140 s, System: 0.009 s]
  Range (min … max):    1.064 s …  1.102 s    10 runs

Benchmark 3: ./spectralnorm_par.exe 4 4096
  Time (mean ± sd):    698.7 ms ±  10.3 ms    [User: 2730.8 ms, System: 18.3 ms]
  Range (min … max):   680.9 ms … 721.7 ms    10 runs

Benchmark 4: ./spectralnorm_par.exe 8 4096
  Time (mean ± sd):    921.8 ms ±  52.1 ms    [User: 6711.6 ms, System: 51.0 ms]
  Range (min … max):   838.6 ms … 989.2 ms    10 runs

Summary
  './spectralnorm_par.exe 4 4096' ran
    1.32 ± 0.08 times faster than './spectralnorm_par.exe 8 4096'
    1.55 ± 0.03 times faster than './spectralnorm_par.exe 2 4096'
    2.85 ± 0.05 times faster than './spectralnorm.exe 4096'

在作者的機器上,該程式在最多 4 個網域時可以很好地擴展,但在 8 個網域時效能較差。回想一下,該機器只有 4 個實體核心。偵錯和修正此效能問題不在本教學課程的範圍內。

3 平行垃圾回收

平行 OCaml 程式可擴展性的重要面向是垃圾回收器 (GC) 的可擴展性。OCaml GC 的設計兼具低延遲和良好的平行可擴展性。OCaml 具有世代垃圾回收器,具有小型次要堆積和大型主要堆積。新物件(直到一定大小)會配置在次要堆積中。每個網域都有其自己的網域本地次要堆積區,新物件會配置到其中,而不會與其他網域同步。當網域耗盡其次要堆積區時,它會呼叫停止世界 (stop-the-world) 的次要堆積收集。在停止世界區段中,所有網域會平行收集其次要堆積區,將倖存者移至主要堆積。

對於主要堆積,每個網域會維護網域本地、大小分段的記憶體池,大型物件和從次要收集倖存下來的物件會配置到其中。擁有網域本地池可避免大多數主要堆積配置的同步。主要堆積會由並行標記清除演算法收集,該演算法在每個主要週期中會涉及幾個短暫的停止世界暫停。

整體而言,使用者應預期垃圾回收器會隨著網域數量的增加而良好擴展,而延遲時間會保持在低檔。如需垃圾回收器設計和評估的詳細資訊,請參閱有關 將平行機制改裝到 OCaml的 ICFP 2020 論文。

4 記憶體模型:簡單的部分

現代處理器和編譯器會積極最佳化程式。這些最佳化會在不影響循序程式的情況下加速,但會導致在平行程式中可見令人驚訝的行為。為了從這些最佳化中獲益,OCaml 採用了寬鬆記憶體模型,該模型精確指定了程式可能會觀察到哪些寬鬆行為。雖然這些模型難以直接編寫程式,但 OCaml 記憶體模型提供了保留循序推理簡單性的方法。

首先,不可變的值可以在多個網域之間自由共用,並且可以平行存取。對於可變資料結構,例如參考儲存格、陣列和可變記錄欄位,程式設計師應避免資料競爭。參考儲存格、陣列和可變記錄欄位被稱為非原子資料結構。當兩個網域同時存取非原子記憶體位置而沒有同步,並且至少一個存取是寫入時,就會發生資料競爭。OCaml 提供了許多引入同步的方法,包括原子變數(第 9.7 節)和互斥鎖(第 9.5 節)。

重要的是,對於無資料競爭 (DRF) 的程式,OCaml 提供循序一致 (SC) 語意 – 此類程式的觀察行為可以透過不同網域中操作的交錯來解釋。此屬性稱為 DRF-SC 保證。此外,在 OCaml 中,DRF-SC 保證是模組化的 – 如果程式的一部分沒有資料競爭,則 OCaml 記憶體模型會確保這些部分具有循序一致性,即使程式的其他部分有資料競爭。即使對於有資料競爭的程式,OCaml 也提供了強大的保證。雖然使用者可能會觀察到非循序一致的行為,但不會發生當機。

如需資料競爭存在時寬鬆行為的更多詳細資訊,請參閱有關記憶體模型困難部分的章節(第 10 章)。

5 封鎖同步

網域可以使用 MutexConditionSemaphore 模組執行封鎖同步。這些模組與用來同步執行緒程式庫建立的執行緒的模組相同(第 34 章)。為了清楚起見,在本章的其餘部分,我們將執行緒程式庫建立的執行緒稱為系統執行緒。下列程式使用互斥鎖和條件變數實作並行堆疊。

module Blocking_stack : sig type 'a t val make : unit -> 'a t val push : 'a t -> 'a -> unit val pop : 'a t -> 'a end = struct type 'a t = { mutable contents: 'a list; mutex : Mutex.t; nonempty : Condition.t } let make () = { contents = []; mutex = Mutex.create (); nonempty = Condition.create () } let push r v = Mutex.lock r.mutex; r.contents <- v::r.contents; Condition.signal r.nonempty; Mutex.unlock r.mutex let pop r = Mutex.lock r.mutex; let rec loop () = match r.contents with | [] -> Condition.wait r.nonempty r.mutex; loop () | x::xs -> r.contents <- xs; x in let res = loop () in Mutex.unlock r.mutex; res end

並行堆疊是使用具有三個欄位的記錄實作的:一個可變欄位 contents,用於儲存堆疊中的元素、一個 mutex,用於控制對 contents 欄位的存取,以及一個條件變數 nonempty,用於向等待堆疊變為非空的已封鎖網域發出訊號。

push 操作會鎖定互斥鎖,並使用一個新的清單更新 contents 欄位,該清單的頭部是要推送的元素,而尾部是舊清單。當鎖定保持時,會向條件變數 nonempty 發出訊號,以便喚醒等待此條件的任何網域。如果有等待的網域,則會喚醒其中一個網域。如果沒有,則 signal 操作不會產生任何影響。

pop 操作會鎖定互斥鎖並檢查堆疊是否為空。如果是,則呼叫網域會使用 wait 基本類型,在條件變數 nonempty 上等待。wait 呼叫會自動暫停目前網域的執行並解除鎖定 mutex。當此網域再次喚醒時(當 wait 呼叫傳回時),它會持有 mutex 上的鎖定。網域會嘗試再次讀取堆疊的內容。如果 pop 操作發現堆疊不為空,則會將 contents 更新為舊清單的尾部,並傳回頭部。

使用 mutex 來控制對共用資源 contents 的存取,會在多個網域使用堆疊時引入足夠的同步。因此,當多個網域平行使用堆疊時,不會發生資料競爭。

5.1 與系統執行緒互動

系統執行緒如何與網域互動?在特定網域上建立的系統執行緒會保持固定到該網域。一次只允許一個系統執行緒在特定網域上執行 OCaml 程式碼。但是,屬於特定網域的系統執行緒可以平行執行 C 程式庫或系統程式碼。屬於不同網域的系統執行緒可以平行執行。

使用系統執行緒時,為執行提供給 Domain.spawn 的計算而建立的執行緒也會被視為系統執行緒。例如,下列程式總共會建立兩個網域(包括初始網域),每個網域各有兩個系統執行緒(包括每個網域的初始系統執行緒)。

(* dom_thr.ml *)
let m = Mutex.create ()
let r = ref None (* protected by m *)

let task () =
  let my_thr_id = Thread.(id (self ())) in
  let my_dom_id :> int = Domain.self () in
  Mutex.lock m;
  begin match !r with
  | None ->
      Printf.printf "Thread %d running on domain %d saw initial write\n%!"
        my_thr_id my_dom_id
  | Some their_thr_id ->
      Printf.printf "Thread %d running on domain %d saw the write by thread %d\n%!"
        my_thr_id my_dom_id their_thr_id;
  end;
  r := Some my_thr_id;
  Mutex.unlock m

let task' () =
  let t = Thread.create task () in
  task ();
  Thread.join t

let main () =
  let d = Domain.spawn task' in
  task' ();
  Domain.join d

let _ = main ()
$ ocamlopt -I +threads unix.cmxa threads.cmxa -o dom_thr.exe dom_thr.ml
$ ./dom_thr.exe
Thread 1 running on domain 1 saw initial write
Thread 0 running on domain 0 saw the write by thread 1
Thread 2 running on domain 1 saw the write by thread 0
Thread 3 running on domain 0 saw the write by thread 2

此程式使用互斥鎖保護的共用參考儲存格,以便在兩個不同網域上執行的不同系統執行緒之間進行通訊。系統執行緒識別碼會在程式中唯一識別系統執行緒。初始網域會取得網域 ID 和執行緒 ID 作為 0。新產生的網域會取得網域 ID 作為 1。

6 與 C 繫結互動

在多個域並行執行的過程中,即使 C 程式碼在執行時沒有釋放「域鎖」,在一個域中執行的 C 程式碼也可能與在其他域中執行的任何 C 程式碼並行執行。在 OCaml 5.0 之前,C 綁定可能會假設如果 OCaml 執行時鎖未釋放,則操作全域 C 狀態(例如,初始化函數本地靜態值)是安全的。在多個域並行執行的情況下,這不再成立。

7 原子操作

互斥鎖、條件變數和信號量用於實現域之間的阻塞同步。對於非阻塞同步,OCaml 提供了 Atomic 變數。顧名思義,非阻塞同步不提供暫停和喚醒域的機制。另一方面,非阻塞同步中使用的基本操作通常會編譯為硬體提供的原子讀-修改-寫操作。例如,以下程式碼並行遞增一個非原子計數器和一個原子計數器。

(* incr.ml *) let twice_in_parallel f = let d1 = Domain.spawn f in let d2 = Domain.spawn f in Domain.join d1; Domain.join d2 let plain_ref n = let r = ref 0 in let f () = for _i=1 to n do incr r done in twice_in_parallel f; Printf.printf "Non-atomic ref count: %d\n" !r let atomic_ref n = let r = Atomic.make 0 in let f () = for _i=1 to n do Atomic.incr r done in twice_in_parallel f; Printf.printf "Atomic ref count: %d\n" (Atomic.get r) let main () = let n = try int_of_string Sys.argv.(1) with _ -> 1 in plain_ref n; atomic_ref n let _ = main ()
$ ocamlopt -o incr.exe incr.ml
$ ./incr.exe 1_000_000
Non-atomic ref count: 1187193
Atomic ref count: 2000000

請注意,使用非原子計數器得到的結果低於人們天真地預期的結果。這是因為非原子 incr 函數等效於

let incr r = let curr = !r in r := curr + 1

請注意,讀取和寫入是兩個獨立的操作,並且整個遞增操作不是以原子方式執行的。當兩個域並行執行此程式碼時,它們都可能讀取計數器的相同值 curr 並將其更新為 curr + 1。因此,效果將只是一個遞增,而不是兩個遞增。另一方面,原子計數器借助於硬體對原子性的支持,以原子方式執行讀取和寫入。原子計數器會返回預期的結果。

原子變數可用於域之間的底層同步。以下範例使用原子變數在兩個域之間交換訊息。

let r = Atomic.make None let sender () = Atomic.set r (Some "Hello") let rec receiver () = match Atomic.get r with | None -> Domain.cpu_relax (); receiver () | Some m -> print_endline m let main () = let s = Domain.spawn sender in let d = Domain.spawn receiver in Domain.join s; Domain.join d let _ = main ()
Hello val r : string option Atomic.t = <abstr> val sender : unit -> unit = <fun> val receiver : unit -> unit = <fun> val main : unit -> unit = <fun>

雖然傳送者和接收者競爭訪問 r,但這不是資料競爭,因為 r 是原子參考。

7.1 無鎖堆疊

Atomic 模組用於實作非阻塞、無鎖資料結構。以下程式碼實作無鎖堆疊。

module Lockfree_stack : sig type 'a t val make : unit -> 'a t val push : 'a t -> 'a -> unit val pop : 'a t -> 'a option end = struct type 'a t = 'a list Atomic.t let make () = Atomic.make [] let rec push r v = let s = Atomic.get r in if Atomic.compare_and_set r s (v::s) then () else (Domain.cpu_relax (); push r v) let rec pop r = let s = Atomic.get r in match s with | [] -> None | x::xs -> if Atomic.compare_and_set r s xs then Some x else (Domain.cpu_relax (); pop r) end

原子堆疊由保存列表的原子參考表示。pushpop 操作使用 compare_and_set 基本操作來嘗試以原子方式更新原子參考。 運算式 compare_and_set r seen v 僅在其目前值與 seen 物理相等時,才會將 r 的值設定為 v。重要的是,比較和更新以原子方式發生。如果比較成功(並且發生更新),則運算式的值為 true,否則為 false

如果 compare_and_set 失敗,則表示還有其他域也同時嘗試更新原子參考。在這種情況下,pushpop 操作會呼叫 Domain.cpu_relax 以短暫退讓,允許競爭的域在重試失敗的操作之前取得進展。這種無鎖堆疊實作也稱為 Treiber 堆疊。