模組 Atomic

module Atomic: sig .. end

原子參考。

請參閱下方範例。請參閱手冊中的「記憶體模型:困難的部分」章節。


type !'a t 

類型為 'a 的值的原子(可變)參考。

val make : 'a -> 'a t

建立原子參考。

val make_contended : 'a -> 'a t

建立一個在快取行上單獨存在的原子參考。它佔用的記憶體是使用 make v 分配的 4-16 倍。

主要目的是防止偽共享和由此造成的效能下降。當 CPU 執行原子操作時,它會暫時取得包含原子參考的整個快取行的所有權。如果多個原子參考共享同一個快取行,則同時修改這些不相鄰的記憶體區域會變得不可能,這可能會造成瓶頸。因此,一般來說,如果原子參考正在經歷競爭,則將其分配到自己的快取行可能會提高效能。

val get : 'a t -> 'a

取得原子參考的目前值。

val set : 'a t -> 'a -> unit

為原子參考設定一個新值。

val exchange : 'a t -> 'a -> 'a

為原子參考設定一個新值,並傳回目前值。

val compare_and_set : 'a t -> 'a -> 'a -> bool

compare_and_set r seen v 僅在其目前值與 seen 物理上相等時才將 r 的新值設定為 v -- 比較和設定是以原子方式發生。如果比較成功(因此發生設定),則傳回 true,否則傳回 false

val fetch_and_add : int t -> int -> int

fetch_and_add r n 以原子方式將 r 的值增加 n,並傳回目前值(增加之前)。

val incr : int t -> unit

incr r 以原子方式將 r 的值增加 1

val decr : int t -> unit

decr r 以原子方式將 r 的值減少 1

範例

基本的執行緒協調

一個基本的使用案例是擁有以執行緒安全方式更新的全域計數器,例如為了追蹤程式執行的 IO 相關指標。另一個基本的使用案例是協調特定程式中執行緒的終止,例如當一個執行緒找到答案時,或當使用者關閉程式時。

舉例來說,在這裡,我們將嘗試尋找一個雜湊滿足基本屬性的數字。為此,我們將執行多個執行緒,這些執行緒將嘗試隨機數字,直到找到一個可行的數字。

當然,下面的輸出是一個範例執行,每次程式執行時都會變更。

    (* use for termination *)
    let stop_all_threads = Atomic.make false

    (* total number of individual attempts to find a number *)
    let num_attempts = Atomic.make 0

    (* find a number that satisfies [p], by... trying random numbers
       until one fits. *)
    let find_number_where (p:int -> bool) =
      let rand = Random.State.make_self_init() in
      while not (Atomic.get stop_all_threads) do

        let n = Random.State.full_int rand max_int in
        ignore (Atomic.fetch_and_add num_attempts 1 : int);

        if p (Hashtbl.hash n) then (
          Printf.printf "found %d (hash=%d)\n%!" n (Hashtbl.hash n);
          Atomic.set stop_all_threads true; (* signal all threads to stop *)
        )
      done;;


    (* run multiple domains to search for a [n] where [hash n <= 100] *)
    let () =
      let criterion n = n <= 100 in
      let threads =
        Array.init 8
          (fun _ -> Domain.spawn (fun () -> find_number_where criterion))
      in
      Array.iter Domain.join threads;
      Printf.printf "total number of attempts: %d\n%!"
        (Atomic.get num_attempts) ;;

    - : unit = ()
    found 1651745641680046833 (hash=33)
    total number of attempts: 30230350
    

Treiber 堆疊

另一個範例是一個基本的 Treiber 堆疊(一個執行緒安全的堆疊),可以在執行緒之間安全地共享。

請注意,pushpop 都是遞迴的,因為它們嘗試將新的堆疊(多一個或少一個元素)與舊的堆疊交換。這是一種樂觀並行:例如,push stack x 的每次迭代都會取得舊的堆疊 l,並希望在它嘗試用 x::l 取代 l 時,沒有其他人有時間修改該列表。如果 compare_and_set 失敗,則表示我們太樂觀了,必須再次嘗試。

    type 'a stack = 'a list Atomic.t

    let rec push (stack: _ stack) elt : unit =
      let cur = Atomic.get stack in
      let success = Atomic.compare_and_set stack cur (elt :: cur) in
      if not success then
        push stack elt

    let rec pop (stack: _ stack) : _ option =
      let cur = Atomic.get stack in
      match cur with
      | [] -> None
      | x :: tail ->
        let success = Atomic.compare_and_set stack cur tail in
        if success then Some x
        else pop stack

    # let st = Atomic.make []
    # push st 1
    - : unit = ()
    # push st 2
    - : unit = ()
    # pop st
    - : int option = Some 2
    # pop st
    - : int option = Some 1
    # pop st
    - : int option = None