@@ -1,5 +1,6 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::hint::black_box;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::thread;
 
@@ -107,14 +108,73 @@ fn decode(coding: &Coding<u8>, mut bits: impl Iterator<Item = bool>) {
 }
 
 #[inline(always)]
-fn decode_spec(coding: Arc<Coding<u8>>, bits: Vec<bool>) {
-    let coding = coding.clone();
-    let mut main_iter = bits.into_iter();
-    let mut d = coding.decoder();
-    while let Some(b) = main_iter.next() {
-        if let minimum_redundancy::DecodingResult::Value(v) = d.consume(&coding, b as u32) {
-            black_box(v);
-            d.reset(coding.degree.as_u32());
+fn decode_spec(coding: Arc<Coding<u8>>, bits: Arc<Vec<bool>>) {
+    let cursor = Arc::new(AtomicUsize::new(0)); // index of the next bit to decode, shared with all threads
+
+    loop {
+        // Stop once every encoded bit has been consumed.
+        if cursor.load(Ordering::SeqCst) >= bits.len() { return; }
+        let unique_code_lengths: HashSet<u32> = coding.code_lengths().values().cloned().collect();
+        let mut sorted_code_lengths: Vec<u32> = Vec::from_iter(unique_code_lengths);
+        sorted_code_lengths.sort_unstable(); // stability is not needed here, so the unstable sort is the faster choice
+        let num_cores = num_cpus::get().saturating_sub(2).max(1); // use all cores but two, and at least one
+        let top_num_cores_code_lengths =
+            sorted_code_lengths[..num_cores.min(sorted_code_lengths.len())].to_vec();
+
+        let bits_arc = Arc::clone(&bits);
+        let coding_arc = Arc::clone(&coding);
+        let cursor_arc = Arc::clone(&cursor);
+
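+        // Producer: decode the next symbol at the cursor and return its length in bits.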
+        let producer_handle = thread::spawn(move || {
+            let mut len = 0;
+            let mut decoder = coding_arc.decoder();
+            let mut bits_iter = bits_arc[cursor_arc.load(Ordering::SeqCst)..].iter();
+            while let Some(b) = bits_iter.next() {
+                len += 1;
+                if let minimum_redundancy::DecodingResult::Value(v) =
+                    decoder.consume(&coding_arc, *b as u32)
+                {
+                    black_box(v);
+                    return len;
+                }
+            }
+            panic!("invalid encoded value");
+        });
+
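+        // Speculative workers: for each of the shortest code lengths l, start decoding a second symbol l bits past the cursor.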
+        let mut handles = HashMap::new();
+        for l in top_num_cores_code_lengths {
+            let bits_arc = Arc::clone(&bits);
+            let coding_arc = Arc::clone(&coding);
+            let cursor_arc = Arc::clone(&cursor);
+            let handle = thread::spawn(move || {
+                let mut len = 0;
+                let mut decoder = coding_arc.decoder();
+                let start_index =
+                    (cursor_arc.load(Ordering::SeqCst) + l as usize).min(bits_arc.len());
+                let mut bits_iter = bits_arc[start_index..].iter();
+                while let Some(b) = bits_iter.next() {
+                    len += 1;
+                    if let minimum_redundancy::DecodingResult::Value(v) =
+                        decoder.consume(&coding_arc, *b as u32)
+                    {
+                        black_box(v);
+                        return len;
+                    }
+                }
+                // Out of bits: the guessed start was at or past the end of the stream.
+                len
+            });
+            handles.insert(l, handle);
+        }
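+        // If one worker guessed the producer's symbol length, its symbol is valid too: advance past both; otherwise advance only past the producer's symbol.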
+        let producer_len = producer_handle.join().unwrap();
+        if let Some(handle) = handles.remove(&producer_len) {
+            let guess_len = handle.join().unwrap();
+            cursor.fetch_add((producer_len + guess_len) as usize, Ordering::SeqCst);
+        } else {
+            cursor.fetch_add(producer_len as usize, Ordering::SeqCst);
         }
     }
 }
@@ -277,10 +330,6 @@ pub fn benchmark(conf: &super::Conf) {
         .measure(|| Coding::from_frequencies_cloned(BitsPerFragment(1), &frequencies))
         .as_nanos();
     let coding = Coding::from_frequencies_cloned(BitsPerFragment(1), &frequencies);
-    let coding_arc = Arc::new(Coding::from_frequencies_cloned(
-        BitsPerFragment(1),
-        &frequencies,
-    ));
 
     let enc_constr_ns = conf.measure(|| coding.codes_for_values()).as_nanos();
     let rev_enc_constr_ns = conf
@@ -300,15 +349,22 @@ pub fn benchmark(conf: &super::Conf) {
         .bit_in_range_iter(0..compressed_size_bits)
         .collect();
     conf.print_compressed_size(compressed_size_bits);
-    conf.print_speed(
-        " decoding from a queue (without storing) using vec instead of iterator",
-        conf.measure(|| decode_spec(coding_arc.clone(), bits.clone())),
-    );
+
+    let coding_arc = Arc::new(Coding::from_frequencies_cloned(
+        BitsPerFragment(1),
+        &frequencies,
+    ));
     let iter = bits.clone().into_iter();
     conf.print_speed(
         " decoding from a queue (without storing)",
         conf.measure(|| decode(&coding, iter.clone())),
     );
+    let bits_arc = Arc::new(bits);
+    conf.print_speed(
+        " decoding from a queue (without storing) using speculative execution",
+        conf.measure(|| decode_spec(coding_arc.clone(), bits_arc.clone())),
+    );
+    let coding = Coding::from_frequencies_cloned(BitsPerFragment(1), &frequencies);
 
     if conf.verify {
         verify_queue(&text, compressed_text, &coding, compressed_size_bits);