diff --git a/examples/LEANN_email_reader.py b/examples/LEANN_email_reader.py
new file mode 100644
index 0000000..81e9bc2
--- /dev/null
+++ b/examples/LEANN_email_reader.py
@@ -0,0 +1,130 @@
+import os
+import email
+from pathlib import Path
+from typing import Any, List
+
+from llama_index.core import Document
+from llama_index.core.readers.base import BaseReader
+
+
+class EmlxReader(BaseReader):
+    """
+    Apple Mail .emlx file reader with embedded metadata.
+
+    Reads individual .emlx files from Apple Mail's storage format.
+    """
+
+    def __init__(self) -> None:
+        """Initialize."""
+        pass
+
+    def load_data(self, input_dir: str, **load_kwargs: Any) -> List[Document]:
+        """
+        Load data from the input directory containing .emlx files.
+
+        Args:
+            input_dir: Directory containing .emlx files
+            **load_kwargs:
+                max_count (int): Maximum number of messages to read (default 1000).
+        """
+        docs: List[Document] = []
+        max_count = load_kwargs.get('max_count', 1000)
+        count = 0
+
+        # Walk through the directory recursively
+        for dirpath, dirnames, filenames in os.walk(input_dir):
+            # Skip hidden directories
+            dirnames[:] = [d for d in dirnames if not d.startswith(".")]
+
+            if count >= max_count:
+                break
+
+            for filename in filenames:
+                if count >= max_count:
+                    break
+
+                if filename.endswith(".emlx"):
+                    filepath = os.path.join(dirpath, filename)
+                    try:
+                        # Read the .emlx file
+                        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
+                            content = f.read()
+
+                        # .emlx files start with a line holding the message length,
+                        # followed by the raw RFC 822 email content
+                        lines = content.split('\n', 1)
+                        if len(lines) >= 2:
+                            email_content = lines[1]
+
+                            # Parse the email using Python's email module
+                            try:
+                                msg = email.message_from_string(email_content)
+
+                                # Extract email metadata
+                                subject = msg.get('Subject', 'No Subject')
+                                from_addr = msg.get('From', 'Unknown')
+                                to_addr = msg.get('To', 'Unknown')
+                                date = msg.get('Date', 'Unknown')
+
+                                # Extract email body (text/plain and text/html parts are
+                                # both kept, HTML as-is)
+                                body = ""
+                                if msg.is_multipart():
+                                    for part in msg.walk():
+                                        if part.get_content_type() in ("text/plain", "text/html"):
+                                            payload = part.get_payload(decode=True)
+                                            if payload:
+                                                body += payload.decode('utf-8', errors='ignore')
+                                else:
+                                    payload = msg.get_payload(decode=True)
+                                    if payload:
+                                        body = payload.decode('utf-8', errors='ignore')
+
+                                # Create document content with metadata embedded in text
+                                doc_content = f"""
+[EMAIL METADATA]
+File: {filename}
+From: {from_addr}
+To: {to_addr}
+Subject: {subject}
+Date: {date}
+[END METADATA]
+
+{body}
+"""
+
+                                # No separate metadata - everything is in the text
+                                doc = Document(text=doc_content, metadata={})
+                                docs.append(doc)
+                                count += 1
+
+                            except Exception as e:
+                                print(f"Error parsing email from {filepath}: {e}")
+                                continue
+
+                    except Exception as e:
+                        print(f"Error reading file {filepath}: {e}")
+                        continue
+
+        print(f"Loaded {len(docs)} email documents")
+        return docs
+
+    @staticmethod
+    def find_all_messages_directories(base_path: str) -> List[Path]:
+        """
+        Find all Messages directories under the given base path.
+
+        Args:
+            base_path: Base path to search for Messages directories
+
+        Returns:
+            List of Path objects pointing to Messages directories
+        """
+        base_path_obj = Path(base_path)
+        messages_dirs: List[Path] = []
+
+        if not base_path_obj.exists():
+            print(f"Base path {base_path} does not exist")
+            return messages_dirs
+
+        # Find all Messages directories recursively
+        for messages_dir in base_path_obj.rglob("Messages"):
+            if messages_dir.is_dir():
+                messages_dirs.append(messages_dir)
+                print(f"Found Messages directory: {messages_dir}")
+
+        print(f"Found {len(messages_dirs)} Messages directories")
+        return messages_dirs
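+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (not used by the other examples). The base path is a
+    # placeholder: point it at a real account folder under ~/Library/Mail/V*.
+    demo_base = os.path.expanduser("~/Library/Mail")
+    for messages_dir in EmlxReader.find_all_messages_directories(demo_base):
+        # Read a handful of messages from the first Messages directory found
+        for doc in EmlxReader().load_data(str(messages_dir), max_count=3):
+            print(doc.text[:200])
+        break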
diff --git a/examples/email_data/email.py b/examples/email_data/email.py
new file mode 100644
index 0000000..689618b
--- /dev/null
+++ b/examples/email_data/email.py
@@ -0,0 +1,192 @@
+"""
+Mbox parser.
+
+Contains a simple parser for mbox files.
+
+Note: this module shares its name with the stdlib ``email`` package, so import
+it as ``email_data.email`` and keep ``examples/email_data`` itself off
+``sys.path``.
+"""
+
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from fsspec import AbstractFileSystem
+from llama_index.core.readers.base import BaseReader
+from llama_index.core.schema import Document
+
+logger = logging.getLogger(__name__)
+
+
+class MboxReader(BaseReader):
+    """
+    Mbox parser.
+
+    Extracts messages from mailbox files.
+    Returns a string including date, subject, sender, receiver and
+    content for each message.
+    """
+
+    DEFAULT_MESSAGE_FORMAT: str = (
+        "Date: {_date}\n"
+        "From: {_from}\n"
+        "To: {_to}\n"
+        "Subject: {_subject}\n"
+        "Content: {_content}"
+    )
+
+    def __init__(
+        self,
+        *args: Any,
+        max_count: int = 0,
+        message_format: str = DEFAULT_MESSAGE_FORMAT,
+        **kwargs: Any,
+    ) -> None:
+        """Init params."""
+        try:
+            from bs4 import BeautifulSoup  # noqa
+        except ImportError:
+            raise ImportError(
+                "`beautifulsoup4` package not found: `pip install beautifulsoup4`"
+            )
+
+        super().__init__(*args, **kwargs)
+        self.max_count = max_count
+        self.message_format = message_format
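+
+    # load_data renders each message to one formatted string: multipart
+    # messages keep only the first non-attachment text/plain part, and the
+    # payload is passed through BeautifulSoup to strip any markup and collapse
+    # whitespace before formatting.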
+    def load_data(
+        self,
+        file: Path,
+        extra_info: Optional[Dict] = None,
+        fs: Optional[AbstractFileSystem] = None,
+    ) -> List[Document]:
+        """Parse the mbox file into one Document per message."""
+        # Import required libraries
+        import mailbox
+        from email.parser import BytesParser
+        from email.policy import default
+
+        from bs4 import BeautifulSoup
+
+        if fs:
+            logger.warning(
+                "fs was specified but MboxReader doesn't support loading "
+                "from fsspec filesystems. Will load from local filesystem instead."
+            )
+
+        i = 0
+        results: List[str] = []
+        # Load file using mailbox
+        bytes_parser = BytesParser(policy=default).parse
+        mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore
+
+        # Iterate through all messages
+        for _msg in mbox:
+            try:
+                msg: mailbox.mboxMessage = _msg
+                # Reset per message so a part-less message cannot reuse the
+                # previous message's payload
+                content = None
+                # Parse multipart messages: keep the first text/plain part
+                if msg.is_multipart():
+                    for part in msg.walk():
+                        ctype = part.get_content_type()
+                        cdispo = str(part.get("Content-Disposition"))
+                        if "attachment" in cdispo:
+                            logger.info(f"Attachment found: {part.get_filename()}")
+                        if ctype == "text/plain" and "attachment" not in cdispo:
+                            content = part.get_payload(decode=True)  # decode
+                            break
+                # Get plain message payload for non-multipart messages
+                else:
+                    content = msg.get_payload(decode=True)
+
+                if content:
+                    # Strip any markup and remove unneeded whitespace
+                    soup = BeautifulSoup(content, "html.parser")
+                    stripped_content = " ".join(soup.get_text().split())
+                    # Format message to include date, sender, receiver and subject
+                    msg_string = self.message_format.format(
+                        _date=msg["date"],
+                        _from=msg["from"],
+                        _to=msg["to"],
+                        _subject=msg["subject"],
+                        _content=stripped_content,
+                    )
+                    # Add message string to results
+                    results.append(msg_string)
+            except Exception as e:
+                logger.warning(f"Failed to parse message:\n{_msg}\n with exception {e}")
+
+            # Increment counter and stop if max count is met
+            i += 1
+            if self.max_count > 0 and i >= self.max_count:
+                break
+
+        return [Document(text=result, metadata=extra_info or {}) for result in results]
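+
+
+# Apple Mail stores one message per .emlx file: the first line holds the
+# message length in bytes, followed by the raw RFC 822 message (with a plist
+# of Apple-specific flags at the end). The reader below only needs the
+# message part.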
+class EmlxMboxReader(MboxReader):
+    """
+    EmlxMboxReader - Modified MboxReader that handles directories of .emlx files.
+
+    Extends MboxReader to work with Apple Mail's .emlx format by:
+    1. Reading .emlx files from a directory
+    2. Converting them to a temporary mbox file
+    3. Using the parent MboxReader's parsing logic
+    """
+
+    def load_data(
+        self,
+        directory: Path,
+        extra_info: Optional[Dict] = None,
+        fs: Optional[AbstractFileSystem] = None,
+    ) -> List[Document]:
+        """Parse .emlx files from a directory using MboxReader's logic."""
+        import os
+        import tempfile
+
+        if fs:
+            logger.warning(
+                "fs was specified but EmlxMboxReader doesn't support loading "
+                "from fsspec filesystems. Will load from local filesystem instead."
+            )
+
+        # Find all .emlx files in the directory
+        emlx_files = list(directory.glob("*.emlx"))
+        logger.info(f"Found {len(emlx_files)} .emlx files in {directory}")
+
+        if not emlx_files:
+            logger.warning(f"No .emlx files found in {directory}")
+            return []
+
+        # Create a temporary mbox file; it is closed (and therefore readable by
+        # MboxReader) when the with block exits
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.mbox', delete=False) as temp_mbox:
+            temp_mbox_path = temp_mbox.name
+
+            # Convert .emlx files to mbox format
+            for emlx_file in emlx_files:
+                try:
+                    # Read the .emlx file
+                    with open(emlx_file, 'r', encoding='utf-8', errors='ignore') as f:
+                        content = f.read()
+
+                    # .emlx format: first line is the length, the rest is the message
+                    lines = content.split('\n', 1)
+                    if len(lines) >= 2:
+                        email_content = lines[1]  # Skip the length line
+
+                        # Escape body lines starting with "From " so the mbox
+                        # parser does not mistake them for message separators
+                        email_content = email_content.replace("\nFrom ", "\n>From ")
+
+                        # Each mbox message starts with a "From " separator line
+                        # on its own, followed by the message and a blank line
+                        temp_mbox.write(f"From {emlx_file.name}\n{email_content}\n\n")
+
+                except Exception as e:
+                    logger.warning(f"Failed to process {emlx_file}: {e}")
+                    continue
+
+        try:
+            # Use the parent MboxReader's logic to parse the mbox file
+            return super().load_data(Path(temp_mbox_path), extra_info)
+        finally:
+            # Clean up the temporary file
+            try:
+                os.unlink(temp_mbox_path)
+            except OSError:
+                pass
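+
+
+if __name__ == "__main__":
+    # Minimal usage sketch. The path below is a placeholder and must point at
+    # a real Messages directory full of .emlx files before running.
+    demo_dir = Path("~/Library/Mail").expanduser()
+    for doc in EmlxMboxReader(max_count=3).load_data(demo_dir):
+        print(doc.text[:300])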
Exiting.") + return None + + print(f"\nTotal loaded {len(all_documents)} email documents from {len(messages_dirs)} directories") + + # Create text splitter with 256 chunk size + text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) + + # Convert Documents to text strings and chunk them + all_texts = [] + for doc in all_documents: + # Split the document into chunks + nodes = text_splitter.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents") + + # Create LEANN index directory + INDEX_DIR = Path(index_path).parent + + if not INDEX_DIR.exists(): + print(f"--- Index directory not found, building new index ---") + INDEX_DIR.mkdir(exist_ok=True) + + print(f"--- Building new LEANN index ---") + + print(f"\n[PHASE 1] Building Leann index...") + + # Use HNSW backend for better macOS compatibility + builder = LeannBuilder( + backend_name="hnsw", + embedding_model="facebook/contriever", + graph_degree=32, + complexity=64, + is_compact=True, + is_recompute=True, + num_threads=1 # Force single-threaded mode + ) + + print(f"Adding {len(all_texts)} email chunks to index...") + for chunk_text in all_texts: + builder.add_text(chunk_text) + + builder.build_index(index_path) + print(f"\nLEANN index built at {index_path}!") + else: + print(f"--- Using existing index at {INDEX_DIR} ---") + + return index_path + def create_leann_index(mail_path: str, index_path: str = "mail_index.leann", max_count: int = 1000): """ Create LEANN index from mail data. @@ -20,9 +113,13 @@ def create_leann_index(mail_path: str, index_path: str = "mail_index.leann", max """ print("Creating LEANN index from mail data...") - # Load documents using EmlxReader from mail_reader_llamaindex + # Load documents using EmlxReader from LEANN_email_reader + from LEANN_email_reader import EmlxReader reader = EmlxReader() - documents = reader.load_data(mail_path, max_count=max_count) + # from email_data.email import EmlxMboxReader + # from pathlib import Path + # reader = EmlxMboxReader() + documents = reader.load_data(Path(mail_path)) if not documents: print("No documents loaded. Exiting.") @@ -98,13 +195,22 @@ async def query_leann_index(index_path: str, query: str): print(f"Leann: {chat_response}") async def main(): - mail_path = "/Users/yichuan/Library/Mail/V10/0FCA0879-FD8C-4B7E-83BF-FDDA930791C5/[Gmail].mbox/All Mail.mbox/78BA5BE1-8819-4F9A-9613-EB63772F1DD0/Data/9/Messages" + # Base path to the mail data directory + base_mail_path = "/Users/yichuan/Library/Mail/V10/0FCA0879-FD8C-4B7E-83BF-FDDA930791C5/[Gmail].mbox/All Mail.mbox/78BA5BE1-8819-4F9A-9613-EB63772F1DD0/Data" - INDEX_DIR = Path("./mail_index_leann_raw_text") + INDEX_DIR = Path("./mail_index_leann_raw_text_all") INDEX_PATH = str(INDEX_DIR / "mail_documents.leann") - # Create or load the LEANN index - index_path = create_leann_index(mail_path, INDEX_PATH, max_count=1000) + # Find all Messages directories + from LEANN_email_reader import EmlxReader + messages_dirs = EmlxReader.find_all_messages_directories(base_mail_path) + + if not messages_dirs: + print("No Messages directories found. 
Exiting.") + return + + # Create or load the LEANN index from all sources + index_path = create_leann_index_from_multiple_sources(messages_dirs, INDEX_PATH) if index_path: # Example queries diff --git a/examples/mail_reader_llamaindex.py b/examples/mail_reader_llamaindex.py index fadb1cf..97abe10 100644 --- a/examples/mail_reader_llamaindex.py +++ b/examples/mail_reader_llamaindex.py @@ -1,9 +1,7 @@ import os -import email from pathlib import Path from typing import List, Any -from llama_index.core import VectorStoreIndex, Document, StorageContext -from llama_index.core.readers.base import BaseReader +from llama_index.core import VectorStoreIndex, StorageContext from llama_index.core.node_parser import SentenceSplitter # --- EMBEDDING MODEL --- @@ -12,102 +10,8 @@ import torch # --- END EMBEDDING MODEL --- -class EmlxReader(BaseReader): - """ - Apple Mail .emlx file reader with embedded metadata. - - Reads individual .emlx files from Apple Mail's storage format. - """ - - def __init__(self) -> None: - """Initialize.""" - pass - - def load_data(self, input_dir: str, **load_kwargs: Any) -> List[Document]: - """ - Load data from the input directory containing .emlx files. - - Args: - input_dir: Directory containing .emlx files - **load_kwargs: - max_count (int): Maximum amount of messages to read. - """ - docs: List[Document] = [] - max_count = load_kwargs.get('max_count', 1000) - count = 0 - - # Walk through the directory recursively - for dirpath, dirnames, filenames in os.walk(input_dir): - # Skip hidden directories - dirnames[:] = [d for d in dirnames if not d.startswith(".")] - - for filename in filenames: - if count >= max_count: - break - - if filename.endswith(".emlx"): - filepath = os.path.join(dirpath, filename) - try: - # Read the .emlx file - with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: - content = f.read() - - # .emlx files have a length prefix followed by the email content - # The first line contains the length, followed by the email - lines = content.split('\n', 1) - if len(lines) >= 2: - email_content = lines[1] - - # Parse the email using Python's email module - try: - msg = email.message_from_string(email_content) - - # Extract email metadata - subject = msg.get('Subject', 'No Subject') - from_addr = msg.get('From', 'Unknown') - to_addr = msg.get('To', 'Unknown') - date = msg.get('Date', 'Unknown') - - # Extract email body - body = "" - if msg.is_multipart(): - for part in msg.walk(): - if part.get_content_type() == "text/plain" or part.get_content_type() == "text/html": - if part.get_content_type() == "text/html": - continue - body += part.get_payload(decode=True).decode('utf-8', errors='ignore') - # break - else: - body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') - - # Create document content with metadata embedded in text - doc_content = f""" -[EMAIL METADATA] -File: {filename} -From: {from_addr} -To: {to_addr} -Subject: {subject} -Date: {date} -[END METADATA] - -{body} -""" - - # No separate metadata - everything is in the text - doc = Document(text=doc_content, metadata={}) - docs.append(doc) - count += 1 - - except Exception as e: - print(f"Error parsing email from {filepath}: {e}") - continue - - except Exception as e: - print(f"Error reading file {filepath}: {e}") - continue - - print(f"Loaded {len(docs)} email documents") - return docs +# Import EmlxReader from the new module +from LEANN_email_reader import EmlxReader def create_and_save_index(mail_path: str, save_dir: str = "mail_index_embedded", max_count: int = 1000): 
print("Creating index from mail data with embedded metadata...")