Skip to content

Commit 4b8ec4b

Browse files
committed
Re-use iterator state and pool cursors.
1 parent 7f85dc6 commit 4b8ec4b

File tree

3 files changed

+179
-125
lines changed

3 files changed

+179
-125
lines changed

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbRecordIterator.java

Lines changed: 119 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
package org.eclipse.rdf4j.sail.lmdb;
1212

1313
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
14+
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
1415
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST_DUP;
1516
import static org.lwjgl.util.lmdb.LMDB.MDB_GET_BOTH_RANGE;
16-
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
1717
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT_DUP;
1818
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT_NODUP;
1919
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
@@ -36,6 +36,7 @@
3636
import org.eclipse.rdf4j.sail.lmdb.util.EntryMatcher;
3737
import org.lwjgl.PointerBuffer;
3838
import org.lwjgl.system.MemoryStack;
39+
import org.lwjgl.system.MemoryUtil;
3940
import org.lwjgl.util.lmdb.MDBVal;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
@@ -46,117 +47,120 @@
4647
class LmdbRecordIterator implements RecordIterator {
4748

4849
private static final Logger log = LoggerFactory.getLogger(LmdbRecordIterator.class);
49-
private final Pool pool;
5050

51-
private final TripleIndex index;
51+
static class State {
5252

53-
private final long cursor;
53+
private TripleIndex index;
5454

55-
private final MDBVal maxKey;
56-
private final MDBVal maxValue;
55+
private long cursor;
5756

58-
private final boolean matchValues;
59-
private EntryMatcher matcher;
57+
private final MDBVal maxKey = MDBVal.malloc();
58+
private final MDBVal maxValue = MDBVal.malloc();
6059

61-
private final Txn txnRef;
60+
private boolean matchValues;
61+
private EntryMatcher matcher;
6262

63-
private long txnRefVersion;
63+
private Txn txnRef;
6464

65-
private final long txn;
65+
private long txnRefVersion;
6666

67-
private final int dbi;
67+
private long txn;
6868

69-
private volatile boolean closed = false;
69+
private int dbi;
7070

71-
private final MDBVal keyData;
71+
private final MDBVal keyData = MDBVal.malloc();
7272

73-
private final MDBVal valueData;
73+
private final MDBVal valueData = MDBVal.malloc();
7474

75-
private ByteBuffer minKeyBuf;
75+
private final ByteBuffer minKeyBuf = MemoryUtil.memAlloc((Long.BYTES + 1) * 2);
7676

77-
private ByteBuffer minValueBuf;
77+
private final ByteBuffer minValueBuf = MemoryUtil.memAlloc((Long.BYTES + 1) * 2);
7878

79-
private final ByteBuffer maxKeyBuf;
79+
private final ByteBuffer maxKeyBuf = MemoryUtil.memAlloc((Long.BYTES + 1) * 2);
8080

81-
private final ByteBuffer maxValueBuf;
81+
private final ByteBuffer maxValueBuf = MemoryUtil.memAlloc((Long.BYTES + 1) * 2);
8282

83-
private final long[] quad;
84-
private final long[] patternQuad;
83+
private long[] quad;
84+
private long[] patternQuad;
8585

86-
private boolean fetchNext = false;
86+
private StampedLongAdderLockManager txnLockManager;
8787

88-
private final StampedLongAdderLockManager txnLockManager;
88+
private int indexScore;
8989

90-
private final Thread ownerThread = Thread.currentThread();
90+
void close() {
91+
if (cursor != 0) {
92+
mdb_cursor_close(cursor);
93+
cursor = 0;
94+
}
95+
keyData.close();
96+
valueData.close();
97+
MemoryUtil.memFree(minKeyBuf);
98+
MemoryUtil.memFree(minValueBuf);
99+
MemoryUtil.memFree(maxKeyBuf);
100+
maxKey.close();
101+
MemoryUtil.memFree(maxValueBuf);
102+
maxValue.close();
103+
}
104+
}
91105

92-
private final int indexScore;
106+
private final Thread ownerThread = Thread.currentThread();
107+
private final State state;
108+
private volatile boolean closed = false;
109+
private boolean fetchNext = false;
93110

94111
LmdbRecordIterator(TripleIndex index, int indexScore, long subj, long pred, long obj,
95112
long context, boolean explicit, Txn txnRef) throws IOException {
96-
this.patternQuad = new long[] { subj, pred, obj, context };
97-
this.quad = new long[] { subj, pred, obj, context };
98-
this.pool = Pool.get();
99-
this.keyData = pool.getVal();
100-
this.valueData = pool.getVal();
101-
this.index = index;
102-
this.indexScore = indexScore;
113+
this.state = Pool.get().getState();
114+
this.state.patternQuad = new long[] { subj, pred, obj, context };
115+
this.state.quad = new long[] { subj, pred, obj, context };
116+
this.state.index = index;
117+
this.state.indexScore = indexScore;
103118
// prepare min and max keys if index can be used
104119
// otherwise, leave as null to indicate full scan
105120
if (indexScore > 0) {
106-
minKeyBuf = pool.getKeyBuffer();
107-
minValueBuf = pool.getKeyBuffer();
108-
index.getMinEntry(minKeyBuf, minValueBuf, subj, pred, obj, context);
109-
minKeyBuf.flip();
110-
minValueBuf.flip();
111-
112-
this.maxKey = pool.getVal();
113-
this.maxValue = pool.getVal();
114-
this.maxKeyBuf = pool.getKeyBuffer();
115-
this.maxValueBuf = pool.getKeyBuffer();
116-
index.getMaxEntry(maxKeyBuf, maxValueBuf, subj, pred, obj, context);
117-
maxKeyBuf.flip();
118-
maxValueBuf.flip();
119-
this.maxKey.mv_data(maxKeyBuf);
120-
this.maxValue.mv_data(maxValueBuf);
121-
} else {
122-
minKeyBuf = null;
123-
this.maxKey = null;
124-
this.maxValue = null;
125-
this.maxKeyBuf = null;
126-
this.maxValueBuf = null;
121+
state.minKeyBuf.clear();
122+
state.minValueBuf.clear();
123+
index.getMinEntry(state.minKeyBuf, state.minValueBuf, subj, pred, obj, context);
124+
state.minKeyBuf.flip();
125+
state.minValueBuf.flip();
126+
127+
state.maxKeyBuf.clear();
128+
state.maxValueBuf.clear();
129+
index.getMaxEntry(state.maxKeyBuf, state.maxValueBuf, subj, pred, obj, context);
130+
state.maxKeyBuf.flip();
131+
state.maxValueBuf.flip();
132+
state.maxKey.mv_data(state.maxKeyBuf);
133+
state.maxValue.mv_data(state.maxValueBuf);
127134
}
128135

129-
this.matchValues = subj > 0 || pred > 0 || obj > 0 || context >= 0;
136+
state.matchValues = subj > 0 || pred > 0 || obj > 0 || context >= 0;
137+
state.matcher = null;
130138

131-
this.dbi = index.getDB(explicit);
132-
this.txnRef = txnRef;
133-
this.txnLockManager = txnRef.lockManager();
139+
var dbi = index.getDB(explicit);
134140

135141
long readStamp;
136142
try {
137-
readStamp = txnLockManager.readLock();
143+
readStamp = txnRef.lockManager().readLock();
138144
} catch (InterruptedException e) {
139145
throw new SailException(e);
140146
}
141147
try {
142-
this.txnRefVersion = txnRef.version();
143-
this.txn = txnRef.get();
144-
145-
try (MemoryStack stack = MemoryStack.stackPush()) {
146-
PointerBuffer pp = stack.mallocPointer(1);
147-
E(mdb_cursor_open(txn, dbi, pp));
148-
cursor = pp.get(0);
149-
}
148+
state.dbi = dbi;
149+
state.txnRef = txnRef;
150+
state.txnLockManager = txnRef.lockManager();
151+
state.txnRefVersion = txnRef.version();
152+
state.txn = txnRef.get();
153+
state.cursor = txnRef.getCursor(dbi);
150154
} finally {
151-
txnLockManager.unlockRead(readStamp);
155+
txnRef.lockManager().unlockRead(readStamp);
152156
}
153157
}
154158

155159
@Override
156160
public long[] next() {
157161
long readStamp;
158162
try {
159-
readStamp = txnLockManager.readLock();
163+
readStamp = state.txnLockManager.readLock();
160164
} catch (InterruptedException e) {
161165
throw new SailException(e);
162166
}
@@ -167,28 +171,26 @@ public long[] next() {
167171
return null;
168172
}
169173

170-
if (txnRefVersion != txnRef.version()) {
174+
if (state.txnRefVersion != state.txnRef.version()) {
171175
// TODO: None of the tests in the LMDB Store cover this case!
172176
// cursor must be renewed
173-
mdb_cursor_renew(txn, cursor);
177+
mdb_cursor_renew(state.txn, state.cursor);
174178
if (fetchNext) {
175179
// cursor must be positioned on last item, reuse minKeyBuf if available
176-
if (minKeyBuf == null) {
177-
minKeyBuf = pool.getKeyBuffer();
178-
minValueBuf = pool.getKeyBuffer();
179-
}
180-
minKeyBuf.clear();
181-
index.toEntry(minKeyBuf, minValueBuf, quad[0], quad[1], quad[2], quad[3]);
182-
minKeyBuf.flip();
183-
minValueBuf.flip();
184-
keyData.mv_data(minKeyBuf);
180+
state.minKeyBuf.clear();
181+
state.minValueBuf.clear();
182+
state.index.toEntry(state.minKeyBuf, state.minValueBuf, state.quad[0], state.quad[1], state.quad[2],
183+
state.quad[3]);
184+
state.minKeyBuf.flip();
185+
state.minValueBuf.flip();
186+
state.keyData.mv_data(state.minKeyBuf);
185187
// use set range if entry was deleted
186-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
188+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_SET_RANGE);
187189
if (lastResult == MDB_SUCCESS) {
188-
valueData.mv_data(minValueBuf);
189-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_GET_BOTH_RANGE);
190+
state.valueData.mv_data(state.minValueBuf);
191+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_GET_BOTH_RANGE);
190192
if (lastResult != MDB_SUCCESS) {
191-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST_DUP);
193+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_FIRST_DUP);
192194
}
193195
}
194196
if (lastResult != MDB_SUCCESS) {
@@ -197,82 +199,84 @@ public long[] next() {
197199
}
198200
}
199201
// update version of txn ref
200-
this.txnRefVersion = txnRef.version();
202+
state.txnRefVersion = state.txnRef.version();
201203
}
202204

203205
boolean isDupValue = false;
204206
if (fetchNext) {
205-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT_DUP);
207+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_NEXT_DUP);
206208
if (lastResult != MDB_SUCCESS) {
207209
// no more duplicates, move to next key
208-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT_NODUP);
210+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_NEXT_NODUP);
209211
} else {
210212
isDupValue = true;
211213
}
212214
fetchNext = false;
213215
} else {
214-
if (minKeyBuf != null) {
216+
if (state.indexScore > 0) {
215217
// set cursor to min key
216-
keyData.mv_data(minKeyBuf);
218+
state.keyData.mv_data(state.minKeyBuf);
217219
// set range on key is only required if less than the first two key elements are fixed
218-
lastResult = indexScore >= 2 ? MDB_SUCCESS
219-
: mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
220+
lastResult = state.indexScore >= 2 ? MDB_SUCCESS
221+
: mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_SET_RANGE);
220222
if (lastResult == MDB_SUCCESS) {
221-
valueData.mv_data(minValueBuf);
222-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_GET_BOTH_RANGE);
223+
state.valueData.mv_data(state.minValueBuf);
224+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_GET_BOTH_RANGE);
223225
if (lastResult != MDB_SUCCESS) {
224-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_FIRST_DUP);
226+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_FIRST_DUP);
225227
} else {
226-
isDupValue = indexScore >= 2;
228+
isDupValue = state.indexScore >= 2;
227229
}
228230
}
229231
} else {
230232
// set cursor to first item
231-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT);
233+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_FIRST);
232234
}
233235
}
234236

235237
while (lastResult == MDB_SUCCESS) {
236238
// if (maxKey != null && TripleStore.COMPARATOR.compare(keyData.mv_data(), maxKey.mv_data()) > 0) {
237239
int keyDiff;
238-
if (maxKey != null &&
239-
(keyDiff = isDupValue ? 0 : mdb_cmp(txn, dbi, keyData, maxKey)) >= 0
240-
&& (keyDiff > 0 || mdb_dcmp(txn, dbi, valueData, maxValue) > 0)) {
240+
if (state.indexScore > 0 &&
241+
(keyDiff = isDupValue ? 0 : mdb_cmp(state.txn, state.dbi, state.keyData, state.maxKey)) >= 0
242+
&& (keyDiff > 0 || mdb_dcmp(state.txn, state.dbi, state.valueData, state.maxValue) > 0)) {
241243
lastResult = MDB_NOTFOUND;
242244
} else if (notMatches(isDupValue)) {
243245
// value doesn't match search key/mask, fetch next value
244-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT_DUP);
246+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_NEXT_DUP);
245247
if (lastResult != MDB_SUCCESS) {
246248
// no more duplicates, move to next key
247-
lastResult = mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT_NODUP);
249+
lastResult = mdb_cursor_get(state.cursor, state.keyData, state.valueData, MDB_NEXT_NODUP);
250+
isDupValue = false;
248251
}
249-
isDupValue = false;
250252
} else {
251253
// Matching value found
252-
index.entryToQuad(keyData.mv_data(), valueData.mv_data(), patternQuad, quad);
254+
state.index.entryToQuad(state.keyData.mv_data(), state.valueData.mv_data(), state.patternQuad,
255+
state.quad);
253256
// fetch next value
254257
fetchNext = true;
255-
return quad;
258+
return state.quad;
256259
}
257260
}
258261
closeInternal(false);
259262
return null;
260263
} finally {
261-
txnLockManager.unlockRead(readStamp);
264+
state.txnLockManager.unlockRead(readStamp);
262265
}
263266
}
264267

265268
private boolean notMatches(boolean testValueOnly) {
266-
if (matcher != null) {
269+
if (state.matcher != null) {
267270
if (testValueOnly) {
268271
// already positioned on correct key, no need to match key again
269-
return !this.matcher.matchesValue(valueData.mv_data());
272+
return !state.matcher.matchesValue(state.valueData.mv_data());
270273
}
271-
return !this.matcher.matches(keyData.mv_data(), valueData.mv_data());
272-
} else if (matchValues) {
274+
return !state.matcher.matches(state.keyData.mv_data(), state.valueData.mv_data());
275+
} else if (state.matchValues) {
273276
// lazy init of group matcher
274-
this.matcher = index.createMatcher(patternQuad[0], patternQuad[1], patternQuad[2], patternQuad[3]);
275-
return !this.matcher.matches(keyData.mv_data(), valueData.mv_data());
277+
state.matcher = state.index.createMatcher(state.patternQuad[0], state.patternQuad[1], state.patternQuad[2],
278+
state.patternQuad[3]);
279+
return !state.matcher.matches(state.keyData.mv_data(), state.valueData.mv_data());
276280
} else {
277281
return false;
278282
}
@@ -284,32 +288,22 @@ private void closeInternal(boolean maybeCalledAsync) {
284288
boolean writeLocked = false;
285289
if (maybeCalledAsync && ownerThread != Thread.currentThread()) {
286290
try {
287-
writeStamp = txnLockManager.writeLock();
291+
writeStamp = state.txnLockManager.writeLock();
288292
writeLocked = true;
289293
} catch (InterruptedException e) {
290294
throw new SailException(e);
291295
}
292296
}
293297
try {
294298
if (!closed) {
295-
mdb_cursor_close(cursor);
296-
pool.free(keyData);
297-
pool.free(valueData);
298-
if (minKeyBuf != null) {
299-
pool.free(minKeyBuf);
300-
pool.free(minValueBuf);
301-
}
302-
if (maxKey != null) {
303-
pool.free(maxKeyBuf);
304-
pool.free(maxKey);
305-
pool.free(maxValueBuf);
306-
pool.free(maxValue);
307-
}
299+
state.txnRef.returnCursor(state.dbi, state.cursor);
300+
state.cursor = 0;
301+
Pool.get().free(state);
308302
}
309303
} finally {
310304
closed = true;
311305
if (writeLocked) {
312-
txnLockManager.unlockWrite(writeStamp);
306+
state.txnLockManager.unlockWrite(writeStamp);
313307
}
314308
}
315309
}

0 commit comments

Comments
 (0)