3
3
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected] >
4
4
* Copyright (c) 2017, Nicolas ASSOUAD <[email protected] >
5
5
* Copyright (c) 2021, Tom Kelly <[email protected] >
6
+ * Copyright (c) 2024, Vesa Karvonen <[email protected] >
6
7
*
7
8
* Permission to use, copy, modify, and/or distribute this software for any
8
9
* purpose with or without fee is hereby granted, provided that the above
27
28
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
28
29
*)
29
30
31
+ module Atomic = Transparent_atomic
32
+
30
33
module type S = sig
31
34
type 'a t
32
35
@@ -38,162 +41,110 @@ module type S = sig
38
41
val steal_opt : 'a t -> 'a option
39
42
end
40
43
41
- module CArray = struct
42
- type 'a t = 'a array
43
-
44
- let rec log2 n = if n < = 1 then 0 else 1 + log2 (n asr 1 )
45
-
46
- let create sz v =
47
- (* [sz] must be a power of two. *)
48
- assert (0 < sz && sz = Int. shift_left 1 (log2 sz));
49
- assert (Int. logand sz (sz - 1 ) == 0 );
50
- Array. make sz v
51
-
52
- let [@ inline] size t = Array. length t
53
- let [@ inline] mask t = size t - 1
54
-
55
- let [@ inline] index i t =
56
- (* Because [size t] is a power of two, [i mod (size t)] is the same as
57
- [i land (size t - 1)], that is, [i land (mask t)]. *)
58
- Int. logand i (mask t)
59
-
60
- let [@ inline] get t i = Array. unsafe_get t (index i t)
61
- let [@ inline] put t i v = Array. unsafe_set t (index i t) v
62
-
63
- let [@ inline] transfer src dst top num =
64
- ArrayExtra. blit_circularly (* source array and index: *)
65
- src
66
- (index top src) (* target array and index: *)
67
- dst
68
- (index top dst) (* number of elements: *)
69
- num
70
-
71
- let grow t top bottom =
72
- let sz = size t in
73
- assert (bottom - top = sz);
74
- let dst = create (2 * sz) (Obj. magic () ) in
75
- transfer t dst top sz;
76
- dst
77
-
78
- let shrink t top bottom =
79
- let sz = size t in
80
- assert (bottom - top < = sz / 2 );
81
- let dst = create (sz / 2 ) (Obj. magic () ) in
82
- transfer t dst top (bottom - top);
83
- dst
84
- end
85
-
86
44
module M : S = struct
87
- let min_size = 32
88
- let shrink_const = 3
45
+ (* * This must be a power of two. *)
46
+ let min_capacity = 16
89
47
90
48
type 'a t = {
91
49
top : int Atomic .t ;
92
50
bottom : int Atomic .t ;
93
- tab : 'a ref CArray .t Atomic .t ;
94
- mutable next_shrink : int ;
51
+ mutable tab : 'a ref array ;
95
52
}
96
53
97
54
let create () =
98
- let top = Atomic. make 1 |> Multicore_magic. copy_as_padded in
99
- let bottom = Atomic. make 1 |> Multicore_magic. copy_as_padded in
100
- let tab =
101
- Atomic. make (CArray. create min_size (Obj. magic () ))
102
- |> Multicore_magic. copy_as_padded
103
- in
104
- let next_shrink = 0 in
105
- { top; bottom; tab; next_shrink } |> Multicore_magic. copy_as_padded
106
-
107
- let set_next_shrink q =
108
- let sz = CArray. size (Atomic. get q.tab) in
109
- if sz < = min_size then q.next_shrink < - 0
110
- else q.next_shrink < - sz / shrink_const
111
-
112
- let grow q t b =
113
- Atomic. set q.tab (CArray. grow (Atomic. get q.tab) t b);
114
- set_next_shrink q
115
-
116
- let size q =
117
- let b = Atomic. get q.bottom in
118
- let t = Atomic. get q.top in
119
- b - t
55
+ let top = Atomic. make 0 |> Multicore_magic. copy_as_padded in
56
+ let bottom = Atomic. make 0 |> Multicore_magic. copy_as_padded in
57
+ let tab = Array. make min_capacity (Obj. magic () ) in
58
+ { top; bottom; tab } |> Multicore_magic. copy_as_padded
59
+
60
+ let realloc a t b sz new_sz =
61
+ let new_a = Array. make new_sz (Obj. magic () ) in
62
+ ArrayExtra. blit_circularly a
63
+ (t land (sz - 1 ))
64
+ new_a
65
+ (t land (new_sz - 1 ))
66
+ (b - t);
67
+ new_a
120
68
121
69
let push q v =
122
- let v' = ref v in
123
- let b = Atomic. get q.bottom in
70
+ let v = ref v in
71
+ (* Read of [bottom] by the owner simply does not require a fence as the
72
+ [bottom] is only mutated by the owner. *)
73
+ let b = Atomic. fenceless_get q.bottom in
124
74
let t = Atomic. get q.top in
125
- let a = Atomic. get q.tab in
75
+ let a = q.tab in
126
76
let size = b - t in
127
- let a =
128
- if size = CArray. size a then begin
129
- grow q t b;
130
- Atomic. get q.tab
131
- end
132
- else a
133
- in
134
- CArray. put a b v';
135
- Atomic. set q.bottom (b + 1 )
77
+ let capacity = Array. length a in
78
+ if size < capacity then begin
79
+ Array. unsafe_set a (b land (capacity - 1 )) v;
80
+ Atomic. incr q.bottom
81
+ end
82
+ else
83
+ let a = realloc a t b capacity (capacity lsl 1 ) in
84
+ Array. unsafe_set a (b land (Array. length a - 1 )) v;
85
+ q.tab < - a;
86
+ Atomic. incr q.bottom
136
87
137
88
type ('a, _) poly = Option : ('a , 'a option ) poly | Value : ('a , 'a ) poly
138
89
139
- let [@ inline] release : type a r. a ref -> (a, r) poly -> r =
140
- fun ptr poly ->
141
- let res = ! ptr in
142
- (* we know this ptr will never be dereferenced, but want to
143
- break the reference to ensure that the contents of the
144
- deque array get garbage collected *)
145
- ptr := Obj. magic () ;
146
- match poly with Option -> Some res | Value -> res
147
-
148
90
let pop_as : type a r. a t -> (a, r) poly -> r =
149
91
fun q poly ->
150
- if size q = 0 then match poly with Option -> None | Value -> raise Exit
92
+ let b = Atomic. fetch_and_add q.bottom (- 1 ) - 1 in
93
+ (* Read of [top] at this point requires no fence as we simply need to ensure
94
+ that the read happens after updating [bottom]. *)
95
+ let t = Atomic. fenceless_get q.top in
96
+ let size = b - t in
97
+ if 0 < size then begin
98
+ let a = q.tab in
99
+ let capacity = Array. length a in
100
+ let out = Array. unsafe_get a (b land (capacity - 1 )) in
101
+ let res = ! out in
102
+ out := Obj. magic () ;
103
+ if size + size + size < = capacity - min_capacity then
104
+ q.tab < - realloc a t b capacity (capacity lsr 1 );
105
+ match poly with Option -> Some res | Value -> res
106
+ end
107
+ else if size < 0 then begin
108
+ (* This write of [bottom] requires no fence. The deque is empty and
109
+ remains so until the next [push]. *)
110
+ Atomic. fenceless_set q.bottom (b + 1 );
111
+ match poly with Option -> None | Value -> raise_notrace Exit
112
+ end
151
113
else
152
- let b = Atomic. get q.bottom - 1 in
153
- Atomic. set q.bottom b;
154
- let t = Atomic. get q.top in
155
- let a = Atomic. get q.tab in
156
- let size = b - t in
157
- if size < 0 then begin
158
- (* empty queue *)
159
- Atomic. set q.bottom (b + 1 );
160
- match poly with Option -> None | Value -> raise Exit
114
+ let got = Atomic. compare_and_set q.top t (t + 1 ) in
115
+ (* This write of [bottom] requires no fence. The deque is empty and
116
+ remains so until the next [push]. *)
117
+ Atomic. fenceless_set q.bottom (b + 1 );
118
+ let a = q.tab in
119
+ let out = Array. unsafe_get a (b land (Array. length a - 1 )) in
120
+ if got then begin
121
+ let res = ! out in
122
+ out := Obj. magic () ;
123
+ match poly with Option -> Some res | Value -> res
161
124
end
162
- else
163
- let out = CArray. get a b in
164
- if b = t then
165
- (* single last element *)
166
- if Atomic. compare_and_set q.top t (t + 1 ) then begin
167
- Atomic. set q.bottom (b + 1 );
168
- release out poly
169
- end
170
- else begin
171
- Atomic. set q.bottom (b + 1 );
172
- match poly with Option -> None | Value -> raise Exit
173
- end
174
- else begin
175
- (* non-empty queue *)
176
- if q.next_shrink > size then begin
177
- Atomic. set q.tab (CArray. shrink a t b);
178
- set_next_shrink q
179
- end ;
180
- release out poly
181
- end
125
+ else match poly with Option -> None | Value -> raise_notrace Exit
182
126
183
127
let pop q = pop_as q Value
184
128
let pop_opt q = pop_as q Option
185
129
186
130
let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
187
131
fun q backoff poly ->
188
- let t = Atomic. get q.top in
132
+ (* Read of [top] does not require a fence at this point, but the read of
133
+ [top] must happen before the read of [bottom]. The write of [top] later
134
+ has no effect in case we happened to read an old value of [top]. *)
135
+ let t = Atomic. fenceless_get q.top in
189
136
let b = Atomic. get q.bottom in
190
137
let size = b - t in
191
- if size < = 0 then match poly with Option -> None | Value -> raise Exit
192
- else
193
- let a = Atomic. get q.tab in
194
- let out = CArray. get a t in
195
- if Atomic. compare_and_set q.top t (t + 1 ) then release out poly
138
+ if 0 < size then
139
+ let a = q.tab in
140
+ let out = Array. unsafe_get a (t land (Array. length a - 1 )) in
141
+ if Atomic. compare_and_set q.top t (t + 1 ) then begin
142
+ let res = ! out in
143
+ out := Obj. magic () ;
144
+ match poly with Option -> Some res | Value -> res
145
+ end
196
146
else steal_as q (Backoff. once backoff) poly
147
+ else match poly with Option -> None | Value -> raise_notrace Exit
197
148
198
149
let steal q = steal_as q Backoff. default Value
199
150
let steal_opt q = steal_as q Backoff. default Option
0 commit comments