Compare commits

..

1 Commits

Author SHA1 Message Date
yichuan520030910320
cd343e1d7a style: apply ruff format 2025-08-15 17:44:50 -07:00
17 changed files with 107 additions and 632 deletions

1
.gitattributes vendored Normal file
View File

@@ -0,0 +1 @@
paper_plot/data/big_graph_degree_data.npz filter=lfs diff=lfs merge=lfs -text

1
.gitignore vendored
View File

@@ -18,7 +18,6 @@ demo/experiment_results/**/*.json
*.eml
*.emlx
*.json
!.vscode/*.json
*.sh
*.txt
!CMakeLists.txt

View File

@@ -1,5 +0,0 @@
{
"recommendations": [
"charliermarsh.ruff",
]
}

22
.vscode/settings.json vendored
View File

@@ -1,22 +0,0 @@
{
"python.defaultInterpreterPath": ".venv/bin/python",
"python.terminal.activateEnvironment": true,
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
},
"editor.insertSpaces": true,
"editor.tabSize": 4
},
"ruff.enable": true,
"files.watcherExclude": {
"**/.venv/**": true,
"**/__pycache__/**": true,
"**/*.egg-info/**": true,
"**/build/**": true,
"**/dist/**": true
}
}

View File

@@ -5,7 +5,7 @@
<p align="center">
<img src="https://img.shields.io/badge/Python-3.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20%7C%203.13-blue.svg" alt="Python Versions">
<img src="https://github.com/yichuan-w/LEANN/actions/workflows/build-and-publish.yml/badge.svg" alt="CI Status">
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%26%20Arch%20%26%20WSL%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="MIT License">
<img src="https://img.shields.io/badge/MCP-Native%20Integration-blue" alt="MCP Integration">
</p>
@@ -94,9 +94,7 @@ CC=$(brew --prefix llvm)/bin/clang CXX=$(brew --prefix llvm)/bin/clang++ uv sync
**Linux:**
```bash
# Ubuntu/Debian (For Arch Linux: sudo pacman -S blas lapack openblas libaio boost protobuf abseil-cpp zeromq)
sudo apt-get update && sudo apt-get install -y libomp-dev libboost-all-dev protobuf-compiler libabsl-dev libmkl-full-dev libaio-dev libzmq3-dev
sudo apt-get install libomp-dev libboost-all-dev protobuf-compiler libabsl-dev libmkl-full-dev libaio-dev libzmq3-dev
uv sync
```
@@ -428,21 +426,21 @@ Once the index is built, you can ask questions like:
**The future of code assistance is here.** Transform your development workflow with LEANN's native MCP integration for Claude Code. Index your entire codebase and get intelligent code assistance directly in your IDE.
**Key features:**
- 🔍 **Semantic code search** across your entire project, fully local index and lightweight
- 🔍 **Semantic code search** across your entire project
- 📚 **Context-aware assistance** for debugging and development
- 🚀 **Zero-config setup** with automatic language detection
```bash
# Install LEANN globally for MCP integration
uv tool install leann-core --with leann
claude mcp add --scope user leann-server -- leann_mcp
uv tool install leann-core
# Setup is automatic - just start using Claude Code!
```
Try our fully agentic pipeline with auto query rewriting, semantic search planning, and more:
![LEANN MCP Integration](assets/mcp_leann.png)
**🔥 Ready to supercharge your coding?** [Complete Setup Guide →](packages/leann-mcp/README.md)
**Ready to supercharge your coding?** [Complete Setup Guide →](packages/leann-mcp/README.md)
## 🖥️ Command Line Interface
@@ -459,8 +457,7 @@ leann --help
**To make it globally available:**
```bash
# Install the LEANN CLI globally using uv tool
uv tool install leann-core --with leann
uv tool install leann-core
# Now you can use leann from anywhere without activating venv
leann --help
@@ -484,9 +481,6 @@ leann ask my-docs --interactive
# List all your indexes
leann list
# Remove an index
leann remove my-docs
```
**Key CLI features:**
@@ -499,7 +493,7 @@ leann remove my-docs
<details>
<summary><strong>📋 Click to expand: Complete CLI Reference</strong></summary>
You can use `leann --help`, or `leann build --help`, `leann search --help`, `leann ask --help`, `leann list --help`, `leann remove --help` to get the complete CLI reference.
You can use `leann --help`, or `leann build --help`, `leann search --help`, `leann ask --help` to get the complete CLI reference.
**Build Command:**
```bash
@@ -537,31 +531,6 @@ Options:
--top-k N Retrieval count (default: 20)
```
**List Command:**
```bash
leann list
# Lists all indexes across all projects with status indicators:
# ✅ - Index is complete and ready to use
# ❌ - Index is incomplete or corrupted
# 📁 - CLI-created index (in .leann/indexes/)
# 📄 - App-created index (*.leann.meta.json files)
```
**Remove Command:**
```bash
leann remove INDEX_NAME [OPTIONS]
Options:
--force, -f Force removal without confirmation
# Smart removal: automatically finds and safely removes indexes
# - Shows all matching indexes across projects
# - Requires confirmation for cross-project removal
# - Interactive selection when multiple matches found
# - Supports both CLI and app-created indexes
```
</details>
## 🏗️ Architecture & How It Works

View File

@@ -10,7 +10,6 @@ from typing import Any
import dotenv
from leann.api import LeannBuilder, LeannChat
from leann.registry import register_project_directory
from llama_index.core.node_parser import SentenceSplitter
dotenv.load_dotenv()
@@ -215,11 +214,6 @@ class BaseRAGExample(ABC):
builder.build_index(index_path)
print(f"Index saved to: {index_path}")
# Register project directory so leann list can discover this index
# The index is saved as args.index_dir/index_name.leann
# We want to register the current working directory where the app is run
register_project_directory(Path.cwd())
return index_path
async def run_interactive_chat(self, args, index_path: str):

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-diskann"
version = "0.3.0"
dependencies = ["leann-core==0.3.0", "numpy", "protobuf>=3.19.0"]
version = "0.2.9"
dependencies = ["leann-core==0.2.9", "numpy", "protobuf>=3.19.0"]
[tool.scikit-build]
# Key: simplified CMake path

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-hnsw"
version = "0.3.0"
version = "0.2.9"
description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
dependencies = [
"leann-core==0.3.0",
"leann-core==0.2.9",
"numpy",
"pyzmq>=23.0.0",
"msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann-core"
version = "0.3.0"
version = "0.2.9"
description = "Core API and plugin system for LEANN"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -46,7 +46,6 @@ def compute_embeddings(
- "sentence-transformers": Use sentence-transformers library (default)
- "mlx": Use MLX backend for Apple Silicon
- "openai": Use OpenAI embedding API
- "gemini": Use Google Gemini embedding API
use_server: Whether to use embedding server (True for search, False for build)
Returns:
@@ -307,23 +306,6 @@ class LeannBuilder:
def build_index(self, index_path: str):
if not self.chunks:
raise ValueError("No chunks added.")
# Filter out invalid/empty text chunks early to keep passage and embedding counts aligned
valid_chunks: list[dict[str, Any]] = []
skipped = 0
for chunk in self.chunks:
text = chunk.get("text", "")
if isinstance(text, str) and text.strip():
valid_chunks.append(chunk)
else:
skipped += 1
if skipped > 0:
print(
f"Warning: Skipping {skipped} empty/invalid text chunk(s). Processing {len(valid_chunks)} valid chunks"
)
self.chunks = valid_chunks
if not self.chunks:
raise ValueError("All provided chunks are empty or invalid. Nothing to index.")
if self.dimensions is None:
self.dimensions = len(
compute_embeddings(
@@ -614,7 +596,7 @@ class LeannSearcher:
zmq_port=zmq_port,
)
# logger.info(f" Generated embedding shape: {query_embedding.shape}")
# time.time() - start_time
time.time() - start_time
# logger.info(f" Embedding time: {embedding_time} seconds")
start_time = time.time()
@@ -680,9 +662,8 @@ class LeannSearcher:
This method should be called after you're done using the searcher,
especially in test environments or batch processing scenarios.
"""
backend = getattr(self.backend_impl, "embedding_server_manager", None)
if backend is not None:
backend.stop_server()
if hasattr(self.backend_impl, "embedding_server_manager"):
self.backend_impl.embedding_server_manager.stop_server()
# Enable automatic cleanup patterns
def __enter__(self):

View File

@@ -680,60 +680,6 @@ class HFChat(LLMInterface):
return response.strip()
class GeminiChat(LLMInterface):
"""LLM interface for Google Gemini models."""
def __init__(self, model: str = "gemini-2.5-flash", api_key: Optional[str] = None):
self.model = model
self.api_key = api_key or os.getenv("GEMINI_API_KEY")
if not self.api_key:
raise ValueError(
"Gemini API key is required. Set GEMINI_API_KEY environment variable or pass api_key parameter."
)
logger.info(f"Initializing Gemini Chat with model='{model}'")
try:
import google.genai as genai
self.client = genai.Client(api_key=self.api_key)
except ImportError:
raise ImportError(
"The 'google-genai' library is required for Gemini models. Please install it with 'uv pip install google-genai'."
)
def ask(self, prompt: str, **kwargs) -> str:
logger.info(f"Sending request to Gemini with model {self.model}")
try:
from google.genai.types import GenerateContentConfig
generation_config = GenerateContentConfig(
temperature=kwargs.get("temperature", 0.7),
max_output_tokens=kwargs.get("max_tokens", 1000),
)
# Handle top_p parameter
if "top_p" in kwargs:
generation_config.top_p = kwargs["top_p"]
response = self.client.models.generate_content(
model=self.model,
contents=prompt,
config=generation_config,
)
# Handle potential None response text
response_text = response.text
if response_text is None:
logger.warning("Gemini returned None response text")
return ""
return response_text.strip()
except Exception as e:
logger.error(f"Error communicating with Gemini: {e}")
return f"Error: Could not get a response from Gemini. Details: {e}"
class OpenAIChat(LLMInterface):
"""LLM interface for OpenAI models."""
@@ -847,8 +793,6 @@ def get_llm(llm_config: Optional[dict[str, Any]] = None) -> LLMInterface:
return HFChat(model_name=model or "deepseek-ai/deepseek-llm-7b-chat")
elif llm_type == "openai":
return OpenAIChat(model=model or "gpt-4o", api_key=llm_config.get("api_key"))
elif llm_type == "gemini":
return GeminiChat(model=model or "gemini-2.5-flash", api_key=llm_config.get("api_key"))
elif llm_type == "simulated":
return SimulatedChat()
else:

View File

@@ -1,14 +1,13 @@
import argparse
import asyncio
from pathlib import Path
from typing import Optional, Union
from typing import Union
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from tqdm import tqdm
from .api import LeannBuilder, LeannChat, LeannSearcher
from .registry import register_project_directory
def extract_pdf_text_with_pymupdf(file_path: str) -> str:
@@ -85,7 +84,6 @@ Examples:
leann search my-docs "query" # Search in my-docs index
leann ask my-docs "question" # Ask my-docs index
leann list # List all stored indexes
leann remove my-docs # Remove an index (local first, then global)
""",
)
@@ -253,18 +251,35 @@ Examples:
# List command
subparsers.add_parser("list", help="List all indexes")
# Remove command
remove_parser = subparsers.add_parser("remove", help="Remove an index")
remove_parser.add_argument("index_name", help="Index name to remove")
remove_parser.add_argument(
"--force", "-f", action="store_true", help="Force removal without confirmation"
)
return parser
def register_project_dir(self):
"""Register current project directory in global registry"""
register_project_directory()
global_registry = Path.home() / ".leann" / "projects.json"
global_registry.parent.mkdir(exist_ok=True)
current_dir = str(Path.cwd())
# Load existing registry
projects = []
if global_registry.exists():
try:
import json
with open(global_registry) as f:
projects = json.load(f)
except Exception:
projects = []
# Add current directory if not already present
if current_dir not in projects:
projects.append(current_dir)
# Save registry
import json
with open(global_registry, "w") as f:
json.dump(projects, f, indent=2)
def _build_gitignore_parser(self, docs_dir: str):
"""Build gitignore parser using gitignore-parser library."""
@@ -324,6 +339,8 @@ Examples:
return False
def list_indexes(self):
print("Stored LEANN indexes:")
# Get all project directories with .leann
global_registry = Path.home() / ".leann" / "projects.json"
all_projects = []
@@ -349,320 +366,55 @@ Examples:
if (current_path / ".leann" / "indexes").exists() and current_path not in valid_projects:
valid_projects.append(current_path)
# Separate current and other projects
other_projects = []
for project_path in valid_projects:
if project_path != current_path:
other_projects.append(project_path)
print("📚 LEANN Indexes")
print("=" * 50)
if not valid_projects:
print(
"No indexes found. Use 'leann build <name> --docs <dir> [<dir2> ...]' to create one."
)
return
total_indexes = 0
current_indexes_count = 0
current_dir = Path.cwd()
# Show current project first (most important)
print("\n🏠 Current Project")
print(f" {current_path}")
print(" " + "" * 45)
current_indexes = self._discover_indexes_in_project(current_path)
if current_indexes:
for idx in current_indexes:
total_indexes += 1
current_indexes_count += 1
type_icon = "📁" if idx["type"] == "cli" else "📄"
print(f" {current_indexes_count}. {type_icon} {idx['name']} {idx['status']}")
if idx["size_mb"] > 0:
print(f" 📦 Size: {idx['size_mb']:.1f} MB")
else:
print(" 📭 No indexes in current project")
# Show other projects (reference information)
if other_projects:
print("\n\n🗂️ Other Projects")
print(" " + "" * 45)
for project_path in other_projects:
project_indexes = self._discover_indexes_in_project(project_path)
if not project_indexes:
continue
print(f"\n 📂 {project_path.name}")
print(f" {project_path}")
for idx in project_indexes:
total_indexes += 1
type_icon = "📁" if idx["type"] == "cli" else "📄"
print(f"{type_icon} {idx['name']} {idx['status']}")
if idx["size_mb"] > 0:
print(f" 📦 {idx['size_mb']:.1f} MB")
# Summary and usage info
print("\n" + "=" * 50)
if total_indexes == 0:
print("💡 Get started:")
print(" leann build my-docs --docs ./documents")
else:
projects_count = len(
[
p
for p in valid_projects
if (p / ".leann" / "indexes").exists()
and list((p / ".leann" / "indexes").iterdir())
]
)
print(f"📊 Total: {total_indexes} indexes across {projects_count} projects")
if current_indexes_count > 0:
print("\n💫 Quick start (current project):")
# Get first index from current project for example
current_indexes_dir = current_path / ".leann" / "indexes"
if current_indexes_dir.exists():
current_index_dirs = [d for d in current_indexes_dir.iterdir() if d.is_dir()]
if current_index_dirs:
example_name = current_index_dirs[0].name
print(f' leann search {example_name} "your query"')
print(f" leann ask {example_name} --interactive")
else:
print("\n💡 Create your first index:")
print(" leann build my-docs --docs ./documents")
def _discover_indexes_in_project(self, project_path: Path):
"""Discover all indexes in a project directory (both CLI and apps formats)"""
indexes = []
# 1. CLI format: .leann/indexes/index_name/
cli_indexes_dir = project_path / ".leann" / "indexes"
if cli_indexes_dir.exists():
for index_dir in cli_indexes_dir.iterdir():
if index_dir.is_dir():
meta_file = index_dir / "documents.leann.meta.json"
status = "" if meta_file.exists() else ""
size_mb = 0
if meta_file.exists():
try:
size_mb = sum(
f.stat().st_size for f in index_dir.iterdir() if f.is_file()
) / (1024 * 1024)
except (OSError, PermissionError):
pass
indexes.append(
{
"name": index_dir.name,
"type": "cli",
"status": status,
"size_mb": size_mb,
"path": index_dir,
}
)
# 2. Apps format: *.leann.meta.json files anywhere in the project
for meta_file in project_path.rglob("*.leann.meta.json"):
if meta_file.is_file():
# Extract index name from filename (remove .leann.meta.json extension)
index_name = meta_file.name.replace(".leann.meta.json", "")
# Apps indexes are considered complete if the .leann.meta.json file exists
status = ""
# Calculate total size of all related files
size_mb = 0
try:
index_dir = meta_file.parent
for related_file in index_dir.glob(f"{index_name}.leann*"):
size_mb += related_file.stat().st_size / (1024 * 1024)
except (OSError, PermissionError):
pass
indexes.append(
{
"name": index_name,
"type": "app",
"status": status,
"size_mb": size_mb,
"path": meta_file,
}
)
return indexes
def remove_index(self, index_name: str, force: bool = False):
"""Safely remove an index - always show all matches for transparency"""
# Always do a comprehensive search for safety
print(f"🔍 Searching for all indexes named '{index_name}'...")
all_matches = self._find_all_matching_indexes(index_name)
if not all_matches:
print(f"❌ Index '{index_name}' not found in any project.")
return False
if len(all_matches) == 1:
return self._remove_single_match(all_matches[0], index_name, force)
else:
return self._remove_from_multiple_matches(all_matches, index_name, force)
def _find_all_matching_indexes(self, index_name: str):
"""Find all indexes with the given name across all projects"""
matches = []
# Get all registered projects
global_registry = Path.home() / ".leann" / "projects.json"
all_projects = []
if global_registry.exists():
try:
import json
with open(global_registry) as f:
all_projects = json.load(f)
except Exception:
pass
# Always include current project
current_path = Path.cwd()
if str(current_path) not in all_projects:
all_projects.append(str(current_path))
# Search across all projects
for project_dir in all_projects:
project_path = Path(project_dir)
if not project_path.exists():
for project_path in valid_projects:
indexes_dir = project_path / ".leann" / "indexes"
if not indexes_dir.exists():
continue
index_dir = project_path / ".leann" / "indexes" / index_name
if index_dir.exists():
is_current = project_path == current_path
matches.append(
{"project_path": project_path, "index_dir": index_dir, "is_current": is_current}
)
index_dirs = [d for d in indexes_dir.iterdir() if d.is_dir()]
if not index_dirs:
continue
# Sort: current project first, then by project name
matches.sort(key=lambda x: (not x["is_current"], x["project_path"].name))
return matches
def _remove_single_match(self, match, index_name: str, force: bool):
"""Handle removal when only one match is found"""
project_path = match["project_path"]
index_dir = match["index_dir"]
is_current = match["is_current"]
if is_current:
location_info = "current project"
emoji = "🏠"
else:
location_info = f"other project '{project_path.name}'"
emoji = "📂"
print(f"✅ Found 1 index named '{index_name}':")
print(f" {emoji} Location: {location_info}")
print(f" 📍 Path: {project_path}")
if not force:
if not is_current:
print("\n⚠️ CROSS-PROJECT REMOVAL!")
print(" This will delete the index from another project.")
response = input(f" ❓ Confirm removal from {location_info}? (y/N): ").strip().lower()
if response not in ["y", "yes"]:
print(" ❌ Removal cancelled.")
return False
return self._delete_index_directory(
index_dir, index_name, project_path if not is_current else None
)
def _remove_from_multiple_matches(self, matches, index_name: str, force: bool):
"""Handle removal when multiple matches are found"""
print(f"⚠️ Found {len(matches)} indexes named '{index_name}':")
print(" " + "" * 50)
for i, match in enumerate(matches, 1):
project_path = match["project_path"]
is_current = match["is_current"]
if is_current:
print(f" {i}. 🏠 Current project")
print(f" 📍 {project_path}")
# Show project header
if project_path == current_dir:
print(f"\n📁 Current project ({project_path}):")
else:
print(f" {i}. 📂 {project_path.name}")
print(f" 📍 {project_path}")
print(f"\n📂 {project_path}:")
# Show size info
try:
size_mb = sum(
f.stat().st_size for f in match["index_dir"].iterdir() if f.is_file()
) / (1024 * 1024)
print(f" 📦 Size: {size_mb:.1f} MB")
except (OSError, PermissionError):
pass
for index_dir in index_dirs:
total_indexes += 1
index_name = index_dir.name
meta_file = index_dir / "documents.leann.meta.json"
status = "" if meta_file.exists() else ""
print(" " + "" * 50)
print(f" {total_indexes}. {index_name} [{status}]")
if status == "":
size_mb = sum(f.stat().st_size for f in index_dir.iterdir() if f.is_file()) / (
1024 * 1024
)
print(f" Size: {size_mb:.1f} MB")
if force:
print(" ❌ Multiple matches found, but --force specified.")
print(" Please run without --force to choose which one to remove.")
return False
if total_indexes > 0:
print(f"\nTotal: {total_indexes} indexes across {len(valid_projects)} projects")
print("\nUsage (current project only):")
try:
choice = input(
f" ❓ Which one to remove? (1-{len(matches)}, or 'c' to cancel): "
).strip()
if choice.lower() == "c":
print(" ❌ Removal cancelled.")
return False
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(matches):
selected_match = matches[choice_idx]
project_path = selected_match["project_path"]
index_dir = selected_match["index_dir"]
is_current = selected_match["is_current"]
location = "current project" if is_current else f"'{project_path.name}' project"
print(f" 🎯 Selected: Remove from {location}")
# Final confirmation for safety
confirm = input(
f" ❓ FINAL CONFIRMATION - Type '{index_name}' to proceed: "
).strip()
if confirm != index_name:
print(" ❌ Confirmation failed. Removal cancelled.")
return False
return self._delete_index_directory(
index_dir, index_name, project_path if not is_current else None
)
else:
print(" ❌ Invalid choice. Removal cancelled.")
return False
except (ValueError, KeyboardInterrupt):
print("\n ❌ Invalid input. Removal cancelled.")
return False
def _delete_index_directory(
self, index_dir: Path, index_name: str, project_path: Optional[Path] = None
):
"""Actually delete the index directory"""
try:
import shutil
shutil.rmtree(index_dir)
if project_path:
print(f"✅ Index '{index_name}' removed from {project_path.name}")
else:
print(f"✅ Index '{index_name}' removed successfully")
return True
except Exception as e:
print(f"❌ Error removing index '{index_name}': {e}")
return False
# Show example from current project
current_indexes_dir = current_dir / ".leann" / "indexes"
if current_indexes_dir.exists():
current_index_dirs = [d for d in current_indexes_dir.iterdir() if d.is_dir()]
if current_index_dirs:
example_name = current_index_dirs[0].name
print(f' leann search {example_name} "your query"')
print(f" leann ask {example_name} --interactive")
def load_documents(
self,
@@ -1190,8 +942,6 @@ Examples:
if args.command == "list":
self.list_indexes()
elif args.command == "remove":
self.remove_index(args.index_name, args.force)
elif args.command == "build":
await self.build_index(args)
elif args.command == "search":
@@ -1203,15 +953,10 @@ Examples:
def main():
import logging
import dotenv
dotenv.load_dotenv()
# Set clean logging for CLI usage
logging.getLogger().setLevel(logging.WARNING) # Only show warnings and errors
cli = LeannCLI()
asyncio.run(cli.run())

View File

@@ -57,8 +57,6 @@ def compute_embeddings(
return compute_embeddings_mlx(texts, model_name)
elif mode == "ollama":
return compute_embeddings_ollama(texts, model_name, is_build=is_build)
elif mode == "gemini":
return compute_embeddings_gemini(texts, model_name, is_build=is_build)
else:
raise ValueError(f"Unsupported embedding mode: {mode}")
@@ -246,16 +244,6 @@ def compute_embeddings_openai(texts: list[str], model_name: str) -> np.ndarray:
except ImportError as e:
raise ImportError(f"OpenAI package not installed: {e}")
# Validate input list
if not texts:
raise ValueError("Cannot compute embeddings for empty text list")
# Extra validation: abort early if any item is empty/whitespace
invalid_count = sum(1 for t in texts if not isinstance(t, str) or not t.strip())
if invalid_count > 0:
raise ValueError(
f"Found {invalid_count} empty/invalid text(s) in input. Upstream should filter before calling OpenAI."
)
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY environment variable not set")
@@ -670,83 +658,3 @@ def compute_embeddings_ollama(
logger.info(f"Generated {len(embeddings)} embeddings, dimension: {embeddings.shape[1]}")
return embeddings
def compute_embeddings_gemini(
texts: list[str], model_name: str = "text-embedding-004", is_build: bool = False
) -> np.ndarray:
"""
Compute embeddings using Google Gemini API.
Args:
texts: List of texts to compute embeddings for
model_name: Gemini model name (default: "text-embedding-004")
is_build: Whether this is a build operation (shows progress bar)
Returns:
Embeddings array, shape: (len(texts), embedding_dim)
"""
try:
import os
import google.genai as genai
except ImportError as e:
raise ImportError(f"Google GenAI package not installed: {e}")
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise RuntimeError("GEMINI_API_KEY environment variable not set")
# Cache Gemini client
cache_key = "gemini_client"
if cache_key in _model_cache:
client = _model_cache[cache_key]
else:
client = genai.Client(api_key=api_key)
_model_cache[cache_key] = client
logger.info("Gemini client cached")
logger.info(
f"Computing embeddings for {len(texts)} texts using Gemini API, model: '{model_name}'"
)
# Gemini supports batch embedding
max_batch_size = 100 # Conservative batch size for Gemini
all_embeddings = []
try:
from tqdm import tqdm
total_batches = (len(texts) + max_batch_size - 1) // max_batch_size
batch_range = range(0, len(texts), max_batch_size)
batch_iterator = tqdm(
batch_range, desc="Computing embeddings", unit="batch", total=total_batches
)
except ImportError:
# Fallback when tqdm is not available
batch_iterator = range(0, len(texts), max_batch_size)
for i in batch_iterator:
batch_texts = texts[i : i + max_batch_size]
try:
# Use the embed_content method from the new Google GenAI SDK
response = client.models.embed_content(
model=model_name,
contents=batch_texts,
config=genai.types.EmbedContentConfig(
task_type="RETRIEVAL_DOCUMENT" # For document embedding
),
)
# Extract embeddings from response
for embedding_data in response.embeddings:
all_embeddings.append(embedding_data.values)
except Exception as e:
logger.error(f"Batch {i} failed: {e}")
raise
embeddings = np.array(all_embeddings, dtype=np.float32)
logger.info(f"Generated {len(embeddings)} embeddings, dimension: {embeddings.shape[1]}")
return embeddings

View File

@@ -2,17 +2,11 @@
import importlib
import importlib.metadata
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from leann.interface import LeannBackendFactoryInterface
# Set up logger for this module
logger = logging.getLogger(__name__)
BACKEND_REGISTRY: dict[str, "LeannBackendFactoryInterface"] = {}
@@ -20,7 +14,7 @@ def register_backend(name: str):
"""A decorator to register a new backend class."""
def decorator(cls):
logger.debug(f"Registering backend '{name}'")
print(f"INFO: Registering backend '{name}'")
BACKEND_REGISTRY[name] = cls
return cls
@@ -45,54 +39,3 @@ def autodiscover_backends():
# print(f"WARN: Could not import backend module '{backend_module_name}': {e}")
pass
# print("INFO: Backend auto-discovery finished.")
def register_project_directory(project_dir: Optional[Union[str, Path]] = None):
"""
Register a project directory in the global LEANN registry.
This allows `leann list` to discover indexes created by apps or other tools.
Args:
project_dir: Directory to register. If None, uses current working directory.
"""
if project_dir is None:
project_dir = Path.cwd()
else:
project_dir = Path(project_dir)
# Only register directories that have some kind of LEANN content
# Either .leann/indexes/ (CLI format) or *.leann.meta.json files (apps format)
has_cli_indexes = (project_dir / ".leann" / "indexes").exists()
has_app_indexes = any(project_dir.rglob("*.leann.meta.json"))
if not (has_cli_indexes or has_app_indexes):
# Don't register if there are no LEANN indexes
return
global_registry = Path.home() / ".leann" / "projects.json"
global_registry.parent.mkdir(exist_ok=True)
project_str = str(project_dir.resolve())
# Load existing registry
projects = []
if global_registry.exists():
try:
with open(global_registry) as f:
projects = json.load(f)
except Exception:
logger.debug("Could not load existing project registry")
projects = []
# Add project if not already present
if project_str not in projects:
projects.append(project_str)
# Save updated registry
try:
with open(global_registry, "w") as f:
json.dump(projects, f, indent=2)
logger.debug(f"Registered project directory: {project_str}")
except Exception as e:
logger.warning(f"Could not save project registry: {e}")

View File

@@ -92,7 +92,7 @@ leann build docs-and-configs --docs $(git ls-files "*.md" "*.yml" "*.yaml" "*.js
```
## **Try this in Claude Code:**
**Try this in Claude Code:**
```
Help me understand this codebase. List available indexes and search for authentication patterns.
```

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann"
version = "0.3.0"
version = "0.2.9"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -14,6 +14,8 @@ dependencies = [
"numpy>=1.26.0",
"torch",
"tqdm",
"flask",
"flask_compress",
"datasets>=2.15.0",
"evaluate",
"colorama",
@@ -64,7 +66,9 @@ test = [
"pytest>=7.0",
"pytest-timeout>=2.0",
"llama-index-core>=0.12.0",
"llama-index-readers-file>=0.4.0",
"python-dotenv>=1.0.0",
"sentence-transformers>=2.2.0",
]
diskann = [
@@ -96,8 +100,13 @@ leann-backend-hnsw = { path = "packages/leann-backend-hnsw", editable = true }
[tool.ruff]
target-version = "py39"
line-length = 100
extend-exclude = ["third_party"]
extend-exclude = [
"third_party",
"*.egg-info",
"__pycache__",
".git",
".venv",
]
[tool.ruff.lint]
select = [
@@ -120,12 +129,21 @@ ignore = [
"RUF012", # mutable class attributes should be annotated with typing.ClassVar
]
[tool.ruff.lint.per-file-ignores]
"test/**/*.py" = ["E402"] # module level import not at top of file (common in tests)
"examples/**/*.py" = ["E402"] # module level import not at top of file (common in examples)
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
[dependency-groups]
dev = [
"ruff>=0.12.4",
]
[tool.lychee]
accept = ["200", "403", "429", "503"]
timeout = 20