From 880a039e1d65df38da7d2f4ce7720d24ebb34fe9 Mon Sep 17 00:00:00 2001
From: yichuan520030910320
Date: Sun, 14 Sep 2025 16:03:48 -0700
Subject: [PATCH] Modularize incremental add into the leann-core API

---
 examples/dynamic_add_leann_no_recompute.py |  51 +----
 packages/leann-core/src/leann/api.py       | 207 ++++++++++++++++++++-
 2 files changed, 211 insertions(+), 47 deletions(-)

diff --git a/examples/dynamic_add_leann_no_recompute.py b/examples/dynamic_add_leann_no_recompute.py
index 9e5a358..5974dcc 100644
--- a/examples/dynamic_add_leann_no_recompute.py
+++ b/examples/dynamic_add_leann_no_recompute.py
@@ -204,8 +204,8 @@ def add_incremental(
     )
 
     # Resolve embedding config from meta if not provided
-    emb_model = embedding_model or meta.get("embedding_model", "facebook/contriever")
-    emb_mode = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
+    # (resolution now happens inside leann.api.incremental_add_texts, defaulting to
+    #  facebook/contriever with the sentence-transformers mode)
 
     documents = _load_documents(add_dir, required_exts=file_types)
     if not documents:
@@ -248,52 +248,15 @@ def add_incremental(
 
     # Compute embeddings for new texts
     print("Computing embeddings for incremental chunks...")
-    from leann.api import compute_embeddings
+    from leann.api import incremental_add_texts
 
-    embeddings = compute_embeddings(
+    # Let core handle embeddings and vector index update
+    added = incremental_add_texts(
+        str(index_path),
         new_texts,
-        model_name=emb_model,
-        mode=emb_mode,
-        use_server=False,
-        is_build=True,
     )
 
-    # Load FAISS HNSW index and add vectors, then write back
-    print("Loading HNSW index and appending vectors...")
-    try:
-        from leann_backend_hnsw import faiss as hnsw_faiss  # type: ignore
-    except Exception as e:  # pragma: no cover - environment-specific
-        raise RuntimeError(
-            "Failed to import leann_backend_hnsw.faiss. Ensure backend is built/installed."
-        ) from e
-
-    # Read existing index
-    # Read existing index (basic read is sufficient for appending)
-    index = hnsw_faiss.read_index(str(faiss_index_file), hnsw_faiss.IO_FLAG_MMAP)
-
-    # Normalize for cosine if needed
-    distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
-    if distance_metric == "cosine":
-        import numpy as _np
-
-        norms = _np.linalg.norm(embeddings, axis=1, keepdims=True)
-        norms[norms == 0] = 1
-        embeddings = embeddings / norms
-
-    # Ensure dtype float32 and contiguous
-    import numpy as _np
-
-    if embeddings.dtype != _np.float32:
-        embeddings = embeddings.astype(_np.float32)
-    if not embeddings.flags.c_contiguous:
-        embeddings = _np.ascontiguousarray(embeddings, dtype=_np.float32)
-
-    # Append using FAISS-style signature (n, swig_ptr(x)); fall back to Python wrapper if needed
-    index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
-
-    hnsw_faiss.write_index(index, str(faiss_index_file))
-
-    print(f"Incremental add completed. Index updated at: {index_path}")
+    print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
 
     return str(index_path)
 
diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py
index 653f573..6079f1d 100644
--- a/packages/leann-core/src/leann/api.py
+++ b/packages/leann-core/src/leann/api.py
@@ -1018,8 +1018,209 @@ class LeannChat:
         except Exception:
             pass
 
-    def __del__(self):
+
+# ------------------------------
+# Incremental Add Utilities (HNSW no-recompute only)
+# ------------------------------
+
+
+def _resolve_index_paths(index_path: str) -> tuple[Path, Path, Path]:
+    """Given a base index path (without extension), return (passages.jsonl, passages.idx, vector.index).
+
+    For HNSW, the vector index file is typically '{base.stem}.index' (e.g., documents.index) even when
+    the base is 'documents.leann'. Prefer an existing '{base.stem}.index', else '{base.name}.index'.
+    """
+    base = Path(index_path)
+    passages_file = base.parent / f"{base.name}.passages.jsonl"
+    offsets_file = base.parent / f"{base.name}.passages.idx"
+    candidate_name_index = base.parent / f"{base.name}.index"
+    candidate_stem_index = base.parent / f"{base.stem}.index"
+    vector_index_file = (
+        candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
+    )
+    return passages_file, offsets_file, vector_index_file
+
+
+def _read_meta_file(index_path: str) -> dict[str, Any]:
+    meta_path = Path(f"{index_path}.meta.json")
+    if not meta_path.exists():
+        raise FileNotFoundError(f"Leann metadata file not found: {meta_path}")
+    with open(meta_path, encoding="utf-8") as f:
+        return json.load(f)
+
+
+def _load_offset_map_pickle(offsets_file: Path) -> dict[str, int]:
+    if not offsets_file.exists():
+        return {}
+    with open(offsets_file, "rb") as f:
+        return pickle.load(f)
+
+
+def _append_passages_and_update_offsets(
+    passages_file: Path, offsets_file: Path, new_texts: list[str]
+) -> list[str]:
+    """Append new texts to the passages file, update the offset map, and return the assigned string IDs.
+
+    IDs are assigned as incrementing integers based on existing keys in the offset map.
+    """
+    offset_map = _load_offset_map_pickle(offsets_file)
+    # Compute next numeric id
+    numeric_ids = [int(x) for x in offset_map.keys() if str(x).isdigit()]
+    next_id_num = (max(numeric_ids) + 1) if numeric_ids else 0
+    assigned_ids: list[str] = []
+
+    with open(passages_file, "a", encoding="utf-8") as f:
+        for text in new_texts:
+            offset = f.tell()
+            str_id = str(next_id_num)
+            json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
+            f.write("\n")
+            offset_map[str_id] = offset
+            assigned_ids.append(str_id)
+            next_id_num += 1
+
+    with open(offsets_file, "wb") as f:
+        pickle.dump(offset_map, f)
+
+    return assigned_ids
+
+
+def incremental_add_texts(
+    index_path: str,
+    texts: list[str],
+    *,
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+) -> int:
+    """Incrementally add text chunks to an existing HNSW index built with no-recompute.
+
+    - Validates backend is HNSW and index is non-compact (no-recompute path)
+    - Appends passages and offsets
+    - Computes embeddings and appends to the HNSW vector index
+
+    Returns number of added chunks.
+    """
+    if not texts:
+        return 0
+
+    meta = _read_meta_file(index_path)
+    if meta.get("backend_name") != "hnsw":
+        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
+    if meta.get("is_compact", True):
+        raise RuntimeError(
+            "Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
+ ) + + passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path) + if not vector_index_file.exists(): + raise FileNotFoundError( + f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder." + ) + + # Resolve embedding config from meta if not provided + model_name = embedding_model or meta.get("embedding_model", "facebook/contriever") + mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers") + + # Append passages and update offsets + assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts) + + # Compute embeddings + embeddings = compute_embeddings( + texts, + model_name=model_name, + mode=mode_name, + use_server=False, + is_build=True, + ) + + # Normalize for cosine if needed + distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower() + if distance_metric == "cosine": + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + norms[norms == 0] = 1 + embeddings = embeddings / norms + + # Load vector index and append + try: + from leann_backend_hnsw import faiss as hnsw_faiss # type: ignore + except Exception as e: + raise RuntimeError( + "Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed." + ) from e + + index = hnsw_faiss.read_index(str(vector_index_file), hnsw_faiss.IO_FLAG_MMAP) + if embeddings.dtype != np.float32: + embeddings = embeddings.astype(np.float32) + if not embeddings.flags.c_contiguous: + embeddings = np.ascontiguousarray(embeddings, dtype=np.float32) + + # C++-style signature (n, swig_ptr(x)) + index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings)) + hnsw_faiss.write_index(index, str(vector_index_file)) + + # Sanity: ids length should match embeddings rows + if len(assigned_ids) != embeddings.shape[0]: + warnings.warn( + f"Assigned {len(assigned_ids)} IDs but computed {embeddings.shape[0]} embeddings.", + UserWarning, + stacklevel=2, + ) + + return len(assigned_ids) + + +def incremental_add_directory( + index_path: str, + data_dir: str, + *, + chunk_size: int = 256, + chunk_overlap: int = 128, + required_exts: Optional[list[str]] = None, + max_items: int = -1, + embedding_model: Optional[str] = None, + embedding_mode: Optional[str] = None, +) -> int: + """Load documents from a directory, chunk them, and incrementally add to an index. + + Chunking uses LlamaIndex SentenceSplitter for simplicity and avoids external app dependencies. 
+ """ + try: + from llama_index.core import SimpleDirectoryReader # type: ignore + from llama_index.core.node_parser import SentenceSplitter # type: ignore + except Exception as e: + raise RuntimeError("llama-index-core is required for incremental_add_directory") from e + + reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"} + if required_exts: + reader_kwargs["required_exts"] = required_exts + documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True) + if not documents: + return 0 + + # Traditional text chunking + splitter = SentenceSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + separator=" ", + paragraph_separator="\n\n", + ) + all_texts: list[str] = [] + for doc in documents: try: - self.cleanup() + nodes = splitter.get_nodes_from_documents([doc]) + if nodes: + all_texts.extend([node.get_content() for node in nodes]) except Exception: - pass + content = doc.get_content() + if content and content.strip(): + all_texts.append(content.strip()) + + if max_items > 0 and len(all_texts) > max_items: + all_texts = all_texts[:max_items] + + return incremental_add_texts( + index_path, + all_texts, + embedding_model=embedding_model, + embedding_mode=embedding_mode, + )