Merge branch 'refactor-app' of https://github.com/yichuan-w/LEANN into refactor-app

This commit is contained in:
yichuan520030910320
2025-08-03 23:02:12 -07:00
45 changed files with 155 additions and 1927 deletions

0
apps/__init__.py Normal file
View File

296
apps/base_rag_example.py Normal file
View File

@@ -0,0 +1,296 @@
"""
Base class for unified RAG examples interface.
Provides common parameters and functionality for all RAG examples.
"""
import argparse
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any
import dotenv
from leann.api import LeannBuilder, LeannChat
from llama_index.core.node_parser import SentenceSplitter
dotenv.load_dotenv()
class BaseRAGExample(ABC):
"""Base class for all RAG examples with unified interface."""
def __init__(
self,
name: str,
description: str,
default_index_name: str,
):
self.name = name
self.description = description
self.default_index_name = default_index_name
self.parser = self._create_parser()
def _create_parser(self) -> argparse.ArgumentParser:
"""Create argument parser with common parameters."""
parser = argparse.ArgumentParser(
description=self.description, formatter_class=argparse.RawDescriptionHelpFormatter
)
# Core parameters (all examples share these)
core_group = parser.add_argument_group("Core Parameters")
core_group.add_argument(
"--index-dir",
type=str,
default=f"./{self.default_index_name}",
help=f"Directory to store the index (default: ./{self.default_index_name})",
)
core_group.add_argument(
"--query",
type=str,
default=None,
help="Query to run (if not provided, will run in interactive mode)",
)
# Allow subclasses to override default max_items
max_items_default = getattr(self, "max_items_default", -1)
core_group.add_argument(
"--max-items",
type=int,
default=max_items_default,
help="Maximum number of items to process -1 for all, means index all documents, and you should set it to a reasonable number if you have a large dataset and try at the first time)",
)
core_group.add_argument(
"--force-rebuild", action="store_true", help="Force rebuild index even if it exists"
)
# Embedding parameters
embedding_group = parser.add_argument_group("Embedding Parameters")
# Allow subclasses to override default embedding_model
embedding_model_default = getattr(self, "embedding_model_default", "facebook/contriever")
embedding_group.add_argument(
"--embedding-model",
type=str,
default=embedding_model_default,
help=f"Embedding model to use (default: {embedding_model_default})",
)
embedding_group.add_argument(
"--embedding-mode",
type=str,
default="sentence-transformers",
choices=["sentence-transformers", "openai", "mlx"],
help="Embedding backend mode (default: sentence-transformers)",
)
# LLM parameters
llm_group = parser.add_argument_group("LLM Parameters")
llm_group.add_argument(
"--llm",
type=str,
default="openai",
choices=["openai", "ollama", "hf"],
help="LLM backend to use (default: openai)",
)
llm_group.add_argument(
"--llm-model",
type=str,
default=None,
help="LLM model name (default: gpt-4o for openai, llama3.2:1b for ollama)",
)
llm_group.add_argument(
"--llm-host",
type=str,
default="http://localhost:11434",
help="Host for Ollama API (default: http://localhost:11434)",
)
# Search parameters
search_group = parser.add_argument_group("Search Parameters")
search_group.add_argument(
"--top-k", type=int, default=20, help="Number of results to retrieve (default: 20)"
)
search_group.add_argument(
"--search-complexity",
type=int,
default=32,
help="Search complexity for graph traversal (default: 64)",
)
# Index building parameters
index_group = parser.add_argument_group("Index Building Parameters")
index_group.add_argument(
"--backend-name",
type=str,
default="hnsw",
choices=["hnsw", "diskann"],
help="Backend to use for index (default: hnsw)",
)
index_group.add_argument(
"--graph-degree",
type=int,
default=32,
help="Graph degree for index construction (default: 32)",
)
index_group.add_argument(
"--build-complexity",
type=int,
default=64,
help="Build complexity for index construction (default: 64)",
)
index_group.add_argument(
"--no-compact",
action="store_true",
help="Disable compact index storage",
)
index_group.add_argument(
"--no-recompute",
action="store_true",
help="Disable embedding recomputation",
)
# Add source-specific parameters
self._add_specific_arguments(parser)
return parser
@abstractmethod
def _add_specific_arguments(self, parser: argparse.ArgumentParser):
"""Add source-specific arguments. Override in subclasses."""
pass
@abstractmethod
async def load_data(self, args) -> list[str]:
"""Load data from the source. Returns list of text chunks."""
pass
def get_llm_config(self, args) -> dict[str, Any]:
"""Get LLM configuration based on arguments."""
config = {"type": args.llm}
if args.llm == "openai":
config["model"] = args.llm_model or "gpt-4o"
elif args.llm == "ollama":
config["model"] = args.llm_model or "llama3.2:1b"
config["host"] = args.llm_host
elif args.llm == "hf":
config["model"] = args.llm_model or "Qwen/Qwen2.5-1.5B-Instruct"
return config
async def build_index(self, args, texts: list[str]) -> str:
"""Build LEANN index from texts."""
index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
print(f"\n[Building Index] Creating {self.name} index...")
print(f"Total text chunks: {len(texts)}")
builder = LeannBuilder(
backend_name=args.backend_name,
embedding_model=args.embedding_model,
embedding_mode=args.embedding_mode,
graph_degree=args.graph_degree,
complexity=args.build_complexity,
is_compact=not args.no_compact,
is_recompute=not args.no_recompute,
num_threads=1, # Force single-threaded mode
)
# Add texts in batches for better progress tracking
batch_size = 1000
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
for text in batch:
builder.add_text(text)
print(f"Added {min(i + batch_size, len(texts))}/{len(texts)} texts...")
print("Building index structure...")
builder.build_index(index_path)
print(f"Index saved to: {index_path}")
return index_path
async def run_interactive_chat(self, args, index_path: str):
"""Run interactive chat with the index."""
chat = LeannChat(
index_path,
llm_config=self.get_llm_config(args),
system_prompt=f"You are a helpful assistant that answers questions about {self.name} data.",
complexity=args.search_complexity,
)
print(f"\n[Interactive Mode] Chat with your {self.name} data!")
print("Type 'quit' or 'exit' to stop.\n")
while True:
try:
query = input("You: ").strip()
if query.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
if not query:
continue
response = chat.ask(query, top_k=args.top_k, complexity=args.search_complexity)
print(f"\nAssistant: {response}\n")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Error: {e}")
async def run_single_query(self, args, index_path: str, query: str):
"""Run a single query against the index."""
chat = LeannChat(
index_path,
llm_config=self.get_llm_config(args),
system_prompt=f"You are a helpful assistant that answers questions about {self.name} data.",
complexity=args.search_complexity,
)
print(f"\n[Query]: \033[36m{query}\033[0m")
response = chat.ask(query, top_k=args.top_k, complexity=args.search_complexity)
print(f"\n[Response]: \033[36m{response}\033[0m")
async def run(self):
"""Main entry point for the example."""
args = self.parser.parse_args()
# Check if index exists
index_path = str(Path(args.index_dir) / f"{self.default_index_name}.leann")
index_exists = Path(args.index_dir).exists()
if not index_exists or args.force_rebuild:
# Load data and build index
print(f"\n{'Rebuilding' if index_exists else 'Building'} index...")
texts = await self.load_data(args)
if not texts:
print("No data found to index!")
return
index_path = await self.build_index(args, texts)
else:
print(f"\nUsing existing index in {args.index_dir}")
# Run query or interactive mode
if args.query:
await self.run_single_query(args, index_path, args.query)
else:
await self.run_interactive_chat(args, index_path)
def create_text_chunks(documents, chunk_size=256, chunk_overlap=25) -> list[str]:
"""Helper function to create text chunks from documents."""
node_parser = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
if nodes:
all_texts.extend(node.get_content() for node in nodes)
return all_texts

170
apps/browser_rag.py Normal file
View File

@@ -0,0 +1,170 @@
"""
Browser History RAG example using the unified interface.
Supports Chrome browser history.
"""
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from .history_data.history import ChromeHistoryReader
class BrowserRAG(BaseRAGExample):
"""RAG example for Chrome browser history."""
def __init__(self):
# Set default values BEFORE calling super().__init__
self.embedding_model_default = (
"sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
)
super().__init__(
name="Browser History",
description="Process and query Chrome browser history with LEANN",
default_index_name="google_history_index",
)
def _add_specific_arguments(self, parser):
"""Add browser-specific arguments."""
browser_group = parser.add_argument_group("Browser Parameters")
browser_group.add_argument(
"--chrome-profile",
type=str,
default=None,
help="Path to Chrome profile directory (auto-detected if not specified)",
)
browser_group.add_argument(
"--auto-find-profiles",
action="store_true",
default=True,
help="Automatically find all Chrome profiles (default: True)",
)
browser_group.add_argument(
"--chunk-size", type=int, default=256, help="Text chunk size (default: 256)"
)
browser_group.add_argument(
"--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
)
def _get_chrome_base_path(self) -> Path:
"""Get the base Chrome profile path based on OS."""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Google" / "Chrome"
elif sys.platform.startswith("linux"):
return Path.home() / ".config" / "google-chrome"
elif sys.platform == "win32":
return Path(os.environ["LOCALAPPDATA"]) / "Google" / "Chrome" / "User Data"
else:
raise ValueError(f"Unsupported platform: {sys.platform}")
def _find_chrome_profiles(self) -> list[Path]:
"""Auto-detect all Chrome profiles."""
base_path = self._get_chrome_base_path()
if not base_path.exists():
return []
profiles = []
# Check Default profile
default_profile = base_path / "Default"
if default_profile.exists() and (default_profile / "History").exists():
profiles.append(default_profile)
# Check numbered profiles
for item in base_path.iterdir():
if item.is_dir() and item.name.startswith("Profile "):
if (item / "History").exists():
profiles.append(item)
return profiles
async def load_data(self, args) -> list[str]:
"""Load browser history and convert to text chunks."""
# Determine Chrome profiles
if args.chrome_profile and not args.auto_find_profiles:
profile_dirs = [Path(args.chrome_profile)]
else:
print("Auto-detecting Chrome profiles...")
profile_dirs = self._find_chrome_profiles()
# If specific profile given, filter to just that one
if args.chrome_profile:
profile_path = Path(args.chrome_profile)
profile_dirs = [p for p in profile_dirs if p == profile_path]
if not profile_dirs:
print("No Chrome profiles found!")
print("Please specify --chrome-profile manually")
return []
print(f"Found {len(profile_dirs)} Chrome profiles")
# Create reader
reader = ChromeHistoryReader()
# Process each profile
all_documents = []
total_processed = 0
for i, profile_dir in enumerate(profile_dirs):
print(f"\nProcessing profile {i + 1}/{len(profile_dirs)}: {profile_dir.name}")
try:
# Apply max_items limit per profile
max_per_profile = -1
if args.max_items > 0:
remaining = args.max_items - total_processed
if remaining <= 0:
break
max_per_profile = remaining
# Load history
documents = reader.load_data(
chrome_profile_path=str(profile_dir),
max_count=max_per_profile,
)
if documents:
all_documents.extend(documents)
total_processed += len(documents)
print(f"Processed {len(documents)} history entries from this profile")
except Exception as e:
print(f"Error processing {profile_dir}: {e}")
continue
if not all_documents:
print("No browser history found to process!")
return []
print(f"\nTotal history entries processed: {len(all_documents)}")
# Convert to text chunks
all_texts = create_text_chunks(
all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
)
return all_texts
if __name__ == "__main__":
import asyncio
# Example queries for browser history RAG
print("\n🌐 Browser History RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What websites did I visit about machine learning?'")
print("- 'Find my search history about programming'")
print("- 'What YouTube videos did I watch recently?'")
print("- 'Show me websites about travel planning'")
print("\nNote: Make sure Chrome is closed before running\n")
rag = BrowserRAG()
asyncio.run(rag.run())

106
apps/document_rag.py Normal file
View File

@@ -0,0 +1,106 @@
"""
Document RAG example using the unified interface.
Supports PDF, TXT, MD, and other document formats.
"""
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from llama_index.core import SimpleDirectoryReader
class DocumentRAG(BaseRAGExample):
"""RAG example for document processing (PDF, TXT, MD, etc.)."""
def __init__(self):
super().__init__(
name="Document",
description="Process and query documents (PDF, TXT, MD, etc.) with LEANN",
default_index_name="test_doc_files",
)
def _add_specific_arguments(self, parser):
"""Add document-specific arguments."""
doc_group = parser.add_argument_group("Document Parameters")
doc_group.add_argument(
"--data-dir",
type=str,
default="data",
help="Directory containing documents to index (default: data)",
)
doc_group.add_argument(
"--file-types",
nargs="+",
default=None,
help="Filter by file types (e.g., .pdf .txt .md). If not specified, all supported types are processed",
)
doc_group.add_argument(
"--chunk-size", type=int, default=256, help="Text chunk size (default: 256)"
)
doc_group.add_argument(
"--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
)
async def load_data(self, args) -> list[str]:
"""Load documents and convert to text chunks."""
print(f"Loading documents from: {args.data_dir}")
if args.file_types:
print(f"Filtering by file types: {args.file_types}")
else:
print("Processing all supported file types")
# Check if data directory exists
data_path = Path(args.data_dir)
if not data_path.exists():
raise ValueError(f"Data directory not found: {args.data_dir}")
# Load documents
reader_kwargs = {
"recursive": True,
"encoding": "utf-8",
}
if args.file_types:
reader_kwargs["required_exts"] = args.file_types
documents = SimpleDirectoryReader(args.data_dir, **reader_kwargs).load_data(
show_progress=True
)
if not documents:
print(f"No documents found in {args.data_dir} with extensions {args.file_types}")
return []
print(f"Loaded {len(documents)} documents")
# Convert to text chunks
all_texts = create_text_chunks(
documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
)
# Apply max_items limit if specified
if args.max_items > 0 and len(all_texts) > args.max_items:
print(f"Limiting to {args.max_items} chunks (from {len(all_texts)})")
all_texts = all_texts[: args.max_items]
return all_texts
if __name__ == "__main__":
import asyncio
# Example queries for document RAG
print("\n📄 Document RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What are the main techniques LEANN uses?'")
print("- 'What is the technique DLPM?'")
print("- 'Who does Elizabeth Bennet marry?'")
print("- 'What is the problem of developing pan gu model? (盘古大模型开发中遇到什么问题?)'")
print("\nOr run without --query for interactive mode\n")
rag = DocumentRAG()
asyncio.run(rag.run())

View File

@@ -0,0 +1,167 @@
import email
import os
from pathlib import Path
from typing import Any
from llama_index.core import Document
from llama_index.core.readers.base import BaseReader
def find_all_messages_directories(root: str | None = None) -> list[Path]:
"""
Recursively find all 'Messages' directories under the given root.
Returns a list of Path objects.
"""
if root is None:
# Auto-detect user's mail path
home_dir = os.path.expanduser("~")
root = os.path.join(home_dir, "Library", "Mail")
messages_dirs = []
for dirpath, _dirnames, _filenames in os.walk(root):
if os.path.basename(dirpath) == "Messages":
messages_dirs.append(Path(dirpath))
return messages_dirs
class EmlxReader(BaseReader):
"""
Apple Mail .emlx file reader with embedded metadata.
Reads individual .emlx files from Apple Mail's storage format.
"""
def __init__(self, include_html: bool = False) -> None:
"""
Initialize.
Args:
include_html: Whether to include HTML content in the email body (default: False)
"""
self.include_html = include_html
def load_data(self, input_dir: str, **load_kwargs: Any) -> list[Document]:
"""
Load data from the input directory containing .emlx files.
Args:
input_dir: Directory containing .emlx files
**load_kwargs:
max_count (int): Maximum amount of messages to read.
"""
docs: list[Document] = []
max_count = load_kwargs.get("max_count", 1000)
count = 0
total_files = 0
successful_files = 0
failed_files = 0
print(f"Starting to process directory: {input_dir}")
# Walk through the directory recursively
for dirpath, dirnames, filenames in os.walk(input_dir):
# Skip hidden directories
dirnames[:] = [d for d in dirnames if not d.startswith(".")]
for filename in filenames:
# Check if we've reached the max count (skip if max_count == -1)
if max_count > 0 and count >= max_count:
break
if filename.endswith(".emlx"):
total_files += 1
filepath = os.path.join(dirpath, filename)
try:
# Read the .emlx file
with open(filepath, encoding="utf-8", errors="ignore") as f:
content = f.read()
# .emlx files have a length prefix followed by the email content
# The first line contains the length, followed by the email
lines = content.split("\n", 1)
if len(lines) >= 2:
email_content = lines[1]
# Parse the email using Python's email module
try:
msg = email.message_from_string(email_content)
# Extract email metadata
subject = msg.get("Subject", "No Subject")
from_addr = msg.get("From", "Unknown")
to_addr = msg.get("To", "Unknown")
date = msg.get("Date", "Unknown")
# Extract email body
body = ""
if msg.is_multipart():
for part in msg.walk():
if (
part.get_content_type() == "text/plain"
or part.get_content_type() == "text/html"
):
if (
part.get_content_type() == "text/html"
and not self.include_html
):
continue
try:
payload = part.get_payload(decode=True)
if payload:
body += payload.decode("utf-8", errors="ignore")
except Exception as e:
print(f"Error decoding payload: {e}")
continue
else:
try:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode("utf-8", errors="ignore")
except Exception as e:
print(f"Error decoding single part payload: {e}")
body = ""
# Only create document if we have some content
if body.strip() or subject != "No Subject":
# Create document content with metadata embedded in text
doc_content = f"""
[File]: {filename}
[From]: {from_addr}
[To]: {to_addr}
[Subject]: {subject}
[Date]: {date}
[EMAIL BODY Start]:
{body}
"""
# No separate metadata - everything is in the text
doc = Document(text=doc_content, metadata={})
docs.append(doc)
count += 1
successful_files += 1
# Print first few successful files for debugging
if successful_files <= 3:
print(
f"Successfully loaded: {filename} - Subject: {subject[:50]}..."
)
except Exception as e:
failed_files += 1
if failed_files <= 5: # Only print first few errors
print(f"Error parsing email from {filepath}: {e}")
continue
except Exception as e:
failed_files += 1
if failed_files <= 5: # Only print first few errors
print(f"Error reading file {filepath}: {e}")
continue
print("Processing summary:")
print(f" Total .emlx files found: {total_files}")
print(f" Successfully loaded: {successful_files}")
print(f" Failed to load: {failed_files}")
print(f" Final documents: {len(docs)}")
return docs

186
apps/email_data/email.py Normal file
View File

@@ -0,0 +1,186 @@
"""
Mbox parser.
Contains simple parser for mbox files.
"""
import logging
from pathlib import Path
from typing import Any
from fsspec import AbstractFileSystem
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document
logger = logging.getLogger(__name__)
class MboxReader(BaseReader):
"""
Mbox parser.
Extract messages from mailbox files.
Returns string including date, subject, sender, receiver and
content for each message.
"""
DEFAULT_MESSAGE_FORMAT: str = (
"Date: {_date}\nFrom: {_from}\nTo: {_to}\nSubject: {_subject}\nContent: {_content}"
)
def __init__(
self,
*args: Any,
max_count: int = 0,
message_format: str = DEFAULT_MESSAGE_FORMAT,
**kwargs: Any,
) -> None:
"""Init params."""
try:
from bs4 import BeautifulSoup # noqa
except ImportError:
raise ImportError("`beautifulsoup4` package not found: `pip install beautifulsoup4`")
super().__init__(*args, **kwargs)
self.max_count = max_count
self.message_format = message_format
def load_data(
self,
file: Path,
extra_info: dict | None = None,
fs: AbstractFileSystem | None = None,
) -> list[Document]:
"""Parse file into string."""
# Import required libraries
import mailbox
from email.parser import BytesParser
from email.policy import default
from bs4 import BeautifulSoup
if fs:
logger.warning(
"fs was specified but MboxReader doesn't support loading "
"from fsspec filesystems. Will load from local filesystem instead."
)
i = 0
results: list[str] = []
# Load file using mailbox
bytes_parser = BytesParser(policy=default).parse
mbox = mailbox.mbox(file, factory=bytes_parser) # type: ignore
# Iterate through all messages
for _, _msg in enumerate(mbox):
try:
msg: mailbox.mboxMessage = _msg
# Parse multipart messages
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
cdispo = str(part.get("Content-Disposition"))
if "attachment" in cdispo:
print(f"Attachment found: {part.get_filename()}")
if ctype == "text/plain" and "attachment" not in cdispo:
content = part.get_payload(decode=True) # decode
break
# Get plain message payload for non-multipart messages
else:
content = msg.get_payload(decode=True)
# Parse message HTML content and remove unneeded whitespace
soup = BeautifulSoup(content)
stripped_content = " ".join(soup.get_text().split())
# Format message to include date, sender, receiver and subject
msg_string = self.message_format.format(
_date=msg["date"],
_from=msg["from"],
_to=msg["to"],
_subject=msg["subject"],
_content=stripped_content,
)
# Add message string to results
results.append(msg_string)
except Exception as e:
logger.warning(f"Failed to parse message:\n{_msg}\n with exception {e}")
# Increment counter and return if max count is met
i += 1
if self.max_count > 0 and i >= self.max_count:
break
return [Document(text=result, metadata=extra_info or {}) for result in results]
class EmlxMboxReader(MboxReader):
"""
EmlxMboxReader - Modified MboxReader that handles directories of .emlx files.
Extends MboxReader to work with Apple Mail's .emlx format by:
1. Reading .emlx files from a directory
2. Converting them to mbox format in memory
3. Using the parent MboxReader's parsing logic
"""
def load_data(
self,
directory: Path,
extra_info: dict | None = None,
fs: AbstractFileSystem | None = None,
) -> list[Document]:
"""Parse .emlx files from directory into strings using MboxReader logic."""
import os
import tempfile
if fs:
logger.warning(
"fs was specified but EmlxMboxReader doesn't support loading "
"from fsspec filesystems. Will load from local filesystem instead."
)
# Find all .emlx files in the directory
emlx_files = list(directory.glob("*.emlx"))
logger.info(f"Found {len(emlx_files)} .emlx files in {directory}")
if not emlx_files:
logger.warning(f"No .emlx files found in {directory}")
return []
# Create a temporary mbox file
with tempfile.NamedTemporaryFile(mode="w", suffix=".mbox", delete=False) as temp_mbox:
temp_mbox_path = temp_mbox.name
# Convert .emlx files to mbox format
for emlx_file in emlx_files:
try:
# Read the .emlx file
with open(emlx_file, encoding="utf-8", errors="ignore") as f:
content = f.read()
# .emlx format: first line is length, rest is email content
lines = content.split("\n", 1)
if len(lines) >= 2:
email_content = lines[1] # Skip the length line
# Write to mbox format (each message starts with "From " and ends with blank line)
temp_mbox.write(f"From {emlx_file.name} {email_content}\n\n")
except Exception as e:
logger.warning(f"Failed to process {emlx_file}: {e}")
continue
# Close the temporary file so MboxReader can read it
temp_mbox.close()
try:
# Use the parent MboxReader's logic to parse the mbox file
return super().load_data(Path(temp_mbox_path), extra_info, fs)
finally:
# Clean up temporary file
try:
os.unlink(temp_mbox_path)
except OSError:
pass

156
apps/email_rag.py Normal file
View File

@@ -0,0 +1,156 @@
"""
Email RAG example using the unified interface.
Supports Apple Mail on macOS.
"""
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks
from .email_data.LEANN_email_reader import EmlxReader
class EmailRAG(BaseRAGExample):
"""RAG example for Apple Mail processing."""
def __init__(self):
# Set default values BEFORE calling super().__init__
self.max_items_default = -1 # Process all emails by default
self.embedding_model_default = (
"sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
)
super().__init__(
name="Email",
description="Process and query Apple Mail emails with LEANN",
default_index_name="mail_index",
)
def _add_specific_arguments(self, parser):
"""Add email-specific arguments."""
email_group = parser.add_argument_group("Email Parameters")
email_group.add_argument(
"--mail-path",
type=str,
default=None,
help="Path to Apple Mail directory (auto-detected if not specified)",
)
email_group.add_argument(
"--include-html", action="store_true", help="Include HTML content in email processing"
)
email_group.add_argument(
"--chunk-size", type=int, default=256, help="Text chunk size (default: 256)"
)
email_group.add_argument(
"--chunk-overlap", type=int, default=25, help="Text chunk overlap (default: 25)"
)
def _find_mail_directories(self) -> list[Path]:
"""Auto-detect all Apple Mail directories."""
mail_base = Path.home() / "Library" / "Mail"
if not mail_base.exists():
return []
# Find all Messages directories
messages_dirs = []
for item in mail_base.rglob("Messages"):
if item.is_dir():
messages_dirs.append(item)
return messages_dirs
async def load_data(self, args) -> list[str]:
"""Load emails and convert to text chunks."""
# Determine mail directories
if args.mail_path:
messages_dirs = [Path(args.mail_path)]
else:
print("Auto-detecting Apple Mail directories...")
messages_dirs = self._find_mail_directories()
if not messages_dirs:
print("No Apple Mail directories found!")
print("Please specify --mail-path manually")
return []
print(f"Found {len(messages_dirs)} mail directories")
# Create reader
reader = EmlxReader(include_html=args.include_html)
# Process each directory
all_documents = []
total_processed = 0
for i, messages_dir in enumerate(messages_dirs):
print(f"\nProcessing directory {i + 1}/{len(messages_dirs)}: {messages_dir}")
try:
# Count emlx files
emlx_files = list(messages_dir.glob("*.emlx"))
print(f"Found {len(emlx_files)} email files")
# Apply max_items limit per directory
max_per_dir = -1 # Default to process all
if args.max_items > 0:
remaining = args.max_items - total_processed
if remaining <= 0:
break
max_per_dir = remaining
# If args.max_items == -1, max_per_dir stays -1 (process all)
# Load emails - fix the parameter passing
documents = reader.load_data(
input_dir=str(messages_dir),
max_count=max_per_dir,
)
if documents:
all_documents.extend(documents)
total_processed += len(documents)
print(f"Processed {len(documents)} emails from this directory")
except Exception as e:
print(f"Error processing {messages_dir}: {e}")
continue
if not all_documents:
print("No emails found to process!")
return []
print(f"\nTotal emails processed: {len(all_documents)}")
print("now starting to split into text chunks ... take some time")
# Convert to text chunks
# Email reader uses chunk_overlap=25 as in original
all_texts = create_text_chunks(
all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
)
return all_texts
if __name__ == "__main__":
import asyncio
# Check platform
if sys.platform != "darwin":
print("\n⚠️ Warning: This example is designed for macOS (Apple Mail)")
print(" Windows/Linux support coming soon!\n")
# Example queries for email RAG
print("\n📧 Email RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'What did my boss say about deadlines?'")
print("- 'Find emails about travel expenses'")
print("- 'Show me emails from last month about the project'")
print("- 'What food did I order from DoorDash?'")
print("\nNote: You may need to grant Full Disk Access to your terminal\n")
rag = EmailRAG()
asyncio.run(rag.run())

View File

@@ -0,0 +1,3 @@
from .history import ChromeHistoryReader
__all__ = ["ChromeHistoryReader"]

View File

@@ -0,0 +1,186 @@
import os
import sqlite3
from pathlib import Path
from typing import Any
from llama_index.core import Document
from llama_index.core.readers.base import BaseReader
class ChromeHistoryReader(BaseReader):
"""
Chrome browser history reader that extracts browsing data from SQLite database.
Reads Chrome history from the default Chrome profile location and creates documents
with embedded metadata similar to the email reader structure.
"""
def __init__(self) -> None:
"""Initialize."""
pass
def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
"""
Load Chrome history data from the default Chrome profile location.
Args:
input_dir: Not used for Chrome history (kept for compatibility)
**load_kwargs:
max_count (int): Maximum amount of history entries to read.
chrome_profile_path (str): Custom path to Chrome profile directory.
"""
docs: list[Document] = []
max_count = load_kwargs.get("max_count", 1000)
chrome_profile_path = load_kwargs.get("chrome_profile_path", None)
# Default Chrome profile path on macOS
if chrome_profile_path is None:
chrome_profile_path = os.path.expanduser(
"~/Library/Application Support/Google/Chrome/Default"
)
history_db_path = os.path.join(chrome_profile_path, "History")
if not os.path.exists(history_db_path):
print(f"Chrome history database not found at: {history_db_path}")
return docs
try:
# Connect to the Chrome history database
print(f"Connecting to database: {history_db_path}")
conn = sqlite3.connect(history_db_path)
cursor = conn.cursor()
# Query to get browsing history with metadata (removed created_time column)
query = """
SELECT
datetime(last_visit_time/1000000-11644473600,'unixepoch','localtime') as last_visit,
url,
title,
visit_count,
typed_count,
hidden
FROM urls
ORDER BY last_visit_time DESC
"""
print(f"Executing query on database: {history_db_path}")
cursor.execute(query)
rows = cursor.fetchall()
print(f"Query returned {len(rows)} rows")
count = 0
for row in rows:
if count >= max_count and max_count > 0:
break
last_visit, url, title, visit_count, typed_count, hidden = row
# Create document content with metadata embedded in text
doc_content = f"""
[Title]: {title}
[URL of the page]: {url}
[Last visited time]: {last_visit}
[Visit times]: {visit_count}
[Typed times]: {typed_count}
"""
# Create document with embedded metadata
doc = Document(text=doc_content, metadata={"title": title[0:150]})
# if len(title) > 150:
# print(f"Title is too long: {title}")
docs.append(doc)
count += 1
conn.close()
print(f"Loaded {len(docs)} Chrome history documents")
except Exception as e:
print(f"Error reading Chrome history: {e}")
# add you may need to close your browser to make the database file available
# also highlight in red
print(
"\033[91mYou may need to close your browser to make the database file available\033[0m"
)
return docs
return docs
@staticmethod
def find_chrome_profiles() -> list[Path]:
"""
Find all Chrome profile directories.
Returns:
List of Path objects pointing to Chrome profile directories
"""
chrome_base_path = Path(os.path.expanduser("~/Library/Application Support/Google/Chrome"))
profile_dirs = []
if not chrome_base_path.exists():
print(f"Chrome directory not found at: {chrome_base_path}")
return profile_dirs
# Find all profile directories
for profile_dir in chrome_base_path.iterdir():
if profile_dir.is_dir() and profile_dir.name != "System Profile":
history_path = profile_dir / "History"
if history_path.exists():
profile_dirs.append(profile_dir)
print(f"Found Chrome profile: {profile_dir}")
print(f"Found {len(profile_dirs)} Chrome profiles")
return profile_dirs
@staticmethod
def export_history_to_file(
output_file: str = "chrome_history_export.txt", max_count: int = 1000
):
"""
Export Chrome history to a text file using the same SQL query format.
Args:
output_file: Path to the output file
max_count: Maximum number of entries to export
"""
chrome_profile_path = os.path.expanduser(
"~/Library/Application Support/Google/Chrome/Default"
)
history_db_path = os.path.join(chrome_profile_path, "History")
if not os.path.exists(history_db_path):
print(f"Chrome history database not found at: {history_db_path}")
return
try:
conn = sqlite3.connect(history_db_path)
cursor = conn.cursor()
query = """
SELECT
datetime(last_visit_time/1000000-11644473600,'unixepoch','localtime') as last_visit,
url,
title,
visit_count,
typed_count,
hidden
FROM urls
ORDER BY last_visit_time DESC
LIMIT ?
"""
cursor.execute(query, (max_count,))
rows = cursor.fetchall()
with open(output_file, "w", encoding="utf-8") as f:
for row in rows:
last_visit, url, title, visit_count, typed_count, hidden = row
f.write(
f"{last_visit}\t{url}\t{title}\t{visit_count}\t{typed_count}\t{hidden}\n"
)
conn.close()
print(f"Exported {len(rows)} history entries to {output_file}")
except Exception as e:
print(f"Error exporting Chrome history: {e}")

View File

@@ -0,0 +1,774 @@
import json
import os
import re
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from llama_index.core import Document
from llama_index.core.readers.base import BaseReader
class WeChatHistoryReader(BaseReader):
"""
WeChat chat history reader that extracts chat data from exported JSON files.
Reads WeChat chat history from exported JSON files (from wechat-exporter tool)
and creates documents with embedded metadata similar to the Chrome history reader structure.
Also includes utilities for automatic WeChat chat history export.
"""
def __init__(self) -> None:
"""Initialize."""
self.packages_dir = Path(__file__).parent.parent.parent / "packages"
self.wechat_exporter_dir = self.packages_dir / "wechat-exporter"
self.wechat_decipher_dir = self.packages_dir / "wechat-decipher-macos"
def check_wechat_running(self) -> bool:
"""Check if WeChat is currently running."""
try:
result = subprocess.run(["pgrep", "-f", "WeChat"], capture_output=True, text=True)
return result.returncode == 0
except Exception:
return False
def install_wechattweak(self) -> bool:
"""Install WeChatTweak CLI tool."""
try:
# Create wechat-exporter directory if it doesn't exist
self.wechat_exporter_dir.mkdir(parents=True, exist_ok=True)
wechattweak_path = self.wechat_exporter_dir / "wechattweak-cli"
if not wechattweak_path.exists():
print("Downloading WeChatTweak CLI...")
subprocess.run(
[
"curl",
"-L",
"-o",
str(wechattweak_path),
"https://github.com/JettChenT/WeChatTweak-CLI/releases/latest/download/wechattweak-cli",
],
check=True,
)
# Make executable
wechattweak_path.chmod(0o755)
# Install WeChatTweak
print("Installing WeChatTweak...")
subprocess.run(["sudo", str(wechattweak_path), "install"], check=True)
return True
except Exception as e:
print(f"Error installing WeChatTweak: {e}")
return False
def restart_wechat(self):
"""Restart WeChat to apply WeChatTweak."""
try:
print("Restarting WeChat...")
subprocess.run(["pkill", "-f", "WeChat"], check=False)
time.sleep(2)
subprocess.run(["open", "-a", "WeChat"], check=True)
time.sleep(5) # Wait for WeChat to start
except Exception as e:
print(f"Error restarting WeChat: {e}")
def check_api_available(self) -> bool:
"""Check if WeChatTweak API is available."""
try:
result = subprocess.run(
["curl", "-s", "http://localhost:48065/wechat/allcontacts"],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0 and result.stdout.strip()
except Exception:
return False
def _extract_readable_text(self, content: str) -> str:
"""
Extract readable text from message content, removing XML and system messages.
Args:
content: The raw message content (can be string or dict)
Returns:
Cleaned, readable text
"""
if not content:
return ""
# Handle dictionary content (like quoted messages)
if isinstance(content, dict):
# Extract text from dictionary structure
text_parts = []
if "title" in content:
text_parts.append(str(content["title"]))
if "quoted" in content:
text_parts.append(str(content["quoted"]))
if "content" in content:
text_parts.append(str(content["content"]))
if "text" in content:
text_parts.append(str(content["text"]))
if text_parts:
return " | ".join(text_parts)
else:
# If we can't extract meaningful text from dict, return empty
return ""
# Handle string content
if not isinstance(content, str):
return ""
# Remove common prefixes like "wxid_xxx:\n"
clean_content = re.sub(r"^wxid_[^:]+:\s*", "", content)
clean_content = re.sub(r"^[^:]+:\s*", "", clean_content)
# If it's just XML or system message, return empty
if clean_content.strip().startswith("<") or "recalled a message" in clean_content:
return ""
return clean_content.strip()
def _is_text_message(self, content: str) -> bool:
"""
Check if a message contains readable text content.
Args:
content: The message content (can be string or dict)
Returns:
True if the message contains readable text, False otherwise
"""
if not content:
return False
# Handle dictionary content
if isinstance(content, dict):
# Check if dict has any readable text fields
text_fields = ["title", "quoted", "content", "text"]
for field in text_fields:
if content.get(field):
return True
return False
# Handle string content
if not isinstance(content, str):
return False
# Skip image messages (contain XML with img tags)
if "<img" in content and "cdnurl" in content:
return False
# Skip emoji messages (contain emoji XML tags)
if "<emoji" in content and "productid" in content:
return False
# Skip voice messages
if "<voice" in content:
return False
# Skip video messages
if "<video" in content:
return False
# Skip file messages
if "<appmsg" in content and "appid" in content:
return False
# Skip system messages (like "recalled a message")
if "recalled a message" in content:
return False
# Check if there's actual readable text (not just XML or system messages)
# Remove common prefixes like "wxid_xxx:\n" and check for actual content
clean_content = re.sub(r"^wxid_[^:]+:\s*", "", content)
clean_content = re.sub(r"^[^:]+:\s*", "", clean_content)
# If after cleaning we have meaningful text, consider it readable
if len(clean_content.strip()) > 0 and not clean_content.strip().startswith("<"):
return True
return False
def _concatenate_messages(
self,
messages: list[dict],
max_length: int = 128,
time_window_minutes: int = 30,
overlap_messages: int = 0,
) -> list[dict]:
"""
Concatenate messages based on length and time rules.
Args:
messages: List of message dictionaries
max_length: Maximum length for concatenated message groups. Use -1 to disable length constraint.
time_window_minutes: Time window in minutes to group messages together. Use -1 to disable time constraint.
overlap_messages: Number of messages to overlap between consecutive groups
Returns:
List of concatenated message groups
"""
if not messages:
return []
concatenated_groups = []
current_group = []
current_length = 0
last_timestamp = None
for message in messages:
# Extract message info
content = message.get("content", "")
message_text = message.get("message", "")
create_time = message.get("createTime", 0)
message.get("fromUser", "")
message.get("toUser", "")
message.get("isSentFromSelf", False)
# Extract readable text
readable_text = self._extract_readable_text(content)
if not readable_text:
readable_text = message_text
# Skip empty messages
if not readable_text.strip():
continue
# Check time window constraint (only if time_window_minutes != -1)
if time_window_minutes != -1 and last_timestamp is not None and create_time > 0:
time_diff_minutes = (create_time - last_timestamp) / 60
if time_diff_minutes > time_window_minutes:
# Time gap too large, start new group
if current_group:
concatenated_groups.append(
{
"messages": current_group,
"total_length": current_length,
"start_time": current_group[0].get("createTime", 0),
"end_time": current_group[-1].get("createTime", 0),
}
)
# Keep last few messages for overlap
if overlap_messages > 0 and len(current_group) > overlap_messages:
current_group = current_group[-overlap_messages:]
current_length = sum(
len(
self._extract_readable_text(msg.get("content", ""))
or msg.get("message", "")
)
for msg in current_group
)
else:
current_group = []
current_length = 0
# Check length constraint (only if max_length != -1)
message_length = len(readable_text)
if max_length != -1 and current_length + message_length > max_length and current_group:
# Current group would exceed max length, save it and start new
concatenated_groups.append(
{
"messages": current_group,
"total_length": current_length,
"start_time": current_group[0].get("createTime", 0),
"end_time": current_group[-1].get("createTime", 0),
}
)
# Keep last few messages for overlap
if overlap_messages > 0 and len(current_group) > overlap_messages:
current_group = current_group[-overlap_messages:]
current_length = sum(
len(
self._extract_readable_text(msg.get("content", ""))
or msg.get("message", "")
)
for msg in current_group
)
else:
current_group = []
current_length = 0
# Add message to current group
current_group.append(message)
current_length += message_length
last_timestamp = create_time
# Add the last group if it exists
if current_group:
concatenated_groups.append(
{
"messages": current_group,
"total_length": current_length,
"start_time": current_group[0].get("createTime", 0),
"end_time": current_group[-1].get("createTime", 0),
}
)
return concatenated_groups
def _create_concatenated_content(self, message_group: dict, contact_name: str) -> str:
"""
Create concatenated content from a group of messages.
Args:
message_group: Dictionary containing messages and metadata
contact_name: Name of the contact
Returns:
Formatted concatenated content
"""
messages = message_group["messages"]
start_time = message_group["start_time"]
end_time = message_group["end_time"]
# Format timestamps
if start_time:
try:
start_timestamp = datetime.fromtimestamp(start_time)
start_time_str = start_timestamp.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
start_time_str = str(start_time)
else:
start_time_str = "Unknown"
if end_time:
try:
end_timestamp = datetime.fromtimestamp(end_time)
end_time_str = end_timestamp.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
end_time_str = str(end_time)
else:
end_time_str = "Unknown"
# Build concatenated message content
message_parts = []
for message in messages:
content = message.get("content", "")
message_text = message.get("message", "")
create_time = message.get("createTime", 0)
is_sent_from_self = message.get("isSentFromSelf", False)
# Extract readable text
readable_text = self._extract_readable_text(content)
if not readable_text:
readable_text = message_text
# Format individual message
if create_time:
try:
timestamp = datetime.fromtimestamp(create_time)
# change to YYYY-MM-DD HH:MM:SS
time_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
time_str = str(create_time)
else:
time_str = "Unknown"
sender = "[Me]" if is_sent_from_self else "[Contact]"
message_parts.append(f"({time_str}) {sender}: {readable_text}")
concatenated_text = "\n".join(message_parts)
# Create final document content
doc_content = f"""
Contact: {contact_name}
Time Range: {start_time_str} - {end_time_str}
Messages ({len(messages)} messages, {message_group["total_length"]} chars):
{concatenated_text}
"""
# TODO @yichuan give better format and rich info here!
doc_content = f"""
{concatenated_text}
"""
return doc_content, contact_name
def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
"""
Load WeChat chat history data from exported JSON files.
Args:
input_dir: Directory containing exported WeChat JSON files
**load_kwargs:
max_count (int): Maximum amount of chat entries to read.
wechat_export_dir (str): Custom path to WeChat export directory.
include_non_text (bool): Whether to include non-text messages (images, emojis, etc.)
concatenate_messages (bool): Whether to concatenate messages based on length rules.
max_length (int): Maximum length for concatenated message groups (default: 1000).
time_window_minutes (int): Time window in minutes to group messages together (default: 30).
overlap_messages (int): Number of messages to overlap between consecutive groups (default: 2).
"""
docs: list[Document] = []
max_count = load_kwargs.get("max_count", 1000)
wechat_export_dir = load_kwargs.get("wechat_export_dir", None)
include_non_text = load_kwargs.get("include_non_text", False)
concatenate_messages = load_kwargs.get("concatenate_messages", False)
max_length = load_kwargs.get("max_length", 1000)
time_window_minutes = load_kwargs.get("time_window_minutes", 30)
# Default WeChat export path
if wechat_export_dir is None:
wechat_export_dir = "./wechat_export_test"
if not os.path.exists(wechat_export_dir):
print(f"WeChat export directory not found at: {wechat_export_dir}")
return docs
try:
# Find all JSON files in the export directory
json_files = list(Path(wechat_export_dir).glob("*.json"))
print(f"Found {len(json_files)} WeChat chat history files")
count = 0
for json_file in json_files:
if count >= max_count and max_count > 0:
break
try:
with open(json_file, encoding="utf-8") as f:
chat_data = json.load(f)
# Extract contact name from filename
contact_name = json_file.stem
if concatenate_messages:
# Filter messages to only include readable text messages
readable_messages = []
for message in chat_data:
try:
content = message.get("content", "")
if not include_non_text and not self._is_text_message(content):
continue
readable_text = self._extract_readable_text(content)
if not readable_text and not include_non_text:
continue
readable_messages.append(message)
except Exception as e:
print(f"Error processing message in {json_file}: {e}")
continue
# Concatenate messages based on rules
message_groups = self._concatenate_messages(
readable_messages,
max_length=max_length,
time_window_minutes=time_window_minutes,
overlap_messages=0, # No overlap between groups
)
# Create documents from concatenated groups
for message_group in message_groups:
if count >= max_count and max_count > 0:
break
doc_content, contact_name = self._create_concatenated_content(
message_group, contact_name
)
doc = Document(
text=doc_content,
metadata={"contact_name": contact_name},
)
docs.append(doc)
count += 1
print(
f"Created {len(message_groups)} concatenated message groups for {contact_name}"
)
else:
# Original single-message processing
for message in chat_data:
if count >= max_count and max_count > 0:
break
# Extract message information
message.get("fromUser", "")
message.get("toUser", "")
content = message.get("content", "")
message_text = message.get("message", "")
create_time = message.get("createTime", 0)
is_sent_from_self = message.get("isSentFromSelf", False)
# Handle content that might be dict or string
try:
# Check if this is a readable text message
if not include_non_text and not self._is_text_message(content):
continue
# Extract readable text
readable_text = self._extract_readable_text(content)
if not readable_text and not include_non_text:
continue
except Exception as e:
# Skip messages that cause processing errors
print(f"Error processing message in {json_file}: {e}")
continue
# Convert timestamp to readable format
if create_time:
try:
timestamp = datetime.fromtimestamp(create_time)
time_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
time_str = str(create_time)
else:
time_str = "Unknown"
# Create document content with metadata header and contact info
doc_content = f"""
Contact: {contact_name}
Is sent from self: {is_sent_from_self}
Time: {time_str}
Message: {readable_text if readable_text else message_text}
"""
# Create document with embedded metadata
doc = Document(
text=doc_content, metadata={"contact_name": contact_name}
)
docs.append(doc)
count += 1
except Exception as e:
print(f"Error reading {json_file}: {e}")
continue
print(f"Loaded {len(docs)} WeChat chat documents")
except Exception as e:
print(f"Error reading WeChat history: {e}")
return docs
return docs
@staticmethod
def find_wechat_export_dirs() -> list[Path]:
"""
Find all WeChat export directories.
Returns:
List of Path objects pointing to WeChat export directories
"""
export_dirs = []
# Look for common export directory names
possible_dirs = [
Path("./wechat_export"),
Path("./wechat_export_direct"),
Path("./wechat_chat_history"),
Path("./chat_export"),
]
for export_dir in possible_dirs:
if export_dir.exists() and export_dir.is_dir():
json_files = list(export_dir.glob("*.json"))
if json_files:
export_dirs.append(export_dir)
print(
f"Found WeChat export directory: {export_dir} with {len(json_files)} files"
)
print(f"Found {len(export_dirs)} WeChat export directories")
return export_dirs
@staticmethod
def export_chat_to_file(
output_file: str = "wechat_chat_export.txt",
max_count: int = 1000,
export_dir: str | None = None,
include_non_text: bool = False,
):
"""
Export WeChat chat history to a text file.
Args:
output_file: Path to the output file
max_count: Maximum number of entries to export
export_dir: Directory containing WeChat JSON files
include_non_text: Whether to include non-text messages
"""
if export_dir is None:
export_dir = "./wechat_export_test"
if not os.path.exists(export_dir):
print(f"WeChat export directory not found at: {export_dir}")
return
try:
json_files = list(Path(export_dir).glob("*.json"))
with open(output_file, "w", encoding="utf-8") as f:
count = 0
for json_file in json_files:
if count >= max_count and max_count > 0:
break
try:
with open(json_file, encoding="utf-8") as json_f:
chat_data = json.load(json_f)
contact_name = json_file.stem
f.write(f"\n=== Chat with {contact_name} ===\n")
for message in chat_data:
if count >= max_count and max_count > 0:
break
from_user = message.get("fromUser", "")
content = message.get("content", "")
message_text = message.get("message", "")
create_time = message.get("createTime", 0)
# Skip non-text messages unless requested
if not include_non_text:
reader = WeChatHistoryReader()
if not reader._is_text_message(content):
continue
readable_text = reader._extract_readable_text(content)
if not readable_text:
continue
message_text = readable_text
if create_time:
try:
timestamp = datetime.fromtimestamp(create_time)
time_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
time_str = str(create_time)
else:
time_str = "Unknown"
f.write(f"[{time_str}] {from_user}: {message_text}\n")
count += 1
except Exception as e:
print(f"Error processing {json_file}: {e}")
continue
print(f"Exported {count} chat entries to {output_file}")
except Exception as e:
print(f"Error exporting WeChat chat history: {e}")
def export_wechat_chat_history(self, export_dir: str = "./wechat_export_direct") -> Path | None:
"""
Export WeChat chat history using wechat-exporter tool.
Args:
export_dir: Directory to save exported chat history
Returns:
Path to export directory if successful, None otherwise
"""
try:
import subprocess
import sys
# Create export directory
export_path = Path(export_dir)
export_path.mkdir(exist_ok=True)
print(f"Exporting WeChat chat history to {export_path}...")
# Check if wechat-exporter directory exists
if not self.wechat_exporter_dir.exists():
print(f"wechat-exporter directory not found at: {self.wechat_exporter_dir}")
return None
# Install requirements if needed
requirements_file = self.wechat_exporter_dir / "requirements.txt"
if requirements_file.exists():
print("Installing wechat-exporter requirements...")
subprocess.run(["uv", "pip", "install", "-r", str(requirements_file)], check=True)
# Run the export command
print("Running wechat-exporter...")
result = subprocess.run(
[
sys.executable,
str(self.wechat_exporter_dir / "main.py"),
"export-all",
str(export_path),
],
capture_output=True,
text=True,
check=True,
)
print("Export command output:")
print(result.stdout)
if result.stderr:
print("Export errors:")
print(result.stderr)
# Check if export was successful
if export_path.exists() and any(export_path.glob("*.json")):
json_files = list(export_path.glob("*.json"))
print(
f"Successfully exported {len(json_files)} chat history files to {export_path}"
)
return export_path
else:
print("Export completed but no JSON files found")
return None
except subprocess.CalledProcessError as e:
print(f"Export command failed: {e}")
print(f"Command output: {e.stdout}")
print(f"Command errors: {e.stderr}")
return None
except Exception as e:
print(f"Export failed: {e}")
print("Please ensure WeChat is running and WeChatTweak is installed.")
return None
def find_or_export_wechat_data(self, export_dir: str = "./wechat_export_direct") -> list[Path]:
"""
Find existing WeChat exports or create new ones.
Args:
export_dir: Directory to save exported chat history if needed
Returns:
List of Path objects pointing to WeChat export directories
"""
export_dirs = []
# Look for existing exports in common locations
possible_export_dirs = [
Path("./wechat_database_export"),
Path("./wechat_export_test"),
Path("./wechat_export"),
Path("./wechat_export_direct"),
Path("./wechat_chat_history"),
Path("./chat_export"),
]
for export_dir_path in possible_export_dirs:
if export_dir_path.exists() and any(export_dir_path.glob("*.json")):
export_dirs.append(export_dir_path)
print(f"Found existing export: {export_dir_path}")
# If no existing exports, try to export automatically
if not export_dirs:
print("No existing WeChat exports found. Starting direct export...")
# Try to export using wechat-exporter
exported_path = self.export_wechat_chat_history(export_dir)
if exported_path:
export_dirs = [exported_path]
else:
print(
"Failed to export WeChat data. Please ensure WeChat is running and WeChatTweak is installed."
)
return export_dirs

189
apps/wechat_rag.py Normal file
View File

@@ -0,0 +1,189 @@
"""
WeChat History RAG example using the unified interface.
Supports WeChat chat history export and search.
"""
import subprocess
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample
from .history_data.wechat_history import WeChatHistoryReader
class WeChatRAG(BaseRAGExample):
"""RAG example for WeChat chat history."""
def __init__(self):
# Set default values BEFORE calling super().__init__
self.max_items_default = -1 # Match original default
self.embedding_model_default = (
"sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
)
super().__init__(
name="WeChat History",
description="Process and query WeChat chat history with LEANN",
default_index_name="wechat_history_magic_test_11Debug_new",
)
def _add_specific_arguments(self, parser):
"""Add WeChat-specific arguments."""
wechat_group = parser.add_argument_group("WeChat Parameters")
wechat_group.add_argument(
"--export-dir",
type=str,
default="./wechat_export",
help="Directory to store WeChat exports (default: ./wechat_export)",
)
wechat_group.add_argument(
"--force-export",
action="store_true",
help="Force re-export of WeChat data even if exports exist",
)
wechat_group.add_argument(
"--chunk-size", type=int, default=192, help="Text chunk size (default: 192)"
)
wechat_group.add_argument(
"--chunk-overlap", type=int, default=64, help="Text chunk overlap (default: 64)"
)
def _export_wechat_data(self, export_dir: Path) -> bool:
"""Export WeChat data using wechattweak-cli."""
print("Exporting WeChat data...")
# Check if WeChat is running
try:
result = subprocess.run(["pgrep", "WeChat"], capture_output=True, text=True)
if result.returncode != 0:
print("WeChat is not running. Please start WeChat first.")
return False
except Exception:
pass # pgrep might not be available on all systems
# Create export directory
export_dir.mkdir(parents=True, exist_ok=True)
# Run export command
cmd = ["packages/wechat-exporter/wechattweak-cli", "export", str(export_dir)]
try:
print(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("WeChat data exported successfully!")
return True
else:
print(f"Export failed: {result.stderr}")
return False
except FileNotFoundError:
print("\nError: wechattweak-cli not found!")
print("Please install it first:")
print(" sudo packages/wechat-exporter/wechattweak-cli install")
return False
except Exception as e:
print(f"Export error: {e}")
return False
async def load_data(self, args) -> list[str]:
"""Load WeChat history and convert to text chunks."""
# Initialize WeChat reader with export capabilities
reader = WeChatHistoryReader()
# Find existing exports or create new ones using the centralized method
export_dirs = reader.find_or_export_wechat_data(args.export_dir)
if not export_dirs:
print("Failed to find or export WeChat data. Trying to find any existing exports...")
# Try to find any existing exports in common locations
export_dirs = reader.find_wechat_export_dirs()
if not export_dirs:
print("No WeChat data found. Please ensure WeChat exports exist.")
return []
# Load documents from all found export directories
all_documents = []
total_processed = 0
for i, export_dir in enumerate(export_dirs):
print(f"\nProcessing WeChat export {i + 1}/{len(export_dirs)}: {export_dir}")
try:
# Apply max_items limit per export
max_per_export = -1
if args.max_items > 0:
remaining = args.max_items - total_processed
if remaining <= 0:
break
max_per_export = remaining
documents = reader.load_data(
wechat_export_dir=str(export_dir),
max_count=max_per_export,
concatenate_messages=True, # Enable message concatenation for better context
)
if documents:
print(f"Loaded {len(documents)} chat documents from {export_dir}")
all_documents.extend(documents)
total_processed += len(documents)
else:
print(f"No documents loaded from {export_dir}")
except Exception as e:
print(f"Error processing {export_dir}: {e}")
continue
if not all_documents:
print("No documents loaded from any source. Exiting.")
return []
print(f"\nTotal loaded {len(all_documents)} chat documents from {len(export_dirs)} exports")
print("now starting to split into text chunks ... take some time")
# Convert to text chunks with contact information
all_texts = []
for doc in all_documents:
# Split the document into chunks
from llama_index.core.node_parser import SentenceSplitter
text_splitter = SentenceSplitter(
chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
)
nodes = text_splitter.get_nodes_from_documents([doc])
for node in nodes:
# Add contact information to each chunk
contact_name = doc.metadata.get("contact_name", "Unknown")
text = f"[Contact] means the message is from: {contact_name}\n" + node.get_content()
all_texts.append(text)
print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents")
return all_texts
if __name__ == "__main__":
import asyncio
# Check platform
if sys.platform != "darwin":
print("\n⚠️ Warning: WeChat export is only supported on macOS")
print(" You can still query existing exports on other platforms\n")
# Example queries for WeChat RAG
print("\n💬 WeChat History RAG Example")
print("=" * 50)
print("\nExample queries you can try:")
print("- 'Show me conversations about travel plans'")
print("- 'Find group chats about weekend activities'")
print("- '我想买魔术师约翰逊的球衣,给我一些对应聊天记录?'")
print("- 'What did we discuss about the project last month?'")
print("\nNote: WeChat must be running for export to work\n")
rag = WeChatRAG()
asyncio.run(rag.run())