diff --git a/examples/google_history_reader_leann.py b/examples/google_history_reader_leann.py
new file mode 100644
index 0000000..2c7c7a8
--- /dev/null
+++ b/examples/google_history_reader_leann.py
@@ -0,0 +1,234 @@
+import os
+import asyncio
+import dotenv
+from pathlib import Path
+from typing import List, Any
+from leann.api import LeannBuilder, LeannSearcher, LeannChat
+from llama_index.core.node_parser import SentenceSplitter
+
+dotenv.load_dotenv()
+
+def create_leann_index_from_multiple_chrome_profiles(profile_dirs: List[Path], index_path: str = "chrome_history_index.leann", max_count: int = -1):
+    """
+    Create LEANN index from multiple Chrome profile data sources.
+
+    Args:
+        profile_dirs: List of Path objects pointing to Chrome profile directories
+        index_path: Path to save the LEANN index
+        max_count: Maximum number of history entries to process per profile
+    """
+    print("Creating LEANN index from multiple Chrome profile data sources...")
+
+    # Load documents using ChromeHistoryReader from history_data
+    from history_data.history import ChromeHistoryReader
+    reader = ChromeHistoryReader()
+
+    INDEX_DIR = Path(index_path).parent
+
+    if not INDEX_DIR.exists():
+        print(f"--- Index directory not found, building new index ---")
+        all_documents = []
+        total_processed = 0
+
+        # Process each Chrome profile directory
+        for i, profile_dir in enumerate(profile_dirs):
+            print(f"\nProcessing Chrome profile {i+1}/{len(profile_dirs)}: {profile_dir}")
+
+            try:
+                documents = reader.load_data(
+                    chrome_profile_path=str(profile_dir),
+                    max_count=max_count
+                )
+                if documents:
+                    print(f"Loaded {len(documents)} history documents from {profile_dir}")
+                    all_documents.extend(documents)
+                    total_processed += len(documents)
+
+                    # Check if we've reached the max count
+                    if max_count > 0 and total_processed >= max_count:
+                        print(f"Reached max count of {max_count} documents")
+                        break
+                else:
+                    print(f"No documents loaded from {profile_dir}")
+            except Exception as e:
+                print(f"Error processing {profile_dir}: {e}")
+                continue
+
+        if not all_documents:
+            print("No documents loaded from any source. Exiting.")
Exiting.") + return None + + print(f"\nTotal loaded {len(all_documents)} history documents from {len(profile_dirs)} profiles") + + # Create text splitter with 256 chunk size + text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) + + # Convert Documents to text strings and chunk them + all_texts = [] + for doc in all_documents: + # Split the document into chunks + nodes = text_splitter.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents") + + # Create LEANN index directory + print(f"--- Index directory not found, building new index ---") + INDEX_DIR.mkdir(exist_ok=True) + + print(f"--- Building new LEANN index ---") + + print(f"\n[PHASE 1] Building Leann index...") + + # Use HNSW backend for better macOS compatibility + builder = LeannBuilder( + backend_name="hnsw", + embedding_model="facebook/contriever", + graph_degree=32, + complexity=64, + is_compact=True, + is_recompute=True, + num_threads=1 # Force single-threaded mode + ) + + print(f"Adding {len(all_texts)} history chunks to index...") + for chunk_text in all_texts: + builder.add_text(chunk_text) + + builder.build_index(index_path) + print(f"\nLEANN index built at {index_path}!") + else: + print(f"--- Using existing index at {INDEX_DIR} ---") + + return index_path + +def create_leann_index(profile_path: str = None, index_path: str = "chrome_history_index.leann", max_count: int = 1000): + """ + Create LEANN index from Chrome history data. + + Args: + profile_path: Path to the Chrome profile directory (optional, uses default if None) + index_path: Path to save the LEANN index + max_count: Maximum number of history entries to process + """ + print("Creating LEANN index from Chrome history data...") + INDEX_DIR = Path(index_path).parent + + if not INDEX_DIR.exists(): + print(f"--- Index directory not found, building new index ---") + INDEX_DIR.mkdir(exist_ok=True) + + print(f"--- Building new LEANN index ---") + + print(f"\n[PHASE 1] Building Leann index...") + + # Load documents using ChromeHistoryReader from history_data + from history_data.history import ChromeHistoryReader + reader = ChromeHistoryReader() + + documents = reader.load_data( + chrome_profile_path=profile_path, + max_count=max_count + ) + + if not documents: + print("No documents loaded. 
Exiting.") + return None + + print(f"Loaded {len(documents)} history documents") + + # Create text splitter with 256 chunk size + text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) + + # Convert Documents to text strings and chunk them + all_texts = [] + for doc in documents: + # Split the document into chunks + nodes = text_splitter.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print(f"Created {len(all_texts)} text chunks from {len(documents)} documents") + + # Create LEANN index directory + print(f"--- Index directory not found, building new index ---") + INDEX_DIR.mkdir(exist_ok=True) + + print(f"--- Building new LEANN index ---") + + print(f"\n[PHASE 1] Building Leann index...") + + # Use HNSW backend for better macOS compatibility + builder = LeannBuilder( + backend_name="hnsw", + embedding_model="facebook/contriever", + graph_degree=32, + complexity=64, + is_compact=True, + is_recompute=True, + num_threads=1 # Force single-threaded mode + ) + + print(f"Adding {len(all_texts)} history chunks to index...") + for chunk_text in all_texts: + builder.add_text(chunk_text) + + builder.build_index(index_path) + print(f"\nLEANN index built at {index_path}!") + else: + print(f"--- Using existing index at {INDEX_DIR} ---") + + return index_path + +async def query_leann_index(index_path: str, query: str): + """ + Query the LEANN index. + + Args: + index_path: Path to the LEANN index + query: The query string + """ + print(f"\n[PHASE 2] Starting Leann chat session...") + chat = LeannChat(index_path=index_path) + + print(f"You: {query}") + chat_response = chat.ask( + query, + top_k=5, + recompute_beighbor_embeddings=True, + complexity=128, + beam_width=1 + ) + print(f"Leann: {chat_response}") + +async def main(): + # Default Chrome profile path + default_chrome_profile = os.path.expanduser("~/Library/Application Support/Google/Chrome/Default") + + INDEX_DIR = Path("./chrome_history_index_leann") + INDEX_PATH = str(INDEX_DIR / "chrome_history.leann") + + # Find all Chrome profile directories + from history_data.history import ChromeHistoryReader + profile_dirs = ChromeHistoryReader.find_chrome_profiles() + + if not profile_dirs: + print("No Chrome profiles found. Exiting.") + return + + # Create or load the LEANN index from all sources + index_path = create_leann_index_from_multiple_chrome_profiles(profile_dirs, INDEX_PATH) + + if index_path: + # Example queries + queries = [ + "What websites did I visit about machine learning?", + ] + + for query in queries: + print("\n" + "="*60) + await query_leann_index(index_path, query) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/history_data/__init__.py b/examples/history_data/__init__.py new file mode 100644 index 0000000..a3ef5a7 --- /dev/null +++ b/examples/history_data/__init__.py @@ -0,0 +1,3 @@ +from .history import ChromeHistoryReader + +__all__ = ['ChromeHistoryReader'] \ No newline at end of file diff --git a/examples/history_data/history.py b/examples/history_data/history.py new file mode 100644 index 0000000..0258258 --- /dev/null +++ b/examples/history_data/history.py @@ -0,0 +1,176 @@ +import sqlite3 +import os +from pathlib import Path +from typing import List, Any +from llama_index.core import Document +from llama_index.core.readers.base import BaseReader + +class ChromeHistoryReader(BaseReader): + """ + Chrome browser history reader that extracts browsing data from SQLite database. 
+            conn = sqlite3.connect(history_db_path)
+            cursor = conn.cursor()
+
+            # Query to get browsing history with metadata (removed created_time column)
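+            # last_visit_time is stored as microseconds since 1601-01-01 UTC (the WebKit
+            # epoch); dividing by 1000000 converts it to seconds and subtracting
+            # 11644473600 shifts it onto the Unix epoch for SQLite's datetime().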
+            query = """
+            SELECT
+                datetime(last_visit_time/1000000-11644473600,'unixepoch','localtime') as last_visit,
+                url,
+                title,
+                visit_count,
+                typed_count,
+                hidden
+            FROM urls
+            ORDER BY last_visit_time DESC
+            """
+
+            print(f"Executing query on database: {history_db_path}")
+            cursor.execute(query)
+            rows = cursor.fetchall()
+            print(f"Query returned {len(rows)} rows")
+
+            count = 0
+            for row in rows:
+                if count >= max_count and max_count > 0:
+                    break
+
+                last_visit, url, title, visit_count, typed_count, hidden = row
+
+                # Create document content with metadata embedded in text
+                doc_content = f"""
+[BROWSING HISTORY METADATA]
+URL: {url}
+Title: {title}
+Last Visit: {last_visit}
+Visit Count: {visit_count}
+Typed Count: {typed_count}
+Hidden: {hidden}
+[END METADATA]
+
+Title: {title}
+URL: {url}
+Last visited: {last_visit}
+"""
+
+                # Create document with embedded metadata
+                doc = Document(text=doc_content, metadata={})
+                docs.append(doc)
+                count += 1
+
+            conn.close()
+            print(f"Loaded {len(docs)} Chrome history documents")
+
+        except Exception as e:
+            print(f"Error reading Chrome history: {e}")
+            return docs
+
+        return docs
+
+    @staticmethod
+    def find_chrome_profiles() -> List[Path]:
+        """
+        Find all Chrome profile directories.
+
+        Returns:
+            List of Path objects pointing to Chrome profile directories
+        """
+        chrome_base_path = Path(os.path.expanduser("~/Library/Application Support/Google/Chrome"))
+        profile_dirs = []
+
+        if not chrome_base_path.exists():
+            print(f"Chrome directory not found at: {chrome_base_path}")
+            return profile_dirs
+
+        # Find all profile directories
+        for profile_dir in chrome_base_path.iterdir():
+            if profile_dir.is_dir() and profile_dir.name != "System Profile":
+                history_path = profile_dir / "History"
+                if history_path.exists():
+                    profile_dirs.append(profile_dir)
+                    print(f"Found Chrome profile: {profile_dir}")
+
+        print(f"Found {len(profile_dirs)} Chrome profiles")
+        return profile_dirs
+
+    @staticmethod
+    def export_history_to_file(output_file: str = "chrome_history_export.txt", max_count: int = 1000):
+        """
+        Export Chrome history to a text file using the same SQL query format.
+
+        Args:
+            output_file: Path to the output file
+            max_count: Maximum number of entries to export
+        """
+        chrome_profile_path = os.path.expanduser("~/Library/Application Support/Google/Chrome/Default")
+        history_db_path = os.path.join(chrome_profile_path, "History")
+
+        if not os.path.exists(history_db_path):
+            print(f"Chrome history database not found at: {history_db_path}")
+            return
+
+        try:
+            conn = sqlite3.connect(history_db_path)
+            cursor = conn.cursor()
+
+            query = """
+            SELECT
+                datetime(last_visit_time/1000000-11644473600,'unixepoch','localtime') as last_visit,
+                url,
+                title,
+                visit_count,
+                typed_count,
+                hidden
+            FROM urls
+            ORDER BY last_visit_time DESC
+            LIMIT ?
+            """
+
+            cursor.execute(query, (max_count,))
+            rows = cursor.fetchall()
+
+            with open(output_file, 'w', encoding='utf-8') as f:
+                for row in rows:
+                    last_visit, url, title, visit_count, typed_count, hidden = row
+                    f.write(f"{last_visit}\t{url}\t{title}\t{visit_count}\t{typed_count}\t{hidden}\n")
+
+            conn.close()
+            print(f"Exported {len(rows)} history entries to {output_file}")
+
+        except Exception as e:
+            print(f"Error exporting Chrome history: {e}")
\ No newline at end of file
diff --git a/examples/mail_reader_leann.py b/examples/mail_reader_leann.py
index dae6df8..8c5c043 100644
--- a/examples/mail_reader_leann.py
+++ b/examples/mail_reader_leann.py
@@ -25,54 +25,55 @@ def create_leann_index_from_multiple_sources(messages_dirs: List[Path], index_pa
     # from email_data.email import EmlxMboxReader
     # from pathlib import Path
     # reader = EmlxMboxReader()
-
-    all_documents = []
-    total_processed = 0
-
-    # Process each Messages directory
-    for i, messages_dir in enumerate(messages_dirs):
-        print(f"\nProcessing Messages directory {i+1}/{len(messages_dirs)}: {messages_dir}")
-
-        try:
-            documents = reader.load_data(messages_dir)
-            if documents:
-                print(f"Loaded {len(documents)} email documents from {messages_dir}")
-                all_documents.extend(documents)
-                total_processed += len(documents)
-
-                # Check if we've reached the max count
-                if max_count > 0 and total_processed >= max_count:
-                    print(f"Reached max count of {max_count} documents")
-                    break
-            else:
-                print(f"No documents loaded from {messages_dir}")
-        except Exception as e:
-            print(f"Error processing {messages_dir}: {e}")
-            continue
-
-    if not all_documents:
-        print("No documents loaded from any source. Exiting.")
Exiting.") - return None - - print(f"\nTotal loaded {len(all_documents)} email documents from {len(messages_dirs)} directories") - - # Create text splitter with 256 chunk size - text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) - - # Convert Documents to text strings and chunk them - all_texts = [] - for doc in all_documents: - # Split the document into chunks - nodes = text_splitter.get_nodes_from_documents([doc]) - for node in nodes: - all_texts.append(node.get_content()) - - print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents") - - # Create LEANN index directory INDEX_DIR = Path(index_path).parent if not INDEX_DIR.exists(): + print(f"--- Index directory not found, building new index ---") + all_documents = [] + total_processed = 0 + + # Process each Messages directory + for i, messages_dir in enumerate(messages_dirs): + print(f"\nProcessing Messages directory {i+1}/{len(messages_dirs)}: {messages_dir}") + + try: + documents = reader.load_data(messages_dir) + if documents: + print(f"Loaded {len(documents)} email documents from {messages_dir}") + all_documents.extend(documents) + total_processed += len(documents) + + # Check if we've reached the max count + if max_count > 0 and total_processed >= max_count: + print(f"Reached max count of {max_count} documents") + break + else: + print(f"No documents loaded from {messages_dir}") + except Exception as e: + print(f"Error processing {messages_dir}: {e}") + continue + + if not all_documents: + print("No documents loaded from any source. Exiting.") + return None + + print(f"\nTotal loaded {len(all_documents)} email documents from {len(messages_dirs)} directories") + + # Create text splitter with 256 chunk size + text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) + + # Convert Documents to text strings and chunk them + all_texts = [] + for doc in all_documents: + # Split the document into chunks + nodes = text_splitter.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents") + + # Create LEANN index directory + print(f"--- Index directory not found, building new index ---") INDEX_DIR.mkdir(exist_ok=True) @@ -112,35 +113,6 @@ def create_leann_index(mail_path: str, index_path: str = "mail_index.leann", max max_count: Maximum number of emails to process """ print("Creating LEANN index from mail data...") - - # Load documents using EmlxReader from LEANN_email_reader - from LEANN_email_reader import EmlxReader - reader = EmlxReader() - # from email_data.email import EmlxMboxReader - # from pathlib import Path - # reader = EmlxMboxReader() - documents = reader.load_data(Path(mail_path)) - - if not documents: - print("No documents loaded. 
Exiting.") - return None - - print(f"Loaded {len(documents)} email documents") - - # Create text splitter with 256 chunk size - text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) - - # Convert Documents to text strings and chunk them - all_texts = [] - for doc in documents: - # Split the document into chunks - nodes = text_splitter.get_nodes_from_documents([doc]) - for node in nodes: - all_texts.append(node.get_content()) - - print(f"Created {len(all_texts)} text chunks from {len(documents)} documents") - - # Create LEANN index directory INDEX_DIR = Path(index_path).parent if not INDEX_DIR.exists(): @@ -151,6 +123,42 @@ def create_leann_index(mail_path: str, index_path: str = "mail_index.leann", max print(f"\n[PHASE 1] Building Leann index...") + # Load documents using EmlxReader from LEANN_email_reader + from LEANN_email_reader import EmlxReader + reader = EmlxReader() + # from email_data.email import EmlxMboxReader + # from pathlib import Path + # reader = EmlxMboxReader() + documents = reader.load_data(Path(mail_path)) + + if not documents: + print("No documents loaded. Exiting.") + return None + + print(f"Loaded {len(documents)} email documents") + + # Create text splitter with 256 chunk size + text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25) + + # Convert Documents to text strings and chunk them + all_texts = [] + for doc in documents: + # Split the document into chunks + nodes = text_splitter.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print(f"Created {len(all_texts)} text chunks from {len(documents)} documents") + + # Create LEANN index directory + + print(f"--- Index directory not found, building new index ---") + INDEX_DIR.mkdir(exist_ok=True) + + print(f"--- Building new LEANN index ---") + + print(f"\n[PHASE 1] Building Leann index...") + # Use HNSW backend for better macOS compatibility builder = LeannBuilder( backend_name="hnsw", @@ -189,7 +197,7 @@ async def query_leann_index(index_path: str, query: str): query, top_k=5, recompute_beighbor_embeddings=True, - complexity=32, + complexity=128, beam_width=1 ) print(f"Leann: {chat_response}") @@ -198,7 +206,7 @@ async def main(): # Base path to the mail data directory base_mail_path = "/Users/yichuan/Library/Mail/V10/0FCA0879-FD8C-4B7E-83BF-FDDA930791C5/[Gmail].mbox/All Mail.mbox/78BA5BE1-8819-4F9A-9613-EB63772F1DD0/Data" - INDEX_DIR = Path("./mail_index_leann_raw_text_all") + INDEX_DIR = Path("./mail_index_leann_raw_text_all_dicts") INDEX_PATH = str(INDEX_DIR / "mail_documents.leann") # Find all Messages directories