Compare commits
8 Commits
feature/op ... dynamic-ad

| Author | SHA1 | Date |
|---|---|---|
| | d02aee6901 | |
| | 62a5d7b31d | |
| | 0a69118f87 | |
| | 880a039e1d | |
| | 4a39b40e72 | |
| | ed5fd88a85 | |
| | 8f4f2b4873 | |
| | 6a06bd893a | |
1  .gitignore  vendored

@@ -101,3 +101,4 @@ CLAUDE.local.md
 .claude/*.local.*
 .claude/local/*
 benchmarks/data/
+test_add/*
380  examples/dynamic_add_leann_no_recompute.py  Normal file

@@ -0,0 +1,380 @@
"""
Dynamic add example for LEANN using HNSW backend without recompute.

- Builds a base index from a directory of documents
- Incrementally adds new documents without recomputing stored embeddings

Defaults:
- Base data: /Users/yichuan/Desktop/code/LEANN/leann/data
- Incremental data: /Users/yichuan/Desktop/code/LEANN/leann/test_add
- Index path: <index_dir>/documents.leann

Usage examples:
  uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
      --base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
      --index-dir ./test_doc_files

  uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
      --add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
      --index-dir ./test_doc_files

Quick recompute test (both true):
  # Recompute build
  uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
      --recompute-build --ef-construction 200 \
      --base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
      --index-dir ./test_doc_files --index-name documents.leann

  # Recompute add
  uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
      --recompute-add --ef-construction 32 \
      --add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
      --index-dir ./test_doc_files --index-name documents.leann
"""

import argparse
import json
import pickle
import sys
from pathlib import Path
from typing import Any, Optional

# Ensure we can import from the local packages and apps folders
ROOT = Path(__file__).resolve().parents[1]
CORE_SRC = ROOT / "packages" / "leann-core" / "src"
HNSW_PKG_DIR = ROOT / "packages" / "leann-backend-hnsw"
APPS_DIR = ROOT / "apps"


# Prefer the installed backend if available (it contains the compiled extension)
def _prefer_installed(pkg_name: str) -> bool:
    try:
        import importlib
        import importlib.util

        spec = importlib.util.find_spec(pkg_name)
        if spec and spec.origin and "site-packages" in spec.origin:
            # ensure the faiss shim/extension is importable from the installed package
            importlib.import_module(f"{pkg_name}.faiss")
            return True
    except Exception:
        pass
    return False


# Prepend paths, but only add the repo backend if the installed one is not present
paths_to_prepend = [CORE_SRC, APPS_DIR]
if not _prefer_installed("leann_backend_hnsw"):
    paths_to_prepend.insert(1, HNSW_PKG_DIR)

for p in paths_to_prepend:
    p_str = str(p)
    if p_str not in sys.path:
        sys.path.insert(0, p_str)

# Defer non-stdlib imports until after sys.path setup within functions (avoid E402)


def _load_documents(data_dir: str, required_exts: Optional[list[str]] = None) -> list[Any]:
    from llama_index.core import SimpleDirectoryReader  # type: ignore

    reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
    if required_exts:
        reader_kwargs["required_exts"] = required_exts
    documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
    return documents


def _ensure_index_dir(index_dir: Path) -> None:
    index_dir.mkdir(parents=True, exist_ok=True)


def _index_files(index_path: Path) -> tuple[Path, Path, Path]:
    """Return (passages.jsonl, passages.idx, index.index) paths for a given index base path.

    Note: HNSWBackend writes the FAISS index using the stem (without .leann),
    i.e., for base 'documents.leann' the file is 'documents.index'. We prefer the
    existing file among candidates.
    """
    passages_file = index_path.parent / f"{index_path.name}.passages.jsonl"
    offsets_file = index_path.parent / f"{index_path.name}.passages.idx"
    candidate_name_index = index_path.parent / f"{index_path.name}.index"
    candidate_stem_index = index_path.parent / f"{index_path.stem}.index"
    index_file = candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
    return passages_file, offsets_file, index_file


def _read_meta(index_path: Path) -> dict[str, Any]:
    meta_path = index_path.parent / f"{index_path.name}.meta.json"
    if not meta_path.exists():
        raise FileNotFoundError(f"Metadata file not found: {meta_path}")
    with open(meta_path, encoding="utf-8") as f:
        return json.load(f)


def _autodetect_index_base(index_dir: Path) -> Optional[Path]:
    """If exactly one *.leann.meta.json exists, return its base path (without .meta.json)."""
    candidates = list(index_dir.glob("*.leann.meta.json"))
    if len(candidates) == 1:
        meta = candidates[0]
        base = meta.with_suffix("")  # remove .json
        base = base.with_suffix("")  # remove .meta
        return base
    return None


def _load_offset_map(offsets_file: Path) -> dict[str, int]:
    if not offsets_file.exists():
        return {}
    with open(offsets_file, "rb") as f:
        return pickle.load(f)


def _next_numeric_id(existing_ids: list[str]) -> int:
    numeric_ids = [int(x) for x in existing_ids if x.isdigit()]
    if not numeric_ids:
        return 0
    return max(numeric_ids) + 1


def build_base_index(
    base_dir: str,
    index_dir: str,
    index_name: str,
    embedding_model: str,
    embedding_mode: str,
    chunk_size: int,
    chunk_overlap: int,
    file_types: Optional[list[str]] = None,
    max_items: int = -1,
    ef_construction: Optional[int] = None,
    recompute_build: bool = False,
) -> str:
    print(f"Building base index from: {base_dir}")
    documents = _load_documents(base_dir, required_exts=file_types)
    if not documents:
        raise ValueError(f"No documents found in base_dir: {base_dir}")

    from chunking import create_text_chunks

    texts = create_text_chunks(
        documents,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        use_ast_chunking=False,
    )
    if max_items > 0 and len(texts) > max_items:
        texts = texts[:max_items]
        print(f"Limiting to {max_items} chunks")

    index_dir_path = Path(index_dir)
    _ensure_index_dir(index_dir_path)
    index_path = index_dir_path / index_name

    print("Creating HNSW index (non-compact)...")
    from leann.api import LeannBuilder
    from leann.registry import register_project_directory

    builder = LeannBuilder(
        backend_name="hnsw",
        embedding_model=embedding_model,
        embedding_mode=embedding_mode,
        is_recompute=recompute_build,
        is_compact=False,
        efConstruction=(ef_construction if ef_construction is not None else 200),
    )
    for t in texts:
        builder.add_text(t)
    builder.build_index(str(index_path))

    # Register for discovery
    register_project_directory(Path.cwd())

    print(f"Base index built at: {index_path}")
    return str(index_path)


def add_incremental(
    add_dir: str,
    index_dir: str,
    index_name: Optional[str] = None,
    embedding_model: Optional[str] = None,
    embedding_mode: Optional[str] = None,
    chunk_size: int = 256,
    chunk_overlap: int = 128,
    file_types: Optional[list[str]] = None,
    max_items: int = -1,
    ef_construction: Optional[int] = None,
    recompute_add: bool = False,
) -> str:
    print(f"Adding incremental data from: {add_dir}")
    index_dir_path = Path(index_dir)
    index_path = index_dir_path / (index_name or "documents.leann")

    # If specified base doesn't exist, try to auto-detect an existing base
    try:
        _read_meta(index_path)
    except FileNotFoundError:
        auto_base = _autodetect_index_base(index_dir_path)
        if auto_base is not None:
            print(f"Auto-detected index base: {auto_base.name}")
            index_path = auto_base
            _read_meta(index_path)
        else:
            raise FileNotFoundError(
                f"No index metadata found for base '{index_path.name}'. Build base first with --build-base "
                f"or provide --index-name to match an existing index (e.g., 'test_doc_files.leann')."
            )

    # Prepare validated context from core (checks backend/no-recompute and resolves embedding defaults)
    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

    ctx = create_incremental_add_context(
        str(index_path),
        embedding_model=embedding_model,
        embedding_mode=embedding_mode,
        data_dir=add_dir,
        required_exts=file_types,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        max_items=max_items,
    )

    # Use prepared texts from context to perform the add
    prepared_texts = ctx.prepared_texts or []
    if not prepared_texts:
        print("No new chunks to add.")
        return str(index_path)

    added = incremental_add_texts_with_context(
        ctx,
        prepared_texts,
        ef_construction=ef_construction,
        recompute=recompute_add,
    )

    print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
    return str(index_path)


def main():
    parser = argparse.ArgumentParser(
        description="Dynamic add to LEANN HNSW index without recompute",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument("--build-base", action="store_true", help="Build base index")
    parser.add_argument("--add-incremental", action="store_true", help="Add incremental data")

    parser.add_argument(
        "--base-dir",
        type=str,
        default="/Users/yichuan/Desktop/code/LEANN/leann/data",
        help="Base data directory",
    )
    parser.add_argument(
        "--add-dir",
        type=str,
        default="/Users/yichuan/Desktop/code/LEANN/leann/test_add",
        help="Incremental data directory",
    )
    parser.add_argument(
        "--index-dir",
        type=str,
        default="./test_doc_files",
        help="Directory containing the index",
    )
    parser.add_argument(
        "--index-name",
        type=str,
        default="documents.leann",
        help=(
            "Index base file name. If you built via document_rag.py, use 'test_doc_files.leann'. "
            "Default: documents.leann"
        ),
    )

    parser.add_argument(
        "--embedding-model",
        type=str,
        default="facebook/contriever",
        help="Embedding model name",
    )
    parser.add_argument(
        "--embedding-mode",
        type=str,
        default="sentence-transformers",
        choices=["sentence-transformers", "openai", "mlx", "ollama"],
        help="Embedding backend mode",
    )

    parser.add_argument("--chunk-size", type=int, default=256)
    parser.add_argument("--chunk-overlap", type=int, default=128)
    parser.add_argument("--file-types", nargs="+", default=None)
    parser.add_argument("--max-items", type=int, default=-1)
    parser.add_argument("--ef-construction", type=int, default=32)
    parser.add_argument(
        "--recompute-add", action="store_true", help="Enable recompute-mode add (non-compact only)"
    )
    parser.add_argument(
        "--recompute-build",
        action="store_true",
        help="Enable recompute-mode base build (non-compact only)",
    )

    args = parser.parse_args()

    if not args.build_base and not args.add_incremental:
        print("Nothing to do. Use --build-base and/or --add-incremental.")
        return

    index_path_str: Optional[str] = None

    if args.build_base:
        index_path_str = build_base_index(
            base_dir=args.base_dir,
            index_dir=args.index_dir,
            index_name=args.index_name,
            embedding_model=args.embedding_model,
            embedding_mode=args.embedding_mode,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            file_types=args.file_types,
            max_items=args.max_items,
            ef_construction=args.ef_construction,
            recompute_build=args.recompute_build,
        )

    if args.add_incremental:
        index_path_str = add_incremental(
            add_dir=args.add_dir,
            index_dir=args.index_dir,
            index_name=args.index_name,
            embedding_model=args.embedding_model,
            embedding_mode=args.embedding_mode,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            file_types=args.file_types,
            max_items=args.max_items,
            ef_construction=args.ef_construction,
            recompute_add=args.recompute_add,
        )

    # Optional: quick test query using searcher
    if index_path_str:
        try:
            from leann.api import LeannSearcher

            searcher = LeannSearcher(index_path_str)
            query = "what is LEANN?"
            if args.add_incremental:
                query = "what is the multi vector search and how it works?"
            results = searcher.search(query, top_k=5)
            if results:
                print(f"Sample result: {results[0].text[:80]}...")
        except Exception:
            pass


if __name__ == "__main__":
    main()
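The script above is CLI-driven, but the same two-step flow can also be driven programmatically. A minimal sketch, assuming the example module is importable (e.g. with `examples/` on `sys.path`) and using placeholder data directories rather than the hard-coded defaults:

```python
# Sketch: call the example's two entry points directly instead of via argparse.
# build_base_index / add_incremental are the functions defined in the file above;
# the directories here are illustrative placeholders.
from dynamic_add_leann_no_recompute import add_incremental, build_base_index

index_path = build_base_index(
    base_dir="./data",
    index_dir="./test_doc_files",
    index_name="documents.leann",
    embedding_model="facebook/contriever",
    embedding_mode="sentence-transformers",
    chunk_size=256,
    chunk_overlap=128,
)

# Later, fold new documents into the same index without recomputing stored embeddings.
add_incremental(
    add_dir="./test_add",
    index_dir="./test_doc_files",
    index_name="documents.leann",
    ef_construction=32,
)
print(f"index at {index_path}")
```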
@@ -15,6 +15,7 @@ from leann.registry import register_backend
 from leann.searcher_base import BaseSearcher
 
 from .convert_to_csr import convert_hnsw_graph_to_csr
+from .prune_index import prune_embeddings_preserve_graph_inplace
 
 logger = logging.getLogger(__name__)
 
@@ -90,8 +91,16 @@ class HNSWBuilder(LeannBackendBuilderInterface):
         index_file = index_dir / f"{index_prefix}.index"
         faiss.write_index(index, str(index_file))
 
-        if self.is_compact:
-            self._convert_to_csr(index_file)
+        if self.is_recompute:
+            if self.is_compact:
+                self._convert_to_csr(index_file)
+            else:
+                # Non-compact format: prune only embeddings, keep original graph
+                ok = prune_embeddings_preserve_graph_inplace(str(index_file))
+                if not ok:
+                    raise RuntimeError(
+                        "Pruning embeddings while preserving graph failed for non-compact index"
+                    )
 
     def _convert_to_csr(self, index_file: Path):
         """Convert built index to CSR format"""
@@ -148,7 +157,13 @@ class HNSWSearcher(BaseSearcher):
             self.is_pruned
         )  # In C++ code, it's called is_recompute, but it's only for loading IIUC.
 
-        self._index = faiss.read_index(str(index_file), faiss.IO_FLAG_MMAP, hnsw_config)
+        # If pruned (recompute mode), explicitly skip storage to avoid reading
+        # the pruned section. Still allow MMAP for graph.
+        io_flags = faiss.IO_FLAG_MMAP
+        if self.is_pruned:
+            io_flags |= faiss.IO_FLAG_SKIP_STORAGE
+
+        self._index = faiss.read_index(str(index_file), io_flags, hnsw_config)
 
     def search(
         self,
@@ -251,3 +266,55 @@ class HNSWSearcher(BaseSearcher):
         string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
 
         return {"labels": string_labels, "distances": distances}
+
+
+# ---------- Helper API for incremental add (Python-level) ----------
+def add_vectors(
+    index_file_path: str,
+    embeddings: np.ndarray,
+    *,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
+) -> None:
+    """Append vectors to an existing non-compact HNSW index.
+
+    Args:
+        index_file_path: Path to the HNSW .index file
+        embeddings: float32 numpy array (N, D)
+        ef_construction: Optional override for efConstruction during insertion
+        recompute: Reserved for future use to control insertion-time recompute behaviors
+    """
+    from . import faiss  # type: ignore
+
+    if embeddings.dtype != np.float32:
+        embeddings = embeddings.astype(np.float32)
+    if not embeddings.flags.c_contiguous:
+        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
+
+    # Load index normally to ensure storage is present; toggle is_recompute on the object
+    index = faiss.read_index(str(index_file_path), faiss.IO_FLAG_MMAP)
+
+    # Best-effort: explicitly set flag on the object if the binding exposes it
+    try:
+        index.is_recompute = bool(recompute)
+    except Exception:
+        pass
+    try:
+        if ef_construction is not None:
+            index.hnsw.efConstruction = int(ef_construction)
+    except Exception:
+        # Best-effort; ignore if backend doesn't expose setter
+        pass
+
+    # For non-compact HNSW, calling add directly is sufficient. When is_recompute is set
+    # (via config or attribute), FAISS will run the insertion/search path accordingly.
+    # To strictly follow per-point insert semantics in recompute mode, add one-by-one.
+    if recompute:
+        # Insert row by row
+        n = embeddings.shape[0]
+        for i in range(n):
+            row = embeddings[i : i + 1]
+            index.add(1, faiss.swig_ptr(row))
+    else:
+        index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
+    faiss.write_index(index, str(index_file_path))
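These hunks evidently modify `leann_backend_hnsw.hnsw_backend` (the module from which the new `leann.api` helpers later import `add_vectors`). A minimal sketch of calling the new helper directly, assuming a non-compact index already exists at the placeholder path and that the vector dimensionality matches the index:

```python
# Sketch: append a small batch of vectors to an existing non-compact HNSW index.
# add_vectors is the helper added in the diff above; the path and the 768-dim
# random vectors are placeholders for real embeddings.
import numpy as np
from leann_backend_hnsw.hnsw_backend import add_vectors

new_vectors = np.random.rand(4, 768).astype(np.float32)  # shape (N, D), float32
add_vectors(
    "./test_doc_files/documents.index",
    new_vectors,
    ef_construction=32,  # optional efConstruction override for insertion
    recompute=False,     # no-recompute path: a single batched index.add call
)
```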
149  packages/leann-backend-hnsw/leann_backend_hnsw/prune_index.py  Normal file

@@ -0,0 +1,149 @@
import os
import struct
from pathlib import Path

from .convert_to_csr import (
    EXPECTED_HNSW_FOURCCS,
    NULL_INDEX_FOURCC,
    read_struct,
    read_vector_raw,
)


def _write_vector_raw(f_out, count: int, data_bytes: bytes) -> None:
    """Write a vector in the same binary layout as read_vector_raw reads: <Q count> + raw bytes."""
    f_out.write(struct.pack("<Q", count))
    if count > 0 and data_bytes:
        f_out.write(data_bytes)


def prune_embeddings_preserve_graph(input_filename: str, output_filename: str) -> bool:
    """
    Copy an original (non-compact) HNSW index file while pruning the trailing embedding storage.
    Preserves the graph structure and metadata exactly; only writes a NULL storage marker instead of
    the original storage fourcc and payload.

    Returns True on success.
    """
    print(f"Pruning embeddings from {input_filename} to {output_filename}")
    print("--------------------------------")
    # running in mode is-recompute=True and is-compact=False
    in_path = Path(input_filename)
    out_path = Path(output_filename)

    try:
        with open(in_path, "rb") as f_in, open(out_path, "wb") as f_out:
            # Header
            index_fourcc = read_struct(f_in, "<I")
            if index_fourcc not in EXPECTED_HNSW_FOURCCS:
                # Still proceed, but this is unexpected
                pass
            f_out.write(struct.pack("<I", index_fourcc))

            d = read_struct(f_in, "<i")
            ntotal_hdr = read_struct(f_in, "<q")
            dummy1 = read_struct(f_in, "<q")
            dummy2 = read_struct(f_in, "<q")
            is_trained = read_struct(f_in, "?")
            metric_type = read_struct(f_in, "<i")
            f_out.write(struct.pack("<i", d))
            f_out.write(struct.pack("<q", ntotal_hdr))
            f_out.write(struct.pack("<q", dummy1))
            f_out.write(struct.pack("<q", dummy2))
            f_out.write(struct.pack("<?", is_trained))
            f_out.write(struct.pack("<i", metric_type))

            if metric_type > 1:
                metric_arg = read_struct(f_in, "<f")
                f_out.write(struct.pack("<f", metric_arg))

            # Vectors: assign_probas (double), cum_nneighbor_per_level (int32), levels (int32)
            cnt, data = read_vector_raw(f_in, "d")
            _write_vector_raw(f_out, cnt, data)

            cnt, data = read_vector_raw(f_in, "i")
            _write_vector_raw(f_out, cnt, data)

            cnt, data = read_vector_raw(f_in, "i")
            _write_vector_raw(f_out, cnt, data)

            # Probe potential extra alignment/flag byte present in some original formats
            probe = f_in.read(1)
            if probe:
                if probe == b"\x00":
                    # Preserve this unexpected 0x00 byte
                    f_out.write(probe)
                else:
                    # Likely part of the next vector; rewind
                    f_in.seek(-1, os.SEEK_CUR)

            # Offsets (uint64) and neighbors (int32)
            cnt, data = read_vector_raw(f_in, "Q")
            _write_vector_raw(f_out, cnt, data)

            cnt, data = read_vector_raw(f_in, "i")
            _write_vector_raw(f_out, cnt, data)

            # Scalar params
            entry_point = read_struct(f_in, "<i")
            max_level = read_struct(f_in, "<i")
            ef_construction = read_struct(f_in, "<i")
            ef_search = read_struct(f_in, "<i")
            dummy_upper_beam = read_struct(f_in, "<i")
            f_out.write(struct.pack("<i", entry_point))
            f_out.write(struct.pack("<i", max_level))
            f_out.write(struct.pack("<i", ef_construction))
            f_out.write(struct.pack("<i", ef_search))
            f_out.write(struct.pack("<i", dummy_upper_beam))

            # Storage fourcc (if present) — write NULL marker and drop any remaining data
            try:
                read_struct(f_in, "<I")
                # Regardless of original, write NULL
                f_out.write(struct.pack("<I", NULL_INDEX_FOURCC))
                # Discard the rest of the file (embedding payload)
                # (Do not copy anything else)
            except EOFError:
                # No storage section; nothing else to write
                pass

        return True
    except Exception:
        # Best-effort cleanup
        try:
            if out_path.exists():
                out_path.unlink()
        except OSError:
            pass
        return False


def prune_embeddings_preserve_graph_inplace(index_file_path: str) -> bool:
    """
    Convenience wrapper: write pruned file to a temporary path next to the
    original, then atomically replace on success.
    """
    print(f"Pruning embeddings from {index_file_path} to {index_file_path}")
    print("--------------------------------")
    # running in mode is-recompute=True and is-compact=False
    src = Path(index_file_path)
    tmp = src.with_suffix(".pruned.tmp")
    ok = prune_embeddings_preserve_graph(str(src), str(tmp))
    if not ok:
        if tmp.exists():
            try:
                tmp.unlink()
            except OSError:
                pass
        return False
    try:
        os.replace(str(tmp), str(src))
    except Exception:
        # Rollback on failure
        try:
            if tmp.exists():
                tmp.unlink()
        except OSError:
            pass
        return False
    return True
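A minimal sketch of using the in-place wrapper, which is what `HNSWBuilder` now calls when `is_recompute=True` and `is_compact=False`; the index path is a placeholder:

```python
# Sketch: strip the stored embedding payload from a non-compact index file,
# leaving the HNSW graph and header intact. On failure the original file is kept.
from leann_backend_hnsw.prune_index import prune_embeddings_preserve_graph_inplace

ok = prune_embeddings_preserve_graph_inplace("./test_doc_files/documents.index")
if not ok:
    raise RuntimeError("pruning failed; original index left unchanged")
```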
Submodule packages/leann-backend-hnsw/third_party/faiss updated: ed96ff7dba...ea86d06ceb
@@ -5,6 +5,7 @@ with the correct, original embedding logic from the user's reference code.
 
 import json
 import logging
+import os
 import pickle
 import re
 import subprocess
@@ -19,6 +20,7 @@ import numpy as np
 from leann.interface import LeannBackendSearcherInterface
 
 from .chat import get_llm
+from .embedding_server_manager import EmbeddingServerManager
 from .interface import LeannBackendFactoryInterface
 from .metadata_filter import MetadataFilterEngine
 from .registry import BACKEND_REGISTRY
@@ -118,6 +120,20 @@ class SearchResult:
     metadata: dict[str, Any] = field(default_factory=dict)
 
 
+@dataclass
+class IncrementalAddContext:
+    """Prepared context for safe incremental add operations on an index."""
+
+    index_path: str
+    passages_file: Path
+    offsets_file: Path
+    vector_index_file: Path
+    embedding_model: str
+    embedding_mode: str
+    distance_metric: str
+    prepared_texts: Optional[list[str]] = None
+
+
 class PassageManager:
     def __init__(
         self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
@@ -476,9 +492,7 @@ class LeannBuilder:
         is_compact = self.backend_kwargs.get("is_compact", True)
         is_recompute = self.backend_kwargs.get("is_recompute", True)
         meta_data["is_compact"] = is_compact
-        meta_data["is_pruned"] = (
-            is_compact and is_recompute
-        )  # Pruned only if compact and recompute
+        meta_data["is_pruned"] = is_recompute  # Pruned only if compact and recompute
 
         with open(leann_meta_path, "w", encoding="utf-8") as f:
             json.dump(meta_data, f, indent=2)
@@ -1018,8 +1032,405 @@ class LeannChat:
         except Exception:
             pass
 
-    def __del__(self):
-        try:
-            self.cleanup()
-        except Exception:
-            pass
+
+# ------------------------------
+# Incremental Add Utilities (HNSW no-recompute only)
+# ------------------------------
+
+
+def _resolve_index_paths(index_path: str) -> tuple[Path, Path, Path]:
+    """Given base index path (without extension), return (passages.jsonl, passages.idx, vector.index).
+
+    For HNSW, vector index file is typically <stem>.index (e.g., documents.index) even when base is
+    'documents.leann'. We prefer an existing <stem>.index, otherwise fall back to <name>.index.
+    """
+    base = Path(index_path)
+    passages_file = base.parent / f"{base.name}.passages.jsonl"
+    offsets_file = base.parent / f"{base.name}.passages.idx"
+    candidate_name_index = base.parent / f"{base.name}.index"
+    candidate_stem_index = base.parent / f"{base.stem}.index"
+    vector_index_file = (
+        candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
+    )
+    return passages_file, offsets_file, vector_index_file
+
+
+def _read_meta_file(index_path: str) -> dict[str, Any]:
+    meta_path = Path(f"{index_path}.meta.json")
+    if not meta_path.exists():
+        raise FileNotFoundError(f"Leann metadata file not found: {meta_path}")
+    with open(meta_path, encoding="utf-8") as f:
+        return json.load(f)
+
+
+def _load_offset_map_pickle(offsets_file: Path) -> dict[str, int]:
+    if not offsets_file.exists():
+        return {}
+    with open(offsets_file, "rb") as f:
+        return pickle.load(f)
+
+
+def _append_passages_and_update_offsets(
+    passages_file: Path, offsets_file: Path, new_texts: list[str]
+) -> list[str]:
+    """Append new texts to passages file, update offset map, and return assigned string IDs.
+
+    IDs are assigned as incrementing integers based on existing keys in the offset map.
+    """
+    offset_map = _load_offset_map_pickle(offsets_file)
+    # Compute next numeric id
+    numeric_ids = [int(x) for x in offset_map.keys() if str(x).isdigit()]
+    next_id_num = (max(numeric_ids) + 1) if numeric_ids else 0
+    assigned_ids: list[str] = []
+
+    with open(passages_file, "a", encoding="utf-8") as f:
+        for text in new_texts:
+            offset = f.tell()
+            str_id = str(next_id_num)
+            json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
+            f.write("\n")
+            offset_map[str_id] = offset
+            assigned_ids.append(str_id)
+            next_id_num += 1
+
+    with open(offsets_file, "wb") as f:
+        pickle.dump(offset_map, f)
+
+    return assigned_ids
+
+
+def incremental_add_texts(
+    index_path: str,
+    texts: list[str],
+    *,
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
+) -> int:
+    """Incrementally add text chunks to an existing HNSW index built with no-recompute.
+
+    - Validates backend is HNSW and index is non-compact (no-recompute path)
+    - Appends passages and offsets
+    - Computes embeddings and appends to the HNSW vector index
+
+    Returns number of added chunks.
+    """
+    if not texts:
+        return 0
+
+    meta = _read_meta_file(index_path)
+    if meta.get("backend_name") != "hnsw":
+        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
+    if meta.get("is_compact", True):
+        raise RuntimeError(
+            "Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
+        )
+
+    passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
+    if not vector_index_file.exists():
+        raise FileNotFoundError(
+            f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
+        )
+
+    # Resolve embedding config from meta if not provided
+    model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
+    mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
+
+    # Append passages and update offsets
+    assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts)
+
+    # Compute embeddings
+    # Embedding computation path
+    esm = None
+    port = None
+    if recompute:
+        # Determine distance metric early for server config
+        distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+        # Start embedding server and compute via ZMQ for consistency with recompute semantics
+        passages_source_file = f"{index_path}.meta.json"
+        esm = EmbeddingServerManager(
+            backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
+        )
+        started, port = esm.start_server(
+            port=5557,
+            model_name=model_name,
+            embedding_mode=mode_name,
+            passages_file=passages_source_file,
+            distance_metric=distance_metric,
+            enable_warmup=False,
+        )
+        if not started:
+            raise RuntimeError("Failed to start embedding server for recompute add")
+        embeddings = compute_embeddings_via_server(texts, model_name, port)
+    else:
+        embeddings = compute_embeddings(
+            texts,
+            model_name=model_name,
+            mode=mode_name,
+            use_server=False,
+            is_build=True,
+        )
+
+    # Normalize for cosine if needed
+    if "distance_metric" not in locals():
+        distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+    if distance_metric == "cosine":
+        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
+        norms[norms == 0] = 1
+        embeddings = embeddings / norms
+
+    # Append via backend helper (supports ef_construction/recompute plumbing)
+    try:
+        from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors  # type: ignore
+    except Exception as e:
+        raise RuntimeError(
+            "Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
+        ) from e
+
+    # Propagate ZMQ port to FAISS add path when recompute is True
+    if recompute and port is not None:
+        os.environ["LEANN_ZMQ_PORT"] = str(port)
+
+    hnsw_add_vectors(
+        str(vector_index_file),
+        embeddings,
+        ef_construction=ef_construction,
+        recompute=recompute,
+    )
+
+    # Stop server after add when recompute path used
+    if esm is not None:
+        try:
+            esm.stop_server()
+        except Exception:
+            pass
+
+    # Sanity: ids length should match embeddings rows
+    if len(assigned_ids) != embeddings.shape[0]:
+        warnings.warn(
+            f"Assigned {len(assigned_ids)} IDs but computed {embeddings.shape[0]} embeddings.",
+            UserWarning,
+            stacklevel=2,
+        )
+
+    return len(assigned_ids)
+
+
+def create_incremental_add_context(
+    index_path: str,
+    *,
+    # Optional embedding choices; if None will use meta
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+    # Optional data-to-text preparation in context
+    data_dir: Optional[str] = None,
+    required_exts: Optional[list[str]] = None,
+    chunk_size: int = 256,
+    chunk_overlap: int = 128,
+    max_items: int = -1,
+) -> IncrementalAddContext:
+    """Validate index and prepare context for repeated incremental adds.
+
+    Additionally, if data_dir is provided, this function will load documents,
+    chunk them to texts with the specified parameters, and store them in ctx.prepared_texts.
+    """
+    meta = _read_meta_file(index_path)
+    if meta.get("backend_name") != "hnsw":
+        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
+    if meta.get("is_compact", True):
+        raise RuntimeError(
+            "Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
+        )
+
+    passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
+    if not vector_index_file.exists():
+        raise FileNotFoundError(
+            f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
+        )
+
+    model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
+    mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
+    distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+
+    prepared_texts: Optional[list[str]] = None
+    if data_dir is not None:
+        try:
+            from llama_index.core import SimpleDirectoryReader  # type: ignore
+            from llama_index.core.node_parser import SentenceSplitter  # type: ignore
+        except Exception as e:
+            raise RuntimeError(
+                "llama-index-core is required when using data_dir in create_incremental_add_context"
+            ) from e
+
+        reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
+        if required_exts:
+            reader_kwargs["required_exts"] = required_exts
+        documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
+        if documents:
+            splitter = SentenceSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                separator=" ",
+                paragraph_separator="\n\n",
+            )
+            prepared_texts = []
+            for doc in documents:
+                try:
+                    nodes = splitter.get_nodes_from_documents([doc])
+                    if nodes:
+                        prepared_texts.extend([node.get_content() for node in nodes])
+                except Exception:
+                    content = doc.get_content()
+                    if content and content.strip():
+                        prepared_texts.append(content.strip())
+            if max_items > 0 and len(prepared_texts) > max_items:
+                prepared_texts = prepared_texts[:max_items]
+
+    return IncrementalAddContext(
+        index_path=index_path,
+        passages_file=passages_file,
+        offsets_file=offsets_file,
+        vector_index_file=vector_index_file,
+        embedding_model=model_name,
+        embedding_mode=mode_name,
+        distance_metric=distance_metric,
+        prepared_texts=prepared_texts,
+    )
+
+
+def incremental_add_texts_with_context(
+    ctx: IncrementalAddContext,
+    texts: list[str],
+    *,
+    ef_construction: Optional[int] = None,
+    recompute: bool = False,
+) -> int:
+    """Incrementally add texts using a prepared context (no repeated validation).
+
+    For non-compact HNSW, ef_construction (efConstruction) can be overridden during insertion.
+    """
+    if not texts:
+        return 0
+
+    # Append passages & offsets
+    _append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
+
+    # Compute embeddings
+    # Embedding computation path
+    esm = None
+    port = None
+    if recompute:
+        passages_source_file = f"{ctx.index_path}.meta.json"
+        esm = EmbeddingServerManager(
+            backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
+        )
+        started, port = esm.start_server(
+            port=5557,
+            model_name=ctx.embedding_model,
+            embedding_mode=ctx.embedding_mode,
+            passages_file=passages_source_file,
+            distance_metric=ctx.distance_metric,
+            enable_warmup=False,
+        )
+        if not started:
+            raise RuntimeError("Failed to start embedding server for recompute add")
+        embeddings = compute_embeddings_via_server(texts, ctx.embedding_model, port)
+    else:
+        embeddings = compute_embeddings(
+            texts,
+            model_name=ctx.embedding_model,
+            mode=ctx.embedding_mode,
+            use_server=False,
+            is_build=True,
+        )
+
+    # Normalize for cosine if needed
+    if ctx.distance_metric == "cosine":
+        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
+        norms[norms == 0] = 1
+        embeddings = embeddings / norms
+
+    # Append via backend helper (supports ef_construction/recompute plumbing)
+    try:
+        from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors  # type: ignore
+    except Exception as e:
+        raise RuntimeError(
+            "Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
+        ) from e
+
+    if recompute and port is not None:
+        os.environ["LEANN_ZMQ_PORT"] = str(port)
+
+    hnsw_add_vectors(
+        str(ctx.vector_index_file),
+        embeddings,
+        ef_construction=ef_construction,
+        recompute=recompute,
+    )
+
+    # Stop server after add when recompute path used
+    if esm is not None:
+        try:
+            esm.stop_server()
+        except Exception:
+            pass
+
+    return embeddings.shape[0]
+
+
+def incremental_add_directory(
+    index_path: str,
+    data_dir: str,
+    *,
+    chunk_size: int = 256,
+    chunk_overlap: int = 128,
+    required_exts: Optional[list[str]] = None,
+    max_items: int = -1,
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+) -> int:
+    """Load documents from a directory, chunk them, and incrementally add to an index.
+
+    Chunking uses LlamaIndex SentenceSplitter for simplicity and avoids external app dependencies.
+    """
+    try:
+        from llama_index.core import SimpleDirectoryReader  # type: ignore
+        from llama_index.core.node_parser import SentenceSplitter  # type: ignore
+    except Exception as e:
+        raise RuntimeError("llama-index-core is required for incremental_add_directory") from e
+
+    reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
+    if required_exts:
+        reader_kwargs["required_exts"] = required_exts
+    documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
+    if not documents:
+        return 0
+
+    # Traditional text chunking
+    splitter = SentenceSplitter(
+        chunk_size=chunk_size,
+        chunk_overlap=chunk_overlap,
+        separator=" ",
+        paragraph_separator="\n\n",
+    )
+    all_texts: list[str] = []
+    for doc in documents:
+        try:
+            nodes = splitter.get_nodes_from_documents([doc])
+            if nodes:
+                all_texts.extend([node.get_content() for node in nodes])
+        except Exception:
+            content = doc.get_content()
+            if content and content.strip():
+                all_texts.append(content.strip())
+
+    if max_items > 0 and len(all_texts) > max_items:
+        all_texts = all_texts[:max_items]
+
+    return incremental_add_texts(
+        index_path,
+        all_texts,
+        embedding_model=embedding_model,
+        embedding_mode=embedding_mode,
+    )
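The example script exercises exactly this pair of helpers. A minimal end-to-end sketch of the context-based flow added to `leann.api` above, with placeholder paths and the embedding defaults resolved from the index metadata:

```python
# Sketch: validate the index once, pre-chunk a directory of documents, then add them.
# create_incremental_add_context / incremental_add_texts_with_context are the
# functions added in this hunk; paths are placeholders.
from leann.api import create_incremental_add_context, incremental_add_texts_with_context

ctx = create_incremental_add_context(
    "./test_doc_files/documents.leann",
    data_dir="./test_add",   # optional: pre-chunks documents into ctx.prepared_texts
    chunk_size=256,
    chunk_overlap=128,
)
added = incremental_add_texts_with_context(
    ctx,
    ctx.prepared_texts or [],
    ef_construction=32,
    recompute=False,         # True routes embeddings through the ZMQ embedding server
)
print(f"added {added} chunks to {ctx.index_path}")
```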