[lucene] Support vector index using Lucene #6773
Conversation
paimon-vector-index/pom.xml
Outdated
    <version>1.4-SNAPSHOT</version>
</parent>

<artifactId>paimon-vector-index</artifactId>
Maybe paimon-lucene is better.
IndexReader reader = null;
boolean success = false;
try {
    directory = deserializeDirectory(in);
Current situation: serializeDirectory / deserializeDirectory only write the number of files, the length of each filename, the filename itself, and the file length, then write the raw bytes directly. There is no magic number, version number, or checksum.
Impact: if the serialization layout changes in the future, older versions will not recognize it; when the stream is damaged it is hard to pinpoint which part is corrupted; robustness is poor (prone to silent corruption).
Recommendation: add a magic number, a version number, and a CRC32 per file (or an overall checksum), and use standard DataOutputStream/DataInputStream to write int/long instead of manual ByteBuffer wrapping. This is more robust and makes backward compatibility easier to maintain (via the version field).
Example:
private static final int MAGIC = 0x50414D4F; // 'PAMO' or any magic
private static final short VERSION = 1;

private void serializeDirectory(Directory directory, OutputStream out) throws IOException {
    DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(out));
    dos.writeInt(MAGIC);
    dos.writeShort(VERSION);
    String[] files = directory.listAll();
    dos.writeInt(files.length);
    for (String fileName : files) {
        byte[] nameBytes = fileName.getBytes(StandardCharsets.UTF_8);
        dos.writeInt(nameBytes.length);
        dos.write(nameBytes);
        long fileLength = directory.fileLength(fileName);
        dos.writeLong(fileLength);
        try (IndexInput input = directory.openInput(fileName, IOContext.DEFAULT)) {
            byte[] buffer = new byte[32 * 1024];
            long remaining = fileLength;
            CRC32 crc = new CRC32();
            while (remaining > 0) {
                int toRead = (int) Math.min(buffer.length, remaining);
                input.readBytes(buffer, 0, toRead);
                dos.write(buffer, 0, toRead);
                crc.update(buffer, 0, toRead);
                remaining -= toRead;
            }
            dos.writeLong(crc.getValue()); // per-file checksum
        }
    }
    dos.flush();
}
And the corresponding deserializeDirectory function (which reads the magic number/version and verifies the CRC).
private IndexMMapDirectory deserializeDirectory(SeekableInputStream in) throws IOException {
    DataInputStream dis = new DataInputStream(new BufferedInputStream(in));
    int magic = dis.readInt();
    if (magic != MAGIC) {
        throw new IOException("Invalid vector index file magic: " + Integer.toHexString(magic));
    }
    short version = dis.readShort();
    if (version != VERSION) {
        throw new IOException("Unsupported vector index version: " + version);
    }
    IndexMMapDirectory indexMMapDirectory = new IndexMMapDirectory();
    try {
        int numFiles = dis.readInt();
        byte[] buffer = new byte[32 * 1024]; // same buffer size as serializeDirectory
        for (int i = 0; i < numFiles; i++) {
            int nameLength = dis.readInt();
            byte[] nameBytes = new byte[nameLength];
            dis.readFully(nameBytes);
            String fileName = new String(nameBytes, StandardCharsets.UTF_8);
            long fileLength = dis.readLong();
            // Stream file data to the index output, computing the CRC while reading
            try (IndexOutput output = indexMMapDirectory.directory().createOutput(fileName, IOContext.DEFAULT)) {
                long remaining = fileLength;
                CRC32 crc = new CRC32();
                while (remaining > 0) {
                    int toRead = (int) Math.min(buffer.length, remaining);
                    dis.readFully(buffer, 0, toRead);
                    output.writeBytes(buffer, 0, toRead);
                    crc.update(buffer, 0, toRead);
                    remaining -= toRead;
                }
                long expectedCrc = dis.readLong();
                if (crc.getValue() != expectedCrc) {
                    throw new IOException("CRC mismatch for file " + fileName);
                }
            }
        }
        return indexMMapDirectory;
    } catch (Exception e) {
        try {
            indexMMapDirectory.close();
        } catch (Exception ee) {
            e.addSuppressed(ee);
        }
        if (e instanceof IOException) {
            throw (IOException) e;
        } else {
            throw new IOException("Failed to deserialize directory", e);
        }
    }
}
@kaori-seasons Please do not copy an entire AI answer into a review comment; it costs the author a lot of effort to understand a simple point of view. Summarize it in a sentence or two, like "I think adding a MAGIC number is better."
+1 to @leaves12138. This is really a waste of everyone's time.
push:
pull_request:
  paths:
    - 'paimon-lucene/**'
Why not cover this in utitcase-jdk11?
Thanks @jerry-024. Left some comments below.
private final MMapDirectory mmapDirectory;

public IndexMMapDirectory() throws IOException {
    this.path = java.nio.file.Files.createTempDirectory("paimon-lucene-" + UUID.randomUUID());
Please do not use the fully qualified java.nio.file.Files inline in the class body; import it at the top of the file.
public void close() throws Exception {
    mmapDirectory.close();
    if (java.nio.file.Files.exists(path)) {
        java.nio.file.Files.walk(path)
ditto
    mmapDirectory.close();
    if (java.nio.file.Files.exists(path)) {
        java.nio.file.Files.walk(path)
            .sorted(java.util.Comparator.reverseOrder())
ditto
We don't need to sort
            .forEach(
                p -> {
                    try {
                        java.nio.file.Files.delete(p);
ditto
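To illustrate the review points above (imports at the top of the file instead of inline qualified names, and deleting the temp directory's contents before the directory itself), here is a minimal, self-contained cleanup sketch. The class name TempDirCleanup is hypothetical, not the PR's actual class; it keeps the reverse-sorted walk so the directory is deleted after its files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical sketch of the temp-directory lifecycle, with imports declared at the head. */
class TempDirCleanup implements AutoCloseable {
    private final Path path;

    TempDirCleanup() throws IOException {
        this.path = Files.createTempDirectory("paimon-lucene-");
    }

    Path path() {
        return path;
    }

    @Override
    public void close() throws IOException {
        if (Files.exists(path)) {
            // Reverse order deletes files before the directory containing them;
            // Files.walk yields the root first, so an unsorted walk would try to
            // delete a non-empty directory and fail.
            try (Stream<Path> walk = Files.walk(path)) {
                walk.sorted(Comparator.reverseOrder())
                        .forEach(
                                p -> {
                                    try {
                                        Files.delete(p);
                                    } catch (IOException e) {
                                        throw new UncheckedIOException(e);
                                    }
                                });
            }
        }
    }
}
```

Note the walk stream itself is closed via try-with-resources, which the snippet under review also omits.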
/** Factory for creating vector global indexers. */
public class VectorGlobalIndexerFactory implements GlobalIndexerFactory {

    public static final String IDENTIFIER = "vector";
lucene-hnw? "vector" is too wide.
how about lucene-hnsw? (ps: HNSW (Hierarchical Navigable Small World))
// Implementation of FunctionVisitor methods
@Override
public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
    throw new UnsupportedOperationException(
Do not throw an exception. Please return GlobalIndexResult.fromRange(<the range passed to you in GlobalIndexIOMeta>)
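The suggestion is: for predicates the vector index cannot evaluate, degrade to "filters nothing" by returning the full row range rather than failing the query. A rough sketch of that idea, using stand-in classes (Range, GlobalIndexResult, VectorIndexVisitor are simplified stubs for illustration, not Paimon's real API):

```java
import java.util.Objects;

/** Stand-in for a half-open row-id range [from, to). */
final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
        this.from = from;
        this.to = to;
    }
}

/** Simplified stand-in for Paimon's GlobalIndexResult, mirroring the fromRange factory. */
final class GlobalIndexResult {
    final Range range;

    private GlobalIndexResult(Range range) {
        this.range = Objects.requireNonNull(range);
    }

    static GlobalIndexResult fromRange(Range range) {
        return new GlobalIndexResult(range);
    }
}

/** Visitor that falls back to the whole range for predicates the index cannot answer. */
class VectorIndexVisitor {
    // the range handed to this indexer (assumed to come via GlobalIndexIOMeta)
    private final Range scanRange;

    VectorIndexVisitor(Range scanRange) {
        this.scanRange = scanRange;
    }

    GlobalIndexResult visitIsNotNull(Object fieldRef) {
        // Cannot prune with a vector index: return "everything may match"
        // instead of throwing UnsupportedOperationException.
        return GlobalIndexResult.fromRange(scanRange);
    }
}
```

Returning the full range is a safe over-approximation: downstream filtering still evaluates the predicate, so correctness is preserved while other pushed-down predicates keep working.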
JingsongLi left a comment
- All classes should start with Lucene.
- Identifier should be lucene-vector-knn?
+1
Purpose
Introduce vector index support for Paimon using Apache Lucene 9.x to enable:
Implementation
Tests
API and Format
Documentation
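The description above is terse, so as a rough illustration of what "vector index using Lucene" involves, here is a minimal sketch with plain Lucene 9.x: KnnFloatVectorField stores vectors in an HNSW graph at index time, and KnnFloatVectorQuery runs approximate nearest-neighbor search over it. This is not the PR's actual API; the class LuceneKnnSketch, the field names, and the row-id layout are assumptions, and IndexReader.storedFields() requires Lucene 9.5+.

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.KnnFloatVectorQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

/** Illustrative sketch only; not the PR's API. */
class LuceneKnnSketch {
    /** Indexes one vector per row, then returns the row id of the nearest neighbor. */
    static int nearestRowId(float[][] vectors, float[] query) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (int row = 0; row < vectors.length; row++) {
                    Document doc = new Document();
                    // Builds an HNSW graph; default similarity is Euclidean.
                    doc.add(new KnnFloatVectorField("vec", vectors[row]));
                    doc.add(new StoredField("rowId", row));
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                // Approximate top-1 search over the HNSW graph.
                TopDocs top = searcher.search(new KnnFloatVectorQuery("vec", query, 1), 1);
                ScoreDoc best = top.scoreDocs[0];
                return reader.storedFields()
                        .document(best.doc)
                        .getField("rowId")
                        .numericValue()
                        .intValue();
            }
        }
    }
}
```

In the PR itself, the serialized Lucene Directory (see the serializeDirectory discussion above) is what carries such an index through Paimon's global index file format.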