Compare commits

..

4 Commits

Author SHA1 Message Date
yichuan520030910320
00c44e3980 [cli] fix # 81 2025-09-01 16:53:09 -07:00
yichuan520030910320
e6a542bf4b [cli] better gitignore / better leann list 2025-09-01 16:42:11 -07:00
yichuan520030910320
7e84dae02e [chore] add slack to share use case 2025-08-30 00:32:13 -07:00
yichuan520030910320
2f05ed4535 chore(submodule): bump faiss to latest storage-efficient build 2025-08-23 18:29:11 -07:00
31 changed files with 178 additions and 1629 deletions

View File

@@ -1,50 +0,0 @@
name: Bug Report
description: Report a bug in LEANN
labels: ["bug"]
body:
- type: textarea
id: description
attributes:
label: What happened?
description: A clear description of the bug
validations:
required: true
- type: textarea
id: reproduce
attributes:
label: How to reproduce
placeholder: |
1. Install with...
2. Run command...
3. See error
validations:
required: true
- type: textarea
id: error
attributes:
label: Error message
description: Paste any error messages
render: shell
- type: input
id: version
attributes:
label: LEANN Version
placeholder: "0.1.0"
validations:
required: true
- type: dropdown
id: os
attributes:
label: Operating System
options:
- macOS
- Linux
- Windows
- Docker
validations:
required: true

View File

@@ -1,8 +0,0 @@
blank_issues_enabled: true
contact_links:
- name: Documentation
url: https://github.com/LEANN-RAG/LEANN-RAG/tree/main/docs
about: Read the docs first
- name: Discussions
url: https://github.com/LEANN-RAG/LEANN-RAG/discussions
about: Ask questions and share ideas

View File

@@ -1,27 +0,0 @@
name: Feature Request
description: Suggest a new feature for LEANN
labels: ["enhancement"]
body:
- type: textarea
id: problem
attributes:
label: What problem does this solve?
description: Describe the problem or need
validations:
required: true
- type: textarea
id: solution
attributes:
label: Proposed solution
description: How would you like this to work?
validations:
required: true
- type: textarea
id: example
attributes:
label: Example usage
description: Show how the API might look
render: python

View File

@@ -1,13 +0,0 @@
## What does this PR do?
<!-- Brief description of your changes -->
## Related Issues
Fixes #
## Checklist
- [ ] Tests pass (`uv run pytest`)
- [ ] Code formatted (`ruff format` and `ruff check`)
- [ ] Pre-commit hooks pass (`pre-commit run --all-files`)

View File

@@ -54,17 +54,6 @@ jobs:
python: '3.12'
- os: ubuntu-22.04
python: '3.13'
# ARM64 Linux builds
- os: ubuntu-24.04-arm
python: '3.9'
- os: ubuntu-24.04-arm
python: '3.10'
- os: ubuntu-24.04-arm
python: '3.11'
- os: ubuntu-24.04-arm
python: '3.12'
- os: ubuntu-24.04-arm
python: '3.13'
- os: macos-14
python: '3.9'
- os: macos-14
@@ -119,46 +108,13 @@ jobs:
pkg-config libabsl-dev libaio-dev libprotobuf-dev \
patchelf
# Debug: Show system information
echo "🔍 System Information:"
echo "Architecture: $(uname -m)"
echo "OS: $(uname -a)"
echo "CPU info: $(lscpu | head -5)"
# Install math library based on architecture
ARCH=$(uname -m)
echo "🔍 Setting up math library for architecture: $ARCH"
if [[ "$ARCH" == "x86_64" ]]; then
# Install Intel MKL for DiskANN on x86_64
echo "📦 Installing Intel MKL for x86_64..."
wget -q https://registrationcenter-download.intel.com/akdlm/IRC_NAS/79153e0f-74d7-45af-b8c2-258941adf58a/intel-onemkl-2025.0.0.940.sh
sudo sh intel-onemkl-2025.0.0.940.sh -a --components intel.oneapi.lin.mkl.devel --action install --eula accept -s
source /opt/intel/oneapi/setvars.sh
echo "MKLROOT=/opt/intel/oneapi/mkl/latest" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=/opt/intel/oneapi/compiler/latest/linux/compiler/lib/intel64_lin" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/intel/oneapi/mkl/latest/lib/intel64" >> $GITHUB_ENV
echo "✅ Intel MKL installed for x86_64"
# Debug: Check MKL installation
echo "🔍 MKL Installation Check:"
ls -la /opt/intel/oneapi/mkl/latest/ || echo "MKL directory not found"
ls -la /opt/intel/oneapi/mkl/latest/lib/ || echo "MKL lib directory not found"
elif [[ "$ARCH" == "aarch64" ]]; then
# Use OpenBLAS for ARM64 (MKL installer not compatible with ARM64)
echo "📦 Installing OpenBLAS for ARM64..."
sudo apt-get install -y libopenblas-dev liblapack-dev liblapacke-dev
echo "✅ OpenBLAS installed for ARM64"
# Debug: Check OpenBLAS installation
echo "🔍 OpenBLAS Installation Check:"
dpkg -l | grep openblas || echo "OpenBLAS package not found"
ls -la /usr/lib/aarch64-linux-gnu/openblas/ || echo "OpenBLAS directory not found"
fi
# Debug: Show final library paths
echo "🔍 Final LD_LIBRARY_PATH: $LD_LIBRARY_PATH"
# Install Intel MKL for DiskANN
wget -q https://registrationcenter-download.intel.com/akdlm/IRC_NAS/79153e0f-74d7-45af-b8c2-258941adf58a/intel-onemkl-2025.0.0.940.sh
sudo sh intel-onemkl-2025.0.0.940.sh -a --components intel.oneapi.lin.mkl.devel --action install --eula accept -s
source /opt/intel/oneapi/setvars.sh
echo "MKLROOT=/opt/intel/oneapi/mkl/latest" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=/opt/intel/oneapi/compiler/latest/linux/compiler/lib/intel64_lin" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/intel/oneapi/mkl/latest/lib/intel64" >> $GITHUB_ENV
- name: Install system dependencies (macOS)
if: runner.os == 'macOS'

2
.gitignore vendored
View File

@@ -22,7 +22,6 @@ demo/experiment_results/**/*.json
*.sh
*.txt
!CMakeLists.txt
!llms.txt
latency_breakdown*.json
experiment_results/eval_results/diskann/*.json
aws/
@@ -101,4 +100,3 @@ CLAUDE.local.md
.claude/*.local.*
.claude/local/*
benchmarks/data/
test_add/*

3
.gitmodules vendored
View File

@@ -14,6 +14,3 @@
[submodule "packages/leann-backend-hnsw/third_party/libzmq"]
path = packages/leann-backend-hnsw/third_party/libzmq
url = https://github.com/zeromq/libzmq.git
[submodule "packages/astchunk-leann"]
path = packages/astchunk-leann
url = https://github.com/yichuan-w/astchunk-leann.git

View File

@@ -656,19 +656,6 @@ results = searcher.search(
📖 **[Complete Metadata filtering guide →](docs/metadata_filtering.md)**
### 🔍 Grep Search
For exact text matching instead of semantic search, use the `use_grep` parameter:
```python
# Exact text search
results = searcher.search("bananacrocodile", use_grep=True, top_k=1)
```
**Use cases**: Finding specific code patterns, error messages, function names, or exact phrases where semantic similarity isn't needed.
📖 **[Complete grep search guide →](docs/grep_search.md)**
## 🏗️ Architecture & How It Works
<p align="center">

View File

@@ -1,38 +1,16 @@
"""Unified chunking utilities facade.
This module re-exports the packaged utilities from `leann.chunking_utils` so
that both repo apps (importing `chunking`) and installed wheels share one
single implementation. When running from the repo without installation, it
adds the `packages/leann-core/src` directory to `sys.path` as a fallback.
"""
Chunking utilities for LEANN RAG applications.
Provides AST-aware and traditional text chunking functionality.
"""
import sys
from pathlib import Path
try:
from leann.chunking_utils import (
CODE_EXTENSIONS,
create_ast_chunks,
create_text_chunks,
create_traditional_chunks,
detect_code_files,
get_language_from_extension,
)
except Exception: # pragma: no cover - best-effort fallback for dev environment
repo_root = Path(__file__).resolve().parents[2]
leann_src = repo_root / "packages" / "leann-core" / "src"
if leann_src.exists():
sys.path.insert(0, str(leann_src))
from leann.chunking_utils import (
CODE_EXTENSIONS,
create_ast_chunks,
create_text_chunks,
create_traditional_chunks,
detect_code_files,
get_language_from_extension,
)
else:
raise
from .utils import (
CODE_EXTENSIONS,
create_ast_chunks,
create_text_chunks,
create_traditional_chunks,
detect_code_files,
get_language_from_extension,
)
__all__ = [
"CODE_EXTENSIONS",

View File

@@ -1,6 +1,6 @@
"""
Enhanced chunking utilities with AST-aware code chunking support.
Packaged within leann-core so installed wheels can import it reliably.
Provides unified interface for both traditional and AST-based text chunking.
"""
import logging
@@ -22,9 +22,30 @@ CODE_EXTENSIONS = {
".jsx": "typescript",
}
# Default chunk parameters for different content types
DEFAULT_CHUNK_PARAMS = {
"code": {
"max_chunk_size": 512,
"chunk_overlap": 64,
},
"text": {
"chunk_size": 256,
"chunk_overlap": 128,
},
}
def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
"""Separate documents into code files and regular text files."""
"""
Separate documents into code files and regular text files.
Args:
documents: List of LlamaIndex Document objects
code_extensions: Dict mapping file extensions to languages (defaults to CODE_EXTENSIONS)
Returns:
Tuple of (code_documents, text_documents)
"""
if code_extensions is None:
code_extensions = CODE_EXTENSIONS
@@ -32,10 +53,16 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
text_docs = []
for doc in documents:
file_path = doc.metadata.get("file_path", "") or doc.metadata.get("file_name", "")
# Get file path from metadata
file_path = doc.metadata.get("file_path", "")
if not file_path:
# Fallback to file_name
file_path = doc.metadata.get("file_name", "")
if file_path:
file_ext = Path(file_path).suffix.lower()
if file_ext in code_extensions:
# Add language info to metadata
doc.metadata["language"] = code_extensions[file_ext]
doc.metadata["is_code"] = True
code_docs.append(doc)
@@ -43,6 +70,7 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
doc.metadata["is_code"] = False
text_docs.append(doc)
else:
# If no file path, treat as text
doc.metadata["is_code"] = False
text_docs.append(doc)
@@ -51,7 +79,7 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
def get_language_from_extension(file_path: str) -> Optional[str]:
"""Return language string from a filename/extension using CODE_EXTENSIONS."""
"""Get the programming language from file extension."""
ext = Path(file_path).suffix.lower()
return CODE_EXTENSIONS.get(ext)
@@ -62,26 +90,40 @@ def create_ast_chunks(
chunk_overlap: int = 64,
metadata_template: str = "default",
) -> list[str]:
"""Create AST-aware chunks from code documents using astchunk.
"""
Create AST-aware chunks from code documents using astchunk.
Falls back to traditional chunking if astchunk is unavailable.
Args:
documents: List of code documents
max_chunk_size: Maximum characters per chunk
chunk_overlap: Number of AST nodes to overlap between chunks
metadata_template: Template for chunk metadata
Returns:
List of text chunks with preserved code structure
"""
try:
from astchunk import ASTChunkBuilder # optional dependency
from astchunk import ASTChunkBuilder
except ImportError as e:
logger.error(f"astchunk not available: {e}")
logger.info("Falling back to traditional chunking for code files")
return create_traditional_chunks(documents, max_chunk_size, chunk_overlap)
all_chunks = []
for doc in documents:
# Get language from metadata (set by detect_code_files)
language = doc.metadata.get("language")
if not language:
logger.warning("No language detected; falling back to traditional chunking")
all_chunks.extend(create_traditional_chunks([doc], max_chunk_size, chunk_overlap))
logger.warning(
"No language detected for document, falling back to traditional chunking"
)
traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
all_chunks.extend(traditional_chunks)
continue
try:
# Configure astchunk
configs = {
"max_chunk_size": max_chunk_size,
"language": language,
@@ -89,6 +131,7 @@ def create_ast_chunks(
"chunk_overlap": chunk_overlap if chunk_overlap > 0 else 0,
}
# Add repository-level metadata if available
repo_metadata = {
"file_path": doc.metadata.get("file_path", ""),
"file_name": doc.metadata.get("file_name", ""),
@@ -97,13 +140,17 @@ def create_ast_chunks(
}
configs["repo_level_metadata"] = repo_metadata
# Create chunk builder and process
chunk_builder = ASTChunkBuilder(**configs)
code_content = doc.get_content()
if not code_content or not code_content.strip():
logger.warning("Empty code content, skipping")
continue
chunks = chunk_builder.chunkify(code_content)
# Extract text content from chunks
for chunk in chunks:
if hasattr(chunk, "text"):
chunk_text = chunk.text
@@ -112,6 +159,7 @@ def create_ast_chunks(
elif isinstance(chunk, str):
chunk_text = chunk
else:
# Try to convert to string
chunk_text = str(chunk)
if chunk_text and chunk_text.strip():
@@ -120,10 +168,12 @@ def create_ast_chunks(
logger.info(
f"Created {len(chunks)} AST chunks from {language} file: {doc.metadata.get('file_name', 'unknown')}"
)
except Exception as e:
logger.warning(f"AST chunking failed for {language} file: {e}")
logger.info("Falling back to traditional chunking")
all_chunks.extend(create_traditional_chunks([doc], max_chunk_size, chunk_overlap))
traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
all_chunks.extend(traditional_chunks)
return all_chunks
@@ -131,10 +181,23 @@ def create_ast_chunks(
def create_traditional_chunks(
documents, chunk_size: int = 256, chunk_overlap: int = 128
) -> list[str]:
"""Create traditional text chunks using LlamaIndex SentenceSplitter."""
"""
Create traditional text chunks using LlamaIndex SentenceSplitter.
Args:
documents: List of documents to chunk
chunk_size: Size of each chunk in characters
chunk_overlap: Overlap between chunks
Returns:
List of text chunks
"""
# Handle invalid chunk_size values
if chunk_size <= 0:
logger.warning(f"Invalid chunk_size={chunk_size}, using default value of 256")
chunk_size = 256
# Ensure chunk_overlap is not negative and not larger than chunk_size
if chunk_overlap < 0:
chunk_overlap = 0
if chunk_overlap >= chunk_size:
@@ -152,9 +215,12 @@ def create_traditional_chunks(
try:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
all_texts.extend(node.get_content() for node in nodes)
chunk_texts = [node.get_content() for node in nodes]
all_texts.extend(chunk_texts)
logger.debug(f"Created {len(chunk_texts)} traditional chunks from document")
except Exception as e:
logger.error(f"Traditional chunking failed for document: {e}")
# As last resort, add the raw content
content = doc.get_content()
if content and content.strip():
all_texts.append(content.strip())
@@ -172,13 +238,32 @@ def create_text_chunks(
code_file_extensions: Optional[list[str]] = None,
ast_fallback_traditional: bool = True,
) -> list[str]:
"""Create text chunks from documents with optional AST support for code files."""
"""
Create text chunks from documents with optional AST support for code files.
Args:
documents: List of LlamaIndex Document objects
chunk_size: Size for traditional text chunks
chunk_overlap: Overlap for traditional text chunks
use_ast_chunking: Whether to use AST chunking for code files
ast_chunk_size: Size for AST chunks
ast_chunk_overlap: Overlap for AST chunks
code_file_extensions: Custom list of code file extensions
ast_fallback_traditional: Fall back to traditional chunking on AST errors
Returns:
List of text chunks
"""
if not documents:
logger.warning("No documents provided for chunking")
return []
# Create a local copy of supported extensions for this function call
local_code_extensions = CODE_EXTENSIONS.copy()
# Update supported extensions if provided
if code_file_extensions:
# Map extensions to languages (simplified mapping)
ext_mapping = {
".py": "python",
".java": "java",
@@ -188,32 +273,47 @@ def create_text_chunks(
}
for ext in code_file_extensions:
if ext.lower() not in local_code_extensions:
# Try to guess language from extension
if ext.lower() in ext_mapping:
local_code_extensions[ext.lower()] = ext_mapping[ext.lower()]
else:
logger.warning(f"Unsupported extension {ext}, will use traditional chunking")
all_chunks = []
if use_ast_chunking:
# Separate code and text documents using local extensions
code_docs, text_docs = detect_code_files(documents, local_code_extensions)
# Process code files with AST chunking
if code_docs:
logger.info(f"Processing {len(code_docs)} code files with AST chunking")
try:
all_chunks.extend(
create_ast_chunks(
code_docs, max_chunk_size=ast_chunk_size, chunk_overlap=ast_chunk_overlap
)
ast_chunks = create_ast_chunks(
code_docs, max_chunk_size=ast_chunk_size, chunk_overlap=ast_chunk_overlap
)
all_chunks.extend(ast_chunks)
logger.info(f"Created {len(ast_chunks)} AST chunks from code files")
except Exception as e:
logger.error(f"AST chunking failed: {e}")
if ast_fallback_traditional:
all_chunks.extend(
create_traditional_chunks(code_docs, chunk_size, chunk_overlap)
logger.info("Falling back to traditional chunking for code files")
traditional_code_chunks = create_traditional_chunks(
code_docs, chunk_size, chunk_overlap
)
all_chunks.extend(traditional_code_chunks)
else:
raise
# Process text files with traditional chunking
if text_docs:
all_chunks.extend(create_traditional_chunks(text_docs, chunk_size, chunk_overlap))
logger.info(f"Processing {len(text_docs)} text files with traditional chunking")
text_chunks = create_traditional_chunks(text_docs, chunk_size, chunk_overlap)
all_chunks.extend(text_chunks)
logger.info(f"Created {len(text_chunks)} traditional chunks from text files")
else:
# Use traditional chunking for all files
logger.info(f"Processing {len(documents)} documents with traditional chunking")
all_chunks = create_traditional_chunks(documents, chunk_size, chunk_overlap)
logger.info(f"Total chunks created: {len(all_chunks)}")

View File

@@ -74,7 +74,7 @@ class ChromeHistoryReader(BaseReader):
if count >= max_count and max_count > 0:
break
last_visit, url, title, visit_count, typed_count, _hidden = row
last_visit, url, title, visit_count, typed_count, hidden = row
# Create document content with metadata embedded in text
doc_content = f"""

View File

@@ -26,21 +26,6 @@ leann build my-code-index --docs ./src --use-ast-chunking
uv pip install -e "."
```
#### For normal users (PyPI install)
- Use `pip install leann` or `uv pip install leann`.
- `astchunk` is pulled automatically from PyPI as a dependency; no extra steps.
#### For developers (from source, editable)
```bash
git clone https://github.com/yichuan-w/LEANN.git leann
cd leann
git submodule update --init --recursive
uv sync
```
- This repo vendors `astchunk` as a git submodule at `packages/astchunk-leann` (our fork).
- `[tool.uv.sources]` maps the `astchunk` package to that path in editable mode.
- You can edit code under `packages/astchunk-leann` and Python will use your changes immediately (no separate `pip install astchunk` needed).
## Best Practices
### When to Use AST Chunking

View File

@@ -1,149 +0,0 @@
# LEANN Grep Search Usage Guide
## Overview
LEANN's grep search functionality provides exact text matching for finding specific code patterns, error messages, function names, or exact phrases in your indexed documents.
## Basic Usage
### Simple Grep Search
```python
from leann.api import LeannSearcher
searcher = LeannSearcher("your_index_path")
# Exact text search
results = searcher.search("def authenticate_user", use_grep=True, top_k=5)
for result in results:
print(f"Score: {result.score}")
print(f"Text: {result.text[:100]}...")
print("-" * 40)
```
### Comparison: Semantic vs Grep Search
```python
# Semantic search - finds conceptually similar content
semantic_results = searcher.search("machine learning algorithms", top_k=3)
# Grep search - finds exact text matches
grep_results = searcher.search("def train_model", use_grep=True, top_k=3)
```
## When to Use Grep Search
### Use Cases
- **Code Search**: Finding specific function definitions, class names, or variable references
- **Error Debugging**: Locating exact error messages or stack traces
- **Documentation**: Finding specific API endpoints or exact terminology
### Examples
```python
# Find function definitions
functions = searcher.search("def __init__", use_grep=True)
# Find import statements
imports = searcher.search("from sklearn import", use_grep=True)
# Find specific error types
errors = searcher.search("FileNotFoundError", use_grep=True)
# Find TODO comments
todos = searcher.search("TODO:", use_grep=True)
# Find configuration entries
configs = searcher.search("server_port=", use_grep=True)
```
## Technical Details
### How It Works
1. **File Location**: Grep search operates on the raw text stored in `.jsonl` files
2. **Command Execution**: Uses the system `grep` command with case-insensitive search
3. **Result Processing**: Parses JSON lines and extracts text and metadata
4. **Scoring**: Simple frequency-based scoring based on query term occurrences
### Search Process
```
Query: "def train_model"
grep -i -n "def train_model" documents.leann.passages.jsonl
Parse matching JSON lines
Calculate scores based on term frequency
Return top_k results
```
### Scoring Algorithm
```python
# Term frequency in document
score = text.lower().count(query.lower())
```
Results are ranked by score (highest first), with higher scores indicating more occurrences of the search term.
## Error Handling
### Common Issues
#### Grep Command Not Found
```
RuntimeError: grep command not found. Please install grep or use semantic search.
```
**Solution**: Install grep on your system:
- **Ubuntu/Debian**: `sudo apt-get install grep`
- **macOS**: grep is pre-installed
- **Windows**: Use WSL or install grep via Git Bash/MSYS2
#### No Results Found
```python
# Check if your query exists in the raw data
results = searcher.search("your_query", use_grep=True)
if not results:
print("No exact matches found. Try:")
print("1. Check spelling and case")
print("2. Use partial terms")
print("3. Switch to semantic search")
```
## Complete Example
```python
#!/usr/bin/env python3
"""
Grep Search Example
Demonstrates grep search for exact text matching.
"""
from leann.api import LeannSearcher
def demonstrate_grep_search():
# Initialize searcher
searcher = LeannSearcher("my_index")
print("=== Function Search ===")
functions = searcher.search("def __init__", use_grep=True, top_k=5)
for i, result in enumerate(functions, 1):
print(f"{i}. Score: {result.score}")
print(f" Preview: {result.text[:60]}...")
print()
print("=== Error Search ===")
errors = searcher.search("FileNotFoundError", use_grep=True, top_k=3)
for result in errors:
print(f"Content: {result.text.strip()}")
print("-" * 40)
if __name__ == "__main__":
demonstrate_grep_search()
```

View File

@@ -1,380 +0,0 @@
"""
Dynamic add example for LEANN using HNSW backend without recompute.
- Builds a base index from a directory of documents
- Incrementally adds new documents without recomputing stored embeddings
Defaults:
- Base data: /Users/yichuan/Desktop/code/LEANN/leann/data
- Incremental data: /Users/yichuan/Desktop/code/LEANN/leann/test_add
- Index path: <index_dir>/documents.leann
Usage examples:
uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
--base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
--index-dir ./test_doc_files
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
--index-dir ./test_doc_files
Quick recompute test (both true):
# Recompute build
uv run python examples/dynamic_add_leann_no_recompute.py --build-base \
--recompute-build --ef-construction 200 \
--base-dir /Users/yichuan/Desktop/code/LEANN/leann/data \
--index-dir ./test_doc_files --index-name documents.leann
# Recompute add
uv run python examples/dynamic_add_leann_no_recompute.py --add-incremental \
--recompute-add --ef-construction 32 \
--add-dir /Users/yichuan/Desktop/code/LEANN/leann/test_add \
--index-dir ./test_doc_files --index-name documents.leann
"""
import argparse
import json
import pickle
import sys
from pathlib import Path
from typing import Any, Optional
# Ensure we can import from the local packages and apps folders
ROOT = Path(__file__).resolve().parents[1]
CORE_SRC = ROOT / "packages" / "leann-core" / "src"
HNSW_PKG_DIR = ROOT / "packages" / "leann-backend-hnsw"
APPS_DIR = ROOT / "apps"
# Prefer the installed backend if available (it contains the compiled extension)
def _prefer_installed(pkg_name: str) -> bool:
try:
import importlib
import importlib.util
spec = importlib.util.find_spec(pkg_name)
if spec and spec.origin and "site-packages" in spec.origin:
# ensure the faiss shim/extension is importable from the installed package
importlib.import_module(f"{pkg_name}.faiss")
return True
except Exception:
pass
return False
# Prepend paths, but only add the repo backend if the installed one is not present
paths_to_prepend = [CORE_SRC, APPS_DIR]
if not _prefer_installed("leann_backend_hnsw"):
paths_to_prepend.insert(1, HNSW_PKG_DIR)
for p in paths_to_prepend:
p_str = str(p)
if p_str not in sys.path:
sys.path.insert(0, p_str)
# Defer non-stdlib imports until after sys.path setup within functions (avoid E402)
def _load_documents(data_dir: str, required_exts: Optional[list[str]] = None) -> list[Any]:
from llama_index.core import SimpleDirectoryReader # type: ignore
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
if required_exts:
reader_kwargs["required_exts"] = required_exts
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
return documents
def _ensure_index_dir(index_dir: Path) -> None:
index_dir.mkdir(parents=True, exist_ok=True)
def _index_files(index_path: Path) -> tuple[Path, Path, Path]:
"""Return (passages.jsonl, passages.idx, index.index) paths for a given index base path.
Note: HNSWBackend writes the FAISS index using the stem (without .leann),
i.e., for base 'documents.leann' the file is 'documents.index'. We prefer the
existing file among candidates.
"""
passages_file = index_path.parent / f"{index_path.name}.passages.jsonl"
offsets_file = index_path.parent / f"{index_path.name}.passages.idx"
candidate_name_index = index_path.parent / f"{index_path.name}.index"
candidate_stem_index = index_path.parent / f"{index_path.stem}.index"
index_file = candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
return passages_file, offsets_file, index_file
def _read_meta(index_path: Path) -> dict[str, Any]:
meta_path = index_path.parent / f"{index_path.name}.meta.json"
if not meta_path.exists():
raise FileNotFoundError(f"Metadata file not found: {meta_path}")
with open(meta_path, encoding="utf-8") as f:
return json.load(f)
def _autodetect_index_base(index_dir: Path) -> Optional[Path]:
"""If exactly one *.leann.meta.json exists, return its base path (without .meta.json)."""
candidates = list(index_dir.glob("*.leann.meta.json"))
if len(candidates) == 1:
meta = candidates[0]
base = meta.with_suffix("") # remove .json
base = base.with_suffix("") # remove .meta
return base
return None
def _load_offset_map(offsets_file: Path) -> dict[str, int]:
if not offsets_file.exists():
return {}
with open(offsets_file, "rb") as f:
return pickle.load(f)
def _next_numeric_id(existing_ids: list[str]) -> int:
numeric_ids = [int(x) for x in existing_ids if x.isdigit()]
if not numeric_ids:
return 0
return max(numeric_ids) + 1
def build_base_index(
base_dir: str,
index_dir: str,
index_name: str,
embedding_model: str,
embedding_mode: str,
chunk_size: int,
chunk_overlap: int,
file_types: Optional[list[str]] = None,
max_items: int = -1,
ef_construction: Optional[int] = None,
recompute_build: bool = False,
) -> str:
print(f"Building base index from: {base_dir}")
documents = _load_documents(base_dir, required_exts=file_types)
if not documents:
raise ValueError(f"No documents found in base_dir: {base_dir}")
from chunking import create_text_chunks
texts = create_text_chunks(
documents,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
use_ast_chunking=False,
)
if max_items > 0 and len(texts) > max_items:
texts = texts[:max_items]
print(f"Limiting to {max_items} chunks")
index_dir_path = Path(index_dir)
_ensure_index_dir(index_dir_path)
index_path = index_dir_path / index_name
print("Creating HNSW index (non-compact)...")
from leann.api import LeannBuilder
from leann.registry import register_project_directory
builder = LeannBuilder(
backend_name="hnsw",
embedding_model=embedding_model,
embedding_mode=embedding_mode,
is_recompute=recompute_build,
is_compact=False,
efConstruction=(ef_construction if ef_construction is not None else 200),
)
for t in texts:
builder.add_text(t)
builder.build_index(str(index_path))
# Register for discovery
register_project_directory(Path.cwd())
print(f"Base index built at: {index_path}")
return str(index_path)
def add_incremental(
add_dir: str,
index_dir: str,
index_name: Optional[str] = None,
embedding_model: Optional[str] = None,
embedding_mode: Optional[str] = None,
chunk_size: int = 256,
chunk_overlap: int = 128,
file_types: Optional[list[str]] = None,
max_items: int = -1,
ef_construction: Optional[int] = None,
recompute_add: bool = False,
) -> str:
print(f"Adding incremental data from: {add_dir}")
index_dir_path = Path(index_dir)
index_path = index_dir_path / (index_name or "documents.leann")
# If specified base doesn't exist, try to auto-detect an existing base
try:
_read_meta(index_path)
except FileNotFoundError:
auto_base = _autodetect_index_base(index_dir_path)
if auto_base is not None:
print(f"Auto-detected index base: {auto_base.name}")
index_path = auto_base
_read_meta(index_path)
else:
raise FileNotFoundError(
f"No index metadata found for base '{index_path.name}'. Build base first with --build-base "
f"or provide --index-name to match an existing index (e.g., 'test_doc_files.leann')."
)
# Prepare validated context from core (checks backend/no-recompute and resolves embedding defaults)
from leann.api import create_incremental_add_context, incremental_add_texts_with_context
ctx = create_incremental_add_context(
str(index_path),
embedding_model=embedding_model,
embedding_mode=embedding_mode,
data_dir=add_dir,
required_exts=file_types,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
max_items=max_items,
)
# Use prepared texts from context to perform the add
prepared_texts = ctx.prepared_texts or []
if not prepared_texts:
print("No new chunks to add.")
return str(index_path)
added = incremental_add_texts_with_context(
ctx,
prepared_texts,
ef_construction=ef_construction,
recompute=recompute_add,
)
print(f"Incremental add completed. Added {added} chunks. Index: {index_path}")
return str(index_path)
def main():
parser = argparse.ArgumentParser(
description="Dynamic add to LEANN HNSW index without recompute",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--build-base", action="store_true", help="Build base index")
parser.add_argument("--add-incremental", action="store_true", help="Add incremental data")
parser.add_argument(
"--base-dir",
type=str,
default="/Users/yichuan/Desktop/code/LEANN/leann/data",
help="Base data directory",
)
parser.add_argument(
"--add-dir",
type=str,
default="/Users/yichuan/Desktop/code/LEANN/leann/test_add",
help="Incremental data directory",
)
parser.add_argument(
"--index-dir",
type=str,
default="./test_doc_files",
help="Directory containing the index",
)
parser.add_argument(
"--index-name",
type=str,
default="documents.leann",
help=(
"Index base file name. If you built via document_rag.py, use 'test_doc_files.leann'. "
"Default: documents.leann"
),
)
parser.add_argument(
"--embedding-model",
type=str,
default="facebook/contriever",
help="Embedding model name",
)
parser.add_argument(
"--embedding-mode",
type=str,
default="sentence-transformers",
choices=["sentence-transformers", "openai", "mlx", "ollama"],
help="Embedding backend mode",
)
parser.add_argument("--chunk-size", type=int, default=256)
parser.add_argument("--chunk-overlap", type=int, default=128)
parser.add_argument("--file-types", nargs="+", default=None)
parser.add_argument("--max-items", type=int, default=-1)
parser.add_argument("--ef-construction", type=int, default=32)
parser.add_argument(
"--recompute-add", action="store_true", help="Enable recompute-mode add (non-compact only)"
)
parser.add_argument(
"--recompute-build",
action="store_true",
help="Enable recompute-mode base build (non-compact only)",
)
args = parser.parse_args()
if not args.build_base and not args.add_incremental:
print("Nothing to do. Use --build-base and/or --add-incremental.")
return
index_path_str: Optional[str] = None
if args.build_base:
index_path_str = build_base_index(
base_dir=args.base_dir,
index_dir=args.index_dir,
index_name=args.index_name,
embedding_model=args.embedding_model,
embedding_mode=args.embedding_mode,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap,
file_types=args.file_types,
max_items=args.max_items,
ef_construction=args.ef_construction,
recompute_build=args.recompute_build,
)
if args.add_incremental:
index_path_str = add_incremental(
add_dir=args.add_dir,
index_dir=args.index_dir,
index_name=args.index_name,
embedding_model=args.embedding_model,
embedding_mode=args.embedding_mode,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap,
file_types=args.file_types,
max_items=args.max_items,
ef_construction=args.ef_construction,
recompute_add=args.recompute_add,
)
# Optional: quick test query using searcher
if index_path_str:
try:
from leann.api import LeannSearcher
searcher = LeannSearcher(index_path_str)
query = "what is LEANN?"
if args.add_incremental:
query = "what is the multi vector search and how it works?"
results = searcher.search(query, top_k=5)
if results:
print(f"Sample result: {results[0].text[:80]}...")
except Exception:
pass
if __name__ == "__main__":
main()

View File

@@ -1,35 +0,0 @@
"""
Grep Search Example
Shows how to use grep-based text search instead of semantic search.
Useful when you need exact text matches rather than meaning-based results.
"""
from leann import LeannSearcher
# Load your index
searcher = LeannSearcher("my-documents.leann")
# Regular semantic search
print("=== Semantic Search ===")
results = searcher.search("machine learning algorithms", top_k=3)
for result in results:
print(f"Score: {result.score:.3f}")
print(f"Text: {result.text[:80]}...")
print()
# Grep-based search for exact text matches
print("=== Grep Search ===")
results = searcher.search("def train_model", top_k=3, use_grep=True)
for result in results:
print(f"Score: {result.score}")
print(f"Text: {result.text[:80]}...")
print()
# Find specific error messages
error_results = searcher.search("FileNotFoundError", use_grep=True)
print(f"Found {len(error_results)} files mentioning FileNotFoundError")
# Search for function definitions
func_results = searcher.search("class SearchResult", use_grep=True, top_k=5)
print(f"Found {len(func_results)} class definitions")

View File

@@ -1,28 +0,0 @@
# llms.txt — LEANN MCP and Agent Integration
product: LEANN
homepage: https://github.com/yichuan-w/LEANN
contact: https://github.com/yichuan-w/LEANN/issues
# Installation
install: uv tool install leann-core --with leann
# MCP Server Entry Point
mcp.server: leann_mcp
mcp.protocol_version: 2024-11-05
# Tools
mcp.tools: leann_list, leann_search
mcp.tool.leann_list.description: List available LEANN indexes
mcp.tool.leann_list.input: {}
mcp.tool.leann_search.description: Semantic search across a named LEANN index
mcp.tool.leann_search.input.index_name: string, required
mcp.tool.leann_search.input.query: string, required
mcp.tool.leann_search.input.top_k: integer, optional, default=5, min=1, max=20
mcp.tool.leann_search.input.complexity: integer, optional, default=32, min=16, max=128
# Notes
note: Build indexes with `leann build <name> --docs <files...>` before searching.
example.add: claude mcp add --scope user leann-server -- leann_mcp
example.verify: claude mcp list | cat

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-diskann"
version = "0.3.4"
dependencies = ["leann-core==0.3.4", "numpy", "protobuf>=3.19.0"]
version = "0.3.2"
dependencies = ["leann-core==0.3.2", "numpy", "protobuf>=3.19.0"]
[tool.scikit-build]
# Key: simplified CMake path

View File

@@ -49,28 +49,9 @@ set(BUILD_TESTING OFF CACHE BOOL "" FORCE)
set(FAISS_ENABLE_C_API OFF CACHE BOOL "" FORCE)
set(FAISS_OPT_LEVEL "generic" CACHE STRING "" FORCE)
# Disable x86-specific SIMD optimizations (important for ARM64 compatibility)
# Disable additional SIMD versions to speed up compilation
set(FAISS_ENABLE_AVX2 OFF CACHE BOOL "" FORCE)
set(FAISS_ENABLE_AVX512 OFF CACHE BOOL "" FORCE)
set(FAISS_ENABLE_SSE4_1 OFF CACHE BOOL "" FORCE)
# ARM64-specific configuration
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|arm64")
message(STATUS "Configuring Faiss for ARM64 architecture")
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
# Use SVE optimization level for ARM64 Linux (as seen in Faiss conda build)
set(FAISS_OPT_LEVEL "sve" CACHE STRING "" FORCE)
message(STATUS "Setting FAISS_OPT_LEVEL to 'sve' for ARM64 Linux")
else()
# Use generic optimization for other ARM64 platforms (like macOS)
set(FAISS_OPT_LEVEL "generic" CACHE STRING "" FORCE)
message(STATUS "Setting FAISS_OPT_LEVEL to 'generic' for ARM64 ${CMAKE_SYSTEM_NAME}")
endif()
# ARM64 compatibility: Faiss submodule has been modified to fix x86 header inclusion
message(STATUS "Using ARM64-compatible Faiss submodule")
endif()
# Additional optimization options from INSTALL.md
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "" FORCE)

View File

@@ -15,7 +15,6 @@ from leann.registry import register_backend
from leann.searcher_base import BaseSearcher
from .convert_to_csr import convert_hnsw_graph_to_csr
from .prune_index import prune_embeddings_preserve_graph_inplace
logger = logging.getLogger(__name__)
@@ -91,16 +90,8 @@ class HNSWBuilder(LeannBackendBuilderInterface):
index_file = index_dir / f"{index_prefix}.index"
faiss.write_index(index, str(index_file))
if self.is_recompute:
if self.is_compact:
self._convert_to_csr(index_file)
else:
# Non-compact format: prune only embeddings, keep original graph
ok = prune_embeddings_preserve_graph_inplace(str(index_file))
if not ok:
raise RuntimeError(
"Pruning embeddings while preserving graph failed for non-compact index"
)
if self.is_compact:
self._convert_to_csr(index_file)
def _convert_to_csr(self, index_file: Path):
"""Convert built index to CSR format"""
@@ -157,13 +148,7 @@ class HNSWSearcher(BaseSearcher):
self.is_pruned
) # In C++ code, it's called is_recompute, but it's only for loading IIUC.
# If pruned (recompute mode), explicitly skip storage to avoid reading
# the pruned section. Still allow MMAP for graph.
io_flags = faiss.IO_FLAG_MMAP
if self.is_pruned:
io_flags |= faiss.IO_FLAG_SKIP_STORAGE
self._index = faiss.read_index(str(index_file), io_flags, hnsw_config)
self._index = faiss.read_index(str(index_file), faiss.IO_FLAG_MMAP, hnsw_config)
def search(
self,
@@ -266,55 +251,3 @@ class HNSWSearcher(BaseSearcher):
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
return {"labels": string_labels, "distances": distances}
# ---------- Helper API for incremental add (Python-level) ----------
def add_vectors(
index_file_path: str,
embeddings: np.ndarray,
*,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> None:
"""Append vectors to an existing non-compact HNSW index.
Args:
index_file_path: Path to the HNSW .index file
embeddings: float32 numpy array (N, D)
ef_construction: Optional override for efConstruction during insertion
recompute: Reserved for future use to control insertion-time recompute behaviors
"""
from . import faiss # type: ignore
if embeddings.dtype != np.float32:
embeddings = embeddings.astype(np.float32)
if not embeddings.flags.c_contiguous:
embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
# Load index normally to ensure storage is present; toggle is_recompute on the object
index = faiss.read_index(str(index_file_path), faiss.IO_FLAG_MMAP)
# Best-effort: explicitly set flag on the object if the binding exposes it
try:
index.is_recompute = bool(recompute)
except Exception:
pass
try:
if ef_construction is not None:
index.hnsw.efConstruction = int(ef_construction)
except Exception:
# Best-effort; ignore if backend doesn't expose setter
pass
# For non-compact HNSW, calling add directly is sufficient. When is_recompute is set
# (via config or attribute), FAISS will run the insertion/search path accordingly.
# To strictly follow per-point insert semantics in recompute mode, add one-by-one.
if recompute:
# Insert row by row
n = embeddings.shape[0]
for i in range(n):
row = embeddings[i : i + 1]
index.add(1, faiss.swig_ptr(row))
else:
index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
faiss.write_index(index, str(index_file_path))

View File

@@ -1,149 +0,0 @@
import os
import struct
from pathlib import Path
from .convert_to_csr import (
EXPECTED_HNSW_FOURCCS,
NULL_INDEX_FOURCC,
read_struct,
read_vector_raw,
)
def _write_vector_raw(f_out, count: int, data_bytes: bytes) -> None:
"""Write a vector in the same binary layout as read_vector_raw reads: <Q count> + raw bytes."""
f_out.write(struct.pack("<Q", count))
if count > 0 and data_bytes:
f_out.write(data_bytes)
def prune_embeddings_preserve_graph(input_filename: str, output_filename: str) -> bool:
"""
Copy an original (non-compact) HNSW index file while pruning the trailing embedding storage.
Preserves the graph structure and metadata exactly; only writes a NULL storage marker instead of
the original storage fourcc and payload.
Returns True on success.
"""
print(f"Pruning embeddings from {input_filename} to {output_filename}")
print("--------------------------------")
# running in mode is-recompute=True and is-compact=False
in_path = Path(input_filename)
out_path = Path(output_filename)
try:
with open(in_path, "rb") as f_in, open(out_path, "wb") as f_out:
# Header
index_fourcc = read_struct(f_in, "<I")
if index_fourcc not in EXPECTED_HNSW_FOURCCS:
# Still proceed, but this is unexpected
pass
f_out.write(struct.pack("<I", index_fourcc))
d = read_struct(f_in, "<i")
ntotal_hdr = read_struct(f_in, "<q")
dummy1 = read_struct(f_in, "<q")
dummy2 = read_struct(f_in, "<q")
is_trained = read_struct(f_in, "?")
metric_type = read_struct(f_in, "<i")
f_out.write(struct.pack("<i", d))
f_out.write(struct.pack("<q", ntotal_hdr))
f_out.write(struct.pack("<q", dummy1))
f_out.write(struct.pack("<q", dummy2))
f_out.write(struct.pack("<?", is_trained))
f_out.write(struct.pack("<i", metric_type))
if metric_type > 1:
metric_arg = read_struct(f_in, "<f")
f_out.write(struct.pack("<f", metric_arg))
# Vectors: assign_probas (double), cum_nneighbor_per_level (int32), levels (int32)
cnt, data = read_vector_raw(f_in, "d")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
# Probe potential extra alignment/flag byte present in some original formats
probe = f_in.read(1)
if probe:
if probe == b"\x00":
# Preserve this unexpected 0x00 byte
f_out.write(probe)
else:
# Likely part of the next vector; rewind
f_in.seek(-1, os.SEEK_CUR)
# Offsets (uint64) and neighbors (int32)
cnt, data = read_vector_raw(f_in, "Q")
_write_vector_raw(f_out, cnt, data)
cnt, data = read_vector_raw(f_in, "i")
_write_vector_raw(f_out, cnt, data)
# Scalar params
entry_point = read_struct(f_in, "<i")
max_level = read_struct(f_in, "<i")
ef_construction = read_struct(f_in, "<i")
ef_search = read_struct(f_in, "<i")
dummy_upper_beam = read_struct(f_in, "<i")
f_out.write(struct.pack("<i", entry_point))
f_out.write(struct.pack("<i", max_level))
f_out.write(struct.pack("<i", ef_construction))
f_out.write(struct.pack("<i", ef_search))
f_out.write(struct.pack("<i", dummy_upper_beam))
# Storage fourcc (if present) — write NULL marker and drop any remaining data
try:
read_struct(f_in, "<I")
# Regardless of original, write NULL
f_out.write(struct.pack("<I", NULL_INDEX_FOURCC))
# Discard the rest of the file (embedding payload)
# (Do not copy anything else)
except EOFError:
# No storage section; nothing else to write
pass
return True
except Exception:
# Best-effort cleanup
try:
if out_path.exists():
out_path.unlink()
except OSError:
pass
return False
def prune_embeddings_preserve_graph_inplace(index_file_path: str) -> bool:
"""
Convenience wrapper: write pruned file to a temporary path next to the
original, then atomically replace on success.
"""
print(f"Pruning embeddings from {index_file_path} to {index_file_path}")
print("--------------------------------")
# running in mode is-recompute=True and is-compact=False
src = Path(index_file_path)
tmp = src.with_suffix(".pruned.tmp")
ok = prune_embeddings_preserve_graph(str(src), str(tmp))
if not ok:
if tmp.exists():
try:
tmp.unlink()
except OSError:
pass
return False
try:
os.replace(str(tmp), str(src))
except Exception:
# Rollback on failure
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
return False
return True

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-hnsw"
version = "0.3.4"
version = "0.3.2"
description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
dependencies = [
"leann-core==0.3.4",
"leann-core==0.3.2",
"numpy",
"pyzmq>=23.0.0",
"msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann-core"
version = "0.3.4"
version = "0.3.2"
description = "Core API and plugin system for LEANN"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -5,10 +5,7 @@ with the correct, original embedding logic from the user's reference code.
import json
import logging
import os
import pickle
import re
import subprocess
import time
import warnings
from dataclasses import dataclass, field
@@ -20,7 +17,6 @@ import numpy as np
from leann.interface import LeannBackendSearcherInterface
from .chat import get_llm
from .embedding_server_manager import EmbeddingServerManager
from .interface import LeannBackendFactoryInterface
from .metadata_filter import MetadataFilterEngine
from .registry import BACKEND_REGISTRY
@@ -120,20 +116,6 @@ class SearchResult:
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class IncrementalAddContext:
"""Prepared context for safe incremental add operations on an index."""
index_path: str
passages_file: Path
offsets_file: Path
vector_index_file: Path
embedding_model: str
embedding_mode: str
distance_metric: str
prepared_texts: Optional[list[str]] = None
class PassageManager:
def __init__(
self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
@@ -492,7 +474,9 @@ class LeannBuilder:
is_compact = self.backend_kwargs.get("is_compact", True)
is_recompute = self.backend_kwargs.get("is_recompute", True)
meta_data["is_compact"] = is_compact
meta_data["is_pruned"] = is_recompute # Pruned only if compact and recompute
meta_data["is_pruned"] = (
is_compact and is_recompute
) # Pruned only if compact and recompute
with open(leann_meta_path, "w", encoding="utf-8") as f:
json.dump(meta_data, f, indent=2)
@@ -669,7 +653,6 @@ class LeannSearcher:
expected_zmq_port: int = 5557,
metadata_filters: Optional[dict[str, dict[str, Union[str, int, float, bool, list]]]] = None,
batch_size: int = 0,
use_grep: bool = False,
**kwargs,
) -> list[SearchResult]:
"""
@@ -696,10 +679,6 @@ class LeannSearcher:
Returns:
List of SearchResult objects with text, metadata, and similarity scores
"""
# Handle grep search
if use_grep:
return self._grep_search(query, top_k)
logger.info("🔍 LeannSearcher.search() called:")
logger.info(f" Query: '{query}'")
logger.info(f" Top_k: {top_k}")
@@ -816,96 +795,9 @@ class LeannSearcher:
logger.info(f" {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
return enriched_results
def _find_jsonl_file(self) -> Optional[str]:
"""Find the .jsonl file containing raw passages for grep search"""
index_path = Path(self.meta_path_str).parent
potential_files = [
index_path / "documents.leann.passages.jsonl",
index_path.parent / "documents.leann.passages.jsonl",
]
for file_path in potential_files:
if file_path.exists():
return str(file_path)
return None
def _grep_search(self, query: str, top_k: int = 5) -> list[SearchResult]:
"""Perform grep-based search on raw passages"""
jsonl_file = self._find_jsonl_file()
if not jsonl_file:
raise FileNotFoundError("No .jsonl passages file found for grep search")
try:
cmd = ["grep", "-i", "-n", query, jsonl_file]
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
if result.returncode == 1:
return []
elif result.returncode != 0:
raise RuntimeError(f"Grep failed: {result.stderr}")
matches = []
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split(":", 1)
if len(parts) != 2:
continue
try:
data = json.loads(parts[1])
text = data.get("text", "")
score = text.lower().count(query.lower())
matches.append(
SearchResult(
id=data.get("id", parts[0]),
text=text,
metadata=data.get("metadata", {}),
score=float(score),
)
)
except json.JSONDecodeError:
continue
matches.sort(key=lambda x: x.score, reverse=True)
return matches[:top_k]
except FileNotFoundError:
raise RuntimeError(
"grep command not found. Please install grep or use semantic search."
)
def _python_regex_search(self, query: str, top_k: int = 5) -> list[SearchResult]:
"""Fallback regex search"""
jsonl_file = self._find_jsonl_file()
if not jsonl_file:
raise FileNotFoundError("No .jsonl file found")
pattern = re.compile(re.escape(query), re.IGNORECASE)
matches = []
with open(jsonl_file, encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
if pattern.search(line):
try:
data = json.loads(line.strip())
matches.append(
SearchResult(
id=data.get("id", str(line_num)),
text=data.get("text", ""),
metadata=data.get("metadata", {}),
score=float(len(pattern.findall(data.get("text", "")))),
)
)
except json.JSONDecodeError:
continue
matches.sort(key=lambda x: x.score, reverse=True)
return matches[:top_k]
def cleanup(self):
"""Explicitly cleanup embedding server resources.
This method should be called after you're done using the searcher,
especially in test environments or batch processing scenarios.
"""
@@ -961,7 +853,6 @@ class LeannChat:
expected_zmq_port: int = 5557,
metadata_filters: Optional[dict[str, dict[str, Union[str, int, float, bool, list]]]] = None,
batch_size: int = 0,
use_grep: bool = False,
**search_kwargs,
):
if llm_kwargs is None:
@@ -1032,405 +923,8 @@ class LeannChat:
except Exception:
pass
# ------------------------------
# Incremental Add Utilities (HNSW no-recompute only)
# ------------------------------
def _resolve_index_paths(index_path: str) -> tuple[Path, Path, Path]:
"""Given base index path (without extension), return (passages.jsonl, passages.idx, vector.index).
For HNSW, vector index file is typically <stem>.index (e.g., documents.index) even when base is
'documents.leann'. We prefer an existing <stem>.index, otherwise fall back to <name>.index.
"""
base = Path(index_path)
passages_file = base.parent / f"{base.name}.passages.jsonl"
offsets_file = base.parent / f"{base.name}.passages.idx"
candidate_name_index = base.parent / f"{base.name}.index"
candidate_stem_index = base.parent / f"{base.stem}.index"
vector_index_file = (
candidate_stem_index if candidate_stem_index.exists() else candidate_name_index
)
return passages_file, offsets_file, vector_index_file
def _read_meta_file(index_path: str) -> dict[str, Any]:
meta_path = Path(f"{index_path}.meta.json")
if not meta_path.exists():
raise FileNotFoundError(f"Leann metadata file not found: {meta_path}")
with open(meta_path, encoding="utf-8") as f:
return json.load(f)
def _load_offset_map_pickle(offsets_file: Path) -> dict[str, int]:
if not offsets_file.exists():
return {}
with open(offsets_file, "rb") as f:
return pickle.load(f)
def _append_passages_and_update_offsets(
passages_file: Path, offsets_file: Path, new_texts: list[str]
) -> list[str]:
"""Append new texts to passages file, update offset map, and return assigned string IDs.
IDs are assigned as incrementing integers based on existing keys in the offset map.
"""
offset_map = _load_offset_map_pickle(offsets_file)
# Compute next numeric id
numeric_ids = [int(x) for x in offset_map.keys() if str(x).isdigit()]
next_id_num = (max(numeric_ids) + 1) if numeric_ids else 0
assigned_ids: list[str] = []
with open(passages_file, "a", encoding="utf-8") as f:
for text in new_texts:
offset = f.tell()
str_id = str(next_id_num)
json.dump({"id": str_id, "text": text, "metadata": {}}, f, ensure_ascii=False)
f.write("\n")
offset_map[str_id] = offset
assigned_ids.append(str_id)
next_id_num += 1
with open(offsets_file, "wb") as f:
pickle.dump(offset_map, f)
return assigned_ids
def incremental_add_texts(
index_path: str,
texts: list[str],
*,
embedding_model: Optional[str] = None,
embedding_mode: Optional[str] = None,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> int:
"""Incrementally add text chunks to an existing HNSW index built with no-recompute.
- Validates backend is HNSW and index is non-compact (no-recompute path)
- Appends passages and offsets
- Computes embeddings and appends to the HNSW vector index
Returns number of added chunks.
"""
if not texts:
return 0
meta = _read_meta_file(index_path)
if meta.get("backend_name") != "hnsw":
raise RuntimeError("Incremental add is currently supported only for HNSW backend")
if meta.get("is_compact", True):
raise RuntimeError(
"Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
)
passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
if not vector_index_file.exists():
raise FileNotFoundError(
f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
)
# Resolve embedding config from meta if not provided
model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
# Append passages and update offsets
assigned_ids = _append_passages_and_update_offsets(passages_file, offsets_file, texts)
# Compute embeddings
# Embedding computation path
esm = None
port = None
if recompute:
# Determine distance metric early for server config
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
# Start embedding server and compute via ZMQ for consistency with recompute semantics
passages_source_file = f"{index_path}.meta.json"
esm = EmbeddingServerManager(
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
)
started, port = esm.start_server(
port=5557,
model_name=model_name,
embedding_mode=mode_name,
passages_file=passages_source_file,
distance_metric=distance_metric,
enable_warmup=False,
)
if not started:
raise RuntimeError("Failed to start embedding server for recompute add")
embeddings = compute_embeddings_via_server(texts, model_name, port)
else:
embeddings = compute_embeddings(
texts,
model_name=model_name,
mode=mode_name,
use_server=False,
is_build=True,
)
# Normalize for cosine if needed
if "distance_metric" not in locals():
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
if distance_metric == "cosine":
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
embeddings = embeddings / norms
# Append via backend helper (supports ef_construction/recompute plumbing)
try:
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
except Exception as e:
raise RuntimeError(
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
) from e
# Propagate ZMQ port to FAISS add path when recompute is True
if recompute and port is not None:
os.environ["LEANN_ZMQ_PORT"] = str(port)
hnsw_add_vectors(
str(vector_index_file),
embeddings,
ef_construction=ef_construction,
recompute=recompute,
)
# Stop server after add when recompute path used
if esm is not None:
def __del__(self):
try:
esm.stop_server()
self.cleanup()
except Exception:
pass
# Sanity: ids length should match embeddings rows
if len(assigned_ids) != embeddings.shape[0]:
warnings.warn(
f"Assigned {len(assigned_ids)} IDs but computed {embeddings.shape[0]} embeddings.",
UserWarning,
stacklevel=2,
)
return len(assigned_ids)
def create_incremental_add_context(
index_path: str,
*,
# Optional embedding choices; if None will use meta
embedding_model: Optional[str] = None,
embedding_mode: Optional[str] = None,
# Optional data-to-text preparation in context
data_dir: Optional[str] = None,
required_exts: Optional[list[str]] = None,
chunk_size: int = 256,
chunk_overlap: int = 128,
max_items: int = -1,
) -> IncrementalAddContext:
"""Validate index and prepare context for repeated incremental adds.
Additionally, if data_dir is provided, this function will load documents,
chunk them to texts with the specified parameters, and store them in ctx.prepared_texts.
"""
meta = _read_meta_file(index_path)
if meta.get("backend_name") != "hnsw":
raise RuntimeError("Incremental add is currently supported only for HNSW backend")
if meta.get("is_compact", True):
raise RuntimeError(
"Index is compact/pruned. Rebuild base with is_recompute=False and is_compact=False for incremental add."
)
passages_file, offsets_file, vector_index_file = _resolve_index_paths(index_path)
if not vector_index_file.exists():
raise FileNotFoundError(
f"Vector index file missing: {vector_index_file}. Build base first with LeannBuilder."
)
model_name = embedding_model or meta.get("embedding_model", "facebook/contriever")
mode_name = embedding_mode or meta.get("embedding_mode", "sentence-transformers")
distance_metric = meta.get("backend_kwargs", {}).get("distance_metric", "mips").lower()
prepared_texts: Optional[list[str]] = None
if data_dir is not None:
try:
from llama_index.core import SimpleDirectoryReader # type: ignore
from llama_index.core.node_parser import SentenceSplitter # type: ignore
except Exception as e:
raise RuntimeError(
"llama-index-core is required when using data_dir in create_incremental_add_context"
) from e
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
if required_exts:
reader_kwargs["required_exts"] = required_exts
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
if documents:
splitter = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
prepared_texts = []
for doc in documents:
try:
nodes = splitter.get_nodes_from_documents([doc])
if nodes:
prepared_texts.extend([node.get_content() for node in nodes])
except Exception:
content = doc.get_content()
if content and content.strip():
prepared_texts.append(content.strip())
if max_items > 0 and len(prepared_texts) > max_items:
prepared_texts = prepared_texts[:max_items]
return IncrementalAddContext(
index_path=index_path,
passages_file=passages_file,
offsets_file=offsets_file,
vector_index_file=vector_index_file,
embedding_model=model_name,
embedding_mode=mode_name,
distance_metric=distance_metric,
prepared_texts=prepared_texts,
)
def incremental_add_texts_with_context(
ctx: IncrementalAddContext,
texts: list[str],
*,
ef_construction: Optional[int] = None,
recompute: bool = False,
) -> int:
"""Incrementally add texts using a prepared context (no repeated validation).
For non-compact HNSW, ef_construction (efConstruction) can be overridden during insertion.
"""
if not texts:
return 0
# Append passages & offsets
_append_passages_and_update_offsets(ctx.passages_file, ctx.offsets_file, texts)
# Compute embeddings
# Embedding computation path
esm = None
port = None
if recompute:
passages_source_file = f"{ctx.index_path}.meta.json"
esm = EmbeddingServerManager(
backend_module_name="leann_backend_hnsw.hnsw_embedding_server",
)
started, port = esm.start_server(
port=5557,
model_name=ctx.embedding_model,
embedding_mode=ctx.embedding_mode,
passages_file=passages_source_file,
distance_metric=ctx.distance_metric,
enable_warmup=False,
)
if not started:
raise RuntimeError("Failed to start embedding server for recompute add")
embeddings = compute_embeddings_via_server(texts, ctx.embedding_model, port)
else:
embeddings = compute_embeddings(
texts,
model_name=ctx.embedding_model,
mode=ctx.embedding_mode,
use_server=False,
is_build=True,
)
# Normalize for cosine if needed
if ctx.distance_metric == "cosine":
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1
embeddings = embeddings / norms
# Append via backend helper (supports ef_construction/recompute plumbing)
try:
from leann_backend_hnsw.hnsw_backend import add_vectors as hnsw_add_vectors # type: ignore
except Exception as e:
raise RuntimeError(
"Failed to import HNSW backend add helper. Ensure HNSW backend is installed."
) from e
if recompute and port is not None:
os.environ["LEANN_ZMQ_PORT"] = str(port)
hnsw_add_vectors(
str(ctx.vector_index_file),
embeddings,
ef_construction=ef_construction,
recompute=recompute,
)
# Stop server after add when recompute path used
if esm is not None:
try:
esm.stop_server()
except Exception:
pass
return embeddings.shape[0]
def incremental_add_directory(
index_path: str,
data_dir: str,
*,
chunk_size: int = 256,
chunk_overlap: int = 128,
required_exts: Optional[list[str]] = None,
max_items: int = -1,
embedding_model: Optional[str] = None,
embedding_mode: Optional[str] = None,
) -> int:
"""Load documents from a directory, chunk them, and incrementally add to an index.
Chunking uses LlamaIndex SentenceSplitter for simplicity and avoids external app dependencies.
"""
try:
from llama_index.core import SimpleDirectoryReader # type: ignore
from llama_index.core.node_parser import SentenceSplitter # type: ignore
except Exception as e:
raise RuntimeError("llama-index-core is required for incremental_add_directory") from e
reader_kwargs: dict[str, Any] = {"recursive": True, "encoding": "utf-8"}
if required_exts:
reader_kwargs["required_exts"] = required_exts
documents = SimpleDirectoryReader(data_dir, **reader_kwargs).load_data(show_progress=True)
if not documents:
return 0
# Traditional text chunking
splitter = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
all_texts: list[str] = []
for doc in documents:
try:
nodes = splitter.get_nodes_from_documents([doc])
if nodes:
all_texts.extend([node.get_content() for node in nodes])
except Exception:
content = doc.get_content()
if content and content.strip():
all_texts.append(content.strip())
if max_items > 0 and len(all_texts) > max_items:
all_texts = all_texts[:max_items]
return incremental_add_texts(
index_path,
all_texts,
embedding_model=embedding_model,
embedding_mode=embedding_mode,
)

View File

@@ -1,5 +1,6 @@
import argparse
import asyncio
import sys
from pathlib import Path
from typing import Any, Optional, Union
@@ -1215,8 +1216,13 @@ Examples:
if use_ast:
print("🧠 Using AST-aware chunking for code files")
try:
# Import enhanced chunking utilities from packaged module
from .chunking_utils import create_text_chunks
# Import enhanced chunking utilities
# Add apps directory to path to import chunking utilities
apps_dir = Path(__file__).parent.parent.parent.parent.parent / "apps"
if apps_dir.exists():
sys.path.insert(0, str(apps_dir))
from chunking import create_text_chunks
# Use enhanced chunking with AST support
all_texts = create_text_chunks(
@@ -1231,9 +1237,7 @@ Examples:
)
except ImportError as e:
print(
f"⚠️ AST chunking utilities not available in package ({e}), falling back to traditional chunking"
)
print(f"⚠️ AST chunking not available ({e}), falling back to traditional chunking")
use_ast = False
if not use_ast:

View File

@@ -2,8 +2,6 @@
Transform your development workflow with intelligent code assistance using LEANN's semantic search directly in Claude Code.
For agent-facing discovery details, see `llms.txt` in the repository root.
## Prerequisites
Install LEANN globally for MCP integration (with default backend):

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann"
version = "0.3.4"
version = "0.3.2"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -99,7 +99,6 @@ wechat-exporter = "wechat_exporter.main:main"
leann-core = { path = "packages/leann-core", editable = true }
leann-backend-diskann = { path = "packages/leann-backend-diskann", editable = true }
leann-backend-hnsw = { path = "packages/leann-backend-hnsw", editable = true }
astchunk = { path = "packages/astchunk-leann", editable = true }
[tool.ruff]
target-version = "py39"

45
uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.9"
resolution-markers = [
"python_full_version >= '3.12'",
@@ -201,7 +201,7 @@ wheels = [
[[package]]
name = "astchunk"
version = "0.1.0"
source = { editable = "packages/astchunk-leann" }
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy", version = "2.0.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
{ name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.10.*'" },
@@ -214,31 +214,10 @@ dependencies = [
{ name = "tree-sitter-python" },
{ name = "tree-sitter-typescript" },
]
[package.metadata]
requires-dist = [
{ name = "black", marker = "extra == 'dev'", specifier = ">=22.0.0" },
{ name = "flake8", marker = "extra == 'dev'", specifier = ">=5.0.0" },
{ name = "isort", marker = "extra == 'dev'", specifier = ">=5.10.0" },
{ name = "mypy", marker = "extra == 'dev'", specifier = ">=1.0.0" },
{ name = "myst-parser", marker = "extra == 'docs'", specifier = ">=0.18.0" },
{ name = "numpy", specifier = ">=1.20.0" },
{ name = "pre-commit", marker = "extra == 'dev'", specifier = ">=2.20.0" },
{ name = "pyrsistent", specifier = ">=0.18.0" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" },
{ name = "pytest", marker = "extra == 'test'", specifier = ">=7.0.0" },
{ name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.0.0" },
{ name = "pytest-cov", marker = "extra == 'test'", specifier = ">=4.0.0" },
{ name = "pytest-xdist", marker = "extra == 'test'", specifier = ">=2.5.0" },
{ name = "sphinx", marker = "extra == 'docs'", specifier = ">=5.0.0" },
{ name = "sphinx-rtd-theme", marker = "extra == 'docs'", specifier = ">=1.0.0" },
{ name = "tree-sitter", specifier = ">=0.20.0" },
{ name = "tree-sitter-c-sharp", specifier = ">=0.20.0" },
{ name = "tree-sitter-java", specifier = ">=0.20.0" },
{ name = "tree-sitter-python", specifier = ">=0.20.0" },
{ name = "tree-sitter-typescript", specifier = ">=0.20.0" },
sdist = { url = "https://files.pythonhosted.org/packages/db/2a/7a35e2fac7d550265ae2ee40651425083b37555f921d1a1b77c3f525e0df/astchunk-0.1.0.tar.gz", hash = "sha256:f4dff0ef8b3b3bcfeac363384db1e153f74d4c825dc2e35864abfab027713be4", size = 18093, upload-time = "2025-06-19T04:37:25.34Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/be/84/5433ab0e933b572750cb16fd7edf3d6c7902b069461a22ec670042752a4d/astchunk-0.1.0-py3-none-any.whl", hash = "sha256:33ada9fc3620807fdda5846fa1948af463f281a60e0d43d4f3782b6dbb416d24", size = 15396, upload-time = "2025-06-19T04:37:23.87Z" },
]
provides-extras = ["dev", "docs", "test"]
[[package]]
name = "asttokens"
@@ -1585,7 +1564,7 @@ name = "importlib-metadata"
version = "8.7.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "zipp", marker = "python_full_version < '3.10'" },
{ name = "zipp" },
]
sdist = { url = "https://files.pythonhosted.org/packages/76/66/650a33bd90f786193e4de4b3ad86ea60b53c89b669a5c7be931fac31cdb0/importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000", size = 56641, upload-time = "2025-04-27T15:29:01.736Z" }
wheels = [
@@ -2138,7 +2117,7 @@ wheels = [
[[package]]
name = "leann-backend-diskann"
version = "0.3.3"
version = "0.3.2"
source = { editable = "packages/leann-backend-diskann" }
dependencies = [
{ name = "leann-core" },
@@ -2150,14 +2129,14 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "leann-core", specifier = "==0.3.3" },
{ name = "leann-core", specifier = "==0.3.2" },
{ name = "numpy" },
{ name = "protobuf", specifier = ">=3.19.0" },
]
[[package]]
name = "leann-backend-hnsw"
version = "0.3.3"
version = "0.3.2"
source = { editable = "packages/leann-backend-hnsw" }
dependencies = [
{ name = "leann-core" },
@@ -2170,7 +2149,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "leann-core", specifier = "==0.3.3" },
{ name = "leann-core", specifier = "==0.3.2" },
{ name = "msgpack", specifier = ">=1.0.0" },
{ name = "numpy" },
{ name = "pyzmq", specifier = ">=23.0.0" },
@@ -2178,7 +2157,7 @@ requires-dist = [
[[package]]
name = "leann-core"
version = "0.3.3"
version = "0.3.2"
source = { editable = "packages/leann-core" }
dependencies = [
{ name = "accelerate" },
@@ -2318,7 +2297,7 @@ test = [
[package.metadata]
requires-dist = [
{ name = "astchunk", editable = "packages/astchunk-leann" },
{ name = "astchunk", specifier = ">=0.1.0" },
{ name = "beautifulsoup4", marker = "extra == 'documents'", specifier = ">=4.13.0" },
{ name = "black", marker = "extra == 'dev'", specifier = ">=23.0" },
{ name = "boto3" },