fix: remove unused storage_fourcc

This commit is contained in:
yichuan520030910320
2025-09-19 15:44:38 -07:00
parent 0a69118f87
commit 62a5d7b31d
5 changed files with 396 additions and 51 deletions

View File

@@ -17,6 +17,19 @@ Usage examples:
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
--index-dir ./test_doc_files
Quick recompute test (both true):
# Recompute build
uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
--recompute-build --ef-construction 200 \
--base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
--index-dir ./test_doc_files --index-name documents.leann
# Recompute add
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
--recompute-add --ef-construction 32 \
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
--index-dir ./test_doc_files --index-name documents.leann
"""
import argparse
@@ -32,8 +45,29 @@ CORE_SRC = ROOT / "packages" / "leann-core" / "src"
HNSW_PKG_DIR = ROOT / "packages" / "leann-backend-hnsw"
APPS_DIR = ROOT / "apps"
# Prepend precise paths so the core module name `leann` resolves to leann-core
for p in [CORE_SRC, HNSW_PKG_DIR, APPS_DIR]:
# Prefer the installed backend if available (it contains the compiled extension)
def _prefer_installed(pkg_name: str) -> bool:
try:
import importlib
import importlib.util
spec = importlib.util.find_spec(pkg_name)
if spec and spec.origin and "site-packages" in spec.origin:
# ensure the faiss shim/extension is importable from the installed package
importlib.import_module(f"{pkg_name}.faiss")
return True
except Exception:
pass
return False
# Prepend paths, but only add the repo backend if the installed one is not present
paths_to_prepend = [CORE_SRC, APPS_DIR]
if not _prefer_installed("leann_backend_hnsw"):
paths_to_prepend.insert(1, HNSW_PKG_DIR)
for p in paths_to_prepend:
p_str = str(p)
if p_str not in sys.path:
sys.path.insert(0, p_str)
@@ -113,6 +147,8 @@ def build_base_index(
chunk_overlap: int,
file_types: Optional[list[str]] = None,
max_items: int = -1,
ef_construction: Optional[int] = None,
recompute_build: bool = False,
) -> str:
print(f"Building base index from: {base_dir}")
documents = _load_documents(base_dir, required_exts=file_types)
@@ -135,7 +171,7 @@ def build_base_index(
_ensure_index_dir(index_dir_path)
index_path = index_dir_path / index_name
print("Creating HNSW index with no-recompute (non-compact)...")
print("Creating HNSW index (non-compact)...")
from leann.api import LeannBuilder
from leann.registry import register_project_directory
@@ -143,8 +179,9 @@ def build_base_index(
backend_name="hnsw",
embedding_model=embedding_model,
embedding_mode=embedding_mode,
is_recompute=False,
is_recompute=recompute_build,
is_compact=False,
efConstruction=(ef_construction if ef_construction is not None else 200),
)
for t in texts:
builder.add_text(t)
@@ -167,6 +204,8 @@ def add_incremental(
chunk_overlap: int = 128,
file_types: Optional[list[str]] = None,
max_items: int = -1,
ef_construction: Optional[int] = None,
recompute_add: bool = False,
) -> str:
print(f"Adding incremental data from: {add_dir}")
index_dir_path = Path(index_dir)
@@ -207,7 +246,12 @@ def add_incremental(
print("No new chunks to add.")
return str(index_path)
added = incremental_add_texts_with_context(ctx, prepared_texts)
added = incremental_add_texts_with_context(
ctx,
prepared_texts,
ef_construction=ef_construction,
recompute=recompute_add,
)
print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
return str(index_path)
@@ -268,6 +312,15 @@ def main():
parser.add_argument("--chunk-overlap", type=int, default=128)
parser.add_argument("--file-types", nargs="+", default=None)
parser.add_argument("--max-items", type=int, default=-1)
parser.add_argument("--ef-construction", type=int, default=32)
parser.add_argument(
"--recompute-add", action="store_true", help="Enable recompute-mode add (non-compact only)"
)
parser.add_argument(
"--recompute-build",
action="store_true",
help="Enable recompute-mode base build (non-compact only)",
)
args = parser.parse_args()
@@ -288,6 +341,8 @@ def main():
chunk_overlap=args.chunk_overlap,
file_types=args.file_types,
max_items=args.max_items,
ef_construction=args.ef_construction,
recompute_build=args.recompute_build,
)
if args.add_incremental:
@@ -301,6 +356,8 @@ def main():
chunk_overlap=args.chunk_overlap,
file_types=args.file_types,
max_items=args.max_items,
ef_construction=args.ef_construction,
recompute_add=args.recompute_add,
)
# Optional: quick test query using searcher
@@ -312,7 +369,7 @@ def main():
query = "what is LEANN?"
if args.add_incremental:
query = "what is the multi vector search and how it works?"
results = searcher.search(query, top_k=5, recompute_embeddings=False)
results = searcher.search(query, top_k=5)
if results:
print(f"Sample result: {results[0].text[:80]}...")
except Exception:

View File

@@ -15,6 +15,7 @@ from leann.registry import register_backend
from leann.searcher_base import BaseSearcher
from .convert_to_csr import convert_hnsw_graph_to_csr
from .prune_index import prune_embeddings_preserve_graph_inplace
logger = logging.getLogger(__name__)
@@ -90,8 +91,16 @@ class HNSWBuilder(LeannBackendBuilderInterface):
index_file = index_dir / f"{index_prefix}.index"
faiss.write_index(index, str(index_file))
if self.is_compact:
self._convert_to_csr(index_file)
if self.is_recompute:
if self.is_compact:
self._convert_to_csr(index_file)
else:
# Non-compact format: prune only embeddings, keep original graph
ok = prune_embeddings_preserve_graph_inplace(str(index_file))
if not ok:
raise RuntimeError(
"Pruning embeddings while preserving graph failed for non-compact index"
)
def _convert_to_csr(self, index_file: Path):
"""Convert built index to CSR format"""
@@ -148,7 +157,13 @@ class HNSWSearcher(BaseSearcher):
self.is_pruned
) # In C++ code, it's called is_recompute, but it's only for loading IIUC.
self._index = faiss.read_index(str(index_file), faiss.IO_FLAG_MMAP, hnsw_config)
# If pruned (recompute mode), explicitly skip storage to avoid reading
# the pruned section. Still allow MMAP for graph.
io_flags = faiss.IO_FLAG_MMAP
if self.is_pruned:
io_flags |= faiss.IO_FLAG_SKIP_STORAGE
self._index = faiss.read_index(str(index_file), io_flags, hnsw_config)
def search(
self,
@@ -251,3 +266,55 @@ class HNSWSearcher(BaseSearcher):
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
return {"labels": string_labels, "distances": distances}
# ---------- Helper API for incremental add (Python-level) ----------
def add_vectors(
index_file_path: str,
embeddings: np.ndarray,
*,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> None:
"""Append vectors to an existing non-compact HNSW index.
Args:
index_file_path: Path to the HNSW .index file
embeddings: float32 numpy array (N, D)
ef_construction: Optional override for efConstruction during insertion
recompute: Reserved for future use to control insertion-time recompute behaviors
"""
from . import faiss # type: ignore
if embeddings.dtype != np.float32:
embeddings = embeddings.astype(np.float32)
if not embeddings.flags.c_contiguous:
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
# Load index normally to ensure storage is present; toggle is_recompute on the object
index = faiss.read_index(str(index_file_path), faiss.IO_FLAG_MMAP)
# Best-effort: explicitly set flag on the object if the binding exposes it
try:
index.is_recompute = bool(recompute)
except Exception:
pass
try:
if ef_construction is not None:
index.hnsw.efConstruction = int(ef_construction)
except Exception:
# Best-effort; ignore if backend doesn't expose setter
pass
# For non-compact HNSW, calling add directly is sufficient. When is_recompute is set
# (via config or attribute), FAISS will run the insertion/search path accordingly.
# To strictly follow per-point insert semantics in recompute mode, add one-by-one.
if recompute:
# Insert row by row
n = embeddings.shape[0]
for i in range(n):
row = embeddings[i : i + 1]
index.add(1, faiss.swig_ptr(row))
else:
index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
faiss.write_index(index, str(index_file_path))

View File

@@ -0,0 +1,149 @@
import os
import struct
from pathlib import Path
from .convert_to_csr import (
EXPECTED_HNSW_FOURCCS,
NULL_INDEX_FOURCC,
read_struct,
read_vector_raw,
)
def _write_vector_raw(f_out, count: int, data_bytes: bytes) -> None:
"""Write a vector in the same binary layout as read_vector_raw reads: <Q count> + raw bytes."""
f_out.write(struct.pack("<Q", count))
if count > 0 and data_bytes:
f_out.write(data_bytes)
def prune_embeddings_preserve_graph(input_filename: str, output_filename: str) -> bool:
"""
Copy an original (non-compact) HNSW index file while pruning the trailing embedding storage.
Preserves the graph structure and metadata exactly; only writes a NULL storage marker instead of
the original storage fourcc and payload.
Returns True on success.
"""
print(f"Pruning embeddings from {input_filename} to {output_filename}")
print("--------------------------------")
# running in mode is-recompute=True and is-compact=False
in_path = Path(input_filename)
out_path = Path(output_filename)
try:
with open(in_path, "rb") as f_in, open(out_path, "wb") as f_out:
# Header
index_fourcc = read_struct(f_in, "<I")
if index_fourcc not in EXPECTED_HNSW_FOURCCS:
# Still proceed, but this is unexpected
pass
f_out.write(struct.pack("<I", index_fourcc))
d = read_struct(f_in, "<i")
ntotal_hdr = read_struct(f_in, "<q")
dummy1 = read_struct(f_in, "<q")
dummy2 = read_struct(f_in, "<q")
is_trained = read_struct(f_in, "?")
metric_type = read_struct(f_in, "<i")
f_out.write(struct.pack("<i", d))
f_out.write(struct.pack("<q", ntotal_hdr))
f_out.write(struct.pack("<q", dummy1))
f_out.write(struct.pack("<q", dummy2))
f_out.write(struct.pack("<?", is_trained))
f_out.write(struct.pack("<i", metric_type))
if metric_type > 1:
metric_arg = read_struct(f_in, "<f")
f_out.write(struct.pack("<f", metric_arg))
# Vectors: assign_probas (double), cum_nneighbor_per_level (int32), levels (int32)
cnt, data = read_vector_raw(f_in, "d")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
# Probe potential extra alignment/flag byte present in some original formats
probe = f_in.read(1)
if probe:
if probe == b"\x00":
# Preserve this unexpected 0x00 byte
f_out.write(probe)
else:
# Likely part of the next vector; rewind
f_in.seek(-1, os.SEEK_CUR)
# Offsets (uint64) and neighbors (int32)
cnt, data = read_vector_raw(f_in, "Q")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
# Scalar params
entry_point = read_struct(f_in, "<i")
max_level = read_struct(f_in, "<i")
ef_construction = read_struct(f_in, "<i")
ef_search = read_struct(f_in, "<i")
dummy_upper_beam = read_struct(f_in, "<i")
f_out.write(struct.pack("<i", entry_point))
f_out.write(struct.pack("<i", max_level))
f_out.write(struct.pack("<i", ef_construction))
f_out.write(struct.pack("<i", ef_search))
f_out.write(struct.pack("<i", dummy_upper_beam))
# Storage fourcc (if present) — write NULL marker and drop any remaining data
try:
read_struct(f_in, "<I")
# Regardless of original, write NULL
f_out.write(struct.pack("<I", NULL_INDEX_FOURCC))
# Discard the rest of the file (embedding payload)
# (Do not copy anything else)
except EOFError:
# No storage section; nothing else to write
pass
return True
except Exception:
# Best-effort cleanup
try:
if out_path.exists():
out_path.unlink()
except OSError:
pass
return False
def prune_embeddings_preserve_graph_inplace(index_file_path: str) -> bool:
"""
Convenience wrapper: write pruned file to a temporary path next to the
original, then atomically replace on success.
"""
print(f"Pruning embeddings from {index_file_path} to {index_file_path}")
print("--------------------------------")
# running in mode is-recompute=True and is-compact=False
src = Path(index_file_path)
tmp = src.with_suffix(".pruned.tmp")
ok = prune_embeddings_preserve_graph(str(src), str(tmp))
if not ok:
if tmp.exists():
try:
tmp.unlink()
except OSError:
pass
return False
try:
os.replace(str(tmp), str(src))
except Exception:
# Rollback on failure
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
return False
return True

View File

@@ -5,6 +5,7 @@ with the correct, original embedding logic from the user's reference code.
import json
import logging
import os
import pickle
import re
import subprocess
@@ -19,6 +20,7 @@ import numpy as np
from leann.interface import LeannBackendSearcherInterface
from .chat import get_llm
from .embedding_server_manager import EmbeddingServerManager
from .interface import LeannBackendFactoryInterface
from .metadata_filter import MetadataFilterEngine
from .registry import BACKEND_REGISTRY
@@ -490,9 +492,7 @@ class LeannBuilder:
is_compact = self.backend_kwargs.get("is_compact", True)
is_recompute = self.backend_kwargs.get("is_recompute", True)
meta_data["is_compact"] = is_compact
meta_data["is_pruned"] = (
is_compact and is_recompute
) # Pruned only if compact and recompute
meta_data["is_pruned"] = is_recompute # Pruned only if compact and recompute
with open(leann_meta_path, "w", encoding="utf-8") as f:
json.dump(meta_data, f, indent=2)
@@ -1105,6 +1105,8 @@ def incremental_add_texts(
*,
embedding_model: Optional[str] = None,
embedding_mode: Optional[str] = None,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> int:
"""Incrementally add text chunks to an existing HNSW index built with no-recompute.
@@ -1139,38 +1141,70 @@ def incremental_add_texts(
assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts)
# Compute embeddings
embeddings = compute_embeddings(
texts,
model_name=model_name,
mode=mode_name,
use_server=False,
is_build=True,
)
# Embedding computation path
esm = None
port = None
if recompute:
# Determine distance metric early for server config
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
# Start embedding server and compute via ZMQ for consistency with recompute semantics
passages_source_file = f"{index_path}.meta.json"
esm = EmbeddingServerManager(
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
)
started, port = esm.start_server(
port=5557,
model_name=model_name,
embedding_mode=mode_name,
passages_file=passages_source_file,
distance_metric=distance_metric,
enable_warmup=False,
)
if not started:
raise RuntimeError("Failed to start embedding server for recompute add")
embeddings = compute_embeddings_via_server(texts, model_name, port)
else:
embeddings = compute_embeddings(
texts,
model_name=model_name,
mode=mode_name,
use_server=False,
is_build=True,
)
# Normalize for cosine if needed
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
if "distance_metric" not in locals():
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
if distance_metric == "cosine":
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
embeddings = embeddings / norms
# Load vector index and append
# Append via backend helper (supports ef_construction/recompute plumbing)
try:
from leann_backend_hnsw import faiss as hnsw_faiss # type: ignore
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
except Exception as e:
raise RuntimeError(
"Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
) from e
index = hnsw_faiss.read_index(str(vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
if embeddings.dtype != np.float32:
embeddings = embeddings.astype(np.float32)
if not embeddings.flags.c_contiguous:
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
# Propagate ZMQ port to FAISS add path when recompute is True
if recompute and port is not None:
os.environ["LEANN_ZMQ_PORT"] = str(port)
# C++-style signature (n, swig_ptr(x))
index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
hnsw_faiss.write_index(index, str(vector_index_file))
hnsw_add_vectors(
str(vector_index_file),
embeddings,
ef_construction=ef_construction,
recompute=recompute,
)
# Stop server after add when recompute path used
if esm is not None:
try:
esm.stop_server()
except Exception:
pass
# Sanity: ids length should match embeddings rows
if len(assigned_ids) != embeddings.shape[0]:
@@ -1265,8 +1299,17 @@ def create_incremental_add_context(
)
def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[str]) -> int:
"""Incrementally add texts using a prepared context (no repeated validation)."""
def incremental_add_texts_with_context(
ctx: IncrementalAddContext,
texts: list[str],
*,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> int:
"""Incrementally add texts using a prepared context (no repeated validation).
For non-compact HNSW, ef_construction (efConstruction) can be overridden during insertion.
"""
if not texts:
return 0
@@ -1274,13 +1317,33 @@ def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[s
_append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
# Compute embeddings
embeddings = compute_embeddings(
texts,
model_name=ctx.embedding_model,
mode=ctx.embedding_mode,
use_server=False,
is_build=True,
)
# Embedding computation path
esm = None
port = None
if recompute:
passages_source_file = f"{ctx.index_path}.meta.json"
esm = EmbeddingServerManager(
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
)
started, port = esm.start_server(
port=5557,
model_name=ctx.embedding_model,
embedding_mode=ctx.embedding_mode,
passages_file=passages_source_file,
distance_metric=ctx.distance_metric,
enable_warmup=False,
)
if not started:
raise RuntimeError("Failed to start embedding server for recompute add")
embeddings = compute_embeddings_via_server(texts, ctx.embedding_model, port)
else:
embeddings = compute_embeddings(
texts,
model_name=ctx.embedding_model,
mode=ctx.embedding_mode,
use_server=False,
is_build=True,
)
# Normalize for cosine if needed
if ctx.distance_metric == "cosine":
@@ -1288,21 +1351,30 @@ def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[s
norms[norms == 0] = 1
embeddings = embeddings / norms
# Load vector index and append
# Append via backend helper (supports ef_construction/recompute plumbing)
try:
from leann_backend_hnsw import faiss as hnsw_faiss # type: ignore
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
except Exception as e:
raise RuntimeError(
"Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
) from e
index = hnsw_faiss.read_index(str(ctx.vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
if embeddings.dtype != np.float32:
embeddings = embeddings.astype(np.float32)
if not embeddings.flags.c_contiguous:
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
hnsw_faiss.write_index(index, str(ctx.vector_index_file))
if recompute and port is not None:
os.environ["LEANN_ZMQ_PORT"] = str(port)
hnsw_add_vectors(
str(ctx.vector_index_file),
embeddings,
ef_construction=ef_construction,
recompute=recompute,
)
# Stop server after add when recompute path used
if esm is not None:
try:
esm.stop_server()
except Exception:
pass
return embeddings.shape[0]