Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pop_opt, peek, peek_opt functions for queue #92

Merged
merged 2 commits into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let run () =
start_time)
in
for _ = 1 to item_count do
while Option.is_none (Spsc_queue.pop queue) do
while Option.is_none (Spsc_queue.pop_opt queue) do
()
done
done;
Expand Down
43 changes: 24 additions & 19 deletions src_lockfree/michael_scott_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ let create () =

let is_empty { head; _ } = Atomic.get (Atomic.get head) == Nil

let pop { head; _ } =
exception Empty

let pop_opt { head; _ } =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
Expand All @@ -45,6 +47,27 @@ let pop { head; _ } =
in
loop ()

let pop { head; _ } =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> raise Empty
| Next (value, next) when Atomic.compare_and_set head old_head next -> value
| _ ->
Backoff.once b;
loop ()
in
loop ()
Comment on lines +51 to +61
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same function as pop_opt except for very small changes. Should I use [@inline] and a generic pop function ?


let peek_opt { head; _ } =
let old_head = Atomic.get head in
match Atomic.get old_head with Nil -> None | Next (value, _) -> Some value

let peek { head; _ } =
let old_head = Atomic.get head in
match Atomic.get old_head with Nil -> raise Empty | Next (value, _) -> value

let rec fix_tail tail new_tail =
let old_tail = Atomic.get tail in
if
Expand All @@ -66,24 +89,6 @@ let push { tail; _ } value =
if not (Atomic.compare_and_set tail old_tail new_tail) then
fix_tail tail new_tail

let clean_until { head; _ } f =
let b = Backoff.create () in
let rec loop () =
let old_head = Atomic.get head in
match Atomic.get old_head with
| Nil -> ()
| Next (value, next) ->
if not (f value) then
if Atomic.compare_and_set head old_head next then (
Backoff.reset b;
loop ())
else (
Backoff.once b;
loop ())
else ()
in
loop ()

type 'a cursor = 'a node

let snapshot { head; _ } = Atomic.get (Atomic.get head)
Expand Down
24 changes: 18 additions & 6 deletions src_lockfree/michael_scott_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,26 @@ val is_empty : 'a t -> bool
val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. *)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q], or
exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q].

@raise Empty if [q] is empty. *)

val pop_opt : 'a t -> 'a option
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function name is not edited in the line below.

(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)

val clean_until : 'a t -> ('a -> bool) -> unit
(** [clean_until q f] drops the prefix of the queue until the element [e],
where [f e] is [true]. If no such element exists, then the queue is
emptied. *)
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q].

@raise Empty if [q] is empty. *)

val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q], or
returns [None] if the queue is empty. *)

type 'a cursor
(** The type of cursor. *)
Expand Down
29 changes: 28 additions & 1 deletion src_lockfree/mpsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ let rec close (t : 'a t) =
(* Retry *)
close t)

let pop t =
let pop_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Expand All @@ -115,6 +115,33 @@ let pop t =
(* So it can be GC'd *)
Some v)

exception Empty

let pop t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node
~none:(fun () -> raise Empty)
~some:(fun node ->
t.head <- node;
let v = node.value in
node.value <- Obj.magic ();
(* So it can be GC'd *)
v)

let peek_opt t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> None) ~some:(fun node -> Some node.value)

let peek t =
let p = t.head in
(* [p] is the previously-popped item. *)
let node = Atomic.get p.next in
Node.fold node ~none:(fun () -> raise Empty) ~some:(fun node -> node.value)

let is_empty t =
Node.fold (Atomic.get t.head.next)
~none:(fun () -> true)
Expand Down
51 changes: 38 additions & 13 deletions src_lockfree/mpsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,58 @@ exception Closed
val create : unit -> 'a t
(** [create ()] returns a new empty queue. *)

val is_empty : 'a t -> bool
(** [is_empty q] is [true] if calling [pop] would return [None].

@raise Closed if [q] is closed and empty. *)

val close : 'a t -> unit
(** [close q] marks [q] as closed, preventing any further items from
being pushed by the producers (i.e. with {!push}).

@raise Closed if [q] has already been closed. *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. This
(** [push q v] adds the element [v] at the end of the queue [q]. This
can be used safely by multiple producer domains, in parallel with
the other operations.

@raise Closed if [q] is closed. *)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q] or
(** {2 Consumer-only functions} *)

exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q].

@raise Empty if [q] is empty.

@raise Closed if [q] is closed and empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q] or
returns [None] if the queue is empty.

@raise Closed if [q] is closed and empty. *)

val push_head : 'a t -> 'a -> unit
(** [push_head q v] adds the element [v] at the head of the queue
[q]. This can only be used by the consumer (if run in parallel
with {!pop}, the item might be skipped).
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q].

@raise Empty if [q] is empty.

@raise Closed if [q] is closed and empty. *)

val is_empty : 'a t -> bool
(** [is_empty q] is [true] if calling [pop] would return [None].
val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q] or
returns [None] if the queue is empty.

@raise Closed if [q] is closed and empty. *)

val close : 'a t -> unit
(** [close q] marks [q] as closed, preventing any further items from
being pushed by the producers (i.e. with {!push}).
val push_head : 'a t -> 'a -> unit
(** [push_head q v] adds the element [v] at the head of the queue
[q]. This can only be used by the consumer (if run in parallel
with {!pop}, the item might be skipped).

@raise Closed if [q] has already been closed. *)
@raise Closed if [q] is closed and empty. *)
41 changes: 41 additions & 0 deletions src_lockfree/spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,31 @@ let push { array; head; tail; mask; _ } element =
Array.set array (tail_val land mask) (Some element);
Atomic.set tail (tail_val + 1))

let try_push { array; head; tail; mask; _ } element =
let size = mask + 1 in
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val + size == tail_val then false
else (
Array.set array (tail_val land mask) (Some element);
Atomic.set tail (tail_val + 1);
true)

exception Empty

let pop { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then raise Empty
else
let index = head_val land mask in
let v = Array.get array index in
(* allow gc to collect it *)
Array.set array index None;
Atomic.set head (head_val + 1);
match v with None -> assert false | Some v -> v

let pop_opt { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then None
Expand All @@ -61,4 +85,21 @@ let pop { array; head; tail; mask; _ } =
assert (Option.is_some v);
v

let peek_opt { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then None
else
let v = Array.get array @@ (head_val land mask) in
assert (Option.is_some v);
v

let peek { array; head; tail; mask; _ } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
if head_val == tail_val then raise Empty
else
let v = Array.get array @@ (head_val land mask) in
match v with None -> assert false | Some v -> v

let size { head; tail; _ } = Atomic.get tail - Atomic.get head
53 changes: 43 additions & 10 deletions src_lockfree/spsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,59 @@ type 'a t
(** Type of single-producer single-consumer non-resizable domain-safe
queue that works in FIFO order. *)

exception Full
(** Raised when {!push} is applied to a full queue. *)

val create : size_exponent:int -> 'a t
(** [create ~size_exponent:int] returns a new queue of maximum size
[2^size_exponent] and initially empty. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when called
from either consumer or producer domain. Otherwise, it is safe to call but
provides only an *indication* of the size of the structure. *)

(** {1 Producer functions} *)

exception Full
(** Raised when {!push} is applied to a full queue. *)

val push : 'a t -> 'a -> unit
(** [push q v] adds the element [v] at the end of the queue [q]. This
method can be used by at most one domain at the time.

@raise [Full] if the queue is full.
@raise Full if the queue is full.
*)

val try_push : 'a t -> 'a -> bool
(** [try_push q v] tries to add the element [v] at the end of the
queue [q]. It fails it the queue [q] is full. This method can be
used by at most one domain at the time.
*)

(** {2 Consumer functions} *)

exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue [q]. This
method can be used by at most one domain at the time.

@raise Empty if [q] is empty.
*)

val pop : 'a t -> 'a option
(** [pop q] removes and returns the first element in queue [q], or
val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. This method can be used by
at most one domain at the time. *)

val size : 'a t -> int
(** [size] returns the size of the queue. This method linearizes only when called
from either consumer or producer domain. Otherwise, it is safe to call but
provides only an *indication* of the size of the structure. *)
val peek : 'a t -> 'a
(** [peek q] returns the first element in queue [q]. This method can be
used by at most one domain at the time.

@raise Empty if [q] is empty.
*)

val peek_opt : 'a t -> 'a option
(** [peek_opt q] returns the first element in queue [q], or [None]
if the queue is empty. This method can be used by at most one
domain at the time.
*)
24 changes: 24 additions & 0 deletions src_lockfree/treiber_stack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,31 @@ let push q v =
in
loop b

exception Empty

let pop q =
let rec loop b =
let s = Atomic.get q.head in
match s with
| Nil -> raise Empty
| Next (v, next) ->
if Atomic.compare_and_set q.head s next then v
else (
Backoff.once b;
loop b)
in

let s = Atomic.get q.head in
match s with
| Nil -> raise Empty
| Next (v, next) ->
if Atomic.compare_and_set q.head s next then v
else
let b = Backoff.create () in
Backoff.once b;
loop b

let pop_opt q =
let rec loop b =
let s = Atomic.get q.head in
match s with
Expand Down
17 changes: 14 additions & 3 deletions src_lockfree/treiber_stack.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ val is_empty : 'a t -> bool
val push : 'a t -> 'a -> unit
(** [push s v] adds the element [v] at the top of stack [s]. *)

val pop : 'a t -> 'a option
(** [pop a] removes and returns the topmost element in the
stack [s], or returns [None] if the stack is empty. *)
exception Empty
(** Raised when {!pop} or {!peek} is applied to an empty queue. *)

val pop : 'a t -> 'a
(** [pop s] removes and returns the topmost element in the
stack [s].

@raise Empty if [a] is empty.
*)

val pop_opt : 'a t -> 'a option
(** [pop_opt s] removes and returns the topmost element in the
stack [s], or returns [None] if the stack is empty.
*)
Loading