19
19
*/
20
20
package org .neo4j .graphalgo .beta .filter ;
21
21
22
+ import com .carrotsearch .hppc .AbstractIterator ;
22
23
import org .apache .commons .lang3 .mutable .MutableInt ;
23
24
import org .eclipse .collections .api .block .function .primitive .LongToLongFunction ;
24
25
import org .neo4j .graphalgo .NodeLabel ;
25
26
import org .neo4j .graphalgo .annotation .ValueClass ;
26
27
import org .neo4j .graphalgo .api .DefaultValue ;
27
28
import org .neo4j .graphalgo .api .GraphStore ;
29
+ import org .neo4j .graphalgo .api .IdMapping ;
28
30
import org .neo4j .graphalgo .api .NodeMapping ;
29
31
import org .neo4j .graphalgo .api .NodeProperties ;
30
32
import org .neo4j .graphalgo .api .NodeProperty ;
45
47
import org .neo4j .graphalgo .core .utils .mem .AllocationTracker ;
46
48
import org .neo4j .graphalgo .core .utils .paged .HugeLongArray ;
47
49
import org .neo4j .graphalgo .core .utils .paged .HugeMergeSort ;
48
- import org .neo4j .graphalgo .core .utils .paged .SparseLongArray ;
49
50
import org .neo4j .graphalgo .core .utils .partition .Partition ;
50
51
import org .neo4j .graphalgo .core .utils .partition .PartitionUtils ;
51
52
52
53
import java .util .Collection ;
54
+ import java .util .Iterator ;
53
55
import java .util .Map ;
54
56
import java .util .concurrent .ExecutorService ;
55
57
import java .util .stream .Collectors ;
56
58
59
+ import static org .neo4j .graphalgo .core .utils .paged .SparseLongArray .SUPER_BLOCK_SHIFT ;
57
60
import static org .neo4j .graphalgo .utils .StringFormatting .formatWithLocale ;
58
61
59
62
final class NodesFilter {
@@ -78,6 +81,13 @@ static FilteredNodes filterNodes(
78
81
LongToLongFunction originalIdFunction ;
79
82
LongToLongFunction internalIdFunction ;
80
83
84
+ // Partitions over the id space are created depending on the id map
85
+ // implementation. For the BitIdMap, we need to make sure that the
86
+ // ranges of original ids in each partition are aligned with the
87
+ // block size used for creating the BitIdMap. For the regular IdMap,
88
+ // we use range partitioning.
89
+ Iterator <Partition > partitions ;
90
+
81
91
var inputNodes = graphStore .nodes ();
82
92
83
93
var nodesBuilderBuilder = GraphFactory .initNodesBuilder ()
@@ -112,39 +122,42 @@ static FilteredNodes filterNodes(
112
122
// We signal the nodes builder to use the block-based
113
123
// BitIdMap builder.
114
124
nodesBuilderBuilder .hasDisjointPartitions (true );
125
+ // Create partitions that are aligned to the blocks that
126
+ // original ids belong to. We must guarantee that no two
127
+ // partitions contain ids that belong to the same block.
128
+ partitions = PartitionUtils .blockAlignedPartitioning (
129
+ sortedOriginalIds ,
130
+ SUPER_BLOCK_SHIFT ,
131
+ partition -> partition
132
+ );
133
+
115
134
progressLogger .finishSubTask ("Prepare node ids" );
116
135
} else {
117
136
// If we need to construct a regular IdMap, we can just
118
137
// delegate to the input node id mapping and use the
119
138
// internal id as given.
120
139
originalIdFunction = inputNodes ::toOriginalNodeId ;
121
140
internalIdFunction = (id ) -> id ;
141
+
142
+ partitions = PartitionUtils
143
+ .rangePartition (concurrency , graphStore .nodeCount (), partition -> partition )
144
+ .iterator ();
122
145
}
123
146
124
147
var nodesBuilder = nodesBuilderBuilder .build ();
125
148
126
- var nodeFilterTasks = PartitionUtils .numberAlignedPartitioning (
127
- concurrency ,
128
- graphStore .nodeCount (),
129
- // We need to make sure to align the partition size
130
- // with the block size in the SLA, which is the main
131
- // data structure of the BitIdMap. If partition sizes
132
- // are unaligned, wrong internal ids will be generated
133
- // during import.
134
- SparseLongArray .SUPER_BLOCK_SIZE ,
135
- partition -> new NodeFilterTask (
136
- partition ,
137
- expression ,
138
- graphStore ,
139
- originalIdFunction ,
140
- internalIdFunction ,
141
- nodesBuilder ,
142
- progressLogger
143
- )
149
+ var tasks = NodeFilterTask .of (
150
+ graphStore ,
151
+ expression ,
152
+ partitions ,
153
+ originalIdFunction ,
154
+ internalIdFunction ,
155
+ nodesBuilder ,
156
+ progressLogger
144
157
);
145
158
146
159
progressLogger .startSubTask ("Nodes" ).reset (graphStore .nodeCount ());
147
- ParallelUtil .runWithConcurrency (concurrency , nodeFilterTasks , executorService );
160
+ ParallelUtil .runWithConcurrency (concurrency , tasks , executorService );
148
161
progressLogger .finishSubTask ("Nodes" );
149
162
150
163
var nodeMappingAndProperties = nodesBuilder .build ();
@@ -154,7 +167,6 @@ static FilteredNodes filterNodes(
154
167
var filteredNodePropertyStores = filterNodeProperties (
155
168
filteredNodeMapping ,
156
169
graphStore ,
157
- executorService ,
158
170
concurrency ,
159
171
progressLogger
160
172
);
@@ -196,7 +208,6 @@ private static HugeLongArray sortOriginalIds(
196
208
private static Map <NodeLabel , NodePropertyStore > filterNodeProperties (
197
209
NodeMapping filteredNodeMapping ,
198
210
GraphStore inputGraphStore ,
199
- ExecutorService executorService ,
200
211
int concurrency ,
201
212
ProgressLogger progressLogger
202
213
) {
@@ -220,7 +231,6 @@ private static Map<NodeLabel, NodePropertyStore> filterNodeProperties(
220
231
nodeLabel ,
221
232
propertyKeys ,
222
233
concurrency ,
223
- executorService ,
224
234
progressLogger
225
235
);
226
236
@@ -234,11 +244,10 @@ private static Map<NodeLabel, NodePropertyStore> filterNodeProperties(
234
244
235
245
private static NodePropertyStore createNodePropertyStore (
236
246
GraphStore inputGraphStore ,
237
- NodeMapping filteredMapping ,
247
+ IdMapping filteredMapping ,
238
248
NodeLabel nodeLabel ,
239
249
Collection <String > propertyKeys ,
240
250
int concurrency ,
241
- ExecutorService executorService ,
242
251
ProgressLogger progressLogger
243
252
) {
244
253
var builder = NodePropertyStore .builder ();
@@ -382,6 +391,35 @@ private static final class NodeFilterTask implements Runnable {
382
391
private final LongToLongFunction internalIdFunction ;
383
392
private final NodesBuilder nodesBuilder ;
384
393
394
/**
 * Creates an iterator of {@code NodeFilterTask}s, one task per element of
 * the given {@code partitions} iterator.
 *
 * Tasks are not materialized up front: a task is constructed inside
 * {@code fetch()} each time the returned iterator asks for its next
 * element. The {@code partitions} iterator is consumed by the returned
 * iterator, so it should not be advanced elsewhere at the same time.
 *
 * Every created task receives the same {@code expression},
 * {@code graphStore}, {@code originalIdFunction}, {@code internalIdFunction},
 * {@code nodesBuilder} and {@code progressLogger}; only the partition differs.
 *
 * @param graphStore         passed through unchanged to each task
 * @param expression         passed through unchanged to each task
 * @param partitions         source of partitions; one task is created per partition
 * @param originalIdFunction passed through unchanged to each task
 * @param internalIdFunction passed through unchanged to each task
 * @param nodesBuilder       passed through unchanged to each task
 * @param progressLogger     passed through unchanged to each task
 * @return an iterator that yields one {@code NodeFilterTask} per partition
 */
+ static Iterator <NodeFilterTask > of (
395
+ GraphStore graphStore ,
396
+ Expression expression ,
397
+ Iterator <Partition > partitions ,
398
+ LongToLongFunction originalIdFunction ,
399
+ LongToLongFunction internalIdFunction ,
400
+ NodesBuilder nodesBuilder ,
401
+ ProgressLogger progressLogger
402
+ ) {
403
+ return new AbstractIterator <>() {
404
+ @ Override
405
+ protected NodeFilterTask fetch () {
// No partitions left: end the iteration via done().
// NOTE(review): done() is presumably the AbstractIterator end-of-iteration signal; confirm against HPPC docs.
406
+ if (!partitions .hasNext ()) {
407
+ return done ();
408
+ }
409
+
// Otherwise wrap the next partition in a new task that shares all other collaborators.
410
+ return new NodeFilterTask (
411
+ partitions .next (),
412
+ expression ,
413
+ graphStore ,
414
+ originalIdFunction ,
415
+ internalIdFunction ,
416
+ nodesBuilder ,
417
+ progressLogger
418
+ );
419
+ }
420
+ };
421
+ }
422
+
385
423
private NodeFilterTask (
386
424
Partition partition ,
387
425
Expression expression ,
0 commit comments