modular add v2
@@ -173,88 +173,41 @@ def add_incremental(
     index_path = index_dir_path / (index_name or "documents.leann")

     # If specified base doesn't exist, try to auto-detect an existing base
-    meta = None
     try:
-        meta = _read_meta(index_path)
+        _read_meta(index_path)
     except FileNotFoundError:
         auto_base = _autodetect_index_base(index_dir_path)
         if auto_base is not None:
             print(f"Auto-detected index base: {auto_base.name}")
             index_path = auto_base
-            meta = _read_meta(index_path)
+            _read_meta(index_path)
         else:
             raise FileNotFoundError(
                 f"No index metadata found for base '{index_path.name}'. Build base first with --build-base "
                 f"or provide --index-name to match an existing index (e.g., 'test_doc_files.leann')."
             )

-    passages_file, offsets_file, faiss_index_file = _index_files(index_path)
+    # Prepare validated context from core (checks backend/no-recompute and resolves embedding defaults)
+    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

-    if meta.get("backend_name") != "hnsw":
-        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
-    if meta.get("is_compact", True):
-        raise RuntimeError(
-            "Index is compact. Rebuild base with --no-recompute and --no-compact for incremental add."
-        )
-
-    # Ensure the vector index exists before appending passages
-    if not faiss_index_file.exists():
-        raise FileNotFoundError(
-            f"Vector index file missing: {faiss_index_file}. Build base first (use --build-base)."
-        )
-
-    # Resolve embedding config from meta if not provided
-    embedding_model or meta.get("embedding_model", "facebook/contriever")
-    embedding_mode or meta.get("embedding_mode", "sentence-transformers")
-
-    documents = _load_documents(add_dir, required_exts=file_types)
-    if not documents:
-        raise ValueError(f"No documents found in add_dir: {add_dir}")
-
-    from chunking import create_text_chunks
-
-    new_texts = create_text_chunks(
-        documents,
+    ctx = create_incremental_add_context(
+        str(index_path),
+        embedding_model=embedding_model,
+        embedding_mode=embedding_mode,
+        data_dir=add_dir,
+        required_exts=file_types,
         chunk_size=chunk_size,
         chunk_overlap=chunk_overlap,
-        use_ast_chunking=False,
+        max_items=max_items,
     )
-    if max_items > 0 and len(new_texts) > max_items:
-        new_texts = new_texts[:max_items]
-        print(f"Limiting to {max_items} chunks (incremental)")

-    if not new_texts:
+    # Use prepared texts from context to perform the add
+    prepared_texts = ctx.prepared_texts or []
+    if not prepared_texts:
         print("No new chunks to add.")
         return str(index_path)

-    # Load and extend passages + offsets
-    offset_map = _load_offset_map(offsets_file)
-    start_id_int = _next_numeric_id(list(offset_map.keys()))
-    next_id = start_id_int
-
-    # Append to passages.jsonl and collect offsets
-    print("Appending passages and updating offsets...")
-    with open(passages_file, "a", encoding="utf-8") as f:
-        for text in new_texts:
-            offset = f.tell()
-            str_id = str(next_id)
-            json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
-            f.write("\n")
-            offset_map[str_id] = offset
-            next_id += 1
-
-    with open(offsets_file, "wb") as f:
-        pickle.dump(offset_map, f)
-
-    # Compute embeddings for new texts
-    print("Computing embeddings for incremental chunks...")
-    from leann.api import incremental_add_texts
-
-    # Let core handle embeddings and vector index update
-    added = incremental_add_texts(
-        str(index_path),
-        new_texts,
-    )
+    added = incremental_add_texts_with_context(ctx, prepared_texts)

     print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
     return str(index_path)
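The CLI body above now only checks that a base exists and hands chunking, validation, and the actual add over to the core API. A minimal standalone sketch of the same two calls, assuming a hypothetical index path and data directory (placeholders, not values from this commit):

    # Hypothetical usage of the new core API; paths are placeholders.
    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

    ctx = create_incremental_add_context(
        "./indexes/documents.leann",   # existing non-compact HNSW base
        data_dir="./new_docs",         # documents to load, chunk, and append
        required_exts=[".md", ".txt"],
        chunk_size=256,
        chunk_overlap=128,
    )
    added = incremental_add_texts_with_context(ctx, ctx.prepared_texts or [])
    print(f"Added {added} chunks")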
@@ -118,6 +118,20 @@ class SearchResult:
     metadata: dict[str, Any] = field(default_factory=dict)


+@dataclass
+class IncrementalAddContext:
+    """Prepared context for safe incremental add operations on an index."""
+
+    index_path: str
+    passages_file: Path
+    offsets_file: Path
+    vector_index_file: Path
+    embedding_model: str
+    embedding_mode: str
+    distance_metric: str
+    prepared_texts: Optional[list[str]] = None
+
+
 class PassageManager:
     def __init__(
         self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
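Because the dataclass carries the resolved file paths, embedding settings, and distance metric, one context can be reused across several incremental_add_texts_with_context calls without re-reading the index metadata. A minimal sketch, assuming text_batches is an iterable of list[str] produced elsewhere (hypothetical, not part of this commit):

    from leann.api import create_incremental_add_context, incremental_add_texts_with_context

    ctx = create_incremental_add_context("./indexes/documents.leann")  # placeholder path
    total = 0
    for batch in text_batches:  # hypothetical batches of pre-chunked texts
        total += incremental_add_texts_with_context(ctx, batch)
    print(f"Added {total} chunks in total")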
@@ -1169,6 +1183,130 @@ def incremental_add_texts(
     return len(assigned_ids)


+def create_incremental_add_context(
+    index_path: str,
+    *,
+    # Optional embedding choices; if None will use meta
+    embedding_model: Optional[str] = None,
+    embedding_mode: Optional[str] = None,
+    # Optional data-to-text preparation in context
+    data_dir: Optional[str] = None,
+    required_exts: Optional[list[str]] = None,
+    chunk_size: int = 256,
+    chunk_overlap: int = 128,
+    max_items: int = -1,
+) -> IncrementalAddContext:
+    """Validate index and prepare context for repeated incremental adds.
+
+    Additionally, if data_dir is provided, this function will load documents,
+    chunk them to texts with the specified parameters, and store them in ctx.prepared_texts.
+    """
+    meta = _read_meta_file(index_path)
+    if meta.get("backend_name") != "hnsw":
+        raise RuntimeError("Incremental add is currently supported only for HNSW backend")
+    if meta.get("is_compact", True):
+        raise RuntimeError(
+            "Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
+        )
+
+    passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
+    if not vector_index_file.exists():
+        raise FileNotFoundError(
+            f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
+        )
+
+    model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
+    mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
+    distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
+
+    prepared_texts: Optional[list[str]] = None
+    if data_dir is not None:
+        try:
+            from llama_index.core import SimpleDirectoryReader  # type: ignore
+            from llama_index.core.node_parser import SentenceSplitter  # type: ignore
+        except Exception as e:
+            raise RuntimeError(
+                "llama-index-core is required when using data_dir in create_incremental_add_context"
+            ) from e
+
+        reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
+        if required_exts:
+            reader_kwargs["required_exts"] = required_exts
+        documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
+        if documents:
+            splitter = SentenceSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                separator=" ",
+                paragraph_separator="\n\n",
+            )
+            prepared_texts = []
+            for doc in documents:
+                try:
+                    nodes = splitter.get_nodes_from_documents([doc])
+                    if nodes:
+                        prepared_texts.extend([node.get_content() for node in nodes])
+                except Exception:
+                    content = doc.get_content()
+                    if content and content.strip():
+                        prepared_texts.append(content.strip())
+            if max_items > 0 and len(prepared_texts) > max_items:
+                prepared_texts = prepared_texts[:max_items]
+
+    return IncrementalAddContext(
+        index_path=index_path,
+        passages_file=passages_file,
+        offsets_file=offsets_file,
+        vector_index_file=vector_index_file,
+        embedding_model=model_name,
+        embedding_mode=mode_name,
+        distance_metric=distance_metric,
+        prepared_texts=prepared_texts,
+    )
+
+
+def incremental_add_texts_with_context(ctx: IncrementalAddContext, texts: list[str]) -> int:
+    """Incrementally add texts using a prepared context (no repeated validation)."""
+    if not texts:
+        return 0
+
+    # Append passages & offsets
+    _append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
+
+    # Compute embeddings
+    embeddings = compute_embeddings(
+        texts,
+        model_name=ctx.embedding_model,
+        mode=ctx.embedding_mode,
+        use_server=False,
+        is_build=True,
+    )
+
+    # Normalize for cosine if needed
+    if ctx.distance_metric == "cosine":
+        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
+        norms[norms == 0] = 1
+        embeddings = embeddings / norms
+
+    # Load vector index and append
+    try:
+        from leann_backend_hnsw import faiss as hnsw_faiss  # type: ignore
+    except Exception as e:
+        raise RuntimeError(
+            "Failed to import leann_backend_hnsw.faiss. Ensure HNSW backend is installed."
+        ) from e
+
+    index = hnsw_faiss.read_index(str(ctx.vector_index_file), hnsw_faiss.IO_FLAG_MMAP)
+    if embeddings.dtype != np.float32:
+        embeddings = embeddings.astype(np.float32)
+    if not embeddings.flags.c_contiguous:
+        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
+    index.add(embeddings.shape[0], hnsw_faiss.swig_ptr(embeddings))
+    hnsw_faiss.write_index(index, str(ctx.vector_index_file))
+
+    return embeddings.shape[0]
+
+
 def incremental_add_directory(
     index_path: str,
     data_dir: str,
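The helper _append_passages_and_update_offsets is not shown in this diff. The sketch below only mirrors the JSONL append and pickled offset-map bookkeeping that this commit removes from the CLI (first hunk); the function name and internals here are illustrative, not the library's actual implementation:

    import json
    import pickle
    from pathlib import Path


    def append_passages_and_offsets(passages_file: Path, offsets_file: Path, texts: list[str]) -> None:
        # Load the existing id -> byte-offset map for passages.jsonl.
        with open(offsets_file, "rb") as f:
            offset_map: dict[str, int] = pickle.load(f)
        # Continue numeric ids after the largest existing one.
        next_id = max((int(k) for k in offset_map if k.isdigit()), default=-1) + 1
        # Append one JSON line per text and record where each line starts.
        with open(passages_file, "a", encoding="utf-8") as f:
            for text in texts:
                offset = f.tell()
                json.dump({"id": str(next_id), "text": text, "metadata": {}}, f, ensure_ascii=False)
                f.write("\n")
                offset_map[str(next_id)] = offset
                next_id += 1
        # Persist the updated offset map.
        with open(offsets_file, "wb") as f:
            pickle.dump(offset_map, f)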
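One note on the normalization step in incremental_add_texts_with_context: when the index metadata records distance_metric == "cosine", new embeddings are L2-normalized before being added, since the inner product of unit-length vectors equals their cosine similarity. A tiny self-contained check of that identity (pure NumPy, nothing specific to leann):

    import numpy as np

    a = np.array([[3.0, 4.0]], dtype=np.float32)
    b = np.array([[1.0, 1.0]], dtype=np.float32)
    cosine = float(a @ b.T) / (np.linalg.norm(a) * np.linalg.norm(b))
    a_unit = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_unit = b / np.linalg.norm(b, axis=1, keepdims=True)
    assert np.isclose(float(a_unit @ b_unit.T), cosine)  # both evaluate to ~0.99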