diff --git a/README.md b/README.md
index 6a7926d..b064cc8 100755
--- a/README.md
+++ b/README.md
@@ -327,7 +327,7 @@ export NCCL_SOCKET_IFNAME=ens5
 - [ ] Advanced caching strategies
 - [ ] GPU-accelerated embedding computation
 - [ ] Add sleep-time compute and a summarize agent to summarize files on the computer
-
+- [ ] Add OpenAI recompute API
 
 ### 🌟 Q4 2025
 
diff --git a/examples/history_data/wechat_history.py b/examples/history_data/wechat_history.py
new file mode 100644
index 0000000..0879933
--- /dev/null
+++ b/examples/history_data/wechat_history.py
@@ -0,0 +1,705 @@
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from pathlib import Path
+from typing import List, Any, Dict, Optional
+from llama_index.core import Document
+from llama_index.core.readers.base import BaseReader
+from datetime import datetime
+
+class WeChatHistoryReader(BaseReader):
+    """
+    WeChat chat history reader that extracts chat data from exported JSON files.
+
+    Reads WeChat chat history from exported JSON files (from the wechat-exporter tool)
+    and creates documents with embedded metadata, similar to the Chrome history reader structure.
+
+    Also includes utilities for automatic WeChat chat history export.
+    """
+
+    def __init__(self) -> None:
+        """Initialize."""
+        self.packages_dir = Path(__file__).parent.parent.parent / "packages"
+        self.wechat_exporter_dir = self.packages_dir / "wechat-exporter"
+        self.wechat_decipher_dir = self.packages_dir / "wechat-decipher-macos"
+
+    def check_wechat_running(self) -> bool:
+        """Check if WeChat is currently running."""
+        try:
+            result = subprocess.run(["pgrep", "-f", "WeChat"], capture_output=True, text=True)
+            return result.returncode == 0
+        except Exception:
+            return False
+
+    def install_wechattweak(self) -> bool:
+        """Install the WeChatTweak CLI tool."""
+        try:
+            # Create wechat-exporter directory if it doesn't exist
+            self.wechat_exporter_dir.mkdir(parents=True, exist_ok=True)
+
+            wechattweak_path = self.wechat_exporter_dir / "wechattweak-cli"
+            if not wechattweak_path.exists():
+                print("Downloading WeChatTweak CLI...")
+                subprocess.run([
+                    "curl", "-L", "-o", str(wechattweak_path),
+                    "https://github.com/JettChenT/WeChatTweak-CLI/releases/latest/download/wechattweak-cli"
+                ], check=True)
+
+                # Make executable
+                wechattweak_path.chmod(0o755)
+
+            # Install WeChatTweak
+            print("Installing WeChatTweak...")
+            subprocess.run(["sudo", str(wechattweak_path), "install"], check=True)
+            return True
+        except Exception as e:
+            print(f"Error installing WeChatTweak: {e}")
+            return False
+
+    def restart_wechat(self):
+        """Restart WeChat to apply WeChatTweak."""
+        try:
+            print("Restarting WeChat...")
+            subprocess.run(["pkill", "-f", "WeChat"], check=False)
+            time.sleep(2)
+            subprocess.run(["open", "-a", "WeChat"], check=True)
+            time.sleep(5)  # Wait for WeChat to start
+        except Exception as e:
+            print(f"Error restarting WeChat: {e}")
+
+    def check_api_available(self) -> bool:
+        """Check if the WeChatTweak API is available."""
+        try:
+            result = subprocess.run([
+                "curl", "-s", "http://localhost:48065/wechat/allcontacts"
+            ], capture_output=True, text=True, timeout=5)
+            return result.returncode == 0 and bool(result.stdout.strip())
+        except Exception:
+            return False
+
+    def _extract_readable_text(self, content: str) -> str:
+        """
+        Extract readable text from message content, removing XML and system messages.
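+
+        Handles both plain string payloads (stripping wxid_/sender prefixes)
+        and dict payloads produced for quoted messages.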
+
+        Args:
+            content: The raw message content (can be string or dict)
+
+        Returns:
+            Cleaned, readable text
+        """
+        if not content:
+            return ""
+
+        # Handle dictionary content (like quoted messages)
+        if isinstance(content, dict):
+            # Extract text from dictionary structure
+            text_parts = []
+            if 'title' in content:
+                text_parts.append(str(content['title']))
+            if 'quoted' in content:
+                text_parts.append(str(content['quoted']))
+            if 'content' in content:
+                text_parts.append(str(content['content']))
+            if 'text' in content:
+                text_parts.append(str(content['text']))
+
+            if text_parts:
+                return " | ".join(text_parts)
+            else:
+                # If we can't extract meaningful text from the dict, return empty
+                return ""
+
+        # Handle string content
+        if not isinstance(content, str):
+            return ""
+
+        # Remove common prefixes like "wxid_xxx:\n"
+        clean_content = re.sub(r'^wxid_[^:]+:\s*', '', content)
+        clean_content = re.sub(r'^[^:]+:\s*', '', clean_content)
+
+        # If it's just XML or a system message, return empty
+        if clean_content.strip().startswith('<') or 'recalled a message' in clean_content:
+            return ""
+
+        return clean_content.strip()
+
+    def _is_text_message(self, content: str) -> bool:
+        """
+        Check if a message contains readable text content.
+
+        Args:
+            content: The message content (can be string or dict)
+
+        Returns:
+            True if the message contains readable text, False otherwise
+        """
+        if not content:
+            return False
+
+        # Handle dictionary content
+        if isinstance(content, dict):
+            # Check if the dict has any readable text fields
+            text_fields = ['title', 'quoted', 'content', 'text']
+            for field in text_fields:
+                if field in content and content[field]:
+                    return True
+            return False
+
+        # Handle string content
+        if not isinstance(content, str):
+            return False
+
+        # Skip image messages (contain XML with img tags)
+        if '<img' in content:
+            return False
+
+        # Treat the message as text if the cleaned content is non-empty and not raw XML
+        clean_content = self._extract_readable_text(content)
+        if len(clean_content) > 0 and not clean_content.strip().startswith('<'):
+            return True
+
+        return False
+
+    def _concatenate_messages(self, messages: List[Dict], min_length: int = 128, max_length: int = 1000,
+                              time_window_minutes: int = 30) -> List[Dict]:
+        """
+        Concatenate messages based on length and time rules.
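+
+        Messages are grouped until the gap between consecutive messages
+        exceeds time_window_minutes or the next message would push the group
+        past max_length characters; groups shorter than min_length are dropped.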
+
+        Args:
+            messages: List of message dictionaries
+            min_length: Minimum length for concatenated message groups
+            max_length: Maximum length for concatenated message groups
+            time_window_minutes: Time window in minutes to group messages together
+
+        Returns:
+            List of concatenated message groups
+        """
+        if not messages:
+            return []
+
+        concatenated_groups = []
+        current_group = []
+        current_length = 0
+        last_timestamp = None
+
+        for message in messages:
+            # Extract message info
+            content = message.get('content', '')
+            message_text = message.get('message', '')
+            create_time = message.get('createTime', 0)
+
+            # Extract readable text
+            readable_text = self._extract_readable_text(content)
+            if not readable_text:
+                readable_text = message_text
+
+            # Skip empty messages
+            if not readable_text.strip():
+                continue
+
+            # Check time window constraint
+            if last_timestamp is not None and create_time > 0:
+                time_diff_minutes = (create_time - last_timestamp) / 60
+                if time_diff_minutes > time_window_minutes:
+                    # Time gap too large, start a new group
+                    if current_group and current_length >= min_length:
+                        concatenated_groups.append({
+                            'messages': current_group,
+                            'total_length': current_length,
+                            'start_time': current_group[0].get('createTime', 0),
+                            'end_time': current_group[-1].get('createTime', 0)
+                        })
+                    current_group = []
+                    current_length = 0
+
+            # Check length constraint
+            message_length = len(readable_text)
+            if current_length + message_length > max_length and current_group:
+                # Current group would exceed max length, save it and start a new one
+                if current_length >= min_length:
+                    concatenated_groups.append({
+                        'messages': current_group,
+                        'total_length': current_length,
+                        'start_time': current_group[0].get('createTime', 0),
+                        'end_time': current_group[-1].get('createTime', 0)
+                    })
+                current_group = []
+                current_length = 0
+
+            # Add message to the current group
+            current_group.append(message)
+            current_length += message_length
+            last_timestamp = create_time
+
+        # Add the last group if it meets the minimum length
+        if current_group and current_length >= min_length:
+            concatenated_groups.append({
+                'messages': current_group,
+                'total_length': current_length,
+                'start_time': current_group[0].get('createTime', 0),
+                'end_time': current_group[-1].get('createTime', 0)
+            })
+
+        return concatenated_groups
+
+    def _create_concatenated_content(self, message_group: Dict, contact_name: str) -> str:
+        """
+        Create concatenated content from a group of messages.
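+
+        Each message is rendered as "[HH:MM:SS] Me/Contact: text" under a
+        header carrying the contact name and the group's time range.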
+
+        Args:
+            message_group: Dictionary containing messages and metadata
+            contact_name: Name of the contact
+
+        Returns:
+            Formatted concatenated content
+        """
+        messages = message_group['messages']
+        start_time = message_group['start_time']
+        end_time = message_group['end_time']
+
+        # Format timestamps
+        if start_time:
+            try:
+                start_timestamp = datetime.fromtimestamp(start_time)
+                start_time_str = start_timestamp.strftime('%Y-%m-%d %H:%M:%S')
+            except Exception:
+                start_time_str = str(start_time)
+        else:
+            start_time_str = "Unknown"
+
+        if end_time:
+            try:
+                end_timestamp = datetime.fromtimestamp(end_time)
+                end_time_str = end_timestamp.strftime('%Y-%m-%d %H:%M:%S')
+            except Exception:
+                end_time_str = str(end_time)
+        else:
+            end_time_str = "Unknown"
+
+        # Build concatenated message content
+        message_parts = []
+        for message in messages:
+            content = message.get('content', '')
+            message_text = message.get('message', '')
+            create_time = message.get('createTime', 0)
+            is_sent_from_self = message.get('isSentFromSelf', False)
+
+            # Extract readable text
+            readable_text = self._extract_readable_text(content)
+            if not readable_text:
+                readable_text = message_text
+
+            # Format the individual message
+            if create_time:
+                try:
+                    timestamp = datetime.fromtimestamp(create_time)
+                    time_str = timestamp.strftime('%H:%M:%S')
+                except Exception:
+                    time_str = str(create_time)
+            else:
+                time_str = "Unknown"
+
+            sender = "Me" if is_sent_from_self else "Contact"
+            message_parts.append(f"[{time_str}] {sender}: {readable_text}")
+
+        concatenated_text = "\n".join(message_parts)
+
+        # Create the final document content
+        doc_content = f"""
+Contact: {contact_name}
+Time Range: {start_time_str} - {end_time_str}
+Messages ({len(messages)} messages, {message_group['total_length']} chars):
+
+{concatenated_text}
+"""
+        return doc_content
+
+    def load_data(self, input_dir: str = None, **load_kwargs: Any) -> List[Document]:
+        """
+        Load WeChat chat history data from exported JSON files.
+
+        Args:
+            input_dir: Directory containing exported WeChat JSON files
+            **load_kwargs:
+                max_count (int): Maximum number of chat entries to read.
+                wechat_export_dir (str): Custom path to the WeChat export directory.
+                include_non_text (bool): Whether to include non-text messages (images, emojis, etc.)
+                concatenate_messages (bool): Whether to concatenate messages based on length rules.
+                min_length (int): Minimum length for concatenated message groups (default: 128).
+                max_length (int): Maximum length for concatenated message groups (default: 1000).
+                time_window_minutes (int): Time window in minutes to group messages together (default: 30).
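+
+        Example (reading an existing export produced by wechat-exporter):
+            reader = WeChatHistoryReader()
+            docs = reader.load_data(
+                wechat_export_dir="./wechat_export_test",
+                max_count=500,
+                concatenate_messages=True,
+            )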
+ """ + docs: List[Document] = [] + max_count = load_kwargs.get('max_count', 1000) + wechat_export_dir = load_kwargs.get('wechat_export_dir', None) + include_non_text = load_kwargs.get('include_non_text', False) + concatenate_messages = load_kwargs.get('concatenate_messages', False) + min_length = load_kwargs.get('min_length', 128) + max_length = load_kwargs.get('max_length', 1000) + time_window_minutes = load_kwargs.get('time_window_minutes', 30) + + # Default WeChat export path + if wechat_export_dir is None: + wechat_export_dir = "./wechat_export_test" + + if not os.path.exists(wechat_export_dir): + print(f"WeChat export directory not found at: {wechat_export_dir}") + return docs + + try: + # Find all JSON files in the export directory + json_files = list(Path(wechat_export_dir).glob("*.json")) + print(f"Found {len(json_files)} WeChat chat history files") + + count = 0 + for json_file in json_files: + if count >= max_count and max_count > 0: + break + + try: + with open(json_file, 'r', encoding='utf-8') as f: + chat_data = json.load(f) + + # Extract contact name from filename + contact_name = json_file.stem + + if concatenate_messages: + # Filter messages to only include readable text messages + readable_messages = [] + for message in chat_data: + try: + content = message.get('content', '') + if not include_non_text and not self._is_text_message(content): + continue + + readable_text = self._extract_readable_text(content) + if not readable_text and not include_non_text: + continue + + readable_messages.append(message) + except Exception as e: + print(f"Error processing message in {json_file}: {e}") + continue + + # Concatenate messages based on rules + message_groups = self._concatenate_messages( + readable_messages, + min_length=min_length, + max_length=max_length, + time_window_minutes=time_window_minutes + ) + + # Create documents from concatenated groups + for message_group in message_groups: + if count >= max_count and max_count > 0: + break + + doc_content = self._create_concatenated_content(message_group, contact_name) + doc = Document(text=doc_content, metadata={}) + docs.append(doc) + count += 1 + + print(f"Created {len(message_groups)} concatenated message groups for {contact_name}") + + else: + # Original single-message processing + for message in chat_data: + if count >= max_count and max_count > 0: + break + + # Extract message information + from_user = message.get('fromUser', '') + to_user = message.get('toUser', '') + content = message.get('content', '') + message_text = message.get('message', '') + create_time = message.get('createTime', 0) + is_sent_from_self = message.get('isSentFromSelf', False) + + # Handle content that might be dict or string + try: + # Check if this is a readable text message + if not include_non_text and not self._is_text_message(content): + continue + + # Extract readable text + readable_text = self._extract_readable_text(content) + if not readable_text and not include_non_text: + continue + except Exception as e: + # Skip messages that cause processing errors + print(f"Error processing message in {json_file}: {e}") + continue + + # Convert timestamp to readable format + if create_time: + try: + timestamp = datetime.fromtimestamp(create_time) + time_str = timestamp.strftime('%Y-%m-%d %H:%M:%S') + except: + time_str = str(create_time) + else: + time_str = "Unknown" + + # Create document content with metadata header and contact info + doc_content = f""" +Contact: {contact_name} +Is sent from self: {is_sent_from_self} +Time: {time_str} +Message: 
+"""
+
+                            # Create document with embedded metadata
+                            doc = Document(text=doc_content, metadata={})
+                            docs.append(doc)
+                            count += 1
+
+                except Exception as e:
+                    print(f"Error reading {json_file}: {e}")
+                    continue
+
+            print(f"Loaded {len(docs)} WeChat chat documents")
+
+        except Exception as e:
+            print(f"Error reading WeChat history: {e}")
+            return docs
+
+        return docs
+
+    @staticmethod
+    def find_wechat_export_dirs() -> List[Path]:
+        """
+        Find all WeChat export directories.
+
+        Returns:
+            List of Path objects pointing to WeChat export directories
+        """
+        export_dirs = []
+
+        # Look for common export directory names
+        possible_dirs = [
+            Path("./wechat_export_test"),
+            Path("./wechat_export"),
+            Path("./wechat_chat_history"),
+            Path("./chat_export")
+        ]
+
+        for export_dir in possible_dirs:
+            if export_dir.exists() and export_dir.is_dir():
+                json_files = list(export_dir.glob("*.json"))
+                if json_files:
+                    export_dirs.append(export_dir)
+                    print(f"Found WeChat export directory: {export_dir} with {len(json_files)} files")
+
+        print(f"Found {len(export_dirs)} WeChat export directories")
+        return export_dirs
+
+    @staticmethod
+    def export_chat_to_file(output_file: str = "wechat_chat_export.txt", max_count: int = 1000, export_dir: str = None, include_non_text: bool = False):
+        """
+        Export WeChat chat history to a text file.
+
+        Args:
+            output_file: Path to the output file
+            max_count: Maximum number of entries to export
+            export_dir: Directory containing WeChat JSON files
+            include_non_text: Whether to include non-text messages
+        """
+        if export_dir is None:
+            export_dir = "./wechat_export_test"
+
+        if not os.path.exists(export_dir):
+            print(f"WeChat export directory not found at: {export_dir}")
+            return
+
+        try:
+            json_files = list(Path(export_dir).glob("*.json"))
+            # Reuse a single reader for message filtering instead of creating one per message
+            reader = WeChatHistoryReader()
+
+            with open(output_file, 'w', encoding='utf-8') as f:
+                count = 0
+                for json_file in json_files:
+                    if count >= max_count and max_count > 0:
+                        break
+
+                    try:
+                        with open(json_file, 'r', encoding='utf-8') as json_f:
+                            chat_data = json.load(json_f)
+
+                        contact_name = json_file.stem
+                        f.write(f"\n=== Chat with {contact_name} ===\n")
+
+                        for message in chat_data:
+                            if count >= max_count and max_count > 0:
+                                break
+
+                            from_user = message.get('fromUser', '')
+                            content = message.get('content', '')
+                            message_text = message.get('message', '')
+                            create_time = message.get('createTime', 0)
+
+                            # Skip non-text messages unless requested
+                            if not include_non_text:
+                                if not reader._is_text_message(content):
+                                    continue
+                                readable_text = reader._extract_readable_text(content)
+                                if not readable_text:
+                                    continue
+                                message_text = readable_text
+
+                            if create_time:
+                                try:
+                                    timestamp = datetime.fromtimestamp(create_time)
+                                    time_str = timestamp.strftime('%Y-%m-%d %H:%M:%S')
+                                except Exception:
+                                    time_str = str(create_time)
+                            else:
+                                time_str = "Unknown"
+
+                            f.write(f"[{time_str}] {from_user}: {message_text}\n")
+                            count += 1
+
+                    except Exception as e:
+                        print(f"Error processing {json_file}: {e}")
+                        continue
+
+            print(f"Exported {count} chat entries to {output_file}")
+
+        except Exception as e:
+            print(f"Error exporting WeChat chat history: {e}")
+
+    def export_wechat_chat_history(self, export_dir: str = "./wechat_export_direct") -> Optional[Path]:
+        """
+        Export WeChat chat history using the wechat-exporter tool.
+
+        Args:
+            export_dir: Directory to save exported chat history
+
+        Returns:
+            Path to the export directory if successful, None otherwise
+        """
+        try:
+            # Create the export directory
+            export_path = Path(export_dir)
+            export_path.mkdir(exist_ok=True)
+
+            print(f"Exporting WeChat chat history to {export_path}...")
+
+            # Check if the wechat-exporter directory exists
+            if not self.wechat_exporter_dir.exists():
+                print(f"wechat-exporter directory not found at: {self.wechat_exporter_dir}")
+                return None
+
+            # Install requirements if needed
+            requirements_file = self.wechat_exporter_dir / "requirements.txt"
+            if requirements_file.exists():
+                print("Installing wechat-exporter requirements...")
+                subprocess.run([
+                    "uv", "pip", "install", "-r", str(requirements_file)
+                ], check=True)
+
+            # Run the export command
+            print("Running wechat-exporter...")
+            result = subprocess.run([
+                sys.executable, str(self.wechat_exporter_dir / "main.py"),
+                "export-all", str(export_path)
+            ], capture_output=True, text=True, check=True)
+
+            print("Export command output:")
+            print(result.stdout)
+            if result.stderr:
+                print("Export errors:")
+                print(result.stderr)
+
+            # Check if the export was successful
+            if export_path.exists() and any(export_path.glob("*.json")):
+                json_files = list(export_path.glob("*.json"))
+                print(f"Successfully exported {len(json_files)} chat history files to {export_path}")
+                return export_path
+            else:
+                print("Export completed but no JSON files found")
+                return None
+
+        except subprocess.CalledProcessError as e:
+            print(f"Export command failed: {e}")
+            print(f"Command output: {e.stdout}")
+            print(f"Command errors: {e.stderr}")
+            return None
+        except Exception as e:
+            print(f"Export failed: {e}")
+            print("Please ensure WeChat is running and WeChatTweak is installed.")
+            return None
+
+    def find_or_export_wechat_data(self, export_dir: str = "./wechat_export_direct") -> List[Path]:
+        """
+        Find existing WeChat exports or create new ones.
+
+        Args:
+            export_dir: Directory to save exported chat history if needed
+
+        Returns:
+            List of Path objects pointing to WeChat export directories
+        """
+        export_dirs = []
+
+        # Look for existing exports in common locations
+        possible_export_dirs = [
+            Path("./wechat_database_export"),
+            Path("./wechat_export_test"),
+            Path("./wechat_export"),
+            Path("./wechat_chat_history"),
+            Path("./chat_export")
+        ]
+
+        for export_dir_path in possible_export_dirs:
+            if export_dir_path.exists() and any(export_dir_path.glob("*.json")):
+                export_dirs.append(export_dir_path)
+                print(f"Found existing export: {export_dir_path}")
+
+        # If there are no existing exports, try to export automatically
+        if not export_dirs:
+            print("No existing WeChat exports found. Starting direct export...")
+
+            # Try to export using wechat-exporter
+            exported_path = self.export_wechat_chat_history(export_dir)
+            if exported_path:
+                export_dirs = [exported_path]
+            else:
+                print("Failed to export WeChat data. Please ensure WeChat is running and WeChatTweak is installed.")
+
+        return export_dirs
\ No newline at end of file
diff --git a/examples/wechat_history_reader_leann.py b/examples/wechat_history_reader_leann.py
new file mode 100644
index 0000000..30c7302
--- /dev/null
+++ b/examples/wechat_history_reader_leann.py
@@ -0,0 +1,249 @@
+import os
+import asyncio
+import dotenv
+from pathlib import Path
+from typing import List
+from leann.api import LeannBuilder, LeannChat
+from llama_index.core.node_parser import SentenceSplitter
+
+dotenv.load_dotenv()
+
+def create_leann_index_from_multiple_wechat_exports(export_dirs: List[Path], index_path: str = "wechat_history_index.leann", max_count: int = -1):
+    """
+    Create a LEANN index from multiple WeChat export data sources.
+
+    Args:
+        export_dirs: List of Path objects pointing to WeChat export directories
+        index_path: Path to save the LEANN index
+        max_count: Maximum number of chat entries to process per export
+    """
+    print("Creating LEANN index from multiple WeChat export data sources...")
+
+    # Load documents using WeChatHistoryReader from history_data
+    from history_data.wechat_history import WeChatHistoryReader
+    reader = WeChatHistoryReader()
+
+    INDEX_DIR = Path(index_path).parent
+
+    if not INDEX_DIR.exists():
+        print("--- Index directory not found, building new index ---")
+        all_documents = []
+        total_processed = 0
+
+        # Process each WeChat export directory
+        for i, export_dir in enumerate(export_dirs):
+            print(f"\nProcessing WeChat export {i+1}/{len(export_dirs)}: {export_dir}")
+
+            try:
+                documents = reader.load_data(
+                    wechat_export_dir=str(export_dir),
+                    max_count=max_count,
+                    concatenate_messages=False  # Disable concatenation - one message per document
+                )
+                if documents:
+                    print(f"Loaded {len(documents)} chat documents from {export_dir}")
+                    all_documents.extend(documents)
+                    total_processed += len(documents)
+
+                    # Check if we've reached the max count
+                    if max_count > 0 and total_processed >= max_count:
+                        print(f"Reached max count of {max_count} documents")
+                        break
+                else:
+                    print(f"No documents loaded from {export_dir}")
+            except Exception as e:
+                print(f"Error processing {export_dir}: {e}")
+                continue
+
+        if not all_documents:
+            print("No documents loaded from any source. Exiting.")
+            return None
+
+        print(f"\nTotal loaded {len(all_documents)} chat documents from {len(export_dirs)} exports")
+
+        # Create a text splitter with 256 chunk size
+        text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25)
+
+        # Convert Documents to text strings and chunk them
+        all_texts = []
+        for doc in all_documents:
+            # Split the document into chunks
+            nodes = text_splitter.get_nodes_from_documents([doc])
+            for node in nodes:
+                all_texts.append(node.get_content())
+
+        print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents")
+
+        # Create the LEANN index directory
+        INDEX_DIR.mkdir(exist_ok=True)
+
+        print("\n[PHASE 1] Building LEANN index...")
+
+        # Use the HNSW backend for better macOS compatibility
+        builder = LeannBuilder(
+            backend_name="hnsw",
+            embedding_model="Qwen/Qwen3-Embedding-0.6B",
+            graph_degree=32,
+            complexity=64,
+            is_compact=True,
+            is_recompute=True,
+            num_threads=1  # Force single-threaded mode
+        )
+
+        print(f"Adding {len(all_texts)} chat chunks to index...")
+        for chunk_text in all_texts:
+            builder.add_text(chunk_text)
+
+        builder.build_index(index_path)
+        print(f"\nLEANN index built at {index_path}!")
+    else:
+        print(f"--- Using existing index at {INDEX_DIR} ---")
+
+    return index_path
+
+def create_leann_index(export_dir: str = None, index_path: str = "wechat_history_index.leann", max_count: int = 1000):
+    """
+    Create a LEANN index from WeChat chat history data.
+
+    Args:
+        export_dir: Path to the WeChat export directory (optional, uses default if None)
+        index_path: Path to save the LEANN index
+        max_count: Maximum number of chat entries to process
+    """
+    print("Creating LEANN index from WeChat chat history data...")
+    INDEX_DIR = Path(index_path).parent
+
+    if not INDEX_DIR.exists():
+        print("--- Index directory not found, building new index ---")
+        INDEX_DIR.mkdir(exist_ok=True)
+
+        # Load documents using WeChatHistoryReader from history_data
+        from history_data.wechat_history import WeChatHistoryReader
+        reader = WeChatHistoryReader()
+
+        documents = reader.load_data(
+            wechat_export_dir=export_dir,
+            max_count=max_count,
+            concatenate_messages=False  # Disable concatenation - one message per document
+        )
+
+        if not documents:
+            print("No documents loaded. Exiting.")
+            return None
+
+        print(f"Loaded {len(documents)} chat documents")
+
+        # Create a text splitter with 256 chunk size
+        text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25)
+
+        # Convert Documents to text strings and chunk them
+        all_texts = []
+        for doc in documents:
+            # Split the document into chunks
+            nodes = text_splitter.get_nodes_from_documents([doc])
+            for node in nodes:
+                all_texts.append(node.get_content())
+
+        print(f"Created {len(all_texts)} text chunks from {len(documents)} documents")
+
+        print("\n[PHASE 1] Building LEANN index...")
+
+        # Use the HNSW backend for better macOS compatibility
+        builder = LeannBuilder(
+            backend_name="hnsw",
+            embedding_model="mlx-community/Qwen3-Embedding-0.6B-4bit-DWQ",  # MLX-optimized model
+            graph_degree=32,
+            complexity=64,
+            is_compact=True,
+            is_recompute=True,
+            num_threads=1  # Force single-threaded mode
+        )
+
+        print(f"Adding {len(all_texts)} chat chunks to index...")
+        for chunk_text in all_texts:
+            builder.add_text(chunk_text)
+
+        builder.build_index(index_path)
+        print(f"\nLEANN index built at {index_path}!")
+    else:
+        print(f"--- Using existing index at {INDEX_DIR} ---")
+
+    return index_path
+
+async def query_leann_index(index_path: str, query: str):
+    """
+    Query the LEANN index.
+
+    Args:
+        index_path: Path to the LEANN index
+        query: The query string
+    """
+    print("\n[PHASE 2] Starting LEANN chat session...")
+    chat = LeannChat(index_path=index_path)
+
+    print(f"You: {query}")
+    chat_response = chat.ask(
+        query,
+        top_k=5,
+        recompute_neighbor_embeddings=True,
+        complexity=32,
+        beam_width=1,
+        llm_config={
+            "type": "openai",
+            "model": "gpt-4o",
+            "api_key": os.getenv("OPENAI_API_KEY"),
+        },
+        llm_kwargs={
+            "temperature": 0.0,
+            "max_tokens": 1000
+        }
+    )
+    print(f"Leann: {chat_response}")
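+
+# Minimal sketch of the same build-and-ask cycle on ad-hoc strings (paths and
+# parameter values here are illustrative, not part of this pipeline):
+#
+#   builder = LeannBuilder(backend_name="hnsw", embedding_model="Qwen/Qwen3-Embedding-0.6B")
+#   builder.add_text("test message")
+#   builder.build_index("./tiny_index/tiny.leann")
+#   print(LeannChat(index_path="./tiny_index/tiny.leann").ask("What was said?", top_k=1))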
+
+async def main():
+    """Main function with integrated WeChat export functionality."""
+
+    # Initialize the WeChat reader with export capabilities
+    from history_data.wechat_history import WeChatHistoryReader
+    reader = WeChatHistoryReader()
+
+    # Find existing exports or create new ones using the centralized method
+    export_dirs = reader.find_or_export_wechat_data("./wechat_export_direct")
+
+    if not export_dirs:
+        print("Failed to find or export WeChat data. Exiting.")
+        return
+
+    INDEX_DIR = Path("./wechat_history_index_leann_test")
+    INDEX_PATH = str(INDEX_DIR / "wechat_history.leann")
+
+    # Create or load the LEANN index from all sources
+    index_path = create_leann_index_from_multiple_wechat_exports(export_dirs, INDEX_PATH, max_count=5000)
+
+    if index_path:
+        # Example queries
+        queries = [
+            # "I want to buy a Magic Johnson jersey; show me some related chat messages."
+            "我想买魔术师约翰逊的球衣,给我一些对应聊天记录?",
+        ]
+
+        for query in queries:
+            print("\n" + "="*60)
+            await query_leann_index(index_path, query)
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/packages/leann-backend-diskann/third_party/DiskANN b/packages/leann-backend-diskann/third_party/DiskANN
index 2dcf156..c7a9d68 160000
--- a/packages/leann-backend-diskann/third_party/DiskANN
+++ b/packages/leann-backend-diskann/third_party/DiskANN
@@ -1 +1 @@
-Subproject commit 2dcf156553050eeaf56e7b003f416fab70465429
+Subproject commit c7a9d681cbb9d03d899fb442d7dab0625dfac44c
diff --git a/packages/leann-backend-hnsw/third_party/faiss b/packages/leann-backend-hnsw/third_party/faiss
index 2547df4..2365db5 160000
--- a/packages/leann-backend-hnsw/third_party/faiss
+++ b/packages/leann-backend-hnsw/third_party/faiss
@@ -1 +1 @@
-Subproject commit 2547df4377ae097e2eabc9b019c15135b1fea2b4
+Subproject commit 2365db59a7ba253e8b075fbfa43a5c0d15dbda84
diff --git a/packages/wechat-exporter/main.py b/packages/wechat-exporter/main.py
new file mode 100644
index 0000000..689bd73
--- /dev/null
+++ b/packages/wechat-exporter/main.py
@@ -0,0 +1,115 @@
+import json
+import typer
+from pathlib import Path
+import requests
+from tqdm import tqdm
+import xml.etree.ElementTree as ET
+from typing_extensions import Annotated
+import sqlite3
+
+app = typer.Typer()
+
+def get_safe_path(s: str) -> str:
+    """
+    Remove invalid characters to sanitize a path.
+    :param s: str to sanitize
+    :returns: sanitized str
+    """
+    ban_chars = "\\ / : * ? \" ' < > | $ \r \n".replace(' ', '')
+    for i in ban_chars:
+        s = s.replace(i, "")
+    return s
+
+
+def process_history(history: str):
+    # Quoted/replied messages arrive as XML payloads starting with "<msg>"
+    if history.startswith("<msg>"):
+        try:
+            root = ET.fromstring(history)
+            title = root.find('.//title').text if root.find('.//title') is not None else None
+            quoted = root.find('.//refermsg/content').text if root.find('.//refermsg/content') is not None else None
+            if title and quoted:
+                return {
+                    "title": title,
+                    "quoted": process_history(quoted)
+                }
+            if title:
+                return title
+        except Exception:
+            return history
+    return history
+
+def get_message(history: dict | str):
+    if isinstance(history, dict):
+        if 'title' in history:
+            return history['title']
+    else:
+        return history
+
+def export_chathistory(user_id: str):
+    res = requests.get("http://localhost:48065/wechat/chatlog", params={
+        "userId": user_id,
+        "count": 100000
+    }).json()
+    for i in range(len(res['chatLogs'])):
+        res['chatLogs'][i]['content'] = process_history(res['chatLogs'][i]['content'])
+        res['chatLogs'][i]['message'] = get_message(res['chatLogs'][i]['content'])
+    return res['chatLogs']
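+
+# CLI usage (requires WeChat running with WeChatTweak's local API on :48065):
+#   python main.py export-all ./wechat_export
+#   python main.py export-sqlite chatlog.db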
+@app.command()
+def export_all(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")]):
+    """
+    Export all users' chat history to json files.
+    """
+    if not dest.is_dir():
+        if not dest.exists():
+            inp = typer.prompt("Destination path does not exist, create it? (y/n)")
+            if inp.lower() == 'y':
+                dest.mkdir(parents=True)
+            else:
+                typer.echo("Aborted.", err=True)
+                return
+        else:
+            typer.echo("Destination path is not a directory!", err=True)
+            return
+    all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()
+
+    exported_count = 0
+    for user in tqdm(all_users):
+        try:
+            usr_chatlog = export_chathistory(user['arg'])
+
+            # Only write a file if there are messages
+            if len(usr_chatlog) > 0:
+                out_path = dest/get_safe_path((user['title'] or "")+"-"+user['arg']+'.json')
+                with open(out_path, 'w', encoding='utf-8') as f:
+                    json.dump(usr_chatlog, f, ensure_ascii=False, indent=2)
+                exported_count += 1
+        except Exception as e:
+            print(f"Error exporting {user.get('title', 'Unknown')}: {e}")
+            continue
+
+    print(f"Exported {exported_count} users' chat history to {dest} in json.")
+
+@app.command()
+def export_sqlite(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")] = Path("chatlog.db")):
+    """
+    Export all users' chat history to a sqlite database.
+    """
+    connection = sqlite3.connect(dest)
+    cursor = connection.cursor()
+    cursor.execute("CREATE TABLE IF NOT EXISTS chatlog (id INTEGER PRIMARY KEY AUTOINCREMENT, with_id TEXT, from_user TEXT, to_user TEXT, message TEXT, timest DATETIME, auxiliary TEXT)")
+    cursor.execute("CREATE INDEX IF NOT EXISTS chatlog_with_id_index ON chatlog (with_id)")
+    cursor.execute("CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, name TEXT)")
+
+    all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()
+    for user in tqdm(all_users):
+        cursor.execute("INSERT OR IGNORE INTO users (id, name) VALUES (?, ?)", (user['arg'], user['title']))
+        usr_chatlog = export_chathistory(user['arg'])
+        for msg in usr_chatlog:
+            cursor.execute("INSERT INTO chatlog (with_id, from_user, to_user, message, timest, auxiliary) VALUES (?, ?, ?, ?, ?, ?)", (user['arg'], msg['fromUser'], msg['toUser'], msg['message'], msg['createTime'], str(msg['content'])))
+    connection.commit()
+
+
+if __name__ == "__main__":
+    app()
diff --git a/packages/wechat-exporter/wechattweak-cli b/packages/wechat-exporter/wechattweak-cli
new file mode 100755
index 0000000..51e211e
Binary files /dev/null and b/packages/wechat-exporter/wechattweak-cli differ