Skip to content

Commit b69f4c0

Browse files
s1ckDarthMax
andcommitted
Put access to update listeners under lock
Co-Authored-By: Max Kießling <[email protected]>
1 parent b3b85eb commit b69f4c0

File tree

1 file changed

+29
-4
lines changed

1 file changed

+29
-4
lines changed

cypher/cypher-core/src/main/java/org/neo4j/gds/core/cypher/RelationshipIds.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
import java.util.ArrayList;
3434
import java.util.List;
35+
import java.util.concurrent.locks.Lock;
36+
import java.util.concurrent.locks.ReentrantLock;
3537
import java.util.stream.Collectors;
3638

3739
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
@@ -42,6 +44,7 @@ public final class RelationshipIds extends CypherGraphStore.StateVisitor.Adapter
4244
private final TokenHolders tokenHolders;
4345
private final List<RelationshipIdContext> relationshipIdContexts;
4446
private final List<UpdateListener> updateListeners;
47+
private final Lock lock;
4548

4649
public interface UpdateListener {
4750
void onRelationshipIdsAdded(RelationshipIdContext relationshipIdContext);
@@ -62,6 +65,7 @@ private RelationshipIds(GraphStore graphStore, TokenHolders tokenHolders, List<R
6265
this.tokenHolders = tokenHolders;
6366
this.relationshipIdContexts = relationshipIdContexts;
6467
this.updateListeners = new ArrayList<>();
68+
this.lock = new ReentrantLock();
6569
}
6670

6771
public <T> T resolveRelationshipId(long relationshipId, ResolvedRelationshipIdFunction<T> relationshipIdConsumer) {
@@ -93,20 +97,41 @@ public <T> T resolveRelationshipId(long relationshipId, ResolvedRelationshipIdFu
9397
}
9498

9599
public void registerUpdateListener(UpdateListener updateListener) {
96-
this.updateListeners.add(updateListener);
100+
try {
101+
lock.lock();
102+
this.updateListeners.add(updateListener);
103+
} finally {
104+
lock.unlock();
105+
}
97106
// replay added relationship id contexts
98107
relationshipIdContexts.forEach(updateListener::onRelationshipIdsAdded);
99108
}
100109

101110
public void removeUpdateListener(UpdateListener updateListener) {
102-
this.updateListeners.remove(updateListener);
111+
// This is potentially called in parallel by the Cypher runtime
112+
// and while we are adding new relationship types.
113+
try {
114+
lock.lock();
115+
this.updateListeners.remove(updateListener);
116+
} finally {
117+
lock.unlock();
118+
}
103119
}
104120

105121
@Override
106122
public void relationshipTypeAdded(String relationshipType) {
107-
var relationshipIdContext = relationshipIdContextFromRelType(graphStore, tokenHolders, RelationshipType.of(relationshipType));
123+
var relationshipIdContext = relationshipIdContextFromRelType(
124+
graphStore,
125+
tokenHolders,
126+
RelationshipType.of(relationshipType)
127+
);
108128
relationshipIdContexts.add(relationshipIdContext);
109-
updateListeners.forEach(updateListener -> updateListener.onRelationshipIdsAdded(relationshipIdContext));
129+
try {
130+
lock.lock();
131+
updateListeners.forEach(updateListener -> updateListener.onRelationshipIdsAdded(relationshipIdContext));
132+
} finally {
133+
lock.unlock();
134+
}
110135
}
111136

112137
@NotNull

0 commit comments

Comments
 (0)