模組 Stdlib.Atomic

module Atomic: Atomic

type !'a t 

一個指向 'a 型別數值的原子性(可變)參考。

val make : 'a -> 'a t

建立一個原子性參考。

val make_contended : 'a -> 'a t

建立一個在快取行上獨立存在的原子性參考。它佔用的記憶體空間是使用 make v 分配的 4-16 倍。

主要目的是防止偽共享以及由此導致的效能下降。當 CPU 執行原子操作時,它會暫時取得包含原子性參考的整個快取行的所有權。如果多個原子性參考共享同一個快取行,則同時修改這些不相連的記憶體區域將變得不可能,這可能會造成瓶頸。因此,一般來說,如果原子性參考正在經歷競爭,將其分配到自己的快取行可能會提高效能。

val get : 'a t -> 'a

取得原子性參考的目前值。

val set : 'a t -> 'a -> unit

設定原子性參考的新值。

val exchange : 'a t -> 'a -> 'a

設定原子性參考的新值,並傳回目前值。

val compare_and_set : 'a t -> 'a -> 'a -> bool

compare_and_set r seen v 只有在其目前值與 seen 進行物理上的相等比較時,才會將 r 的新值設定為 v -- 這個比較和設定是原子性地發生的。如果比較成功(因此發生了設定),則傳回 true,否則傳回 false

val fetch_and_add : int t -> int -> int

fetch_and_add r n 以原子方式將 r 的值增加 n,並傳回目前值(增加之前的值)。

val incr : int t -> unit

incr r 以原子方式將 r 的值增加 1

val decr : int t -> unit

decr r 以原子方式將 r 的值減少 1

範例

基礎執行緒協調

一個基本的使用案例是擁有以執行緒安全方式更新的全域計數器,例如,用來追蹤程式執行的 IO 操作的某些指標。另一個基本的使用案例是在給定程式中協調執行緒的終止,例如,當一個執行緒找到答案,或當使用者關閉程式時。

在這裡,舉例來說,我們將嘗試尋找一個其雜湊滿足基本屬性的數字。為此,我們將執行多個執行緒,這些執行緒將嘗試隨機數字,直到找到一個可用的數字。

當然,下面的輸出是一個範例執行,每次執行程式時都會改變。

    (* use for termination *)
    let stop_all_threads = Atomic.make false

    (* total number of individual attempts to find a number *)
    let num_attempts = Atomic.make 0

    (* find a number that satisfies [p], by... trying random numbers
       until one fits. *)
    let find_number_where (p:int -> bool) =
      let rand = Random.State.make_self_init() in
      while not (Atomic.get stop_all_threads) do

        let n = Random.State.full_int rand max_int in
        ignore (Atomic.fetch_and_add num_attempts 1 : int);

        if p (Hashtbl.hash n) then (
          Printf.printf "found %d (hash=%d)\n%!" n (Hashtbl.hash n);
          Atomic.set stop_all_threads true; (* signal all threads to stop *)
        )
      done;;


    (* run multiple domains to search for a [n] where [hash n <= 100] *)
    let () =
      let criterion n = n <= 100 in
      let threads =
        Array.init 8
          (fun _ -> Domain.spawn (fun () -> find_number_where criterion))
      in
      Array.iter Domain.join threads;
      Printf.printf "total number of attempts: %d\n%!"
        (Atomic.get num_attempts) ;;

    - : unit = ()
    found 1651745641680046833 (hash=33)
    total number of attempts: 30230350
    

Treiber 堆疊

另一個範例是基本的 Treiber 堆疊(一個執行緒安全的堆疊),可以在執行緒之間安全地共享。

請注意 pushpop 都是遞迴的,因為它們會嘗試將新堆疊(多一個或少一個元素)與舊堆疊交換。這是樂觀並行:例如,push stack x 的每次迭代都會取得舊堆疊 l,並希望在它嘗試將 l 替換為 x::l 時,沒有其他人有時間修改該列表。如果 compare_and_set 失敗,則表示我們太過樂觀,必須再次嘗試。

    type 'a stack = 'a list Atomic.t

    let rec push (stack: _ stack) elt : unit =
      let cur = Atomic.get stack in
      let success = Atomic.compare_and_set stack cur (elt :: cur) in
      if not success then
        push stack elt

    let rec pop (stack: _ stack) : _ option =
      let cur = Atomic.get stack in
      match cur with
      | [] -> None
      | x :: tail ->
        let success = Atomic.compare_and_set stack cur tail in
        if success then Some x
        else pop stack

    # let st = Atomic.make []
    # push st 1
    - : unit = ()
    # push st 2
    - : unit = ()
    # pop st
    - : int option = Some 2
    # pop st
    - : int option = Some 1
    # pop st
    - : int option = None