Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index operations #4

Merged
merged 8 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions fast_forward/index/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from enum import Enum
from time import perf_counter
from typing import Iterable, List, Sequence, Set, Tuple, Union
from typing import Iterable, Iterator, List, Optional, Sequence, Set, Tuple

import numpy as np
import pandas as pd
Expand All @@ -26,6 +26,9 @@ class Mode(Enum):
AVEP = 4


IDSequence = Sequence[Optional[str]]


class Index(abc.ABC):
"""Abstract base class for Fast-Forward indexes."""

Expand Down Expand Up @@ -165,52 +168,66 @@ def __len__(self) -> int:
def _add(
    self,
    vectors: np.ndarray,
    doc_ids: Sequence[Optional[str]],
    psg_ids: Sequence[Optional[str]],
) -> None:
    """Add vector representations and corresponding IDs to the index.

    Each vector is guaranteed to have either a document or passage ID associated;
    `add` validates this before delegating here. Individual entries in either
    sequence may be None.

    Document IDs may have duplicates, passage IDs are assumed to be unique.

    Args:
        vectors (np.ndarray): The representations, shape `(num_vectors, dim)`.
        doc_ids (Sequence[Optional[str]]): The corresponding document IDs.
        psg_ids (Sequence[Optional[str]]): The corresponding passage IDs.
    """
    pass

def add(
    self,
    vectors: np.ndarray,
    doc_ids: Sequence[Optional[str]] = None,
    psg_ids: Sequence[Optional[str]] = None,
) -> None:
    """Add vector representations and corresponding IDs to the index.

    Only one of `doc_ids` and `psg_ids` may be None. Individual IDs in the sequence may also be None,
    but each vector must have at least one associated ID.

    Document IDs may have duplicates, passage IDs must be unique.

    Args:
        vectors (np.ndarray): The representations, shape `(num_vectors, dim)`.
        doc_ids (Sequence[Optional[str]], optional): The corresponding document IDs (may be duplicate). Defaults to None.
        psg_ids (Sequence[Optional[str]], optional): The corresponding passage IDs (must be unique). Defaults to None.

    Raises:
        ValueError: When there are no document IDs and no passage IDs.
        ValueError: When the number of IDs does not match the number of vectors.
        ValueError: When the vector and index dimensionalities don't match.
        ValueError: When a vector has neither a document nor a passage ID.
        RuntimeError: When items can't be added to the index for any reason.
    """
    if doc_ids is None and psg_ids is None:
        raise ValueError("At least one of doc_ids and psg_ids must be provided.")

    num_vectors, dim = vectors.shape

    # normalize missing sequences so every vector has a (doc_id, psg_id) pair
    if doc_ids is None:
        doc_ids = [None] * num_vectors
    if psg_ids is None:
        psg_ids = [None] * num_vectors
    if not len(doc_ids) == len(psg_ids) == num_vectors:
        raise ValueError("Number of IDs does not match number of vectors.")

    if dim != self.dim:
        raise ValueError(
            f"Vector dimensionality ({dim}) does not match index dimensionality ({self.dim})."
        )

    # validate up front so _add never receives an unidentifiable vector
    for doc_id, psg_id in zip(doc_ids, psg_ids):
        if doc_id is None and psg_id is None:
            raise ValueError("Vector has neither document nor passage ID.")

    self._add(vectors, doc_ids, psg_ids)

@abc.abstractmethod
Expand Down Expand Up @@ -427,3 +444,29 @@ def __call__(
copy=False,
is_sorted=False,
)

@abc.abstractmethod
def batch_iter(
    self, batch_size: int
) -> Iterator[Tuple[np.ndarray, IDSequence, IDSequence]]:
    """Iterate over all vectors, document IDs, and passage IDs in batches.
    IDs may be either strings or None.

    Implementations may yield a final batch that is smaller than `batch_size`.

    Args:
        batch_size (int): Batch size.

    Yields:
        Tuple[np.ndarray, IDSequence, IDSequence]: Batches of vectors, document IDs (if any), passage IDs (if any).
    """
    pass

def __iter__(
    self,
) -> Iterator[Tuple[np.ndarray, Optional[str], Optional[str]]]:
    """Iterate over all vectors, document IDs, and passage IDs.

    Yields:
        Tuple[np.ndarray, Optional[str], Optional[str]]: Vector, document ID (if any), passage ID (if any).
    """
    batch_size = 2**9
    for batch_vectors, batch_doc_ids, batch_psg_ids in self.batch_iter(batch_size):
        for item in zip(batch_vectors, batch_doc_ids, batch_psg_ids):
            yield item
89 changes: 53 additions & 36 deletions fast_forward/index/disk.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging
from collections import defaultdict
from pathlib import Path
from typing import Iterable, List, Sequence, Set, Tuple, Union
from typing import Iterable, Iterator, List, Optional, Sequence, Set, Tuple

import h5py
import numpy as np
from tqdm import tqdm

import fast_forward
from fast_forward.encoder import Encoder
from fast_forward.index import Index, Mode
from fast_forward.index import IDSequence, Index, Mode
from fast_forward.index.memory import InMemoryIndex

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -157,10 +157,25 @@ def to_memory(self, buffer_size=None) -> InMemoryIndex:
def _add(
self,
vectors: np.ndarray,
doc_ids: Union[Sequence[str], None],
psg_ids: Union[Sequence[str], None],
doc_ids: Sequence[Optional[str]],
psg_ids: Sequence[Optional[str]],
) -> None:
with h5py.File(self._index_file, "a") as fp:

# check all IDs first before adding anything
doc_id_size = fp["doc_ids"].dtype.itemsize
for doc_id in doc_ids:
if doc_id is not None and len(doc_id) > doc_id_size:
raise RuntimeError(
f"Document ID {doc_id} is longer than the maximum ({doc_id_size} characters)."
)
psg_id_size = fp["psg_ids"].dtype.itemsize
for psg_id in psg_ids:
if psg_id is not None and len(psg_id) > psg_id_size:
raise RuntimeError(
f"Passage ID {psg_id} is longer than the maximum ({psg_id_size} characters)."
)

num_new_vecs = vectors.shape[0]
capacity = fp["vectors"].shape[0]

Expand All @@ -176,38 +191,23 @@ def _add(
fp["doc_ids"].resize(new_size, axis=0)
fp["psg_ids"].resize(new_size, axis=0)

# check all IDs first before adding anything
doc_id_size = fp["doc_ids"].dtype.itemsize
psg_id_size = fp["psg_ids"].dtype.itemsize
add_doc_ids, add_psg_ids = [], []
if doc_ids is not None:
for i, doc_id in enumerate(doc_ids):
if len(doc_id) > doc_id_size:
raise RuntimeError(
f"Document ID {doc_id} is longer than the maximum ({doc_id_size} characters)."
)
add_doc_ids.append((doc_id, cur_num_vectors + i))
if psg_ids is not None:
for i, psg_id in enumerate(psg_ids):
if len(psg_id) > psg_id_size:
raise RuntimeError(
f"Passage ID {psg_id} is longer than the maximum ({psg_id_size} characters)."
)
add_psg_ids.append((psg_id, cur_num_vectors + i))

# add new IDs to index and in-memory mappings
if doc_ids is not None:
for doc_id, idx in add_doc_ids:
self._doc_id_to_idx[doc_id].append(idx)
fp["doc_ids"][
cur_num_vectors : cur_num_vectors + num_new_vecs
] = doc_ids
if psg_ids is not None:
for psg_id, idx in add_psg_ids:
self._psg_id_to_idx[psg_id] = idx
fp["psg_ids"][
cur_num_vectors : cur_num_vectors + num_new_vecs
] = psg_ids
# add new document IDs to index and in-memory mappings
doc_id_idxs, non_null_doc_ids = [], []
for i, doc_id in enumerate(doc_ids):
if doc_id is not None:
self._doc_id_to_idx[doc_id].append(cur_num_vectors + i)
doc_id_idxs.append(cur_num_vectors + i)
non_null_doc_ids.append(doc_id)
fp["doc_ids"][doc_id_idxs] = non_null_doc_ids

# add new passage IDs to index and in-memory mappings
psg_id_idxs, non_null_psg_ids = [], []
for i, psg_id in enumerate(psg_ids):
if psg_id is not None:
self._psg_id_to_idx[psg_id] = cur_num_vectors + i
psg_id_idxs.append(cur_num_vectors + i)
non_null_psg_ids.append(psg_id)
fp["psg_ids"][psg_id_idxs] = non_null_psg_ids

# add new vectors
fp["vectors"][cur_num_vectors : cur_num_vectors + num_new_vecs] = vectors
Expand Down Expand Up @@ -253,6 +253,23 @@ def _get_vectors(self, ids: Iterable[str]) -> Tuple[np.ndarray, List[List[int]]]
)
return vectors, [id_to_idxs[id] for id in ids]

def batch_iter(
    self, batch_size: int
) -> Iterator[Tuple[np.ndarray, IDSequence, IDSequence]]:
    """Iterate over all vectors, document IDs, and passage IDs in batches.

    The HDF5 index file stays open in read mode for the lifetime of the
    generator. Empty ID strings on disk are converted back to None
    (presumably empty means "no ID assigned" — matches how `_add` skips
    writing None IDs; verify against the file layout).

    Args:
        batch_size (int): Batch size.

    Yields:
        Tuple[np.ndarray, IDSequence, IDSequence]: Batches of vectors, document IDs (if any), passage IDs (if any).
    """
    with h5py.File(self._index_file, "r") as fp:
        # only the first num_vectors rows are valid; the datasets themselves
        # may be larger due to over-allocated capacity (see resizing in _add)
        num_vectors = fp.attrs["num_vectors"]
        for i in range(0, num_vectors, batch_size):
            # clamp so the final batch never reads past the valid rows
            j = min(i + batch_size, num_vectors)
            doc_ids = fp["doc_ids"].asstr()[i:j]
            doc_ids[doc_ids == ""] = None
            psg_ids = fp["psg_ids"].asstr()[i:j]
            psg_ids[psg_ids == ""] = None
            yield (
                fp["vectors"][i:j],
                doc_ids.tolist(),
                psg_ids.tolist(),
            )

@classmethod
def load(
cls,
Expand Down
Loading