Skip to content

Commit 0c6015f

Browse files
committed
Add optional capacity argument to Queue.create and Stack.create
1 parent 1747c24 commit 0c6015f

12 files changed

+363
-87
lines changed

src/kcas_data/elems.ml

+10
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,13 @@ let rev_prepend_to_seq t tl =
3838
| Right t' -> t'
3939
in
4040
prepend_to_seq t tl ()
41+
42+
let rec of_list_rev tl length = function
43+
| [] -> tl
44+
| x :: xs ->
45+
let length = length + 1 in
46+
of_list_rev { value = x; tl; length } length xs
47+
48+
let[@inline] of_list_rev = function
49+
| [] -> empty
50+
| x :: xs -> of_list_rev (singleton x) 1 xs

src/kcas_data/elems.mli

+1
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ val prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
1818
val to_seq : 'a t -> 'a Seq.t
1919
val of_seq_rev : 'a Seq.t -> 'a t
2020
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
21+
val of_list_rev : 'a list -> 'a t

src/kcas_data/list_with_capacity.ml

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
open Kcas
2+
3+
type 'a t = { capacity : int; length : int; list : 'a list; limit : int }
4+
5+
let empty_unlimited =
6+
{ capacity = Int.max_int; length = 0; list = []; limit = Int.max_int }
7+
8+
let[@inline] make_empty ~capacity =
9+
if capacity = Int.max_int then empty_unlimited
10+
else { capacity; length = 0; list = []; limit = capacity }
11+
12+
let[@inline] make ~capacity ~length ~list ~limit =
13+
{ capacity; length; list; limit }
14+
15+
let[@inline] to_rev_elems t = Elems.of_list_rev t.list
16+
let[@inline] is_empty t = t.length = 0
17+
let[@inline] length t = t.length
18+
let[@inline] capacity t = t.capacity
19+
let[@inline] limit t = t.limit
20+
let[@inline] list t = t.list
21+
22+
let[@inline] tl_safe = function
23+
| { list = []; _ } as t -> t
24+
| { capacity; length; list = _ :: list; _ } as t ->
25+
let limit = if capacity = Int.max_int then capacity else t.limit in
26+
{ capacity; length = length - 1; list; limit }
27+
28+
let[@inline] tl_or_retry = function
29+
| { list = []; _ } -> Retry.later ()
30+
| { capacity; length; list = _ :: list; _ } as t ->
31+
let limit = if capacity = Int.max_int then capacity else t.limit in
32+
{ capacity; length = length - 1; list; limit }
33+
34+
let[@inline] hd_opt t = match t.list with [] -> None | x :: _ -> Some x
35+
36+
let[@inline] hd_or_retry t =
37+
match t.list with [] -> Retry.later () | x :: _ -> x
38+
39+
let[@inline] hd_unsafe t = List.hd t.list
40+
41+
let[@inline] cons_safe x ({ capacity; _ } as t) =
42+
if capacity = Int.max_int then
43+
let { length; list; _ } = t in
44+
{ capacity; length = length + 1; list = x :: list; limit = capacity }
45+
else
46+
let { length; limit; _ } = t in
47+
if length < limit then
48+
let { list; _ } = t in
49+
{ capacity; length = length + 1; list = x :: list; limit }
50+
else t
51+
52+
let[@inline] cons_or_retry x ({ capacity; _ } as t) =
53+
if capacity = Int.max_int then
54+
let { length; list; _ } = t in
55+
{ capacity; length = length + 1; list = x :: list; limit = capacity }
56+
else
57+
let { length; limit; _ } = t in
58+
if length < limit then
59+
let { list; _ } = t in
60+
{ capacity; length = length + 1; list = x :: list; limit }
61+
else Retry.later ()
62+
63+
let[@inline] move ({ capacity; _ } as t) =
64+
if capacity = Int.max_int then empty_unlimited
65+
else
66+
let { length; _ } = t in
67+
if length = 0 then t
68+
else
69+
let { limit; _ } = t in
70+
{ capacity; length = 0; list = []; limit = limit - length }
71+
72+
let move_last ({ capacity; _ } as t) =
73+
if capacity = Int.max_int then empty_unlimited
74+
else
75+
let { length; _ } = t in
76+
let limit = capacity - length in
77+
if length = 0 && t.limit = limit then t
78+
else { capacity; length = 0; list = []; limit }
79+
80+
let[@inline] clear ({ capacity; _ } as t) =
81+
if capacity = Int.max_int then empty_unlimited
82+
else if t.length = 0 && t.limit = capacity then t
83+
else make_empty ~capacity
84+
85+
let rec prepend_to_seq xs tl =
86+
match xs with
87+
| [] -> tl
88+
| x :: xs -> fun () -> Seq.Cons (x, prepend_to_seq xs tl)
89+
90+
let to_seq { list; _ } = prepend_to_seq list Seq.empty
91+
92+
let rev_prepend_to_seq { length; list; _ } tl =
93+
if length <= 1 then prepend_to_seq list tl
94+
else
95+
let t = ref (`Original list) in
96+
fun () ->
97+
let t =
98+
match !t with
99+
| `Original t' ->
100+
(* This is domain safe as the result is always equivalent. *)
101+
let t' = List.rev t' in
102+
t := `Reversed t';
103+
t'
104+
| `Reversed t' -> t'
105+
in
106+
prepend_to_seq t tl ()
107+
108+
let of_list ?(capacity = Int.max_int) list =
109+
let length = List.length list in
110+
let limit = Int.min 0 (capacity - length) in
111+
{ capacity; length; list; limit }
112+
113+
let of_seq_rev ?capacity xs =
114+
of_list ?capacity (Seq.fold_left (fun xs x -> x :: xs) [] xs)

src/kcas_data/list_with_capacity.mli

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
type !'a t
2+
3+
val empty_unlimited : 'a t
4+
val make_empty : capacity:int -> 'a t
5+
val make : capacity:int -> length:int -> list:'a list -> limit:int -> 'a t
6+
val is_empty : 'a t -> bool
7+
val length : 'a t -> int
8+
val capacity : 'a t -> int
9+
val limit : 'a t -> int
10+
val list : 'a t -> 'a list
11+
val cons_safe : 'a -> 'a t -> 'a t
12+
val cons_or_retry : 'a -> 'a t -> 'a t
13+
val move : 'a t -> 'a t
14+
val move_last : 'a t -> 'a t
15+
val clear : 'a t -> 'a t
16+
val to_rev_elems : 'a t -> 'a Elems.t
17+
val to_seq : 'a t -> 'a Seq.t
18+
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
19+
val of_seq_rev : ?capacity:int -> 'a Seq.t -> 'a t
20+
val tl_safe : 'a t -> 'a t
21+
val tl_or_retry : 'a t -> 'a t
22+
val hd_opt : 'a t -> 'a option
23+
val hd_or_retry : 'a t -> 'a
24+
val hd_unsafe : 'a t -> 'a

src/kcas_data/queue.ml

+95-52
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,78 @@ open Kcas
22

33
type 'a t = {
44
front : 'a Elems.t Loc.t;
5-
middle : 'a Elems.t Loc.t;
6-
back : 'a Elems.t Loc.t;
5+
back : 'a List_with_capacity.t Loc.t;
6+
middle : 'a List_with_capacity.t Loc.t;
77
}
88

9-
let alloc ~front ~middle ~back =
9+
let alloc ~front ~back ~middle =
1010
(* We allocate locations in specific order to make most efficient use of the
1111
splay-tree based transaction log. *)
1212
let front = Loc.make ~padded:true front
13-
and middle = Loc.make ~padded:true middle
14-
and back = Loc.make ~padded:true back in
13+
and back = Loc.make ~padded:true back
14+
and middle = Loc.make ~padded:true middle in
1515
Multicore_magic.copy_as_padded { back; middle; front }
1616

17-
let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty
17+
let create ?(capacity = Int.max_int) () =
18+
if capacity < 0 then invalid_arg "Queue.create: capacity must be non-negative";
19+
let back = List_with_capacity.make_empty ~capacity in
20+
alloc ~front:Elems.empty ~back ~middle:List_with_capacity.empty_unlimited
1821

1922
let copy q =
20-
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
21-
let front, middle, back = Xt.commit { tx } in
22-
alloc ~front ~middle ~back
23+
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.back, Xt.get ~xt q.middle) in
24+
let front, back, middle = Xt.commit { tx } in
25+
alloc ~front ~back ~middle
2326

2427
module Xt = struct
2528
let is_empty ~xt t =
2629
(* We access locations in order of allocation to make most efficient use of
2730
the splay-tree based transaction log. *)
2831
Xt.get ~xt t.front == Elems.empty
29-
&& Xt.get ~xt t.middle == Elems.empty
30-
&& Xt.get ~xt t.back == Elems.empty
31-
32-
let length ~xt { back; middle; front } =
33-
Elems.length (Xt.get ~xt front)
34-
+ Elems.length (Xt.get ~xt middle)
35-
+ Elems.length (Xt.get ~xt back)
36-
37-
let add ~xt x q = Xt.modify ~xt q.back @@ Elems.cons x
32+
&& List_with_capacity.is_empty (Xt.get ~xt t.back)
33+
&& Xt.get ~xt t.middle == List_with_capacity.empty_unlimited
34+
35+
let length ~xt q =
36+
Elems.length (Xt.get ~xt q.front)
37+
+ List_with_capacity.length (Xt.get ~xt q.back)
38+
+ List_with_capacity.length (Xt.get ~xt q.middle)
39+
40+
let try_add ~xt x q =
41+
let lwc = Xt.update ~xt q.back (List_with_capacity.cons_safe x) in
42+
let capacity = List_with_capacity.capacity lwc in
43+
capacity = Int.max_int
44+
||
45+
let back_length = List_with_capacity.length lwc in
46+
back_length < List_with_capacity.limit lwc
47+
||
48+
let other_length =
49+
List_with_capacity.length (Xt.get ~xt q.middle)
50+
+ Elems.length (Xt.get ~xt q.front)
51+
in
52+
let limit = capacity - other_length in
53+
back_length < limit
54+
&&
55+
(Xt.set ~xt q.back
56+
(List_with_capacity.make ~capacity ~length:(back_length + 1)
57+
~list:(x :: List_with_capacity.list lwc)
58+
~limit);
59+
true)
60+
61+
let add ~xt x q = Retry.unless (try_add ~xt x q)
3862
let push = add
3963

4064
(** Cooperative helper to move elems from back to middle. *)
41-
let back_to_middle ~middle ~back =
65+
let back_to_middle ~back ~middle =
4266
let tx ~xt =
43-
let xs = Xt.exchange ~xt back Elems.empty in
44-
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
45-
raise_notrace Exit
67+
let xs = Xt.update ~xt back List_with_capacity.move in
68+
if
69+
List_with_capacity.length xs = 0
70+
|| Xt.exchange ~xt middle xs != List_with_capacity.empty_unlimited
71+
then raise_notrace Exit
4672
in
4773
try Xt.commit { tx } with Exit -> ()
4874

49-
let take_opt_finish ~xt front elems =
50-
let elems = Elems.rev elems in
75+
let take_opt_finish ~xt front lwc =
76+
let elems = List_with_capacity.to_rev_elems lwc in
5177
Xt.set ~xt front (Elems.tl_safe elems);
5278
Elems.hd_opt elems
5379

@@ -58,17 +84,19 @@ module Xt = struct
5884
else
5985
let middle = t.middle and back = t.back in
6086
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
61-
back_to_middle ~middle ~back;
62-
let elems = Xt.exchange ~xt middle Elems.empty in
63-
if elems != Elems.empty then take_opt_finish ~xt front elems
87+
back_to_middle ~back ~middle;
88+
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
89+
if lwc != List_with_capacity.empty_unlimited then
90+
take_opt_finish ~xt front lwc
6491
else
65-
let elems = Xt.exchange ~xt back Elems.empty in
66-
if elems != Elems.empty then take_opt_finish ~xt front elems else None
92+
let lwc = Xt.update ~xt back List_with_capacity.move_last in
93+
if List_with_capacity.length lwc <> 0 then take_opt_finish ~xt front lwc
94+
else None
6795

6896
let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)
6997

70-
let peek_opt_finish ~xt front elems =
71-
let elems = Elems.rev elems in
98+
let peek_opt_finish ~xt front lwc =
99+
let elems = List_with_capacity.to_rev_elems lwc in
72100
Xt.set ~xt front elems;
73101
Elems.hd_opt elems
74102

@@ -79,57 +107,72 @@ module Xt = struct
79107
else
80108
let middle = t.middle and back = t.back in
81109
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
82-
back_to_middle ~middle ~back;
83-
let elems = Xt.exchange ~xt middle Elems.empty in
84-
if elems != Elems.empty then peek_opt_finish ~xt front elems
110+
back_to_middle ~back ~middle;
111+
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
112+
if lwc != List_with_capacity.empty_unlimited then
113+
peek_opt_finish ~xt front lwc
85114
else
86-
let elems = Xt.exchange ~xt back Elems.empty in
87-
if elems != Elems.empty then peek_opt_finish ~xt front elems else None
115+
let lwc = Xt.update ~xt back List_with_capacity.move_last in
116+
if List_with_capacity.length lwc <> 0 then peek_opt_finish ~xt front lwc
117+
else None
88118

89119
let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)
90120

91121
let clear ~xt t =
92122
Xt.set ~xt t.front Elems.empty;
93-
Xt.set ~xt t.middle Elems.empty;
94-
Xt.set ~xt t.back Elems.empty
123+
Xt.modify ~xt t.back List_with_capacity.clear;
124+
Xt.set ~xt t.middle List_with_capacity.empty_unlimited
95125

96126
let swap ~xt q1 q2 =
97127
let front = Xt.get ~xt q1.front
98-
and middle = Xt.get ~xt q1.middle
99-
and back = Xt.get ~xt q1.back in
128+
and back = Xt.get ~xt q1.back
129+
and middle = Xt.get ~xt q1.middle in
100130
let front = Xt.exchange ~xt q2.front front
101-
and middle = Xt.exchange ~xt q2.middle middle
102-
and back = Xt.exchange ~xt q2.back back in
131+
and back = Xt.exchange ~xt q2.back back
132+
and middle = Xt.exchange ~xt q2.middle middle in
103133
Xt.set ~xt q1.front front;
104-
Xt.set ~xt q1.middle middle;
105-
Xt.set ~xt q1.back back
134+
Xt.set ~xt q1.back back;
135+
Xt.set ~xt q1.middle middle
106136

107137
let seq_of ~front ~middle ~back =
108138
(* Sequence construction is lazy, so this function is O(1). *)
109139
Seq.empty
110-
|> Elems.rev_prepend_to_seq back
111-
|> Elems.rev_prepend_to_seq middle
140+
|> List_with_capacity.rev_prepend_to_seq back
141+
|> List_with_capacity.rev_prepend_to_seq middle
112142
|> Elems.prepend_to_seq front
113143

114144
let to_seq ~xt t =
115145
let front = Xt.get ~xt t.front
116-
and middle = Xt.get ~xt t.middle
117-
and back = Xt.get ~xt t.back in
146+
and back = Xt.get ~xt t.back
147+
and middle = Xt.get ~xt t.middle in
118148
seq_of ~front ~middle ~back
119149

120150
let take_all ~xt t =
121151
let front = Xt.exchange ~xt t.front Elems.empty
122-
and middle = Xt.exchange ~xt t.middle Elems.empty
123-
and back = Xt.exchange ~xt t.back Elems.empty in
152+
and back = Xt.update ~xt t.back List_with_capacity.clear
153+
and middle = Xt.exchange ~xt t.middle List_with_capacity.empty_unlimited in
124154
seq_of ~front ~middle ~back
125155
end
126156

127157
let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
128158
let length q = Kcas.Xt.commit { tx = Xt.length q }
129159

160+
let try_add x q =
161+
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
162+
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
163+
let capacity = List_with_capacity.capacity lwc in
164+
capacity = Int.max_int
165+
||
166+
let back_length = List_with_capacity.length lwc in
167+
back_length < List_with_capacity.limit lwc
168+
|| Kcas.Xt.commit { tx = Xt.try_add x q }
169+
130170
let add x q =
131-
(* Fenceless is safe as we always update. *)
132-
Loc.fenceless_modify q.back @@ Elems.cons x
171+
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
172+
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
173+
if List_with_capacity.capacity lwc <> Int.max_int then
174+
if List_with_capacity.length lwc = List_with_capacity.limit lwc then
175+
Kcas.Xt.commit { tx = Xt.add x q }
133176

134177
let push = add
135178

0 commit comments

Comments
 (0)