Compare commits

..

4 Commits

Author SHA1 Message Date
yichuan-w
9996c29618 format 2025-12-20 01:27:54 +00:00
yichuan-w
12951ad4d5 docs: polish README performance tip section
- Fix typo: 'matrilize' -> 'materialize'
- Improve clarity and formatting of --no-recompute flag explanation
- Add code block for better readability
2025-12-20 01:25:43 +00:00
yichuan-w
a878d2459b Format code style in leann_multi_vector.py for better readability
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-17 09:02:48 +00:00
yichuan-w
6c39a3427f Add custom folder support and improve image loading for multi-vector retrieval
- Enhanced _load_images_from_dir with recursive search support and better error handling
- Added support for WebP format and RGB conversion for all image modes
- Added custom folder CLI arguments (--custom-folder, --recursive, --rebuild-index)
- Improved documentation and removed completed TODO comment

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-17 08:53:41 +00:00
38 changed files with 1272 additions and 1224 deletions

View File

@@ -28,36 +28,15 @@ jobs:
         run: |
           uv run --only-group lint pre-commit run --all-files --show-diff-on-failure

-  type-check:
-    name: Type Check with ty
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-        with:
-          ref: ${{ inputs.ref }}
-          submodules: recursive
-      - name: Install uv and Python
-        uses: astral-sh/setup-uv@v6
-        with:
-          python-version: '3.11'
-      - name: Install ty
-        run: uv tool install ty
-      - name: Run ty type checker
-        run: |
-          # Run ty on core packages, apps, and tests
-          ty check packages/leann-core/src apps tests
-
   build:
-    needs: [lint, type-check]
+    needs: lint
     name: Build ${{ matrix.os }} Python ${{ matrix.python }}
     strategy:
       matrix:
         include:
-          # Note: Python 3.9 dropped - uses PEP 604 union syntax (str | None)
-          # which requires Python 3.10+
+          - os: ubuntu-22.04
+            python: '3.9'
           - os: ubuntu-22.04
            python: '3.10'
           - os: ubuntu-22.04
@@ -67,6 +46,8 @@ jobs:
           - os: ubuntu-22.04
            python: '3.13'
           # ARM64 Linux builds
+          - os: ubuntu-24.04-arm
+            python: '3.9'
           - os: ubuntu-24.04-arm
            python: '3.10'
           - os: ubuntu-24.04-arm
@@ -75,6 +56,8 @@ jobs:
            python: '3.12'
           - os: ubuntu-24.04-arm
            python: '3.13'
+          - os: macos-14
+            python: '3.9'
           - os: macos-14
            python: '3.10'
           - os: macos-14
@@ -83,6 +66,8 @@ jobs:
            python: '3.12'
           - os: macos-14
            python: '3.13'
+          - os: macos-15
+            python: '3.9'
           - os: macos-15
            python: '3.10'
           - os: macos-15
@@ -91,24 +76,16 @@ jobs:
            python: '3.12'
           - os: macos-15
            python: '3.13'
-          # Intel Mac builds (x86_64) - replaces deprecated macos-13
-          # Note: Python 3.13 excluded - PyTorch has no wheels for macOS x86_64 + Python 3.13
-          # (PyTorch <=2.4.1 lacks cp313, PyTorch >=2.5.0 dropped Intel Mac support)
-          - os: macos-15-intel
+          - os: macos-13
+            python: '3.9'
+          - os: macos-13
            python: '3.10'
-          - os: macos-15-intel
+          - os: macos-13
            python: '3.11'
-          - os: macos-15-intel
+          - os: macos-13
            python: '3.12'
-          # macOS 26 (beta) - arm64
-          - os: macos-26
-            python: '3.10'
-          - os: macos-26
-            python: '3.11'
-          - os: macos-26
-            python: '3.12'
-          - os: macos-26
-            python: '3.13'
+          # Note: macos-13 + Python 3.13 excluded due to PyTorch compatibility
+          # (PyTorch 2.5+ supports Python 3.13 but not Intel Mac x86_64)
     runs-on: ${{ matrix.os }}
     steps:
@@ -227,16 +204,13 @@ jobs:
           # Use system clang for better compatibility
           export CC=clang
           export CXX=clang++
-          # Set deployment target based on runner
-          # macos-15-intel runs macOS 15, so target 15.0 (system libraries require it)
-          if [[ "${{ matrix.os }}" == "macos-15-intel" ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-14* ]]; then
+          # Homebrew libraries on each macOS version require matching minimum version
+          if [[ "${{ matrix.os }}" == "macos-13" ]]; then
+            export MACOSX_DEPLOYMENT_TARGET=13.0
+          elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=14.0
-          elif [[ "${{ matrix.os }}" == macos-15* ]]; then
+          elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-26* ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=26.0
           fi
           uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
         else
@@ -250,16 +224,14 @@ jobs:
           # Use system clang for better compatibility
           export CC=clang
           export CXX=clang++
-          # Set deployment target based on runner
-          # macos-15-intel runs macOS 15, so target 15.0 (system libraries require it)
-          if [[ "${{ matrix.os }}" == "macos-15-intel" ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-14* ]]; then
+          # DiskANN requires macOS 13.3+ for sgesdd_ LAPACK function
+          # But Homebrew libraries on each macOS version require matching minimum version
+          if [[ "${{ matrix.os }}" == "macos-13" ]]; then
+            export MACOSX_DEPLOYMENT_TARGET=13.3
+          elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=14.0
-          elif [[ "${{ matrix.os }}" == macos-15* ]]; then
+          elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-26* ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=26.0
           fi
           uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
         else
@@ -297,19 +269,16 @@ jobs:
        if: runner.os == 'macOS'
        run: |
          # Determine deployment target based on runner OS
-         # macos-15-intel runs macOS 15, so target 15.0 (system libraries require it)
-         if [[ "${{ matrix.os }}" == "macos-15-intel" ]]; then
-           HNSW_TARGET="15.0"
-           DISKANN_TARGET="15.0"
-         elif [[ "${{ matrix.os }}" == macos-14* ]]; then
+         # Must match the Homebrew libraries for each macOS version
+         if [[ "${{ matrix.os }}" == "macos-13" ]]; then
+           HNSW_TARGET="13.0"
+           DISKANN_TARGET="13.3"
+         elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
           HNSW_TARGET="14.0"
           DISKANN_TARGET="14.0"
-         elif [[ "${{ matrix.os }}" == macos-15* ]]; then
+         elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
           HNSW_TARGET="15.0"
           DISKANN_TARGET="15.0"
-         elif [[ "${{ matrix.os }}" == macos-26* ]]; then
-           HNSW_TARGET="26.0"
-           DISKANN_TARGET="26.0"
         fi

         # Repair HNSW wheel
@@ -365,15 +334,12 @@ jobs:
         PY_TAG=$($UV_PY -c "import sys; print(f'cp{sys.version_info[0]}{sys.version_info[1]}')")
         if [[ "$RUNNER_OS" == "macOS" ]]; then
-          # macos-15-intel runs macOS 15, so target 15.0 (system libraries require it)
-          if [[ "${{ matrix.os }}" == "macos-15-intel" ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-14* ]]; then
+          if [[ "${{ matrix.os }}" == "macos-13" ]]; then
+            export MACOSX_DEPLOYMENT_TARGET=13.3
+          elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=14.0
-          elif [[ "${{ matrix.os }}" == macos-15* ]]; then
+          elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
             export MACOSX_DEPLOYMENT_TARGET=15.0
-          elif [[ "${{ matrix.os }}" == macos-26* ]]; then
-            export MACOSX_DEPLOYMENT_TARGET=26.0
           fi
         fi
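
Both sides of the macOS hunks above implement the same policy: pick a `MACOSX_DEPLOYMENT_TARGET` that matches the runner's Homebrew libraries, with DiskANN needing 13.3+ for the `sgesdd_` LAPACK symbol. A minimal Python sketch of the new side's mapping, for reference only - the helper name is hypothetical and the workflow itself does this in shell:

```python
def deployment_target(runner: str, for_diskann: bool = False) -> str | None:
    """Pick MACOSX_DEPLOYMENT_TARGET for a GitHub Actions macOS runner."""
    if runner == "macos-13":
        # DiskANN needs 13.3+ for the sgesdd_ LAPACK function; HNSW is fine with 13.0.
        return "13.3" if for_diskann else "13.0"
    if runner == "macos-14":
        return "14.0"
    if runner == "macos-15":
        return "15.0"
    return None  # non-macOS runners: leave the variable unset


assert deployment_target("macos-13", for_diskann=True) == "13.3"
```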

View File

@@ -36,7 +36,7 @@ LEANN is an innovative vector database that democratizes personal AI. Transform
 LEANN achieves this through *graph-based selective recomputation* with *high-degree preserving pruning*, computing embeddings on-demand instead of storing them all. [Illustration Fig →](#-architecture--how-it-works) | [Paper →](https://arxiv.org/abs/2506.08276)

-**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[live data](#mcp-integration-rag-on-live-data-from-any-platform)** ([Slack](#slack-messages-search-your-team-conversations), [Twitter](#-twitter-bookmarks-your-personal-tweet-library)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
+**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[live data](#mcp-integration-rag-on-live-data-from-any-platform)** ([Slack](#mcp-integration-rag-on-live-data-from-any-platform), [Twitter](#mcp-integration-rag-on-live-data-from-any-platform)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.

 \* Claude Code only supports basic `grep`-style keyword search. **LEANN** is a drop-in **semantic search MCP service fully compatible with Claude Code**, unlocking intelligent retrieval without changing your workflow. 🔥 Check out [the easy setup →](packages/leann-mcp/README.md)

@@ -392,54 +392,6 @@ python -m apps.code_rag --repo-dir "./my_codebase" --query "How does authenticat
 </details>

-### 🎨 ColQwen: Multimodal PDF Retrieval with Vision-Language Models
-
-Search through PDFs using both text and visual understanding with ColQwen2/ColPali models. Perfect for research papers, technical documents, and any PDFs with complex layouts, figures, or diagrams.
-
-> **🍎 Mac Users**: ColQwen is optimized for Apple Silicon with MPS acceleration for faster inference!
-
-```bash
-# Build index from PDFs
-python -m apps.colqwen_rag build --pdfs ./my_papers/ --index research_papers
-
-# Search with text queries
-python -m apps.colqwen_rag search research_papers "How does attention mechanism work?"
-
-# Interactive Q&A
-python -m apps.colqwen_rag ask research_papers --interactive
-```
-
-<details>
-<summary><strong>📋 Click to expand: ColQwen Setup & Usage</strong></summary>
-
-#### Prerequisites
-
-```bash
-# Install dependencies
-uv pip install colpali_engine pdf2image pillow matplotlib qwen_vl_utils einops seaborn
-brew install poppler  # macOS only, for PDF processing
-```
-
-#### Build Index
-
-```bash
-python -m apps.colqwen_rag build \
-    --pdfs ./pdf_directory/ \
-    --index my_index \
-    --model colqwen2  # or colpali
-```
-
-#### Search
-
-```bash
-python -m apps.colqwen_rag search my_index "your question here" --top-k 5
-```
-
-#### Models
-
-- **ColQwen2** (`colqwen2`): Latest vision-language model with improved performance
-- **ColPali** (`colpali`): Proven multimodal retriever
-
-For detailed usage, see the [ColQwen Guide](docs/COLQWEN_GUIDE.md).
-
-</details>
-
 ### 📧 Your Personal Email Secretary: RAG on Apple Mail!

 > **Note:** The examples below currently support macOS only. Windows support coming soon.
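
The context line kept above states LEANN's core idea: persist the proximity graph, not the embeddings, and recompute vectors on demand during search. A toy illustration of that trade-off, assuming a caller-supplied `embed` function - names and structure here are hypothetical, not LEANN's actual API:

```python
import numpy as np

def search_with_recompute(query_vec, graph, texts, embed, entry=0, max_hops=10):
    """Greedy graph walk that re-embeds only visited neighbors on demand,
    instead of loading a stored vector for every node."""
    best = entry
    best_sim = float(np.dot(embed(texts[best]), query_vec))
    for _ in range(max_hops):
        improved = False
        for nbr in graph[best]:  # only the frontier gets (re)embedded
            sim = float(np.dot(embed(texts[nbr]), query_vec))
            if sim > best_sim:
                best, best_sim, improved = nbr, sim, True
        if not improved:
            break
    return best, best_sim
```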

View File

@@ -257,8 +257,8 @@ class BaseRAGExample(ABC):
         pass

     @abstractmethod
-    async def load_data(self, args) -> list[dict[str, Any]]:
-        """Load data from the source. Returns list of text chunks as dicts with 'text' and 'metadata' keys."""
+    async def load_data(self, args) -> list[str]:
+        """Load data from the source. Returns list of text chunks."""
         pass

     def get_llm_config(self, args) -> dict[str, Any]:
@@ -282,8 +282,8 @@ class BaseRAGExample(ABC):
         return config

-    async def build_index(self, args, texts: list[dict[str, Any]]) -> str:
-        """Build LEANN index from text chunks (dicts with 'text' and 'metadata' keys)."""
+    async def build_index(self, args, texts: list[str]) -> str:
+        """Build LEANN index from texts."""
         index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
         print(f"\n[Building Index] Creating {self.name} index...")
@@ -314,14 +314,8 @@ class BaseRAGExample(ABC):
         batch_size = 1000
         for i in range(0, len(texts), batch_size):
             batch = texts[i : i + batch_size]
-            for item in batch:
-                # Handle both dict format (from create_text_chunks) and plain strings
-                if isinstance(item, dict):
-                    text = item.get("text", "")
-                    metadata = item.get("metadata")
-                    builder.add_text(text, metadata)
-                else:
-                    builder.add_text(item)
+            for text in batch:
+                builder.add_text(text)
             print(f"Added {min(i + batch_size, len(texts))}/{len(texts)} texts...")

         print("Building index structure...")

View File

@@ -6,7 +6,6 @@ Supports Chrome browser history.
 import os
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -86,7 +85,7 @@ class BrowserRAG(BaseRAGExample):
         return profiles

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load browser history and convert to text chunks."""
         # Determine Chrome profiles
         if args.chrome_profile and not args.auto_find_profiles:

View File

@@ -5,7 +5,6 @@ Supports ChatGPT export data from chat.html files.
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -81,7 +80,7 @@ class ChatGPTRAG(BaseRAGExample):
         return export_files

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load ChatGPT export data and convert to text chunks."""
         export_path = Path(args.export_path)

View File

@@ -5,7 +5,6 @@ Supports Claude export data from JSON files.
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -81,7 +80,7 @@ class ClaudeRAG(BaseRAGExample):
         return export_files

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load Claude export data and convert to text chunks."""
         export_path = Path(args.export_path)

View File

@@ -6,7 +6,6 @@ optimized chunking parameters.
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -78,7 +77,7 @@ class CodeRAG(BaseRAGExample):
             help="Try to preserve import statements in chunks (default: True)",
         )

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load code files and convert to AST-aware chunks."""
         print(f"🔍 Scanning code repository: {args.repo_dir}")
         print(f"📁 Including extensions: {args.include_extensions}")
@@ -89,6 +88,14 @@ class CodeRAG(BaseRAGExample):
         if not repo_path.exists():
             raise ValueError(f"Repository directory not found: {args.repo_dir}")

+        # Load code files with filtering
+        reader_kwargs = {
+            "recursive": True,
+            "encoding": "utf-8",
+            "required_exts": args.include_extensions,
+            "exclude_hidden": True,
+        }
+
         # Create exclusion filter
         def file_filter(file_path: str) -> bool:
             """Filter out unwanted files and directories."""
@@ -113,11 +120,8 @@ class CodeRAG(BaseRAGExample):
         # Load documents with file filtering
         documents = SimpleDirectoryReader(
             args.repo_dir,
-            file_extractor=None,
-            recursive=True,
-            encoding="utf-8",
-            required_exts=args.include_extensions,
-            exclude_hidden=True,
+            file_extractor=None,  # Use default extractors
+            **reader_kwargs,
         ).load_data(show_progress=True)

         # Apply custom filtering
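
The `reader_kwargs` refactor above just centralizes the reader configuration. A self-contained sketch of the same pattern, assuming llama_index's `SimpleDirectoryReader` (the directory and extension list below are placeholders):

```python
from llama_index.core import SimpleDirectoryReader

# Keep the traversal/filtering policy in one dict so every call site agrees.
reader_kwargs = {
    "recursive": True,
    "encoding": "utf-8",
    "required_exts": [".py", ".md"],  # stand-in for args.include_extensions
    "exclude_hidden": True,
}
documents = SimpleDirectoryReader("./my_repo", **reader_kwargs).load_data(
    show_progress=True
)
print(f"Loaded {len(documents)} documents")
```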

View File

@@ -1,364 +0,0 @@
#!/usr/bin/env python3
"""
ColQwen RAG - Easy-to-use multimodal PDF retrieval with ColQwen2/ColPali

Usage:
    python -m apps.colqwen_rag build --pdfs ./my_pdfs/ --index my_index
    python -m apps.colqwen_rag search my_index "How does attention work?"
    python -m apps.colqwen_rag ask my_index --interactive
"""

import argparse
import os
import sys
from pathlib import Path
from typing import Optional, cast

# Add LEANN packages to path
_repo_root = Path(__file__).resolve().parents[1]
_leann_core_src = _repo_root / "packages" / "leann-core" / "src"
_leann_hnsw_pkg = _repo_root / "packages" / "leann-backend-hnsw"
if str(_leann_core_src) not in sys.path:
    sys.path.append(str(_leann_core_src))
if str(_leann_hnsw_pkg) not in sys.path:
    sys.path.append(str(_leann_hnsw_pkg))

import torch  # noqa: E402
from colpali_engine import ColPali, ColPaliProcessor, ColQwen2, ColQwen2Processor  # noqa: E402
from colpali_engine.utils.torch_utils import ListDataset  # noqa: E402
from pdf2image import convert_from_path  # noqa: E402
from PIL import Image  # noqa: E402
from torch.utils.data import DataLoader  # noqa: E402
from tqdm import tqdm  # noqa: E402

# Import the existing multi-vector implementation
sys.path.append(str(_repo_root / "apps" / "multimodal" / "vision-based-pdf-multi-vector"))
from leann_multi_vector import LeannMultiVector  # noqa: E402


class ColQwenRAG:
    """Easy-to-use ColQwen RAG system for multimodal PDF retrieval."""

    def __init__(self, model_type: str = "colpali"):
        """
        Initialize ColQwen RAG system.

        Args:
            model_type: "colqwen2" or "colpali"
        """
        self.model_type = model_type
        self.device = self._get_device()
        # Use float32 on MPS to avoid memory issues, float16 on CUDA, bfloat16 on CPU
        if self.device.type == "mps":
            self.dtype = torch.float32
        elif self.device.type == "cuda":
            self.dtype = torch.float16
        else:
            self.dtype = torch.bfloat16

        print(f"🚀 Initializing {model_type.upper()} on {self.device} with {self.dtype}")

        # Load model and processor with MPS-optimized settings
        try:
            if model_type == "colqwen2":
                self.model_name = "vidore/colqwen2-v1.0"
                if self.device.type == "mps":
                    # For MPS, load on CPU first then move to avoid memory allocation issues
                    self.model = ColQwen2.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map="cpu",
                        low_cpu_mem_usage=True,
                    ).eval()
                    self.model = self.model.to(self.device)
                else:
                    self.model = ColQwen2.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map=self.device,
                        low_cpu_mem_usage=True,
                    ).eval()
                self.processor = ColQwen2Processor.from_pretrained(self.model_name)
            else:  # colpali
                self.model_name = "vidore/colpali-v1.2"
                if self.device.type == "mps":
                    # For MPS, load on CPU first then move to avoid memory allocation issues
                    self.model = ColPali.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map="cpu",
                        low_cpu_mem_usage=True,
                    ).eval()
                    self.model = self.model.to(self.device)
                else:
                    self.model = ColPali.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map=self.device,
                        low_cpu_mem_usage=True,
                    ).eval()
                self.processor = ColPaliProcessor.from_pretrained(self.model_name)
        except Exception as e:
            if "memory" in str(e).lower() or "offload" in str(e).lower():
                print(f"⚠️ Memory constraint on {self.device}, using CPU with optimizations...")
                self.device = torch.device("cpu")
                self.dtype = torch.float32
                if model_type == "colqwen2":
                    self.model = ColQwen2.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map="cpu",
                        low_cpu_mem_usage=True,
                    ).eval()
                else:
                    self.model = ColPali.from_pretrained(
                        self.model_name,
                        torch_dtype=self.dtype,
                        device_map="cpu",
                        low_cpu_mem_usage=True,
                    ).eval()
            else:
                raise

    def _get_device(self):
        """Auto-select best available device."""
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")

    def build_index(self, pdf_paths: list[str], index_name: str, pages_dir: Optional[str] = None):
        """
        Build multimodal index from PDF files.

        Args:
            pdf_paths: List of PDF file paths
            index_name: Name for the index
            pages_dir: Directory to save page images (optional)
        """
        print(f"Building index '{index_name}' from {len(pdf_paths)} PDFs...")

        # Convert PDFs to images
        all_images = []
        all_metadata = []
        if pages_dir:
            os.makedirs(pages_dir, exist_ok=True)

        for pdf_path in tqdm(pdf_paths, desc="Converting PDFs"):
            try:
                images = convert_from_path(pdf_path, dpi=150)
                pdf_name = Path(pdf_path).stem
                for i, image in enumerate(images):
                    # Save image if pages_dir specified
                    if pages_dir:
                        image_path = Path(pages_dir) / f"{pdf_name}_page_{i + 1}.png"
                        image.save(image_path)
                    all_images.append(image)
                    all_metadata.append(
                        {
                            "pdf_path": pdf_path,
                            "pdf_name": pdf_name,
                            "page_number": i + 1,
                            "image_path": str(image_path) if pages_dir else None,
                        }
                    )
            except Exception as e:
                print(f"❌ Error processing {pdf_path}: {e}")
                continue

        print(f"📄 Converted {len(all_images)} pages from {len(pdf_paths)} PDFs")
        print(f"All metadata: {all_metadata}")

        # Generate embeddings
        print("🧠 Generating embeddings...")
        embeddings = self._embed_images(all_images)

        # Build LEANN index
        print("🔍 Building LEANN index...")
        leann_mv = LeannMultiVector(
            index_path=index_name,
            dim=embeddings.shape[-1],
            embedding_model_name=self.model_type,
        )

        # Create collection and insert data
        leann_mv.create_collection()
        for i, (embedding, metadata) in enumerate(zip(embeddings, all_metadata)):
            data = {
                "doc_id": i,
                "filepath": metadata.get("image_path", ""),
                "colbert_vecs": embedding.numpy(),  # Convert tensor to numpy
            }
            leann_mv.insert(data)

        # Build the index
        leann_mv.create_index()
        print(f"✅ Index '{index_name}' built successfully!")
        return leann_mv

    def search(self, index_name: str, query: str, top_k: int = 5):
        """
        Search the index with a text query.

        Args:
            index_name: Name of the index to search
            query: Text query
            top_k: Number of results to return
        """
        print(f"🔍 Searching '{index_name}' for: '{query}'")

        # Load index
        leann_mv = LeannMultiVector(
            index_path=index_name,
            dim=128,  # Will be updated when loading
            embedding_model_name=self.model_type,
        )

        # Generate query embedding
        query_embedding = self._embed_query(query)

        # Search (returns list of (score, doc_id) tuples)
        search_results = leann_mv.search(query_embedding.numpy(), topk=top_k)

        # Display results
        print(f"\n📋 Top {len(search_results)} results:")
        for i, (score, doc_id) in enumerate(search_results, 1):
            # Get metadata for this doc_id (we need to load the metadata)
            print(f"{i}. Score: {score:.3f} | Doc ID: {doc_id}")
        return search_results

    def ask(self, index_name: str, interactive: bool = False):
        """
        Interactive Q&A with the indexed documents.

        Args:
            index_name: Name of the index to query
            interactive: Whether to run in interactive mode
        """
        print(f"💬 ColQwen Chat with '{index_name}'")
        if interactive:
            print("Type 'quit' to exit, 'help' for commands")
            while True:
                try:
                    query = input("\n🤔 Your question: ").strip()
                    if query.lower() in ["quit", "exit", "q"]:
                        break
                    elif query.lower() == "help":
                        print("Commands: quit/exit/q (exit), help (this message)")
                        continue
                    elif not query:
                        continue
                    self.search(index_name, query, top_k=3)
                    # TODO: Add answer generation with Qwen-VL
                    print("\n💡 For detailed answers, we can integrate Qwen-VL here!")
                except KeyboardInterrupt:
                    print("\n👋 Goodbye!")
                    break
        else:
            query = input("🤔 Your question: ").strip()
            if query:
                self.search(index_name, query)

    def _embed_images(self, images: list[Image.Image]) -> torch.Tensor:
        """Generate embeddings for a list of images."""
        dataset = ListDataset(images)
        dataloader = DataLoader(dataset, batch_size=1, shuffle=False, collate_fn=lambda x: x)
        embeddings = []
        with torch.no_grad():
            for batch in tqdm(dataloader, desc="Embedding images"):
                batch_images = cast(list, batch)
                batch_inputs = self.processor.process_images(batch_images).to(self.device)
                batch_embeddings = self.model(**batch_inputs)
                embeddings.append(batch_embeddings.cpu())
        return torch.cat(embeddings, dim=0)

    def _embed_query(self, query: str) -> torch.Tensor:
        """Generate embedding for a text query."""
        with torch.no_grad():
            query_inputs = self.processor.process_queries([query]).to(self.device)
            query_embedding = self.model(**query_inputs)
        return query_embedding.cpu()


def main():
    parser = argparse.ArgumentParser(description="ColQwen RAG - Easy multimodal PDF retrieval")
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Build command
    build_parser = subparsers.add_parser("build", help="Build index from PDFs")
    build_parser.add_argument("--pdfs", required=True, help="Directory containing PDF files")
    build_parser.add_argument("--index", required=True, help="Index name")
    build_parser.add_argument(
        "--model", choices=["colqwen2", "colpali"], default="colqwen2", help="Model to use"
    )
    build_parser.add_argument("--pages-dir", help="Directory to save page images")

    # Search command
    search_parser = subparsers.add_parser("search", help="Search the index")
    search_parser.add_argument("index", help="Index name")
    search_parser.add_argument("query", help="Search query")
    search_parser.add_argument("--top-k", type=int, default=5, help="Number of results")
    search_parser.add_argument(
        "--model", choices=["colqwen2", "colpali"], default="colqwen2", help="Model to use"
    )

    # Ask command
    ask_parser = subparsers.add_parser("ask", help="Interactive Q&A")
    ask_parser.add_argument("index", help="Index name")
    ask_parser.add_argument("--interactive", action="store_true", help="Interactive mode")
    ask_parser.add_argument(
        "--model", choices=["colqwen2", "colpali"], default="colqwen2", help="Model to use"
    )

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Initialize ColQwen RAG
    if args.command == "build":
        colqwen = ColQwenRAG(args.model)
        # Get PDF files
        pdf_dir = Path(args.pdfs)
        if pdf_dir.is_file() and pdf_dir.suffix.lower() == ".pdf":
            pdf_paths = [str(pdf_dir)]
        elif pdf_dir.is_dir():
            pdf_paths = [str(p) for p in pdf_dir.glob("*.pdf")]
        else:
            print(f"❌ Invalid PDF path: {args.pdfs}")
            return
        if not pdf_paths:
            print(f"❌ No PDF files found in {args.pdfs}")
            return
        colqwen.build_index(pdf_paths, args.index, args.pages_dir)
    elif args.command == "search":
        colqwen = ColQwenRAG(args.model)
        colqwen.search(args.index, args.query, args.top_k)
    elif args.command == "ask":
        colqwen = ColQwenRAG(args.model)
        colqwen.ask(args.index, args.interactive)


if __name__ == "__main__":
    main()
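
For readers tracking what this removal drops: the class above can also be driven programmatically, mirroring what its `main()` does (the paths and index name below are placeholders):

```python
from pathlib import Path

rag = ColQwenRAG("colpali")  # or "colqwen2"
pdf_paths = [str(p) for p in Path("./my_papers").glob("*.pdf")]
rag.build_index(pdf_paths, "research_papers", pages_dir="./pages")
rag.search("research_papers", "How does attention work?", top_k=3)
```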

View File

@@ -5,7 +5,6 @@ Supports PDF, TXT, MD, and other document formats.
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -52,7 +51,7 @@ class DocumentRAG(BaseRAGExample):
             help="Enable AST-aware chunking for code files in the data directory",
         )

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load documents and convert to text chunks."""
         print(f"Loading documents from: {args.data_dir}")
         if args.file_types:
@@ -66,12 +65,16 @@ class DocumentRAG(BaseRAGExample):
             raise ValueError(f"Data directory not found: {args.data_dir}")

         # Load documents
-        documents = SimpleDirectoryReader(
-            args.data_dir,
-            recursive=True,
-            encoding="utf-8",
-            required_exts=args.file_types if args.file_types else None,
-        ).load_data(show_progress=True)
+        reader_kwargs = {
+            "recursive": True,
+            "encoding": "utf-8",
+        }
+        if args.file_types:
+            reader_kwargs["required_exts"] = args.file_types
+        documents = SimpleDirectoryReader(args.data_dir, **reader_kwargs).load_data(
+            show_progress=True
+        )

         if not documents:
             print(f"No documents found in {args.data_dir} with extensions {args.file_types}")

View File

@@ -127,12 +127,11 @@ class EmlxMboxReader(MboxReader):
     def load_data(
         self,
-        file: Path,  # Note: for EmlxMboxReader, this is actually a directory
+        directory: Path,
         extra_info: dict | None = None,
         fs: AbstractFileSystem | None = None,
     ) -> list[Document]:
         """Parse .emlx files from directory into strings using MboxReader logic."""
-        directory = file  # Rename for clarity - this is a directory of .emlx files
         import os
         import tempfile

View File

@@ -5,7 +5,6 @@ Supports Apple Mail on macOS.
 import sys
 from pathlib import Path
-from typing import Any

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -65,7 +64,7 @@ class EmailRAG(BaseRAGExample):
         return messages_dirs

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load emails and convert to text chunks."""
         # Determine mail directories
         if args.mail_path:

View File

@@ -86,7 +86,7 @@ class WeChatHistoryReader(BaseReader):
                 text=True,
                 timeout=5,
             )
-            return result.returncode == 0 and bool(result.stdout.strip())
+            return result.returncode == 0 and result.stdout.strip()
         except Exception:
             return False
@@ -314,9 +314,7 @@ class WeChatHistoryReader(BaseReader):
         return concatenated_groups

-    def _create_concatenated_content(
-        self, message_group: dict, contact_name: str
-    ) -> tuple[str, str]:
+    def _create_concatenated_content(self, message_group: dict, contact_name: str) -> str:
         """
         Create concatenated content from a group of messages.
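
The first hunk is worth pausing on: `returncode == 0 and stdout.strip()` evaluates to a string (or `False`), so only the `bool(...)`-wrapped side actually returns a `bool`. A quick standalone illustration:

```python
def has_output(returncode: int, stdout: str):
    return returncode == 0 and stdout.strip()

print(has_output(0, "ok\n"))        # 'ok'  (a str, not True)
print(has_output(0, "   "))         # ''    (falsy, but still a str)
print(bool(has_output(0, "ok\n")))  # True  (the coerced variant)
```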

View File

@@ -1,219 +0,0 @@
#!/usr/bin/env python3
"""
CLIP Image RAG Application

This application enables RAG (Retrieval-Augmented Generation) on images using CLIP embeddings.
You can index a directory of images and search them using text queries.

Usage:
    python -m apps.image_rag --image-dir ./my_images/ --query "a sunset over mountains"
    python -m apps.image_rag --image-dir ./my_images/ --interactive
"""

import argparse
import pickle
import tempfile
from pathlib import Path
from typing import Any

import numpy as np
from PIL import Image
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

from apps.base_rag_example import BaseRAGExample


class ImageRAG(BaseRAGExample):
    """
    RAG application for images using CLIP embeddings.

    This class provides a complete RAG pipeline for image data, including
    CLIP embedding generation, indexing, and text-based image search.
    """

    def __init__(self):
        super().__init__(
            name="Image RAG",
            description="RAG application for images using CLIP embeddings",
            default_index_name="image_index",
        )
        # Override default embedding model to use CLIP
        self.embedding_model_default = "clip-ViT-L-14"
        self.embedding_mode_default = "sentence-transformers"
        self._image_data: list[dict] = []

    def _add_specific_arguments(self, parser: argparse.ArgumentParser):
        """Add image-specific arguments."""
        image_group = parser.add_argument_group("Image Parameters")
        image_group.add_argument(
            "--image-dir",
            type=str,
            required=True,
            help="Directory containing images to index",
        )
        image_group.add_argument(
            "--image-extensions",
            type=str,
            nargs="+",
            default=[".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"],
            help="Image file extensions to process (default: .jpg .jpeg .png .gif .bmp .webp)",
        )
        image_group.add_argument(
            "--batch-size",
            type=int,
            default=32,
            help="Batch size for CLIP embedding generation (default: 32)",
        )

    async def load_data(self, args) -> list[dict[str, Any]]:
        """Load images, generate CLIP embeddings, and return text descriptions."""
        self._image_data = self._load_images_and_embeddings(args)
        return [entry["text"] for entry in self._image_data]

    def _load_images_and_embeddings(self, args) -> list[dict]:
        """Helper to process images and produce embeddings/metadata."""
        image_dir = Path(args.image_dir)
        if not image_dir.exists():
            raise ValueError(f"Image directory does not exist: {image_dir}")

        print(f"📸 Loading images from {image_dir}...")

        # Find all image files
        image_files = []
        for ext in args.image_extensions:
            image_files.extend(image_dir.rglob(f"*{ext}"))
            image_files.extend(image_dir.rglob(f"*{ext.upper()}"))

        if not image_files:
            raise ValueError(
                f"No images found in {image_dir} with extensions {args.image_extensions}"
            )

        print(f"✅ Found {len(image_files)} images")

        # Limit if max_items is set
        if args.max_items > 0:
            image_files = image_files[: args.max_items]
            print(f"📊 Processing {len(image_files)} images (limited by --max-items)")

        # Load CLIP model
        print("🔍 Loading CLIP model...")
        model = SentenceTransformer(self.embedding_model_default)

        # Process images and generate embeddings
        print("🖼️ Processing images and generating embeddings...")
        image_data = []
        batch_images = []
        batch_paths = []

        for image_path in tqdm(image_files, desc="Processing images"):
            try:
                image = Image.open(image_path).convert("RGB")
                batch_images.append(image)
                batch_paths.append(image_path)

                # Process in batches
                if len(batch_images) >= args.batch_size:
                    embeddings = model.encode(
                        batch_images,
                        convert_to_numpy=True,
                        normalize_embeddings=True,
                        batch_size=args.batch_size,
                        show_progress_bar=False,
                    )
                    for img_path, embedding in zip(batch_paths, embeddings):
                        image_data.append(
                            {
                                "text": f"Image: {img_path.name}\nPath: {img_path}",
                                "metadata": {
                                    "image_path": str(img_path),
                                    "image_name": img_path.name,
                                    "image_dir": str(image_dir),
                                },
                                "embedding": embedding.astype(np.float32),
                            }
                        )
                    batch_images = []
                    batch_paths = []
            except Exception as e:
                print(f"⚠️ Failed to process {image_path}: {e}")
                continue

        # Process remaining images
        if batch_images:
            embeddings = model.encode(
                batch_images,
                convert_to_numpy=True,
                normalize_embeddings=True,
                batch_size=len(batch_images),
                show_progress_bar=False,
            )
            for img_path, embedding in zip(batch_paths, embeddings):
                image_data.append(
                    {
                        "text": f"Image: {img_path.name}\nPath: {img_path}",
                        "metadata": {
                            "image_path": str(img_path),
                            "image_name": img_path.name,
                            "image_dir": str(image_dir),
                        },
                        "embedding": embedding.astype(np.float32),
                    }
                )

        print(f"✅ Processed {len(image_data)} images")
        return image_data

    async def build_index(self, args, texts: list[dict[str, Any]]) -> str:
        """Build index using pre-computed CLIP embeddings."""
        from leann.api import LeannBuilder

        if not self._image_data or len(self._image_data) != len(texts):
            raise RuntimeError("No image data found. Make sure load_data() ran successfully.")

        print("🔨 Building LEANN index with CLIP embeddings...")
        builder = LeannBuilder(
            backend_name=args.backend_name,
            embedding_model=self.embedding_model_default,
            embedding_mode=self.embedding_mode_default,
            is_recompute=False,
            distance_metric="cosine",
            graph_degree=args.graph_degree,
            build_complexity=args.build_complexity,
            is_compact=not args.no_compact,
        )

        for text, data in zip(texts, self._image_data):
            builder.add_text(text=text, metadata=data["metadata"])

        ids = [str(i) for i in range(len(self._image_data))]
        embeddings = np.array([data["embedding"] for data in self._image_data], dtype=np.float32)
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".pkl", delete=False) as f:
            pickle.dump((ids, embeddings), f)
            pkl_path = f.name

        try:
            index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
            builder.build_index_from_embeddings(index_path, pkl_path)
            print(f"✅ Index built successfully at {index_path}")
            return index_path
        finally:
            Path(pkl_path).unlink()


def main():
    """Main entry point for the image RAG application."""
    import asyncio

    app = ImageRAG()
    asyncio.run(app.run())


if __name__ == "__main__":
    main()

View File

@@ -6,7 +6,6 @@ This example demonstrates how to build a RAG system on your iMessage conversatio
 import asyncio
 from pathlib import Path
-from typing import Any

 from leann.chunking_utils import create_text_chunks
@@ -57,7 +56,7 @@ class IMessageRAG(BaseRAGExample):
             help="Overlap between text chunks (default: 200)",
         )

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load iMessage history and convert to text chunks."""
         print("Loading iMessage conversation history...")

View File

@@ -18,11 +18,10 @@ _repo_root = Path(__file__).resolve().parents[3]
 _leann_core_src = _repo_root / "packages" / "leann-core" / "src"
 _leann_hnsw_pkg = _repo_root / "packages" / "leann-backend-hnsw"
 if str(_leann_core_src) not in sys.path:
-    sys.path.insert(0, str(_leann_core_src))
+    sys.path.append(str(_leann_core_src))
 if str(_leann_hnsw_pkg) not in sys.path:
-    sys.path.insert(0, str(_leann_hnsw_pkg))
-
-from leann_multi_vector import LeannMultiVector
+    sys.path.append(str(_leann_hnsw_pkg))

 import torch
 from colpali_engine.models import ColPali
@@ -94,9 +93,9 @@ for batch_doc in tqdm(dataloader):
 print(ds[0].shape)

 # %%
-# Build HNSW index via LeannMultiVector primitives and run search
+# Build HNSW index via LeannRetriever primitives and run search
 index_path = "./indexes/colpali.leann"
-retriever = LeannMultiVector(index_path=index_path, dim=int(ds[0].shape[-1]))
+retriever = LeannRetriever(index_path=index_path, dim=int(ds[0].shape[-1]))
 retriever.create_collection()
 filepaths = [os.path.join("./pages", name) for name in page_filenames]
 for i in range(len(filepaths)):

View File

@@ -5,7 +5,7 @@ import argparse
 import faulthandler
 import os
 import time
-from typing import Any, Optional, cast
+from typing import Any, Optional

 import numpy as np
 from PIL import Image
@@ -223,7 +223,7 @@ if need_to_build_index:
     # Use filenames as identifiers instead of full paths for cleaner metadata
     filepaths = [os.path.basename(fp) for fp in filepaths]
 elif USE_HF_DATASET:
-    from datasets import Dataset, DatasetDict, concatenate_datasets, load_dataset
+    from datasets import load_dataset, concatenate_datasets, DatasetDict

     # Determine which datasets to load
     if DATASET_NAMES is not None:
@@ -281,12 +281,12 @@ if need_to_build_index:
         splits_to_load = DATASET_SPLITS

     # Load and concatenate multiple splits for this dataset
-    datasets_to_concat: list[Dataset] = []
+    datasets_to_concat = []
     for split in splits_to_load:
         if split not in dataset_dict:
             print(f"  Warning: Split '{split}' not found in dataset. Available splits: {list(dataset_dict.keys())}")
             continue
-        split_dataset = cast(Dataset, dataset_dict[split])
+        split_dataset = dataset_dict[split]
         print(f"  Loaded split '{split}': {len(split_dataset)} pages")
         datasets_to_concat.append(split_dataset)

View File

@@ -25,9 +25,9 @@ Usage:
 import argparse
 import json
 import os
-from typing import Any, Optional, cast
+from typing import Optional

-from datasets import Dataset, load_dataset
+from datasets import load_dataset
 from leann_multi_vector import (
     ViDoReBenchmarkEvaluator,
     _ensure_repo_paths_importable,
@@ -151,43 +151,40 @@ def load_vidore_v1_data(
     """
     print(f"Loading dataset: {dataset_path} (split={split})")

-    # Load queries - cast to Dataset since we know split returns Dataset not DatasetDict
-    query_ds = cast(Dataset, load_dataset(dataset_path, "queries", split=split, revision=revision))
-    queries: dict[str, str] = {}
+    # Load queries
+    query_ds = load_dataset(dataset_path, "queries", split=split, revision=revision)
+    queries = {}
     for row in query_ds:
-        row_dict = cast(dict[str, Any], row)
-        query_id = f"query-{split}-{row_dict['query-id']}"
-        queries[query_id] = row_dict["query"]
+        query_id = f"query-{split}-{row['query-id']}"
+        queries[query_id] = row["query"]

-    # Load corpus (images) - cast to Dataset
-    corpus_ds = cast(Dataset, load_dataset(dataset_path, "corpus", split=split, revision=revision))
-    corpus: dict[str, Any] = {}
+    # Load corpus (images)
+    corpus_ds = load_dataset(dataset_path, "corpus", split=split, revision=revision)
+    corpus = {}
     for row in corpus_ds:
-        row_dict = cast(dict[str, Any], row)
-        corpus_id = f"corpus-{split}-{row_dict['corpus-id']}"
+        corpus_id = f"corpus-{split}-{row['corpus-id']}"
         # Extract image from the dataset row
-        if "image" in row_dict:
-            corpus[corpus_id] = row_dict["image"]
-        elif "page_image" in row_dict:
-            corpus[corpus_id] = row_dict["page_image"]
+        if "image" in row:
+            corpus[corpus_id] = row["image"]
+        elif "page_image" in row:
+            corpus[corpus_id] = row["page_image"]
         else:
             raise ValueError(
-                f"No image field found in corpus. Available fields: {list(row_dict.keys())}"
+                f"No image field found in corpus. Available fields: {list(row.keys())}"
             )

-    # Load qrels (relevance judgments) - cast to Dataset
-    qrels_ds = cast(Dataset, load_dataset(dataset_path, "qrels", split=split, revision=revision))
-    qrels: dict[str, dict[str, int]] = {}
+    # Load qrels (relevance judgments)
+    qrels_ds = load_dataset(dataset_path, "qrels", split=split, revision=revision)
+    qrels = {}
     for row in qrels_ds:
-        row_dict = cast(dict[str, Any], row)
-        query_id = f"query-{split}-{row_dict['query-id']}"
-        corpus_id = f"corpus-{split}-{row_dict['corpus-id']}"
+        query_id = f"query-{split}-{row['query-id']}"
+        corpus_id = f"corpus-{split}-{row['corpus-id']}"
         if query_id not in qrels:
             qrels[query_id] = {}
-        qrels[query_id][corpus_id] = int(row_dict["score"])
+        qrels[query_id][corpus_id] = int(row["score"])

     print(
         f"Loaded {len(queries)} queries, {len(corpus)} corpus items, {len(qrels)} query-relevance mappings"
@@ -237,8 +234,8 @@ def evaluate_task(
         raise ValueError(f"Unknown task: {task_name}. Available: {list(VIDORE_V1_TASKS.keys())}")

     task_config = VIDORE_V1_TASKS[task_name]
-    dataset_path = str(task_config["dataset_path"])
-    revision = str(task_config["revision"])
+    dataset_path = task_config["dataset_path"]
+    revision = task_config["revision"]

     # Load data
     corpus, queries, qrels = load_vidore_v1_data(
@@ -289,7 +286,7 @@ def evaluate_task(
     )

     # Search queries
-    task_prompt = cast(Optional[dict[str, str]], task_config.get("prompt"))
+    task_prompt = task_config.get("prompt")
     results = evaluator.search_queries(
         queries=queries,
         corpus_ids=corpus_ids_ordered,
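
Whichever side you read, `load_vidore_v1_data` returns the same shapes: `queries: dict[str, str]`, `corpus: dict[str, image]`, and `qrels: dict[str, dict[str, int]]`. A small sketch of consuming `qrels` to score search output - the metric and names here are illustrative, not the evaluator's actual code:

```python
def recall_at_k(qrels: dict[str, dict[str, int]],
                results: dict[str, list[str]], k: int = 5) -> float:
    """Fraction of queries whose top-k results contain a relevant corpus id."""
    hits = 0
    for query_id, relevant in qrels.items():
        top = results.get(query_id, [])[:k]
        if any(relevant.get(corpus_id, 0) > 0 for corpus_id in top):
            hits += 1
    return hits / max(len(qrels), 1)
```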

View File

@@ -25,9 +25,9 @@ Usage:
 import argparse
 import json
 import os
-from typing import Any, Optional, cast
+from typing import Optional

-from datasets import Dataset, load_dataset
+from datasets import load_dataset
 from leann_multi_vector import (
     ViDoReBenchmarkEvaluator,
     _ensure_repo_paths_importable,
@@ -91,8 +91,8 @@ def load_vidore_v2_data(
     """
     print(f"Loading dataset: {dataset_path} (split={split}, language={language})")

-    # Load queries - cast to Dataset since we know split returns Dataset not DatasetDict
-    query_ds = cast(Dataset, load_dataset(dataset_path, "queries", split=split, revision=revision))
+    # Load queries
+    query_ds = load_dataset(dataset_path, "queries", split=split, revision=revision)

     # Check if dataset has language field before filtering
     has_language_field = len(query_ds) > 0 and "language" in query_ds.column_names
@@ -112,9 +112,8 @@ def load_vidore_v2_data(
     if len(query_ds_filtered) == 0:
         # Try to get a sample to see actual language values
         try:
-            sample_ds = cast(
-                Dataset,
-                load_dataset(dataset_path, "queries", split=split, revision=revision),
+            sample_ds = load_dataset(
+                dataset_path, "queries", split=split, revision=revision
             )
             if len(sample_ds) > 0 and "language" in sample_ds.column_names:
                 sample_langs = set(sample_ds["language"])
@@ -127,40 +126,37 @@ def load_vidore_v2_data(
         )
     query_ds = query_ds_filtered

-    queries: dict[str, str] = {}
+    queries = {}
     for row in query_ds:
-        row_dict = cast(dict[str, Any], row)
-        query_id = f"query-{split}-{row_dict['query-id']}"
-        queries[query_id] = row_dict["query"]
+        query_id = f"query-{split}-{row['query-id']}"
+        queries[query_id] = row["query"]

-    # Load corpus (images) - cast to Dataset
-    corpus_ds = cast(Dataset, load_dataset(dataset_path, "corpus", split=split, revision=revision))
-    corpus: dict[str, Any] = {}
+    # Load corpus (images)
+    corpus_ds = load_dataset(dataset_path, "corpus", split=split, revision=revision)
+    corpus = {}
     for row in corpus_ds:
-        row_dict = cast(dict[str, Any], row)
-        corpus_id = f"corpus-{split}-{row_dict['corpus-id']}"
+        corpus_id = f"corpus-{split}-{row['corpus-id']}"
         # Extract image from the dataset row
-        if "image" in row_dict:
-            corpus[corpus_id] = row_dict["image"]
-        elif "page_image" in row_dict:
-            corpus[corpus_id] = row_dict["page_image"]
+        if "image" in row:
+            corpus[corpus_id] = row["image"]
+        elif "page_image" in row:
+            corpus[corpus_id] = row["page_image"]
         else:
             raise ValueError(
-                f"No image field found in corpus. Available fields: {list(row_dict.keys())}"
+                f"No image field found in corpus. Available fields: {list(row.keys())}"
             )

-    # Load qrels (relevance judgments) - cast to Dataset
-    qrels_ds = cast(Dataset, load_dataset(dataset_path, "qrels", split=split, revision=revision))
-    qrels: dict[str, dict[str, int]] = {}
+    # Load qrels (relevance judgments)
+    qrels_ds = load_dataset(dataset_path, "qrels", split=split, revision=revision)
+    qrels = {}
     for row in qrels_ds:
-        row_dict = cast(dict[str, Any], row)
-        query_id = f"query-{split}-{row_dict['query-id']}"
-        corpus_id = f"corpus-{split}-{row_dict['corpus-id']}"
+        query_id = f"query-{split}-{row['query-id']}"
+        corpus_id = f"corpus-{split}-{row['corpus-id']}"
         if query_id not in qrels:
             qrels[query_id] = {}
-        qrels[query_id][corpus_id] = int(row_dict["score"])
+        qrels[query_id][corpus_id] = int(row["score"])

     print(
         f"Loaded {len(queries)} queries, {len(corpus)} corpus items, {len(qrels)} query-relevance mappings"
@@ -208,13 +204,13 @@ def evaluate_task(
         raise ValueError(f"Unknown task: {task_name}. Available: {list(VIDORE_V2_TASKS.keys())}")

     task_config = VIDORE_V2_TASKS[task_name]
-    dataset_path = str(task_config["dataset_path"])
-    revision = str(task_config["revision"])
+    dataset_path = task_config["dataset_path"]
+    revision = task_config["revision"]

     # Determine language
     if language is None:
         # Use first language if multiple available
-        languages = cast(Optional[list[str]], task_config.get("languages"))
+        languages = task_config.get("languages")
         if languages is None:
             # Task doesn't support language filtering (e.g., Vidore2ESGReportsHLRetrieval)
             language = None
@@ -273,7 +269,7 @@ def evaluate_task(
     )

     # Search queries
-    task_prompt = cast(Optional[dict[str, str]], task_config.get("prompt"))
+    task_prompt = task_config.get("prompt")
     results = evaluator.search_queries(
         queries=queries,
         corpus_ids=corpus_ids_ordered,

View File

@@ -177,9 +177,7 @@ class SlackMCPReader:
                 break

         # If we get here, all retries failed or it's not a retryable error
-        if last_exception is not None:
-            raise last_exception
-        raise RuntimeError("Unexpected error: no exception captured during retry loop")
+        raise last_exception

     async def fetch_slack_messages(
         self, channel: Optional[str] = None, limit: int = 100
@@ -269,10 +267,7 @@ class SlackMCPReader:
                     messages = json.loads(content["text"])
                 except json.JSONDecodeError:
                     # If not JSON, try to parse as CSV format (Slack MCP server format)
-                    text_content = content.get("text", "")
-                    messages = self._parse_csv_messages(
-                        text_content if text_content else "", channel or "unknown"
-                    )
+                    messages = self._parse_csv_messages(content["text"], channel)
             else:
                 messages = result["content"]
         else:
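
The first hunk only changes how the retry loop's terminal failure is raised. The surrounding shape, inferred from context, looks roughly like this generic sketch (not the reader's exact code); the removed guard exists mainly so a type checker can prove `last_exception` is not `None`:

```python
import time

def call_with_retries(fn, retries: int = 3, delay: float = 1.0):
    last_exception: Exception | None = None
    for attempt in range(retries):
        try:
            return fn()
        except Exception as exc:  # narrow this in real code
            last_exception = exc
            time.sleep(delay * (attempt + 1))
    if last_exception is not None:
        raise last_exception
    raise RuntimeError("Unexpected error: no exception captured during retry loop")
```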

View File

@@ -11,7 +11,6 @@ Usage:
 import argparse
 import asyncio
-from typing import Any

 from apps.base_rag_example import BaseRAGExample
 from apps.slack_data.slack_mcp_reader import SlackMCPReader
@@ -140,7 +139,7 @@ class SlackMCPRAG(BaseRAGExample):
             print("4. Try running the MCP server command directly to test it")
             return False

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load Slack messages via MCP server."""
         print(f"Connecting to Slack MCP server: {args.mcp_server}")
@@ -189,8 +188,7 @@ class SlackMCPRAG(BaseRAGExample):
                 print(sample_text)
                 print("-" * 40)

-            # Convert strings to dict format expected by base class
-            return [{"text": text, "metadata": {"source": "slack"}} for text in texts]
+            return texts

         except Exception as e:
             print(f"Error loading Slack data: {e}")

View File

@@ -11,7 +11,6 @@ Usage:
 import argparse
 import asyncio
-from typing import Any

 from apps.base_rag_example import BaseRAGExample
 from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
@@ -117,7 +116,7 @@ class TwitterMCPRAG(BaseRAGExample):
             print("5. Try running the MCP server command directly to test it")
             return False

-    async def load_data(self, args) -> list[dict[str, Any]]:
+    async def load_data(self, args) -> list[str]:
         """Load Twitter bookmarks via MCP server."""
         print(f"Connecting to Twitter MCP server: {args.mcp_server}")
@@ -157,8 +156,7 @@ class TwitterMCPRAG(BaseRAGExample):
                 print(sample_text)
                 print("-" * 50)

-            # Convert strings to dict format expected by base class
-            return [{"text": text, "metadata": {"source": "twitter"}} for text in texts]
+            return texts

         except Exception as e:
             print(f"❌ Error loading Twitter bookmarks: {e}")

View File

@@ -6,7 +6,6 @@ Supports WeChat chat history export and search.
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any
# Add parent directory to path for imports # Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent))
@@ -92,7 +91,7 @@ class WeChatRAG(BaseRAGExample):
print(f"Export error: {e}") print(f"Export error: {e}")
return False return False
async def load_data(self, args) -> list[dict[str, Any]]: async def load_data(self, args) -> list[str]:
"""Load WeChat history and convert to text chunks.""" """Load WeChat history and convert to text chunks."""
# Initialize WeChat reader with export capabilities # Initialize WeChat reader with export capabilities
reader = WeChatHistoryReader() reader = WeChatHistoryReader()

View File

@@ -1,200 +0,0 @@
# ColQwen Integration Guide
Easy-to-use multimodal PDF retrieval with ColQwen2/ColPali models.
## Quick Start
> **🍎 Mac Users**: ColQwen is optimized for Apple Silicon with MPS acceleration for faster inference!
### 1. Install Dependencies
```bash
uv pip install colpali_engine pdf2image pillow matplotlib qwen_vl_utils einops seaborn
brew install poppler # macOS only, for PDF processing
```
### 2. Basic Usage
```bash
# Build index from PDFs
python -m apps.colqwen_rag build --pdfs ./my_papers/ --index research_papers
# Search with text queries
python -m apps.colqwen_rag search research_papers "How does attention mechanism work?"
# Interactive Q&A
python -m apps.colqwen_rag ask research_papers --interactive
```
## Commands
### Build Index
```bash
python -m apps.colqwen_rag build \
--pdfs ./pdf_directory/ \
--index my_index \
--model colqwen2 \
--pages-dir ./page_images/ # Optional: save page images
```
**Options:**
- `--pdfs`: Directory containing PDF files (or single PDF path)
- `--index`: Name for the index (required)
- `--model`: `colqwen2` (default) or `colpali`
- `--pages-dir`: Directory to save page images (optional)
### Search Index
```bash
python -m apps.colqwen_rag search my_index "your question here" --top-k 5
```
**Options:**
- `--top-k`: Number of results to return (default: 5)
- `--model`: Model used for search (must match the model the index was built with)
### Interactive Q&A
```bash
python -m apps.colqwen_rag ask my_index --interactive
```
**Commands in interactive mode:**
- Type your questions naturally
- `help`: Show available commands
- `quit`/`exit`/`q`: Exit interactive mode
## 🧪 Test & Reproduce Results
Run the reproduction test for issue #119:
```bash
python test_colqwen_reproduction.py
```
This will:
1. ✅ Check dependencies
2. 📥 Download sample PDF (Attention Is All You Need paper)
3. 🏗️ Build test index
4. 🔍 Run sample queries
5. 📊 Show how to generate similarity maps
## 🎨 Advanced: Similarity Maps
For visual similarity analysis, use the existing advanced script:
```bash
cd apps/multimodal/vision-based-pdf-multi-vector/
python multi-vector-leann-similarity-map.py
```
Edit the script to customize:
- `QUERY`: Your question
- `MODEL`: "colqwen2" or "colpali"
- `USE_HF_DATASET`: Use HuggingFace dataset or local PDFs
- `SIMILARITY_MAP`: Generate heatmaps
- `ANSWER`: Enable Qwen-VL answer generation
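Under the hood, a similarity map is just the query-token-to-patch similarity laid back onto the page's patch grid. A minimal torch sketch of the idea (shapes and names here are illustrative, not the script's actual variables):
```python
import torch

# Illustrative shapes: 12 query tokens, a 32x32 patch grid, 128-dim embeddings
grid_h, grid_w, dim = 32, 32, 128
q_emb = torch.randn(12, dim)               # query-token embeddings
p_emb = torch.randn(grid_h * grid_w, dim)  # per-patch embeddings of one page

sim = q_emb @ p_emb.T                        # [n_query_tokens, n_patches]
token_idx = sim.max(dim=1).values.argmax()   # strongest-matching query token
heatmap = sim[token_idx].reshape(grid_h, grid_w)  # scores back on the patch grid
```
The resulting heatmap is what gets overlaid on the page image and written to `./figures/`.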
## 🔧 How It Works
### ColQwen2 vs ColPali
- **ColQwen2** (`vidore/colqwen2-v1.0`): Newer Qwen2-VL-based retriever; the default choice
- **ColPali** (`vidore/colpali-v1.2`): The original PaliGemma-based multimodal retriever; well proven
### Architecture
1. **PDF → Images**: Convert PDF pages to images (150 DPI)
2. **Vision Encoding**: Process images with ColQwen2/ColPali
3. **Multi-Vector Index**: Build LEANN HNSW index with multiple embeddings per page
4. **Query Processing**: Encode text queries with same model
5. **Similarity Search**: Find most relevant pages/regions
6. **Visual Maps**: Generate attention heatmaps (optional)
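As a rough sketch of steps 1, 2, 4, and 5 using `pdf2image` and the public `colpali_engine` API (step 3, the LEANN index build, is omitted, and the real `apps.colqwen_rag` pipeline adds batching and error handling — treat this as orientation, not the exact implementation):
```python
import torch
from pdf2image import convert_from_path
from colpali_engine.models import ColQwen2, ColQwen2Processor

device = "cpu"  # or "cuda" / "mps" -- see Device Support below

model = ColQwen2.from_pretrained("vidore/colqwen2-v1.0").to(device).eval()
processor = ColQwen2Processor.from_pretrained("vidore/colqwen2-v1.0")

pages = convert_from_path("paper.pdf", dpi=150)  # 1. PDF -> page images

with torch.no_grad():
    page_batch = processor.process_images(pages).to(device)
    page_emb = model(**page_batch)               # 2. multi-vector page embeddings

    query_batch = processor.process_queries(["How does attention work?"]).to(device)
    query_emb = model(**query_batch)             # 4. encode the text query

scores = processor.score_multi_vector(query_emb, page_emb)  # 5. MaxSim late interaction
print("best page:", scores.argmax(dim=1).item())
```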
### Device Support
- **CUDA**: Best performance with GPU acceleration
- **MPS**: Apple Silicon Mac support
- **CPU**: Fallback for any system (slower)
Auto-detection: CUDA > MPS > CPU
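That preference order amounts to a few lines of torch (`pick_device` is an illustrative helper, not something LEANN exports):
```python
import torch

def pick_device() -> str:
    """Prefer CUDA, then Apple Silicon MPS, then CPU."""
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"
```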
## 📊 Performance Tips
### For Best Performance:
```bash
# Use ColQwen2 for latest features
--model colqwen2
# Save page images for reuse
--pages-dir ./cached_pages/
# Adjust batch size based on GPU memory
# (automatically handled)
```
### For Large Document Sets:
- Process PDFs in batches (see the sketch below)
- Use SSD storage for index files
- Consider using CUDA if available
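A hand-rolled sketch of the batching idea (the CLI sizes batches automatically; `embed_pages` is an illustrative name, reusing the model/processor from the sketch above):
```python
import torch
from itertools import islice

def batched(items, n):
    # Python 3.12 ships itertools.batched; this backport works on older versions
    it = iter(items)
    while chunk := list(islice(it, n)):
        yield chunk

def embed_pages(pages, model, processor, batch_size=4):
    """Encode page images a few at a time to bound peak GPU/MPS memory."""
    embeddings = []
    for chunk in batched(pages, batch_size):
        with torch.no_grad():
            batch = processor.process_images(chunk).to(model.device)
            embeddings.extend(emb.cpu() for emb in model(**batch))  # move off-device early
    return embeddings
```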
## 🔗 Related Resources
- **Fast-PLAID**: https://github.com/lightonai/fast-plaid
- **Pylate**: https://github.com/lightonai/pylate
- **ColBERT**: https://github.com/stanford-futuredata/ColBERT
- **ColPali Paper**: "ColPali: Efficient Document Retrieval with Vision Language Models"
- **Issue #119**: https://github.com/yichuan-w/LEANN/issues/119
## 🐛 Troubleshooting
### PDF Conversion Issues (macOS)
```bash
# Install poppler
brew install poppler
which pdfinfo && pdfinfo -v
```
### Memory Issues
- Reduce batch size (automatically handled)
- Use CPU instead of GPU: `export CUDA_VISIBLE_DEVICES=""`
- Process fewer PDFs at once
### Model Download Issues
- Ensure internet connection for first run
- Models are cached after first download
- Use HuggingFace mirrors if needed
### Import Errors
```bash
# Ensure all dependencies installed
uv pip install colpali_engine pdf2image pillow matplotlib qwen_vl_utils einops seaborn
# Check PyTorch installation
python -c "import torch; print(torch.__version__)"
```
## 💡 Examples
### Research Paper Analysis
```bash
# Index your research papers
python -m apps.colqwen_rag build --pdfs ~/Papers/AI/ --index ai_papers
# Ask research questions
python -m apps.colqwen_rag search ai_papers "What are the limitations of transformer models?"
python -m apps.colqwen_rag search ai_papers "How does BERT compare to GPT?"
```
### Document Q&A
```bash
# Index business documents
python -m apps.colqwen_rag build --pdfs ~/Documents/Reports/ --index reports
# Interactive analysis
python -m apps.colqwen_rag ask reports --interactive
```
### Visual Analysis
```bash
# Generate similarity maps for specific queries
cd apps/multimodal/vision-based-pdf-multi-vector/
# Edit multi-vector-leann-similarity-map.py with your query
python multi-vector-leann-similarity-map.py
# Check ./figures/ for generated heatmaps
```
---
**🎯 This integration makes ColQwen as easy to use as other LEANN features while maintaining the full power of multimodal document understanding!**

View File

@@ -7,7 +7,7 @@ name = "leann-core"
version = "0.3.5" version = "0.3.5"
description = "Core API and plugin system for LEANN" description = "Core API and plugin system for LEANN"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.9"
license = { text = "MIT" } license = { text = "MIT" }
# All required dependencies included # All required dependencies included

View File

@@ -239,11 +239,11 @@ def create_ast_chunks(
chunks = chunk_builder.chunkify(code_content) chunks = chunk_builder.chunkify(code_content)
for chunk in chunks: for chunk in chunks:
chunk_text: str | None = None chunk_text = None
astchunk_metadata: dict[str, Any] = {} astchunk_metadata = {}
if hasattr(chunk, "text"): if hasattr(chunk, "text"):
chunk_text = str(chunk.text) if chunk.text else None chunk_text = chunk.text
elif isinstance(chunk, str): elif isinstance(chunk, str):
chunk_text = chunk chunk_text = chunk
elif isinstance(chunk, dict): elif isinstance(chunk, dict):

View File

@@ -19,7 +19,7 @@ from .settings import (
) )
def extract_pdf_text_with_pymupdf(file_path: str) -> str | None: def extract_pdf_text_with_pymupdf(file_path: str) -> str:
"""Extract text from PDF using PyMuPDF for better quality.""" """Extract text from PDF using PyMuPDF for better quality."""
try: try:
import fitz # PyMuPDF import fitz # PyMuPDF
@@ -35,7 +35,7 @@ def extract_pdf_text_with_pymupdf(file_path: str) -> str | None:
return None return None
def extract_pdf_text_with_pdfplumber(file_path: str) -> str | None: def extract_pdf_text_with_pdfplumber(file_path: str) -> str:
"""Extract text from PDF using pdfplumber for better quality.""" """Extract text from PDF using pdfplumber for better quality."""
try: try:
import pdfplumber import pdfplumber

View File

@@ -451,8 +451,7 @@ def compute_embeddings_sentence_transformers(
# TODO: Haven't tested this yet # TODO: Haven't tested this yet
torch.set_num_threads(min(8, os.cpu_count() or 4)) torch.set_num_threads(min(8, os.cpu_count() or 4))
try: try:
# PyTorch's ContextProp type is complex; cast for type checker torch.backends.mkldnn.enabled = True
torch.backends.mkldnn.enabled = True # type: ignore[assignment]
except AttributeError: except AttributeError:
pass pass

View File

@@ -11,15 +11,14 @@ from pathlib import Path
from typing import Callable, Optional from typing import Callable, Optional
# Try to import readline with fallback for Windows # Try to import readline with fallback for Windows
HAS_READLINE = False
readline = None # type: ignore[assignment]
try: try:
import readline # type: ignore[no-redef] import readline
HAS_READLINE = True HAS_READLINE = True
except ImportError: except ImportError:
# Windows doesn't have readline by default # Windows doesn't have readline by default
pass HAS_READLINE = False
readline = None
class InteractiveSession: class InteractiveSession:

View File

@@ -7,7 +7,7 @@ operators for different data types including numbers, strings, booleans, and lis
""" """
import logging import logging
from typing import Any, Optional, Union from typing import Any, Union
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -47,7 +47,7 @@ class MetadataFilterEngine:
} }
def apply_filters( def apply_filters(
self, search_results: list[dict[str, Any]], metadata_filters: Optional[MetadataFilters] self, search_results: list[dict[str, Any]], metadata_filters: MetadataFilters
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """
Apply metadata filters to a list of search results. Apply metadata filters to a list of search results.

View File

@@ -56,9 +56,7 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
with open(meta_path, encoding="utf-8") as f: with open(meta_path, encoding="utf-8") as f:
return json.load(f) return json.load(f)
def _ensure_server_running( def _ensure_server_running(self, passages_source_file: str, port: int, **kwargs) -> int:
self, passages_source_file: str, port: Optional[int], **kwargs
) -> int:
""" """
Ensures the embedding server is running if recompute is needed. Ensures the embedding server is running if recompute is needed.
This is a helper for subclasses. This is a helper for subclasses.
@@ -83,7 +81,7 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
} }
server_started, actual_port = self.embedding_server_manager.start_server( server_started, actual_port = self.embedding_server_manager.start_server(
port=port if port is not None else 5557, port=port,
model_name=self.embedding_model, model_name=self.embedding_model,
embedding_mode=self.embedding_mode, embedding_mode=self.embedding_mode,
passages_file=passages_source_file, passages_file=passages_source_file,
@@ -100,7 +98,7 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
self, self,
query: str, query: str,
use_server_if_available: bool = True, use_server_if_available: bool = True,
zmq_port: Optional[int] = None, zmq_port: int = 5557,
query_template: Optional[str] = None, query_template: Optional[str] = None,
) -> np.ndarray: ) -> np.ndarray:
""" """

View File

@@ -7,7 +7,7 @@ name = "leann"
version = "0.3.5" version = "0.3.5"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!" description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.9"
license = { text = "MIT" } license = { text = "MIT" }
authors = [ authors = [
{ name = "LEANN Team" } { name = "LEANN Team" }
@@ -18,10 +18,10 @@ classifiers = [
"Intended Audience :: Developers", "Intended Audience :: Developers",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
] ]
# Default installation: core + hnsw + diskann # Default installation: core + hnsw + diskann

View File

@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "leann-workspace" name = "leann-workspace"
version = "0.1.0" version = "0.1.0"
requires-python = ">=3.10" requires-python = ">=3.9"
dependencies = [ dependencies = [
"leann-core", "leann-core",
@@ -157,19 +157,6 @@ exclude = ["localhost", "127.0.0.1", "example.com"]
exclude_path = [".git/", ".venv/", "__pycache__/", "third_party/"] exclude_path = [".git/", ".venv/", "__pycache__/", "third_party/"]
scheme = ["https", "http"] scheme = ["https", "http"]
[tool.ty]
# Type checking with ty (Astral's fast Python type checker)
# ty is 10-100x faster than mypy. See: https://docs.astral.sh/ty/
[tool.ty.environment]
python-version = "3.11"
extra-paths = ["apps", "packages/leann-core/src"]
[tool.ty.rules]
# Disable some noisy rules that have many false positives
possibly-missing-attribute = "ignore"
unresolved-import = "ignore" # Many optional dependencies
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["tests"] testpaths = ["tests"]
python_files = ["test_*.py"] python_files = ["test_*.py"]

View File

@@ -91,7 +91,7 @@ def test_large_index():
builder.build_index(index_path) builder.build_index(index_path)
searcher = LeannSearcher(index_path) searcher = LeannSearcher(index_path)
results = searcher.search("word10 word20", top_k=10) results = searcher.search(["word10 word20"], top_k=10)
assert len(results) == 10 assert len(results[0]) == 10
# Cleanup # Cleanup
searcher.cleanup() searcher.cleanup()

View File

@@ -123,7 +123,7 @@ class TestPromptTemplateStoredInEmbeddingOptions:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a document so builder is created # Mock load_documents to return a document so builder is created
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()
@@ -175,7 +175,7 @@ class TestPromptTemplateStoredInEmbeddingOptions:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a document so builder is created # Mock load_documents to return a document so builder is created
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()
@@ -230,7 +230,7 @@ class TestPromptTemplateStoredInEmbeddingOptions:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a document so builder is created # Mock load_documents to return a document so builder is created
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()
@@ -307,7 +307,7 @@ class TestPromptTemplateStoredInEmbeddingOptions:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a document so builder is created # Mock load_documents to return a document so builder is created
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()
@@ -376,7 +376,7 @@ class TestPromptTemplateStoredInEmbeddingOptions:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a document so builder is created # Mock load_documents to return a document so builder is created
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()
@@ -432,7 +432,7 @@ class TestPromptTemplateFlowsToComputeEmbeddings:
cli = LeannCLI() cli = LeannCLI()
# Mock load_documents to return a simple document # Mock load_documents to return a simple document
cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}]) # type: ignore[assignment] cli.load_documents = Mock(return_value=[{"text": "test content", "metadata": {}}])
parser = cli.create_parser() parser = cli.create_parser()

View File

@@ -67,7 +67,7 @@ def check_lmstudio_available() -> bool:
return False return False
def get_lmstudio_first_model() -> str | None: def get_lmstudio_first_model() -> str:
"""Get the first available model from LM Studio.""" """Get the first available model from LM Studio."""
try: try:
response = requests.get("http://localhost:1234/v1/models", timeout=5.0) response = requests.get("http://localhost:1234/v1/models", timeout=5.0)
@@ -91,7 +91,6 @@ class TestPromptTemplateOpenAI:
model_name = get_lmstudio_first_model() model_name = get_lmstudio_first_model()
if not model_name: if not model_name:
pytest.skip("No models loaded in LM Studio") pytest.skip("No models loaded in LM Studio")
assert model_name is not None # Type narrowing for type checker
texts = ["artificial intelligence", "machine learning"] texts = ["artificial intelligence", "machine learning"]
prompt_template = "search_query: " prompt_template = "search_query: "
@@ -121,7 +120,6 @@ class TestPromptTemplateOpenAI:
model_name = get_lmstudio_first_model() model_name = get_lmstudio_first_model()
if not model_name: if not model_name:
pytest.skip("No models loaded in LM Studio") pytest.skip("No models loaded in LM Studio")
assert model_name is not None # Type narrowing for type checker
text = "machine learning" text = "machine learning"
base_url = "http://localhost:1234/v1" base_url = "http://localhost:1234/v1"
@@ -273,7 +271,6 @@ class TestLMStudioSDK:
model_name = get_lmstudio_first_model() model_name = get_lmstudio_first_model()
if not model_name: if not model_name:
pytest.skip("No models loaded in LM Studio") pytest.skip("No models loaded in LM Studio")
assert model_name is not None # Type narrowing for type checker
try: try:
from leann.embedding_compute import _query_lmstudio_context_limit from leann.embedding_compute import _query_lmstudio_context_limit

View File

@@ -581,18 +581,7 @@ class TestQueryTemplateApplicationInComputeEmbedding:
# Create a concrete implementation for testing # Create a concrete implementation for testing
class TestSearcher(BaseSearcher): class TestSearcher(BaseSearcher):
def search( def search(self, query_vectors, top_k, complexity, beam_width=1, **kwargs):
self,
query,
top_k,
complexity=64,
beam_width=1,
prune_ratio=0.0,
recompute_embeddings=False,
pruning_strategy="global",
zmq_port=None,
**kwargs,
):
return {"labels": [], "distances": []} return {"labels": [], "distances": []}
searcher = object.__new__(TestSearcher) searcher = object.__new__(TestSearcher)
@@ -636,18 +625,7 @@ class TestQueryTemplateApplicationInComputeEmbedding:
# Create a concrete implementation for testing # Create a concrete implementation for testing
class TestSearcher(BaseSearcher): class TestSearcher(BaseSearcher):
def search( def search(self, query_vectors, top_k, complexity, beam_width=1, **kwargs):
self,
query,
top_k,
complexity=64,
beam_width=1,
prune_ratio=0.0,
recompute_embeddings=False,
pruning_strategy="global",
zmq_port=None,
**kwargs,
):
return {"labels": [], "distances": []} return {"labels": [], "distances": []}
searcher = object.__new__(TestSearcher) searcher = object.__new__(TestSearcher)
@@ -693,18 +671,7 @@ class TestQueryTemplateApplicationInComputeEmbedding:
from leann.searcher_base import BaseSearcher from leann.searcher_base import BaseSearcher
class TestSearcher(BaseSearcher): class TestSearcher(BaseSearcher):
def search( def search(self, query_vectors, top_k, complexity, beam_width=1, **kwargs):
self,
query,
top_k,
complexity=64,
beam_width=1,
prune_ratio=0.0,
recompute_embeddings=False,
pruning_strategy="global",
zmq_port=None,
**kwargs,
):
return {"labels": [], "distances": []} return {"labels": [], "distances": []}
searcher = object.__new__(TestSearcher) searcher = object.__new__(TestSearcher)
@@ -743,18 +710,7 @@ class TestQueryTemplateApplicationInComputeEmbedding:
from leann.searcher_base import BaseSearcher from leann.searcher_base import BaseSearcher
class TestSearcher(BaseSearcher): class TestSearcher(BaseSearcher):
def search( def search(self, query_vectors, top_k, complexity, beam_width=1, **kwargs):
self,
query,
top_k,
complexity=64,
beam_width=1,
prune_ratio=0.0,
recompute_embeddings=False,
pruning_strategy="global",
zmq_port=None,
**kwargs,
):
return {"labels": [], "distances": []} return {"labels": [], "distances": []}
searcher = object.__new__(TestSearcher) searcher = object.__new__(TestSearcher)
@@ -818,18 +774,7 @@ class TestQueryTemplateApplicationInComputeEmbedding:
from leann.searcher_base import BaseSearcher from leann.searcher_base import BaseSearcher
class TestSearcher(BaseSearcher): class TestSearcher(BaseSearcher):
def search( def search(self, query_vectors, top_k, complexity, beam_width=1, **kwargs):
self,
query,
top_k,
complexity=64,
beam_width=1,
prune_ratio=0.0,
recompute_embeddings=False,
pruning_strategy="global",
zmq_port=None,
**kwargs,
):
return {"labels": [], "distances": []} return {"labels": [], "distances": []}
searcher = object.__new__(TestSearcher) searcher = object.__new__(TestSearcher)

View File

@@ -97,17 +97,17 @@ def test_backend_options():
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
# Use smaller model in CI to avoid memory issues # Use smaller model in CI to avoid memory issues
is_ci = os.environ.get("CI") == "true" if os.environ.get("CI") == "true":
embedding_model = ( model_args = {
"sentence-transformers/all-MiniLM-L6-v2" if is_ci else "facebook/contriever" "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
) "dimensions": 384,
dimensions = 384 if is_ci else None }
else:
model_args = {}
# Test HNSW backend (as shown in README) # Test HNSW backend (as shown in README)
hnsw_path = str(Path(temp_dir) / "test_hnsw.leann") hnsw_path = str(Path(temp_dir) / "test_hnsw.leann")
builder_hnsw = LeannBuilder( builder_hnsw = LeannBuilder(backend_name="hnsw", **model_args)
backend_name="hnsw", embedding_model=embedding_model, dimensions=dimensions
)
builder_hnsw.add_text("Test document for HNSW backend") builder_hnsw.add_text("Test document for HNSW backend")
builder_hnsw.build_index(hnsw_path) builder_hnsw.build_index(hnsw_path)
assert Path(hnsw_path).parent.exists() assert Path(hnsw_path).parent.exists()
@@ -115,9 +115,7 @@ def test_backend_options():
# Test DiskANN backend (mentioned as available option) # Test DiskANN backend (mentioned as available option)
diskann_path = str(Path(temp_dir) / "test_diskann.leann") diskann_path = str(Path(temp_dir) / "test_diskann.leann")
builder_diskann = LeannBuilder( builder_diskann = LeannBuilder(backend_name="diskann", **model_args)
backend_name="diskann", embedding_model=embedding_model, dimensions=dimensions
)
builder_diskann.add_text("Test document for DiskANN backend") builder_diskann.add_text("Test document for DiskANN backend")
builder_diskann.build_index(diskann_path) builder_diskann.build_index(diskann_path)
assert Path(diskann_path).parent.exists() assert Path(diskann_path).parent.exists()

uv.lock (generated, 1163 lines changed)
View File

File diff suppressed because it is too large