15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
+ use std:: sync:: atomic:: { AtomicI64 , Ordering } ;
19
+
18
20
use crate :: bit_iterator:: { BitIndexIterator , BitIterator , BitSliceIterator } ;
19
21
use crate :: buffer:: BooleanBuffer ;
20
22
use crate :: { Buffer , MutableBuffer } ;
21
23
24
/// Sentinel stored in a lazily computed null count that has not been computed yet.
const UNINITIALIZED_NULL_COUNT: i64 = -1;
25
+
26
/// The null count of a validity bitmask, either known up front or computed on demand.
#[derive(Debug)]
pub enum NullCount {
    /// The count is already known.
    Eager(usize),
    /// The count is cached in the atomic once it has been computed; the value
    /// `UNINITIALIZED_NULL_COUNT` marks a count that has not been computed yet.
    Lazy(AtomicI64),
}

impl Clone for NullCount {
    /// Returns a copy of this count.
    ///
    /// A `Lazy` clone receives its own atomic initialised with the value observed
    /// at the time of the call, so later updates to one are not seen by the other.
    fn clone(&self) -> Self {
        match self {
            Self::Eager(v) => Self::Eager(*v),
            // `AtomicI64` does not implement `Clone`, so snapshot its current
            // value into a fresh atomic.
            Self::Lazy(v) => Self::Lazy(AtomicI64::new(v.load(Ordering::Relaxed))),
        }
    }
}
43
+
22
44
/// A [`BooleanBuffer`] used to encode validity for arrow arrays
23
45
///
24
46
/// As per the [Arrow specification], array validity is encoded in a packed bitmask with a
25
47
/// `true` value indicating the corresponding slot is not null, and `false` indicating
26
48
/// that it is null.
27
49
///
28
50
/// [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps
29
- #[ derive( Debug , Clone , Eq , PartialEq ) ]
51
+ #[ derive( Debug , Clone ) ]
30
52
pub struct NullBuffer {
31
53
buffer : BooleanBuffer ,
32
- null_count : usize ,
54
+ null_count : NullCount ,
55
+ }
56
+
57
+ impl PartialEq for NullBuffer {
58
+ fn eq ( & self , other : & Self ) -> bool {
59
+ self . buffer == other. buffer
60
+ }
33
61
}
34
62
63
+ impl Eq for NullBuffer { }
64
+
35
65
impl NullBuffer {
36
66
/// Create a new [`NullBuffer`] computing the null count
37
67
pub fn new ( buffer : BooleanBuffer ) -> Self {
38
- let null_count = buffer. len ( ) - buffer. count_set_bits ( ) ;
68
+ // Expensive to calc the null count, we should lazily compute it when
69
+ let null_count = NullCount :: Lazy ( AtomicI64 :: new ( UNINITIALIZED_NULL_COUNT ) ) ;
39
70
Self { buffer, null_count }
40
71
}
41
72
42
73
/// Create a new [`NullBuffer`] of length `len` where all values are null
43
74
pub fn new_null ( len : usize ) -> Self {
44
75
Self {
45
76
buffer : BooleanBuffer :: new_unset ( len) ,
46
- null_count : len,
77
+ null_count : NullCount :: Eager ( len) ,
47
78
}
48
79
}
49
80
@@ -53,7 +84,7 @@ impl NullBuffer {
53
84
pub fn new_valid ( len : usize ) -> Self {
54
85
Self {
55
86
buffer : BooleanBuffer :: new_set ( len) ,
56
- null_count : 0 ,
87
+ null_count : NullCount :: Eager ( 0 ) ,
57
88
}
58
89
}
59
90
@@ -63,7 +94,10 @@ impl NullBuffer {
63
94
///
64
95
/// `buffer` must contain `null_count` `0` bits
65
96
pub unsafe fn new_unchecked ( buffer : BooleanBuffer , null_count : usize ) -> Self {
66
- Self { buffer, null_count }
97
+ Self {
98
+ buffer,
99
+ null_count : NullCount :: Eager ( null_count) ,
100
+ }
67
101
}
68
102
69
103
/// Computes the union of the nulls in two optional [`NullBuffer`]
@@ -81,9 +115,12 @@ impl NullBuffer {
81
115
82
116
/// Returns true if all nulls in `other` also exist in self
83
117
pub fn contains ( & self , other : & NullBuffer ) -> bool {
84
- if other. null_count == 0 {
85
- return true ;
118
+ if let NullCount :: Eager ( v) = & other. null_count {
119
+ if * v == 0 {
120
+ return true ;
121
+ }
86
122
}
123
+
87
124
let lhs = self . inner ( ) . bit_chunks ( ) . iter_padded ( ) ;
88
125
let rhs = other. inner ( ) . bit_chunks ( ) . iter_padded ( ) ;
89
126
lhs. zip ( rhs) . all ( |( l, r) | ( l & !r) == 0 )
@@ -106,9 +143,17 @@ impl NullBuffer {
106
143
crate :: bit_util:: set_bit ( buffer. as_mut ( ) , i * count + j)
107
144
}
108
145
}
146
+
147
+ let null_count = if let NullCount :: Eager ( v) = & self . null_count {
148
+ NullCount :: Eager ( v * count)
149
+ } else {
150
+ // TODO: consider loading the atomic and, if the count has already been computed, reusing it (scaled by `count`) instead of starting uninitialized
151
+ NullCount :: Lazy ( AtomicI64 :: new ( UNINITIALIZED_NULL_COUNT ) )
152
+ } ;
153
+
109
154
Self {
110
155
buffer : BooleanBuffer :: new ( buffer. into ( ) , 0 , capacity) ,
111
- null_count : self . null_count * count ,
156
+ null_count,
112
157
}
113
158
}
114
159
@@ -131,9 +176,20 @@ impl NullBuffer {
131
176
}
132
177
133
178
/// Returns the null count for this [`NullBuffer`]
134
- #[ inline]
135
179
pub fn null_count ( & self ) -> usize {
136
- self . null_count
180
+ match & self . null_count {
181
+ NullCount :: Eager ( v) => * v,
182
+ NullCount :: Lazy ( v) => {
183
+ let cached_null_count = v. load ( Ordering :: Acquire ) ;
184
+ if cached_null_count != UNINITIALIZED_NULL_COUNT {
185
+ return cached_null_count as usize ;
186
+ }
187
+
188
+ let computed_null_count = self . buffer . len ( ) - self . buffer . count_set_bits ( ) ;
189
+ v. store ( computed_null_count as i64 , Ordering :: Release ) ;
190
+ computed_null_count
191
+ }
192
+ }
137
193
}
138
194
139
195
/// Returns `true` if the value at `idx` is not null
@@ -189,8 +245,10 @@ impl NullBuffer {
189
245
& self ,
190
246
f : F ,
191
247
) -> Result < ( ) , E > {
192
- if self . null_count == self . len ( ) {
193
- return Ok ( ( ) ) ;
248
+ if let NullCount :: Eager ( v) = & self . null_count {
249
+ if * v == self . len ( ) {
250
+ return Ok ( ( ) ) ;
251
+ }
194
252
}
195
253
self . valid_indices ( ) . try_for_each ( f)
196
254
}
0 commit comments