Skip to content

Commit af5180a

Browse files
s1ckknutwalker
andcommitted
Add block aligned partitioning
Co-authored-by: Paul Horn <[email protected]>
1 parent a7b48ae commit af5180a

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

core/src/main/java/org/neo4j/graphalgo/core/utils/partition/PartitionUtils.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
*/
2020
package org.neo4j.graphalgo.core.utils.partition;
2121

22+
import com.carrotsearch.hppc.AbstractIterator;
2223
import org.neo4j.graphalgo.api.Graph;
2324
import org.neo4j.graphalgo.core.concurrency.ParallelUtil;
2425
import org.neo4j.graphalgo.core.utils.BitUtil;
2526
import org.neo4j.graphalgo.core.utils.collection.primitive.PrimitiveLongIterator;
27+
import org.neo4j.graphalgo.core.utils.paged.HugeCursor;
28+
import org.neo4j.graphalgo.core.utils.paged.HugeLongArray;
2629

2730
import java.util.ArrayList;
31+
import java.util.Iterator;
2832
import java.util.List;
2933
import java.util.function.Function;
3034

@@ -123,4 +127,80 @@ private static <TASK> List<TASK> tasks(
123127
}
124128
return result;
125129
}
130+
131+
public static <TASK> Iterator<TASK> blockAlignedPartitioning(
132+
HugeLongArray sortedIds,
133+
int blockShift,
134+
Function<Partition, TASK> taskCreator
135+
) {
136+
return new BlockAlignedPartitionIterator<>(sortedIds, blockShift, taskCreator);
137+
}
138+
139+
private static class BlockAlignedPartitionIterator<TASK> extends AbstractIterator<TASK> {
140+
private final HugeCursor<long[]> cursor;
141+
private final long size;
142+
private final int blockShift;
143+
private final Function<Partition, TASK> taskCreator;
144+
145+
private int prevBlockId;
146+
private long blockStart;
147+
private boolean done;
148+
private int lastIndex;
149+
150+
BlockAlignedPartitionIterator(
151+
HugeLongArray sortedIds,
152+
int blockShift,
153+
Function<Partition, TASK> taskCreator
154+
) {
155+
this.size = sortedIds.size();
156+
this.blockShift = blockShift;
157+
this.taskCreator = taskCreator;
158+
this.cursor = sortedIds.initCursor(sortedIds.newCursor());
159+
this.prevBlockId = 0;
160+
this.blockStart = 0L;
161+
this.done = false;
162+
this.lastIndex = Integer.MAX_VALUE;
163+
}
164+
165+
@Override
166+
protected TASK fetch() {
167+
if (this.done) {
168+
return done();
169+
}
170+
171+
long base = cursor.base;
172+
int limit = cursor.limit;
173+
long[] array = cursor.array;
174+
int prevBlockId = this.prevBlockId;
175+
int blockShift = this.blockShift;
176+
177+
for (int i = lastIndex; i < limit; i++) {
178+
long originalId = array[i];
179+
int blockId = (int) (originalId >>> blockShift);
180+
if (blockId != prevBlockId) {
181+
long internalId = base + i;
182+
prevBlockId = blockId;
183+
184+
if (internalId > 0) {
185+
var partition = ImmutablePartition.of(blockStart, internalId - blockStart);
186+
this.blockStart = internalId;
187+
this.prevBlockId = prevBlockId;
188+
this.lastIndex = i;
189+
return taskCreator.apply(partition);
190+
}
191+
}
192+
}
193+
194+
if (cursor.next()) {
195+
this.prevBlockId = prevBlockId;
196+
this.lastIndex = cursor.offset;
197+
return fetch();
198+
}
199+
200+
var partition = ImmutablePartition.of(blockStart, size - blockStart);
201+
this.done = true;
202+
203+
return taskCreator.apply(partition);
204+
}
205+
}
126206
}

core/src/test/java/org/neo4j/graphalgo/core/utils/PartitionUtilsTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@
2929
import org.neo4j.graphalgo.beta.generator.RandomGraphGenerator;
3030
import org.neo4j.graphalgo.beta.generator.RelationshipDistribution;
3131
import org.neo4j.graphalgo.core.utils.mem.AllocationTracker;
32+
import org.neo4j.graphalgo.core.utils.paged.HugeLongArray;
33+
import org.neo4j.graphalgo.core.utils.partition.ImmutablePartition;
3234
import org.neo4j.graphalgo.core.utils.partition.Partition;
3335
import org.neo4j.graphalgo.core.utils.partition.PartitionUtils;
3436

3537
import java.util.List;
3638
import java.util.function.Function;
3739
import java.util.stream.Collectors;
3840
import java.util.stream.Stream;
41+
import java.util.stream.StreamSupport;
3942

4043
import static org.assertj.core.api.Assertions.assertThat;
4144
import static org.assertj.core.api.Assertions.within;
@@ -180,6 +183,38 @@ void testDegreePartitioningWithNodeFilter() {
180183
assertEquals(3, partitions.get(0).nodeCount());
181184
}
182185

186+
@Test
187+
void testBlockAlignedPartitioning() {
188+
var blockShift = 3; // 2^3 = 8 ids per block
189+
190+
var sortedArray = HugeLongArray.of(
191+
/* block 0 */ 0, 3, 5,
192+
/* block 1 */ 9, 10, 11, 12, 13, 14, 15,
193+
/* block 2 */
194+
/* block 3 */ 24, 28, 30, 31,
195+
/* block 4 */ 32
196+
);
197+
198+
var partitionIterator = PartitionUtils.blockAlignedPartitioning(
199+
sortedArray,
200+
blockShift,
201+
partition -> partition
202+
);
203+
204+
// 😿 Java
205+
var partitions = StreamSupport
206+
.stream(((Iterable<Partition>) () -> partitionIterator).spliterator(), false)
207+
.collect(Collectors.toList());
208+
209+
assertThat(partitions)
210+
.containsExactly(
211+
ImmutablePartition.of(0, 3),
212+
ImmutablePartition.of(3, 7),
213+
ImmutablePartition.of(10, 4),
214+
ImmutablePartition.of(14, 1)
215+
);
216+
}
217+
183218
static class TestTask implements Runnable {
184219

185220
public final long start;

0 commit comments

Comments
 (0)