From 658bce47effde86da09acf4d653732a8875bc871 Mon Sep 17 00:00:00 2001 From: Aakash Suresh Date: Thu, 2 Oct 2025 10:40:57 -0700 Subject: [PATCH] Feature/imessage rag support (#131) --- README.md | 236 ++++++++++++++- apps/chatgpt_data/__init__.py | 0 apps/chatgpt_data/chatgpt_reader.py | 413 +++++++++++++++++++++++++ apps/chatgpt_rag.py | 186 ++++++++++++ apps/claude_data/__init__.py | 0 apps/claude_data/claude_reader.py | 420 ++++++++++++++++++++++++++ apps/claude_rag.py | 189 ++++++++++++ apps/imessage_data/__init__.py | 1 + apps/imessage_data/imessage_reader.py | 342 +++++++++++++++++++++ apps/imessage_rag.py | 125 ++++++++ 10 files changed, 1910 insertions(+), 2 deletions(-) create mode 100644 apps/chatgpt_data/__init__.py create mode 100644 apps/chatgpt_data/chatgpt_reader.py create mode 100644 apps/chatgpt_rag.py create mode 100644 apps/claude_data/__init__.py create mode 100644 apps/claude_data/claude_reader.py create mode 100644 apps/claude_rag.py create mode 100644 apps/imessage_data/__init__.py create mode 100644 apps/imessage_data/imessage_reader.py create mode 100644 apps/imessage_rag.py diff --git a/README.md b/README.md index 0e7904e..a4b81ce 100755 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ LEANN is an innovative vector database that democratizes personal AI. Transform LEANN achieves this through *graph-based selective recomputation* with *high-degree preserving pruning*, computing embeddings on-demand instead of storing them all. [Illustration Fig →](#ļø-architecture--how-it-works) | [Paper →](https://arxiv.org/abs/2506.08276) -**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)**, **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy. +**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy. \* Claude Code only supports basic `grep`-style keyword search. **LEANN** is a drop-in **semantic search MCP service fully compatible with Claude Code**, unlocking intelligent retrieval without changing your workflow. šŸ”„ Check out [the easy setup →](packages/leann-mcp/README.md) @@ -176,7 +176,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1) ## RAG on Everything! 
-LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, and more. +LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and more. @@ -542,6 +542,238 @@ Once the index is built, you can ask questions like: +### šŸ¤– ChatGPT Chat History: Your Personal AI Conversation Archive! + +Transform your ChatGPT conversations into a searchable knowledge base! Search through all your ChatGPT discussions about coding, research, brainstorming, and more. + +```bash +python -m apps.chatgpt_rag --export-path chatgpt_export.html --query "How do I create a list in Python?" +``` + +**Unlock your AI conversation history.** Never lose track of valuable insights from your ChatGPT discussions again. + +
+šŸ“‹ Click to expand: How to Export ChatGPT Data + +**Step-by-step export process:** + +1. **Sign in to ChatGPT** +2. **Click your profile icon** in the top right corner +3. **Navigate to Settings** → **Data Controls** +4. **Click "Export"** under Export Data +5. **Confirm the export** request +6. **Download the ZIP file** from the email link (expires in 24 hours) +7. **Extract or use directly** with LEANN + +**Supported formats:** +- `.html` files from ChatGPT exports +- `.zip` archives from ChatGPT +- Directories with multiple export files + +
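+
+If you want to inspect an export programmatically before building an index, the `ChatGPTReader` class added in this PR can be used on its own. A minimal sketch (assuming you run it from the repository root, with `beautifulsoup4` and `llama-index-core` installed):
+
+```python
+from apps.chatgpt_data.chatgpt_reader import ChatGPTReader
+
+# Concatenate each conversation's messages into one document
+reader = ChatGPTReader(concatenate_conversations=True)
+
+# load_data accepts an .html file, a .zip export, or a directory
+docs = reader.load_data(chatgpt_export_path="chatgpt_export.html", max_count=5)
+
+for doc in docs:
+    print(doc.metadata["title"], "-", doc.metadata["message_count"], "messages")
+```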
+ +
+šŸ“‹ Click to expand: ChatGPT-Specific Arguments + +#### Parameters +```bash +--export-path PATH # Path to ChatGPT export file (.html/.zip) or directory (default: ./chatgpt_export) +--separate-messages # Process each message separately instead of concatenated conversations +--chunk-size N # Text chunk size (default: 512) +--chunk-overlap N # Overlap between chunks (default: 128) +``` + +#### Example Commands +```bash +# Basic usage with HTML export +python -m apps.chatgpt_rag --export-path conversations.html + +# Process ZIP archive from ChatGPT +python -m apps.chatgpt_rag --export-path chatgpt_export.zip + +# Search with specific query +python -m apps.chatgpt_rag --export-path chatgpt_data.html --query "Python programming help" + +# Process individual messages for fine-grained search +python -m apps.chatgpt_rag --separate-messages --export-path chatgpt_export.html + +# Process directory containing multiple exports +python -m apps.chatgpt_rag --export-path ./chatgpt_exports/ --max-items 1000 +``` + +
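+
+Under the hood, `--separate-messages` simply constructs `ChatGPTReader(concatenate_conversations=False)`, which emits one document per message and records the detected role (`user`, `assistant`, or `mixed` when the HTML gives no hint) in each document's metadata. A sketch:
+
+```python
+from apps.chatgpt_data.chatgpt_reader import ChatGPTReader
+
+# One document per message instead of one per conversation
+reader = ChatGPTReader(concatenate_conversations=False)
+docs = reader.load_data(chatgpt_export_path="chatgpt_export.html")
+
+# Each document now carries per-message metadata
+for doc in docs[:3]:
+    print(doc.metadata["role"], "|", doc.metadata["conversation_title"])
+```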
+ +
+šŸ’” Click to expand: Example queries you can try + +Once your ChatGPT conversations are indexed, you can search with queries like: +- "What did I ask ChatGPT about Python programming?" +- "Show me conversations about machine learning algorithms" +- "Find discussions about web development frameworks" +- "What coding advice did ChatGPT give me?" +- "Search for conversations about debugging techniques" +- "Find ChatGPT's recommendations for learning resources" + +
+ +### šŸ¤– Claude Chat History: Your Personal AI Conversation Archive! + +Transform your Claude conversations into a searchable knowledge base! Search through all your Claude discussions about coding, research, brainstorming, and more. + +```bash +python -m apps.claude_rag --export-path claude_export.json --query "What did I ask about Python dictionaries?" +``` + +**Unlock your AI conversation history.** Never lose track of valuable insights from your Claude discussions again. + +
+šŸ“‹ Click to expand: How to Export Claude Data + +**Step-by-step export process:** + +1. **Open Claude** in your browser +2. **Navigate to Settings** (look for gear icon or settings menu) +3. **Find Export/Download** options in your account settings +4. **Download conversation data** (usually in JSON format) +5. **Place the file** in your project directory + +*Note: Claude export methods may vary depending on the interface you're using. Check Claude's help documentation for the most current export instructions.* + +**Supported formats:** +- `.json` files (recommended) +- `.zip` archives containing JSON data +- Directories with multiple export files + +
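+
+Because Claude export formats vary, `ClaudeReader` does not assume one fixed schema. It accepts a top-level list of conversations, a `{"conversations": [...]}` wrapper, or a single `{"messages": [...]}` object, looking for roles under `role`/`sender`/`from`/`author`/`type` and message text under `content`/`text`/`message`/`body`. Illustrative shapes (the field values below are made up):
+
+```python
+# 1. Top-level list of conversations
+[{"title": "Python help", "messages": [{"role": "human", "content": "How do dicts work?"}]}]
+
+# 2. Wrapper object with a "conversations" key
+{"conversations": [{"name": "Debugging", "messages": [{"sender": "assistant", "text": "Try logging."}]}]}
+
+# 3. Single conversation object
+{"messages": [{"author": "claude", "body": "Here is an overview..."}]}
+```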
+ +
+šŸ“‹ Click to expand: Claude-Specific Arguments + +#### Parameters +```bash +--export-path PATH # Path to Claude export file (.json/.zip) or directory (default: ./claude_export) +--separate-messages # Process each message separately instead of concatenated conversations +--chunk-size N # Text chunk size (default: 512) +--chunk-overlap N # Overlap between chunks (default: 128) +``` + +#### Example Commands +```bash +# Basic usage with JSON export +python -m apps.claude_rag --export-path my_claude_conversations.json + +# Process ZIP archive from Claude +python -m apps.claude_rag --export-path claude_export.zip + +# Search with specific query +python -m apps.claude_rag --export-path claude_data.json --query "machine learning advice" + +# Process individual messages for fine-grained search +python -m apps.claude_rag --separate-messages --export-path claude_export.json + +# Process directory containing multiple exports +python -m apps.claude_rag --export-path ./claude_exports/ --max-items 1000 +``` + +
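+
+As with the ChatGPT reader, you can drive `ClaudeReader` directly from Python. A minimal sketch, assuming a `claude_export.json` in the working directory and the repository root on your path:
+
+```python
+from apps.claude_data.claude_reader import ClaudeReader
+
+reader = ClaudeReader(concatenate_conversations=True)
+docs = reader.load_data(claude_export_path="claude_export.json", max_count=10)
+
+for doc in docs:
+    meta = doc.metadata
+    print(f"{meta['title']} ({meta['message_count']} messages, {meta['timestamp']})")
+```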
+ +
+šŸ’” Click to expand: Example queries you can try + +Once your Claude conversations are indexed, you can search with queries like: +- "What did I ask Claude about Python programming?" +- "Show me conversations about machine learning algorithms" +- "Find discussions about software architecture patterns" +- "What debugging advice did Claude give me?" +- "Search for conversations about data structures" +- "Find Claude's recommendations for learning resources" + +
+ +### šŸ’¬ iMessage History: Your Personal Conversation Archive! + +Transform your iMessage conversations into a searchable knowledge base! Search through all your text messages, group chats, and conversations with friends, family, and colleagues. + +```bash +python -m apps.imessage_rag --query "What did we discuss about the weekend plans?" +``` + +**Unlock your message history.** Never lose track of important conversations, shared links, or memorable moments from your iMessage history. + +
+<summary>šŸ“‹ Click to expand: How to Access iMessage Data</summary>
+
+**iMessage data location:**
+
+iMessage conversations are stored in a SQLite database on your Mac at:
+```
+~/Library/Messages/chat.db
+```
+
+**Important setup requirements:**
+
+1. **Grant Full Disk Access** to your terminal or IDE:
+   - Open **System Settings** → **Privacy & Security** (on older macOS: **System Preferences** → **Security & Privacy** → **Privacy**)
+   - Select **Full Disk Access** from the left sidebar
+   - Click the **+** button and add your terminal app (Terminal, iTerm2) or IDE (VS Code, etc.)
+   - Restart your terminal/IDE after granting access
+
+2. **Alternative: use a backup database**
+   - If you have a Time Machine backup or a manual copy of `chat.db`, use `--db-path` to point to it
+
+**Supported formats:**
+- Direct access to `~/Library/Messages/chat.db` (default)
+- Custom database path via `--db-path`
+- Works with backup copies of the database
+
+</details>
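+
+Before building an index, you can sanity-check that your terminal can actually read the database, using only the Python standard library. Messages stores dates as nanoseconds since 2001-01-01 (the Apple "Cocoa" epoch), which is also how the reader in this PR converts them. A minimal sketch:
+
+```python
+import sqlite3
+from datetime import datetime, timedelta
+from pathlib import Path
+
+db_path = Path.home() / "Library" / "Messages" / "chat.db"
+
+# Open read-only so the live Messages database is never modified;
+# an OperationalError here usually means Full Disk Access is missing.
+conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
+count, latest = conn.execute(
+    "SELECT COUNT(*), MAX(date) FROM message WHERE text IS NOT NULL"
+).fetchone()
+conn.close()
+
+if latest:
+    # Cocoa timestamp: nanoseconds since 2001-01-01
+    latest_dt = datetime(2001, 1, 1) + timedelta(seconds=latest / 1_000_000_000)
+    print(f"{count} readable messages, most recent at {latest_dt}")
+```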
+ +
+šŸ“‹ Click to expand: iMessage-Specific Arguments + +#### Parameters +```bash +--db-path PATH # Path to chat.db file (default: ~/Library/Messages/chat.db) +--concatenate-conversations # Group messages by conversation (default: True) +--no-concatenate-conversations # Process each message individually +--chunk-size N # Text chunk size (default: 1000) +--chunk-overlap N # Overlap between chunks (default: 200) +``` + +#### Example Commands +```bash +# Basic usage (requires Full Disk Access) +python -m apps.imessage_rag + +# Search with specific query +python -m apps.imessage_rag --query "family dinner plans" + +# Use custom database path +python -m apps.imessage_rag --db-path /path/to/backup/chat.db + +# Process individual messages instead of conversations +python -m apps.imessage_rag --no-concatenate-conversations + +# Limit processing for testing +python -m apps.imessage_rag --max-items 100 --query "weekend" +``` + +
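+
+You can also call the reader directly, for example against a backup copy (the path below is a placeholder). Note that `IMessageReader.load_data` takes the *directory* containing `chat.db` rather than the file itself; `apps/imessage_rag.py` resolves `--db-path` to its parent directory the same way. A minimal sketch:
+
+```python
+from apps.imessage_data.imessage_reader import IMessageReader
+
+# input_dir is the directory that contains chat.db, not the file itself
+reader = IMessageReader(concatenate_conversations=True)
+docs = reader.load_data(input_dir="/path/to/backup")
+
+for doc in docs[:5]:
+    meta = doc.metadata
+    print(meta["chat_name"], "-", meta["message_count"], "messages,",
+          len(meta["participants"]), "participants")
+```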
+ +
+šŸ’” Click to expand: Example queries you can try + +Once your iMessage conversations are indexed, you can search with queries like: +- "What did we discuss about vacation plans?" +- "Find messages about restaurant recommendations" +- "Show me conversations with John about the project" +- "Search for shared links about technology" +- "Find group chat discussions about weekend events" +- "What did mom say about the family gathering?" + +
+ ### šŸš€ Claude Code Integration: Transform Your Development Workflow!
diff --git a/apps/chatgpt_data/__init__.py b/apps/chatgpt_data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/chatgpt_data/chatgpt_reader.py b/apps/chatgpt_data/chatgpt_reader.py new file mode 100644 index 0000000..c52ce22 --- /dev/null +++ b/apps/chatgpt_data/chatgpt_reader.py @@ -0,0 +1,413 @@ +""" +ChatGPT export data reader. + +Reads and processes ChatGPT export data from chat.html files. +""" + +import re +from pathlib import Path +from typing import Any +from zipfile import ZipFile + +from bs4 import BeautifulSoup +from llama_index.core import Document +from llama_index.core.readers.base import BaseReader + + +class ChatGPTReader(BaseReader): + """ + ChatGPT export data reader. + + Reads ChatGPT conversation data from exported chat.html files or zip archives. + Processes conversations into structured documents with metadata. + """ + + def __init__(self, concatenate_conversations: bool = True) -> None: + """ + Initialize. + + Args: + concatenate_conversations: Whether to concatenate messages within conversations for better context + """ + try: + from bs4 import BeautifulSoup # noqa + except ImportError: + raise ImportError("`beautifulsoup4` package not found: `pip install beautifulsoup4`") + + self.concatenate_conversations = concatenate_conversations + + def _extract_html_from_zip(self, zip_path: Path) -> str | None: + """ + Extract chat.html from ChatGPT export zip file. + + Args: + zip_path: Path to the ChatGPT export zip file + + Returns: + HTML content as string, or None if not found + """ + try: + with ZipFile(zip_path, "r") as zip_file: + # Look for chat.html or conversations.html + html_files = [ + f + for f in zip_file.namelist() + if f.endswith(".html") and ("chat" in f.lower() or "conversation" in f.lower()) + ] + + if not html_files: + print(f"No HTML chat file found in {zip_path}") + return None + + # Use the first HTML file found + html_file = html_files[0] + print(f"Found HTML file: {html_file}") + + with zip_file.open(html_file) as f: + return f.read().decode("utf-8", errors="ignore") + + except Exception as e: + print(f"Error extracting HTML from zip {zip_path}: {e}") + return None + + def _parse_chatgpt_html(self, html_content: str) -> list[dict]: + """ + Parse ChatGPT HTML export to extract conversations. 
+ + Args: + html_content: HTML content from ChatGPT export + + Returns: + List of conversation dictionaries + """ + soup = BeautifulSoup(html_content, "html.parser") + conversations = [] + + # Try different possible structures for ChatGPT exports + # Structure 1: Look for conversation containers + conversation_containers = soup.find_all( + ["div", "section"], class_=re.compile(r"conversation|chat", re.I) + ) + + if not conversation_containers: + # Structure 2: Look for message containers directly + conversation_containers = [soup] # Use the entire document as one conversation + + for container in conversation_containers: + conversation = self._extract_conversation_from_container(container) + if conversation and conversation.get("messages"): + conversations.append(conversation) + + # If no structured conversations found, try to extract all text as one conversation + if not conversations: + all_text = soup.get_text(separator="\n", strip=True) + if all_text: + conversations.append( + { + "title": "ChatGPT Conversation", + "messages": [{"role": "mixed", "content": all_text, "timestamp": None}], + "timestamp": None, + } + ) + + return conversations + + def _extract_conversation_from_container(self, container) -> dict | None: + """ + Extract conversation data from a container element. + + Args: + container: BeautifulSoup element containing conversation + + Returns: + Dictionary with conversation data or None + """ + messages = [] + + # Look for message elements with various possible structures + message_selectors = ['[class*="message"]', '[class*="chat"]', "[data-message]", "p", "div"] + + for selector in message_selectors: + message_elements = container.select(selector) + if message_elements: + break + else: + message_elements = [] + + # If no structured messages found, treat the entire container as one message + if not message_elements: + text_content = container.get_text(separator="\n", strip=True) + if text_content: + messages.append({"role": "mixed", "content": text_content, "timestamp": None}) + else: + for element in message_elements: + message = self._extract_message_from_element(element) + if message: + messages.append(message) + + if not messages: + return None + + # Try to extract conversation title + title_element = container.find(["h1", "h2", "h3", "title"]) + title = title_element.get_text(strip=True) if title_element else "ChatGPT Conversation" + + # Try to extract timestamp from various possible locations + timestamp = self._extract_timestamp_from_container(container) + + return {"title": title, "messages": messages, "timestamp": timestamp} + + def _extract_message_from_element(self, element) -> dict | None: + """ + Extract message data from an element. 
+ + Args: + element: BeautifulSoup element containing message + + Returns: + Dictionary with message data or None + """ + text_content = element.get_text(separator=" ", strip=True) + + # Skip empty or very short messages + if not text_content or len(text_content.strip()) < 3: + return None + + # Try to determine role (user/assistant) from class names or content + role = "mixed" # Default role + + class_names = " ".join(element.get("class", [])).lower() + if "user" in class_names or "human" in class_names: + role = "user" + elif "assistant" in class_names or "ai" in class_names or "gpt" in class_names: + role = "assistant" + elif text_content.lower().startswith(("you:", "user:", "me:")): + role = "user" + text_content = re.sub(r"^(you|user|me):\s*", "", text_content, flags=re.IGNORECASE) + elif text_content.lower().startswith(("chatgpt:", "assistant:", "ai:")): + role = "assistant" + text_content = re.sub( + r"^(chatgpt|assistant|ai):\s*", "", text_content, flags=re.IGNORECASE + ) + + # Try to extract timestamp + timestamp = self._extract_timestamp_from_element(element) + + return {"role": role, "content": text_content, "timestamp": timestamp} + + def _extract_timestamp_from_element(self, element) -> str | None: + """Extract timestamp from element.""" + # Look for timestamp in various attributes and child elements + timestamp_attrs = ["data-timestamp", "timestamp", "datetime"] + for attr in timestamp_attrs: + if element.get(attr): + return element.get(attr) + + # Look for time elements + time_element = element.find("time") + if time_element: + return time_element.get("datetime") or time_element.get_text(strip=True) + + # Look for date-like text patterns + text = element.get_text() + date_patterns = [r"\d{4}-\d{2}-\d{2}", r"\d{1,2}/\d{1,2}/\d{4}", r"\w+ \d{1,2}, \d{4}"] + + for pattern in date_patterns: + match = re.search(pattern, text) + if match: + return match.group() + + return None + + def _extract_timestamp_from_container(self, container) -> str | None: + """Extract timestamp from conversation container.""" + return self._extract_timestamp_from_element(container) + + def _create_concatenated_content(self, conversation: dict) -> str: + """ + Create concatenated content from conversation messages. + + Args: + conversation: Dictionary containing conversation data + + Returns: + Formatted concatenated content + """ + title = conversation.get("title", "ChatGPT Conversation") + messages = conversation.get("messages", []) + timestamp = conversation.get("timestamp", "Unknown") + + # Build message content + message_parts = [] + for message in messages: + role = message.get("role", "mixed") + content = message.get("content", "") + msg_timestamp = message.get("timestamp", "") + + if role == "user": + prefix = "[You]" + elif role == "assistant": + prefix = "[ChatGPT]" + else: + prefix = "[Message]" + + # Add timestamp if available + if msg_timestamp: + prefix += f" ({msg_timestamp})" + + message_parts.append(f"{prefix}: {content}") + + concatenated_text = "\n\n".join(message_parts) + + # Create final document content + doc_content = f"""Conversation: {title} +Date: {timestamp} +Messages ({len(messages)} messages): + +{concatenated_text} +""" + return doc_content + + def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]: + """ + Load ChatGPT export data. 
+ + Args: + input_dir: Directory containing ChatGPT export files or path to specific file + **load_kwargs: + max_count (int): Maximum number of conversations to process + chatgpt_export_path (str): Specific path to ChatGPT export file/directory + include_metadata (bool): Whether to include metadata in documents + """ + docs: list[Document] = [] + max_count = load_kwargs.get("max_count", -1) + chatgpt_export_path = load_kwargs.get("chatgpt_export_path", input_dir) + include_metadata = load_kwargs.get("include_metadata", True) + + if not chatgpt_export_path: + print("No ChatGPT export path provided") + return docs + + export_path = Path(chatgpt_export_path) + + if not export_path.exists(): + print(f"ChatGPT export path not found: {export_path}") + return docs + + html_content = None + + # Handle different input types + if export_path.is_file(): + if export_path.suffix.lower() == ".zip": + # Extract HTML from zip file + html_content = self._extract_html_from_zip(export_path) + elif export_path.suffix.lower() == ".html": + # Read HTML file directly + try: + with open(export_path, encoding="utf-8", errors="ignore") as f: + html_content = f.read() + except Exception as e: + print(f"Error reading HTML file {export_path}: {e}") + return docs + else: + print(f"Unsupported file type: {export_path.suffix}") + return docs + + elif export_path.is_dir(): + # Look for HTML files in directory + html_files = list(export_path.glob("*.html")) + zip_files = list(export_path.glob("*.zip")) + + if html_files: + # Use first HTML file found + html_file = html_files[0] + print(f"Found HTML file: {html_file}") + try: + with open(html_file, encoding="utf-8", errors="ignore") as f: + html_content = f.read() + except Exception as e: + print(f"Error reading HTML file {html_file}: {e}") + return docs + + elif zip_files: + # Use first zip file found + zip_file = zip_files[0] + print(f"Found zip file: {zip_file}") + html_content = self._extract_html_from_zip(zip_file) + + else: + print(f"No HTML or zip files found in {export_path}") + return docs + + if not html_content: + print("No HTML content found to process") + return docs + + # Parse conversations from HTML + print("Parsing ChatGPT conversations from HTML...") + conversations = self._parse_chatgpt_html(html_content) + + if not conversations: + print("No conversations found in HTML content") + return docs + + print(f"Found {len(conversations)} conversations") + + # Process conversations into documents + count = 0 + for conversation in conversations: + if max_count > 0 and count >= max_count: + break + + if self.concatenate_conversations: + # Create one document per conversation with concatenated messages + doc_content = self._create_concatenated_content(conversation) + + metadata = {} + if include_metadata: + metadata = { + "title": conversation.get("title", "ChatGPT Conversation"), + "timestamp": conversation.get("timestamp", "Unknown"), + "message_count": len(conversation.get("messages", [])), + "source": "ChatGPT Export", + } + + doc = Document(text=doc_content, metadata=metadata) + docs.append(doc) + count += 1 + + else: + # Create separate documents for each message + for message in conversation.get("messages", []): + if max_count > 0 and count >= max_count: + break + + role = message.get("role", "mixed") + content = message.get("content", "") + msg_timestamp = message.get("timestamp", "") + + if not content.strip(): + continue + + # Create document content with context + doc_content = f"""Conversation: {conversation.get("title", "ChatGPT Conversation")} +Role: 
{role} +Timestamp: {msg_timestamp or conversation.get("timestamp", "Unknown")} +Message: {content} +""" + + metadata = {} + if include_metadata: + metadata = { + "conversation_title": conversation.get("title", "ChatGPT Conversation"), + "role": role, + "timestamp": msg_timestamp or conversation.get("timestamp", "Unknown"), + "source": "ChatGPT Export", + } + + doc = Document(text=doc_content, metadata=metadata) + docs.append(doc) + count += 1 + + print(f"Created {len(docs)} documents from ChatGPT export") + return docs diff --git a/apps/chatgpt_rag.py b/apps/chatgpt_rag.py new file mode 100644 index 0000000..3c92d04 --- /dev/null +++ b/apps/chatgpt_rag.py @@ -0,0 +1,186 @@ +""" +ChatGPT RAG example using the unified interface. +Supports ChatGPT export data from chat.html files. +""" + +import sys +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent)) + +from base_rag_example import BaseRAGExample +from chunking import create_text_chunks + +from .chatgpt_data.chatgpt_reader import ChatGPTReader + + +class ChatGPTRAG(BaseRAGExample): + """RAG example for ChatGPT conversation data.""" + + def __init__(self): + # Set default values BEFORE calling super().__init__ + self.max_items_default = -1 # Process all conversations by default + self.embedding_model_default = ( + "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model + ) + + super().__init__( + name="ChatGPT", + description="Process and query ChatGPT conversation exports with LEANN", + default_index_name="chatgpt_conversations_index", + ) + + def _add_specific_arguments(self, parser): + """Add ChatGPT-specific arguments.""" + chatgpt_group = parser.add_argument_group("ChatGPT Parameters") + chatgpt_group.add_argument( + "--export-path", + type=str, + default="./chatgpt_export", + help="Path to ChatGPT export file (.zip or .html) or directory containing exports (default: ./chatgpt_export)", + ) + chatgpt_group.add_argument( + "--concatenate-conversations", + action="store_true", + default=True, + help="Concatenate messages within conversations for better context (default: True)", + ) + chatgpt_group.add_argument( + "--separate-messages", + action="store_true", + help="Process each message as a separate document (overrides --concatenate-conversations)", + ) + chatgpt_group.add_argument( + "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)" + ) + chatgpt_group.add_argument( + "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)" + ) + + def _find_chatgpt_exports(self, export_path: Path) -> list[Path]: + """ + Find ChatGPT export files in the given path. + + Args: + export_path: Path to search for exports + + Returns: + List of paths to ChatGPT export files + """ + export_files = [] + + if export_path.is_file(): + if export_path.suffix.lower() in [".zip", ".html"]: + export_files.append(export_path) + elif export_path.is_dir(): + # Look for zip and html files + export_files.extend(export_path.glob("*.zip")) + export_files.extend(export_path.glob("*.html")) + + return export_files + + async def load_data(self, args) -> list[str]: + """Load ChatGPT export data and convert to text chunks.""" + export_path = Path(args.export_path) + + if not export_path.exists(): + print(f"ChatGPT export path not found: {export_path}") + print( + "Please ensure you have exported your ChatGPT data and placed it in the correct location." + ) + print("\nTo export your ChatGPT data:") + print("1. Sign in to ChatGPT") + print("2. 
Click on your profile icon → Settings → Data Controls") + print("3. Click 'Export' under Export Data") + print("4. Download the zip file from the email link") + print("5. Extract or place the file/directory at the specified path") + return [] + + # Find export files + export_files = self._find_chatgpt_exports(export_path) + + if not export_files: + print(f"No ChatGPT export files (.zip or .html) found in: {export_path}") + return [] + + print(f"Found {len(export_files)} ChatGPT export files") + + # Create reader with appropriate settings + concatenate = args.concatenate_conversations and not args.separate_messages + reader = ChatGPTReader(concatenate_conversations=concatenate) + + # Process each export file + all_documents = [] + total_processed = 0 + + for i, export_file in enumerate(export_files): + print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}") + + try: + # Apply max_items limit per file + max_per_file = -1 + if args.max_items > 0: + remaining = args.max_items - total_processed + if remaining <= 0: + break + max_per_file = remaining + + # Load conversations + documents = reader.load_data( + chatgpt_export_path=str(export_file), + max_count=max_per_file, + include_metadata=True, + ) + + if documents: + all_documents.extend(documents) + total_processed += len(documents) + print(f"Processed {len(documents)} conversations from this file") + else: + print(f"No conversations loaded from {export_file}") + + except Exception as e: + print(f"Error processing {export_file}: {e}") + continue + + if not all_documents: + print("No conversations found to process!") + print("\nTroubleshooting:") + print("- Ensure the export file is a valid ChatGPT export") + print("- Check that the HTML file contains conversation data") + print("- Try extracting the zip file and pointing to the HTML file directly") + return [] + + print(f"\nTotal conversations processed: {len(all_documents)}") + print("Now starting to split into text chunks... this may take some time") + + # Convert to text chunks + all_texts = create_text_chunks( + all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap + ) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations") + return all_texts + + +if __name__ == "__main__": + import asyncio + + # Example queries for ChatGPT RAG + print("\nšŸ¤– ChatGPT RAG Example") + print("=" * 50) + print("\nExample queries you can try:") + print("- 'What did I ask about Python programming?'") + print("- 'Show me conversations about machine learning'") + print("- 'Find discussions about travel planning'") + print("- 'What advice did ChatGPT give me about career development?'") + print("- 'Search for conversations about cooking recipes'") + print("\nTo get started:") + print("1. Export your ChatGPT data from Settings → Data Controls → Export") + print("2. Place the downloaded zip file or extracted HTML in ./chatgpt_export/") + print("3. Run this script to build your personal ChatGPT knowledge base!") + print("\nOr run without --query for interactive mode\n") + + rag = ChatGPTRAG() + asyncio.run(rag.run()) diff --git a/apps/claude_data/__init__.py b/apps/claude_data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/claude_data/claude_reader.py b/apps/claude_data/claude_reader.py new file mode 100644 index 0000000..1af1097 --- /dev/null +++ b/apps/claude_data/claude_reader.py @@ -0,0 +1,420 @@ +""" +Claude export data reader. + +Reads and processes Claude conversation data from exported JSON files. 
+""" + +import json +from pathlib import Path +from typing import Any +from zipfile import ZipFile + +from llama_index.core import Document +from llama_index.core.readers.base import BaseReader + + +class ClaudeReader(BaseReader): + """ + Claude export data reader. + + Reads Claude conversation data from exported JSON files or zip archives. + Processes conversations into structured documents with metadata. + """ + + def __init__(self, concatenate_conversations: bool = True) -> None: + """ + Initialize. + + Args: + concatenate_conversations: Whether to concatenate messages within conversations for better context + """ + self.concatenate_conversations = concatenate_conversations + + def _extract_json_from_zip(self, zip_path: Path) -> list[str]: + """ + Extract JSON files from Claude export zip file. + + Args: + zip_path: Path to the Claude export zip file + + Returns: + List of JSON content strings, or empty list if not found + """ + json_contents = [] + try: + with ZipFile(zip_path, "r") as zip_file: + # Look for JSON files + json_files = [f for f in zip_file.namelist() if f.endswith(".json")] + + if not json_files: + print(f"No JSON files found in {zip_path}") + return [] + + print(f"Found {len(json_files)} JSON files in archive") + + for json_file in json_files: + with zip_file.open(json_file) as f: + content = f.read().decode("utf-8", errors="ignore") + json_contents.append(content) + + except Exception as e: + print(f"Error extracting JSON from zip {zip_path}: {e}") + + return json_contents + + def _parse_claude_json(self, json_content: str) -> list[dict]: + """ + Parse Claude JSON export to extract conversations. + + Args: + json_content: JSON content from Claude export + + Returns: + List of conversation dictionaries + """ + try: + data = json.loads(json_content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return [] + + conversations = [] + + # Handle different possible JSON structures + if isinstance(data, list): + # If data is a list of conversations + for item in data: + conversation = self._extract_conversation_from_json(item) + if conversation: + conversations.append(conversation) + elif isinstance(data, dict): + # Check for common structures + if "conversations" in data: + # Structure: {"conversations": [...]} + for item in data["conversations"]: + conversation = self._extract_conversation_from_json(item) + if conversation: + conversations.append(conversation) + elif "messages" in data: + # Single conversation with messages + conversation = self._extract_conversation_from_json(data) + if conversation: + conversations.append(conversation) + else: + # Try to treat the whole object as a conversation + conversation = self._extract_conversation_from_json(data) + if conversation: + conversations.append(conversation) + + return conversations + + def _extract_conversation_from_json(self, conv_data: dict) -> dict | None: + """ + Extract conversation data from a JSON object. 
+ + Args: + conv_data: Dictionary containing conversation data + + Returns: + Dictionary with conversation data or None + """ + if not isinstance(conv_data, dict): + return None + + messages = [] + + # Look for messages in various possible structures + message_sources = [] + if "messages" in conv_data: + message_sources = conv_data["messages"] + elif "chat" in conv_data: + message_sources = conv_data["chat"] + elif "conversation" in conv_data: + message_sources = conv_data["conversation"] + else: + # If no clear message structure, try to extract from the object itself + if "content" in conv_data and "role" in conv_data: + message_sources = [conv_data] + + for msg_data in message_sources: + message = self._extract_message_from_json(msg_data) + if message: + messages.append(message) + + if not messages: + return None + + # Extract conversation metadata + title = self._extract_title_from_conversation(conv_data, messages) + timestamp = self._extract_timestamp_from_conversation(conv_data) + + return {"title": title, "messages": messages, "timestamp": timestamp} + + def _extract_message_from_json(self, msg_data: dict) -> dict | None: + """ + Extract message data from a JSON message object. + + Args: + msg_data: Dictionary containing message data + + Returns: + Dictionary with message data or None + """ + if not isinstance(msg_data, dict): + return None + + # Extract content from various possible fields + content = "" + content_fields = ["content", "text", "message", "body"] + for field in content_fields: + if msg_data.get(field): + content = str(msg_data[field]) + break + + if not content or len(content.strip()) < 3: + return None + + # Extract role (user/assistant/human/ai/claude) + role = "mixed" # Default role + role_fields = ["role", "sender", "from", "author", "type"] + for field in role_fields: + if msg_data.get(field): + role_value = str(msg_data[field]).lower() + if role_value in ["user", "human", "person"]: + role = "user" + elif role_value in ["assistant", "ai", "claude", "bot"]: + role = "assistant" + break + + # Extract timestamp + timestamp = self._extract_timestamp_from_message(msg_data) + + return {"role": role, "content": content, "timestamp": timestamp} + + def _extract_timestamp_from_message(self, msg_data: dict) -> str | None: + """Extract timestamp from message data.""" + timestamp_fields = ["timestamp", "created_at", "date", "time"] + for field in timestamp_fields: + if msg_data.get(field): + return str(msg_data[field]) + return None + + def _extract_timestamp_from_conversation(self, conv_data: dict) -> str | None: + """Extract timestamp from conversation data.""" + timestamp_fields = ["timestamp", "created_at", "date", "updated_at", "last_updated"] + for field in timestamp_fields: + if conv_data.get(field): + return str(conv_data[field]) + return None + + def _extract_title_from_conversation(self, conv_data: dict, messages: list) -> str: + """Extract or generate title for conversation.""" + # Try to find explicit title + title_fields = ["title", "name", "subject", "topic"] + for field in title_fields: + if conv_data.get(field): + return str(conv_data[field]) + + # Generate title from first user message + for message in messages: + if message.get("role") == "user": + content = message.get("content", "") + if content: + # Use first 50 characters as title + title = content[:50].strip() + if len(content) > 50: + title += "..." 
+ return title + + return "Claude Conversation" + + def _create_concatenated_content(self, conversation: dict) -> str: + """ + Create concatenated content from conversation messages. + + Args: + conversation: Dictionary containing conversation data + + Returns: + Formatted concatenated content + """ + title = conversation.get("title", "Claude Conversation") + messages = conversation.get("messages", []) + timestamp = conversation.get("timestamp", "Unknown") + + # Build message content + message_parts = [] + for message in messages: + role = message.get("role", "mixed") + content = message.get("content", "") + msg_timestamp = message.get("timestamp", "") + + if role == "user": + prefix = "[You]" + elif role == "assistant": + prefix = "[Claude]" + else: + prefix = "[Message]" + + # Add timestamp if available + if msg_timestamp: + prefix += f" ({msg_timestamp})" + + message_parts.append(f"{prefix}: {content}") + + concatenated_text = "\n\n".join(message_parts) + + # Create final document content + doc_content = f"""Conversation: {title} +Date: {timestamp} +Messages ({len(messages)} messages): + +{concatenated_text} +""" + return doc_content + + def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]: + """ + Load Claude export data. + + Args: + input_dir: Directory containing Claude export files or path to specific file + **load_kwargs: + max_count (int): Maximum number of conversations to process + claude_export_path (str): Specific path to Claude export file/directory + include_metadata (bool): Whether to include metadata in documents + """ + docs: list[Document] = [] + max_count = load_kwargs.get("max_count", -1) + claude_export_path = load_kwargs.get("claude_export_path", input_dir) + include_metadata = load_kwargs.get("include_metadata", True) + + if not claude_export_path: + print("No Claude export path provided") + return docs + + export_path = Path(claude_export_path) + + if not export_path.exists(): + print(f"Claude export path not found: {export_path}") + return docs + + json_contents = [] + + # Handle different input types + if export_path.is_file(): + if export_path.suffix.lower() == ".zip": + # Extract JSON from zip file + json_contents = self._extract_json_from_zip(export_path) + elif export_path.suffix.lower() == ".json": + # Read JSON file directly + try: + with open(export_path, encoding="utf-8", errors="ignore") as f: + json_contents.append(f.read()) + except Exception as e: + print(f"Error reading JSON file {export_path}: {e}") + return docs + else: + print(f"Unsupported file type: {export_path.suffix}") + return docs + + elif export_path.is_dir(): + # Look for JSON files in directory + json_files = list(export_path.glob("*.json")) + zip_files = list(export_path.glob("*.zip")) + + if json_files: + print(f"Found {len(json_files)} JSON files in directory") + for json_file in json_files: + try: + with open(json_file, encoding="utf-8", errors="ignore") as f: + json_contents.append(f.read()) + except Exception as e: + print(f"Error reading JSON file {json_file}: {e}") + continue + + if zip_files: + print(f"Found {len(zip_files)} ZIP files in directory") + for zip_file in zip_files: + zip_contents = self._extract_json_from_zip(zip_file) + json_contents.extend(zip_contents) + + if not json_files and not zip_files: + print(f"No JSON or ZIP files found in {export_path}") + return docs + + if not json_contents: + print("No JSON content found to process") + return docs + + # Parse conversations from JSON content + print("Parsing Claude conversations from 
JSON...") + all_conversations = [] + for json_content in json_contents: + conversations = self._parse_claude_json(json_content) + all_conversations.extend(conversations) + + if not all_conversations: + print("No conversations found in JSON content") + return docs + + print(f"Found {len(all_conversations)} conversations") + + # Process conversations into documents + count = 0 + for conversation in all_conversations: + if max_count > 0 and count >= max_count: + break + + if self.concatenate_conversations: + # Create one document per conversation with concatenated messages + doc_content = self._create_concatenated_content(conversation) + + metadata = {} + if include_metadata: + metadata = { + "title": conversation.get("title", "Claude Conversation"), + "timestamp": conversation.get("timestamp", "Unknown"), + "message_count": len(conversation.get("messages", [])), + "source": "Claude Export", + } + + doc = Document(text=doc_content, metadata=metadata) + docs.append(doc) + count += 1 + + else: + # Create separate documents for each message + for message in conversation.get("messages", []): + if max_count > 0 and count >= max_count: + break + + role = message.get("role", "mixed") + content = message.get("content", "") + msg_timestamp = message.get("timestamp", "") + + if not content.strip(): + continue + + # Create document content with context + doc_content = f"""Conversation: {conversation.get("title", "Claude Conversation")} +Role: {role} +Timestamp: {msg_timestamp or conversation.get("timestamp", "Unknown")} +Message: {content} +""" + + metadata = {} + if include_metadata: + metadata = { + "conversation_title": conversation.get("title", "Claude Conversation"), + "role": role, + "timestamp": msg_timestamp or conversation.get("timestamp", "Unknown"), + "source": "Claude Export", + } + + doc = Document(text=doc_content, metadata=metadata) + docs.append(doc) + count += 1 + + print(f"Created {len(docs)} documents from Claude export") + return docs diff --git a/apps/claude_rag.py b/apps/claude_rag.py new file mode 100644 index 0000000..43b499e --- /dev/null +++ b/apps/claude_rag.py @@ -0,0 +1,189 @@ +""" +Claude RAG example using the unified interface. +Supports Claude export data from JSON files. 
+""" + +import sys +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent)) + +from base_rag_example import BaseRAGExample +from chunking import create_text_chunks + +from .claude_data.claude_reader import ClaudeReader + + +class ClaudeRAG(BaseRAGExample): + """RAG example for Claude conversation data.""" + + def __init__(self): + # Set default values BEFORE calling super().__init__ + self.max_items_default = -1 # Process all conversations by default + self.embedding_model_default = ( + "sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model + ) + + super().__init__( + name="Claude", + description="Process and query Claude conversation exports with LEANN", + default_index_name="claude_conversations_index", + ) + + def _add_specific_arguments(self, parser): + """Add Claude-specific arguments.""" + claude_group = parser.add_argument_group("Claude Parameters") + claude_group.add_argument( + "--export-path", + type=str, + default="./claude_export", + help="Path to Claude export file (.json or .zip) or directory containing exports (default: ./claude_export)", + ) + claude_group.add_argument( + "--concatenate-conversations", + action="store_true", + default=True, + help="Concatenate messages within conversations for better context (default: True)", + ) + claude_group.add_argument( + "--separate-messages", + action="store_true", + help="Process each message as a separate document (overrides --concatenate-conversations)", + ) + claude_group.add_argument( + "--chunk-size", type=int, default=512, help="Text chunk size (default: 512)" + ) + claude_group.add_argument( + "--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)" + ) + + def _find_claude_exports(self, export_path: Path) -> list[Path]: + """ + Find Claude export files in the given path. + + Args: + export_path: Path to search for exports + + Returns: + List of paths to Claude export files + """ + export_files = [] + + if export_path.is_file(): + if export_path.suffix.lower() in [".zip", ".json"]: + export_files.append(export_path) + elif export_path.is_dir(): + # Look for zip and json files + export_files.extend(export_path.glob("*.zip")) + export_files.extend(export_path.glob("*.json")) + + return export_files + + async def load_data(self, args) -> list[str]: + """Load Claude export data and convert to text chunks.""" + export_path = Path(args.export_path) + + if not export_path.exists(): + print(f"Claude export path not found: {export_path}") + print( + "Please ensure you have exported your Claude data and placed it in the correct location." + ) + print("\nTo export your Claude data:") + print("1. Open Claude in your browser") + print("2. Look for export/download options in settings or conversation menu") + print("3. Download the conversation data (usually in JSON format)") + print("4. Place the file/directory at the specified path") + print( + "\nNote: Claude export methods may vary. Check Claude's help documentation for current instructions." 
+ ) + return [] + + # Find export files + export_files = self._find_claude_exports(export_path) + + if not export_files: + print(f"No Claude export files (.json or .zip) found in: {export_path}") + return [] + + print(f"Found {len(export_files)} Claude export files") + + # Create reader with appropriate settings + concatenate = args.concatenate_conversations and not args.separate_messages + reader = ClaudeReader(concatenate_conversations=concatenate) + + # Process each export file + all_documents = [] + total_processed = 0 + + for i, export_file in enumerate(export_files): + print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}") + + try: + # Apply max_items limit per file + max_per_file = -1 + if args.max_items > 0: + remaining = args.max_items - total_processed + if remaining <= 0: + break + max_per_file = remaining + + # Load conversations + documents = reader.load_data( + claude_export_path=str(export_file), + max_count=max_per_file, + include_metadata=True, + ) + + if documents: + all_documents.extend(documents) + total_processed += len(documents) + print(f"Processed {len(documents)} conversations from this file") + else: + print(f"No conversations loaded from {export_file}") + + except Exception as e: + print(f"Error processing {export_file}: {e}") + continue + + if not all_documents: + print("No conversations found to process!") + print("\nTroubleshooting:") + print("- Ensure the export file is a valid Claude export") + print("- Check that the JSON file contains conversation data") + print("- Try using a different export format or method") + print("- Check Claude's documentation for current export procedures") + return [] + + print(f"\nTotal conversations processed: {len(all_documents)}") + print("Now starting to split into text chunks... this may take some time") + + # Convert to text chunks + all_texts = create_text_chunks( + all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap + ) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations") + return all_texts + + +if __name__ == "__main__": + import asyncio + + # Example queries for Claude RAG + print("\nšŸ¤– Claude RAG Example") + print("=" * 50) + print("\nExample queries you can try:") + print("- 'What did I ask Claude about Python programming?'") + print("- 'Show me conversations about machine learning'") + print("- 'Find discussions about code optimization'") + print("- 'What advice did Claude give me about software design?'") + print("- 'Search for conversations about debugging techniques'") + print("\nTo get started:") + print("1. Export your Claude conversation data") + print("2. Place the JSON/ZIP file in ./claude_export/") + print("3. Run this script to build your personal Claude knowledge base!") + print("\nOr run without --query for interactive mode\n") + + rag = ClaudeRAG() + asyncio.run(rag.run()) diff --git a/apps/imessage_data/__init__.py b/apps/imessage_data/__init__.py new file mode 100644 index 0000000..9e9e3fc --- /dev/null +++ b/apps/imessage_data/__init__.py @@ -0,0 +1 @@ +"""iMessage data processing module.""" diff --git a/apps/imessage_data/imessage_reader.py b/apps/imessage_data/imessage_reader.py new file mode 100644 index 0000000..4dfc0af --- /dev/null +++ b/apps/imessage_data/imessage_reader.py @@ -0,0 +1,342 @@ +""" +iMessage data reader. + +Reads and processes iMessage conversation data from the macOS Messages database. 
+""" + +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any + +from llama_index.core import Document +from llama_index.core.readers.base import BaseReader + + +class IMessageReader(BaseReader): + """ + iMessage data reader. + + Reads iMessage conversation data from the macOS Messages database (chat.db). + Processes conversations into structured documents with metadata. + """ + + def __init__(self, concatenate_conversations: bool = True) -> None: + """ + Initialize. + + Args: + concatenate_conversations: Whether to concatenate messages within conversations for better context + """ + self.concatenate_conversations = concatenate_conversations + + def _get_default_chat_db_path(self) -> Path: + """ + Get the default path to the iMessage chat database. + + Returns: + Path to the chat.db file + """ + home = Path.home() + return home / "Library" / "Messages" / "chat.db" + + def _convert_cocoa_timestamp(self, cocoa_timestamp: int) -> str: + """ + Convert Cocoa timestamp to readable format. + + Args: + cocoa_timestamp: Timestamp in Cocoa format (nanoseconds since 2001-01-01) + + Returns: + Formatted timestamp string + """ + if cocoa_timestamp == 0: + return "Unknown" + + try: + # Cocoa timestamp is nanoseconds since 2001-01-01 00:00:00 UTC + # Convert to seconds and add to Unix epoch + cocoa_epoch = datetime(2001, 1, 1) + unix_timestamp = cocoa_timestamp / 1_000_000_000 # Convert nanoseconds to seconds + message_time = cocoa_epoch.timestamp() + unix_timestamp + return datetime.fromtimestamp(message_time).strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, OSError): + return "Unknown" + + def _get_contact_name(self, handle_id: str) -> str: + """ + Get a readable contact name from handle ID. + + Args: + handle_id: The handle ID (phone number or email) + + Returns: + Formatted contact name + """ + if not handle_id: + return "Unknown" + + # Clean up phone numbers and emails for display + if "@" in handle_id: + return handle_id # Email address + elif handle_id.startswith("+"): + return handle_id # International phone number + else: + # Try to format as phone number + digits = "".join(filter(str.isdigit, handle_id)) + if len(digits) == 10: + return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}" + elif len(digits) == 11 and digits[0] == "1": + return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}" + else: + return handle_id + + def _read_messages_from_db(self, db_path: Path) -> list[dict]: + """ + Read messages from the iMessage database. 
+ + Args: + db_path: Path to the chat.db file + + Returns: + List of message dictionaries + """ + if not db_path.exists(): + print(f"iMessage database not found at: {db_path}") + return [] + + try: + # Connect to the database + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Query to get messages with chat and handle information + query = """ + SELECT + m.ROWID as message_id, + m.text, + m.date, + m.is_from_me, + m.service, + c.chat_identifier, + c.display_name as chat_display_name, + h.id as handle_id, + c.ROWID as chat_id + FROM message m + LEFT JOIN chat_message_join cmj ON m.ROWID = cmj.message_id + LEFT JOIN chat c ON cmj.chat_id = c.ROWID + LEFT JOIN handle h ON m.handle_id = h.ROWID + WHERE m.text IS NOT NULL AND m.text != '' + ORDER BY c.ROWID, m.date + """ + + cursor.execute(query) + rows = cursor.fetchall() + + messages = [] + for row in rows: + ( + message_id, + text, + date, + is_from_me, + service, + chat_identifier, + chat_display_name, + handle_id, + chat_id, + ) = row + + message = { + "message_id": message_id, + "text": text, + "timestamp": self._convert_cocoa_timestamp(date), + "is_from_me": bool(is_from_me), + "service": service or "iMessage", + "chat_identifier": chat_identifier or "Unknown", + "chat_display_name": chat_display_name or "Unknown Chat", + "handle_id": handle_id or "Unknown", + "contact_name": self._get_contact_name(handle_id or ""), + "chat_id": chat_id, + } + messages.append(message) + + conn.close() + print(f"Found {len(messages)} messages in database") + return messages + + except sqlite3.Error as e: + print(f"Error reading iMessage database: {e}") + return [] + except Exception as e: + print(f"Unexpected error reading iMessage database: {e}") + return [] + + def _group_messages_by_chat(self, messages: list[dict]) -> dict[int, list[dict]]: + """ + Group messages by chat ID. + + Args: + messages: List of message dictionaries + + Returns: + Dictionary mapping chat_id to list of messages + """ + chats = {} + for message in messages: + chat_id = message["chat_id"] + if chat_id not in chats: + chats[chat_id] = [] + chats[chat_id].append(message) + + return chats + + def _create_concatenated_content(self, chat_id: int, messages: list[dict]) -> str: + """ + Create concatenated content from chat messages. + + Args: + chat_id: The chat ID + messages: List of messages in the chat + + Returns: + Concatenated text content + """ + if not messages: + return "" + + # Get chat info from first message + first_msg = messages[0] + chat_name = first_msg["chat_display_name"] + chat_identifier = first_msg["chat_identifier"] + + # Build message content + message_parts = [] + for message in messages: + timestamp = message["timestamp"] + is_from_me = message["is_from_me"] + text = message["text"] + contact_name = message["contact_name"] + + if is_from_me: + prefix = "[You]" + else: + prefix = f"[{contact_name}]" + + if timestamp != "Unknown": + prefix += f" ({timestamp})" + + message_parts.append(f"{prefix}: {text}") + + concatenated_text = "\n\n".join(message_parts) + + doc_content = f"""Chat: {chat_name} +Identifier: {chat_identifier} +Messages ({len(messages)} messages): + +{concatenated_text} +""" + return doc_content + + def _create_individual_content(self, message: dict) -> str: + """ + Create content for individual message. 
+ + Args: + message: Message dictionary + + Returns: + Formatted message content + """ + timestamp = message["timestamp"] + is_from_me = message["is_from_me"] + text = message["text"] + contact_name = message["contact_name"] + chat_name = message["chat_display_name"] + + sender = "You" if is_from_me else contact_name + + return f"""Message from {sender} in chat "{chat_name}" +Time: {timestamp} +Content: {text} +""" + + def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]: + """ + Load iMessage data and return as documents. + + Args: + input_dir: Optional path to directory containing chat.db file. + If not provided, uses default macOS location. + **load_kwargs: Additional arguments (unused) + + Returns: + List of Document objects containing iMessage data + """ + docs = [] + + # Determine database path + if input_dir: + db_path = Path(input_dir) / "chat.db" + else: + db_path = self._get_default_chat_db_path() + + print(f"Reading iMessage database from: {db_path}") + + # Read messages from database + messages = self._read_messages_from_db(db_path) + if not messages: + return docs + + if self.concatenate_conversations: + # Group messages by chat and create concatenated documents + chats = self._group_messages_by_chat(messages) + + for chat_id, chat_messages in chats.items(): + if not chat_messages: + continue + + content = self._create_concatenated_content(chat_id, chat_messages) + + # Create metadata + first_msg = chat_messages[0] + last_msg = chat_messages[-1] + + metadata = { + "source": "iMessage", + "chat_id": chat_id, + "chat_name": first_msg["chat_display_name"], + "chat_identifier": first_msg["chat_identifier"], + "message_count": len(chat_messages), + "first_message_date": first_msg["timestamp"], + "last_message_date": last_msg["timestamp"], + "participants": list( + {msg["contact_name"] for msg in chat_messages if not msg["is_from_me"]} + ), + } + + doc = Document(text=content, metadata=metadata) + docs.append(doc) + + else: + # Create individual documents for each message + for message in messages: + content = self._create_individual_content(message) + + metadata = { + "source": "iMessage", + "message_id": message["message_id"], + "chat_id": message["chat_id"], + "chat_name": message["chat_display_name"], + "chat_identifier": message["chat_identifier"], + "timestamp": message["timestamp"], + "is_from_me": message["is_from_me"], + "contact_name": message["contact_name"], + "service": message["service"], + } + + doc = Document(text=content, metadata=metadata) + docs.append(doc) + + print(f"Created {len(docs)} documents from iMessage data") + return docs diff --git a/apps/imessage_rag.py b/apps/imessage_rag.py new file mode 100644 index 0000000..50032ec --- /dev/null +++ b/apps/imessage_rag.py @@ -0,0 +1,125 @@ +""" +iMessage RAG Example. + +This example demonstrates how to build a RAG system on your iMessage conversation history. 
+""" + +import asyncio +from pathlib import Path + +from leann.chunking_utils import create_text_chunks + +from apps.base_rag_example import BaseRAGExample +from apps.imessage_data.imessage_reader import IMessageReader + + +class IMessageRAG(BaseRAGExample): + """RAG example for iMessage conversation history.""" + + def __init__(self): + super().__init__( + name="iMessage", + description="RAG on your iMessage conversation history", + default_index_name="imessage_index", + ) + + def _add_specific_arguments(self, parser): + """Add iMessage-specific arguments.""" + imessage_group = parser.add_argument_group("iMessage Parameters") + imessage_group.add_argument( + "--db-path", + type=str, + default=None, + help="Path to iMessage chat.db file (default: ~/Library/Messages/chat.db)", + ) + imessage_group.add_argument( + "--concatenate-conversations", + action="store_true", + default=True, + help="Concatenate messages within conversations for better context (default: True)", + ) + imessage_group.add_argument( + "--no-concatenate-conversations", + action="store_true", + help="Process each message individually instead of concatenating by conversation", + ) + imessage_group.add_argument( + "--chunk-size", + type=int, + default=1000, + help="Maximum characters per text chunk (default: 1000)", + ) + imessage_group.add_argument( + "--chunk-overlap", + type=int, + default=200, + help="Overlap between text chunks (default: 200)", + ) + + async def load_data(self, args) -> list[str]: + """Load iMessage history and convert to text chunks.""" + print("Loading iMessage conversation history...") + + # Determine concatenation setting + concatenate = args.concatenate_conversations and not args.no_concatenate_conversations + + # Initialize iMessage reader + reader = IMessageReader(concatenate_conversations=concatenate) + + # Load documents + try: + if args.db_path: + # Use custom database path + db_dir = str(Path(args.db_path).parent) + documents = reader.load_data(input_dir=db_dir) + else: + # Use default macOS location + documents = reader.load_data() + + except Exception as e: + print(f"Error loading iMessage data: {e}") + print("\nTroubleshooting tips:") + print("1. Make sure you have granted Full Disk Access to your terminal/IDE") + print("2. Check that the iMessage database exists at ~/Library/Messages/chat.db") + print("3. Try specifying a custom path with --db-path if you have a backup") + return [] + + if not documents: + print("No iMessage conversations found!") + return [] + + print(f"Loaded {len(documents)} iMessage documents") + + # Show some statistics + total_messages = sum(doc.metadata.get("message_count", 1) for doc in documents) + print(f"Total messages: {total_messages}") + + if concatenate: + # Show chat statistics + chat_names = [doc.metadata.get("chat_name", "Unknown") for doc in documents] + unique_chats = len(set(chat_names)) + print(f"Unique conversations: {unique_chats}") + + # Convert to text chunks + all_texts = create_text_chunks( + documents, + chunk_size=args.chunk_size, + chunk_overlap=args.chunk_overlap, + ) + + # Apply max_items limit if specified + if args.max_items > 0: + all_texts = all_texts[: args.max_items] + print(f"Limited to {len(all_texts)} text chunks (max_items={args.max_items})") + + return all_texts + + +async def main(): + """Main entry point.""" + app = IMessageRAG() + await app.run() + + +if __name__ == "__main__": + asyncio.run(main())