Compare commits

..

21 Commits

Author SHA1 Message Date
Andy Lee
c8f173c0e5 Merge branch 'main' into arch-eval 2025-08-20 12:25:47 -07:00
Andy Lee
7ea34bd7d0 docs: rpj_wiki 2025-08-20 12:23:46 -07:00
Gabriel Dehan
13bb561aad Add AST-aware code chunking for better code understanding (#58)
* feat(core): Add AST-aware code chunking with astchunk integration

This PR introduces intelligent code chunking that preserves semantic boundaries
(functions, classes, methods) for better code understanding in RAG applications.

Key Features:
- AST-aware chunking for Python, Java, C#, TypeScript files
- Graceful fallback to traditional chunking for unsupported languages
- New specialized code RAG application for repositories
- Enhanced CLI with --use-ast-chunking flag
- Comprehensive test suite with integration tests

Technical Implementation:
- New chunking_utils.py module with enhanced chunking logic
- Extended base RAG framework with AST chunking arguments
- Updated document RAG with --enable-code-chunking flag
- CLI integration with proper error handling and fallback

Benefits:
- Better semantic understanding of code structure
- Improved search quality for code-related queries
- Maintains backward compatibility with existing workflows
- Supports mixed content (code + documentation) seamlessly

Dependencies:
- Added astchunk and tree-sitter parsers to pyproject.toml
- All dependencies are optional - fallback works without them

Testing:
- Comprehensive test suite in test_astchunk_integration.py
- Integration tests with document RAG
- Error handling and edge case coverage

Documentation:
- Updated README.md with AST chunking highlights
- Added ASTCHUNK_INTEGRATION.md with complete guide
- Updated features.md with new capabilities

* Refactored chunk utils

* Remove useless import

* Update README.md

* Update apps/chunking/utils.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update apps/code_rag.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix issue

* apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fixes after pr review

* Fix tests not passing

* Fix linter error for documentation files

* Update .gitignore with unwanted files

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Andy Lee <andylizf@outlook.com>
2025-08-19 23:35:31 -07:00
Andy Lee
348423eca9 style: format 2025-08-19 14:40:41 -07:00
Andy Lee
bc621677f6 perf: avoid merging offset dicts for lower mem usage 2025-08-19 10:58:16 -07:00
Andy Lee
9d5cdd93b4 chore: ignore benchmark data 2025-08-19 10:57:52 -07:00
GitHub Actions
0174ba5571 chore: release v0.3.2 2025-08-19 09:41:40 +00:00
Andy Lee
03af82d695 fix: leann mcp search cwd & interactive issues (#72) 2025-08-19 02:27:06 -07:00
GitHub Actions
738f1dbab8 chore: release v0.3.1 2025-08-19 05:56:45 +00:00
yichuan520030910320
37d990d51c [feature] fix cli 2025-08-18 22:55:43 -07:00
Andy Lee
a6f07a54f1 fix: Use uv venv for Arch Linux CI wheel installation (#69)
- Use astral-sh/setup-uv@v4 action for consistency with other jobs
- Create virtual environment with uv venv to bypass PEP 668 restrictions
- Install wheels using uv pip install for faster dependency resolution
- Maintain tool consistency across the entire CI pipeline

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-16 21:32:19 -07:00
Andy Lee
46905e0687 feat: Improve DiskANN cross-platform compatibility and add Arch Linux support (#66)
* feat: Enhance CLI with improved list and smart remove commands

##  New Features

### 🏠 Enhanced `leann list` command
- **Better UX**: Current project shown first with clear separation
- **Visual improvements**: Icons (🏠/📂), better formatting, size info
- **Smart guidance**: Context-aware usage examples and getting started tips

### 🛡️ Smart `leann remove` command
- **Safety first**: Always shows ALL matching indexes across projects
- **Intelligent handling**:
  - Single match: Clear location display with cross-project warnings
  - Multiple matches: Interactive selection with final confirmation
- **Prevents accidents**: No more deleting wrong indexes due to name conflicts
- **User-friendly**: 'c' to cancel, clear visual hierarchy, detailed info

### 🔧 Technical improvements
- **Clean logging**: Hide debug messages for better CLI experience
- **Comprehensive search**: Always scan all projects for transparency
- **Error handling**: Graceful handling of edge cases and user input

## 🎯 Impact
- **Safer**: Eliminates risk of accidental index deletion
- **Clearer**: Users always know what they're operating on
- **Smarter**: Automatic detection and handling of common scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: vscode ruff, and format

* fix: Update DiskANN submodule with MKL linking improvements

Updates DiskANN submodule to include fix for MKL linking issues:
- Replaces global link_libraries() with target-specific linking
- Uses dynamic MKL linking (mkl_rt) for better cross-platform compatibility
- Prevents MKL contamination of unrelated targets (like zlib tests)
- Resolves build failures on strict linkers (Arch Linux) while maintaining Ubuntu compatibility

DiskANN commit: c593831 - fix: Replace global MKL linking with target-specific approach

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: all linux deps

* fix: Update Intel MKL download link to avoid 403 error

- Replace problematic Intel download URL that returns 403 Forbidden
- Use general Intel oneAPI MKL page instead of specific download parameters
- This fixes the lychee link checker CI failure

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: Configure lychee to use browser User-Agent for Intel links

- Replace domain exclusion with browser User-Agent to properly check Intel links
- Intel website blocks automated tools but allows browser-like requests
- This enables proper link validation while avoiding 403 Forbidden errors

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: Use curl User-Agent for lychee link checking

Intel website has specific anti-bot logic:
- Blocks browser User-Agents (returns 403)
- Blocks lychee default User-Agent (returns 403)
- Allows curl User-Agent (returns 200)

This enables proper link validation for Intel documentation.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-16 14:42:20 -07:00
Andy Lee
838ade231e 🔗 Auto-register apps: Universal index discovery (#64)
* feat: Enhance CLI with improved list and smart remove commands

##  New Features

### 🏠 Enhanced `leann list` command
- **Better UX**: Current project shown first with clear separation
- **Visual improvements**: Icons (🏠/📂), better formatting, size info
- **Smart guidance**: Context-aware usage examples and getting started tips

### 🛡️ Smart `leann remove` command
- **Safety first**: Always shows ALL matching indexes across projects
- **Intelligent handling**:
  - Single match: Clear location display with cross-project warnings
  - Multiple matches: Interactive selection with final confirmation
- **Prevents accidents**: No more deleting wrong indexes due to name conflicts
- **User-friendly**: 'c' to cancel, clear visual hierarchy, detailed info

### 🔧 Technical improvements
- **Clean logging**: Hide debug messages for better CLI experience
- **Comprehensive search**: Always scan all projects for transparency
- **Error handling**: Graceful handling of edge cases and user input

## 🎯 Impact
- **Safer**: Eliminates risk of accidental index deletion
- **Clearer**: Users always know what they're operating on
- **Smarter**: Automatic detection and handling of common scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: vscode ruff, and format

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-16 11:50:25 -07:00
Andy Lee
da6540decd feat: Enhance CLI with improved list and smart remove commands (#63)
- **Better UX**: Current project shown first with clear separation
- **Visual improvements**: Icons (🏠/📂), better formatting, size info
- **Smart guidance**: Context-aware usage examples and getting started tips

- **Safety first**: Always shows ALL matching indexes across projects
- **Intelligent handling**:
  - Single match: Clear location display with cross-project warnings
  - Multiple matches: Interactive selection with final confirmation
- **Prevents accidents**: No more deleting wrong indexes due to name conflicts
- **User-friendly**: 'c' to cancel, clear visual hierarchy, detailed info

- **Clean logging**: Hide debug messages for better CLI experience
- **Comprehensive search**: Always scan all projects for transparency
- **Error handling**: Graceful handling of edge cases and user input

- **Safer**: Eliminates risk of accidental index deletion
- **Clearer**: Users always know what they're operating on
- **Smarter**: Automatic detection and handling of common scenarios
2025-08-15 23:49:47 -07:00
yichuan520030910320
39e18a7c11 [chore] remove gitattribute 2025-08-15 23:12:24 -07:00
Andy Lee
6bde28584b feat: Add Google Gemini API support for chat and embeddings (#57)
- Add GeminiChat class with gemini-2.5-flash model support
- Add compute_embeddings_gemini function with text-embedding-004 model
- Update get_llm factory to support "gemini" type
- Update API documentation to include gemini embedding mode
- Support temperature, max_tokens, top_p parameters for Gemini chat
- Support batch embedding processing with progress bars
- Add proper error handling and API key validation
2025-08-15 21:54:11 -07:00
yichuan520030910320
f62632c41f [readme]update arch linux install 2025-08-15 21:41:34 -07:00
yichuan520030910320
27708243ca update system support 2025-08-15 21:32:53 -07:00
GitHub Actions
9a1e4652ca chore: release v0.3.0 2025-08-16 00:54:47 +00:00
Andy Lee
14e84d9e2d fix(core): skip empty/invalid chunks before embedding; guard OpenAI embeddings (#55)
Avoid 400 errors from OpenAI when chunker yields empty strings by filtering
invalid texts in LeannBuilder.build_index. Add validation fail-fast in
OpenAI embedding path to surface upstream issues earlier. Keeps passages and
embeddings aligned during build.

Refs #54
2025-08-15 17:53:53 -07:00
Yichuan Wang
2dcfca19ff style: apply ruff format (#56) 2025-08-15 17:48:33 -07:00
34 changed files with 5989 additions and 4034 deletions

1
.gitattributes vendored
View File

@@ -1 +0,0 @@
paper_plot/data/big_graph_degree_data.npz filter=lfs diff=lfs merge=lfs -text

View File

@@ -87,7 +87,7 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
ref: ${{ inputs.ref }}
submodules: recursive
@@ -98,21 +98,23 @@ jobs:
python-version: ${{ matrix.python }}
- name: Install uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v6
- name: Install system dependencies (Ubuntu)
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install -y libomp-dev libboost-all-dev protobuf-compiler libzmq3-dev \
pkg-config libopenblas-dev patchelf libabsl-dev libaio-dev libprotobuf-dev
pkg-config libabsl-dev libaio-dev libprotobuf-dev \
patchelf
# Install Intel MKL for DiskANN
wget -q https://registrationcenter-download.intel.com/akdlm/IRC_NAS/79153e0f-74d7-45af-b8c2-258941adf58a/intel-onemkl-2025.0.0.940.sh
sudo sh intel-onemkl-2025.0.0.940.sh -a --components intel.oneapi.lin.mkl.devel --action install --eula accept -s
source /opt/intel/oneapi/setvars.sh
echo "MKLROOT=/opt/intel/oneapi/mkl/latest" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=/opt/intel/oneapi/mkl/latest/lib/intel64:$LD_LIBRARY_PATH" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=/opt/intel/oneapi/compiler/latest/linux/compiler/lib/intel64_lin" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/intel/oneapi/mkl/latest/lib/intel64" >> $GITHUB_ENV
- name: Install system dependencies (macOS)
if: runner.os == 'macOS'
@@ -304,3 +306,53 @@ jobs:
with:
name: packages-${{ matrix.os }}-py${{ matrix.python }}
path: packages/*/dist/
arch-smoke:
name: Arch Linux smoke test (install & import)
needs: build
runs-on: ubuntu-latest
container:
image: archlinux:latest
steps:
- name: Prepare system
run: |
pacman -Syu --noconfirm
pacman -S --noconfirm python python-pip gcc git zlib openssl
- name: Download ALL wheel artifacts from this run
uses: actions/download-artifact@v5
with:
# Don't specify name, download all artifacts
path: ./wheels
- name: Install uv
uses: astral-sh/setup-uv@v6
- name: Create virtual environment and install wheels
run: |
uv venv
source .venv/bin/activate || source .venv/Scripts/activate
uv pip install --find-links wheels leann-core
uv pip install --find-links wheels leann-backend-hnsw
uv pip install --find-links wheels leann-backend-diskann
uv pip install --find-links wheels leann
- name: Import & tiny runtime check
env:
OMP_NUM_THREADS: 1
MKL_NUM_THREADS: 1
run: |
source .venv/bin/activate || source .venv/Scripts/activate
python - <<'PY'
import leann
import leann_backend_hnsw as h
import leann_backend_diskann as d
from leann import LeannBuilder, LeannSearcher
b = LeannBuilder(backend_name="hnsw")
b.add_text("hello arch")
b.build_index("arch_demo.leann")
s = LeannSearcher("arch_demo.leann")
print("search:", s.search("hello", top_k=1))
PY

View File

@@ -14,6 +14,6 @@ jobs:
- uses: actions/checkout@v4
- uses: lycheeverse/lychee-action@v2
with:
args: --no-progress --insecure README.md docs/ apps/ examples/ benchmarks/
args: --no-progress --insecure --user-agent 'curl/7.68.0' README.md docs/ apps/ examples/ benchmarks/
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

3
.gitignore vendored
View File

@@ -18,6 +18,7 @@ demo/experiment_results/**/*.json
*.eml
*.emlx
*.json
!.vscode/*.json
*.sh
*.txt
!CMakeLists.txt
@@ -92,3 +93,5 @@ packages/leann-backend-diskann/third_party/DiskANN/_deps/
batchtest.py
tests/__pytest_cache__/
tests/__pycache__/
benchmarks/data/

5
.vscode/extensions.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"recommendations": [
"charliermarsh.ruff",
]
}

22
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,22 @@
{
"python.defaultInterpreterPath": ".venv/bin/python",
"python.terminal.activateEnvironment": true,
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
},
"editor.insertSpaces": true,
"editor.tabSize": 4
},
"ruff.enable": true,
"files.watcherExclude": {
"**/.venv/**": true,
"**/__pycache__/**": true,
"**/*.egg-info/**": true,
"**/build/**": true,
"**/dist/**": true
}
}

101
README.md
View File

@@ -5,7 +5,7 @@
<p align="center">
<img src="https://img.shields.io/badge/Python-3.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20%7C%203.13-blue.svg" alt="Python Versions">
<img src="https://github.com/yichuan-w/LEANN/actions/workflows/build-and-publish.yml/badge.svg" alt="CI Status">
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%26%20Arch%20%26%20WSL%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="MIT License">
<img src="https://img.shields.io/badge/MCP-Native%20Integration-blue" alt="MCP Integration">
</p>
@@ -87,15 +87,60 @@ git submodule update --init --recursive
```
**macOS:**
Note: DiskANN requires MacOS 13.3 or later.
```bash
brew install llvm libomp boost protobuf zeromq pkgconf
CC=$(brew --prefix llvm)/bin/clang CXX=$(brew --prefix llvm)/bin/clang++ uv sync
brew install libomp boost protobuf zeromq pkgconf
uv sync --extra diskann
```
**Linux:**
**Linux (Ubuntu/Debian):**
Note: On Ubuntu 20.04, you may need to build a newer Abseil and pin Protobuf (e.g., v3.20.x) for building DiskANN. See [Issue #30](https://github.com/yichuan-w/LEANN/issues/30) for a step-by-step note.
You can manually install [Intel oneAPI MKL](https://www.intel.com/content/www/us/en/developer/tools/oneapi/onemkl.html) instead of `libmkl-full-dev` for DiskANN. You can also use `libopenblas-dev` for building HNSW only, by removing `--extra diskann` in the command below.
```bash
sudo apt-get install libomp-dev libboost-all-dev protobuf-compiler libabsl-dev libmkl-full-dev libaio-dev libzmq3-dev
uv sync
sudo apt-get update && sudo apt-get install -y \
libomp-dev libboost-all-dev protobuf-compiler libzmq3-dev \
pkg-config libabsl-dev libaio-dev libprotobuf-dev \
libmkl-full-dev
uv sync --extra diskann
```
**Linux (Arch Linux):**
```bash
sudo pacman -Syu && sudo pacman -S --needed base-devel cmake pkgconf git gcc \
boost boost-libs protobuf abseil-cpp libaio zeromq
# For MKL in DiskANN
sudo pacman -S --needed base-devel git
git clone https://aur.archlinux.org/paru-bin.git
cd paru-bin && makepkg -si
paru -S intel-oneapi-mkl intel-oneapi-compiler
source /opt/intel/oneapi/setvars.sh
uv sync --extra diskann
```
**Linux (RHEL / CentOS Stream / Oracle / Rocky / AlmaLinux):**
See [Issue #50](https://github.com/yichuan-w/LEANN/issues/50) for more details.
```bash
sudo dnf groupinstall -y "Development Tools"
sudo dnf install -y libomp-devel boost-devel protobuf-compiler protobuf-devel \
abseil-cpp-devel libaio-devel zeromq-devel pkgconf-pkg-config
# For MKL in DiskANN
sudo dnf install -y intel-oneapi-mkl intel-oneapi-mkl-devel \
intel-oneapi-openmp || sudo dnf install -y intel-oneapi-compiler
source /opt/intel/oneapi/setvars.sh
uv sync --extra diskann
```
</details>
@@ -131,6 +176,9 @@ response = chat.ask("How much storage does LEANN save?", top_k=1)
LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, and more.
**AST-Aware Code Chunking** - LEANN also features intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript files, providing improved code understanding compared to traditional text-based approaches.
📖 Read the [AST Chunking Guide →](docs/ast_chunking_guide.md) to learn more.
### Generation Model Setup
LEANN supports multiple LLM providers for text generation (OpenAI API, HuggingFace, Ollama).
@@ -249,6 +297,12 @@ python -m apps.document_rag --data-dir "~/Documents/Papers" --chunk-size 1024
# Filter only markdown and Python files with smaller chunks
python -m apps.document_rag --data-dir "./docs" --chunk-size 256 --file-types .md .py
# Enable AST-aware chunking for code files
python -m apps.document_rag --enable-code-chunking --data-dir "./my_project"
# Or use the specialized code RAG for better code understanding
python -m apps.code_rag --repo-dir "./my_codebase" --query "How does authentication work?"
```
</details>
@@ -427,6 +481,7 @@ Once the index is built, you can ask questions like:
**Key features:**
- 🔍 **Semantic code search** across your entire project, fully local index and lightweight
- 🧠 **AST-aware chunking** preserves code structure (functions, classes)
- 📚 **Context-aware assistance** for debugging and development
- 🚀 **Zero-config setup** with automatic language detection
@@ -482,11 +537,15 @@ leann ask my-docs --interactive
# List all your indexes
leann list
# Remove an index
leann remove my-docs
```
**Key CLI features:**
- Auto-detects document formats (PDF, TXT, MD, DOCX, PPTX + code files)
- Smart text chunking with overlap
- **🧠 AST-aware chunking** for Python, Java, C#, TypeScript files
- Smart text chunking with overlap for all other content
- Multiple LLM providers (Ollama, OpenAI, HuggingFace)
- Organized index storage in `.leann/indexes/` (project-local)
- Support for advanced search parameters
@@ -494,7 +553,7 @@ leann list
<details>
<summary><strong>📋 Click to expand: Complete CLI Reference</strong></summary>
You can use `leann --help`, or `leann build --help`, `leann search --help`, `leann ask --help` to get the complete CLI reference.
You can use `leann --help`, or `leann build --help`, `leann search --help`, `leann ask --help`, `leann list --help`, `leann remove --help` to get the complete CLI reference.
**Build Command:**
```bash
@@ -532,6 +591,31 @@ Options:
--top-k N Retrieval count (default: 20)
```
**List Command:**
```bash
leann list
# Lists all indexes across all projects with status indicators:
# ✅ - Index is complete and ready to use
# ❌ - Index is incomplete or corrupted
# 📁 - CLI-created index (in .leann/indexes/)
# 📄 - App-created index (*.leann.meta.json files)
```
**Remove Command:**
```bash
leann remove INDEX_NAME [OPTIONS]
Options:
--force, -f Force removal without confirmation
# Smart removal: automatically finds and safely removes indexes
# - Shows all matching indexes across projects
# - Requires confirmation for cross-project removal
# - Interactive selection when multiple matches found
# - Supports both CLI and app-created indexes
```
</details>
## 🏗️ Architecture & How It Works
@@ -573,6 +657,7 @@ Options:
```bash
uv pip install -e ".[dev]" # Install dev dependencies
python benchmarks/run_evaluation.py # Will auto-download evaluation data and run benchmarks
python benchmarks/run_evaluation.py benchmarks/data/indices/rpj_wiki/rpj_wiki --num-queries 2000 # After downloading data, you can run the benchmark with our biggest index
```
The evaluation script downloads data automatically on first run. The last three results were tested with partial personal data, and you can reproduce them with your own data!

View File

@@ -10,7 +10,7 @@ from typing import Any
import dotenv
from leann.api import LeannBuilder, LeannChat
from llama_index.core.node_parser import SentenceSplitter
from leann.registry import register_project_directory
dotenv.load_dotenv()
@@ -108,6 +108,38 @@ class BaseRAGExample(ABC):
help="Thinking budget for reasoning models (low/medium/high). Supported by GPT-Oss:20b and other reasoning models.",
)
# AST Chunking parameters
ast_group = parser.add_argument_group("AST Chunking Parameters")
ast_group.add_argument(
"--use-ast-chunking",
action="store_true",
help="Enable AST-aware chunking for code files (requires astchunk)",
)
ast_group.add_argument(
"--ast-chunk-size",
type=int,
default=512,
help="Maximum characters per AST chunk (default: 512)",
)
ast_group.add_argument(
"--ast-chunk-overlap",
type=int,
default=64,
help="Overlap between AST chunks (default: 64)",
)
ast_group.add_argument(
"--code-file-extensions",
nargs="+",
default=None,
help="Additional code file extensions to process with AST chunking (e.g., .py .java .cs .ts)",
)
ast_group.add_argument(
"--ast-fallback-traditional",
action="store_true",
default=True,
help="Fall back to traditional chunking if AST chunking fails (default: True)",
)
# Search parameters
search_group = parser.add_argument_group("Search Parameters")
search_group.add_argument(
@@ -214,6 +246,11 @@ class BaseRAGExample(ABC):
builder.build_index(index_path)
print(f"Index saved to: {index_path}")
# Register project directory so leann list can discover this index
# The index is saved as args.index_dir/index_name.leann
# We want to register the current working directory where the app is run
register_project_directory(Path.cwd())
return index_path
async def run_interactive_chat(self, args, index_path: str):
@@ -304,21 +341,3 @@ class BaseRAGExample(ABC):
await self.run_single_query(args, index_path, args.query)
else:
await self.run_interactive_chat(args, index_path)
def create_text_chunks(documents, chunk_size=256, chunk_overlap=25) -> list[str]:
"""Helper function to create text chunks from documents."""
node_parser = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
all_texts.extend(node.get_content() for node in nodes)
return all_texts

22
apps/chunking/__init__.py Normal file
View File

@@ -0,0 +1,22 @@
"""
Chunking utilities for LEANN RAG applications.
Provides AST-aware and traditional text chunking functionality.
"""
from .utils import (
CODE_EXTENSIONS,
create_ast_chunks,
create_text_chunks,
create_traditional_chunks,
detect_code_files,
get_language_from_extension,
)
__all__ = [
"CODE_EXTENSIONS",
"create_ast_chunks",
"create_text_chunks",
"create_traditional_chunks",
"detect_code_files",
"get_language_from_extension",
]

320
apps/chunking/utils.py Normal file
View File

@@ -0,0 +1,320 @@
"""
Enhanced chunking utilities with AST-aware code chunking support.
Provides unified interface for both traditional and AST-based text chunking.
"""
import logging
from pathlib import Path
from typing import Optional
from llama_index.core.node_parser import SentenceSplitter
logger = logging.getLogger(__name__)
# Code file extensions supported by astchunk
CODE_EXTENSIONS = {
".py": "python",
".java": "java",
".cs": "csharp",
".ts": "typescript",
".tsx": "typescript",
".js": "typescript",
".jsx": "typescript",
}
# Default chunk parameters for different content types
DEFAULT_CHUNK_PARAMS = {
"code": {
"max_chunk_size": 512,
"chunk_overlap": 64,
},
"text": {
"chunk_size": 256,
"chunk_overlap": 128,
},
}
def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
"""
Separate documents into code files and regular text files.
Args:
documents: List of LlamaIndex Document objects
code_extensions: Dict mapping file extensions to languages (defaults to CODE_EXTENSIONS)
Returns:
Tuple of (code_documents, text_documents)
"""
if code_extensions is None:
code_extensions = CODE_EXTENSIONS
code_docs = []
text_docs = []
for doc in documents:
# Get file path from metadata
file_path = doc.metadata.get("file_path", "")
if not file_path:
# Fallback to file_name
file_path = doc.metadata.get("file_name", "")
if file_path:
file_ext = Path(file_path).suffix.lower()
if file_ext in code_extensions:
# Add language info to metadata
doc.metadata["language"] = code_extensions[file_ext]
doc.metadata["is_code"] = True
code_docs.append(doc)
else:
doc.metadata["is_code"] = False
text_docs.append(doc)
else:
# If no file path, treat as text
doc.metadata["is_code"] = False
text_docs.append(doc)
logger.info(f"Detected {len(code_docs)} code files and {len(text_docs)} text files")
return code_docs, text_docs
def get_language_from_extension(file_path: str) -> Optional[str]:
"""Get the programming language from file extension."""
ext = Path(file_path).suffix.lower()
return CODE_EXTENSIONS.get(ext)
def create_ast_chunks(
documents,
max_chunk_size: int = 512,
chunk_overlap: int = 64,
metadata_template: str = "default",
) -> list[str]:
"""
Create AST-aware chunks from code documents using astchunk.
Args:
documents: List of code documents
max_chunk_size: Maximum characters per chunk
chunk_overlap: Number of AST nodes to overlap between chunks
metadata_template: Template for chunk metadata
Returns:
List of text chunks with preserved code structure
"""
try:
from astchunk import ASTChunkBuilder
except ImportError as e:
logger.error(f"astchunk not available: {e}")
logger.info("Falling back to traditional chunking for code files")
return create_traditional_chunks(documents, max_chunk_size, chunk_overlap)
all_chunks = []
for doc in documents:
# Get language from metadata (set by detect_code_files)
language = doc.metadata.get("language")
if not language:
logger.warning(
"No language detected for document, falling back to traditional chunking"
)
traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
all_chunks.extend(traditional_chunks)
continue
try:
# Configure astchunk
configs = {
"max_chunk_size": max_chunk_size,
"language": language,
"metadata_template": metadata_template,
"chunk_overlap": chunk_overlap if chunk_overlap > 0 else 0,
}
# Add repository-level metadata if available
repo_metadata = {
"file_path": doc.metadata.get("file_path", ""),
"file_name": doc.metadata.get("file_name", ""),
"creation_date": doc.metadata.get("creation_date", ""),
"last_modified_date": doc.metadata.get("last_modified_date", ""),
}
configs["repo_level_metadata"] = repo_metadata
# Create chunk builder and process
chunk_builder = ASTChunkBuilder(**configs)
code_content = doc.get_content()
if not code_content or not code_content.strip():
logger.warning("Empty code content, skipping")
continue
chunks = chunk_builder.chunkify(code_content)
# Extract text content from chunks
for chunk in chunks:
if hasattr(chunk, "text"):
chunk_text = chunk.text
elif isinstance(chunk, dict) and "text" in chunk:
chunk_text = chunk["text"]
elif isinstance(chunk, str):
chunk_text = chunk
else:
# Try to convert to string
chunk_text = str(chunk)
if chunk_text and chunk_text.strip():
all_chunks.append(chunk_text.strip())
logger.info(
f"Created {len(chunks)} AST chunks from {language} file: {doc.metadata.get('file_name', 'unknown')}"
)
except Exception as e:
logger.warning(f"AST chunking failed for {language} file: {e}")
logger.info("Falling back to traditional chunking")
traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
all_chunks.extend(traditional_chunks)
return all_chunks
def create_traditional_chunks(
documents, chunk_size: int = 256, chunk_overlap: int = 128
) -> list[str]:
"""
Create traditional text chunks using LlamaIndex SentenceSplitter.
Args:
documents: List of documents to chunk
chunk_size: Size of each chunk in characters
chunk_overlap: Overlap between chunks
Returns:
List of text chunks
"""
# Handle invalid chunk_size values
if chunk_size <= 0:
logger.warning(f"Invalid chunk_size={chunk_size}, using default value of 256")
chunk_size = 256
# Ensure chunk_overlap is not negative and not larger than chunk_size
if chunk_overlap < 0:
chunk_overlap = 0
if chunk_overlap >= chunk_size:
chunk_overlap = chunk_size // 2
node_parser = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
all_texts = []
for doc in documents:
try:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
chunk_texts = [node.get_content() for node in nodes]
all_texts.extend(chunk_texts)
logger.debug(f"Created {len(chunk_texts)} traditional chunks from document")
except Exception as e:
logger.error(f"Traditional chunking failed for document: {e}")
# As last resort, add the raw content
content = doc.get_content()
if content and content.strip():
all_texts.append(content.strip())
return all_texts
def create_text_chunks(
documents,
chunk_size: int = 256,
chunk_overlap: int = 128,
use_ast_chunking: bool = False,
ast_chunk_size: int = 512,
ast_chunk_overlap: int = 64,
code_file_extensions: Optional[list[str]] = None,
ast_fallback_traditional: bool = True,
) -> list[str]:
"""
Create text chunks from documents with optional AST support for code files.
Args:
documents: List of LlamaIndex Document objects
chunk_size: Size for traditional text chunks
chunk_overlap: Overlap for traditional text chunks
use_ast_chunking: Whether to use AST chunking for code files
ast_chunk_size: Size for AST chunks
ast_chunk_overlap: Overlap for AST chunks
code_file_extensions: Custom list of code file extensions
ast_fallback_traditional: Fall back to traditional chunking on AST errors
Returns:
List of text chunks
"""
if not documents:
logger.warning("No documents provided for chunking")
return []
# Create a local copy of supported extensions for this function call
local_code_extensions = CODE_EXTENSIONS.copy()
# Update supported extensions if provided
if code_file_extensions:
# Map extensions to languages (simplified mapping)
ext_mapping = {
".py": "python",
".java": "java",
".cs": "c_sharp",
".ts": "typescript",
".tsx": "typescript",
}
for ext in code_file_extensions:
if ext.lower() not in local_code_extensions:
# Try to guess language from extension
if ext.lower() in ext_mapping:
local_code_extensions[ext.lower()] = ext_mapping[ext.lower()]
else:
logger.warning(f"Unsupported extension {ext}, will use traditional chunking")
all_chunks = []
if use_ast_chunking:
# Separate code and text documents using local extensions
code_docs, text_docs = detect_code_files(documents, local_code_extensions)
# Process code files with AST chunking
if code_docs:
logger.info(f"Processing {len(code_docs)} code files with AST chunking")
try:
ast_chunks = create_ast_chunks(
code_docs, max_chunk_size=ast_chunk_size, chunk_overlap=ast_chunk_overlap
)
all_chunks.extend(ast_chunks)
logger.info(f"Created {len(ast_chunks)} AST chunks from code files")
except Exception as e:
logger.error(f"AST chunking failed: {e}")
if ast_fallback_traditional:
logger.info("Falling back to traditional chunking for code files")
traditional_code_chunks = create_traditional_chunks(
code_docs, chunk_size, chunk_overlap
)
all_chunks.extend(traditional_code_chunks)
else:
raise
# Process text files with traditional chunking
if text_docs:
logger.info(f"Processing {len(text_docs)} text files with traditional chunking")
text_chunks = create_traditional_chunks(text_docs, chunk_size, chunk_overlap)
all_chunks.extend(text_chunks)
logger.info(f"Created {len(text_chunks)} traditional chunks from text files")
else:
# Use traditional chunking for all files
logger.info(f"Processing {len(documents)} documents with traditional chunking")
all_chunks = create_traditional_chunks(documents, chunk_size, chunk_overlap)
logger.info(f"Total chunks created: {len(all_chunks)}")
return all_chunks

211
apps/code_rag.py Normal file
View File

@@ -0,0 +1,211 @@
"""
Code RAG example using AST-aware chunking for optimal code understanding.
Specialized for code repositories with automatic language detection and
optimized chunking parameters.
"""
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample
from chunking import CODE_EXTENSIONS, create_text_chunks
from llama_index.core import SimpleDirectoryReader
class CodeRAG(BaseRAGExample):
"""Specialized RAG example for code repositories with AST-aware chunking."""
def __init__(self):
super().__init__(
name="Code",
description="Process and query code repositories with AST-aware chunking",
default_index_name="code_index",
)
# Override defaults for code-specific usage
self.embedding_model_default = "facebook/contriever" # Good for code
self.max_items_default = -1 # Process all code files by default
def _add_specific_arguments(self, parser):
"""Add code-specific arguments."""
code_group = parser.add_argument_group("Code Repository Parameters")
code_group.add_argument(
"--repo-dir",
type=str,
default=".",
help="Code repository directory to index (default: current directory)",
)
code_group.add_argument(
"--include-extensions",
nargs="+",
default=list(CODE_EXTENSIONS.keys()),
help="File extensions to include (default: supported code extensions)",
)
code_group.add_argument(
"--exclude-dirs",
nargs="+",
default=[
".git",
"__pycache__",
"node_modules",
"venv",
".venv",
"build",
"dist",
"target",
],
help="Directories to exclude from indexing",
)
code_group.add_argument(
"--max-file-size",
type=int,
default=1000000, # 1MB
help="Maximum file size in bytes to process (default: 1MB)",
)
code_group.add_argument(
"--include-comments",
action="store_true",
help="Include comments in chunking (useful for documentation)",
)
code_group.add_argument(
"--preserve-imports",
action="store_true",
default=True,
help="Try to preserve import statements in chunks (default: True)",
)
async def load_data(self, args) -> list[str]:
"""Load code files and convert to AST-aware chunks."""
print(f"🔍 Scanning code repository: {args.repo_dir}")
print(f"📁 Including extensions: {args.include_extensions}")
print(f"🚫 Excluding directories: {args.exclude_dirs}")
# Check if repository directory exists
repo_path = Path(args.repo_dir)
if not repo_path.exists():
raise ValueError(f"Repository directory not found: {args.repo_dir}")
# Load code files with filtering
reader_kwargs = {
"recursive": True,
"encoding": "utf-8",
"required_exts": args.include_extensions,
"exclude_hidden": True,
}
# Create exclusion filter
def file_filter(file_path: str) -> bool:
"""Filter out unwanted files and directories."""
path = Path(file_path)
# Check file size
try:
if path.stat().st_size > args.max_file_size:
print(f"⚠️ Skipping large file: {path.name} ({path.stat().st_size} bytes)")
return False
except Exception:
return False
# Check if in excluded directory
for exclude_dir in args.exclude_dirs:
if exclude_dir in path.parts:
return False
return True
try:
# Load documents with file filtering
documents = SimpleDirectoryReader(
args.repo_dir,
file_extractor=None, # Use default extractors
**reader_kwargs,
).load_data(show_progress=True)
# Apply custom filtering
filtered_docs = []
for doc in documents:
file_path = doc.metadata.get("file_path", "")
if file_filter(file_path):
filtered_docs.append(doc)
documents = filtered_docs
except Exception as e:
print(f"❌ Error loading code files: {e}")
return []
if not documents:
print(
f"❌ No code files found in {args.repo_dir} with extensions {args.include_extensions}"
)
return []
print(f"✅ Loaded {len(documents)} code files")
# Show breakdown by language/extension
ext_counts = {}
for doc in documents:
file_path = doc.metadata.get("file_path", "")
if file_path:
ext = Path(file_path).suffix.lower()
ext_counts[ext] = ext_counts.get(ext, 0) + 1
print("📊 Files by extension:")
for ext, count in sorted(ext_counts.items()):
print(f" {ext}: {count} files")
# Use AST-aware chunking by default for code
print(
f"🧠 Using AST-aware chunking (chunk_size: {args.ast_chunk_size}, overlap: {args.ast_chunk_overlap})"
)
all_texts = create_text_chunks(
documents,
chunk_size=256, # Fallback for non-code files
chunk_overlap=64,
use_ast_chunking=True, # Always use AST for code RAG
ast_chunk_size=args.ast_chunk_size,
ast_chunk_overlap=args.ast_chunk_overlap,
code_file_extensions=args.include_extensions,
ast_fallback_traditional=True,
)
# Apply max_items limit if specified
if args.max_items > 0 and len(all_texts) > args.max_items:
print(f"⏳ Limiting to {args.max_items} chunks (from {len(all_texts)})")
all_texts = all_texts[: args.max_items]
print(f"✅ Generated {len(all_texts)} code chunks")
return all_texts
if __name__ == "__main__":
import asyncio
# Example queries for code RAG
print("\n💻 Code RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'How does the embedding computation work?'")
print("- 'What are the main classes in this codebase?'")
print("- 'Show me the search implementation'")
print("- 'How is error handling implemented?'")
print("- 'What design patterns are used?'")
print("- 'Explain the chunking logic'")
print("\n🚀 Features:")
print("- ✅ AST-aware chunking preserves code structure")
print("- ✅ Automatic language detection")
print("- ✅ Smart filtering of large files and common excludes")
print("- ✅ Optimized for code understanding")
print("\nUsage examples:")
print(" python -m apps.code_rag --repo-dir ./my_project")
print(
" python -m apps.code_rag --include-extensions .py .js --query 'How does authentication work?'"
)
print("\nOr run without --query for interactive mode\n")
rag = CodeRAG()
asyncio.run(rag.run())

View File

@@ -9,7 +9,8 @@ from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from base_rag_example import BaseRAGExample
from chunking import create_text_chunks
from llama_index.core import SimpleDirectoryReader
@@ -44,6 +45,11 @@ class DocumentRAG(BaseRAGExample):
doc_group.add_argument(
"--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
)
doc_group.add_argument(
"--enable-code-chunking",
action="store_true",
help="Enable AST-aware chunking for code files in the data directory",
)
async def load_data(self, args) -> list[str]:
"""Load documents and convert to text chunks."""
@@ -76,9 +82,22 @@ class DocumentRAG(BaseRAGExample):
print(f"Loaded {len(documents)} documents")
# Convert to text chunks
# Determine chunking strategy
use_ast = args.enable_code_chunking or getattr(args, "use_ast_chunking", False)
if use_ast:
print("Using AST-aware chunking for code files")
# Convert to text chunks with optional AST support
all_texts = create_text_chunks(
documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
documents,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap,
use_ast_chunking=use_ast,
ast_chunk_size=getattr(args, "ast_chunk_size", 512),
ast_chunk_overlap=getattr(args, "ast_chunk_overlap", 64),
code_file_extensions=getattr(args, "code_file_extensions", None),
ast_fallback_traditional=getattr(args, "ast_fallback_traditional", True),
)
# Apply max_items limit if specified
@@ -102,6 +121,10 @@ if __name__ == "__main__":
print(
"- 'What is the problem of developing pan gu model Huawei meets? (盘古大模型开发中遇到什么问题?)'"
)
print("\n🚀 NEW: Code-aware chunking available!")
print("- Use --enable-code-chunking to enable AST-aware chunking for code files")
print("- Supports Python, Java, C#, TypeScript files")
print("- Better semantic understanding of code structure")
print("\nOr run without --query for interactive mode\n")
rag = DocumentRAG()

View File

@@ -1,82 +0,0 @@
*.7z filter=lfs diff=lfs merge=lfs -text
*.arrow filter=lfs diff=lfs merge=lfs -text
*.bin filter=lfs diff=lfs merge=lfs -text
*.bz2 filter=lfs diff=lfs merge=lfs -text
*.ckpt filter=lfs diff=lfs merge=lfs -text
*.ftz filter=lfs diff=lfs merge=lfs -text
*.gz filter=lfs diff=lfs merge=lfs -text
*.h5 filter=lfs diff=lfs merge=lfs -text
*.joblib filter=lfs diff=lfs merge=lfs -text
*.lfs.* filter=lfs diff=lfs merge=lfs -text
*.lz4 filter=lfs diff=lfs merge=lfs -text
*.mds filter=lfs diff=lfs merge=lfs -text
*.mlmodel filter=lfs diff=lfs merge=lfs -text
*.model filter=lfs diff=lfs merge=lfs -text
*.msgpack filter=lfs diff=lfs merge=lfs -text
*.npy filter=lfs diff=lfs merge=lfs -text
*.npz filter=lfs diff=lfs merge=lfs -text
*.onnx filter=lfs diff=lfs merge=lfs -text
*.ot filter=lfs diff=lfs merge=lfs -text
*.parquet filter=lfs diff=lfs merge=lfs -text
*.pb filter=lfs diff=lfs merge=lfs -text
*.pickle filter=lfs diff=lfs merge=lfs -text
*.pkl filter=lfs diff=lfs merge=lfs -text
*.pt filter=lfs diff=lfs merge=lfs -text
*.pth filter=lfs diff=lfs merge=lfs -text
*.rar filter=lfs diff=lfs merge=lfs -text
*.safetensors filter=lfs diff=lfs merge=lfs -text
saved_model/**/* filter=lfs diff=lfs merge=lfs -text
*.tar.* filter=lfs diff=lfs merge=lfs -text
*.tar filter=lfs diff=lfs merge=lfs -text
*.tflite filter=lfs diff=lfs merge=lfs -text
*.tgz filter=lfs diff=lfs merge=lfs -text
*.wasm filter=lfs diff=lfs merge=lfs -text
*.xz filter=lfs diff=lfs merge=lfs -text
*.zip filter=lfs diff=lfs merge=lfs -text
*.zst filter=lfs diff=lfs merge=lfs -text
*tfevents* filter=lfs diff=lfs merge=lfs -text
# Audio files - uncompressed
*.pcm filter=lfs diff=lfs merge=lfs -text
*.sam filter=lfs diff=lfs merge=lfs -text
*.raw filter=lfs diff=lfs merge=lfs -text
# Audio files - compressed
*.aac filter=lfs diff=lfs merge=lfs -text
*.flac filter=lfs diff=lfs merge=lfs -text
*.mp3 filter=lfs diff=lfs merge=lfs -text
*.ogg filter=lfs diff=lfs merge=lfs -text
*.wav filter=lfs diff=lfs merge=lfs -text
# Image files - uncompressed
*.bmp filter=lfs diff=lfs merge=lfs -text
*.gif filter=lfs diff=lfs merge=lfs -text
*.png filter=lfs diff=lfs merge=lfs -text
*.tiff filter=lfs diff=lfs merge=lfs -text
# Image files - compressed
*.jpg filter=lfs diff=lfs merge=lfs -text
*.jpeg filter=lfs diff=lfs merge=lfs -text
*.webp filter=lfs diff=lfs merge=lfs -text
# Video files - compressed
*.mp4 filter=lfs diff=lfs merge=lfs -text
*.webm filter=lfs diff=lfs merge=lfs -text
ground_truth/dpr/id_map.json filter=lfs diff=lfs merge=lfs -text
indices/dpr/dpr_diskann.passages.idx filter=lfs diff=lfs merge=lfs -text
indices/dpr/dpr_diskann.passages.jsonl filter=lfs diff=lfs merge=lfs -text
indices/dpr/dpr_diskann_disk.index filter=lfs diff=lfs merge=lfs -text
indices/dpr/leann.labels.map filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/leann.labels.map filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.index filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.0.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.0.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.1.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.1.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.2.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.2.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.3.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.3.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.4.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.4.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.5.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.5.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.6.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.6.jsonl filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.7.idx filter=lfs diff=lfs merge=lfs -text
indices/rpj_wiki/rpj_wiki.passages.7.jsonl filter=lfs diff=lfs merge=lfs -text

View File

@@ -183,6 +183,9 @@ class Benchmark:
start_time = time.time()
with torch.no_grad():
self.model(input_ids=input_ids, attention_mask=attention_mask)
# mps sync
if torch.backends.mps.is_available():
torch.mps.synchronize()
end_time = time.time()
return end_time - start_time

128
docs/ast_chunking_guide.md Normal file
View File

@@ -0,0 +1,128 @@
# AST-Aware Code chunking guide
## Overview
This guide covers best practices for using AST-aware code chunking in LEANN. AST chunking provides better semantic understanding of code structure compared to traditional text-based chunking.
## Quick Start
### Basic Usage
```bash
# Enable AST chunking for mixed content (code + docs)
python -m apps.document_rag --enable-code-chunking --data-dir ./my_project
# Specialized code repository indexing
python -m apps.code_rag --repo-dir ./my_codebase
# Global CLI with AST support
leann build my-code-index --docs ./src --use-ast-chunking
```
### Installation
```bash
# Install LEANN with AST chunking support
uv pip install -e "."
```
## Best Practices
### When to Use AST Chunking
**Recommended for:**
- Code repositories with multiple languages
- Mixed documentation and code content
- Complex codebases with deep function/class hierarchies
- When working with Claude Code for code assistance
**Not recommended for:**
- Pure text documents
- Very large files (>1MB)
- Languages not supported by tree-sitter
### Optimal Configuration
```bash
# Recommended settings for most codebases
python -m apps.code_rag \
--repo-dir ./src \
--ast-chunk-size 768 \
--ast-chunk-overlap 96 \
--exclude-dirs .git __pycache__ node_modules build dist
```
### Supported Languages
| Extension | Language | Status |
|-----------|----------|--------|
| `.py` | Python | ✅ Full support |
| `.java` | Java | ✅ Full support |
| `.cs` | C# | ✅ Full support |
| `.ts`, `.tsx` | TypeScript | ✅ Full support |
| `.js`, `.jsx` | JavaScript | ✅ Via TypeScript parser |
## Integration Examples
### Document RAG with Code Support
```python
# Enable code chunking in document RAG
python -m apps.document_rag \
--enable-code-chunking \
--data-dir ./project \
--query "How does authentication work in the codebase?"
```
### Claude Code Integration
When using with Claude Code MCP server, AST chunking provides better context for:
- Code completion and suggestions
- Bug analysis and debugging
- Architecture understanding
- Refactoring assistance
## Troubleshooting
### Common Issues
1. **Fallback to Traditional Chunking**
- Normal behavior for unsupported languages
- Check logs for specific language support
2. **Performance with Large Files**
- Adjust `--max-file-size` parameter
- Use `--exclude-dirs` to skip unnecessary directories
3. **Quality Issues**
- Try different `--ast-chunk-size` values (512, 768, 1024)
- Adjust overlap for better context preservation
### Debug Mode
```bash
export LEANN_LOG_LEVEL=DEBUG
python -m apps.code_rag --repo-dir ./my_code
```
## Migration from Traditional Chunking
Existing workflows continue to work without changes. To enable AST chunking:
```bash
# Before
python -m apps.document_rag --chunk-size 256
# After (maintains traditional chunking for non-code files)
python -m apps.document_rag --enable-code-chunking --chunk-size 256 --ast-chunk-size 768
```
## References
- [astchunk GitHub Repository](https://github.com/yilinjz/astchunk)
- [LEANN MCP Integration](../packages/leann-mcp/README.md)
- [Research Paper](https://arxiv.org/html/2506.15655v1)
---
**Note**: AST chunking maintains full backward compatibility while enhancing code understanding capabilities.

View File

@@ -3,6 +3,7 @@
## 🔥 Core Features
- **🔄 Real-time Embeddings** - Eliminate heavy embedding storage with dynamic computation using optimized ZMQ servers and highly optimized search paradigm (overlapping and batching) with highly optimized embedding engine
- **🧠 AST-Aware Code Chunking** - Intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript files
- **📈 Scalable Architecture** - Handles millions of documents on consumer hardware; the larger your dataset, the more LEANN can save
- **🎯 Graph Pruning** - Advanced techniques to minimize the storage overhead of vector search to a limited footprint
- **🏗️ Pluggable Backends** - HNSW/FAISS (default), with optional DiskANN for large-scale deployments

View File

@@ -83,9 +83,7 @@ def create_diskann_embedding_server(
logger.info(f"Loading PassageManager with metadata_file_path: {passages_file}")
passages = PassageManager(meta["passage_sources"], metadata_file_path=passages_file)
logger.info(
f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata"
)
logger.info(f"Loaded PassageManager with {len(passages)} passages from metadata")
# Import protobuf after ensuring the path is correct
try:

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-diskann"
version = "0.2.9"
dependencies = ["leann-core==0.2.9", "numpy", "protobuf>=3.19.0"]
version = "0.3.2"
dependencies = ["leann-core==0.3.2", "numpy", "protobuf>=3.19.0"]
[tool.scikit-build]
# Key: simplified CMake path

View File

@@ -90,9 +90,7 @@ def create_hnsw_embedding_server(
embedding_dim: int = int(meta.get("dimensions", 0))
except Exception:
embedding_dim = 0
logger.info(
f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata"
)
logger.info(f"Loaded PassageManager with {len(passages)} passages from metadata")
# (legacy ZMQ thread removed; using shutdown-capable server only)

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-hnsw"
version = "0.2.9"
version = "0.3.2"
description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
dependencies = [
"leann-core==0.2.9",
"leann-core==0.3.2",
"numpy",
"pyzmq>=23.0.0",
"msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann-core"
version = "0.2.9"
version = "0.3.2"
description = "Core API and plugin system for LEANN"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -119,9 +119,12 @@ class PassageManager:
def __init__(
self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
):
self.offset_maps = {}
self.passage_files = {}
self.global_offset_map = {} # Combined map for fast lookup
self.offset_maps: dict[str, dict[str, int]] = {}
self.passage_files: dict[str, str] = {}
# Avoid materializing a single gigantic global map to reduce memory
# footprint on very large corpora (e.g., 60M+ passages). Instead, keep
# per-shard maps and do a lightweight per-shard lookup on demand.
self._total_count: int = 0
# Derive index base name for standard sibling fallbacks, e.g., <index_name>.passages.*
index_name_base = None
@@ -142,12 +145,25 @@ class PassageManager:
default_name: Optional[str],
source_dict: dict[str, Any],
) -> list[Path]:
"""
Build an ordered list of candidate paths. For relative paths specified in
metadata, prefer resolution relative to the metadata file directory first,
then fall back to CWD-based resolution, and finally to conventional
sibling defaults (e.g., <index_base>.passages.idx / .jsonl).
"""
candidates: list[Path] = []
# 1) Primary as-is (absolute or relative)
# 1) Primary path
if primary:
p = Path(primary)
candidates.append(p if p.is_absolute() else (Path.cwd() / p))
# 2) metadata-relative explicit relative key
if p.is_absolute():
candidates.append(p)
else:
# Prefer metadata-relative resolution for relative paths
if metadata_file_path:
candidates.append(Path(metadata_file_path).parent / p)
# Also consider CWD-relative as a fallback for legacy layouts
candidates.append(Path.cwd() / p)
# 2) metadata-relative explicit relative key (if present)
if metadata_file_path and source_dict.get(relative_key):
candidates.append(Path(metadata_file_path).parent / source_dict[relative_key])
# 3) metadata-relative standard sibling filename
@@ -177,23 +193,28 @@ class PassageManager:
raise FileNotFoundError(f"Passage index file not found: {index_file}")
with open(index_file, "rb") as f:
offset_map = pickle.load(f)
offset_map: dict[str, int] = pickle.load(f)
self.offset_maps[passage_file] = offset_map
self.passage_files[passage_file] = passage_file
# Build global map for O(1) lookup
for passage_id, offset in offset_map.items():
self.global_offset_map[passage_id] = (passage_file, offset)
self._total_count += len(offset_map)
def get_passage(self, passage_id: str) -> dict[str, Any]:
if passage_id in self.global_offset_map:
passage_file, offset = self.global_offset_map[passage_id]
# Lazy file opening - only open when needed
with open(passage_file, encoding="utf-8") as f:
f.seek(offset)
return json.loads(f.readline())
# Fast path: check each shard map (there are typically few shards).
# This avoids building a massive combined dict while keeping lookups
# bounded by the number of shards.
for passage_file, offset_map in self.offset_maps.items():
try:
offset = offset_map[passage_id]
with open(passage_file, encoding="utf-8") as f:
f.seek(offset)
return json.loads(f.readline())
except KeyError:
continue
raise KeyError(f"Passage ID not found: {passage_id}")
def __len__(self) -> int:
return self._total_count
class LeannBuilder:
def __init__(
@@ -307,6 +328,23 @@ class LeannBuilder:
def build_index(self, index_path: str):
if not self.chunks:
raise ValueError("No chunks added.")
# Filter out invalid/empty text chunks early to keep passage and embedding counts aligned
valid_chunks: list[dict[str, Any]] = []
skipped = 0
for chunk in self.chunks:
text = chunk.get("text", "")
if isinstance(text, str) and text.strip():
valid_chunks.append(chunk)
else:
skipped += 1
if skipped > 0:
print(
f"Warning: Skipping {skipped} empty/invalid text chunk(s). Processing {len(valid_chunks)} valid chunks"
)
self.chunks = valid_chunks
if not self.chunks:
raise ValueError("All provided chunks are empty or invalid. Nothing to index.")
if self.dimensions is None:
self.dimensions = len(
compute_embeddings(
@@ -567,7 +605,9 @@ class LeannSearcher:
logger.info(f" Additional kwargs: {kwargs}")
# Smart top_k detection and adjustment
total_docs = len(self.passage_manager.global_offset_map)
# Use PassageManager length (sum of shard sizes) to avoid
# depending on a massive combined map
total_docs = len(self.passage_manager)
original_top_k = top_k
if top_k > total_docs:
top_k = total_docs
@@ -597,7 +637,7 @@ class LeannSearcher:
zmq_port=zmq_port,
)
# logger.info(f" Generated embedding shape: {query_embedding.shape}")
time.time() - start_time
# time.time() - start_time
# logger.info(f" Embedding time: {embedding_time} seconds")
start_time = time.time()
@@ -663,8 +703,9 @@ class LeannSearcher:
This method should be called after you're done using the searcher,
especially in test environments or batch processing scenarios.
"""
if hasattr(self.backend_impl, "embedding_server_manager"):
self.backend_impl.embedding_server_manager.stop_server()
backend = getattr(self.backend_impl, "embedding_server_manager", None)
if backend is not None:
backend.stop_server()
# Enable automatic cleanup patterns
def __enter__(self):

View File

@@ -707,20 +707,28 @@ class GeminiChat(LLMInterface):
logger.info(f"Sending request to Gemini with model {self.model}")
try:
# Set generation configuration
generation_config = {
"temperature": kwargs.get("temperature", 0.7),
"max_output_tokens": kwargs.get("max_tokens", 1000),
}
from google.genai.types import GenerateContentConfig
generation_config = GenerateContentConfig(
temperature=kwargs.get("temperature", 0.7),
max_output_tokens=kwargs.get("max_tokens", 1000),
)
# Handle top_p parameter
if "top_p" in kwargs:
generation_config["top_p"] = kwargs["top_p"]
generation_config.top_p = kwargs["top_p"]
response = self.client.models.generate_content(
model=self.model, contents=prompt, config=generation_config
model=self.model,
contents=prompt,
config=generation_config,
)
return response.text.strip()
# Handle potential None response text
response_text = response.text
if response_text is None:
logger.warning("Gemini returned None response text")
return ""
return response_text.strip()
except Exception as e:
logger.error(f"Error communicating with Gemini: {e}")
return f"Error: Could not get a response from Gemini. Details: {e}"

View File

@@ -1,13 +1,15 @@
import argparse
import asyncio
import sys
from pathlib import Path
from typing import Union
from typing import Any, Optional, Union
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from tqdm import tqdm
from .api import LeannBuilder, LeannChat, LeannSearcher
from .registry import register_project_directory
def extract_pdf_text_with_pymupdf(file_path: str) -> str:
@@ -84,6 +86,7 @@ Examples:
leann search my-docs "query" # Search in my-docs index
leann ask my-docs "question" # Ask my-docs index
leann list # List all stored indexes
leann remove my-docs # Remove an index (local first, then global)
""",
)
@@ -148,6 +151,12 @@ Examples:
type=str,
help="Comma-separated list of file extensions to include (e.g., '.txt,.pdf,.pptx'). If not specified, uses default supported types.",
)
build_parser.add_argument(
"--include-hidden",
action=argparse.BooleanOptionalAction,
default=False,
help="Include hidden files and directories (paths starting with '.') during indexing (default: false)",
)
build_parser.add_argument(
"--doc-chunk-size",
type=int,
@@ -172,6 +181,29 @@ Examples:
default=50,
help="Code chunk overlap (default: 50)",
)
build_parser.add_argument(
"--use-ast-chunking",
action="store_true",
help="Enable AST-aware chunking for code files (requires astchunk)",
)
build_parser.add_argument(
"--ast-chunk-size",
type=int,
default=768,
help="AST chunk size in characters (default: 768)",
)
build_parser.add_argument(
"--ast-chunk-overlap",
type=int,
default=96,
help="AST chunk overlap in characters (default: 96)",
)
build_parser.add_argument(
"--ast-fallback-traditional",
action="store_true",
default=True,
help="Fall back to traditional chunking if AST chunking fails (default: True)",
)
# Search command
search_parser = subparsers.add_parser("search", help="Search documents")
@@ -198,6 +230,11 @@ Examples:
default="global",
help="Pruning strategy (default: global)",
)
search_parser.add_argument(
"--non-interactive",
action="store_true",
help="Non-interactive mode: automatically select index without prompting",
)
# Ask command
ask_parser = subparsers.add_parser("ask", help="Ask questions")
@@ -245,35 +282,18 @@ Examples:
# List command
subparsers.add_parser("list", help="List all indexes")
# Remove command
remove_parser = subparsers.add_parser("remove", help="Remove an index")
remove_parser.add_argument("index_name", help="Index name to remove")
remove_parser.add_argument(
"--force", "-f", action="store_true", help="Force removal without confirmation"
)
return parser
def register_project_dir(self):
"""Register current project directory in global registry"""
global_registry = Path.home() / ".leann" / "projects.json"
global_registry.parent.mkdir(exist_ok=True)
current_dir = str(Path.cwd())
# Load existing registry
projects = []
if global_registry.exists():
try:
import json
with open(global_registry) as f:
projects = json.load(f)
except Exception:
projects = []
# Add current directory if not already present
if current_dir not in projects:
projects.append(current_dir)
# Save registry
import json
with open(global_registry, "w") as f:
json.dump(projects, f, indent=2)
register_project_directory()
def _build_gitignore_parser(self, docs_dir: str):
"""Build gitignore parser using gitignore-parser library."""
@@ -333,8 +353,6 @@ Examples:
return False
def list_indexes(self):
print("Stored LEANN indexes:")
# Get all project directories with .leann
global_registry = Path.home() / ".leann" / "projects.json"
all_projects = []
@@ -360,58 +378,486 @@ Examples:
if (current_path / ".leann" / "indexes").exists() and current_path not in valid_projects:
valid_projects.append(current_path)
if not valid_projects:
print(
"No indexes found. Use 'leann build <name> --docs <dir> [<dir2> ...]' to create one."
)
return
total_indexes = 0
current_dir = Path.cwd()
# Separate current and other projects
other_projects = []
for project_path in valid_projects:
indexes_dir = project_path / ".leann" / "indexes"
if not indexes_dir.exists():
continue
if project_path != current_path:
other_projects.append(project_path)
index_dirs = [d for d in indexes_dir.iterdir() if d.is_dir()]
if not index_dirs:
continue
print("📚 LEANN Indexes")
print("=" * 50)
# Show project header
if project_path == current_dir:
print(f"\n📁 Current project ({project_path}):")
else:
print(f"\n📂 {project_path}:")
total_indexes = 0
current_indexes_count = 0
for index_dir in index_dirs:
# Show current project first (most important)
print("\n🏠 Current Project")
print(f" {current_path}")
print(" " + "" * 45)
current_indexes = self._discover_indexes_in_project(current_path)
if current_indexes:
for idx in current_indexes:
total_indexes += 1
index_name = index_dir.name
meta_file = index_dir / "documents.leann.meta.json"
status = "" if meta_file.exists() else ""
current_indexes_count += 1
type_icon = "📁" if idx["type"] == "cli" else "📄"
print(f" {current_indexes_count}. {type_icon} {idx['name']} {idx['status']}")
if idx["size_mb"] > 0:
print(f" 📦 Size: {idx['size_mb']:.1f} MB")
else:
print(" 📭 No indexes in current project")
print(f" {total_indexes}. {index_name} [{status}]")
if status == "":
size_mb = sum(f.stat().st_size for f in index_dir.iterdir() if f.is_file()) / (
1024 * 1024
# Show other projects (reference information)
if other_projects:
print("\n\n🗂️ Other Projects")
print(" " + "" * 45)
for project_path in other_projects:
project_indexes = self._discover_indexes_in_project(project_path)
if not project_indexes:
continue
print(f"\n 📂 {project_path.name}")
print(f" {project_path}")
for idx in project_indexes:
total_indexes += 1
type_icon = "📁" if idx["type"] == "cli" else "📄"
print(f"{type_icon} {idx['name']} {idx['status']}")
if idx["size_mb"] > 0:
print(f" 📦 {idx['size_mb']:.1f} MB")
# Summary and usage info
print("\n" + "=" * 50)
if total_indexes == 0:
print("💡 Get started:")
print(" leann build my-docs --docs ./documents")
else:
# Count only projects that have at least one discoverable index
projects_count = sum(
1 for p in valid_projects if len(self._discover_indexes_in_project(p)) > 0
)
print(f"📊 Total: {total_indexes} indexes across {projects_count} projects")
if current_indexes_count > 0:
print("\n💫 Quick start (current project):")
# Get first index from current project for example
current_indexes_dir = current_path / ".leann" / "indexes"
if current_indexes_dir.exists():
current_index_dirs = [d for d in current_indexes_dir.iterdir() if d.is_dir()]
if current_index_dirs:
example_name = current_index_dirs[0].name
print(f' leann search {example_name} "your query"')
print(f" leann ask {example_name} --interactive")
else:
print("\n💡 Create your first index:")
print(" leann build my-docs --docs ./documents")
def _discover_indexes_in_project(self, project_path: Path):
"""Discover all indexes in a project directory (both CLI and apps formats)"""
indexes = []
# 1. CLI format: .leann/indexes/index_name/
cli_indexes_dir = project_path / ".leann" / "indexes"
if cli_indexes_dir.exists():
for index_dir in cli_indexes_dir.iterdir():
if index_dir.is_dir():
meta_file = index_dir / "documents.leann.meta.json"
status = "" if meta_file.exists() else ""
size_mb = 0
if meta_file.exists():
try:
size_mb = sum(
f.stat().st_size for f in index_dir.iterdir() if f.is_file()
) / (1024 * 1024)
except (OSError, PermissionError):
pass
indexes.append(
{
"name": index_dir.name,
"type": "cli",
"status": status,
"size_mb": size_mb,
"path": index_dir,
}
)
print(f" Size: {size_mb:.1f} MB")
if total_indexes > 0:
print(f"\nTotal: {total_indexes} indexes across {len(valid_projects)} projects")
print("\nUsage (current project only):")
# 2. Apps format: *.leann.meta.json files anywhere in the project
cli_indexes_dir = project_path / ".leann" / "indexes"
for meta_file in project_path.rglob("*.leann.meta.json"):
if meta_file.is_file():
# Skip CLI-built indexes (which store meta under .leann/indexes/<name>/)
try:
if cli_indexes_dir.exists() and cli_indexes_dir in meta_file.parents:
continue
except Exception:
pass
# Use the parent directory name as the app index display name
display_name = meta_file.parent.name
# Extract file base used to store files
file_base = meta_file.name.replace(".leann.meta.json", "")
# Show example from current project
current_indexes_dir = current_dir / ".leann" / "indexes"
if current_indexes_dir.exists():
current_index_dirs = [d for d in current_indexes_dir.iterdir() if d.is_dir()]
if current_index_dirs:
example_name = current_index_dirs[0].name
print(f' leann search {example_name} "your query"')
print(f" leann ask {example_name} --interactive")
# Apps indexes are considered complete if the .leann.meta.json file exists
status = ""
# Calculate total size of all related files (use file base)
size_mb = 0
try:
index_dir = meta_file.parent
for related_file in index_dir.glob(f"{file_base}.leann*"):
size_mb += related_file.stat().st_size / (1024 * 1024)
except (OSError, PermissionError):
pass
indexes.append(
{
"name": display_name,
"type": "app",
"status": status,
"size_mb": size_mb,
"path": meta_file,
}
)
return indexes
def remove_index(self, index_name: str, force: bool = False):
"""Safely remove an index - always show all matches for transparency"""
# Always do a comprehensive search for safety
print(f"🔍 Searching for all indexes named '{index_name}'...")
all_matches = self._find_all_matching_indexes(index_name)
if not all_matches:
print(f"❌ Index '{index_name}' not found in any project.")
return False
if len(all_matches) == 1:
return self._remove_single_match(all_matches[0], index_name, force)
else:
return self._remove_from_multiple_matches(all_matches, index_name, force)
def _find_all_matching_indexes(self, index_name: str):
"""Find all indexes with the given name across all projects"""
matches = []
# Get all registered projects
global_registry = Path.home() / ".leann" / "projects.json"
all_projects = []
if global_registry.exists():
try:
import json
with open(global_registry) as f:
all_projects = json.load(f)
except Exception:
pass
# Always include current project
current_path = Path.cwd()
if str(current_path) not in all_projects:
all_projects.append(str(current_path))
# Search across all projects
for project_dir in all_projects:
project_path = Path(project_dir)
if not project_path.exists():
continue
# 1) CLI-format index under .leann/indexes/<name>
index_dir = project_path / ".leann" / "indexes" / index_name
if index_dir.exists():
is_current = project_path == current_path
matches.append(
{
"project_path": project_path,
"index_dir": index_dir,
"is_current": is_current,
"kind": "cli",
}
)
# 2) App-format indexes
# We support two ways of addressing apps:
# a) by the file base (e.g., `pdf_documents`)
# b) by the parent directory name (e.g., `new_txt`)
seen_app_meta = set()
# 2a) by file base
for meta_file in project_path.rglob(f"{index_name}.leann.meta.json"):
if meta_file.is_file():
# Skip CLI-built indexes' meta under .leann/indexes
try:
cli_indexes_dir = project_path / ".leann" / "indexes"
if cli_indexes_dir.exists() and cli_indexes_dir in meta_file.parents:
continue
except Exception:
pass
is_current = project_path == current_path
key = (str(project_path), str(meta_file))
if key in seen_app_meta:
continue
seen_app_meta.add(key)
matches.append(
{
"project_path": project_path,
"files_dir": meta_file.parent,
"meta_file": meta_file,
"is_current": is_current,
"kind": "app",
"display_name": meta_file.parent.name,
"file_base": meta_file.name.replace(".leann.meta.json", ""),
}
)
# 2b) by parent directory name
for meta_file in project_path.rglob("*.leann.meta.json"):
if meta_file.is_file() and meta_file.parent.name == index_name:
# Skip CLI-built indexes' meta under .leann/indexes
try:
cli_indexes_dir = project_path / ".leann" / "indexes"
if cli_indexes_dir.exists() and cli_indexes_dir in meta_file.parents:
continue
except Exception:
pass
is_current = project_path == current_path
key = (str(project_path), str(meta_file))
if key in seen_app_meta:
continue
seen_app_meta.add(key)
matches.append(
{
"project_path": project_path,
"files_dir": meta_file.parent,
"meta_file": meta_file,
"is_current": is_current,
"kind": "app",
"display_name": meta_file.parent.name,
"file_base": meta_file.name.replace(".leann.meta.json", ""),
}
)
# Sort: current project first, then by project name
matches.sort(key=lambda x: (not x["is_current"], x["project_path"].name))
return matches
def _remove_single_match(self, match, index_name: str, force: bool):
"""Handle removal when only one match is found"""
project_path = match["project_path"]
is_current = match["is_current"]
kind = match.get("kind", "cli")
if is_current:
location_info = "current project"
emoji = "🏠"
else:
location_info = f"other project '{project_path.name}'"
emoji = "📂"
print(f"✅ Found 1 index named '{index_name}':")
print(f" {emoji} Location: {location_info}")
if kind == "cli":
print(f" 📍 Path: {project_path / '.leann' / 'indexes' / index_name}")
else:
print(f" 📍 Meta: {match['meta_file']}")
if not force:
if not is_current:
print("\n⚠️ CROSS-PROJECT REMOVAL!")
print(" This will delete the index from another project.")
response = input(f" ❓ Confirm removal from {location_info}? (y/N): ").strip().lower()
if response not in ["y", "yes"]:
print(" ❌ Removal cancelled.")
return False
if kind == "cli":
return self._delete_index_directory(
match["index_dir"],
index_name,
project_path if not is_current else None,
is_app=False,
)
else:
return self._delete_index_directory(
match["files_dir"],
match.get("display_name", index_name),
project_path if not is_current else None,
is_app=True,
meta_file=match.get("meta_file"),
app_file_base=match.get("file_base"),
)
def _remove_from_multiple_matches(self, matches, index_name: str, force: bool):
"""Handle removal when multiple matches are found"""
print(f"⚠️ Found {len(matches)} indexes named '{index_name}':")
print(" " + "" * 50)
for i, match in enumerate(matches, 1):
project_path = match["project_path"]
is_current = match["is_current"]
kind = match.get("kind", "cli")
if is_current:
print(f" {i}. 🏠 Current project ({'CLI' if kind == 'cli' else 'APP'})")
else:
print(f" {i}. 📂 {project_path.name} ({'CLI' if kind == 'cli' else 'APP'})")
# Show path details
if kind == "cli":
print(f" 📍 {project_path / '.leann' / 'indexes' / index_name}")
else:
print(f" 📍 {match['meta_file']}")
# Show size info
try:
if kind == "cli":
size_mb = sum(
f.stat().st_size for f in match["index_dir"].iterdir() if f.is_file()
) / (1024 * 1024)
else:
file_base = match.get("file_base")
size_mb = 0.0
if file_base:
size_mb = sum(
f.stat().st_size
for f in match["files_dir"].glob(f"{file_base}.leann*")
if f.is_file()
) / (1024 * 1024)
print(f" 📦 Size: {size_mb:.1f} MB")
except (OSError, PermissionError):
pass
print(" " + "" * 50)
if force:
print(" ❌ Multiple matches found, but --force specified.")
print(" Please run without --force to choose which one to remove.")
return False
try:
choice = input(
f" ❓ Which one to remove? (1-{len(matches)}, or 'c' to cancel): "
).strip()
if choice.lower() == "c":
print(" ❌ Removal cancelled.")
return False
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(matches):
selected_match = matches[choice_idx]
project_path = selected_match["project_path"]
is_current = selected_match["is_current"]
kind = selected_match.get("kind", "cli")
location = "current project" if is_current else f"'{project_path.name}' project"
print(f" 🎯 Selected: Remove from {location}")
# Final confirmation for safety
confirm = input(
f" ❓ FINAL CONFIRMATION - Type '{index_name}' to proceed: "
).strip()
if confirm != index_name:
print(" ❌ Confirmation failed. Removal cancelled.")
return False
if kind == "cli":
return self._delete_index_directory(
selected_match["index_dir"],
index_name,
project_path if not is_current else None,
is_app=False,
)
else:
return self._delete_index_directory(
selected_match["files_dir"],
selected_match.get("display_name", index_name),
project_path if not is_current else None,
is_app=True,
meta_file=selected_match.get("meta_file"),
app_file_base=selected_match.get("file_base"),
)
else:
print(" ❌ Invalid choice. Removal cancelled.")
return False
except (ValueError, KeyboardInterrupt):
print("\n ❌ Invalid input. Removal cancelled.")
return False
def _delete_index_directory(
self,
index_dir: Path,
index_display_name: str,
project_path: Optional[Path] = None,
is_app: bool = False,
meta_file: Optional[Path] = None,
app_file_base: Optional[str] = None,
):
"""Delete a CLI index directory or APP index files safely."""
try:
if is_app:
removed = 0
errors = 0
# Delete only files that belong to this app index (based on file base)
pattern_base = app_file_base or ""
for f in index_dir.glob(f"{pattern_base}.leann*"):
try:
f.unlink()
removed += 1
except Exception:
errors += 1
# Best-effort: also remove the meta file if specified and still exists
if meta_file and meta_file.exists():
try:
meta_file.unlink()
removed += 1
except Exception:
errors += 1
if removed > 0 and errors == 0:
if project_path:
print(
f"✅ App index '{index_display_name}' removed from {project_path.name}"
)
else:
print(f"✅ App index '{index_display_name}' removed successfully")
return True
elif removed > 0 and errors > 0:
print(
f"⚠️ App index '{index_display_name}' partially removed (some files couldn't be deleted)"
)
return True
else:
print(
f"❌ No files found to remove for app index '{index_display_name}' in {index_dir}"
)
return False
else:
import shutil
shutil.rmtree(index_dir)
if project_path:
print(f"✅ Index '{index_display_name}' removed from {project_path.name}")
else:
print(f"✅ Index '{index_display_name}' removed successfully")
return True
except Exception as e:
print(f"❌ Error removing index '{index_display_name}': {e}")
return False
def load_documents(
self, docs_paths: Union[str, list], custom_file_types: Union[str, None] = None
self,
docs_paths: Union[str, list],
custom_file_types: Union[str, None] = None,
include_hidden: bool = False,
args: Optional[dict[str, Any]] = None,
):
# Handle both single path (string) and multiple paths (list) for backward compatibility
if isinstance(docs_paths, str):
@@ -455,6 +901,10 @@ Examples:
all_documents = []
# Helper to detect hidden path components
def _path_has_hidden_segment(p: Path) -> bool:
return any(part.startswith(".") and part not in [".", ".."] for part in p.parts)
# First, process individual files if any
if files:
print(f"\n🔄 Processing {len(files)} individual file{'s' if len(files) > 1 else ''}...")
@@ -467,8 +917,12 @@ Examples:
files_by_dir = defaultdict(list)
for file_path in files:
parent_dir = str(Path(file_path).parent)
files_by_dir[parent_dir].append(file_path)
file_path_obj = Path(file_path)
if not include_hidden and _path_has_hidden_segment(file_path_obj):
print(f" ⚠️ Skipping hidden file: {file_path}")
continue
parent_dir = str(file_path_obj.parent)
files_by_dir[parent_dir].append(str(file_path_obj))
# Load files from each parent directory
for parent_dir, file_list in files_by_dir.items():
@@ -479,6 +933,7 @@ Examples:
file_docs = SimpleDirectoryReader(
parent_dir,
input_files=file_list,
# exclude_hidden only affects directory scans; input_files are explicit
filename_as_id=True,
).load_data()
all_documents.extend(file_docs)
@@ -577,6 +1032,8 @@ Examples:
# Check if file matches any exclude pattern
try:
relative_path = file_path.relative_to(docs_path)
if not include_hidden and _path_has_hidden_segment(relative_path):
continue
if self._should_exclude_file(relative_path, gitignore_matches):
continue
except ValueError:
@@ -604,6 +1061,7 @@ Examples:
try:
default_docs = SimpleDirectoryReader(
str(file_path.parent),
exclude_hidden=not include_hidden,
filename_as_id=True,
required_exts=[file_path.suffix],
).load_data()
@@ -632,6 +1090,7 @@ Examples:
encoding="utf-8",
required_exts=code_extensions,
file_extractor={}, # Use default extractors
exclude_hidden=not include_hidden,
filename_as_id=True,
).load_data(show_progress=True)
@@ -704,18 +1163,50 @@ Examples:
}
print("start chunking documents")
# Add progress bar for document chunking
for doc in tqdm(documents, desc="Chunking documents", unit="doc"):
# Check if this is a code file based on source path
source_path = doc.metadata.get("source", "")
is_code_file = any(source_path.endswith(ext) for ext in code_file_exts)
# Use appropriate parser based on file type
parser = self.code_parser if is_code_file else self.node_parser
nodes = parser.get_nodes_from_documents([doc])
# Check if AST chunking is requested
use_ast = getattr(args, "use_ast_chunking", False)
for node in nodes:
all_texts.append(node.get_content())
if use_ast:
print("🧠 Using AST-aware chunking for code files")
try:
# Import enhanced chunking utilities
# Add apps directory to path to import chunking utilities
apps_dir = Path(__file__).parent.parent.parent.parent.parent / "apps"
if apps_dir.exists():
sys.path.insert(0, str(apps_dir))
from chunking import create_text_chunks
# Use enhanced chunking with AST support
all_texts = create_text_chunks(
documents,
chunk_size=self.node_parser.chunk_size,
chunk_overlap=self.node_parser.chunk_overlap,
use_ast_chunking=True,
ast_chunk_size=getattr(args, "ast_chunk_size", 768),
ast_chunk_overlap=getattr(args, "ast_chunk_overlap", 96),
code_file_extensions=None, # Use defaults
ast_fallback_traditional=getattr(args, "ast_fallback_traditional", True),
)
except ImportError as e:
print(f"⚠️ AST chunking not available ({e}), falling back to traditional chunking")
use_ast = False
if not use_ast:
# Use traditional chunking logic
for doc in tqdm(documents, desc="Chunking documents", unit="doc"):
# Check if this is a code file based on source path
source_path = doc.metadata.get("source", "")
is_code_file = any(source_path.endswith(ext) for ext in code_file_exts)
# Use appropriate parser based on file type
parser = self.code_parser if is_code_file else self.node_parser
nodes = parser.get_nodes_from_documents([doc])
for node in nodes:
all_texts.append(node.get_content())
print(f"Loaded {len(documents)} documents, {len(all_texts)} chunks")
return all_texts
@@ -781,7 +1272,9 @@ Examples:
paragraph_separator="\n\n",
)
all_texts = self.load_documents(docs_paths, args.file_types)
all_texts = self.load_documents(
docs_paths, args.file_types, include_hidden=args.include_hidden, args=args
)
if not all_texts:
print("No documents found")
return
@@ -813,13 +1306,101 @@ Examples:
async def search_documents(self, args):
index_name = args.index_name
query = args.query
index_path = self.get_index_path(index_name)
if not self.index_exists(index_name):
print(
f"Index '{index_name}' not found. Use 'leann build {index_name} --docs <dir> [<dir2> ...]' to create it."
)
return
# First try to find the index in current project
index_path = self.get_index_path(index_name)
if self.index_exists(index_name):
# Found in current project, use it
pass
else:
# Search across all registered projects (like list_indexes does)
all_matches = self._find_all_matching_indexes(index_name)
if not all_matches:
print(
f"Index '{index_name}' not found. Use 'leann build {index_name} --docs <dir> [<dir2> ...]' to create it."
)
return
elif len(all_matches) == 1:
# Found exactly one match, use it
match = all_matches[0]
if match["kind"] == "cli":
index_path = str(match["index_dir"] / "documents.leann")
else:
# App format: use the meta file to construct the path
meta_file = match["meta_file"]
file_base = match["file_base"]
index_path = str(meta_file.parent / f"{file_base}.leann")
project_info = (
"current project"
if match["is_current"]
else f"project '{match['project_path'].name}'"
)
print(f"Using index '{index_name}' from {project_info}")
else:
# Multiple matches found
if args.non_interactive:
# Non-interactive mode: automatically select the best match
# Priority: current project first, then first available
current_matches = [m for m in all_matches if m["is_current"]]
if current_matches:
match = current_matches[0]
location_desc = "current project"
else:
match = all_matches[0]
location_desc = f"project '{match['project_path'].name}'"
if match["kind"] == "cli":
index_path = str(match["index_dir"] / "documents.leann")
else:
meta_file = match["meta_file"]
file_base = match["file_base"]
index_path = str(meta_file.parent / f"{file_base}.leann")
print(
f"Found {len(all_matches)} indexes named '{index_name}', using index from {location_desc}"
)
else:
# Interactive mode: ask user to choose
print(f"Found {len(all_matches)} indexes named '{index_name}':")
for i, match in enumerate(all_matches, 1):
project_path = match["project_path"]
is_current = match["is_current"]
kind = match.get("kind", "cli")
if is_current:
print(
f" {i}. 🏠 Current project ({'CLI' if kind == 'cli' else 'APP'})"
)
else:
print(
f" {i}. 📂 {project_path.name} ({'CLI' if kind == 'cli' else 'APP'})"
)
try:
choice = input(f"Which index to search? (1-{len(all_matches)}): ").strip()
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(all_matches):
match = all_matches[choice_idx]
if match["kind"] == "cli":
index_path = str(match["index_dir"] / "documents.leann")
else:
meta_file = match["meta_file"]
file_base = match["file_base"]
index_path = str(meta_file.parent / f"{file_base}.leann")
project_info = (
"current project"
if match["is_current"]
else f"project '{match['project_path'].name}'"
)
print(f"Using index '{index_name}' from {project_info}")
else:
print("Invalid choice. Aborting search.")
return
except (ValueError, KeyboardInterrupt):
print("Invalid input. Aborting search.")
return
searcher = LeannSearcher(index_path=index_path)
results = searcher.search(
@@ -918,6 +1499,8 @@ Examples:
if args.command == "list":
self.list_indexes()
elif args.command == "remove":
self.remove_index(args.index_name, args.force)
elif args.command == "build":
await self.build_index(args)
elif args.command == "search":
@@ -929,10 +1512,15 @@ Examples:
def main():
import logging
import dotenv
dotenv.load_dotenv()
# Set clean logging for CLI usage
logging.getLogger().setLevel(logging.WARNING) # Only show warnings and errors
cli = LeannCLI()
asyncio.run(cli.run())

View File

@@ -246,6 +246,16 @@ def compute_embeddings_openai(texts: list[str], model_name: str) -> np.ndarray:
except ImportError as e:
raise ImportError(f"OpenAI package not installed: {e}")
# Validate input list
if not texts:
raise ValueError("Cannot compute embeddings for empty text list")
# Extra validation: abort early if any item is empty/whitespace
invalid_count = sum(1 for t in texts if not isinstance(t, str) or not t.strip())
if invalid_count > 0:
raise ValueError(
f"Found {invalid_count} empty/invalid text(s) in input. Upstream should filter before calling OpenAI."
)
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY environment variable not set")

View File

@@ -192,6 +192,7 @@ class EmbeddingServerManager:
stderr_target = None # Direct to console for visible logs
# Start embedding server subprocess
logger.info(f"Starting server process with command: {' '.join(command)}")
self.server_process = subprocess.Popen(
command,
cwd=project_root,

View File

@@ -94,7 +94,7 @@ def handle_request(request):
},
}
# Build simplified command
# Build simplified command with non-interactive flag for MCP compatibility
cmd = [
"leann",
"search",
@@ -102,6 +102,7 @@ def handle_request(request):
args["query"],
f"--top-k={args.get('top_k', 5)}",
f"--complexity={args.get('complexity', 32)}",
"--non-interactive",
]
result = subprocess.run(cmd, capture_output=True, text=True)

View File

@@ -2,11 +2,17 @@
import importlib
import importlib.metadata
from typing import TYPE_CHECKING
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
if TYPE_CHECKING:
from leann.interface import LeannBackendFactoryInterface
# Set up logger for this module
logger = logging.getLogger(__name__)
BACKEND_REGISTRY: dict[str, "LeannBackendFactoryInterface"] = {}
@@ -14,7 +20,7 @@ def register_backend(name: str):
"""A decorator to register a new backend class."""
def decorator(cls):
print(f"INFO: Registering backend '{name}'")
logger.debug(f"Registering backend '{name}'")
BACKEND_REGISTRY[name] = cls
return cls
@@ -39,3 +45,54 @@ def autodiscover_backends():
# print(f"WARN: Could not import backend module '{backend_module_name}': {e}")
pass
# print("INFO: Backend auto-discovery finished.")
def register_project_directory(project_dir: Optional[Union[str, Path]] = None):
"""
Register a project directory in the global LEANN registry.
This allows `leann list` to discover indexes created by apps or other tools.
Args:
project_dir: Directory to register. If None, uses current working directory.
"""
if project_dir is None:
project_dir = Path.cwd()
else:
project_dir = Path(project_dir)
# Only register directories that have some kind of LEANN content
# Either .leann/indexes/ (CLI format) or *.leann.meta.json files (apps format)
has_cli_indexes = (project_dir / ".leann" / "indexes").exists()
has_app_indexes = any(project_dir.rglob("*.leann.meta.json"))
if not (has_cli_indexes or has_app_indexes):
# Don't register if there are no LEANN indexes
return
global_registry = Path.home() / ".leann" / "projects.json"
global_registry.parent.mkdir(exist_ok=True)
project_str = str(project_dir.resolve())
# Load existing registry
projects = []
if global_registry.exists():
try:
with open(global_registry) as f:
projects = json.load(f)
except Exception:
logger.debug("Could not load existing project registry")
projects = []
# Add project if not already present
if project_str not in projects:
projects.append(project_str)
# Save updated registry
try:
with open(global_registry, "w") as f:
json.dump(projects, f, indent=2)
logger.debug(f"Registered project directory: {project_str}")
except Exception as e:
logger.warning(f"Could not save project registry: {e}")

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann"
version = "0.2.9"
version = "0.3.2"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -14,8 +14,6 @@ dependencies = [
"numpy>=1.26.0",
"torch",
"tqdm",
"flask",
"flask_compress",
"datasets>=2.15.0",
"evaluate",
"colorama",
@@ -48,6 +46,13 @@ dependencies = [
"pathspec>=0.12.1",
"nbconvert>=7.16.6",
"gitignore-parser>=0.1.12",
# AST-aware code chunking dependencies
"astchunk>=0.1.0",
"tree-sitter>=0.20.0",
"tree-sitter-python>=0.20.0",
"tree-sitter-java>=0.20.0",
"tree-sitter-c-sharp>=0.20.0",
"tree-sitter-typescript>=0.20.0",
]
[project.optional-dependencies]
@@ -66,9 +71,7 @@ test = [
"pytest>=7.0",
"pytest-timeout>=2.0",
"llama-index-core>=0.12.0",
"llama-index-readers-file>=0.4.0",
"python-dotenv>=1.0.0",
"sentence-transformers>=2.2.0",
]
diskann = [
@@ -100,13 +103,8 @@ leann-backend-hnsw = { path = "packages/leann-backend-hnsw", editable = true }
[tool.ruff]
target-version = "py39"
line-length = 100
extend-exclude = [
"third_party",
"*.egg-info",
"__pycache__",
".git",
".venv",
]
extend-exclude = ["third_party"]
[tool.ruff.lint]
select = [
@@ -129,21 +127,12 @@ ignore = [
"RUF012", # mutable class attributes should be annotated with typing.ClassVar
]
[tool.ruff.lint.per-file-ignores]
"test/**/*.py" = ["E402"] # module level import not at top of file (common in tests)
"examples/**/*.py" = ["E402"] # module level import not at top of file (common in examples)
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
[dependency-groups]
dev = [
"ruff>=0.12.4",
]
[tool.lychee]
accept = ["200", "403", "429", "503"]
timeout = 20

View File

@@ -0,0 +1,397 @@
"""
Test suite for astchunk integration with LEANN.
Tests AST-aware chunking functionality, language detection, and fallback mechanisms.
"""
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Add apps directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "apps"))
from typing import Optional
from chunking import (
create_ast_chunks,
create_text_chunks,
create_traditional_chunks,
detect_code_files,
get_language_from_extension,
)
class MockDocument:
"""Mock LlamaIndex Document for testing."""
def __init__(self, content: str, file_path: str = "", metadata: Optional[dict] = None):
self.content = content
self.metadata = metadata or {}
if file_path:
self.metadata["file_path"] = file_path
def get_content(self) -> str:
return self.content
class TestCodeFileDetection:
"""Test code file detection and language mapping."""
def test_detect_code_files_python(self):
"""Test detection of Python files."""
docs = [
MockDocument("print('hello')", "/path/to/file.py"),
MockDocument("This is text", "/path/to/file.txt"),
]
code_docs, text_docs = detect_code_files(docs)
assert len(code_docs) == 1
assert len(text_docs) == 1
assert code_docs[0].metadata["language"] == "python"
assert code_docs[0].metadata["is_code"] is True
assert text_docs[0].metadata["is_code"] is False
def test_detect_code_files_multiple_languages(self):
"""Test detection of multiple programming languages."""
docs = [
MockDocument("def func():", "/path/to/script.py"),
MockDocument("public class Test {}", "/path/to/Test.java"),
MockDocument("interface ITest {}", "/path/to/test.ts"),
MockDocument("using System;", "/path/to/Program.cs"),
MockDocument("Regular text content", "/path/to/document.txt"),
]
code_docs, text_docs = detect_code_files(docs)
assert len(code_docs) == 4
assert len(text_docs) == 1
languages = [doc.metadata["language"] for doc in code_docs]
assert "python" in languages
assert "java" in languages
assert "typescript" in languages
assert "csharp" in languages
def test_detect_code_files_no_file_path(self):
"""Test handling of documents without file paths."""
docs = [
MockDocument("some content"),
MockDocument("other content", metadata={"some_key": "value"}),
]
code_docs, text_docs = detect_code_files(docs)
assert len(code_docs) == 0
assert len(text_docs) == 2
for doc in text_docs:
assert doc.metadata["is_code"] is False
def test_get_language_from_extension(self):
"""Test language detection from file extensions."""
assert get_language_from_extension("test.py") == "python"
assert get_language_from_extension("Test.java") == "java"
assert get_language_from_extension("component.tsx") == "typescript"
assert get_language_from_extension("Program.cs") == "csharp"
assert get_language_from_extension("document.txt") is None
assert get_language_from_extension("") is None
class TestChunkingFunctions:
"""Test various chunking functionality."""
def test_create_traditional_chunks(self):
"""Test traditional text chunking."""
docs = [
MockDocument(
"This is a test document. It has multiple sentences. We want to test chunking."
)
]
chunks = create_traditional_chunks(docs, chunk_size=50, chunk_overlap=10)
assert len(chunks) > 0
assert all(isinstance(chunk, str) for chunk in chunks)
assert all(len(chunk.strip()) > 0 for chunk in chunks)
def test_create_traditional_chunks_empty_docs(self):
"""Test traditional chunking with empty documents."""
chunks = create_traditional_chunks([], chunk_size=50, chunk_overlap=10)
assert chunks == []
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip astchunk tests in CI - dependency may not be available",
)
def test_create_ast_chunks_with_astchunk_available(self):
"""Test AST chunking when astchunk is available."""
python_code = '''
def hello_world():
"""Print hello world message."""
print("Hello, World!")
def add_numbers(a, b):
"""Add two numbers and return the result."""
return a + b
class Calculator:
"""A simple calculator class."""
def __init__(self):
self.history = []
def add(self, a, b):
result = a + b
self.history.append(f"{a} + {b} = {result}")
return result
'''
docs = [MockDocument(python_code, "/test/calculator.py", {"language": "python"})]
try:
chunks = create_ast_chunks(docs, max_chunk_size=200, chunk_overlap=50)
# Should have multiple chunks due to different functions/classes
assert len(chunks) > 0
assert all(isinstance(chunk, str) for chunk in chunks)
assert all(len(chunk.strip()) > 0 for chunk in chunks)
# Check that code structure is somewhat preserved
combined_content = " ".join(chunks)
assert "def hello_world" in combined_content
assert "class Calculator" in combined_content
except ImportError:
# astchunk not available, should fall back to traditional chunking
chunks = create_ast_chunks(docs, max_chunk_size=200, chunk_overlap=50)
assert len(chunks) > 0 # Should still get chunks from fallback
def test_create_ast_chunks_fallback_to_traditional(self):
"""Test AST chunking falls back to traditional when astchunk is not available."""
docs = [MockDocument("def test(): pass", "/test/script.py", {"language": "python"})]
# Mock astchunk import to fail
with patch("chunking.create_ast_chunks"):
# First call (actual test) should import astchunk and potentially fail
# Let's call the actual function to test the import error handling
chunks = create_ast_chunks(docs)
# Should return some chunks (either from astchunk or fallback)
assert isinstance(chunks, list)
def test_create_text_chunks_traditional_mode(self):
"""Test text chunking in traditional mode."""
docs = [
MockDocument("def test(): pass", "/test/script.py"),
MockDocument("This is regular text.", "/test/doc.txt"),
]
chunks = create_text_chunks(docs, use_ast_chunking=False, chunk_size=50, chunk_overlap=10)
assert len(chunks) > 0
assert all(isinstance(chunk, str) for chunk in chunks)
def test_create_text_chunks_ast_mode(self):
"""Test text chunking in AST mode."""
docs = [
MockDocument("def test(): pass", "/test/script.py"),
MockDocument("This is regular text.", "/test/doc.txt"),
]
chunks = create_text_chunks(
docs,
use_ast_chunking=True,
ast_chunk_size=100,
ast_chunk_overlap=20,
chunk_size=50,
chunk_overlap=10,
)
assert len(chunks) > 0
assert all(isinstance(chunk, str) for chunk in chunks)
def test_create_text_chunks_custom_extensions(self):
"""Test text chunking with custom code file extensions."""
docs = [
MockDocument("function test() {}", "/test/script.js"), # Not in default extensions
MockDocument("Regular text", "/test/doc.txt"),
]
# First without custom extensions - should treat .js as text
chunks_without = create_text_chunks(docs, use_ast_chunking=True, code_file_extensions=None)
# Then with custom extensions - should treat .js as code
chunks_with = create_text_chunks(
docs, use_ast_chunking=True, code_file_extensions=[".js", ".jsx"]
)
# Both should return chunks
assert len(chunks_without) > 0
assert len(chunks_with) > 0
class TestIntegrationWithDocumentRAG:
"""Integration tests with the document RAG system."""
@pytest.fixture
def temp_code_dir(self):
"""Create a temporary directory with sample code files."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create sample Python file
python_file = temp_path / "example.py"
python_file.write_text('''
def fibonacci(n):
"""Calculate fibonacci number."""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
class MathUtils:
@staticmethod
def factorial(n):
if n <= 1:
return 1
return n * MathUtils.factorial(n-1)
''')
# Create sample text file
text_file = temp_path / "readme.txt"
text_file.write_text("This is a sample text file for testing purposes.")
yield temp_path
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip integration tests in CI to avoid dependency issues",
)
def test_document_rag_with_ast_chunking(self, temp_code_dir):
"""Test document RAG with AST chunking enabled."""
with tempfile.TemporaryDirectory() as index_dir:
cmd = [
sys.executable,
"apps/document_rag.py",
"--llm",
"simulated",
"--embedding-model",
"facebook/contriever",
"--embedding-mode",
"sentence-transformers",
"--index-dir",
index_dir,
"--data-dir",
str(temp_code_dir),
"--enable-code-chunking",
"--query",
"How does the fibonacci function work?",
]
env = os.environ.copy()
env["HF_HUB_DISABLE_SYMLINKS"] = "1"
env["TOKENIZERS_PARALLELISM"] = "false"
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300, # 5 minutes
env=env,
)
# Should succeed even if astchunk is not available (fallback)
assert result.returncode == 0, f"Command failed: {result.stderr}"
output = result.stdout + result.stderr
assert "Index saved to" in output or "Using existing index" in output
except subprocess.TimeoutExpired:
pytest.skip("Test timed out - likely due to model download in CI")
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip integration tests in CI to avoid dependency issues",
)
def test_code_rag_application(self, temp_code_dir):
"""Test the specialized code RAG application."""
with tempfile.TemporaryDirectory() as index_dir:
cmd = [
sys.executable,
"apps/code_rag.py",
"--llm",
"simulated",
"--embedding-model",
"facebook/contriever",
"--index-dir",
index_dir,
"--repo-dir",
str(temp_code_dir),
"--query",
"What classes are defined in this code?",
]
env = os.environ.copy()
env["HF_HUB_DISABLE_SYMLINKS"] = "1"
env["TOKENIZERS_PARALLELISM"] = "false"
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300, env=env)
# Should succeed
assert result.returncode == 0, f"Command failed: {result.stderr}"
output = result.stdout + result.stderr
assert "Using AST-aware chunking" in output or "traditional chunking" in output
except subprocess.TimeoutExpired:
pytest.skip("Test timed out - likely due to model download in CI")
class TestErrorHandling:
"""Test error handling and edge cases."""
def test_text_chunking_empty_documents(self):
"""Test text chunking with empty document list."""
chunks = create_text_chunks([])
assert chunks == []
def test_text_chunking_invalid_parameters(self):
"""Test text chunking with invalid parameters."""
docs = [MockDocument("test content")]
# Should handle negative chunk sizes gracefully
chunks = create_text_chunks(
docs, chunk_size=0, chunk_overlap=0, ast_chunk_size=0, ast_chunk_overlap=0
)
# Should still return some result
assert isinstance(chunks, list)
def test_create_ast_chunks_no_language(self):
"""Test AST chunking with documents missing language metadata."""
docs = [MockDocument("def test(): pass", "/test/script.py")] # No language set
chunks = create_ast_chunks(docs)
# Should fall back to traditional chunking
assert isinstance(chunks, list)
assert len(chunks) >= 0 # May be empty if fallback also fails
def test_create_ast_chunks_empty_content(self):
"""Test AST chunking with empty content."""
docs = [MockDocument("", "/test/script.py", {"language": "python"})]
chunks = create_ast_chunks(docs)
# Should handle empty content gracefully
assert isinstance(chunks, list)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -57,6 +57,51 @@ def test_document_rag_simulated(test_data_dir):
assert "This is a simulated answer" in output
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip AST chunking tests in CI to avoid dependency issues",
)
def test_document_rag_with_ast_chunking(test_data_dir):
"""Test document_rag with AST-aware chunking enabled."""
with tempfile.TemporaryDirectory() as temp_dir:
# Use a subdirectory that doesn't exist yet to force index creation
index_dir = Path(temp_dir) / "test_ast_index"
cmd = [
sys.executable,
"apps/document_rag.py",
"--llm",
"simulated",
"--embedding-model",
"facebook/contriever",
"--embedding-mode",
"sentence-transformers",
"--index-dir",
str(index_dir),
"--data-dir",
str(test_data_dir),
"--enable-code-chunking", # Enable AST chunking
"--query",
"What is Pride and Prejudice about?",
]
env = os.environ.copy()
env["HF_HUB_DISABLE_SYMLINKS"] = "1"
env["TOKENIZERS_PARALLELISM"] = "false"
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, env=env)
# Check return code
assert result.returncode == 0, f"Command failed: {result.stderr}"
# Verify output
output = result.stdout + result.stderr
assert "Index saved to" in output or "Using existing index" in output
assert "This is a simulated answer" in output
# Should mention AST chunking if code files are present
# (might not be relevant for the test data, but command should succeed)
@pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="OpenAI API key not available")
@pytest.mark.skipif(
os.environ.get("CI") == "true", reason="Skip OpenAI tests in CI to avoid API costs"

7539
uv.lock generated
View File

File diff suppressed because it is too large Load Diff