diff --git a/examples/dynamic_add_leann_no_recompute.py b/examples/dynamic_add_leann_no_recompute.py
index 5974dcc..621a893 100644
--- a/examples/dynamic_add_leann_no_recompute.py
+++ b/examples/dynamic_add_leann_no_recompute.py
@@ -173,88 +173,41 @@ def add_incremental(
     index_path = index_dir_path / (index_name or "documents.leann")

     # If specified base doesn't exist, try to auto-detect an existing base
-    meta = None
     try:
-        meta = _read_meta(index_path)
+        _read_meta(index_path)
     except FileNotFoundError:
         auto_base = _autodetect_index_base(index_dir_path)
         if auto_base is not None:
             print(f"Auto-detected index base: {auto_base.name}")
             index_path = auto_base
-            meta = _read_meta(index_path)
+            _read_meta(index_path)
         else:
             raise FileNotFoundError(
                 f"No index metadata found for base '{index_path.name}'. Build base first with --build-base "
                 f"or provide --index-name to match an existing index (e.g., 'test_doc_files.leann')."
             )

-    passages_file, offsets_file, faiss_index_file = _index_files(index_path)
+    # Prepare validated context from core (checks backend/no-recompute and resolves embedding defaults)
+    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

-    if meta.get("backend_name") != "hnsw":
-        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
-    if meta.get("is_compact", True):
-        raise RuntimeError(
-            "Index is compact. Rebuild base with --no-recompute and --no-compact for incremental add."
-        )
-
-    # Ensure the vector index exists before appending passages
-    if not faiss_index_file.exists():
-        raise FileNotFoundError(
-            f"Vector index file missing: {faiss_index_file}. Build base first (use --build-base)."
-        )
-
-    # Resolve embedding config from meta if not provided
-    embedding_model or meta.get("embedding_model", "facebook/contriever")
-    embedding_mode or meta.get("embedding_mode", "sentence-transformers")
-
-    documents = _load_documents(add_dir, required_exts=file_types)
-    if not documents:
-        raise ValueError(f"No documents found in add_dir: {add_dir}")
-
-    from chunking import create_text_chunks
-
-    new_texts = create_text_chunks(
-        documents,
+    ctx = create_incremental_add_context(
+        str(index_path),
+        embedding_model=embedding_model,
+        embedding_mode=embedding_mode,
+        data_dir=add_dir,
+        required_exts=file_types,
         chunk_size=chunk_size,
         chunk_overlap=chunk_overlap,
-        use_ast_chunking=False,
+        max_items=max_items,
     )
-    if max_items > 0 and len(new_texts) > max_items:
-        new_texts = new_texts[:max_items]
-        print(f"Limiting to {max_items} chunks (incremental)")

-    if not new_texts:
+    # Use prepared texts from context to perform the add
+    prepared_texts = ctx.prepared_texts or []
+    if not prepared_texts:
         print("No new chunks to add.")
         return str(index_path)

-    # Load and extend passages + offsets
-    offset_map = _load_offset_map(offsets_file)
-    start_id_int = _next_numeric_id(list(offset_map.keys()))
-    next_id = start_id_int
-
-    # Append to passages.jsonl and collect offsets
-    print("Appending passages and updating offsets...")
-    with open(passages_file, "a", encoding="utf-8") as f:
-        for text in new_texts:
-            offset = f.tell()
-            str_id = str(next_id)
-            json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
-            f.write("\n")
-            offset_map[str_id] = offset
-            next_id += 1
-
-    with open(offsets_file, "wb") as f:
-        pickle.dump(offset_map, f)
-
-    # Compute embeddings for new texts
-    print("Computing embeddings for incremental chunks...")
-    from leann.api import incremental_add_texts
-
-    # Let core handle embeddings and vector index update
-    added = incremental_add_texts(
-        str(index_path),
-        new_texts,
-    )
+    added = incremental_add_texts_with_context(ctx, prepared_texts)

     print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
     return str(index_path)
diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py
index 6079f1d..46a0e74 100644
--- a/packages/leann-core/src/leann/api.py
+++ b/packages/leann-core/src/leann/api.py
@@ -118,6 +118,20 @@ class SearchResult:
     metadata: dict[str, Any] = field(default_factory=dict)


+@dataclass
+class IncrementalAddContext:
+    """Prepared context for safe incremental add operations on an index."""
+
+    index_path: str
+    passages_file: Path
+    offsets_file: Path
+    vector_index_file: Path
+    embedding_model: str
+    embedding_mode: str
+    distance_metric: str
+    prepared_texts: Optional[list[str]] = None
+
+
 class PassageManager:
     def __init__(
         self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
@@ -1169,6 +1183,130 @@ def incremental_add_texts(
     return len(assigned_ids)


+def create_incremental_add_context(
+    index_path: str,
+    *,
+    # Optional embedding choices; if None will use meta
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+    # Optional data-to-text preparation in context
+    data_dir: Optional[str] = None,
+    required_exts: Optional[list[str]] = None,
+    chunk_size: int = 256,
+    chunk_overlap: int = 128,
+    max_items: int = -1,
+) -> IncrementalAddContext:
+    """Validate index and prepare context for repeated incremental adds.
+
+    Additionally, if data_dir is provided, this function will load documents,
+    chunk them to texts with the specified parameters, and store them in ctx.prepared_texts.
+    """
+    meta = _read_meta_file(index_path)
+    if meta.get("backend_name") != "hnsw":
+        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
+    if meta.get("is_compact", True):
+        raise RuntimeError(
+            "Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
+        )
+
+    passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
+    if not vector_index_file.exists():
+        raise FileNotFoundError(
+            f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
+        )
+
+    model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
+    mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
+    distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+
+    prepared_texts: Optional[list[str]] = None
+    if data_dir is not None:
+        try:
+            from llama_index.core import SimpleDirectoryReader  # type: ignore
+            from llama_index.core.node_parser import SentenceSplitter  # type: ignore
+        except Exception as e:
+            raise RuntimeError(
+                "llama-index-core is required when using data_dir in create_incremental_add_context"
+            ) from e
+
+        reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
+        if required_exts:
+            reader_kwargs["required_exts"] = required_exts
+        documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
+        if documents:
+            splitter = SentenceSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                separator=" ",
+                paragraph_separator="\n\n",
+            )
+            prepared_texts = []
+            for doc in documents:
+                try:
+                    nodes = splitter.get_nodes_from_documents([doc])
+                    if nodes:
+                        prepared_texts.extend([node.get_content() for node in nodes])
+                except Exception:
+                    content = doc.get_content()
+                    if content and content.strip():
+                        prepared_texts.append(content.strip())
+            if max_items > 0 and len(prepared_texts) > max_items:
+                prepared_texts = prepared_texts[:max_items]
+
+    return IncrementalAddContext(
+        index_path=index_path,
+        passages_file=passages_file,
+        offsets_file=offsets_file,
+        vector_index_file=vector_index_file,
+        embedding_model=model_name,
+        embedding_mode=mode_name,
+        distance_metric=distance_metric,
+        prepared_texts=prepared_texts,
+    )
+
+
+def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[str]) -> int:
+    """Incrementally add texts using a prepared context (no repeated validation)."""
+    if not texts:
+        return 0
+
+    # Append passages & offsets
+    _append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
+
+    # Compute embeddings
+    embeddings = compute_embeddings(
+        texts,
+        model_name=ctx.embedding_model,
+        mode=ctx.embedding_mode,
+        use_server=False,
+        is_build=True,
+    )
+
+    # Normalize for cosine if needed
+    if ctx.distance_metric == "cosine":
+        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
+        norms[norms == 0] = 1
+        embeddings = embeddings / norms
+
+    # Load vector index and append
+    try:
+        from leann_backend_hnsw import faiss as hnsw_faiss  # type: ignore
+    except Exception as e:
+        raise RuntimeError(
+            "Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
+        ) from e
+
+    index = hnsw_faiss.read_index(str(ctx.vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
+    if embeddings.dtype != np.float32:
+        embeddings = embeddings.astype(np.float32)
+    if not embeddings.flags.c_contiguous:
+        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
+    index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
+    hnsw_faiss.write_index(index, str(ctx.vector_index_file))
+
+    return embeddings.shape[0]
+
+
 def incremental_add_directory(
     index_path: str,
     data_dir: str,
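
Usage sketch (not part of the diff): a minimal example of how the new context-based API added in this change could be called, assuming an existing non-compact HNSW index built with is_recompute=False and is_compact=False; the index path and data directory below are hypothetical.

    # Hypothetical paths; any existing non-compact HNSW .leann index works.
    from leann.api import (
        create_incremental_add_context,
        incremental_add_texts_with_context,
    )

    # Validate the index once; with data_dir set, the context also loads and
    # chunks the new documents into ctx.prepared_texts.
    ctx = create_incremental_add_context(
        "./indexes/documents.leann",
        data_dir="./new_docs",
        required_exts=[".txt", ".md"],
        chunk_size=256,
        chunk_overlap=128,
    )

    # First batch: add the chunks prepared by the context.
    added = incremental_add_texts_with_context(ctx, ctx.prepared_texts or [])
    print(f"Added {added} chunks")

    # The same context can be reused for further batches without re-reading
    # or re-validating the index metadata.
    added += incremental_add_texts_with_context(ctx, ["an extra ad-hoc passage"])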