diff --git a/examples/dynamic_add_leann_no_recompute.py b/examples/dynamic_add_leann_no_recompute.py
index 621a893..99aa27b 100644
--- a/examples/dynamic_add_leann_no_recompute.py
+++ b/examples/dynamic_add_leann_no_recompute.py
@@ -17,6 +17,19 @@ Usage examples:
     uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
         --add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
         --index-dir ./test_doc_files
+
+Quick recompute test (recompute enabled for both build and add):
+    # Recompute build
+    uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
+        --recompute-build --ef-construction 200 \
+        --base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
+        --index-dir ./test_doc_files --index-name documents.leann
+
+    # Recompute add
+    uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
+        --recompute-add --ef-construction 32 \
+        --add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
+        --index-dir ./test_doc_files --index-name documents.leann
 """

 import argparse
@@ -32,8 +45,29 @@ CORE_SRC = ROOT / "packages" / "leann-core" / "src"
 HNSW_PKG_DIR = ROOT / "packages" / "leann-backend-hnsw"
 APPS_DIR = ROOT / "apps"

-# Prepend precise paths so the core module name `leann` resolves to leann-core
-for p in [CORE_SRC, HNSW_PKG_DIR, APPS_DIR]:
+
+# Prefer the installed backend if available (it contains the compiled extension)
+def _prefer_installed(pkg_name: str) -> bool:
+    try:
+        import importlib
+        import importlib.util
+
+        spec = importlib.util.find_spec(pkg_name)
+        if spec and spec.origin and "site-packages" in spec.origin:
+            # ensure the faiss shim/extension is importable from the installed package
+            importlib.import_module(f"{pkg_name}.faiss")
+            return True
+    except Exception:
+        pass
+    return False
+
+
+# Prepend paths, but only add the repo backend if the installed one is not present
+paths_to_prepend = [CORE_SRC, APPS_DIR]
+if not _prefer_installed("leann_backend_hnsw"):
+    paths_to_prepend.insert(1, HNSW_PKG_DIR)
+
+for p in paths_to_prepend:
     p_str = str(p)
     if p_str not in sys.path:
         sys.path.insert(0, p_str)
@@ -113,6 +147,8 @@ def build_base_index(
     chunk_overlap: int,
     file_types: Optional[list[str]] = None,
     max_items: int = -1,
+    ef_construction: Optional[int] = None,
+    recompute_build: bool = False,
 ) -> str:
     print(f"Building base index from: {base_dir}")
     documents = _load_documents(base_dir, required_exts=file_types)
@@ -135,7 +171,7 @@
     _ensure_index_dir(index_dir_path)
     index_path = index_dir_path / index_name

-    print("Creating HNSW index with no-recompute (non-compact)...")
+    print("Creating HNSW index (non-compact)...")
     from leann.api import LeannBuilder
     from leann.registry import register_project_directory

@@ -143,8 +179,9 @@
         backend_name="hnsw",
         embedding_model=embedding_model,
         embedding_mode=embedding_mode,
-        is_recompute=False,
+        is_recompute=recompute_build,
         is_compact=False,
+        efConstruction=(ef_construction if ef_construction is not None else 200),
     )
     for t in texts:
         builder.add_text(t)
@@ -167,6 +204,8 @@ def add_incremental(
     chunk_overlap: int = 128,
     file_types: Optional[list[str]] = None,
     max_items: int = -1,
+    ef_construction: Optional[int] = None,
+    recompute_add: bool = False,
 ) -> str:
     print(f"Adding incremental data from: {add_dir}")
     index_dir_path = Path(index_dir)
@@ -207,7 +246,12 @@
         print("No new chunks to add.")
         return str(index_path)

-    added = incremental_add_texts_with_context(ctx, prepared_texts)
+    added = incremental_add_texts_with_context(
+        ctx,
+        prepared_texts,
+        ef_construction=ef_construction,
+        recompute=recompute_add,
+    )
     print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
     return str(index_path)

@@ -268,6 +312,15 @@ def main():
     parser.add_argument("--chunk-overlap", type=int, default=128)
     parser.add_argument("--file-types", nargs="+", default=None)
     parser.add_argument("--max-items", type=int, default=-1)
+    parser.add_argument("--ef-construction", type=int, default=32)
+    parser.add_argument(
+        "--recompute-add", action="store_true", help="Enable recompute-mode add (non-compact only)"
+    )
+    parser.add_argument(
+        "--recompute-build",
+        action="store_true",
+        help="Enable recompute-mode base build (non-compact only)",
+    )

     args = parser.parse_args()

@@ -288,6 +341,8 @@
             chunk_overlap=args.chunk_overlap,
             file_types=args.file_types,
             max_items=args.max_items,
+            ef_construction=args.ef_construction,
+            recompute_build=args.recompute_build,
         )

     if args.add_incremental:
@@ -301,6 +356,8 @@
             chunk_overlap=args.chunk_overlap,
             file_types=args.file_types,
             max_items=args.max_items,
+            ef_construction=args.ef_construction,
+            recompute_add=args.recompute_add,
         )

     # Optional: quick test query using searcher
@@ -312,7 +369,7 @@
             query = "what is LEANN?"
             if args.add_incremental:
                 query = "what is the multi vector search and how it works?"
-            results = searcher.search(query, top_k=5, recompute_embeddings=False)
+            results = searcher.search(query, top_k=5)
             if results:
                 print(f"Sample result: {results[0].text[:80]}...")
         except Exception:
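Note: the --recompute-build path above reduces to the LeannBuilder configuration sketched below. This is a minimal sketch, assuming documents are already chunked into a `texts` list; the model name, output path, and the build_index call follow the existing LEANN API and are placeholders, not part of this patch:

    from leann.api import LeannBuilder

    builder = LeannBuilder(
        backend_name="hnsw",
        embedding_model="facebook/contriever",  # placeholder; any supported model
        is_recompute=True,   # prune embeddings after the build, keep the graph
        is_compact=False,    # required for the preserve-graph pruning path
        efConstruction=200,  # mirrors --ef-construction 200
    )
    for t in texts:
        builder.add_text(t)
    builder.build_index("./test_doc_files/documents.leann")  # placeholder path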
diff --git a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_backend.py b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_backend.py
index 4437bf6..d517029 100644
--- a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_backend.py
+++ b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_backend.py
@@ -15,6 +15,7 @@ from leann.registry import register_backend
 from leann.searcher_base import BaseSearcher

 from .convert_to_csr import convert_hnsw_graph_to_csr
+from .prune_index import prune_embeddings_preserve_graph_inplace

 logger = logging.getLogger(__name__)

@@ -90,8 +91,16 @@ class HNSWBuilder(LeannBackendBuilderInterface):
         index_file = index_dir / f"{index_prefix}.index"
         faiss.write_index(index, str(index_file))

-        if self.is_compact:
-            self._convert_to_csr(index_file)
+        if self.is_recompute:
+            if self.is_compact:
+                self._convert_to_csr(index_file)
+            else:
+                # Non-compact format: prune only embeddings, keep original graph
+                ok = prune_embeddings_preserve_graph_inplace(str(index_file))
+                if not ok:
+                    raise RuntimeError(
+                        "Pruning embeddings while preserving graph failed for non-compact index"
+                    )

     def _convert_to_csr(self, index_file: Path):
         """Convert built index to CSR format"""
@@ -148,7 +157,13 @@ class HNSWSearcher(BaseSearcher):
             self.is_pruned
         )  # In C++ code, it's called is_recompute, but it's only for loading IIUC.

-        self._index = faiss.read_index(str(index_file), faiss.IO_FLAG_MMAP, hnsw_config)
+        # If pruned (recompute mode), explicitly skip storage to avoid reading
+        # the pruned section. Still allow MMAP for graph.
+        io_flags = faiss.IO_FLAG_MMAP
+        if self.is_pruned:
+            io_flags |= faiss.IO_FLAG_SKIP_STORAGE
+
+        self._index = faiss.read_index(str(index_file), io_flags, hnsw_config)

     def search(
         self,
@@ -251,3 +266,55 @@
         string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]

         return {"labels": string_labels, "distances": distances}
+
+
+# ---------- Helper API for incremental add (Python-level) ----------
+def add_vectors(
+    index_file_path: str,
+    embeddings: np.ndarray,
+    *,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
+) -> None:
+    """Append vectors to an existing non-compact HNSW index.
+
+    Args:
+        index_file_path: Path to the HNSW .index file
+        embeddings: float32 numpy array (N, D)
+        ef_construction: Optional override for efConstruction during insertion
+        recompute: If True, mark the index for recompute-mode insertion and add points one by one
+    """
+    from . import faiss  # type: ignore
+
+    if embeddings.dtype != np.float32:
+        embeddings = embeddings.astype(np.float32)
+    if not embeddings.flags.c_contiguous:
+        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
+
+    # Load index normally to ensure storage is present; toggle is_recompute on the object
+    index = faiss.read_index(str(index_file_path), faiss.IO_FLAG_MMAP)
+
+    # Best-effort: explicitly set flag on the object if the binding exposes it
+    try:
+        index.is_recompute = bool(recompute)
+    except Exception:
+        pass
+    try:
+        if ef_construction is not None:
+            index.hnsw.efConstruction = int(ef_construction)
+    except Exception:
+        # Best-effort; ignore if backend doesn't expose setter
+        pass
+
+    # For non-compact HNSW, calling add directly is sufficient. When is_recompute is set
+    # (via config or attribute), FAISS will run the insertion/search path accordingly.
+    # To strictly follow per-point insert semantics in recompute mode, add one-by-one.
+    if recompute:
+        # Insert row by row
+        n = embeddings.shape[0]
+        for i in range(n):
+            row = embeddings[i : i + 1]
+            index.add(1, faiss.swig_ptr(row))
+    else:
+        index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
+    faiss.write_index(index, str(index_file_path))
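Note: the new add_vectors helper can be exercised on its own against a non-compact index file. A minimal sketch, assuming the index still contains its storage section; the dimensionality (768 here) and file path are placeholders:

    import numpy as np
    from leann_backend_hnsw.hnsw_backend import add_vectors

    # 10 new vectors with the same dimensionality as the existing index
    new_embeddings = np.random.rand(10, 768).astype(np.float32)

    add_vectors(
        "./test_doc_files/documents.leann.index",  # placeholder .index path
        new_embeddings,
        ef_construction=64,  # optional efConstruction override for this insertion
        recompute=False,     # plain append; storage must still be present
    )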
+ """ + print(f"Pruning embeddings from {input_filename} to {output_filename}") + print("--------------------------------") + # running in mode is-recompute=True and is-compact=False + in_path = Path(input_filename) + out_path = Path(output_filename) + + try: + with open(in_path, "rb") as f_in, open(out_path, "wb") as f_out: + # Header + index_fourcc = read_struct(f_in, " 1: + metric_arg = read_struct(f_in, " bool: + """ + Convenience wrapper: write pruned file to a temporary path next to the + original, then atomically replace on success. + """ + print(f"Pruning embeddings from {index_file_path} to {index_file_path}") + print("--------------------------------") + # running in mode is-recompute=True and is-compact=False + src = Path(index_file_path) + tmp = src.with_suffix(".pruned.tmp") + ok = prune_embeddings_preserve_graph(str(src), str(tmp)) + if not ok: + if tmp.exists(): + try: + tmp.unlink() + except OSError: + pass + return False + try: + os.replace(str(tmp), str(src)) + except Exception: + # Rollback on failure + try: + if tmp.exists(): + tmp.unlink() + except OSError: + pass + return False + return True diff --git a/packages/leann-backend-hnsw/third_party/faiss b/packages/leann-backend-hnsw/third_party/faiss index ed96ff7..ea86d06 160000 --- a/packages/leann-backend-hnsw/third_party/faiss +++ b/packages/leann-backend-hnsw/third_party/faiss @@ -1 +1 @@ -Subproject commit ed96ff7dbaea0562b994f8ce7823af41884b1010 +Subproject commit ea86d06cebff587b9b41afd1041e5a617076ac46 diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index 46a0e74..a4d6a71 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -5,6 +5,7 @@ with the correct, original embedding logic from the user's reference code. import json import logging +import os import pickle import re import subprocess @@ -19,6 +20,7 @@ import numpy as np from leann.interface import LeannBackendSearcherInterface from .chat import get_llm +from .embedding_server_manager import EmbeddingServerManager from .interface import LeannBackendFactoryInterface from .metadata_filter import MetadataFilterEngine from .registry import BACKEND_REGISTRY @@ -490,9 +492,7 @@ class LeannBuilder: is_compact = self.backend_kwargs.get("is_compact", True) is_recompute = self.backend_kwargs.get("is_recompute", True) meta_data["is_compact"] = is_compact - meta_data["is_pruned"] = ( - is_compact and is_recompute - ) # Pruned only if compact and recompute + meta_data["is_pruned"] = is_recompute # Pruned only if compact and recompute with open(leann_meta_path, "w", encoding="utf-8") as f: json.dump(meta_data, f, indent=2) @@ -1105,6 +1105,8 @@ def incremental_add_texts( *, embedding_model: Optional[str] = None, embedding_mode: Optional[str] = None, + ef_construction: Optional[int] = None, + recompute: bool = False, ) -> int: """Incrementally add text chunks to an existing HNSW index built with no-recompute. 
diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py
index 46a0e74..a4d6a71 100644
--- a/packages/leann-core/src/leann/api.py
+++ b/packages/leann-core/src/leann/api.py
@@ -5,6 +5,7 @@ with the correct, original embedding logic from the user's reference code.

 import json
 import logging
+import os
 import pickle
 import re
 import subprocess
@@ -19,6 +20,7 @@ import numpy as np
 from leann.interface import LeannBackendSearcherInterface

 from .chat import get_llm
+from .embedding_server_manager import EmbeddingServerManager
 from .interface import LeannBackendFactoryInterface
 from .metadata_filter import MetadataFilterEngine
 from .registry import BACKEND_REGISTRY
@@ -490,9 +492,7 @@ class LeannBuilder:
         is_compact = self.backend_kwargs.get("is_compact", True)
         is_recompute = self.backend_kwargs.get("is_recompute", True)
         meta_data["is_compact"] = is_compact
-        meta_data["is_pruned"] = (
-            is_compact and is_recompute
-        )  # Pruned only if compact and recompute
+        meta_data["is_pruned"] = is_recompute  # Pruned whenever recompute is enabled

         with open(leann_meta_path, "w", encoding="utf-8") as f:
             json.dump(meta_data, f, indent=2)
@@ -1105,6 +1105,8 @@ def incremental_add_texts(
     *,
     embedding_model: Optional[str] = None,
     embedding_mode: Optional[str] = None,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
 ) -> int:
     """Incrementally add text chunks to an existing HNSW index built with no-recompute.

@@ -1139,38 +1141,70 @@ def incremental_add_texts(
     assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts)

     # Compute embeddings
-    embeddings = compute_embeddings(
-        texts,
-        model_name=model_name,
-        mode=mode_name,
-        use_server=False,
-        is_build=True,
-    )
+    # Embedding computation path
+    esm = None
+    port = None
+    if recompute:
+        # Determine distance metric early for server config
+        distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+        # Start embedding server and compute via ZMQ for consistency with recompute semantics
+        passages_source_file = f"{index_path}.meta.json"
+        esm = EmbeddingServerManager(
+            backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
+        )
+        started, port = esm.start_server(
+            port=5557,
+            model_name=model_name,
+            embedding_mode=mode_name,
+            passages_file=passages_source_file,
+            distance_metric=distance_metric,
+            enable_warmup=False,
+        )
+        if not started:
+            raise RuntimeError("Failed to start embedding server for recompute add")
+        embeddings = compute_embeddings_via_server(texts, model_name, port)
+    else:
+        embeddings = compute_embeddings(
+            texts,
+            model_name=model_name,
+            mode=mode_name,
+            use_server=False,
+            is_build=True,
+        )

     # Normalize for cosine if needed
-    distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+    if "distance_metric" not in locals():
+        distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
     if distance_metric == "cosine":
         norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
         norms[norms == 0] = 1
         embeddings = embeddings / norms

-    # Load vector index and append
+    # Append via backend helper (supports ef_construction/recompute plumbing)
     try:
-        from leann_backend_hnsw import faiss as hnsw_faiss  # type: ignore
+        from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors  # type: ignore
     except Exception as e:
         raise RuntimeError(
-            "Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
+            "Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
         ) from e

-    index = hnsw_faiss.read_index(str(vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
-    if embeddings.dtype != np.float32:
-        embeddings = embeddings.astype(np.float32)
-    if not embeddings.flags.c_contiguous:
-        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
+    # Propagate ZMQ port to FAISS add path when recompute is True
+    if recompute and port is not None:
+        os.environ["LEANN_ZMQ_PORT"] = str(port)

-    # C++-style signature (n, swig_ptr(x))
-    index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
-    hnsw_faiss.write_index(index, str(vector_index_file))
+    hnsw_add_vectors(
+        str(vector_index_file),
+        embeddings,
+        ef_construction=ef_construction,
+        recompute=recompute,
+    )
+
+    # Stop server after add when recompute path used
+    if esm is not None:
+        try:
+            esm.stop_server()
+        except Exception:
+            pass

     # Sanity: ids length should match embeddings rows
     if len(assigned_ids) != embeddings.shape[0]:
@@ -1265,8 +1299,17 @@ def create_incremental_add_context(
     )


-def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[str]) -> int:
-    """Incrementally add texts using a prepared context (no repeated validation)."""
+def incremental_add_texts_with_context(
+    ctx: IncrementalAddContext,
+    texts: list[str],
+    *,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
+) -> int:
+    """Incrementally add texts using a prepared context (no repeated validation).
+
+    For non-compact HNSW, ef_construction (efConstruction) can be overridden during insertion.
+    """
     if not texts:
         return 0

@@ -1274,13 +1317,33 @@
     _append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)

     # Compute embeddings
-    embeddings = compute_embeddings(
-        texts,
-        model_name=ctx.embedding_model,
-        mode=ctx.embedding_mode,
-        use_server=False,
-        is_build=True,
-    )
+    # Embedding computation path
+    esm = None
+    port = None
+    if recompute:
+        passages_source_file = f"{ctx.index_path}.meta.json"
+        esm = EmbeddingServerManager(
+            backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
+        )
+        started, port = esm.start_server(
+            port=5557,
+            model_name=ctx.embedding_model,
+            embedding_mode=ctx.embedding_mode,
+            passages_file=passages_source_file,
+            distance_metric=ctx.distance_metric,
+            enable_warmup=False,
+        )
+        if not started:
+            raise RuntimeError("Failed to start embedding server for recompute add")
+        embeddings = compute_embeddings_via_server(texts, ctx.embedding_model, port)
+    else:
+        embeddings = compute_embeddings(
+            texts,
+            model_name=ctx.embedding_model,
+            mode=ctx.embedding_mode,
+            use_server=False,
+            is_build=True,
+        )

     # Normalize for cosine if needed
     if ctx.distance_metric == "cosine":
@@ -1288,21 +1351,30 @@
         norms[norms == 0] = 1
         embeddings = embeddings / norms

-    # Load vector index and append
+    # Append via backend helper (supports ef_construction/recompute plumbing)
     try:
-        from leann_backend_hnsw import faiss as hnsw_faiss  # type: ignore
+        from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors  # type: ignore
     except Exception as e:
         raise RuntimeError(
-            "Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
+            "Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
         ) from e

-    index = hnsw_faiss.read_index(str(ctx.vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
-    if embeddings.dtype != np.float32:
-        embeddings = embeddings.astype(np.float32)
-    if not embeddings.flags.c_contiguous:
-        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
-    index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
-    hnsw_faiss.write_index(index, str(ctx.vector_index_file))
+    if recompute and port is not None:
+        os.environ["LEANN_ZMQ_PORT"] = str(port)
+
+    hnsw_add_vectors(
+        str(ctx.vector_index_file),
+        embeddings,
+        ef_construction=ef_construction,
+        recompute=recompute,
+    )
+
+    # Stop server after add when recompute path used
+    if esm is not None:
+        try:
+            esm.stop_server()
+        except Exception:
+            pass

     return embeddings.shape[0]
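Note: end to end, the context-based recompute add that the example script drives looks roughly like the sketch below. It assumes create_incremental_add_context takes the index path (its signature is not shown in this patch); the path and texts are placeholders:

    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

    # Validate the index once, then reuse the prepared context for multiple batches
    ctx = create_incremental_add_context("./test_doc_files/documents.leann")  # placeholder

    new_chunks = ["first new passage ...", "second new passage ..."]

    # recompute=True starts the embedding server, computes embeddings via ZMQ,
    # and appends through the backend add_vectors helper; ef_construction
    # overrides efConstruction for this insertion only.
    added = incremental_add_texts_with_context(
        ctx,
        new_chunks,
        ef_construction=32,
        recompute=True,
    )
    print(f"Added {added} chunks")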