diff --git a/README.md b/README.md
index 0e7904e..a4b81ce 100755
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@ LEANN is an innovative vector database that democratizes personal AI. Transform
LEANN achieves this through *graph-based selective recomputation* with *high-degree preserving pruning*, computing embeddings on-demand instead of storing them all. [Illustration Fig ↗](#️-architecture--how-it-works) | [Paper ↗](https://arxiv.org/abs/2506.08276)
-**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)**, **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
+**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantically search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\*, or external knowledge bases (e.g., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
\* Claude Code only supports basic `grep`-style keyword search. **LEANN** is a drop-in **semantic search MCP service fully compatible with Claude Code**, unlocking intelligent retrieval without changing your workflow. 🔥 Check out [the easy setup ↗](packages/leann-mcp/README.md)
@@ -176,7 +176,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1)
## RAG on Everything!
-LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, and more.
+LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, iMessage, ChatGPT and Claude conversation exports, and more.
@@ -542,6 +542,238 @@ Once the index is built, you can ask questions like:
+### 🤖 ChatGPT Chat History: Your Personal AI Conversation Archive!
+
+Transform your ChatGPT conversations into a searchable knowledge base! Search through all your ChatGPT discussions about coding, research, brainstorming, and more.
+
+```bash
+python -m apps.chatgpt_rag --export-path chatgpt_export.html --query "How do I create a list in Python?"
+```
+
+**Unlock your AI conversation history.** Never lose track of valuable insights from your ChatGPT discussions again.
+
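+Prefer Python over the CLI? Here is a minimal sketch using the `ChatGPTReader` added in this PR together with the repo's `create_text_chunks` helper (the export filename is a placeholder, and the module paths assume you run from the repo root):
+
+```python
+from apps.chatgpt_data.chatgpt_reader import ChatGPTReader
+from apps.chunking import create_text_chunks
+
+# One Document per conversation, with title/timestamp/message-count metadata
+reader = ChatGPTReader(concatenate_conversations=True)
+docs = reader.load_data(chatgpt_export_path="chatgpt_export.zip", max_count=10)
+
+# Split into overlapping chunks, using the same defaults as the CLI
+chunks = create_text_chunks(docs, chunk_size=512, chunk_overlap=128)
+print(f"{len(docs)} conversations -> {len(chunks)} chunks")
+```
+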
+<details>
+<summary>📋 Click to expand: How to Export ChatGPT Data</summary>
+
+**Step-by-step export process:**
+
+1. **Sign in to ChatGPT**
+2. **Click your profile icon** in the top right corner
+3. **Navigate to Settings** → **Data Controls**
+4. **Click "Export"** under Export Data
+5. **Confirm the export** request
+6. **Download the ZIP file** from the email link (expires in 24 hours)
+7. **Extract or use directly** with LEANN
+
+**Supported formats:**
+- `.html` files from ChatGPT exports
+- `.zip` archives from ChatGPT
+- Directories with multiple export files
+
+</details>
+
+<details>
+<summary>📋 Click to expand: ChatGPT-Specific Arguments</summary>
+
+#### Parameters
+```bash
+--export-path PATH # Path to ChatGPT export file (.html/.zip) or directory (default: ./chatgpt_export)
+--separate-messages # Process each message separately instead of concatenated conversations
+--chunk-size N # Text chunk size (default: 512)
+--chunk-overlap N # Overlap between chunks (default: 128)
+```
+
+#### Example Commands
+```bash
+# Basic usage with HTML export
+python -m apps.chatgpt_rag --export-path conversations.html
+
+# Process ZIP archive from ChatGPT
+python -m apps.chatgpt_rag --export-path chatgpt_export.zip
+
+# Search with specific query
+python -m apps.chatgpt_rag --export-path chatgpt_data.html --query "Python programming help"
+
+# Process individual messages for fine-grained search
+python -m apps.chatgpt_rag --separate-messages --export-path chatgpt_export.html
+
+# Process directory containing multiple exports
+python -m apps.chatgpt_rag --export-path ./chatgpt_exports/ --max-items 1000
+```
+
+</details>
+
+<details>
+<summary>💡 Click to expand: Example queries you can try</summary>
+
+Once your ChatGPT conversations are indexed, you can search with queries like:
+- "What did I ask ChatGPT about Python programming?"
+- "Show me conversations about machine learning algorithms"
+- "Find discussions about web development frameworks"
+- "What coding advice did ChatGPT give me?"
+- "Search for conversations about debugging techniques"
+- "Find ChatGPT's recommendations for learning resources"
+
+</details>
+
+### 🤖 Claude Chat History: Your Personal AI Conversation Archive!
+
+Transform your Claude conversations into a searchable knowledge base! Search through all your Claude discussions about coding, research, brainstorming, and more.
+
+```bash
+python -m apps.claude_rag --export-path claude_export.json --query "What did I ask about Python dictionaries?"
+```
+
+**Unlock your AI conversation history.** Never lose track of valuable insights from your Claude discussions again.
+
+<details>
+<summary>📋 Click to expand: How to Export Claude Data</summary>
+
+**Step-by-step export process:**
+
+1. **Open Claude** in your browser
+2. **Navigate to Settings** (look for gear icon or settings menu)
+3. **Find Export/Download** options in your account settings
+4. **Download conversation data** (usually in JSON format)
+5. **Place the file** in your project directory
+
+*Note: Claude export methods may vary depending on the interface you're using. Check Claude's help documentation for the most current export instructions.*
+
+**Supported formats:**
+- `.json` files (recommended)
+- `.zip` archives containing JSON data
+- Directories with multiple export files
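+
+The reader tolerates several common layouts, including a top-level list of conversations and a `{"conversations": [...]}` wrapper. Below is a minimal hand-written sample (not an official Claude schema) that parses into one conversation:
+
+```python
+import json
+
+# One conversation in the {"conversations": [...]} layout; "human" maps to the user role
+sample = {
+    "conversations": [
+        {
+            "title": "Python dictionaries",
+            "created_at": "2024-01-15",
+            "messages": [
+                {"role": "human", "content": "How do I merge two dicts?"},
+                {"role": "assistant", "content": "Use the | operator: merged = a | b."},
+            ],
+        }
+    ]
+}
+with open("claude_export.json", "w") as f:
+    json.dump(sample, f)
+```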
+
+</details>
+
+<details>
+<summary>📋 Click to expand: Claude-Specific Arguments</summary>
+
+#### Parameters
+```bash
+--export-path PATH # Path to Claude export file (.json/.zip) or directory (default: ./claude_export)
+--separate-messages # Process each message separately instead of concatenated conversations
+--chunk-size N # Text chunk size (default: 512)
+--chunk-overlap N # Overlap between chunks (default: 128)
+```
+
+#### Example Commands
+```bash
+# Basic usage with JSON export
+python -m apps.claude_rag --export-path my_claude_conversations.json
+
+# Process ZIP archive from Claude
+python -m apps.claude_rag --export-path claude_export.zip
+
+# Search with specific query
+python -m apps.claude_rag --export-path claude_data.json --query "machine learning advice"
+
+# Process individual messages for fine-grained search
+python -m apps.claude_rag --separate-messages --export-path claude_export.json
+
+# Process directory containing multiple exports
+python -m apps.claude_rag --export-path ./claude_exports/ --max-items 1000
+```
+
+</details>
+
+<details>
+<summary>💡 Click to expand: Example queries you can try</summary>
+
+Once your Claude conversations are indexed, you can search with queries like:
+- "What did I ask Claude about Python programming?"
+- "Show me conversations about machine learning algorithms"
+- "Find discussions about software architecture patterns"
+- "What debugging advice did Claude give me?"
+- "Search for conversations about data structures"
+- "Find Claude's recommendations for learning resources"
+
+</details>
+
+### 💬 iMessage History: Your Personal Conversation Archive!
+
+Transform your iMessage conversations into a searchable knowledge base! Search through all your text messages, group chats, and conversations with friends, family, and colleagues.
+
+```bash
+python -m apps.imessage_rag --query "What did we discuss about the weekend plans?"
+```
+
+**Unlock your message history.** Never lose track of important conversations, shared links, or memorable moments from your iMessage history.
+
+<details>
+<summary>📋 Click to expand: How to Access iMessage Data</summary>
+
+**iMessage data location:**
+
+iMessage conversations are stored in a SQLite database on your Mac at:
+```
+~/Library/Messages/chat.db
+```
+
+**Important setup requirements:**
+
+1. **Grant Full Disk Access** to your terminal or IDE:
+   - Open **System Preferences** → **Security & Privacy** → **Privacy**
+ - Select **Full Disk Access** from the left sidebar
+ - Click the **+** button and add your terminal app (Terminal, iTerm2) or IDE (VS Code, etc.)
+ - Restart your terminal/IDE after granting access
+
+2. **Alternative: Use a backup database**
+ - If you have Time Machine backups or manual copies of the database
+ - Use `--db-path` to specify a custom location
+
+**Supported formats:**
+- Direct access to `~/Library/Messages/chat.db` (default)
+- Custom database path with `--db-path`
+- Works with backup copies of the database
+
+
+</details>
+
+<details>
+<summary>📋 Click to expand: iMessage-Specific Arguments</summary>
+
+#### Parameters
+```bash
+--db-path PATH # Path to chat.db file (default: ~/Library/Messages/chat.db)
+--concatenate-conversations # Group messages by conversation (default: True)
+--no-concatenate-conversations # Process each message individually
+--chunk-size N # Text chunk size (default: 1000)
+--chunk-overlap N # Overlap between chunks (default: 200)
+```
+
+#### Example Commands
+```bash
+# Basic usage (requires Full Disk Access)
+python -m apps.imessage_rag
+
+# Search with specific query
+python -m apps.imessage_rag --query "family dinner plans"
+
+# Use custom database path
+python -m apps.imessage_rag --db-path /path/to/backup/chat.db
+
+# Process individual messages instead of conversations
+python -m apps.imessage_rag --no-concatenate-conversations
+
+# Limit processing for testing
+python -m apps.imessage_rag --max-items 100 --query "weekend"
+```
+
+</details>
+
+<details>
+<summary>💡 Click to expand: Example queries you can try</summary>
+
+Once your iMessage conversations are indexed, you can search with queries like:
+- "What did we discuss about vacation plans?"
+- "Find messages about restaurant recommendations"
+- "Show me conversations with John about the project"
+- "Search for shared links about technology"
+- "Find group chat discussions about weekend events"
+- "What did mom say about the family gathering?"
+
+</details>
+
### 🚀 Claude Code Integration: Transform Your Development Workflow!
diff --git a/apps/chatgpt_data/__init__.py b/apps/chatgpt_data/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/chatgpt_data/chatgpt_reader.py b/apps/chatgpt_data/chatgpt_reader.py
new file mode 100644
index 0000000..c52ce22
--- /dev/null
+++ b/apps/chatgpt_data/chatgpt_reader.py
@@ -0,0 +1,413 @@
+"""
+ChatGPT export data reader.
+
+Reads and processes ChatGPT export data from chat.html files.
+"""
+
+import re
+from pathlib import Path
+from typing import Any
+from zipfile import ZipFile
+
+try:
+    from bs4 import BeautifulSoup
+except ImportError as e:
+    raise ImportError("`beautifulsoup4` package not found: `pip install beautifulsoup4`") from e
+from llama_index.core import Document
+from llama_index.core.readers.base import BaseReader
+
+
+class ChatGPTReader(BaseReader):
+ """
+ ChatGPT export data reader.
+
+ Reads ChatGPT conversation data from exported chat.html files or zip archives.
+ Processes conversations into structured documents with metadata.
+ """
+
+ def __init__(self, concatenate_conversations: bool = True) -> None:
+ """
+ Initialize.
+
+ Args:
+ concatenate_conversations: Whether to concatenate messages within conversations for better context
+ """
+ self.concatenate_conversations = concatenate_conversations
+
+ def _extract_html_from_zip(self, zip_path: Path) -> str | None:
+ """
+ Extract chat.html from ChatGPT export zip file.
+
+ Args:
+ zip_path: Path to the ChatGPT export zip file
+
+ Returns:
+ HTML content as string, or None if not found
+ """
+ try:
+ with ZipFile(zip_path, "r") as zip_file:
+ # Look for chat.html or conversations.html
+ html_files = [
+ f
+ for f in zip_file.namelist()
+ if f.endswith(".html") and ("chat" in f.lower() or "conversation" in f.lower())
+ ]
+
+ if not html_files:
+ print(f"No HTML chat file found in {zip_path}")
+ return None
+
+ # Use the first HTML file found
+ html_file = html_files[0]
+ print(f"Found HTML file: {html_file}")
+
+ with zip_file.open(html_file) as f:
+ return f.read().decode("utf-8", errors="ignore")
+
+ except Exception as e:
+ print(f"Error extracting HTML from zip {zip_path}: {e}")
+ return None
+
+ def _parse_chatgpt_html(self, html_content: str) -> list[dict]:
+ """
+ Parse ChatGPT HTML export to extract conversations.
+
+ Args:
+ html_content: HTML content from ChatGPT export
+
+ Returns:
+ List of conversation dictionaries
+ """
+ soup = BeautifulSoup(html_content, "html.parser")
+ conversations = []
+
+ # Try different possible structures for ChatGPT exports
+ # Structure 1: Look for conversation containers
+ conversation_containers = soup.find_all(
+ ["div", "section"], class_=re.compile(r"conversation|chat", re.I)
+ )
+
+ if not conversation_containers:
+ # Structure 2: Look for message containers directly
+ conversation_containers = [soup] # Use the entire document as one conversation
+
+ for container in conversation_containers:
+ conversation = self._extract_conversation_from_container(container)
+ if conversation and conversation.get("messages"):
+ conversations.append(conversation)
+
+ # If no structured conversations found, try to extract all text as one conversation
+ if not conversations:
+ all_text = soup.get_text(separator="\n", strip=True)
+ if all_text:
+ conversations.append(
+ {
+ "title": "ChatGPT Conversation",
+ "messages": [{"role": "mixed", "content": all_text, "timestamp": None}],
+ "timestamp": None,
+ }
+ )
+
+ return conversations
+
+ def _extract_conversation_from_container(self, container) -> dict | None:
+ """
+ Extract conversation data from a container element.
+
+ Args:
+ container: BeautifulSoup element containing conversation
+
+ Returns:
+ Dictionary with conversation data or None
+ """
+ messages = []
+
+ # Look for message elements with various possible structures
+ message_selectors = ['[class*="message"]', '[class*="chat"]', "[data-message]", "p", "div"]
+
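+        # Stop at the first selector that matches anything; bare "p"/"div" are
+        # broad last-resort fallbacks, so the more specific selectors go first.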
+ for selector in message_selectors:
+ message_elements = container.select(selector)
+ if message_elements:
+ break
+ else:
+ message_elements = []
+
+ # If no structured messages found, treat the entire container as one message
+ if not message_elements:
+ text_content = container.get_text(separator="\n", strip=True)
+ if text_content:
+ messages.append({"role": "mixed", "content": text_content, "timestamp": None})
+ else:
+ for element in message_elements:
+ message = self._extract_message_from_element(element)
+ if message:
+ messages.append(message)
+
+ if not messages:
+ return None
+
+ # Try to extract conversation title
+ title_element = container.find(["h1", "h2", "h3", "title"])
+ title = title_element.get_text(strip=True) if title_element else "ChatGPT Conversation"
+
+ # Try to extract timestamp from various possible locations
+ timestamp = self._extract_timestamp_from_container(container)
+
+ return {"title": title, "messages": messages, "timestamp": timestamp}
+
+ def _extract_message_from_element(self, element) -> dict | None:
+ """
+ Extract message data from an element.
+
+ Args:
+ element: BeautifulSoup element containing message
+
+ Returns:
+ Dictionary with message data or None
+ """
+ text_content = element.get_text(separator=" ", strip=True)
+
+ # Skip empty or very short messages
+ if not text_content or len(text_content.strip()) < 3:
+ return None
+
+ # Try to determine role (user/assistant) from class names or content
+ role = "mixed" # Default role
+
+ class_names = " ".join(element.get("class", [])).lower()
+ if "user" in class_names or "human" in class_names:
+ role = "user"
+ elif "assistant" in class_names or "ai" in class_names or "gpt" in class_names:
+ role = "assistant"
+ elif text_content.lower().startswith(("you:", "user:", "me:")):
+ role = "user"
+ text_content = re.sub(r"^(you|user|me):\s*", "", text_content, flags=re.IGNORECASE)
+ elif text_content.lower().startswith(("chatgpt:", "assistant:", "ai:")):
+ role = "assistant"
+ text_content = re.sub(
+ r"^(chatgpt|assistant|ai):\s*", "", text_content, flags=re.IGNORECASE
+ )
+
+ # Try to extract timestamp
+ timestamp = self._extract_timestamp_from_element(element)
+
+ return {"role": role, "content": text_content, "timestamp": timestamp}
+
+ def _extract_timestamp_from_element(self, element) -> str | None:
+ """Extract timestamp from element."""
+ # Look for timestamp in various attributes and child elements
+ timestamp_attrs = ["data-timestamp", "timestamp", "datetime"]
+ for attr in timestamp_attrs:
+ if element.get(attr):
+ return element.get(attr)
+
+ # Look for time elements
+ time_element = element.find("time")
+ if time_element:
+ return time_element.get("datetime") or time_element.get_text(strip=True)
+
+ # Look for date-like text patterns
+ text = element.get_text()
+ date_patterns = [r"\d{4}-\d{2}-\d{2}", r"\d{1,2}/\d{1,2}/\d{4}", r"\w+ \d{1,2}, \d{4}"]
+
+ for pattern in date_patterns:
+ match = re.search(pattern, text)
+ if match:
+ return match.group()
+
+ return None
+
+ def _extract_timestamp_from_container(self, container) -> str | None:
+ """Extract timestamp from conversation container."""
+ return self._extract_timestamp_from_element(container)
+
+ def _create_concatenated_content(self, conversation: dict) -> str:
+ """
+ Create concatenated content from conversation messages.
+
+ Args:
+ conversation: Dictionary containing conversation data
+
+ Returns:
+ Formatted concatenated content
+ """
+ title = conversation.get("title", "ChatGPT Conversation")
+ messages = conversation.get("messages", [])
+ timestamp = conversation.get("timestamp", "Unknown")
+
+ # Build message content
+ message_parts = []
+ for message in messages:
+ role = message.get("role", "mixed")
+ content = message.get("content", "")
+ msg_timestamp = message.get("timestamp", "")
+
+ if role == "user":
+ prefix = "[You]"
+ elif role == "assistant":
+ prefix = "[ChatGPT]"
+ else:
+ prefix = "[Message]"
+
+ # Add timestamp if available
+ if msg_timestamp:
+ prefix += f" ({msg_timestamp})"
+
+ message_parts.append(f"{prefix}: {content}")
+
+ concatenated_text = "\n\n".join(message_parts)
+
+ # Create final document content
+ doc_content = f"""Conversation: {title}
+Date: {timestamp}
+Messages ({len(messages)} messages):
+
+{concatenated_text}
+"""
+ return doc_content
+
+ def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
+ """
+ Load ChatGPT export data.
+
+ Args:
+ input_dir: Directory containing ChatGPT export files or path to specific file
+ **load_kwargs:
+ max_count (int): Maximum number of conversations to process
+ chatgpt_export_path (str): Specific path to ChatGPT export file/directory
+ include_metadata (bool): Whether to include metadata in documents
+ """
+ docs: list[Document] = []
+ max_count = load_kwargs.get("max_count", -1)
+ chatgpt_export_path = load_kwargs.get("chatgpt_export_path", input_dir)
+ include_metadata = load_kwargs.get("include_metadata", True)
+
+ if not chatgpt_export_path:
+ print("No ChatGPT export path provided")
+ return docs
+
+ export_path = Path(chatgpt_export_path)
+
+ if not export_path.exists():
+ print(f"ChatGPT export path not found: {export_path}")
+ return docs
+
+ html_content = None
+
+ # Handle different input types
+ if export_path.is_file():
+ if export_path.suffix.lower() == ".zip":
+ # Extract HTML from zip file
+ html_content = self._extract_html_from_zip(export_path)
+ elif export_path.suffix.lower() == ".html":
+ # Read HTML file directly
+ try:
+ with open(export_path, encoding="utf-8", errors="ignore") as f:
+ html_content = f.read()
+ except Exception as e:
+ print(f"Error reading HTML file {export_path}: {e}")
+ return docs
+ else:
+ print(f"Unsupported file type: {export_path.suffix}")
+ return docs
+
+ elif export_path.is_dir():
+ # Look for HTML files in directory
+ html_files = list(export_path.glob("*.html"))
+ zip_files = list(export_path.glob("*.zip"))
+
+ if html_files:
+ # Use first HTML file found
+ html_file = html_files[0]
+ print(f"Found HTML file: {html_file}")
+ try:
+ with open(html_file, encoding="utf-8", errors="ignore") as f:
+ html_content = f.read()
+ except Exception as e:
+ print(f"Error reading HTML file {html_file}: {e}")
+ return docs
+
+ elif zip_files:
+ # Use first zip file found
+ zip_file = zip_files[0]
+ print(f"Found zip file: {zip_file}")
+ html_content = self._extract_html_from_zip(zip_file)
+
+ else:
+ print(f"No HTML or zip files found in {export_path}")
+ return docs
+
+ if not html_content:
+ print("No HTML content found to process")
+ return docs
+
+ # Parse conversations from HTML
+ print("Parsing ChatGPT conversations from HTML...")
+ conversations = self._parse_chatgpt_html(html_content)
+
+ if not conversations:
+ print("No conversations found in HTML content")
+ return docs
+
+ print(f"Found {len(conversations)} conversations")
+
+ # Process conversations into documents
+ count = 0
+ for conversation in conversations:
+ if max_count > 0 and count >= max_count:
+ break
+
+ if self.concatenate_conversations:
+ # Create one document per conversation with concatenated messages
+ doc_content = self._create_concatenated_content(conversation)
+
+ metadata = {}
+ if include_metadata:
+ metadata = {
+ "title": conversation.get("title", "ChatGPT Conversation"),
+ "timestamp": conversation.get("timestamp", "Unknown"),
+ "message_count": len(conversation.get("messages", [])),
+ "source": "ChatGPT Export",
+ }
+
+ doc = Document(text=doc_content, metadata=metadata)
+ docs.append(doc)
+ count += 1
+
+ else:
+ # Create separate documents for each message
+ for message in conversation.get("messages", []):
+ if max_count > 0 and count >= max_count:
+ break
+
+ role = message.get("role", "mixed")
+ content = message.get("content", "")
+ msg_timestamp = message.get("timestamp", "")
+
+ if not content.strip():
+ continue
+
+ # Create document content with context
+ doc_content = f"""Conversation: {conversation.get("title", "ChatGPT Conversation")}
+Role: {role}
+Timestamp: {msg_timestamp or conversation.get("timestamp", "Unknown")}
+Message: {content}
+"""
+
+ metadata = {}
+ if include_metadata:
+ metadata = {
+ "conversation_title": conversation.get("title", "ChatGPT Conversation"),
+ "role": role,
+ "timestamp": msg_timestamp or conversation.get("timestamp", "Unknown"),
+ "source": "ChatGPT Export",
+ }
+
+ doc = Document(text=doc_content, metadata=metadata)
+ docs.append(doc)
+ count += 1
+
+ print(f"Created {len(docs)} documents from ChatGPT export")
+ return docs
diff --git a/apps/chatgpt_rag.py b/apps/chatgpt_rag.py
new file mode 100644
index 0000000..3c92d04
--- /dev/null
+++ b/apps/chatgpt_rag.py
@@ -0,0 +1,186 @@
+"""
+ChatGPT RAG example using the unified interface.
+Supports ChatGPT export data from chat.html files.
+"""
+
+import sys
+from pathlib import Path
+
+# Add parent directory to path for imports
+sys.path.insert(0, str(Path(__file__).parent))
+
+from base_rag_example import BaseRAGExample
+from chunking import create_text_chunks
+
+from chatgpt_data.chatgpt_reader import ChatGPTReader
+
+
+class ChatGPTRAG(BaseRAGExample):
+ """RAG example for ChatGPT conversation data."""
+
+ def __init__(self):
+ # Set default values BEFORE calling super().__init__
+ self.max_items_default = -1 # Process all conversations by default
+ self.embedding_model_default = (
+ "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
+ )
+
+ super().__init__(
+ name="ChatGPT",
+ description="Process and query ChatGPT conversation exports with LEANN",
+ default_index_name="chatgpt_conversations_index",
+ )
+
+ def _add_specific_arguments(self, parser):
+ """Add ChatGPT-specific arguments."""
+ chatgpt_group = parser.add_argument_group("ChatGPT Parameters")
+ chatgpt_group.add_argument(
+ "--export-path",
+ type=str,
+ default="./chatgpt_export",
+ help="Path to ChatGPT export file (.zip or .html) or directory containing exports (default: ./chatgpt_export)",
+ )
+ chatgpt_group.add_argument(
+ "--concatenate-conversations",
+ action="store_true",
+ default=True,
+ help="Concatenate messages within conversations for better context (default: True)",
+ )
+ chatgpt_group.add_argument(
+ "--separate-messages",
+ action="store_true",
+ help="Process each message as a separate document (overrides --concatenate-conversations)",
+ )
+ chatgpt_group.add_argument(
+ "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)"
+ )
+ chatgpt_group.add_argument(
+ "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
+ )
+
+ def _find_chatgpt_exports(self, export_path: Path) -> list[Path]:
+ """
+ Find ChatGPT export files in the given path.
+
+ Args:
+ export_path: Path to search for exports
+
+ Returns:
+ List of paths to ChatGPT export files
+ """
+ export_files = []
+
+ if export_path.is_file():
+ if export_path.suffix.lower() in [".zip", ".html"]:
+ export_files.append(export_path)
+ elif export_path.is_dir():
+ # Look for zip and html files
+ export_files.extend(export_path.glob("*.zip"))
+ export_files.extend(export_path.glob("*.html"))
+
+ return export_files
+
+ async def load_data(self, args) -> list[str]:
+ """Load ChatGPT export data and convert to text chunks."""
+ export_path = Path(args.export_path)
+
+ if not export_path.exists():
+ print(f"ChatGPT export path not found: {export_path}")
+ print(
+ "Please ensure you have exported your ChatGPT data and placed it in the correct location."
+ )
+ print("\nTo export your ChatGPT data:")
+ print("1. Sign in to ChatGPT")
+ print("2. Click on your profile icon ā Settings ā Data Controls")
+ print("3. Click 'Export' under Export Data")
+ print("4. Download the zip file from the email link")
+ print("5. Extract or place the file/directory at the specified path")
+ return []
+
+ # Find export files
+ export_files = self._find_chatgpt_exports(export_path)
+
+ if not export_files:
+ print(f"No ChatGPT export files (.zip or .html) found in: {export_path}")
+ return []
+
+ print(f"Found {len(export_files)} ChatGPT export files")
+
+ # Create reader with appropriate settings
+ concatenate = args.concatenate_conversations and not args.separate_messages
+ reader = ChatGPTReader(concatenate_conversations=concatenate)
+
+ # Process each export file
+ all_documents = []
+ total_processed = 0
+
+ for i, export_file in enumerate(export_files):
+ print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}")
+
+ try:
+ # Apply max_items limit per file
+ max_per_file = -1
+ if args.max_items > 0:
+ remaining = args.max_items - total_processed
+ if remaining <= 0:
+ break
+ max_per_file = remaining
+
+ # Load conversations
+ documents = reader.load_data(
+ chatgpt_export_path=str(export_file),
+ max_count=max_per_file,
+ include_metadata=True,
+ )
+
+ if documents:
+ all_documents.extend(documents)
+ total_processed += len(documents)
+ print(f"Processed {len(documents)} conversations from this file")
+ else:
+ print(f"No conversations loaded from {export_file}")
+
+ except Exception as e:
+ print(f"Error processing {export_file}: {e}")
+ continue
+
+ if not all_documents:
+ print("No conversations found to process!")
+ print("\nTroubleshooting:")
+ print("- Ensure the export file is a valid ChatGPT export")
+ print("- Check that the HTML file contains conversation data")
+ print("- Try extracting the zip file and pointing to the HTML file directly")
+ return []
+
+ print(f"\nTotal conversations processed: {len(all_documents)}")
+ print("Now starting to split into text chunks... this may take some time")
+
+ # Convert to text chunks
+ all_texts = create_text_chunks(
+ all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
+ )
+
+ print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations")
+ return all_texts
+
+
+if __name__ == "__main__":
+ import asyncio
+
+ # Example queries for ChatGPT RAG
+ print("\nš¤ ChatGPT RAG Example")
+ print("=" * 50)
+ print("\nExample queries you can try:")
+ print("- 'What did I ask about Python programming?'")
+ print("- 'Show me conversations about machine learning'")
+ print("- 'Find discussions about travel planning'")
+ print("- 'What advice did ChatGPT give me about career development?'")
+ print("- 'Search for conversations about cooking recipes'")
+ print("\nTo get started:")
+ print("1. Export your ChatGPT data from Settings ā Data Controls ā Export")
+ print("2. Place the downloaded zip file or extracted HTML in ./chatgpt_export/")
+ print("3. Run this script to build your personal ChatGPT knowledge base!")
+ print("\nOr run without --query for interactive mode\n")
+
+ rag = ChatGPTRAG()
+ asyncio.run(rag.run())
diff --git a/apps/claude_data/__init__.py b/apps/claude_data/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/claude_data/claude_reader.py b/apps/claude_data/claude_reader.py
new file mode 100644
index 0000000..1af1097
--- /dev/null
+++ b/apps/claude_data/claude_reader.py
@@ -0,0 +1,420 @@
+"""
+Claude export data reader.
+
+Reads and processes Claude conversation data from exported JSON files.
+"""
+
+import json
+from pathlib import Path
+from typing import Any
+from zipfile import ZipFile
+
+from llama_index.core import Document
+from llama_index.core.readers.base import BaseReader
+
+
+class ClaudeReader(BaseReader):
+ """
+ Claude export data reader.
+
+ Reads Claude conversation data from exported JSON files or zip archives.
+ Processes conversations into structured documents with metadata.
+ """
+
+ def __init__(self, concatenate_conversations: bool = True) -> None:
+ """
+ Initialize.
+
+ Args:
+ concatenate_conversations: Whether to concatenate messages within conversations for better context
+ """
+ self.concatenate_conversations = concatenate_conversations
+
+ def _extract_json_from_zip(self, zip_path: Path) -> list[str]:
+ """
+ Extract JSON files from Claude export zip file.
+
+ Args:
+ zip_path: Path to the Claude export zip file
+
+ Returns:
+ List of JSON content strings, or empty list if not found
+ """
+ json_contents = []
+ try:
+ with ZipFile(zip_path, "r") as zip_file:
+ # Look for JSON files
+ json_files = [f for f in zip_file.namelist() if f.endswith(".json")]
+
+ if not json_files:
+ print(f"No JSON files found in {zip_path}")
+ return []
+
+ print(f"Found {len(json_files)} JSON files in archive")
+
+ for json_file in json_files:
+ with zip_file.open(json_file) as f:
+ content = f.read().decode("utf-8", errors="ignore")
+ json_contents.append(content)
+
+ except Exception as e:
+ print(f"Error extracting JSON from zip {zip_path}: {e}")
+
+ return json_contents
+
+ def _parse_claude_json(self, json_content: str) -> list[dict]:
+ """
+ Parse Claude JSON export to extract conversations.
+
+ Args:
+ json_content: JSON content from Claude export
+
+ Returns:
+ List of conversation dictionaries
+ """
+ try:
+ data = json.loads(json_content)
+ except json.JSONDecodeError as e:
+ print(f"Error parsing JSON: {e}")
+ return []
+
+ conversations = []
+
+ # Handle different possible JSON structures
+ if isinstance(data, list):
+ # If data is a list of conversations
+ for item in data:
+ conversation = self._extract_conversation_from_json(item)
+ if conversation:
+ conversations.append(conversation)
+ elif isinstance(data, dict):
+ # Check for common structures
+ if "conversations" in data:
+ # Structure: {"conversations": [...]}
+ for item in data["conversations"]:
+ conversation = self._extract_conversation_from_json(item)
+ if conversation:
+ conversations.append(conversation)
+ elif "messages" in data:
+ # Single conversation with messages
+ conversation = self._extract_conversation_from_json(data)
+ if conversation:
+ conversations.append(conversation)
+ else:
+ # Try to treat the whole object as a conversation
+ conversation = self._extract_conversation_from_json(data)
+ if conversation:
+ conversations.append(conversation)
+
+ return conversations
+
+ def _extract_conversation_from_json(self, conv_data: dict) -> dict | None:
+ """
+ Extract conversation data from a JSON object.
+
+ Args:
+ conv_data: Dictionary containing conversation data
+
+ Returns:
+ Dictionary with conversation data or None
+ """
+ if not isinstance(conv_data, dict):
+ return None
+
+ messages = []
+
+ # Look for messages in various possible structures
+ message_sources = []
+ if "messages" in conv_data:
+ message_sources = conv_data["messages"]
+ elif "chat" in conv_data:
+ message_sources = conv_data["chat"]
+ elif "conversation" in conv_data:
+ message_sources = conv_data["conversation"]
+ else:
+ # If no clear message structure, try to extract from the object itself
+ if "content" in conv_data and "role" in conv_data:
+ message_sources = [conv_data]
+
+ for msg_data in message_sources:
+ message = self._extract_message_from_json(msg_data)
+ if message:
+ messages.append(message)
+
+ if not messages:
+ return None
+
+ # Extract conversation metadata
+ title = self._extract_title_from_conversation(conv_data, messages)
+ timestamp = self._extract_timestamp_from_conversation(conv_data)
+
+ return {"title": title, "messages": messages, "timestamp": timestamp}
+
+ def _extract_message_from_json(self, msg_data: dict) -> dict | None:
+ """
+ Extract message data from a JSON message object.
+
+ Args:
+ msg_data: Dictionary containing message data
+
+ Returns:
+ Dictionary with message data or None
+ """
+ if not isinstance(msg_data, dict):
+ return None
+
+ # Extract content from various possible fields
+ content = ""
+ content_fields = ["content", "text", "message", "body"]
+ for field in content_fields:
+ if msg_data.get(field):
+ content = str(msg_data[field])
+ break
+
+ if not content or len(content.strip()) < 3:
+ return None
+
+ # Extract role (user/assistant/human/ai/claude)
+ role = "mixed" # Default role
+ role_fields = ["role", "sender", "from", "author", "type"]
+ for field in role_fields:
+ if msg_data.get(field):
+ role_value = str(msg_data[field]).lower()
+ if role_value in ["user", "human", "person"]:
+ role = "user"
+ elif role_value in ["assistant", "ai", "claude", "bot"]:
+ role = "assistant"
+ break
+
+ # Extract timestamp
+ timestamp = self._extract_timestamp_from_message(msg_data)
+
+ return {"role": role, "content": content, "timestamp": timestamp}
+
+ def _extract_timestamp_from_message(self, msg_data: dict) -> str | None:
+ """Extract timestamp from message data."""
+ timestamp_fields = ["timestamp", "created_at", "date", "time"]
+ for field in timestamp_fields:
+ if msg_data.get(field):
+ return str(msg_data[field])
+ return None
+
+ def _extract_timestamp_from_conversation(self, conv_data: dict) -> str | None:
+ """Extract timestamp from conversation data."""
+ timestamp_fields = ["timestamp", "created_at", "date", "updated_at", "last_updated"]
+ for field in timestamp_fields:
+ if conv_data.get(field):
+ return str(conv_data[field])
+ return None
+
+ def _extract_title_from_conversation(self, conv_data: dict, messages: list) -> str:
+ """Extract or generate title for conversation."""
+ # Try to find explicit title
+ title_fields = ["title", "name", "subject", "topic"]
+ for field in title_fields:
+ if conv_data.get(field):
+ return str(conv_data[field])
+
+ # Generate title from first user message
+ for message in messages:
+ if message.get("role") == "user":
+ content = message.get("content", "")
+ if content:
+ # Use first 50 characters as title
+ title = content[:50].strip()
+ if len(content) > 50:
+ title += "..."
+ return title
+
+ return "Claude Conversation"
+
+ def _create_concatenated_content(self, conversation: dict) -> str:
+ """
+ Create concatenated content from conversation messages.
+
+ Args:
+ conversation: Dictionary containing conversation data
+
+ Returns:
+ Formatted concatenated content
+ """
+ title = conversation.get("title", "Claude Conversation")
+ messages = conversation.get("messages", [])
+ timestamp = conversation.get("timestamp", "Unknown")
+
+ # Build message content
+ message_parts = []
+ for message in messages:
+ role = message.get("role", "mixed")
+ content = message.get("content", "")
+ msg_timestamp = message.get("timestamp", "")
+
+ if role == "user":
+ prefix = "[You]"
+ elif role == "assistant":
+ prefix = "[Claude]"
+ else:
+ prefix = "[Message]"
+
+ # Add timestamp if available
+ if msg_timestamp:
+ prefix += f" ({msg_timestamp})"
+
+ message_parts.append(f"{prefix}: {content}")
+
+ concatenated_text = "\n\n".join(message_parts)
+
+ # Create final document content
+ doc_content = f"""Conversation: {title}
+Date: {timestamp}
+Messages ({len(messages)} messages):
+
+{concatenated_text}
+"""
+ return doc_content
+
+ def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
+ """
+ Load Claude export data.
+
+ Args:
+ input_dir: Directory containing Claude export files or path to specific file
+ **load_kwargs:
+ max_count (int): Maximum number of conversations to process
+ claude_export_path (str): Specific path to Claude export file/directory
+ include_metadata (bool): Whether to include metadata in documents
+ """
+ docs: list[Document] = []
+ max_count = load_kwargs.get("max_count", -1)
+ claude_export_path = load_kwargs.get("claude_export_path", input_dir)
+ include_metadata = load_kwargs.get("include_metadata", True)
+
+ if not claude_export_path:
+ print("No Claude export path provided")
+ return docs
+
+ export_path = Path(claude_export_path)
+
+ if not export_path.exists():
+ print(f"Claude export path not found: {export_path}")
+ return docs
+
+ json_contents = []
+
+ # Handle different input types
+ if export_path.is_file():
+ if export_path.suffix.lower() == ".zip":
+ # Extract JSON from zip file
+ json_contents = self._extract_json_from_zip(export_path)
+ elif export_path.suffix.lower() == ".json":
+ # Read JSON file directly
+ try:
+ with open(export_path, encoding="utf-8", errors="ignore") as f:
+ json_contents.append(f.read())
+ except Exception as e:
+ print(f"Error reading JSON file {export_path}: {e}")
+ return docs
+ else:
+ print(f"Unsupported file type: {export_path.suffix}")
+ return docs
+
+ elif export_path.is_dir():
+ # Look for JSON files in directory
+ json_files = list(export_path.glob("*.json"))
+ zip_files = list(export_path.glob("*.zip"))
+
+ if json_files:
+ print(f"Found {len(json_files)} JSON files in directory")
+ for json_file in json_files:
+ try:
+ with open(json_file, encoding="utf-8", errors="ignore") as f:
+ json_contents.append(f.read())
+ except Exception as e:
+ print(f"Error reading JSON file {json_file}: {e}")
+ continue
+
+ if zip_files:
+ print(f"Found {len(zip_files)} ZIP files in directory")
+ for zip_file in zip_files:
+ zip_contents = self._extract_json_from_zip(zip_file)
+ json_contents.extend(zip_contents)
+
+ if not json_files and not zip_files:
+ print(f"No JSON or ZIP files found in {export_path}")
+ return docs
+
+ if not json_contents:
+ print("No JSON content found to process")
+ return docs
+
+ # Parse conversations from JSON content
+ print("Parsing Claude conversations from JSON...")
+ all_conversations = []
+ for json_content in json_contents:
+ conversations = self._parse_claude_json(json_content)
+ all_conversations.extend(conversations)
+
+ if not all_conversations:
+ print("No conversations found in JSON content")
+ return docs
+
+ print(f"Found {len(all_conversations)} conversations")
+
+ # Process conversations into documents
+ count = 0
+ for conversation in all_conversations:
+ if max_count > 0 and count >= max_count:
+ break
+
+ if self.concatenate_conversations:
+ # Create one document per conversation with concatenated messages
+ doc_content = self._create_concatenated_content(conversation)
+
+ metadata = {}
+ if include_metadata:
+ metadata = {
+ "title": conversation.get("title", "Claude Conversation"),
+ "timestamp": conversation.get("timestamp", "Unknown"),
+ "message_count": len(conversation.get("messages", [])),
+ "source": "Claude Export",
+ }
+
+ doc = Document(text=doc_content, metadata=metadata)
+ docs.append(doc)
+ count += 1
+
+ else:
+ # Create separate documents for each message
+ for message in conversation.get("messages", []):
+ if max_count > 0 and count >= max_count:
+ break
+
+ role = message.get("role", "mixed")
+ content = message.get("content", "")
+ msg_timestamp = message.get("timestamp", "")
+
+ if not content.strip():
+ continue
+
+ # Create document content with context
+ doc_content = f"""Conversation: {conversation.get("title", "Claude Conversation")}
+Role: {role}
+Timestamp: {msg_timestamp or conversation.get("timestamp", "Unknown")}
+Message: {content}
+"""
+
+ metadata = {}
+ if include_metadata:
+ metadata = {
+ "conversation_title": conversation.get("title", "Claude Conversation"),
+ "role": role,
+ "timestamp": msg_timestamp or conversation.get("timestamp", "Unknown"),
+ "source": "Claude Export",
+ }
+
+ doc = Document(text=doc_content, metadata=metadata)
+ docs.append(doc)
+ count += 1
+
+ print(f"Created {len(docs)} documents from Claude export")
+ return docs
diff --git a/apps/claude_rag.py b/apps/claude_rag.py
new file mode 100644
index 0000000..43b499e
--- /dev/null
+++ b/apps/claude_rag.py
@@ -0,0 +1,189 @@
+"""
+Claude RAG example using the unified interface.
+Supports Claude export data from JSON files.
+"""
+
+import sys
+from pathlib import Path
+
+# Add parent directory to path for imports
+sys.path.insert(0, str(Path(__file__).parent))
+
+from base_rag_example import BaseRAGExample
+from chunking import create_text_chunks
+
+from claude_data.claude_reader import ClaudeReader
+
+
+class ClaudeRAG(BaseRAGExample):
+ """RAG example for Claude conversation data."""
+
+ def __init__(self):
+ # Set default values BEFORE calling super().__init__
+ self.max_items_default = -1 # Process all conversations by default
+ self.embedding_model_default = (
+ "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
+ )
+
+ super().__init__(
+ name="Claude",
+ description="Process and query Claude conversation exports with LEANN",
+ default_index_name="claude_conversations_index",
+ )
+
+ def _add_specific_arguments(self, parser):
+ """Add Claude-specific arguments."""
+ claude_group = parser.add_argument_group("Claude Parameters")
+ claude_group.add_argument(
+ "--export-path",
+ type=str,
+ default="./claude_export",
+ help="Path to Claude export file (.json or .zip) or directory containing exports (default: ./claude_export)",
+ )
+ claude_group.add_argument(
+ "--concatenate-conversations",
+ action="store_true",
+ default=True,
+ help="Concatenate messages within conversations for better context (default: True)",
+ )
+ claude_group.add_argument(
+ "--separate-messages",
+ action="store_true",
+ help="Process each message as a separate document (overrides --concatenate-conversations)",
+ )
+ claude_group.add_argument(
+ "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)"
+ )
+ claude_group.add_argument(
+ "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
+ )
+
+ def _find_claude_exports(self, export_path: Path) -> list[Path]:
+ """
+ Find Claude export files in the given path.
+
+ Args:
+ export_path: Path to search for exports
+
+ Returns:
+ List of paths to Claude export files
+ """
+ export_files = []
+
+ if export_path.is_file():
+ if export_path.suffix.lower() in [".zip", ".json"]:
+ export_files.append(export_path)
+ elif export_path.is_dir():
+ # Look for zip and json files
+ export_files.extend(export_path.glob("*.zip"))
+ export_files.extend(export_path.glob("*.json"))
+
+ return export_files
+
+ async def load_data(self, args) -> list[str]:
+ """Load Claude export data and convert to text chunks."""
+ export_path = Path(args.export_path)
+
+ if not export_path.exists():
+ print(f"Claude export path not found: {export_path}")
+ print(
+ "Please ensure you have exported your Claude data and placed it in the correct location."
+ )
+ print("\nTo export your Claude data:")
+ print("1. Open Claude in your browser")
+ print("2. Look for export/download options in settings or conversation menu")
+ print("3. Download the conversation data (usually in JSON format)")
+ print("4. Place the file/directory at the specified path")
+ print(
+ "\nNote: Claude export methods may vary. Check Claude's help documentation for current instructions."
+ )
+ return []
+
+ # Find export files
+ export_files = self._find_claude_exports(export_path)
+
+ if not export_files:
+ print(f"No Claude export files (.json or .zip) found in: {export_path}")
+ return []
+
+ print(f"Found {len(export_files)} Claude export files")
+
+ # Create reader with appropriate settings
+ concatenate = args.concatenate_conversations and not args.separate_messages
+ reader = ClaudeReader(concatenate_conversations=concatenate)
+
+ # Process each export file
+ all_documents = []
+ total_processed = 0
+
+ for i, export_file in enumerate(export_files):
+ print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}")
+
+ try:
+ # Apply max_items limit per file
+ max_per_file = -1
+ if args.max_items > 0:
+ remaining = args.max_items - total_processed
+ if remaining <= 0:
+ break
+ max_per_file = remaining
+
+ # Load conversations
+ documents = reader.load_data(
+ claude_export_path=str(export_file),
+ max_count=max_per_file,
+ include_metadata=True,
+ )
+
+ if documents:
+ all_documents.extend(documents)
+ total_processed += len(documents)
+ print(f"Processed {len(documents)} conversations from this file")
+ else:
+ print(f"No conversations loaded from {export_file}")
+
+ except Exception as e:
+ print(f"Error processing {export_file}: {e}")
+ continue
+
+ if not all_documents:
+ print("No conversations found to process!")
+ print("\nTroubleshooting:")
+ print("- Ensure the export file is a valid Claude export")
+ print("- Check that the JSON file contains conversation data")
+ print("- Try using a different export format or method")
+ print("- Check Claude's documentation for current export procedures")
+ return []
+
+ print(f"\nTotal conversations processed: {len(all_documents)}")
+ print("Now starting to split into text chunks... this may take some time")
+
+ # Convert to text chunks
+ all_texts = create_text_chunks(
+ all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
+ )
+
+ print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations")
+ return all_texts
+
+
+if __name__ == "__main__":
+ import asyncio
+
+ # Example queries for Claude RAG
+ print("\nš¤ Claude RAG Example")
+ print("=" * 50)
+ print("\nExample queries you can try:")
+ print("- 'What did I ask Claude about Python programming?'")
+ print("- 'Show me conversations about machine learning'")
+ print("- 'Find discussions about code optimization'")
+ print("- 'What advice did Claude give me about software design?'")
+ print("- 'Search for conversations about debugging techniques'")
+ print("\nTo get started:")
+ print("1. Export your Claude conversation data")
+ print("2. Place the JSON/ZIP file in ./claude_export/")
+ print("3. Run this script to build your personal Claude knowledge base!")
+ print("\nOr run without --query for interactive mode\n")
+
+ rag = ClaudeRAG()
+ asyncio.run(rag.run())
diff --git a/apps/imessage_data/__init__.py b/apps/imessage_data/__init__.py
new file mode 100644
index 0000000..9e9e3fc
--- /dev/null
+++ b/apps/imessage_data/__init__.py
@@ -0,0 +1 @@
+"""iMessage data processing module."""
diff --git a/apps/imessage_data/imessage_reader.py b/apps/imessage_data/imessage_reader.py
new file mode 100644
index 0000000..4dfc0af
--- /dev/null
+++ b/apps/imessage_data/imessage_reader.py
@@ -0,0 +1,342 @@
+"""
+iMessage data reader.
+
+Reads and processes iMessage conversation data from the macOS Messages database.
+"""
+
+import sqlite3
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+from llama_index.core import Document
+from llama_index.core.readers.base import BaseReader
+
+
+class IMessageReader(BaseReader):
+ """
+ iMessage data reader.
+
+ Reads iMessage conversation data from the macOS Messages database (chat.db).
+ Processes conversations into structured documents with metadata.
+ """
+
+ def __init__(self, concatenate_conversations: bool = True) -> None:
+ """
+ Initialize.
+
+ Args:
+ concatenate_conversations: Whether to concatenate messages within conversations for better context
+ """
+ self.concatenate_conversations = concatenate_conversations
+
+ def _get_default_chat_db_path(self) -> Path:
+ """
+ Get the default path to the iMessage chat database.
+
+ Returns:
+ Path to the chat.db file
+ """
+ home = Path.home()
+ return home / "Library" / "Messages" / "chat.db"
+
+ def _convert_cocoa_timestamp(self, cocoa_timestamp: int) -> str:
+ """
+ Convert Cocoa timestamp to readable format.
+
+ Args:
+ cocoa_timestamp: Timestamp in Cocoa format (nanoseconds since 2001-01-01)
+
+ Returns:
+ Formatted timestamp string
+ """
+        if not cocoa_timestamp:  # Handles both NULL and 0 dates
+            return "Unknown"
+
+ try:
+            # Cocoa timestamps count nanoseconds since 2001-01-01 00:00:00 UTC,
+            # so anchor the epoch in UTC and render the result in local time
+            cocoa_epoch = datetime(2001, 1, 1, tzinfo=timezone.utc)
+            unix_timestamp = cocoa_timestamp / 1_000_000_000  # Convert nanoseconds to seconds
+            message_time = cocoa_epoch.timestamp() + unix_timestamp
+            return datetime.fromtimestamp(message_time).strftime("%Y-%m-%d %H:%M:%S")
+ except (ValueError, OSError):
+ return "Unknown"
+
+ def _get_contact_name(self, handle_id: str) -> str:
+ """
+ Get a readable contact name from handle ID.
+
+ Args:
+ handle_id: The handle ID (phone number or email)
+
+ Returns:
+ Formatted contact name
+ """
+ if not handle_id:
+ return "Unknown"
+
+ # Clean up phone numbers and emails for display
+ if "@" in handle_id:
+ return handle_id # Email address
+ elif handle_id.startswith("+"):
+ return handle_id # International phone number
+ else:
+ # Try to format as phone number
+ digits = "".join(filter(str.isdigit, handle_id))
+ if len(digits) == 10:
+ return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
+ elif len(digits) == 11 and digits[0] == "1":
+ return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
+ else:
+ return handle_id
+
+ def _read_messages_from_db(self, db_path: Path) -> list[dict]:
+ """
+ Read messages from the iMessage database.
+
+ Args:
+ db_path: Path to the chat.db file
+
+ Returns:
+ List of message dictionaries
+ """
+ if not db_path.exists():
+ print(f"iMessage database not found at: {db_path}")
+ return []
+
+ try:
+ # Connect to the database
+ conn = sqlite3.connect(str(db_path))
+ cursor = conn.cursor()
+
+ # Query to get messages with chat and handle information
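+            # chat_message_join links each message to its chat; handle holds the
+            # other party's phone number or email (the sender, for incoming messages)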
+ query = """
+ SELECT
+ m.ROWID as message_id,
+ m.text,
+ m.date,
+ m.is_from_me,
+ m.service,
+ c.chat_identifier,
+ c.display_name as chat_display_name,
+ h.id as handle_id,
+ c.ROWID as chat_id
+ FROM message m
+ LEFT JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
+ LEFT JOIN chat c ON cmj.chat_id = c.ROWID
+ LEFT JOIN handle h ON m.handle_id = h.ROWID
+ WHERE m.text IS NOT NULL AND m.text != ''
+ ORDER BY c.ROWID, m.date
+ """
+
+ cursor.execute(query)
+ rows = cursor.fetchall()
+
+ messages = []
+ for row in rows:
+ (
+ message_id,
+ text,
+ date,
+ is_from_me,
+ service,
+ chat_identifier,
+ chat_display_name,
+ handle_id,
+ chat_id,
+ ) = row
+
+ message = {
+ "message_id": message_id,
+ "text": text,
+ "timestamp": self._convert_cocoa_timestamp(date),
+ "is_from_me": bool(is_from_me),
+ "service": service or "iMessage",
+ "chat_identifier": chat_identifier or "Unknown",
+ "chat_display_name": chat_display_name or "Unknown Chat",
+ "handle_id": handle_id or "Unknown",
+ "contact_name": self._get_contact_name(handle_id or ""),
+ "chat_id": chat_id,
+ }
+ messages.append(message)
+
+ conn.close()
+ print(f"Found {len(messages)} messages in database")
+ return messages
+
+ except sqlite3.Error as e:
+ print(f"Error reading iMessage database: {e}")
+ return []
+ except Exception as e:
+ print(f"Unexpected error reading iMessage database: {e}")
+ return []
+
+ def _group_messages_by_chat(self, messages: list[dict]) -> dict[int, list[dict]]:
+ """
+ Group messages by chat ID.
+
+ Args:
+ messages: List of message dictionaries
+
+ Returns:
+ Dictionary mapping chat_id to list of messages
+ """
+ chats = {}
+ for message in messages:
+ chat_id = message["chat_id"]
+ if chat_id not in chats:
+ chats[chat_id] = []
+ chats[chat_id].append(message)
+
+ return chats
+
+ def _create_concatenated_content(self, chat_id: int, messages: list[dict]) -> str:
+ """
+ Create concatenated content from chat messages.
+
+ Args:
+ chat_id: The chat ID
+ messages: List of messages in the chat
+
+ Returns:
+ Concatenated text content
+ """
+ if not messages:
+ return ""
+
+ # Get chat info from first message
+ first_msg = messages[0]
+ chat_name = first_msg["chat_display_name"]
+ chat_identifier = first_msg["chat_identifier"]
+
+ # Build message content
+ message_parts = []
+ for message in messages:
+ timestamp = message["timestamp"]
+ is_from_me = message["is_from_me"]
+ text = message["text"]
+ contact_name = message["contact_name"]
+
+ if is_from_me:
+ prefix = "[You]"
+ else:
+ prefix = f"[{contact_name}]"
+
+ if timestamp != "Unknown":
+ prefix += f" ({timestamp})"
+
+ message_parts.append(f"{prefix}: {text}")
+
+ concatenated_text = "\n\n".join(message_parts)
+
+ doc_content = f"""Chat: {chat_name}
+Identifier: {chat_identifier}
+Messages ({len(messages)} messages):
+
+{concatenated_text}
+"""
+ return doc_content
+
+ def _create_individual_content(self, message: dict) -> str:
+ """
+ Create content for individual message.
+
+ Args:
+ message: Message dictionary
+
+ Returns:
+ Formatted message content
+ """
+ timestamp = message["timestamp"]
+ is_from_me = message["is_from_me"]
+ text = message["text"]
+ contact_name = message["contact_name"]
+ chat_name = message["chat_display_name"]
+
+ sender = "You" if is_from_me else contact_name
+
+ return f"""Message from {sender} in chat "{chat_name}"
+Time: {timestamp}
+Content: {text}
+"""
+
+ def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
+ """
+ Load iMessage data and return as documents.
+
+ Args:
+ input_dir: Optional path to directory containing chat.db file.
+ If not provided, uses default macOS location.
+ **load_kwargs: Additional arguments (unused)
+
+ Returns:
+ List of Document objects containing iMessage data
+ """
+ docs = []
+
+ # Determine database path
+ if input_dir:
+ db_path = Path(input_dir) / "chat.db"
+ else:
+ db_path = self._get_default_chat_db_path()
+
+ print(f"Reading iMessage database from: {db_path}")
+
+ # Read messages from database
+ messages = self._read_messages_from_db(db_path)
+ if not messages:
+ return docs
+
+ if self.concatenate_conversations:
+ # Group messages by chat and create concatenated documents
+ chats = self._group_messages_by_chat(messages)
+
+ for chat_id, chat_messages in chats.items():
+ if not chat_messages:
+ continue
+
+ content = self._create_concatenated_content(chat_id, chat_messages)
+
+ # Create metadata
+ first_msg = chat_messages[0]
+ last_msg = chat_messages[-1]
+
+ metadata = {
+ "source": "iMessage",
+ "chat_id": chat_id,
+ "chat_name": first_msg["chat_display_name"],
+ "chat_identifier": first_msg["chat_identifier"],
+ "message_count": len(chat_messages),
+ "first_message_date": first_msg["timestamp"],
+ "last_message_date": last_msg["timestamp"],
+ "participants": list(
+ {msg["contact_name"] for msg in chat_messages if not msg["is_from_me"]}
+ ),
+ }
+
+ doc = Document(text=content, metadata=metadata)
+ docs.append(doc)
+
+ else:
+ # Create individual documents for each message
+ for message in messages:
+ content = self._create_individual_content(message)
+
+ metadata = {
+ "source": "iMessage",
+ "message_id": message["message_id"],
+ "chat_id": message["chat_id"],
+ "chat_name": message["chat_display_name"],
+ "chat_identifier": message["chat_identifier"],
+ "timestamp": message["timestamp"],
+ "is_from_me": message["is_from_me"],
+ "contact_name": message["contact_name"],
+ "service": message["service"],
+ }
+
+ doc = Document(text=content, metadata=metadata)
+ docs.append(doc)
+
+ print(f"Created {len(docs)} documents from iMessage data")
+ return docs
diff --git a/apps/imessage_rag.py b/apps/imessage_rag.py
new file mode 100644
index 0000000..50032ec
--- /dev/null
+++ b/apps/imessage_rag.py
@@ -0,0 +1,125 @@
+"""
+iMessage RAG Example.
+
+This example demonstrates how to build a RAG system on your iMessage conversation history.
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+
+# Add parent directory to path for imports, matching the other apps in this PR
+sys.path.insert(0, str(Path(__file__).parent))
+
+from base_rag_example import BaseRAGExample
+from chunking import create_text_chunks
+
+from imessage_data.imessage_reader import IMessageReader
+
+
+class IMessageRAG(BaseRAGExample):
+ """RAG example for iMessage conversation history."""
+
+ def __init__(self):
+ super().__init__(
+ name="iMessage",
+ description="RAG on your iMessage conversation history",
+ default_index_name="imessage_index",
+ )
+
+ def _add_specific_arguments(self, parser):
+ """Add iMessage-specific arguments."""
+ imessage_group = parser.add_argument_group("iMessage Parameters")
+ imessage_group.add_argument(
+ "--db-path",
+ type=str,
+ default=None,
+ help="Path to iMessage chat.db file (default: ~/Library/Messages/chat.db)",
+ )
+ imessage_group.add_argument(
+ "--concatenate-conversations",
+ action="store_true",
+ default=True,
+ help="Concatenate messages within conversations for better context (default: True)",
+ )
+ imessage_group.add_argument(
+ "--no-concatenate-conversations",
+ action="store_true",
+ help="Process each message individually instead of concatenating by conversation",
+ )
+ imessage_group.add_argument(
+ "--chunk-size",
+ type=int,
+ default=1000,
+ help="Maximum characters per text chunk (default: 1000)",
+ )
+ imessage_group.add_argument(
+ "--chunk-overlap",
+ type=int,
+ default=200,
+ help="Overlap between text chunks (default: 200)",
+ )
+
+ async def load_data(self, args) -> list[str]:
+ """Load iMessage history and convert to text chunks."""
+ print("Loading iMessage conversation history...")
+
+ # Determine concatenation setting
+ concatenate = args.concatenate_conversations and not args.no_concatenate_conversations
+
+ # Initialize iMessage reader
+ reader = IMessageReader(concatenate_conversations=concatenate)
+
+ # Load documents
+ try:
+ if args.db_path:
+ # Use custom database path
+ db_dir = str(Path(args.db_path).parent)
+ documents = reader.load_data(input_dir=db_dir)
+ else:
+ # Use default macOS location
+ documents = reader.load_data()
+
+ except Exception as e:
+ print(f"Error loading iMessage data: {e}")
+ print("\nTroubleshooting tips:")
+ print("1. Make sure you have granted Full Disk Access to your terminal/IDE")
+ print("2. Check that the iMessage database exists at ~/Library/Messages/chat.db")
+ print("3. Try specifying a custom path with --db-path if you have a backup")
+ return []
+
+ if not documents:
+ print("No iMessage conversations found!")
+ return []
+
+ print(f"Loaded {len(documents)} iMessage documents")
+
+ # Show some statistics
+ total_messages = sum(doc.metadata.get("message_count", 1) for doc in documents)
+ print(f"Total messages: {total_messages}")
+
+ if concatenate:
+ # Show chat statistics
+ chat_names = [doc.metadata.get("chat_name", "Unknown") for doc in documents]
+ unique_chats = len(set(chat_names))
+ print(f"Unique conversations: {unique_chats}")
+
+ # Convert to text chunks
+ all_texts = create_text_chunks(
+ documents,
+ chunk_size=args.chunk_size,
+ chunk_overlap=args.chunk_overlap,
+ )
+
+ # Apply max_items limit if specified
+ if args.max_items > 0:
+ all_texts = all_texts[: args.max_items]
+ print(f"Limited to {len(all_texts)} text chunks (max_items={args.max_items})")
+
+ return all_texts
+
+
+async def main():
+ """Main entry point."""
+ app = IMessageRAG()
+ await app.run()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())