Compare commits
4 Commits
dynamic-ad
...
colqwen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df3350be43 | ||
|
|
94d9a203a2 | ||
|
|
72455bb269 | ||
|
|
d034e2195b |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -18,6 +18,7 @@ demo/experiment_results/**/*.json
|
||||
*.eml
|
||||
*.emlx
|
||||
*.json
|
||||
*.png
|
||||
!.vscode/*.json
|
||||
*.sh
|
||||
*.txt
|
||||
@@ -101,4 +102,6 @@ CLAUDE.local.md
|
||||
.claude/*.local.*
|
||||
.claude/local/*
|
||||
benchmarks/data/
|
||||
test_add/*
|
||||
|
||||
## multi vector
|
||||
apps/multimodal/vision-based-pdf-multi-vector/multi-vector-colpali-native-weaviate.py
|
||||
|
||||
@@ -0,0 +1,182 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def _ensure_repo_paths_importable(current_file: str) -> None:
|
||||
_repo_root = Path(current_file).resolve().parents[3]
|
||||
_leann_core_src = _repo_root / "packages" / "leann-core" / "src"
|
||||
_leann_hnsw_pkg = _repo_root / "packages" / "leann-backend-hnsw"
|
||||
if str(_leann_core_src) not in sys.path:
|
||||
sys.path.append(str(_leann_core_src))
|
||||
if str(_leann_hnsw_pkg) not in sys.path:
|
||||
sys.path.append(str(_leann_hnsw_pkg))
|
||||
|
||||
|
||||
_ensure_repo_paths_importable(__file__)
|
||||
|
||||
from leann_backend_hnsw.hnsw_backend import HNSWBuilder, HNSWSearcher # noqa: E402
|
||||
|
||||
|
||||
class LeannMultiVector:
|
||||
def __init__(
|
||||
self,
|
||||
index_path: str,
|
||||
dim: int = 128,
|
||||
distance_metric: str = "mips",
|
||||
m: int = 16,
|
||||
ef_construction: int = 500,
|
||||
is_compact: bool = False,
|
||||
is_recompute: bool = False,
|
||||
embedding_model_name: str = "colvision",
|
||||
) -> None:
|
||||
self.index_path = index_path
|
||||
self.dim = dim
|
||||
self.embedding_model_name = embedding_model_name
|
||||
self._pending_items: list[dict] = []
|
||||
self._backend_kwargs = {
|
||||
"distance_metric": distance_metric,
|
||||
"M": m,
|
||||
"efConstruction": ef_construction,
|
||||
"is_compact": is_compact,
|
||||
"is_recompute": is_recompute,
|
||||
}
|
||||
self._labels_meta: list[dict] = []
|
||||
|
||||
def _meta_dict(self) -> dict:
|
||||
return {
|
||||
"version": "1.0",
|
||||
"backend_name": "hnsw",
|
||||
"embedding_model": self.embedding_model_name,
|
||||
"embedding_mode": "custom",
|
||||
"dimensions": self.dim,
|
||||
"backend_kwargs": self._backend_kwargs,
|
||||
"is_compact": self._backend_kwargs.get("is_compact", True),
|
||||
"is_pruned": self._backend_kwargs.get("is_compact", True)
|
||||
and self._backend_kwargs.get("is_recompute", True),
|
||||
}
|
||||
|
||||
def create_collection(self) -> None:
|
||||
path = Path(self.index_path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def insert(self, data: dict) -> None:
|
||||
self._pending_items.append(
|
||||
{
|
||||
"doc_id": int(data["doc_id"]),
|
||||
"filepath": data.get("filepath", ""),
|
||||
"colbert_vecs": [np.asarray(v, dtype=np.float32) for v in data["colbert_vecs"]],
|
||||
}
|
||||
)
|
||||
|
||||
def _labels_path(self) -> Path:
|
||||
index_path_obj = Path(self.index_path)
|
||||
return index_path_obj.parent / f"{index_path_obj.name}.labels.json"
|
||||
|
||||
def _meta_path(self) -> Path:
|
||||
index_path_obj = Path(self.index_path)
|
||||
return index_path_obj.parent / f"{index_path_obj.name}.meta.json"
|
||||
|
||||
def create_index(self) -> None:
|
||||
if not self._pending_items:
|
||||
return
|
||||
|
||||
embeddings: list[np.ndarray] = []
|
||||
labels_meta: list[dict] = []
|
||||
|
||||
for item in self._pending_items:
|
||||
doc_id = int(item["doc_id"])
|
||||
filepath = item.get("filepath", "")
|
||||
colbert_vecs = item["colbert_vecs"]
|
||||
for seq_id, vec in enumerate(colbert_vecs):
|
||||
vec_np = np.asarray(vec, dtype=np.float32)
|
||||
embeddings.append(vec_np)
|
||||
labels_meta.append(
|
||||
{
|
||||
"id": f"{doc_id}:{seq_id}",
|
||||
"doc_id": doc_id,
|
||||
"seq_id": int(seq_id),
|
||||
"filepath": filepath,
|
||||
}
|
||||
)
|
||||
|
||||
if not embeddings:
|
||||
return
|
||||
|
||||
embeddings_np = np.vstack(embeddings).astype(np.float32)
|
||||
# print shape of embeddings_np
|
||||
print(embeddings_np.shape)
|
||||
|
||||
builder = HNSWBuilder(**{**self._backend_kwargs, "dimensions": self.dim})
|
||||
ids = [str(i) for i in range(embeddings_np.shape[0])]
|
||||
builder.build(embeddings_np, ids, self.index_path)
|
||||
|
||||
import json as _json
|
||||
|
||||
with open(self._meta_path(), "w", encoding="utf-8") as f:
|
||||
_json.dump(self._meta_dict(), f, indent=2)
|
||||
with open(self._labels_path(), "w", encoding="utf-8") as f:
|
||||
_json.dump(labels_meta, f)
|
||||
|
||||
self._labels_meta = labels_meta
|
||||
|
||||
def _load_labels_meta_if_needed(self) -> None:
|
||||
if self._labels_meta:
|
||||
return
|
||||
labels_path = self._labels_path()
|
||||
if labels_path.exists():
|
||||
import json as _json
|
||||
|
||||
with open(labels_path, encoding="utf-8") as f:
|
||||
self._labels_meta = _json.load(f)
|
||||
|
||||
def search(
|
||||
self, data: np.ndarray, topk: int, first_stage_k: int = 50
|
||||
) -> list[tuple[float, int]]:
|
||||
if data.ndim == 1:
|
||||
data = data.reshape(1, -1)
|
||||
if data.dtype != np.float32:
|
||||
data = data.astype(np.float32)
|
||||
|
||||
self._load_labels_meta_if_needed()
|
||||
|
||||
searcher = HNSWSearcher(self.index_path, meta=self._meta_dict())
|
||||
raw = searcher.search(
|
||||
data,
|
||||
first_stage_k,
|
||||
recompute_embeddings=False,
|
||||
complexity=128,
|
||||
beam_width=1,
|
||||
prune_ratio=0.0,
|
||||
batch_size=0,
|
||||
)
|
||||
|
||||
labels = raw.get("labels")
|
||||
distances = raw.get("distances")
|
||||
if labels is None or distances is None:
|
||||
return []
|
||||
|
||||
doc_scores: dict[int, float] = {}
|
||||
B = len(labels)
|
||||
for b in range(B):
|
||||
per_doc_best: dict[int, float] = {}
|
||||
for k, sid in enumerate(labels[b]):
|
||||
try:
|
||||
idx = int(sid)
|
||||
except Exception:
|
||||
continue
|
||||
if 0 <= idx < len(self._labels_meta):
|
||||
doc_id = int(self._labels_meta[idx]["doc_id"]) # type: ignore[index]
|
||||
else:
|
||||
continue
|
||||
score = float(distances[b][k])
|
||||
if (doc_id not in per_doc_best) or (score > per_doc_best[doc_id]):
|
||||
per_doc_best[doc_id] = score
|
||||
for doc_id, best_score in per_doc_best.items():
|
||||
doc_scores[doc_id] = doc_scores.get(doc_id, 0.0) + best_score
|
||||
|
||||
scores = sorted(((v, k) for k, v in doc_scores.items()), key=lambda x: x[0], reverse=True)
|
||||
return scores[:topk] if len(scores) >= topk else scores
|
||||
@@ -0,0 +1,477 @@
|
||||
## Jupyter-style notebook script
|
||||
# %%
|
||||
# uv pip install matplotlib qwen_vl_utils
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, cast
|
||||
|
||||
from PIL import Image
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
def _ensure_repo_paths_importable(current_file: str) -> None:
|
||||
"""Make local leann packages importable without installing (mirrors multi-vector-leann.py)."""
|
||||
_repo_root = Path(current_file).resolve().parents[3]
|
||||
_leann_core_src = _repo_root / "packages" / "leann-core" / "src"
|
||||
_leann_hnsw_pkg = _repo_root / "packages" / "leann-backend-hnsw"
|
||||
if str(_leann_core_src) not in sys.path:
|
||||
sys.path.append(str(_leann_core_src))
|
||||
if str(_leann_hnsw_pkg) not in sys.path:
|
||||
sys.path.append(str(_leann_hnsw_pkg))
|
||||
|
||||
|
||||
_ensure_repo_paths_importable(__file__)
|
||||
|
||||
from leann_multi_vector import LeannMultiVector # noqa: E402
|
||||
|
||||
# %%
|
||||
# Config
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||
QUERY = "How does DeepSeek-V2 compare against the LLaMA family of LLMs?"
|
||||
MODEL: str = "colqwen2" # "colpali" or "colqwen2"
|
||||
|
||||
# Data source: set to True to use the Hugging Face dataset example (recommended)
|
||||
USE_HF_DATASET: bool = True
|
||||
DATASET_NAME: str = "weaviate/arXiv-AI-papers-multi-vector"
|
||||
DATASET_SPLIT: str = "train"
|
||||
MAX_DOCS: Optional[int] = None # limit number of pages to index; None = all
|
||||
|
||||
# Local pages (used when USE_HF_DATASET == False)
|
||||
PDF: Optional[str] = None # e.g., "./pdfs/2004.12832v2.pdf"
|
||||
PAGES_DIR: str = "./pages"
|
||||
|
||||
# Index + retrieval settings
|
||||
INDEX_PATH: str = "./indexes/colvision.leann"
|
||||
TOPK: int = 1
|
||||
FIRST_STAGE_K: int = 500
|
||||
REBUILD_INDEX: bool = False
|
||||
|
||||
# Artifacts
|
||||
SAVE_TOP_IMAGE: Optional[str] = "./figures/retrieved_page.png"
|
||||
SIMILARITY_MAP: bool = True
|
||||
SIM_TOKEN_IDX: int = 13 # -1 means auto-select the most salient token
|
||||
SIM_OUTPUT: str = "./figures/similarity_map.png"
|
||||
ANSWER: bool = True
|
||||
MAX_NEW_TOKENS: int = 128
|
||||
|
||||
|
||||
# %%
|
||||
# Helpers
|
||||
def _natural_sort_key(name: str) -> int:
|
||||
m = re.search(r"\d+", name)
|
||||
return int(m.group()) if m else 0
|
||||
|
||||
|
||||
def _load_images_from_dir(pages_dir: str) -> tuple[list[str], list[Image.Image]]:
|
||||
filenames = [n for n in os.listdir(pages_dir) if n.lower().endswith((".png", ".jpg", ".jpeg"))]
|
||||
filenames = sorted(filenames, key=_natural_sort_key)
|
||||
filepaths = [os.path.join(pages_dir, n) for n in filenames]
|
||||
images = [Image.open(p) for p in filepaths]
|
||||
return filepaths, images
|
||||
|
||||
|
||||
def _maybe_convert_pdf_to_images(pdf_path: Optional[str], pages_dir: str, dpi: int = 200) -> None:
|
||||
if not pdf_path:
|
||||
return
|
||||
os.makedirs(pages_dir, exist_ok=True)
|
||||
try:
|
||||
from pdf2image import convert_from_path
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
"pdf2image is required to convert PDF to images. Install via pip install pdf2image"
|
||||
) from e
|
||||
images = convert_from_path(pdf_path, dpi=dpi)
|
||||
for i, image in enumerate(images):
|
||||
image.save(os.path.join(pages_dir, f"page_{i + 1}.png"), "PNG")
|
||||
|
||||
|
||||
def _select_device_and_dtype():
|
||||
import torch
|
||||
from colpali_engine.utils.torch_utils import get_torch_device
|
||||
|
||||
device_str = (
|
||||
"cuda"
|
||||
if torch.cuda.is_available()
|
||||
else (
|
||||
"mps"
|
||||
if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available()
|
||||
else "cpu"
|
||||
)
|
||||
)
|
||||
device = get_torch_device(device_str)
|
||||
# Stable dtype selection to avoid NaNs:
|
||||
# - CUDA: prefer bfloat16 if supported, else float16
|
||||
# - MPS: use float32 (fp16 on MPS can produce NaNs in some ops)
|
||||
# - CPU: float32
|
||||
if device_str == "cuda":
|
||||
dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
|
||||
try:
|
||||
torch.backends.cuda.matmul.allow_tf32 = True # Better stability/perf on Ampere+
|
||||
except Exception:
|
||||
pass
|
||||
elif device_str == "mps":
|
||||
dtype = torch.float32
|
||||
else:
|
||||
dtype = torch.float32
|
||||
return device_str, device, dtype
|
||||
|
||||
|
||||
def _load_colvision(model_choice: str):
|
||||
import torch
|
||||
from colpali_engine.models import ColPali, ColQwen2, ColQwen2Processor
|
||||
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
|
||||
from transformers.utils.import_utils import is_flash_attn_2_available
|
||||
|
||||
device_str, device, dtype = _select_device_and_dtype()
|
||||
|
||||
if model_choice == "colqwen2":
|
||||
model_name = "vidore/colqwen2-v1.0"
|
||||
# On CPU/MPS we must avoid flash-attn and stay eager; on CUDA prefer flash-attn if available
|
||||
attn_implementation = (
|
||||
"flash_attention_2"
|
||||
if (device_str == "cuda" and is_flash_attn_2_available())
|
||||
else "eager"
|
||||
)
|
||||
model = ColQwen2.from_pretrained(
|
||||
model_name,
|
||||
torch_dtype=torch.bfloat16,
|
||||
device_map=device,
|
||||
attn_implementation=attn_implementation,
|
||||
).eval()
|
||||
processor = ColQwen2Processor.from_pretrained(model_name)
|
||||
else:
|
||||
model_name = "vidore/colpali-v1.2"
|
||||
model = ColPali.from_pretrained(
|
||||
model_name,
|
||||
torch_dtype=torch.bfloat16,
|
||||
device_map=device,
|
||||
).eval()
|
||||
processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))
|
||||
|
||||
return model_name, model, processor, device_str, device, dtype
|
||||
|
||||
|
||||
def _embed_images(model, processor, images: list[Image.Image]) -> list[Any]:
|
||||
import torch
|
||||
from colpali_engine.utils.torch_utils import ListDataset
|
||||
from torch.utils.data import DataLoader
|
||||
|
||||
# Ensure deterministic eval and autocast for stability
|
||||
model.eval()
|
||||
|
||||
dataloader = DataLoader(
|
||||
dataset=ListDataset[Image.Image](images),
|
||||
batch_size=1,
|
||||
shuffle=False,
|
||||
collate_fn=lambda x: processor.process_images(x),
|
||||
)
|
||||
|
||||
doc_vecs: list[Any] = []
|
||||
for batch_doc in dataloader:
|
||||
with torch.no_grad():
|
||||
batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
|
||||
# autocast on CUDA for bf16/fp16; on CPU/MPS stay in fp32
|
||||
if model.device.type == "cuda":
|
||||
with torch.autocast(
|
||||
device_type="cuda",
|
||||
dtype=model.dtype if model.dtype.is_floating_point else torch.bfloat16,
|
||||
):
|
||||
embeddings_doc = model(**batch_doc)
|
||||
else:
|
||||
embeddings_doc = model(**batch_doc)
|
||||
doc_vecs.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
|
||||
return doc_vecs
|
||||
|
||||
|
||||
def _embed_queries(model, processor, queries: list[str]) -> list[Any]:
|
||||
import torch
|
||||
from colpali_engine.utils.torch_utils import ListDataset
|
||||
from torch.utils.data import DataLoader
|
||||
|
||||
model.eval()
|
||||
|
||||
dataloader = DataLoader(
|
||||
dataset=ListDataset[str](queries),
|
||||
batch_size=1,
|
||||
shuffle=False,
|
||||
collate_fn=lambda x: processor.process_queries(x),
|
||||
)
|
||||
|
||||
q_vecs: list[Any] = []
|
||||
for batch_query in dataloader:
|
||||
with torch.no_grad():
|
||||
batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
|
||||
if model.device.type == "cuda":
|
||||
with torch.autocast(
|
||||
device_type="cuda",
|
||||
dtype=model.dtype if model.dtype.is_floating_point else torch.bfloat16,
|
||||
):
|
||||
embeddings_query = model(**batch_query)
|
||||
else:
|
||||
embeddings_query = model(**batch_query)
|
||||
q_vecs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
|
||||
return q_vecs
|
||||
|
||||
|
||||
def _build_index(index_path: str, doc_vecs: list[Any], filepaths: list[str]) -> LeannMultiVector:
|
||||
dim = int(doc_vecs[0].shape[-1])
|
||||
retriever = LeannMultiVector(index_path=index_path, dim=dim)
|
||||
retriever.create_collection()
|
||||
for i, vec in enumerate(doc_vecs):
|
||||
data = {
|
||||
"colbert_vecs": vec.float().numpy(),
|
||||
"doc_id": i,
|
||||
"filepath": filepaths[i],
|
||||
}
|
||||
retriever.insert(data)
|
||||
retriever.create_index()
|
||||
return retriever
|
||||
|
||||
|
||||
def _load_retriever_if_index_exists(index_path: str, dim: int) -> Optional[LeannMultiVector]:
|
||||
index_base = Path(index_path)
|
||||
# Rough heuristic: index dir exists AND meta+labels files exist
|
||||
meta = index_base.parent / f"{index_base.name}.meta.json"
|
||||
labels = index_base.parent / f"{index_base.name}.labels.json"
|
||||
if index_base.exists() and meta.exists() and labels.exists():
|
||||
return LeannMultiVector(index_path=index_path, dim=dim)
|
||||
return None
|
||||
|
||||
|
||||
def _generate_similarity_map(
|
||||
model,
|
||||
processor,
|
||||
image: Image.Image,
|
||||
query: str,
|
||||
token_idx: Optional[int] = None,
|
||||
output_path: Optional[str] = None,
|
||||
) -> tuple[int, float]:
|
||||
import torch
|
||||
from colpali_engine.interpretability import (
|
||||
get_similarity_maps_from_embeddings,
|
||||
plot_similarity_map,
|
||||
)
|
||||
|
||||
batch_images = processor.process_images([image]).to(model.device)
|
||||
batch_queries = processor.process_queries([query]).to(model.device)
|
||||
|
||||
with torch.no_grad():
|
||||
image_embeddings = model.forward(**batch_images)
|
||||
query_embeddings = model.forward(**batch_queries)
|
||||
|
||||
n_patches = processor.get_n_patches(
|
||||
image_size=image.size,
|
||||
spatial_merge_size=getattr(model, "spatial_merge_size", None),
|
||||
)
|
||||
image_mask = processor.get_image_mask(batch_images)
|
||||
|
||||
batched_similarity_maps = get_similarity_maps_from_embeddings(
|
||||
image_embeddings=image_embeddings,
|
||||
query_embeddings=query_embeddings,
|
||||
n_patches=n_patches,
|
||||
image_mask=image_mask,
|
||||
)
|
||||
|
||||
similarity_maps = batched_similarity_maps[0]
|
||||
|
||||
# Determine token index if not provided: choose the token with highest max score
|
||||
if token_idx is None:
|
||||
per_token_max = similarity_maps.view(similarity_maps.shape[0], -1).max(dim=1).values
|
||||
token_idx = int(per_token_max.argmax().item())
|
||||
|
||||
max_sim_score = similarity_maps[token_idx, :, :].max().item()
|
||||
|
||||
if output_path:
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
fig, ax = plot_similarity_map(
|
||||
image=image,
|
||||
similarity_map=similarity_maps[token_idx],
|
||||
figsize=(14, 14),
|
||||
show_colorbar=False,
|
||||
)
|
||||
ax.set_title(f"Token #{token_idx}. MaxSim score: {max_sim_score:.2f}", fontsize=12)
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
plt.savefig(output_path, bbox_inches="tight")
|
||||
plt.close(fig)
|
||||
|
||||
return token_idx, float(max_sim_score)
|
||||
|
||||
|
||||
class QwenVL:
|
||||
def __init__(self, device: str):
|
||||
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
|
||||
from transformers.utils.import_utils import is_flash_attn_2_available
|
||||
|
||||
attn_implementation = "flash_attention_2" if is_flash_attn_2_available() else "eager"
|
||||
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
|
||||
"Qwen/Qwen2.5-VL-3B-Instruct",
|
||||
torch_dtype="auto",
|
||||
device_map=device,
|
||||
attn_implementation=attn_implementation,
|
||||
)
|
||||
|
||||
min_pixels = 256 * 28 * 28
|
||||
max_pixels = 1280 * 28 * 28
|
||||
self.processor = AutoProcessor.from_pretrained(
|
||||
"Qwen/Qwen2.5-VL-3B-Instruct", min_pixels=min_pixels, max_pixels=max_pixels
|
||||
)
|
||||
|
||||
def answer(self, query: str, images: list[Image.Image], max_new_tokens: int = 128) -> str:
|
||||
import base64
|
||||
from io import BytesIO
|
||||
|
||||
from qwen_vl_utils import process_vision_info
|
||||
|
||||
content = []
|
||||
for img in images:
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="jpeg")
|
||||
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||||
content.append({"type": "image", "image": f"data:image;base64,{img_base64}"})
|
||||
content.append({"type": "text", "text": query})
|
||||
messages = [{"role": "user", "content": content}]
|
||||
|
||||
text = self.processor.apply_chat_template(
|
||||
messages, tokenize=False, add_generation_prompt=True
|
||||
)
|
||||
image_inputs, video_inputs = process_vision_info(messages)
|
||||
inputs = self.processor(
|
||||
text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt"
|
||||
)
|
||||
inputs = inputs.to(self.model.device)
|
||||
|
||||
generated_ids = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
|
||||
generated_ids_trimmed = [
|
||||
out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
|
||||
]
|
||||
return self.processor.batch_decode(
|
||||
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
|
||||
)[0]
|
||||
|
||||
|
||||
# %%
|
||||
|
||||
# Step 1: Prepare data
|
||||
if USE_HF_DATASET:
|
||||
from datasets import load_dataset
|
||||
|
||||
dataset = load_dataset(DATASET_NAME, split=DATASET_SPLIT)
|
||||
N = len(dataset) if MAX_DOCS is None else min(MAX_DOCS, len(dataset))
|
||||
filepaths: list[str] = []
|
||||
images: list[Image.Image] = []
|
||||
for i in tqdm(range(N), desc="Loading dataset"):
|
||||
p = dataset[i]
|
||||
# Compose a descriptive identifier for printing later
|
||||
identifier = f"arXiv:{p['paper_arxiv_id']}|title:{p['paper_title']}|page:{int(p['page_number'])}|id:{p['page_id']}"
|
||||
print(identifier)
|
||||
filepaths.append(identifier)
|
||||
images.append(p["page_image"]) # PIL Image
|
||||
else:
|
||||
_maybe_convert_pdf_to_images(PDF, PAGES_DIR)
|
||||
filepaths, images = _load_images_from_dir(PAGES_DIR)
|
||||
if not images:
|
||||
raise RuntimeError(
|
||||
f"No images found in {PAGES_DIR}. Provide PDF path in PDF variable or ensure images exist."
|
||||
)
|
||||
|
||||
|
||||
# %%
|
||||
# Step 2: Load model and processor
|
||||
model_name, model, processor, device_str, device, dtype = _load_colvision(MODEL)
|
||||
print(f"Using model={model_name}, device={device_str}, dtype={dtype}")
|
||||
|
||||
|
||||
# %%
|
||||
|
||||
# %%
|
||||
# Step 3: Build or load index
|
||||
retriever: Optional[LeannMultiVector] = None
|
||||
if not REBUILD_INDEX:
|
||||
try:
|
||||
one_vec = _embed_images(model, processor, [images[0]])[0]
|
||||
retriever = _load_retriever_if_index_exists(INDEX_PATH, dim=int(one_vec.shape[-1]))
|
||||
except Exception:
|
||||
retriever = None
|
||||
|
||||
if retriever is None:
|
||||
doc_vecs = _embed_images(model, processor, images)
|
||||
retriever = _build_index(INDEX_PATH, doc_vecs, filepaths)
|
||||
|
||||
|
||||
# %%
|
||||
# Step 4: Embed query and search
|
||||
q_vec = _embed_queries(model, processor, [QUERY])[0]
|
||||
results = retriever.search(q_vec.float().numpy(), topk=TOPK, first_stage_k=FIRST_STAGE_K)
|
||||
if not results:
|
||||
print("No results found.")
|
||||
else:
|
||||
print(f'Top {len(results)} results for query: "{QUERY}"')
|
||||
top_images: list[Image.Image] = []
|
||||
for rank, (score, doc_id) in enumerate(results, start=1):
|
||||
path = filepaths[doc_id]
|
||||
# For HF dataset, path is a descriptive identifier, not a real file path
|
||||
print(f"{rank}) MaxSim: {score:.4f}, Page: {path}")
|
||||
top_images.append(images[doc_id])
|
||||
|
||||
if SAVE_TOP_IMAGE:
|
||||
from pathlib import Path as _Path
|
||||
|
||||
base = _Path(SAVE_TOP_IMAGE)
|
||||
base.parent.mkdir(parents=True, exist_ok=True)
|
||||
for rank, img in enumerate(top_images[:TOPK], start=1):
|
||||
if base.suffix:
|
||||
out_path = base.parent / f"{base.stem}_rank{rank}{base.suffix}"
|
||||
else:
|
||||
out_path = base / f"retrieved_page_rank{rank}.png"
|
||||
img.save(str(out_path))
|
||||
print(f"Saved retrieved page (rank {rank}) to: {out_path}")
|
||||
|
||||
## TODO stange results of second page of DeepSeek-V2 rather than the first page
|
||||
|
||||
# %%
|
||||
# Step 5: Similarity maps for top-K results
|
||||
if results and SIMILARITY_MAP:
|
||||
token_idx = None if SIM_TOKEN_IDX < 0 else int(SIM_TOKEN_IDX)
|
||||
from pathlib import Path as _Path
|
||||
|
||||
output_base = _Path(SIM_OUTPUT) if SIM_OUTPUT else None
|
||||
for rank, img in enumerate(top_images[:TOPK], start=1):
|
||||
if output_base:
|
||||
if output_base.suffix:
|
||||
out_dir = output_base.parent
|
||||
out_name = f"{output_base.stem}_rank{rank}{output_base.suffix}"
|
||||
out_path = str(out_dir / out_name)
|
||||
else:
|
||||
out_dir = output_base
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
out_path = str(out_dir / f"similarity_map_rank{rank}.png")
|
||||
else:
|
||||
out_path = None
|
||||
chosen_idx, max_sim = _generate_similarity_map(
|
||||
model=model,
|
||||
processor=processor,
|
||||
image=img,
|
||||
query=QUERY,
|
||||
token_idx=token_idx,
|
||||
output_path=out_path,
|
||||
)
|
||||
if out_path:
|
||||
print(
|
||||
f"Saved similarity map for rank {rank}, token #{chosen_idx} (max={max_sim:.2f}) to: {out_path}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
f"Computed similarity map for rank {rank}, token #{chosen_idx} (max={max_sim:.2f})"
|
||||
)
|
||||
|
||||
|
||||
# %%
|
||||
# Step 6: Optional answer generation
|
||||
if results and ANSWER:
|
||||
qwen = QwenVL(device=device_str)
|
||||
response = qwen.answer(QUERY, top_images[:TOPK], max_new_tokens=MAX_NEW_TOKENS)
|
||||
print("\nAnswer:")
|
||||
print(response)
|
||||
@@ -0,0 +1,134 @@
|
||||
# pip install pdf2image
|
||||
# pip install pymilvus
|
||||
# pip install colpali_engine
|
||||
# pip install tqdm
|
||||
# pip install pillow
|
||||
|
||||
# %%
|
||||
from pdf2image import convert_from_path
|
||||
|
||||
pdf_path = "pdfs/2004.12832v2.pdf"
|
||||
images = convert_from_path(pdf_path)
|
||||
|
||||
for i, image in enumerate(images):
|
||||
image.save(f"pages/page_{i + 1}.png", "PNG")
|
||||
|
||||
# %%
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Make local leann packages importable without installing
|
||||
_repo_root = Path(__file__).resolve().parents[3]
|
||||
_leann_core_src = _repo_root / "packages" / "leann-core" / "src"
|
||||
_leann_hnsw_pkg = _repo_root / "packages" / "leann-backend-hnsw"
|
||||
import sys
|
||||
|
||||
if str(_leann_core_src) not in sys.path:
|
||||
sys.path.append(str(_leann_core_src))
|
||||
if str(_leann_hnsw_pkg) not in sys.path:
|
||||
sys.path.append(str(_leann_hnsw_pkg))
|
||||
|
||||
from leann_multi_vector import LeannMultiVector
|
||||
|
||||
|
||||
class LeannRetriever(LeannMultiVector):
|
||||
pass
|
||||
|
||||
|
||||
# %%
|
||||
from typing import cast
|
||||
|
||||
import torch
|
||||
from colpali_engine.models import ColPali
|
||||
from colpali_engine.models.paligemma.colpali.processing_colpali import ColPaliProcessor
|
||||
from colpali_engine.utils.torch_utils import ListDataset, get_torch_device
|
||||
from torch.utils.data import DataLoader
|
||||
|
||||
# Auto-select device: CUDA > MPS (mac) > CPU
|
||||
_device_str = (
|
||||
"cuda"
|
||||
if torch.cuda.is_available()
|
||||
else (
|
||||
"mps"
|
||||
if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available()
|
||||
else "cpu"
|
||||
)
|
||||
)
|
||||
device = get_torch_device(_device_str)
|
||||
# Prefer fp16 on GPU/MPS, bfloat16 on CPU
|
||||
_dtype = torch.float16 if _device_str in ("cuda", "mps") else torch.bfloat16
|
||||
model_name = "vidore/colpali-v1.2"
|
||||
|
||||
model = ColPali.from_pretrained(
|
||||
model_name,
|
||||
torch_dtype=_dtype,
|
||||
device_map=device,
|
||||
).eval()
|
||||
print(f"Using device={_device_str}, dtype={_dtype}")
|
||||
|
||||
queries = [
|
||||
"How to end-to-end retrieval with ColBert",
|
||||
"Where is ColBERT performance Table, including text representation results?",
|
||||
]
|
||||
|
||||
processor = cast(ColPaliProcessor, ColPaliProcessor.from_pretrained(model_name))
|
||||
|
||||
dataloader = DataLoader(
|
||||
dataset=ListDataset[str](queries),
|
||||
batch_size=1,
|
||||
shuffle=False,
|
||||
collate_fn=lambda x: processor.process_queries(x),
|
||||
)
|
||||
|
||||
qs: list[torch.Tensor] = []
|
||||
for batch_query in dataloader:
|
||||
with torch.no_grad():
|
||||
batch_query = {k: v.to(model.device) for k, v in batch_query.items()}
|
||||
embeddings_query = model(**batch_query)
|
||||
qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
|
||||
print(qs[0].shape)
|
||||
# %%
|
||||
|
||||
|
||||
import re
|
||||
|
||||
from PIL import Image
|
||||
from tqdm import tqdm
|
||||
|
||||
page_filenames = sorted(os.listdir("./pages"), key=lambda n: int(re.search(r"\d+", n).group()))
|
||||
images = [Image.open(os.path.join("./pages", name)) for name in page_filenames]
|
||||
|
||||
dataloader = DataLoader(
|
||||
dataset=ListDataset[str](images),
|
||||
batch_size=1,
|
||||
shuffle=False,
|
||||
collate_fn=lambda x: processor.process_images(x),
|
||||
)
|
||||
|
||||
ds: list[torch.Tensor] = []
|
||||
for batch_doc in tqdm(dataloader):
|
||||
with torch.no_grad():
|
||||
batch_doc = {k: v.to(model.device) for k, v in batch_doc.items()}
|
||||
embeddings_doc = model(**batch_doc)
|
||||
ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
|
||||
|
||||
print(ds[0].shape)
|
||||
|
||||
# %%
|
||||
# Build HNSW index via LeannRetriever primitives and run search
|
||||
index_path = "./indexes/colpali.leann"
|
||||
retriever = LeannRetriever(index_path=index_path, dim=int(ds[0].shape[-1]))
|
||||
retriever.create_collection()
|
||||
filepaths = [os.path.join("./pages", name) for name in page_filenames]
|
||||
for i in range(len(filepaths)):
|
||||
data = {
|
||||
"colbert_vecs": ds[i].float().numpy(),
|
||||
"doc_id": i,
|
||||
"filepath": filepaths[i],
|
||||
}
|
||||
retriever.insert(data)
|
||||
retriever.create_index()
|
||||
for query in qs:
|
||||
query_np = query.float().numpy()
|
||||
result = retriever.search(query_np, topk=1)
|
||||
print(filepaths[result[0][1]])
|
||||
@@ -1,380 +0,0 @@
|
||||
"""
|
||||
Dynamic add example for LEANN using HNSW backend without recompute.
|
||||
|
||||
- Builds a base index from a directory of documents
|
||||
- Incrementally adds new documents without recomputing stored embeddings
|
||||
|
||||
Defaults:
|
||||
- Base data: /Users/yichuan/Desktop/code/LEANN/leann/data
|
||||
- Incremental data: /Users/yichuan/Desktop/code/LEANN/leann/test_add
|
||||
- Index path: <index_dir>/documents.leann
|
||||
|
||||
Usage examples:
|
||||
uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
|
||||
--base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
|
||||
--index-dir ./test_doc_files
|
||||
|
||||
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
|
||||
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
|
||||
--index-dir ./test_doc_files
|
||||
|
||||
Quick recompute test (both true):
|
||||
# Recompute build
|
||||
uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
|
||||
--recompute-build --ef-construction 200 \
|
||||
--base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
|
||||
--index-dir ./test_doc_files --index-name documents.leann
|
||||
|
||||
# Recompute add
|
||||
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
|
||||
--recompute-add --ef-construction 32 \
|
||||
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
|
||||
--index-dir ./test_doc_files --index-name documents.leann
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import pickle
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
# Ensure we can import from the local packages and apps folders
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
CORE_SRC = ROOT / "packages" / "leann-core" / "src"
|
||||
HNSW_PKG_DIR = ROOT / "packages" / "leann-backend-hnsw"
|
||||
APPS_DIR = ROOT / "apps"
|
||||
|
||||
|
||||
# Prefer the installed backend if available (it contains the compiled extension)
|
||||
def _prefer_installed(pkg_name: str) -> bool:
|
||||
try:
|
||||
import importlib
|
||||
import importlib.util
|
||||
|
||||
spec = importlib.util.find_spec(pkg_name)
|
||||
if spec and spec.origin and "site-packages" in spec.origin:
|
||||
# ensure the faiss shim/extension is importable from the installed package
|
||||
importlib.import_module(f"{pkg_name}.faiss")
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
# Prepend paths, but only add the repo backend if the installed one is not present
|
||||
paths_to_prepend = [CORE_SRC, APPS_DIR]
|
||||
if not _prefer_installed("leann_backend_hnsw"):
|
||||
paths_to_prepend.insert(1, HNSW_PKG_DIR)
|
||||
|
||||
for p in paths_to_prepend:
|
||||
p_str = str(p)
|
||||
if p_str not in sys.path:
|
||||
sys.path.insert(0, p_str)
|
||||
|
||||
# Defer non-stdlib imports until after sys.path setup within functions (avoid E402)
|
||||
|
||||
|
||||
def _load_documents(data_dir: str, required_exts: Optional[list[str]] = None) -> list[Any]:
|
||||
from llama_index.core import SimpleDirectoryReader # type: ignore
|
||||
|
||||
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
|
||||
if required_exts:
|
||||
reader_kwargs["required_exts"] = required_exts
|
||||
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
|
||||
return documents
|
||||
|
||||
|
||||
def _ensure_index_dir(index_dir: Path) -> None:
|
||||
index_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def _index_files(index_path: Path) -> tuple[Path, Path, Path]:
|
||||
"""Return (passages.jsonl, passages.idx, index.index) paths for a given index base path.
|
||||
|
||||
Note: HNSWBackend writes the FAISS index using the stem (without .leann),
|
||||
i.e., for base 'documents.leann' the file is 'documents.index'. We prefer the
|
||||
existing file among candidates.
|
||||
"""
|
||||
passages_file = index_path.parent / f"{index_path.name}.passages.jsonl"
|
||||
offsets_file = index_path.parent / f"{index_path.name}.passages.idx"
|
||||
candidate_name_index = index_path.parent / f"{index_path.name}.index"
|
||||
candidate_stem_index = index_path.parent / f"{index_path.stem}.index"
|
||||
index_file = candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
|
||||
return passages_file, offsets_file, index_file
|
||||
|
||||
|
||||
def _read_meta(index_path: Path) -> dict[str, Any]:
|
||||
meta_path = index_path.parent / f"{index_path.name}.meta.json"
|
||||
if not meta_path.exists():
|
||||
raise FileNotFoundError(f"Metadata file not found: {meta_path}")
|
||||
with open(meta_path, encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _autodetect_index_base(index_dir: Path) -> Optional[Path]:
|
||||
"""If exactly one *.leann.meta.json exists, return its base path (without .meta.json)."""
|
||||
candidates = list(index_dir.glob("*.leann.meta.json"))
|
||||
if len(candidates) == 1:
|
||||
meta = candidates[0]
|
||||
base = meta.with_suffix("") # remove .json
|
||||
base = base.with_suffix("") # remove .meta
|
||||
return base
|
||||
return None
|
||||
|
||||
|
||||
def _load_offset_map(offsets_file: Path) -> dict[str, int]:
|
||||
if not offsets_file.exists():
|
||||
return {}
|
||||
with open(offsets_file, "rb") as f:
|
||||
return pickle.load(f)
|
||||
|
||||
|
||||
def _next_numeric_id(existing_ids: list[str]) -> int:
|
||||
numeric_ids = [int(x) for x in existing_ids if x.isdigit()]
|
||||
if not numeric_ids:
|
||||
return 0
|
||||
return max(numeric_ids) + 1
|
||||
|
||||
|
||||
def build_base_index(
|
||||
base_dir: str,
|
||||
index_dir: str,
|
||||
index_name: str,
|
||||
embedding_model: str,
|
||||
embedding_mode: str,
|
||||
chunk_size: int,
|
||||
chunk_overlap: int,
|
||||
file_types: Optional[list[str]] = None,
|
||||
max_items: int = -1,
|
||||
ef_construction: Optional[int] = None,
|
||||
recompute_build: bool = False,
|
||||
) -> str:
|
||||
print(f"Building base index from: {base_dir}")
|
||||
documents = _load_documents(base_dir, required_exts=file_types)
|
||||
if not documents:
|
||||
raise ValueError(f"No documents found in base_dir: {base_dir}")
|
||||
|
||||
from chunking import create_text_chunks
|
||||
|
||||
texts = create_text_chunks(
|
||||
documents,
|
||||
chunk_size=chunk_size,
|
||||
chunk_overlap=chunk_overlap,
|
||||
use_ast_chunking=False,
|
||||
)
|
||||
if max_items > 0 and len(texts) > max_items:
|
||||
texts = texts[:max_items]
|
||||
print(f"Limiting to {max_items} chunks")
|
||||
|
||||
index_dir_path = Path(index_dir)
|
||||
_ensure_index_dir(index_dir_path)
|
||||
index_path = index_dir_path / index_name
|
||||
|
||||
print("Creating HNSW index (non-compact)...")
|
||||
from leann.api import LeannBuilder
|
||||
from leann.registry import register_project_directory
|
||||
|
||||
builder = LeannBuilder(
|
||||
backend_name="hnsw",
|
||||
embedding_model=embedding_model,
|
||||
embedding_mode=embedding_mode,
|
||||
is_recompute=recompute_build,
|
||||
is_compact=False,
|
||||
efConstruction=(ef_construction if ef_construction is not None else 200),
|
||||
)
|
||||
for t in texts:
|
||||
builder.add_text(t)
|
||||
builder.build_index(str(index_path))
|
||||
|
||||
# Register for discovery
|
||||
register_project_directory(Path.cwd())
|
||||
|
||||
print(f"Base index built at: {index_path}")
|
||||
return str(index_path)
|
||||
|
||||
|
||||
def add_incremental(
|
||||
add_dir: str,
|
||||
index_dir: str,
|
||||
index_name: Optional[str] = None,
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_mode: Optional[str] = None,
|
||||
chunk_size: int = 256,
|
||||
chunk_overlap: int = 128,
|
||||
file_types: Optional[list[str]] = None,
|
||||
max_items: int = -1,
|
||||
ef_construction: Optional[int] = None,
|
||||
recompute_add: bool = False,
|
||||
) -> str:
|
||||
print(f"Adding incremental data from: {add_dir}")
|
||||
index_dir_path = Path(index_dir)
|
||||
index_path = index_dir_path / (index_name or "documents.leann")
|
||||
|
||||
# If specified base doesn't exist, try to auto-detect an existing base
|
||||
try:
|
||||
_read_meta(index_path)
|
||||
except FileNotFoundError:
|
||||
auto_base = _autodetect_index_base(index_dir_path)
|
||||
if auto_base is not None:
|
||||
print(f"Auto-detected index base: {auto_base.name}")
|
||||
index_path = auto_base
|
||||
_read_meta(index_path)
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
f"No index metadata found for base '{index_path.name}'. Build base first with --build-base "
|
||||
f"or provide --index-name to match an existing index (e.g., 'test_doc_files.leann')."
|
||||
)
|
||||
|
||||
# Prepare validated context from core (checks backend/no-recompute and resolves embedding defaults)
|
||||
from leann.api import create_incremental_add_context, incremental_add_texts_with_context
|
||||
|
||||
ctx = create_incremental_add_context(
|
||||
str(index_path),
|
||||
embedding_model=embedding_model,
|
||||
embedding_mode=embedding_mode,
|
||||
data_dir=add_dir,
|
||||
required_exts=file_types,
|
||||
chunk_size=chunk_size,
|
||||
chunk_overlap=chunk_overlap,
|
||||
max_items=max_items,
|
||||
)
|
||||
|
||||
# Use prepared texts from context to perform the add
|
||||
prepared_texts = ctx.prepared_texts or []
|
||||
if not prepared_texts:
|
||||
print("No new chunks to add.")
|
||||
return str(index_path)
|
||||
|
||||
added = incremental_add_texts_with_context(
|
||||
ctx,
|
||||
prepared_texts,
|
||||
ef_construction=ef_construction,
|
||||
recompute=recompute_add,
|
||||
)
|
||||
|
||||
print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
|
||||
return str(index_path)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Dynamic add to LEANN HNSW index without recompute",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
|
||||
parser.add_argument("--build-base", action="store_true", help="Build base index")
|
||||
parser.add_argument("--add-incremental", action="store_true", help="Add incremental data")
|
||||
|
||||
parser.add_argument(
|
||||
"--base-dir",
|
||||
type=str,
|
||||
default="/Users/yichuan/Desktop/code/LEANN/leann/data",
|
||||
help="Base data directory",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--add-dir",
|
||||
type=str,
|
||||
default="/Users/yichuan/Desktop/code/LEANN/leann/test_add",
|
||||
help="Incremental data directory",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--index-dir",
|
||||
type=str,
|
||||
default="./test_doc_files",
|
||||
help="Directory containing the index",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--index-name",
|
||||
type=str,
|
||||
default="documents.leann",
|
||||
help=(
|
||||
"Index base file name. If you built via document_rag.py, use 'test_doc_files.leann'. "
|
||||
"Default: documents.leann"
|
||||
),
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--embedding-model",
|
||||
type=str,
|
||||
default="facebook/contriever",
|
||||
help="Embedding model name",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--embedding-mode",
|
||||
type=str,
|
||||
default="sentence-transformers",
|
||||
choices=["sentence-transformers", "openai", "mlx", "ollama"],
|
||||
help="Embedding backend mode",
|
||||
)
|
||||
|
||||
parser.add_argument("--chunk-size", type=int, default=256)
|
||||
parser.add_argument("--chunk-overlap", type=int, default=128)
|
||||
parser.add_argument("--file-types", nargs="+", default=None)
|
||||
parser.add_argument("--max-items", type=int, default=-1)
|
||||
parser.add_argument("--ef-construction", type=int, default=32)
|
||||
parser.add_argument(
|
||||
"--recompute-add", action="store_true", help="Enable recompute-mode add (non-compact only)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--recompute-build",
|
||||
action="store_true",
|
||||
help="Enable recompute-mode base build (non-compact only)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.build_base and not args.add_incremental:
|
||||
print("Nothing to do. Use --build-base and/or --add-incremental.")
|
||||
return
|
||||
|
||||
index_path_str: Optional[str] = None
|
||||
|
||||
if args.build_base:
|
||||
index_path_str = build_base_index(
|
||||
base_dir=args.base_dir,
|
||||
index_dir=args.index_dir,
|
||||
index_name=args.index_name,
|
||||
embedding_model=args.embedding_model,
|
||||
embedding_mode=args.embedding_mode,
|
||||
chunk_size=args.chunk_size,
|
||||
chunk_overlap=args.chunk_overlap,
|
||||
file_types=args.file_types,
|
||||
max_items=args.max_items,
|
||||
ef_construction=args.ef_construction,
|
||||
recompute_build=args.recompute_build,
|
||||
)
|
||||
|
||||
if args.add_incremental:
|
||||
index_path_str = add_incremental(
|
||||
add_dir=args.add_dir,
|
||||
index_dir=args.index_dir,
|
||||
index_name=args.index_name,
|
||||
embedding_model=args.embedding_model,
|
||||
embedding_mode=args.embedding_mode,
|
||||
chunk_size=args.chunk_size,
|
||||
chunk_overlap=args.chunk_overlap,
|
||||
file_types=args.file_types,
|
||||
max_items=args.max_items,
|
||||
ef_construction=args.ef_construction,
|
||||
recompute_add=args.recompute_add,
|
||||
)
|
||||
|
||||
# Optional: quick test query using searcher
|
||||
if index_path_str:
|
||||
try:
|
||||
from leann.api import LeannSearcher
|
||||
|
||||
searcher = LeannSearcher(index_path_str)
|
||||
query = "what is LEANN?"
|
||||
if args.add_incremental:
|
||||
query = "what is the multi vector search and how it works?"
|
||||
results = searcher.search(query, top_k=5)
|
||||
if results:
|
||||
print(f"Sample result: {results[0].text[:80]}...")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,5 +1,5 @@
|
||||
[build-system]
|
||||
requires = ["scikit-build-core>=0.10", "pybind11>=2.12.0", "numpy"]
|
||||
requires = ["scikit-build-core>=0.10", "pybind11>=2.12.0", "numpy", "cmake>=3.30"]
|
||||
build-backend = "scikit_build_core.build"
|
||||
|
||||
[project]
|
||||
|
||||
@@ -15,7 +15,6 @@ from leann.registry import register_backend
|
||||
from leann.searcher_base import BaseSearcher
|
||||
|
||||
from .convert_to_csr import convert_hnsw_graph_to_csr
|
||||
from .prune_index import prune_embeddings_preserve_graph_inplace
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -91,16 +90,8 @@ class HNSWBuilder(LeannBackendBuilderInterface):
|
||||
index_file = index_dir / f"{index_prefix}.index"
|
||||
faiss.write_index(index, str(index_file))
|
||||
|
||||
if self.is_recompute:
|
||||
if self.is_compact:
|
||||
self._convert_to_csr(index_file)
|
||||
else:
|
||||
# Non-compact format: prune only embeddings, keep original graph
|
||||
ok = prune_embeddings_preserve_graph_inplace(str(index_file))
|
||||
if not ok:
|
||||
raise RuntimeError(
|
||||
"Pruning embeddings while preserving graph failed for non-compact index"
|
||||
)
|
||||
if self.is_compact:
|
||||
self._convert_to_csr(index_file)
|
||||
|
||||
def _convert_to_csr(self, index_file: Path):
|
||||
"""Convert built index to CSR format"""
|
||||
@@ -157,13 +148,7 @@ class HNSWSearcher(BaseSearcher):
|
||||
self.is_pruned
|
||||
) # In C++ code, it's called is_recompute, but it's only for loading IIUC.
|
||||
|
||||
# If pruned (recompute mode), explicitly skip storage to avoid reading
|
||||
# the pruned section. Still allow MMAP for graph.
|
||||
io_flags = faiss.IO_FLAG_MMAP
|
||||
if self.is_pruned:
|
||||
io_flags |= faiss.IO_FLAG_SKIP_STORAGE
|
||||
|
||||
self._index = faiss.read_index(str(index_file), io_flags, hnsw_config)
|
||||
self._index = faiss.read_index(str(index_file), faiss.IO_FLAG_MMAP, hnsw_config)
|
||||
|
||||
def search(
|
||||
self,
|
||||
@@ -266,55 +251,3 @@ class HNSWSearcher(BaseSearcher):
|
||||
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
|
||||
|
||||
return {"labels": string_labels, "distances": distances}
|
||||
|
||||
|
||||
# ---------- Helper API for incremental add (Python-level) ----------
|
||||
def add_vectors(
|
||||
index_file_path: str,
|
||||
embeddings: np.ndarray,
|
||||
*,
|
||||
ef_construction: Optional[int] = None,
|
||||
recompute: bool = False,
|
||||
) -> None:
|
||||
"""Append vectors to an existing non-compact HNSW index.
|
||||
|
||||
Args:
|
||||
index_file_path: Path to the HNSW .index file
|
||||
embeddings: float32 numpy array (N, D)
|
||||
ef_construction: Optional override for efConstruction during insertion
|
||||
recompute: Reserved for future use to control insertion-time recompute behaviors
|
||||
"""
|
||||
from . import faiss # type: ignore
|
||||
|
||||
if embeddings.dtype != np.float32:
|
||||
embeddings = embeddings.astype(np.float32)
|
||||
if not embeddings.flags.c_contiguous:
|
||||
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
|
||||
|
||||
# Load index normally to ensure storage is present; toggle is_recompute on the object
|
||||
index = faiss.read_index(str(index_file_path), faiss.IO_FLAG_MMAP)
|
||||
|
||||
# Best-effort: explicitly set flag on the object if the binding exposes it
|
||||
try:
|
||||
index.is_recompute = bool(recompute)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if ef_construction is not None:
|
||||
index.hnsw.efConstruction = int(ef_construction)
|
||||
except Exception:
|
||||
# Best-effort; ignore if backend doesn't expose setter
|
||||
pass
|
||||
|
||||
# For non-compact HNSW, calling add directly is sufficient. When is_recompute is set
|
||||
# (via config or attribute), FAISS will run the insertion/search path accordingly.
|
||||
# To strictly follow per-point insert semantics in recompute mode, add one-by-one.
|
||||
if recompute:
|
||||
# Insert row by row
|
||||
n = embeddings.shape[0]
|
||||
for i in range(n):
|
||||
row = embeddings[i : i + 1]
|
||||
index.add(1, faiss.swig_ptr(row))
|
||||
else:
|
||||
index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
|
||||
faiss.write_index(index, str(index_file_path))
|
||||
|
||||
@@ -1,149 +0,0 @@
|
||||
import os
|
||||
import struct
|
||||
from pathlib import Path
|
||||
|
||||
from .convert_to_csr import (
|
||||
EXPECTED_HNSW_FOURCCS,
|
||||
NULL_INDEX_FOURCC,
|
||||
read_struct,
|
||||
read_vector_raw,
|
||||
)
|
||||
|
||||
|
||||
def _write_vector_raw(f_out, count: int, data_bytes: bytes) -> None:
|
||||
"""Write a vector in the same binary layout as read_vector_raw reads: <Q count> + raw bytes."""
|
||||
f_out.write(struct.pack("<Q", count))
|
||||
if count > 0 and data_bytes:
|
||||
f_out.write(data_bytes)
|
||||
|
||||
|
||||
def prune_embeddings_preserve_graph(input_filename: str, output_filename: str) -> bool:
|
||||
"""
|
||||
Copy an original (non-compact) HNSW index file while pruning the trailing embedding storage.
|
||||
Preserves the graph structure and metadata exactly; only writes a NULL storage marker instead of
|
||||
the original storage fourcc and payload.
|
||||
|
||||
Returns True on success.
|
||||
"""
|
||||
print(f"Pruning embeddings from {input_filename} to {output_filename}")
|
||||
print("--------------------------------")
|
||||
# running in mode is-recompute=True and is-compact=False
|
||||
in_path = Path(input_filename)
|
||||
out_path = Path(output_filename)
|
||||
|
||||
try:
|
||||
with open(in_path, "rb") as f_in, open(out_path, "wb") as f_out:
|
||||
# Header
|
||||
index_fourcc = read_struct(f_in, "<I")
|
||||
if index_fourcc not in EXPECTED_HNSW_FOURCCS:
|
||||
# Still proceed, but this is unexpected
|
||||
pass
|
||||
f_out.write(struct.pack("<I", index_fourcc))
|
||||
|
||||
d = read_struct(f_in, "<i")
|
||||
ntotal_hdr = read_struct(f_in, "<q")
|
||||
dummy1 = read_struct(f_in, "<q")
|
||||
dummy2 = read_struct(f_in, "<q")
|
||||
is_trained = read_struct(f_in, "?")
|
||||
metric_type = read_struct(f_in, "<i")
|
||||
f_out.write(struct.pack("<i", d))
|
||||
f_out.write(struct.pack("<q", ntotal_hdr))
|
||||
f_out.write(struct.pack("<q", dummy1))
|
||||
f_out.write(struct.pack("<q", dummy2))
|
||||
f_out.write(struct.pack("<?", is_trained))
|
||||
f_out.write(struct.pack("<i", metric_type))
|
||||
|
||||
if metric_type > 1:
|
||||
metric_arg = read_struct(f_in, "<f")
|
||||
f_out.write(struct.pack("<f", metric_arg))
|
||||
|
||||
# Vectors: assign_probas (double), cum_nneighbor_per_level (int32), levels (int32)
|
||||
cnt, data = read_vector_raw(f_in, "d")
|
||||
_write_vector_raw(f_out, cnt, data)
|
||||
|
||||
cnt, data = read_vector_raw(f_in, "i")
|
||||
_write_vector_raw(f_out, cnt, data)
|
||||
|
||||
cnt, data = read_vector_raw(f_in, "i")
|
||||
_write_vector_raw(f_out, cnt, data)
|
||||
|
||||
# Probe potential extra alignment/flag byte present in some original formats
|
||||
probe = f_in.read(1)
|
||||
if probe:
|
||||
if probe == b"\x00":
|
||||
# Preserve this unexpected 0x00 byte
|
||||
f_out.write(probe)
|
||||
else:
|
||||
# Likely part of the next vector; rewind
|
||||
f_in.seek(-1, os.SEEK_CUR)
|
||||
|
||||
# Offsets (uint64) and neighbors (int32)
|
||||
cnt, data = read_vector_raw(f_in, "Q")
|
||||
_write_vector_raw(f_out, cnt, data)
|
||||
|
||||
cnt, data = read_vector_raw(f_in, "i")
|
||||
_write_vector_raw(f_out, cnt, data)
|
||||
|
||||
# Scalar params
|
||||
entry_point = read_struct(f_in, "<i")
|
||||
max_level = read_struct(f_in, "<i")
|
||||
ef_construction = read_struct(f_in, "<i")
|
||||
ef_search = read_struct(f_in, "<i")
|
||||
dummy_upper_beam = read_struct(f_in, "<i")
|
||||
f_out.write(struct.pack("<i", entry_point))
|
||||
f_out.write(struct.pack("<i", max_level))
|
||||
f_out.write(struct.pack("<i", ef_construction))
|
||||
f_out.write(struct.pack("<i", ef_search))
|
||||
f_out.write(struct.pack("<i", dummy_upper_beam))
|
||||
|
||||
# Storage fourcc (if present) — write NULL marker and drop any remaining data
|
||||
try:
|
||||
read_struct(f_in, "<I")
|
||||
# Regardless of original, write NULL
|
||||
f_out.write(struct.pack("<I", NULL_INDEX_FOURCC))
|
||||
# Discard the rest of the file (embedding payload)
|
||||
# (Do not copy anything else)
|
||||
except EOFError:
|
||||
# No storage section; nothing else to write
|
||||
pass
|
||||
|
||||
return True
|
||||
except Exception:
|
||||
# Best-effort cleanup
|
||||
try:
|
||||
if out_path.exists():
|
||||
out_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def prune_embeddings_preserve_graph_inplace(index_file_path: str) -> bool:
|
||||
"""
|
||||
Convenience wrapper: write pruned file to a temporary path next to the
|
||||
original, then atomically replace on success.
|
||||
"""
|
||||
print(f"Pruning embeddings from {index_file_path} to {index_file_path}")
|
||||
print("--------------------------------")
|
||||
# running in mode is-recompute=True and is-compact=False
|
||||
src = Path(index_file_path)
|
||||
tmp = src.with_suffix(".pruned.tmp")
|
||||
ok = prune_embeddings_preserve_graph(str(src), str(tmp))
|
||||
if not ok:
|
||||
if tmp.exists():
|
||||
try:
|
||||
tmp.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return False
|
||||
try:
|
||||
os.replace(str(tmp), str(src))
|
||||
except Exception:
|
||||
# Rollback on failure
|
||||
try:
|
||||
if tmp.exists():
|
||||
tmp.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return False
|
||||
return True
|
||||
Submodule packages/leann-backend-hnsw/third_party/faiss updated: ea86d06ceb...ed96ff7dba
@@ -5,7 +5,6 @@ with the correct, original embedding logic from the user's reference code.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import re
|
||||
import subprocess
|
||||
@@ -20,7 +19,6 @@ import numpy as np
|
||||
from leann.interface import LeannBackendSearcherInterface
|
||||
|
||||
from .chat import get_llm
|
||||
from .embedding_server_manager import EmbeddingServerManager
|
||||
from .interface import LeannBackendFactoryInterface
|
||||
from .metadata_filter import MetadataFilterEngine
|
||||
from .registry import BACKEND_REGISTRY
|
||||
@@ -120,20 +118,6 @@ class SearchResult:
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IncrementalAddContext:
|
||||
"""Prepared context for safe incremental add operations on an index."""
|
||||
|
||||
index_path: str
|
||||
passages_file: Path
|
||||
offsets_file: Path
|
||||
vector_index_file: Path
|
||||
embedding_model: str
|
||||
embedding_mode: str
|
||||
distance_metric: str
|
||||
prepared_texts: Optional[list[str]] = None
|
||||
|
||||
|
||||
class PassageManager:
|
||||
def __init__(
|
||||
self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
|
||||
@@ -492,7 +476,9 @@ class LeannBuilder:
|
||||
is_compact = self.backend_kwargs.get("is_compact", True)
|
||||
is_recompute = self.backend_kwargs.get("is_recompute", True)
|
||||
meta_data["is_compact"] = is_compact
|
||||
meta_data["is_pruned"] = is_recompute # Pruned only if compact and recompute
|
||||
meta_data["is_pruned"] = (
|
||||
is_compact and is_recompute
|
||||
) # Pruned only if compact and recompute
|
||||
with open(leann_meta_path, "w", encoding="utf-8") as f:
|
||||
json.dump(meta_data, f, indent=2)
|
||||
|
||||
@@ -1032,405 +1018,8 @@ class LeannChat:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ------------------------------
|
||||
# Incremental Add Utilities (HNSW no-recompute only)
|
||||
# ------------------------------
|
||||
|
||||
|
||||
def _resolve_index_paths(index_path: str) -> tuple[Path, Path, Path]:
|
||||
"""Given base index path (without extension), return (passages.jsonl, passages.idx, vector.index).
|
||||
|
||||
For HNSW, vector index file is typically <stem>.index (e.g., documents.index) even when base is
|
||||
'documents.leann'. We prefer an existing <stem>.index, otherwise fall back to <name>.index.
|
||||
"""
|
||||
base = Path(index_path)
|
||||
passages_file = base.parent / f"{base.name}.passages.jsonl"
|
||||
offsets_file = base.parent / f"{base.name}.passages.idx"
|
||||
candidate_name_index = base.parent / f"{base.name}.index"
|
||||
candidate_stem_index = base.parent / f"{base.stem}.index"
|
||||
vector_index_file = (
|
||||
candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
|
||||
)
|
||||
return passages_file, offsets_file, vector_index_file
|
||||
|
||||
|
||||
def _read_meta_file(index_path: str) -> dict[str, Any]:
|
||||
meta_path = Path(f"{index_path}.meta.json")
|
||||
if not meta_path.exists():
|
||||
raise FileNotFoundError(f"Leann metadata file not found: {meta_path}")
|
||||
with open(meta_path, encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _load_offset_map_pickle(offsets_file: Path) -> dict[str, int]:
|
||||
if not offsets_file.exists():
|
||||
return {}
|
||||
with open(offsets_file, "rb") as f:
|
||||
return pickle.load(f)
|
||||
|
||||
|
||||
def _append_passages_and_update_offsets(
|
||||
passages_file: Path, offsets_file: Path, new_texts: list[str]
|
||||
) -> list[str]:
|
||||
"""Append new texts to passages file, update offset map, and return assigned string IDs.
|
||||
|
||||
IDs are assigned as incrementing integers based on existing keys in the offset map.
|
||||
"""
|
||||
offset_map = _load_offset_map_pickle(offsets_file)
|
||||
# Compute next numeric id
|
||||
numeric_ids = [int(x) for x in offset_map.keys() if str(x).isdigit()]
|
||||
next_id_num = (max(numeric_ids) + 1) if numeric_ids else 0
|
||||
assigned_ids: list[str] = []
|
||||
|
||||
with open(passages_file, "a", encoding="utf-8") as f:
|
||||
for text in new_texts:
|
||||
offset = f.tell()
|
||||
str_id = str(next_id_num)
|
||||
json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
offset_map[str_id] = offset
|
||||
assigned_ids.append(str_id)
|
||||
next_id_num += 1
|
||||
|
||||
with open(offsets_file, "wb") as f:
|
||||
pickle.dump(offset_map, f)
|
||||
|
||||
return assigned_ids
|
||||
|
||||
|
||||
def incremental_add_texts(
|
||||
index_path: str,
|
||||
texts: list[str],
|
||||
*,
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_mode: Optional[str] = None,
|
||||
ef_construction: Optional[int] = None,
|
||||
recompute: bool = False,
|
||||
) -> int:
|
||||
"""Incrementally add text chunks to an existing HNSW index built with no-recompute.
|
||||
|
||||
- Validates backend is HNSW and index is non-compact (no-recompute path)
|
||||
- Appends passages and offsets
|
||||
- Computes embeddings and appends to the HNSW vector index
|
||||
|
||||
Returns number of added chunks.
|
||||
"""
|
||||
if not texts:
|
||||
return 0
|
||||
|
||||
meta = _read_meta_file(index_path)
|
||||
if meta.get("backend_name") != "hnsw":
|
||||
raise RuntimeError("Incremental add is currently supported only for HNSW backend")
|
||||
if meta.get("is_compact", True):
|
||||
raise RuntimeError(
|
||||
"Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
|
||||
)
|
||||
|
||||
passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
|
||||
if not vector_index_file.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
|
||||
)
|
||||
|
||||
# Resolve embedding config from meta if not provided
|
||||
model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
|
||||
mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
|
||||
|
||||
# Append passages and update offsets
|
||||
assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts)
|
||||
|
||||
# Compute embeddings
|
||||
# Embedding computation path
|
||||
esm = None
|
||||
port = None
|
||||
if recompute:
|
||||
# Determine distance metric early for server config
|
||||
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
|
||||
# Start embedding server and compute via ZMQ for consistency with recompute semantics
|
||||
passages_source_file = f"{index_path}.meta.json"
|
||||
esm = EmbeddingServerManager(
|
||||
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
|
||||
)
|
||||
started, port = esm.start_server(
|
||||
port=5557,
|
||||
model_name=model_name,
|
||||
embedding_mode=mode_name,
|
||||
passages_file=passages_source_file,
|
||||
distance_metric=distance_metric,
|
||||
enable_warmup=False,
|
||||
)
|
||||
if not started:
|
||||
raise RuntimeError("Failed to start embedding server for recompute add")
|
||||
embeddings = compute_embeddings_via_server(texts, model_name, port)
|
||||
else:
|
||||
embeddings = compute_embeddings(
|
||||
texts,
|
||||
model_name=model_name,
|
||||
mode=mode_name,
|
||||
use_server=False,
|
||||
is_build=True,
|
||||
)
|
||||
|
||||
# Normalize for cosine if needed
|
||||
if "distance_metric" not in locals():
|
||||
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
|
||||
if distance_metric == "cosine":
|
||||
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
|
||||
norms[norms == 0] = 1
|
||||
embeddings = embeddings / norms
|
||||
|
||||
# Append via backend helper (supports ef_construction/recompute plumbing)
|
||||
try:
|
||||
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
|
||||
) from e
|
||||
|
||||
# Propagate ZMQ port to FAISS add path when recompute is True
|
||||
if recompute and port is not None:
|
||||
os.environ["LEANN_ZMQ_PORT"] = str(port)
|
||||
|
||||
hnsw_add_vectors(
|
||||
str(vector_index_file),
|
||||
embeddings,
|
||||
ef_construction=ef_construction,
|
||||
recompute=recompute,
|
||||
)
|
||||
|
||||
# Stop server after add when recompute path used
|
||||
if esm is not None:
|
||||
def __del__(self):
|
||||
try:
|
||||
esm.stop_server()
|
||||
self.cleanup()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Sanity: ids length should match embeddings rows
|
||||
if len(assigned_ids) != embeddings.shape[0]:
|
||||
warnings.warn(
|
||||
f"Assigned {len(assigned_ids)} IDs but computed {embeddings.shape[0]} embeddings.",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
return len(assigned_ids)
|
||||
|
||||
|
||||
def create_incremental_add_context(
|
||||
index_path: str,
|
||||
*,
|
||||
# Optional embedding choices; if None will use meta
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_mode: Optional[str] = None,
|
||||
# Optional data-to-text preparation in context
|
||||
data_dir: Optional[str] = None,
|
||||
required_exts: Optional[list[str]] = None,
|
||||
chunk_size: int = 256,
|
||||
chunk_overlap: int = 128,
|
||||
max_items: int = -1,
|
||||
) -> IncrementalAddContext:
|
||||
"""Validate index and prepare context for repeated incremental adds.
|
||||
|
||||
Additionally, if data_dir is provided, this function will load documents,
|
||||
chunk them to texts with the specified parameters, and store them in ctx.prepared_texts.
|
||||
"""
|
||||
meta = _read_meta_file(index_path)
|
||||
if meta.get("backend_name") != "hnsw":
|
||||
raise RuntimeError("Incremental add is currently supported only for HNSW backend")
|
||||
if meta.get("is_compact", True):
|
||||
raise RuntimeError(
|
||||
"Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
|
||||
)
|
||||
|
||||
passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
|
||||
if not vector_index_file.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
|
||||
)
|
||||
|
||||
model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
|
||||
mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
|
||||
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
|
||||
|
||||
prepared_texts: Optional[list[str]] = None
|
||||
if data_dir is not None:
|
||||
try:
|
||||
from llama_index.core import SimpleDirectoryReader # type: ignore
|
||||
from llama_index.core.node_parser import SentenceSplitter # type: ignore
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
"llama-index-core is required when using data_dir in create_incremental_add_context"
|
||||
) from e
|
||||
|
||||
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
|
||||
if required_exts:
|
||||
reader_kwargs["required_exts"] = required_exts
|
||||
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
|
||||
if documents:
|
||||
splitter = SentenceSplitter(
|
||||
chunk_size=chunk_size,
|
||||
chunk_overlap=chunk_overlap,
|
||||
separator=" ",
|
||||
paragraph_separator="\n\n",
|
||||
)
|
||||
prepared_texts = []
|
||||
for doc in documents:
|
||||
try:
|
||||
nodes = splitter.get_nodes_from_documents([doc])
|
||||
if nodes:
|
||||
prepared_texts.extend([node.get_content() for node in nodes])
|
||||
except Exception:
|
||||
content = doc.get_content()
|
||||
if content and content.strip():
|
||||
prepared_texts.append(content.strip())
|
||||
if max_items > 0 and len(prepared_texts) > max_items:
|
||||
prepared_texts = prepared_texts[:max_items]
|
||||
|
||||
return IncrementalAddContext(
|
||||
index_path=index_path,
|
||||
passages_file=passages_file,
|
||||
offsets_file=offsets_file,
|
||||
vector_index_file=vector_index_file,
|
||||
embedding_model=model_name,
|
||||
embedding_mode=mode_name,
|
||||
distance_metric=distance_metric,
|
||||
prepared_texts=prepared_texts,
|
||||
)
|
||||
|
||||
|
||||
def incremental_add_texts_with_context(
|
||||
ctx: IncrementalAddContext,
|
||||
texts: list[str],
|
||||
*,
|
||||
ef_construction: Optional[int] = None,
|
||||
recompute: bool = False,
|
||||
) -> int:
|
||||
"""Incrementally add texts using a prepared context (no repeated validation).
|
||||
|
||||
For non-compact HNSW, ef_construction (efConstruction) can be overridden during insertion.
|
||||
"""
|
||||
if not texts:
|
||||
return 0
|
||||
|
||||
# Append passages & offsets
|
||||
_append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
|
||||
|
||||
# Compute embeddings
|
||||
# Embedding computation path
|
||||
esm = None
|
||||
port = None
|
||||
if recompute:
|
||||
passages_source_file = f"{ctx.index_path}.meta.json"
|
||||
esm = EmbeddingServerManager(
|
||||
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
|
||||
)
|
||||
started, port = esm.start_server(
|
||||
port=5557,
|
||||
model_name=ctx.embedding_model,
|
||||
embedding_mode=ctx.embedding_mode,
|
||||
passages_file=passages_source_file,
|
||||
distance_metric=ctx.distance_metric,
|
||||
enable_warmup=False,
|
||||
)
|
||||
if not started:
|
||||
raise RuntimeError("Failed to start embedding server for recompute add")
|
||||
embeddings = compute_embeddings_via_server(texts, ctx.embedding_model, port)
|
||||
else:
|
||||
embeddings = compute_embeddings(
|
||||
texts,
|
||||
model_name=ctx.embedding_model,
|
||||
mode=ctx.embedding_mode,
|
||||
use_server=False,
|
||||
is_build=True,
|
||||
)
|
||||
|
||||
# Normalize for cosine if needed
|
||||
if ctx.distance_metric == "cosine":
|
||||
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
|
||||
norms[norms == 0] = 1
|
||||
embeddings = embeddings / norms
|
||||
|
||||
# Append via backend helper (supports ef_construction/recompute plumbing)
|
||||
try:
|
||||
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
|
||||
) from e
|
||||
|
||||
if recompute and port is not None:
|
||||
os.environ["LEANN_ZMQ_PORT"] = str(port)
|
||||
|
||||
hnsw_add_vectors(
|
||||
str(ctx.vector_index_file),
|
||||
embeddings,
|
||||
ef_construction=ef_construction,
|
||||
recompute=recompute,
|
||||
)
|
||||
|
||||
# Stop server after add when recompute path used
|
||||
if esm is not None:
|
||||
try:
|
||||
esm.stop_server()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return embeddings.shape[0]
|
||||
|
||||
|
||||
def incremental_add_directory(
|
||||
index_path: str,
|
||||
data_dir: str,
|
||||
*,
|
||||
chunk_size: int = 256,
|
||||
chunk_overlap: int = 128,
|
||||
required_exts: Optional[list[str]] = None,
|
||||
max_items: int = -1,
|
||||
embedding_model: Optional[str] = None,
|
||||
embedding_mode: Optional[str] = None,
|
||||
) -> int:
|
||||
"""Load documents from a directory, chunk them, and incrementally add to an index.
|
||||
|
||||
Chunking uses LlamaIndex SentenceSplitter for simplicity and avoids external app dependencies.
|
||||
"""
|
||||
try:
|
||||
from llama_index.core import SimpleDirectoryReader # type: ignore
|
||||
from llama_index.core.node_parser import SentenceSplitter # type: ignore
|
||||
except Exception as e:
|
||||
raise RuntimeError("llama-index-core is required for incremental_add_directory") from e
|
||||
|
||||
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
|
||||
if required_exts:
|
||||
reader_kwargs["required_exts"] = required_exts
|
||||
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
|
||||
if not documents:
|
||||
return 0
|
||||
|
||||
# Traditional text chunking
|
||||
splitter = SentenceSplitter(
|
||||
chunk_size=chunk_size,
|
||||
chunk_overlap=chunk_overlap,
|
||||
separator=" ",
|
||||
paragraph_separator="\n\n",
|
||||
)
|
||||
all_texts: list[str] = []
|
||||
for doc in documents:
|
||||
try:
|
||||
nodes = splitter.get_nodes_from_documents([doc])
|
||||
if nodes:
|
||||
all_texts.extend([node.get_content() for node in nodes])
|
||||
except Exception:
|
||||
content = doc.get_content()
|
||||
if content and content.strip():
|
||||
all_texts.append(content.strip())
|
||||
|
||||
if max_items > 0 and len(all_texts) > max_items:
|
||||
all_texts = all_texts[:max_items]
|
||||
|
||||
return incremental_add_texts(
|
||||
index_path,
|
||||
all_texts,
|
||||
embedding_model=embedding_model,
|
||||
embedding_mode=embedding_mode,
|
||||
)
|
||||
|
||||
@@ -104,7 +104,11 @@ astchunk = { path = "packages/astchunk-leann", editable = true }
|
||||
[tool.ruff]
|
||||
target-version = "py39"
|
||||
line-length = 100
|
||||
extend-exclude = ["third_party"]
|
||||
extend-exclude = [
|
||||
"third_party",
|
||||
"apps/multimodal/vision-based-pdf-multi-vector/multi-vector-leann.py",
|
||||
"apps/multimodal/vision-based-pdf-multi-vector/multi-vector-leann-similarity-map.py"
|
||||
]
|
||||
|
||||
|
||||
[tool.ruff.lint]
|
||||
|
||||
12
uv.lock
generated
12
uv.lock
generated
@@ -1,5 +1,5 @@
|
||||
version = 1
|
||||
revision = 2
|
||||
revision = 3
|
||||
requires-python = ">=3.9"
|
||||
resolution-markers = [
|
||||
"python_full_version >= '3.12'",
|
||||
@@ -2138,7 +2138,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "leann-backend-diskann"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = { editable = "packages/leann-backend-diskann" }
|
||||
dependencies = [
|
||||
{ name = "leann-core" },
|
||||
@@ -2150,14 +2150,14 @@ dependencies = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "leann-core", specifier = "==0.3.3" },
|
||||
{ name = "leann-core", specifier = "==0.3.4" },
|
||||
{ name = "numpy" },
|
||||
{ name = "protobuf", specifier = ">=3.19.0" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "leann-backend-hnsw"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = { editable = "packages/leann-backend-hnsw" }
|
||||
dependencies = [
|
||||
{ name = "leann-core" },
|
||||
@@ -2170,7 +2170,7 @@ dependencies = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "leann-core", specifier = "==0.3.3" },
|
||||
{ name = "leann-core", specifier = "==0.3.4" },
|
||||
{ name = "msgpack", specifier = ">=1.0.0" },
|
||||
{ name = "numpy" },
|
||||
{ name = "pyzmq", specifier = ">=23.0.0" },
|
||||
@@ -2178,7 +2178,7 @@ requires-dist = [
|
||||
|
||||
[[package]]
|
||||
name = "leann-core"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = { editable = "packages/leann-core" }
|
||||
dependencies = [
|
||||
{ name = "accelerate" },
|
||||
|
||||
Reference in New Issue
Block a user