refactor: Unify examples interface with BaseRAGExample

- Create BaseRAGExample base class for all RAG examples
- Refactor 4 examples to use unified interface:
  - document_rag.py (replaces main_cli_example.py)
  - email_rag.py (replaces mail_reader_leann.py)
  - browser_rag.py (replaces google_history_reader_leann.py)
  - wechat_rag.py (replaces wechat_history_reader_leann.py)
- Maintain 100% parameter compatibility with original files
- Add interactive mode support for all examples
- Unify parameter names (--max-items replaces --max-emails/--max-entries)
- Update README.md with new examples usage
- Add PARAMETER_CONSISTENCY.md documenting all parameter mappings
- Keep main_cli_example.py for backward compatibility with migration notice

All default values, LeannBuilder parameters, and chunking settings
remain identical to ensure full compatibility with existing indexes.
This commit is contained in:
Andy Lee
2025-07-28 23:11:16 -07:00
parent 19bcc07814
commit 46f6f76fc3
8 changed files with 988 additions and 180 deletions

100
README.md
View File

@@ -178,21 +178,39 @@ The example below asks a question about summarizing two papers (uses default dat
```bash
source .venv/bin/activate
python ./examples/main_cli_example.py
python ./examples/document_rag.py --query "What are the main techniques LEANN explores?"
```
<details>
<summary><strong>📋 Click to expand: User Configurable Arguments</strong></summary>
#### Core Parameters (All Examples Share These)
```bash
# Use custom index directory
python examples/main_cli_example.py --index-dir "./my_custom_index"
--index-dir DIR # Directory to store the index
--query "YOUR QUESTION" # Single query to run (interactive mode if omitted)
--max-items N # Max items to process (default: 1000, -1 for all)
--force-rebuild # Force rebuild index even if it exists
# Use custom data directory
python examples/main_cli_example.py --data-dir "./my_documents"
# Embedding Parameters
--embedding-model MODEL # e.g., facebook/contriever, text-embedding-3-small
--embedding-mode MODE # sentence-transformers, openai, or mlx
# Ask a specific question
python examples/main_cli_example.py --query "What are the main findings in these papers?"
# LLM Parameters
--llm TYPE # openai, ollama, or hf
--llm-model MODEL # e.g., gpt-4o, llama3.2:1b
--top-k N # Number of results to retrieve (default: 20)
```
#### Document-Specific Parameters
```bash
# Process custom documents
python examples/document_rag.py --data-dir "./my_documents" --file-types .pdf .txt .md
# Process with custom chunking
python examples/document_rag.py --chunk-size 512 --chunk-overlap 256
# Use different LLM
python examples/document_rag.py --llm ollama --llm-model llama3.2:1b
```
</details>
@@ -208,28 +226,29 @@ python examples/main_cli_example.py --query "What are the main findings in these
**Note:** You need to grant full disk access to your terminal/VS Code in System Preferences → Privacy & Security → Full Disk Access.
```bash
python examples/mail_reader_leann.py --query "What's the food I ordered by DoorDash or Uber Eats mostly?"
python examples/email_rag.py --query "What's the food I ordered by DoorDash or Uber Eats mostly?"
```
**780K email chunks → 78MB storage.** Finally, search your email like you search Google.
<details>
<summary><strong>📋 Click to expand: User Configurable Arguments</strong></summary>
#### Email-Specific Parameters
```bash
# Use default mail path (works for most macOS setups)
python examples/mail_reader_leann.py
# Auto-detect and process all Apple Mail accounts
python examples/email_rag.py
# Run with custom index directory
python examples/mail_reader_leann.py --index-dir "./my_mail_index"
# Process specific mail directory
python examples/email_rag.py --mail-path "~/Library/Mail/V10/..."
# Process all emails (may take time but indexes everything)
python examples/mail_reader_leann.py --max-emails -1
# Process all emails (may take time)
python examples/email_rag.py --max-items -1
# Limit number of emails processed (useful for testing)
python examples/mail_reader_leann.py --max-emails 1000
# Include HTML content
python examples/email_rag.py --include-html
# Run a single query
python examples/mail_reader_leann.py --query "What did my boss say about deadlines?"
# Use different embedding model
python examples/email_rag.py --embedding-model text-embedding-3-small --embedding-mode openai
```
</details>
@@ -250,25 +269,29 @@ Once the index is built, you can ask questions like:
</p>
```bash
python examples/google_history_reader_leann.py --query "Tell me my browser history about machine learning?"
python examples/browser_rag.py --query "Tell me my browser history about machine learning?"
```
**38K browser entries → 6MB storage.** Your browser history becomes your personal search engine.
<details>
<summary><strong>📋 Click to expand: User Configurable Arguments</strong></summary>
#### Browser-Specific Parameters
```bash
# Use default Chrome profile (auto-finds all profiles)
python examples/google_history_reader_leann.py
# Auto-detect and process all Chrome profiles
python examples/browser_rag.py
# Run with custom index directory
python examples/google_history_reader_leann.py --index-dir "./my_chrome_index"
# Process specific Chrome profile
python examples/browser_rag.py --chrome-profile "~/Library/Application Support/Google/Chrome/Default"
# Limit number of history entries processed (useful for testing)
python examples/google_history_reader_leann.py --max-entries 500
# Limit history entries for testing
python examples/browser_rag.py --max-items 500
# Run a single query
python examples/google_history_reader_leann.py --query "What websites did I visit about machine learning?"
# Interactive search mode
python examples/browser_rag.py # Without --query for interactive mode
# Use local LLM for privacy
python examples/browser_rag.py --llm ollama --llm-model llama3.2:1b
```
</details>
@@ -308,7 +331,7 @@ Once the index is built, you can ask questions like:
</p>
```bash
python examples/wechat_history_reader_leann.py --query "Show me all group chats about weekend plans"
python examples/wechat_rag.py --query "Show me all group chats about weekend plans"
```
**400K messages → 64MB storage** Search years of chat history in any language.
@@ -334,21 +357,22 @@ Failed to find or export WeChat data. Exiting.
<details>
<summary><strong>📋 Click to expand: User Configurable Arguments</strong></summary>
#### WeChat-Specific Parameters
```bash
# Use default settings (recommended for first run)
python examples/wechat_history_reader_leann.py
# Auto-export and index WeChat data
python examples/wechat_rag.py
# Run with custom export directory and wehn we run the first time, LEANN will export all chat history automatically for you
python examples/wechat_history_reader_leann.py --export-dir "./my_wechat_exports"
# Use custom export directory
python examples/wechat_rag.py --export-dir "./my_wechat_exports"
# Run with custom index directory
python examples/wechat_history_reader_leann.py --index-dir "./my_wechat_index"
# Force re-export even if data exists
python examples/wechat_rag.py --force-export
# Limit number of chat entries processed (useful for testing)
python examples/wechat_history_reader_leann.py --max-entries 1000
# Limit chat entries for testing
python examples/wechat_rag.py --max-items 1000
# Run a single query
python examples/wechat_history_reader_leann.py --query "Show me conversations about travel plans"
# Use HuggingFace model for Chinese support
python examples/wechat_rag.py --llm hf --llm-model Qwen/Qwen2.5-1.5B-Instruct
```
</details>

View File

@@ -0,0 +1,64 @@
# Parameter Consistency Guide
This document ensures that the new unified interface maintains exact parameter compatibility with the original examples.
## Parameter Mapping
### Common Parameters (All Examples)
| Parameter | Default Value | Notes |
|-----------|--------------|-------|
| `backend_name` | `"hnsw"` | All examples use HNSW backend |
| `graph_degree` | `32` | Consistent across all |
| `complexity` | `64` | Consistent across all |
| `is_compact` | `True` | NOT `compact_index` |
| `is_recompute` | `True` | NOT `use_recomputed_embeddings` |
| `num_threads` | `1` | Force single-threaded mode |
| `chunk_size` | `256` | Consistent across all |
### Example-Specific Defaults
#### document_rag.py (replaces main_cli_example.py)
- `index_dir`: `"./test_doc_files"` (matches original)
- `chunk_overlap`: `128` (matches original)
- `embedding_model`: `"facebook/contriever"`
- `embedding_mode`: `"sentence-transformers"`
- No max limit by default
#### email_rag.py (replaces mail_reader_leann.py)
- `index_dir`: `"./mail_index"` (matches original)
- `max_items`: `1000` (was `max_emails`)
- `chunk_overlap`: `25` (matches original)
- `embedding_model`: `"facebook/contriever"`
- NO `embedding_mode` parameter in LeannBuilder (original doesn't have it)
#### browser_rag.py (replaces google_history_reader_leann.py)
- `index_dir`: `"./google_history_index"` (matches original)
- `max_items`: `1000` (was `max_entries`)
- `chunk_overlap`: `25` (primary value in original)
- `embedding_model`: `"facebook/contriever"`
- `embedding_mode`: `"sentence-transformers"`
#### wechat_rag.py (replaces wechat_history_reader_leann.py)
- `index_dir`: `"./wechat_history_magic_test_11Debug_new"` (matches original)
- `max_items`: `50` (was `max_entries`, much lower default)
- `chunk_overlap`: `25` (matches original)
- `embedding_model`: `"Qwen/Qwen3-Embedding-0.6B"` (special model for Chinese)
- NO `embedding_mode` parameter in LeannBuilder (original doesn't have it)
## Implementation Notes
1. **Parameter Names**: The original files use `is_compact` and `is_recompute`, not the newer names.
2. **Chunk Overlap**: Most examples use `25` except for documents which uses `128`.
3. **Embedding Mode**: Only `google_history_reader_leann.py` and `main_cli_example.py` have this parameter.
4. **Max Items**: Each example has different defaults:
- Email/Browser: 1000
- WeChat: 50
- Documents: unlimited
5. **Special Cases**:
- WeChat uses a specific Chinese embedding model
- Email reader includes HTML processing option

View File

@@ -0,0 +1,274 @@
"""
Base class for unified RAG examples interface.
Provides common parameters and functionality for all RAG examples.
"""
import argparse
import asyncio
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from abc import ABC, abstractmethod
import dotenv
from leann.api import LeannBuilder, LeannSearcher, LeannChat
from llama_index.core.node_parser import SentenceSplitter
dotenv.load_dotenv()
class BaseRAGExample(ABC):
"""Base class for all RAG examples with unified interface."""
def __init__(
self,
name: str,
description: str,
default_index_name: str,
include_embedding_mode: bool = True,
):
self.name = name
self.description = description
self.default_index_name = default_index_name
self.include_embedding_mode = include_embedding_mode
self.parser = self._create_parser()
def _create_parser(self) -> argparse.ArgumentParser:
"""Create argument parser with common parameters."""
parser = argparse.ArgumentParser(
description=self.description, formatter_class=argparse.RawDescriptionHelpFormatter
)
# Core parameters (all examples share these)
core_group = parser.add_argument_group("Core Parameters")
core_group.add_argument(
"--index-dir",
type=str,
default=f"./{self.default_index_name}",
help=f"Directory to store the index (default: ./{self.default_index_name})",
)
core_group.add_argument(
"--query",
type=str,
default=None,
help="Query to run (if not provided, will run in interactive mode)",
)
# Allow subclasses to override default max_items
max_items_default = getattr(self, "max_items_default", 1000)
core_group.add_argument(
"--max-items",
type=int,
default=max_items_default,
help=f"Maximum number of items to process (default: {max_items_default}, -1 for all)",
)
core_group.add_argument(
"--force-rebuild", action="store_true", help="Force rebuild index even if it exists"
)
# Embedding parameters
embedding_group = parser.add_argument_group("Embedding Parameters")
# Allow subclasses to override default embedding_model
embedding_model_default = getattr(self, "embedding_model_default", "facebook/contriever")
embedding_group.add_argument(
"--embedding-model",
type=str,
default=embedding_model_default,
help=f"Embedding model to use (default: {embedding_model_default})",
)
if self.include_embedding_mode:
embedding_group.add_argument(
"--embedding-mode",
type=str,
default="sentence-transformers",
choices=["sentence-transformers", "openai", "mlx"],
help="Embedding backend mode (default: sentence-transformers)",
)
# LLM parameters
llm_group = parser.add_argument_group("LLM Parameters")
llm_group.add_argument(
"--llm",
type=str,
default="openai",
choices=["openai", "ollama", "hf"],
help="LLM backend to use (default: openai)",
)
llm_group.add_argument(
"--llm-model",
type=str,
default=None,
help="LLM model name (default: gpt-4o for openai, llama3.2:1b for ollama)",
)
llm_group.add_argument(
"--llm-host",
type=str,
default="http://localhost:11434",
help="Host for Ollama API (default: http://localhost:11434)",
)
# Search parameters
search_group = parser.add_argument_group("Search Parameters")
search_group.add_argument(
"--top-k", type=int, default=20, help="Number of results to retrieve (default: 20)"
)
search_group.add_argument(
"--search-complexity",
type=int,
default=64,
help="Search complexity for graph traversal (default: 64)",
)
# Add source-specific parameters
self._add_specific_arguments(parser)
return parser
@abstractmethod
def _add_specific_arguments(self, parser: argparse.ArgumentParser):
"""Add source-specific arguments. Override in subclasses."""
pass
@abstractmethod
async def load_data(self, args) -> List[str]:
"""Load data from the source. Returns list of text chunks."""
pass
def get_llm_config(self, args) -> Dict[str, Any]:
"""Get LLM configuration based on arguments."""
config = {"type": args.llm}
if args.llm == "openai":
config["model"] = args.llm_model or "gpt-4o"
elif args.llm == "ollama":
config["model"] = args.llm_model or "llama3.2:1b"
config["host"] = args.llm_host
elif args.llm == "hf":
config["model"] = args.llm_model or "Qwen/Qwen2.5-1.5B-Instruct"
return config
async def build_index(self, args, texts: List[str]) -> str:
"""Build LEANN index from texts."""
index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
print(f"\n[Building Index] Creating {self.name} index...")
print(f"Total text chunks: {len(texts)}")
# Build kwargs for LeannBuilder
builder_kwargs = {
"backend_name": "hnsw",
"embedding_model": args.embedding_model,
"graph_degree": 32,
"complexity": 64,
"is_compact": True,
"is_recompute": True,
"num_threads": 1, # Force single-threaded mode
}
# Only add embedding_mode if it's not suppressed (for compatibility)
if hasattr(args, "embedding_mode") and args.embedding_mode is not None:
builder_kwargs["embedding_mode"] = args.embedding_mode
builder = LeannBuilder(**builder_kwargs)
# Add texts in batches for better progress tracking
batch_size = 1000
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
builder.add_texts(batch)
print(f"Added {min(i + batch_size, len(texts))}/{len(texts)} texts...")
print("Building index structure...")
builder.build_index(index_path)
print(f"Index saved to: {index_path}")
return index_path
async def run_interactive_chat(self, args, index_path: str):
"""Run interactive chat with the index."""
chat = LeannChat(
index_path,
llm_config=self.get_llm_config(args),
system_prompt=f"You are a helpful assistant that answers questions about {self.name} data.",
)
print(f"\n[Interactive Mode] Chat with your {self.name} data!")
print("Type 'quit' or 'exit' to stop.\n")
while True:
try:
query = input("You: ").strip()
if query.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
if not query:
continue
response = await chat.ask(
query, top_k=args.top_k, complexity=args.search_complexity
)
print(f"\nAssistant: {response}\n")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Error: {e}")
async def run_single_query(self, args, index_path: str, query: str):
"""Run a single query against the index."""
chat = LeannChat(
index_path,
llm_config=self.get_llm_config(args),
system_prompt=f"You are a helpful assistant that answers questions about {self.name} data.",
)
print(f"\n[Query] {query}")
response = await chat.ask(query, top_k=args.top_k, complexity=args.search_complexity)
print(f"\n[Response] {response}\n")
async def run(self):
"""Main entry point for the example."""
args = self.parser.parse_args()
# Check if index exists
index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
index_exists = Path(index_path).exists()
if not index_exists or args.force_rebuild:
# Load data and build index
print(f"\n{'Rebuilding' if index_exists else 'Building'} index...")
texts = await self.load_data(args)
if not texts:
print("No data found to index!")
return
index_path = await self.build_index(args, texts)
else:
print(f"\nUsing existing index: {index_path}")
# Run query or interactive mode
if args.query:
await self.run_single_query(args, index_path, args.query)
else:
await self.run_interactive_chat(args, index_path)
def create_text_chunks(documents, chunk_size=256, chunk_overlap=25) -> List[str]:
"""Helper function to create text chunks from documents."""
node_parser = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
all_texts.extend(node.get_content() for node in nodes)
return all_texts

157
examples/browser_rag.py Normal file
View File

@@ -0,0 +1,157 @@
"""
Browser History RAG example using the unified interface.
Supports Chrome browser history.
"""
import os
import sys
from pathlib import Path
from typing import List
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from history_data.history import ChromeHistoryReader
class BrowserRAG(BaseRAGExample):
"""RAG example for Chrome browser history."""
def __init__(self):
super().__init__(
name="Browser History",
description="Process and query Chrome browser history with LEANN",
default_index_name="google_history_index", # Match original: "./google_history_index",
)
def _add_specific_arguments(self, parser):
"""Add browser-specific arguments."""
browser_group = parser.add_argument_group("Browser Parameters")
browser_group.add_argument(
"--chrome-profile",
type=str,
default=None,
help="Path to Chrome profile directory (auto-detected if not specified)",
)
browser_group.add_argument(
"--auto-find-profiles",
action="store_true",
default=True,
help="Automatically find all Chrome profiles (default: True)",
)
def _get_chrome_base_path(self) -> Path:
"""Get the base Chrome profile path based on OS."""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Google" / "Chrome"
elif sys.platform.startswith("linux"):
return Path.home() / ".config" / "google-chrome"
elif sys.platform == "win32":
return Path(os.environ["LOCALAPPDATA"]) / "Google" / "Chrome" / "User Data"
else:
raise ValueError(f"Unsupported platform: {sys.platform}")
def _find_chrome_profiles(self) -> List[Path]:
"""Auto-detect all Chrome profiles."""
base_path = self._get_chrome_base_path()
if not base_path.exists():
return []
profiles = []
# Check Default profile
default_profile = base_path / "Default"
if default_profile.exists() and (default_profile / "History").exists():
profiles.append(default_profile)
# Check numbered profiles
for item in base_path.iterdir():
if item.is_dir() and item.name.startswith("Profile "):
if (item / "History").exists():
profiles.append(item)
return profiles
async def load_data(self, args) -> List[str]:
"""Load browser history and convert to text chunks."""
# Determine Chrome profiles
if args.chrome_profile and not args.auto_find_profiles:
profile_dirs = [Path(args.chrome_profile)]
else:
print("Auto-detecting Chrome profiles...")
profile_dirs = self._find_chrome_profiles()
# If specific profile given, filter to just that one
if args.chrome_profile:
profile_path = Path(args.chrome_profile)
profile_dirs = [p for p in profile_dirs if p == profile_path]
if not profile_dirs:
print("No Chrome profiles found!")
print("Please specify --chrome-profile manually")
return []
print(f"Found {len(profile_dirs)} Chrome profiles")
# Create reader
reader = ChromeHistoryReader()
# Process each profile
all_documents = []
total_processed = 0
for i, profile_dir in enumerate(profile_dirs):
print(f"\nProcessing profile {i + 1}/{len(profile_dirs)}: {profile_dir.name}")
try:
# Apply max_items limit per profile
max_per_profile = -1
if args.max_items > 0:
remaining = args.max_items - total_processed
if remaining <= 0:
break
max_per_profile = remaining
# Load history
documents = reader.load_data(
chrome_profile_path=str(profile_dir),
max_count=max_per_profile,
)
if documents:
all_documents.extend(documents)
total_processed += len(documents)
print(f"Processed {len(documents)} history entries from this profile")
except Exception as e:
print(f"Error processing {profile_dir}: {e}")
continue
if not all_documents:
print("No browser history found to process!")
return []
print(f"\nTotal history entries processed: {len(all_documents)}")
# Convert to text chunks
all_texts = create_text_chunks(all_documents)
return all_texts
if __name__ == "__main__":
import asyncio
# Example queries for browser history RAG
print("\n🌐 Browser History RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What websites did I visit about machine learning?'")
print("- 'Find my search history about programming'")
print("- 'What YouTube videos did I watch recently?'")
print("- 'Show me websites about travel planning'")
print("\nNote: Make sure Chrome is closed before running\n")
rag = BrowserRAG()
asyncio.run(rag.run())

107
examples/document_rag.py Normal file
View File

@@ -0,0 +1,107 @@
"""
Document RAG example using the unified interface.
Supports PDF, TXT, MD, and other document formats.
"""
import sys
from pathlib import Path
from typing import List
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from llama_index.core import SimpleDirectoryReader
class DocumentRAG(BaseRAGExample):
"""RAG example for document processing (PDF, TXT, MD, etc.)."""
def __init__(self):
super().__init__(
name="Document",
description="Process and query documents (PDF, TXT, MD, etc.) with LEANN",
default_index_name="test_doc_files" # Match original main_cli_example.py default
)
def _add_specific_arguments(self, parser):
"""Add document-specific arguments."""
doc_group = parser.add_argument_group('Document Parameters')
doc_group.add_argument(
"--data-dir",
type=str,
default="examples/data",
help="Directory containing documents to index (default: examples/data)"
)
doc_group.add_argument(
"--file-types",
nargs="+",
default=[".pdf", ".txt", ".md"],
help="File types to process (default: .pdf .txt .md)"
)
doc_group.add_argument(
"--chunk-size",
type=int,
default=256,
help="Text chunk size (default: 256)"
)
doc_group.add_argument(
"--chunk-overlap",
type=int,
default=128,
help="Text chunk overlap (default: 128)"
)
async def load_data(self, args) -> List[str]:
"""Load documents and convert to text chunks."""
print(f"Loading documents from: {args.data_dir}")
print(f"File types: {args.file_types}")
# Check if data directory exists
data_path = Path(args.data_dir)
if not data_path.exists():
raise ValueError(f"Data directory not found: {args.data_dir}")
# Load documents
documents = SimpleDirectoryReader(
args.data_dir,
recursive=True,
encoding="utf-8",
required_exts=args.file_types,
).load_data(show_progress=True)
if not documents:
print(f"No documents found in {args.data_dir} with extensions {args.file_types}")
return []
print(f"Loaded {len(documents)} documents")
# Convert to text chunks
all_texts = create_text_chunks(
documents,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap
)
# Apply max_items limit if specified
if args.max_items > 0 and len(all_texts) > args.max_items:
print(f"Limiting to {args.max_items} chunks (from {len(all_texts)})")
all_texts = all_texts[:args.max_items]
return all_texts
if __name__ == "__main__":
import asyncio
# Example queries for document RAG
print("\n📄 Document RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What are the main techniques LEANN uses?'")
print("- 'Summarize the key findings in these papers'")
print("- 'What is the storage reduction achieved by LEANN?'")
print("\nOr run without --query for interactive mode\n")
rag = DocumentRAG()
asyncio.run(rag.run())

143
examples/email_rag.py Normal file
View File

@@ -0,0 +1,143 @@
"""
Email RAG example using the unified interface.
Supports Apple Mail on macOS.
"""
import os
import sys
from pathlib import Path
from typing import List
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from email_data.LEANN_email_reader import EmlxReader
class EmailRAG(BaseRAGExample):
"""RAG example for Apple Mail processing."""
def __init__(self):
super().__init__(
name="Email",
description="Process and query Apple Mail emails with LEANN",
default_index_name="mail_index", # Match original: "./mail_index"
include_embedding_mode=False, # Original mail_reader_leann.py doesn't have embedding_mode
)
def _add_specific_arguments(self, parser):
"""Add email-specific arguments."""
email_group = parser.add_argument_group("Email Parameters")
email_group.add_argument(
"--mail-path",
type=str,
default=None,
help="Path to Apple Mail directory (auto-detected if not specified)",
)
email_group.add_argument(
"--include-html", action="store_true", help="Include HTML content in email processing"
)
def _find_mail_directories(self) -> List[Path]:
"""Auto-detect all Apple Mail directories."""
mail_base = Path.home() / "Library" / "Mail"
if not mail_base.exists():
return []
# Find all Messages directories
messages_dirs = []
for item in mail_base.rglob("Messages"):
if item.is_dir():
messages_dirs.append(item)
return messages_dirs
async def load_data(self, args) -> List[str]:
"""Load emails and convert to text chunks."""
# Determine mail directories
if args.mail_path:
messages_dirs = [Path(args.mail_path)]
else:
print("Auto-detecting Apple Mail directories...")
messages_dirs = self._find_mail_directories()
if not messages_dirs:
print("No Apple Mail directories found!")
print("Please specify --mail-path manually")
return []
print(f"Found {len(messages_dirs)} mail directories")
# Create reader
reader = EmlxReader()
# Process each directory
all_documents = []
total_processed = 0
for i, messages_dir in enumerate(messages_dirs):
print(f"\nProcessing directory {i + 1}/{len(messages_dirs)}: {messages_dir}")
try:
# Count emlx files
emlx_files = list(messages_dir.glob("*.emlx"))
print(f"Found {len(emlx_files)} email files")
# Apply max_items limit per directory
max_per_dir = -1
if args.max_items > 0:
remaining = args.max_items - total_processed
if remaining <= 0:
break
max_per_dir = remaining
# Load emails
documents = reader.load_data(
file_path=str(messages_dir),
max_count=max_per_dir,
include_html=args.include_html,
)
if documents:
all_documents.extend(documents)
total_processed += len(documents)
print(f"Processed {len(documents)} emails from this directory")
except Exception as e:
print(f"Error processing {messages_dir}: {e}")
continue
if not all_documents:
print("No emails found to process!")
return []
print(f"\nTotal emails processed: {len(all_documents)}")
# Convert to text chunks
# Email reader uses chunk_overlap=25 as in original
all_texts = create_text_chunks(all_documents, chunk_overlap=25)
return all_texts
if __name__ == "__main__":
import asyncio
# Check platform
if sys.platform != "darwin":
print("\n⚠️ Warning: This example is designed for macOS (Apple Mail)")
print(" Windows/Linux support coming soon!\n")
# Example queries for email RAG
print("\n📧 Email RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What did my boss say about deadlines?'")
print("- 'Find emails about travel expenses'")
print("- 'Show me emails from last month about the project'")
print("- 'What food did I order from DoorDash?'")
print("\nNote: You may need to grant Full Disk Access to your terminal\n")
rag = EmailRAG()
asyncio.run(rag.run())

View File

@@ -1,146 +1,32 @@
import argparse
import asyncio
from pathlib import Path
#!/usr/bin/env python3
"""
This script has been replaced by document_rag.py with a unified interface.
This file is kept for backward compatibility.
"""
import dotenv
from leann.api import LeannBuilder, LeannChat
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
import sys
import os
dotenv.load_dotenv()
print("=" * 70)
print("NOTICE: This script has been replaced!")
print("=" * 70)
print("\nThe examples have been refactored with a unified interface.")
print("Please use the new script instead:\n")
print(" python examples/document_rag.py")
print("\nThe new script provides:")
print(" ✓ Consistent parameters across all examples")
print(" ✓ Better error handling")
print(" ✓ Interactive mode support")
print(" ✓ More customization options")
print("\nExample usage:")
print(' python examples/document_rag.py --query "What are the main techniques?"')
print(" python examples/document_rag.py # For interactive mode")
print("\nSee README.md for full documentation.")
print("=" * 70)
# If user passed arguments, show how to use them with new script
if len(sys.argv) > 1:
print("\nTo use your arguments with the new script:")
print(f" python examples/document_rag.py {' '.join(sys.argv[1:])}")
async def main(args):
INDEX_DIR = Path(args.index_dir)
INDEX_PATH = str(INDEX_DIR / "pdf_documents.leann")
if not INDEX_DIR.exists():
node_parser = SentenceSplitter(
chunk_size=256, chunk_overlap=128, separator=" ", paragraph_separator="\n\n"
)
print("Loading documents...")
documents = SimpleDirectoryReader(
args.data_dir,
recursive=True,
encoding="utf-8",
required_exts=[".pdf", ".txt", ".md"],
).load_data(show_progress=True)
print("Documents loaded.")
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
all_texts.extend(node.get_content() for node in nodes)
print("--- Index directory not found, building new index ---")
print("\n[PHASE 1] Building Leann index...")
# LeannBuilder now automatically detects normalized embeddings and sets appropriate distance metric
print(f"Using {args.embedding_model} with {args.embedding_mode} mode")
# Use HNSW backend for better macOS compatibility
builder = LeannBuilder(
backend_name="hnsw",
embedding_model=args.embedding_model,
embedding_mode=args.embedding_mode,
# distance_metric is automatically set based on embedding model
graph_degree=32,
complexity=64,
is_compact=True,
is_recompute=True,
num_threads=1, # Force single-threaded mode
)
print(f"Loaded {len(all_texts)} text chunks from documents.")
for chunk_text in all_texts:
builder.add_text(chunk_text)
builder.build_index(INDEX_PATH)
print(f"\nLeann index built at {INDEX_PATH}!")
else:
print(f"--- Using existing index at {INDEX_DIR} ---")
print("\n[PHASE 2] Starting Leann chat session...")
# Build llm_config based on command line arguments
if args.llm == "simulated":
llm_config = {"type": "simulated"}
elif args.llm == "ollama":
llm_config = {"type": "ollama", "model": args.model, "host": args.host}
elif args.llm == "hf":
llm_config = {"type": "hf", "model": args.model}
elif args.llm == "openai":
llm_config = {"type": "openai", "model": args.model}
else:
raise ValueError(f"Unknown LLM type: {args.llm}")
print(f"Using LLM: {args.llm} with model: {args.model if args.llm != 'simulated' else 'N/A'}")
chat = LeannChat(index_path=INDEX_PATH, llm_config=llm_config)
# query = (
# "什么是盘古大模型以及盘古开发过程中遇到了什么阴暗面,任务令一般在什么城市颁发"
# )
query = args.query
print(f"You: {query}")
chat_response = chat.ask(query, top_k=20, recompute_embeddings=True, complexity=32)
print(f"Leann chat response: \033[36m{chat_response}\033[0m")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run Leann Chat with various LLM backends.")
parser.add_argument(
"--llm",
type=str,
default="openai",
choices=["simulated", "ollama", "hf", "openai"],
help="The LLM backend to use.",
)
parser.add_argument(
"--model",
type=str,
default="gpt-4o",
help="The model name to use (e.g., 'llama3:8b' for ollama, 'deepseek-ai/deepseek-llm-7b-chat' for hf, 'gpt-4o' for openai).",
)
parser.add_argument(
"--embedding-model",
type=str,
default="facebook/contriever",
help="The embedding model to use (e.g., 'facebook/contriever', 'text-embedding-3-small').",
)
parser.add_argument(
"--embedding-mode",
type=str,
default="sentence-transformers",
choices=["sentence-transformers", "openai", "mlx"],
help="The embedding backend mode.",
)
parser.add_argument(
"--host",
type=str,
default="http://localhost:11434",
help="The host for the Ollama API.",
)
parser.add_argument(
"--index-dir",
type=str,
default="./test_doc_files",
help="Directory where the Leann index will be stored.",
)
parser.add_argument(
"--data-dir",
type=str,
default="examples/data",
help="Directory containing documents to index (PDF, TXT, MD files).",
)
parser.add_argument(
"--query",
type=str,
default="Based on the paper, what are the main techniques LEANN explores to reduce the storage overhead and DLPM explore to achieve Fairness and Efiiciency trade-off?",
help="The query to ask the Leann chat system.",
)
args = parser.parse_args()
asyncio.run(main(args))
sys.exit(1)

153
examples/wechat_rag.py Normal file
View File

@@ -0,0 +1,153 @@
"""
WeChat History RAG example using the unified interface.
Supports WeChat chat history export and search.
"""
import subprocess
import sys
from pathlib import Path
from typing import List
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from history_data.wechat_history import WeChatHistoryReader
class WeChatRAG(BaseRAGExample):
"""RAG example for WeChat chat history."""
def __init__(self):
# Set default values BEFORE calling super().__init__
self.max_items_default = 50 # Match original default
self.embedding_model_default = "Qwen/Qwen3-Embedding-0.6B" # Match original default
super().__init__(
name="WeChat History",
description="Process and query WeChat chat history with LEANN",
default_index_name="wechat_history_magic_test_11Debug_new", # Match original default
include_embedding_mode=False, # Original wechat_history_reader_leann.py doesn't have embedding_mode
)
def _add_specific_arguments(self, parser):
"""Add WeChat-specific arguments."""
wechat_group = parser.add_argument_group("WeChat Parameters")
wechat_group.add_argument(
"--export-dir",
type=str,
default="./wechat_export",
help="Directory to store WeChat exports (default: ./wechat_export)",
)
wechat_group.add_argument(
"--force-export",
action="store_true",
help="Force re-export of WeChat data even if exports exist",
)
def _export_wechat_data(self, export_dir: Path) -> bool:
"""Export WeChat data using wechattweak-cli."""
print("Exporting WeChat data...")
# Check if WeChat is running
try:
result = subprocess.run(["pgrep", "WeChat"], capture_output=True, text=True)
if result.returncode != 0:
print("WeChat is not running. Please start WeChat first.")
return False
except Exception:
pass # pgrep might not be available on all systems
# Create export directory
export_dir.mkdir(parents=True, exist_ok=True)
# Run export command
cmd = ["packages/wechat-exporter/wechattweak-cli", "export", str(export_dir)]
try:
print(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("WeChat data exported successfully!")
return True
else:
print(f"Export failed: {result.stderr}")
return False
except FileNotFoundError:
print("\nError: wechattweak-cli not found!")
print("Please install it first:")
print(" sudo packages/wechat-exporter/wechattweak-cli install")
return False
except Exception as e:
print(f"Export error: {e}")
return False
async def load_data(self, args) -> List[str]:
"""Load WeChat history and convert to text chunks."""
export_path = Path(args.export_dir)
# Check if we need to export
need_export = (
args.force_export or not export_path.exists() or not any(export_path.iterdir())
)
if need_export:
if sys.platform != "darwin":
print("\n⚠️ Error: WeChat export is only supported on macOS")
return []
success = self._export_wechat_data(export_path)
if not success:
print("Failed to export WeChat data")
return []
else:
print(f"Using existing WeChat export: {export_path}")
# Load WeChat data
reader = WeChatHistoryReader()
try:
print("\nLoading WeChat history...")
documents = reader.load_data(
wechat_export_dir=str(export_path),
max_count=args.max_items if args.max_items > 0 else -1,
)
if not documents:
print("No WeChat data found!")
return []
print(f"Loaded {len(documents)} chat entries")
# Convert to text chunks
all_texts = create_text_chunks(documents)
return all_texts
except Exception as e:
print(f"Error loading WeChat data: {e}")
return []
if __name__ == "__main__":
import asyncio
# Check platform
if sys.platform != "darwin":
print("\n⚠️ Warning: WeChat export is only supported on macOS")
print(" You can still query existing exports on other platforms\n")
# Example queries for WeChat RAG
print("\n💬 WeChat History RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'Show me conversations about travel plans'")
print("- 'Find group chats about weekend activities'")
print("- '我想买魔术师约翰逊的球衣,给我一些对应聊天记录?'")
print("- 'What did we discuss about the project last month?'")
print("\nNote: WeChat must be running for export to work\n")
rag = WeChatRAG()
asyncio.run(rag.run())