add wechat history extract app
@@ -327,7 +327,7 @@ export NCCL_SOCKET_IFNAME=ens5
- [ ] Advanced caching strategies
- [ ] GPU-accelerated embedding computation
- [ ] Add sleep-time-compute and a summarize agent to summarize the files on the computer
- [ ] Add OpenAI recompute API

### 🌟 Q4 2025
705	examples/history_data/wechat_history.py	Normal file
@@ -0,0 +1,705 @@
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from llama_index.core import Document
from llama_index.core.readers.base import BaseReader


class WeChatHistoryReader(BaseReader):
    """
    WeChat chat history reader that extracts chat data from exported JSON files.

    Reads WeChat chat history from JSON files exported by the wechat-exporter tool
    and creates documents with embedded metadata, mirroring the structure of the
    Chrome history reader.

    Also includes utilities for automatically exporting WeChat chat history.
    """

    def __init__(self) -> None:
        """Initialize paths to the bundled exporter tools."""
        self.packages_dir = Path(__file__).parent.parent.parent / "packages"
        self.wechat_exporter_dir = self.packages_dir / "wechat-exporter"
        self.wechat_decipher_dir = self.packages_dir / "wechat-decipher-macos"
    def check_wechat_running(self) -> bool:
        """Check if WeChat is currently running."""
        try:
            result = subprocess.run(["pgrep", "-f", "WeChat"], capture_output=True, text=True)
            return result.returncode == 0
        except Exception:
            return False

    def install_wechattweak(self) -> bool:
        """Install the WeChatTweak CLI tool."""
        try:
            # Create the wechat-exporter directory if it doesn't exist
            self.wechat_exporter_dir.mkdir(parents=True, exist_ok=True)

            wechattweak_path = self.wechat_exporter_dir / "wechattweak-cli"
            if not wechattweak_path.exists():
                print("Downloading WeChatTweak CLI...")
                subprocess.run([
                    "curl", "-L", "-o", str(wechattweak_path),
                    "https://github.com/JettChenT/WeChatTweak-CLI/releases/latest/download/wechattweak-cli"
                ], check=True)

                # Make executable
                wechattweak_path.chmod(0o755)

            # Install WeChatTweak (requires sudo)
            print("Installing WeChatTweak...")
            subprocess.run(["sudo", str(wechattweak_path), "install"], check=True)
            return True
        except Exception as e:
            print(f"Error installing WeChatTweak: {e}")
            return False

    def restart_wechat(self):
        """Restart WeChat to apply WeChatTweak."""
        try:
            print("Restarting WeChat...")
            subprocess.run(["pkill", "-f", "WeChat"], check=False)
            time.sleep(2)
            subprocess.run(["open", "-a", "WeChat"], check=True)
            time.sleep(5)  # Wait for WeChat to start
        except Exception as e:
            print(f"Error restarting WeChat: {e}")

    def check_api_available(self) -> bool:
        """Check if the WeChatTweak API is available."""
        try:
            result = subprocess.run([
                "curl", "-s", "http://localhost:48065/wechat/allcontacts"
            ], capture_output=True, text=True, timeout=5)
            return result.returncode == 0 and bool(result.stdout.strip())
        except Exception:
            return False
    def _extract_readable_text(self, content: str) -> str:
        """
        Extract readable text from message content, removing XML and system messages.

        Args:
            content: The raw message content (can be a string or a dict)

        Returns:
            Cleaned, readable text
        """
        if not content:
            return ""

        # Handle dictionary content (like quoted messages)
        if isinstance(content, dict):
            # Collect text from the known fields of the dictionary structure
            text_parts = []
            if 'title' in content:
                text_parts.append(str(content['title']))
            if 'quoted' in content:
                text_parts.append(str(content['quoted']))
            if 'content' in content:
                text_parts.append(str(content['content']))
            if 'text' in content:
                text_parts.append(str(content['text']))

            if text_parts:
                return " | ".join(text_parts)
            # No meaningful text could be extracted from the dict
            return ""

        # Handle string content
        if not isinstance(content, str):
            return ""

        # Remove common sender prefixes like "wxid_xxx:\n"
        clean_content = re.sub(r'^wxid_[^:]+:\s*', '', content)
        clean_content = re.sub(r'^[^:]+:\s*', '', clean_content)

        # If it's just XML or a system message, return empty
        if clean_content.strip().startswith('<') or 'recalled a message' in clean_content:
            return ""

        return clean_content.strip()
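    # Illustration (hypothetical payloads, not taken from a real export):
    #   _extract_readable_text("wxid_abc123:\nlunch at noon?") -> "lunch at noon?"
    #   _extract_readable_text({"title": "ok", "quoted": "lunch at noon?"}) -> "ok | lunch at noon?"
    #   _extract_readable_text("<msg><img .../></msg>") -> ""   (XML-only payloads are dropped)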
    def _is_text_message(self, content: str) -> bool:
        """
        Check if a message contains readable text content.

        Args:
            content: The message content (can be a string or a dict)

        Returns:
            True if the message contains readable text, False otherwise
        """
        if not content:
            return False

        # Handle dictionary content
        if isinstance(content, dict):
            # Check if the dict has any readable text fields
            text_fields = ['title', 'quoted', 'content', 'text']
            for field in text_fields:
                if field in content and content[field]:
                    return True
            return False

        # Handle string content
        if not isinstance(content, str):
            return False

        # Skip image messages (XML with img tags)
        if '<img' in content and 'cdnurl' in content:
            return False

        # Skip emoji messages (emoji XML tags)
        if '<emoji' in content and 'productid' in content:
            return False

        # Skip voice messages
        if '<voice' in content:
            return False

        # Skip video messages
        if '<video' in content:
            return False

        # Skip file/app messages
        if '<appmsg' in content and 'appid' in content:
            return False

        # Skip system messages (like "recalled a message")
        if 'recalled a message' in content:
            return False

        # Strip sender prefixes like "wxid_xxx:\n" and check for actual content
        clean_content = re.sub(r'^wxid_[^:]+:\s*', '', content)
        clean_content = re.sub(r'^[^:]+:\s*', '', clean_content)

        # If meaningful text remains after cleaning, consider it readable
        if len(clean_content.strip()) > 0 and not clean_content.strip().startswith('<'):
            return True

        return False
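    # Illustration (hypothetical payloads): plain text passes, rich media and
    # system notices are filtered out:
    #   _is_text_message("wxid_abc123:\nsee you at 6") -> True
    #   _is_text_message('<msg><img cdnurl="..."/></msg>') -> False
    #   _is_text_message("You recalled a message") -> False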
    def _concatenate_messages(self, messages: List[Dict], min_length: int = 128, max_length: int = 1000,
                              time_window_minutes: int = 30) -> List[Dict]:
        """
        Concatenate messages based on length and time rules.

        Args:
            messages: List of message dictionaries
            min_length: Minimum length for concatenated message groups
            max_length: Maximum length for concatenated message groups
            time_window_minutes: Time window in minutes to group messages together

        Returns:
            List of concatenated message groups
        """
        if not messages:
            return []

        concatenated_groups = []
        current_group = []
        current_length = 0
        last_timestamp = None

        for message in messages:
            # Extract message info
            content = message.get('content', '')
            message_text = message.get('message', '')
            create_time = message.get('createTime', 0)

            # Extract readable text, falling back to the plain message field
            readable_text = self._extract_readable_text(content)
            if not readable_text:
                readable_text = message_text

            # Skip empty messages
            if not readable_text.strip():
                continue

            # Check the time-window constraint
            if last_timestamp is not None and create_time > 0:
                time_diff_minutes = (create_time - last_timestamp) / 60
                if time_diff_minutes > time_window_minutes:
                    # Time gap too large: flush the current group and start a new one
                    if current_group and current_length >= min_length:
                        concatenated_groups.append({
                            'messages': current_group,
                            'total_length': current_length,
                            'start_time': current_group[0].get('createTime', 0),
                            'end_time': current_group[-1].get('createTime', 0)
                        })
                    current_group = []
                    current_length = 0

            # Check the length constraint
            message_length = len(readable_text)
            if current_length + message_length > max_length and current_group:
                # Current group would exceed the max length: flush it and start a new one
                if current_length >= min_length:
                    concatenated_groups.append({
                        'messages': current_group,
                        'total_length': current_length,
                        'start_time': current_group[0].get('createTime', 0),
                        'end_time': current_group[-1].get('createTime', 0)
                    })
                current_group = []
                current_length = 0

            # Add the message to the current group
            current_group.append(message)
            current_length += message_length
            last_timestamp = create_time

        # Add the last group if it meets the minimum length
        if current_group and current_length >= min_length:
            concatenated_groups.append({
                'messages': current_group,
                'total_length': current_length,
                'start_time': current_group[0].get('createTime', 0),
                'end_time': current_group[-1].get('createTime', 0)
            })

        return concatenated_groups
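    # Worked example (hypothetical timestamps and lengths): with min_length=10,
    # max_length=40, and time_window_minutes=30, two messages of lengths 12 and 20
    # sent 5 minutes apart form one group (total_length=32); a third message two
    # hours later flushes that group and starts a new one, which is kept at the
    # end only if it also reaches min_length.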
    def _create_concatenated_content(self, message_group: Dict, contact_name: str) -> str:
        """
        Create concatenated content from a group of messages.

        Args:
            message_group: Dictionary containing messages and metadata
            contact_name: Name of the contact

        Returns:
            Formatted concatenated content
        """
        messages = message_group['messages']
        start_time = message_group['start_time']
        end_time = message_group['end_time']

        # Format the group's start and end timestamps
        if start_time:
            try:
                start_timestamp = datetime.fromtimestamp(start_time)
                start_time_str = start_timestamp.strftime('%Y-%m-%d %H:%M:%S')
            except Exception:
                start_time_str = str(start_time)
        else:
            start_time_str = "Unknown"

        if end_time:
            try:
                end_timestamp = datetime.fromtimestamp(end_time)
                end_time_str = end_timestamp.strftime('%Y-%m-%d %H:%M:%S')
            except Exception:
                end_time_str = str(end_time)
        else:
            end_time_str = "Unknown"

        # Build the concatenated message content
        message_parts = []
        for message in messages:
            content = message.get('content', '')
            message_text = message.get('message', '')
            create_time = message.get('createTime', 0)
            is_sent_from_self = message.get('isSentFromSelf', False)

            # Extract readable text, falling back to the plain message field
            readable_text = self._extract_readable_text(content)
            if not readable_text:
                readable_text = message_text

            # Format the individual message timestamp
            if create_time:
                try:
                    timestamp = datetime.fromtimestamp(create_time)
                    time_str = timestamp.strftime('%H:%M:%S')
                except Exception:
                    time_str = str(create_time)
            else:
                time_str = "Unknown"

            sender = "Me" if is_sent_from_self else "Contact"
            message_parts.append(f"[{time_str}] {sender}: {readable_text}")

        concatenated_text = "\n".join(message_parts)

        # Create the final document content
        doc_content = f"""
Contact: {contact_name}
Time Range: {start_time_str} - {end_time_str}
Messages ({len(messages)} messages, {message_group['total_length']} chars):

{concatenated_text}
"""
        return doc_content
    def load_data(self, input_dir: str = None, **load_kwargs: Any) -> List[Document]:
        """
        Load WeChat chat history data from exported JSON files.

        Args:
            input_dir: Directory containing exported WeChat JSON files
            **load_kwargs:
                max_count (int): Maximum number of chat entries to read.
                wechat_export_dir (str): Custom path to the WeChat export directory.
                include_non_text (bool): Whether to include non-text messages (images, emojis, etc.).
                concatenate_messages (bool): Whether to concatenate messages based on length rules.
                min_length (int): Minimum length for concatenated message groups (default: 128).
                max_length (int): Maximum length for concatenated message groups (default: 1000).
                time_window_minutes (int): Time window in minutes to group messages together (default: 30).
        """
        docs: List[Document] = []
        max_count = load_kwargs.get('max_count', 1000)
        wechat_export_dir = load_kwargs.get('wechat_export_dir', None)
        include_non_text = load_kwargs.get('include_non_text', False)
        concatenate_messages = load_kwargs.get('concatenate_messages', False)
        min_length = load_kwargs.get('min_length', 128)
        max_length = load_kwargs.get('max_length', 1000)
        time_window_minutes = load_kwargs.get('time_window_minutes', 30)

        # Default WeChat export path
        if wechat_export_dir is None:
            wechat_export_dir = "./wechat_export_test"

        if not os.path.exists(wechat_export_dir):
            print(f"WeChat export directory not found at: {wechat_export_dir}")
            return docs

        try:
            # Find all JSON files in the export directory
            json_files = list(Path(wechat_export_dir).glob("*.json"))
            print(f"Found {len(json_files)} WeChat chat history files")

            count = 0
            for json_file in json_files:
                if count >= max_count and max_count > 0:
                    break

                try:
                    with open(json_file, 'r', encoding='utf-8') as f:
                        chat_data = json.load(f)

                    # Extract the contact name from the filename
                    contact_name = json_file.stem

                    if concatenate_messages:
                        # Keep only messages with readable text
                        readable_messages = []
                        for message in chat_data:
                            try:
                                content = message.get('content', '')
                                if not include_non_text and not self._is_text_message(content):
                                    continue

                                readable_text = self._extract_readable_text(content)
                                if not readable_text and not include_non_text:
                                    continue

                                readable_messages.append(message)
                            except Exception as e:
                                print(f"Error processing message in {json_file}: {e}")
                                continue

                        # Concatenate messages based on the length and time rules
                        message_groups = self._concatenate_messages(
                            readable_messages,
                            min_length=min_length,
                            max_length=max_length,
                            time_window_minutes=time_window_minutes
                        )

                        # Create one document per concatenated group
                        for message_group in message_groups:
                            if count >= max_count and max_count > 0:
                                break

                            doc_content = self._create_concatenated_content(message_group, contact_name)
                            doc = Document(text=doc_content, metadata={})
                            docs.append(doc)
                            count += 1

                        print(f"Created {len(message_groups)} concatenated message groups for {contact_name}")

                    else:
                        # Original single-message processing
                        for message in chat_data:
                            if count >= max_count and max_count > 0:
                                break

                            # Extract message information
                            content = message.get('content', '')
                            message_text = message.get('message', '')
                            create_time = message.get('createTime', 0)
                            is_sent_from_self = message.get('isSentFromSelf', False)

                            # Content may be a dict or a string
                            try:
                                # Check whether this is a readable text message
                                if not include_non_text and not self._is_text_message(content):
                                    continue

                                # Extract readable text
                                readable_text = self._extract_readable_text(content)
                                if not readable_text and not include_non_text:
                                    continue
                            except Exception as e:
                                # Skip messages that cause processing errors
                                print(f"Error processing message in {json_file}: {e}")
                                continue

                            # Convert the timestamp to a readable format
                            if create_time:
                                try:
                                    timestamp = datetime.fromtimestamp(create_time)
                                    time_str = timestamp.strftime('%Y-%m-%d %H:%M:%S')
                                except Exception:
                                    time_str = str(create_time)
                            else:
                                time_str = "Unknown"

                            # Create document content with a metadata header and contact info
                            doc_content = f"""
Contact: {contact_name}
Is sent from self: {is_sent_from_self}
Time: {time_str}
Message: {readable_text if readable_text else message_text}
"""

                            # Create document with embedded metadata
                            doc = Document(text=doc_content, metadata={})
                            docs.append(doc)
                            count += 1

                except Exception as e:
                    print(f"Error reading {json_file}: {e}")
                    continue

            print(f"Loaded {len(docs)} WeChat chat documents")

        except Exception as e:
            print(f"Error reading WeChat history: {e}")
            return docs

        return docs
    @staticmethod
    def find_wechat_export_dirs() -> List[Path]:
        """
        Find all WeChat export directories.

        Returns:
            List of Path objects pointing to WeChat export directories
        """
        export_dirs = []

        # Look for common export directory names
        possible_dirs = [
            Path("./wechat_export_test"),
            Path("./wechat_export"),
            Path("./wechat_chat_history"),
            Path("./chat_export")
        ]

        for export_dir in possible_dirs:
            if export_dir.exists() and export_dir.is_dir():
                json_files = list(export_dir.glob("*.json"))
                if json_files:
                    export_dirs.append(export_dir)
                    print(f"Found WeChat export directory: {export_dir} with {len(json_files)} files")

        print(f"Found {len(export_dirs)} WeChat export directories")
        return export_dirs
    @staticmethod
    def export_chat_to_file(output_file: str = "wechat_chat_export.txt", max_count: int = 1000, export_dir: str = None, include_non_text: bool = False):
        """
        Export WeChat chat history to a text file.

        Args:
            output_file: Path to the output file
            max_count: Maximum number of entries to export
            export_dir: Directory containing WeChat JSON files
            include_non_text: Whether to include non-text messages
        """
        if export_dir is None:
            export_dir = "./wechat_export_test"

        if not os.path.exists(export_dir):
            print(f"WeChat export directory not found at: {export_dir}")
            return

        try:
            json_files = list(Path(export_dir).glob("*.json"))
            reader = WeChatHistoryReader()

            with open(output_file, 'w', encoding='utf-8') as f:
                count = 0
                for json_file in json_files:
                    if count >= max_count and max_count > 0:
                        break

                    try:
                        with open(json_file, 'r', encoding='utf-8') as json_f:
                            chat_data = json.load(json_f)

                        contact_name = json_file.stem
                        f.write(f"\n=== Chat with {contact_name} ===\n")

                        for message in chat_data:
                            if count >= max_count and max_count > 0:
                                break

                            from_user = message.get('fromUser', '')
                            content = message.get('content', '')
                            message_text = message.get('message', '')
                            create_time = message.get('createTime', 0)

                            # Skip non-text messages unless requested
                            if not include_non_text:
                                if not reader._is_text_message(content):
                                    continue
                                readable_text = reader._extract_readable_text(content)
                                if not readable_text:
                                    continue
                                message_text = readable_text

                            if create_time:
                                try:
                                    timestamp = datetime.fromtimestamp(create_time)
                                    time_str = timestamp.strftime('%Y-%m-%d %H:%M:%S')
                                except Exception:
                                    time_str = str(create_time)
                            else:
                                time_str = "Unknown"

                            f.write(f"[{time_str}] {from_user}: {message_text}\n")
                            count += 1

                    except Exception as e:
                        print(f"Error processing {json_file}: {e}")
                        continue

            print(f"Exported {count} chat entries to {output_file}")

        except Exception as e:
            print(f"Error exporting WeChat chat history: {e}")
    def export_wechat_chat_history(self, export_dir: str = "./wechat_export_direct") -> Optional[Path]:
        """
        Export WeChat chat history using the wechat-exporter tool.

        Args:
            export_dir: Directory to save the exported chat history

        Returns:
            Path to the export directory if successful, None otherwise
        """
        try:
            # Create the export directory
            export_path = Path(export_dir)
            export_path.mkdir(exist_ok=True)

            print(f"Exporting WeChat chat history to {export_path}...")

            # Check that the wechat-exporter directory exists
            if not self.wechat_exporter_dir.exists():
                print(f"wechat-exporter directory not found at: {self.wechat_exporter_dir}")
                return None

            # Install requirements if needed
            requirements_file = self.wechat_exporter_dir / "requirements.txt"
            if requirements_file.exists():
                print("Installing wechat-exporter requirements...")
                subprocess.run([
                    "uv", "pip", "install", "-r", str(requirements_file)
                ], check=True)

            # Run the export command
            print("Running wechat-exporter...")
            result = subprocess.run([
                sys.executable, str(self.wechat_exporter_dir / "main.py"),
                "export-all", str(export_path)
            ], capture_output=True, text=True, check=True)

            print("Export command output:")
            print(result.stdout)
            if result.stderr:
                print("Export errors:")
                print(result.stderr)

            # Check whether the export produced any files
            if export_path.exists() and any(export_path.glob("*.json")):
                json_files = list(export_path.glob("*.json"))
                print(f"Successfully exported {len(json_files)} chat history files to {export_path}")
                return export_path
            else:
                print("Export completed but no JSON files found")
                return None

        except subprocess.CalledProcessError as e:
            print(f"Export command failed: {e}")
            print(f"Command output: {e.stdout}")
            print(f"Command errors: {e.stderr}")
            return None
        except Exception as e:
            print(f"Export failed: {e}")
            print("Please ensure WeChat is running and WeChatTweak is installed.")
            return None
    def find_or_export_wechat_data(self, export_dir: str = "./wechat_export_direct") -> List[Path]:
        """
        Find existing WeChat exports or create new ones.

        Args:
            export_dir: Directory to save exported chat history if needed

        Returns:
            List of Path objects pointing to WeChat export directories
        """
        export_dirs = []

        # Look for existing exports in common locations
        possible_export_dirs = [
            Path("./wechat_database_export"),
            Path("./wechat_export_test"),
            Path("./wechat_export"),
            Path("./wechat_chat_history"),
            Path("./chat_export")
        ]

        for export_dir_path in possible_export_dirs:
            if export_dir_path.exists() and any(export_dir_path.glob("*.json")):
                export_dirs.append(export_dir_path)
                print(f"Found existing export: {export_dir_path}")

        # If no existing exports, try to export automatically
        if not export_dirs:
            print("No existing WeChat exports found. Starting direct export...")

            # Try to export using wechat-exporter
            exported_path = self.export_wechat_chat_history(export_dir)
            if exported_path:
                export_dirs = [exported_path]
            else:
                print("Failed to export WeChat data. Please ensure WeChat is running and WeChatTweak is installed.")

        return export_dirs
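A minimal usage sketch of the reader above (directory names and parameter values are illustrative; the concatenation parameters mirror the load_data defaults):

from history_data.wechat_history import WeChatHistoryReader

reader = WeChatHistoryReader()
# Reuse existing exports if present, otherwise drive wechat-exporter.
export_dirs = reader.find_or_export_wechat_data("./wechat_export_direct")
for export_dir in export_dirs:
    docs = reader.load_data(
        wechat_export_dir=str(export_dir),
        max_count=100,
        concatenate_messages=True,  # group messages into ~128-1000 char documents
        min_length=128,
        max_length=1000,
        time_window_minutes=30,
    )
    print(f"{export_dir}: {len(docs)} documents")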
249	examples/wechat_history_reader_leann.py	Normal file
@@ -0,0 +1,249 @@
import asyncio
import os
from pathlib import Path
from typing import List

import dotenv
from leann.api import LeannBuilder, LeannChat
from llama_index.core.node_parser import SentenceSplitter

dotenv.load_dotenv()
def create_leann_index_from_multiple_wechat_exports(export_dirs: List[Path], index_path: str = "wechat_history_index.leann", max_count: int = -1):
    """
    Create a LEANN index from multiple WeChat export data sources.

    Args:
        export_dirs: List of Path objects pointing to WeChat export directories
        index_path: Path to save the LEANN index
        max_count: Maximum number of chat entries to process per export
    """
    print("Creating LEANN index from multiple WeChat export data sources...")

    # Load documents using WeChatHistoryReader from history_data
    from history_data.wechat_history import WeChatHistoryReader
    reader = WeChatHistoryReader()

    INDEX_DIR = Path(index_path).parent

    if not INDEX_DIR.exists():
        print("--- Index directory not found, building new index ---")
        all_documents = []
        total_processed = 0

        # Process each WeChat export directory
        for i, export_dir in enumerate(export_dirs):
            print(f"\nProcessing WeChat export {i + 1}/{len(export_dirs)}: {export_dir}")

            try:
                documents = reader.load_data(
                    wechat_export_dir=str(export_dir),
                    max_count=max_count,
                    concatenate_messages=False  # Disable concatenation: one message per document
                )
                if documents:
                    print(f"Loaded {len(documents)} chat documents from {export_dir}")
                    all_documents.extend(documents)
                    total_processed += len(documents)

                    # Stop once we've reached the max count
                    if max_count > 0 and total_processed >= max_count:
                        print(f"Reached max count of {max_count} documents")
                        break
                else:
                    print(f"No documents loaded from {export_dir}")
            except Exception as e:
                print(f"Error processing {export_dir}: {e}")
                continue

        if not all_documents:
            print("No documents loaded from any source. Exiting.")
            return None

        print(f"\nTotal loaded {len(all_documents)} chat documents from {len(export_dirs)} exports")

        # Split documents into chunks of at most 256 tokens
        text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25)

        # Convert Documents to text strings and chunk them
        all_texts = []
        for doc in all_documents:
            nodes = text_splitter.get_nodes_from_documents([doc])
            for node in nodes:
                all_texts.append(node.get_content())

        print(f"Created {len(all_texts)} text chunks from {len(all_documents)} documents")

        # Create the LEANN index directory and build the index
        INDEX_DIR.mkdir(exist_ok=True)

        print("\n[PHASE 1] Building LEANN index...")

        # Use the HNSW backend for better macOS compatibility
        builder = LeannBuilder(
            backend_name="hnsw",
            embedding_model="Qwen/Qwen3-Embedding-0.6B",
            graph_degree=32,
            complexity=64,
            is_compact=True,
            is_recompute=True,
            num_threads=1  # Force single-threaded mode
        )

        print(f"Adding {len(all_texts)} chat chunks to index...")
        for chunk_text in all_texts:
            builder.add_text(chunk_text)

        builder.build_index(index_path)
        print(f"\nLEANN index built at {index_path}!")
    else:
        print(f"--- Using existing index at {INDEX_DIR} ---")

    return index_path
def create_leann_index(export_dir: str = None, index_path: str = "wechat_history_index.leann", max_count: int = 1000):
    """
    Create a LEANN index from WeChat chat history data.

    Args:
        export_dir: Path to the WeChat export directory (optional, uses the default if None)
        index_path: Path to save the LEANN index
        max_count: Maximum number of chat entries to process
    """
    print("Creating LEANN index from WeChat chat history data...")
    INDEX_DIR = Path(index_path).parent

    if not INDEX_DIR.exists():
        print("--- Index directory not found, building new index ---")
        INDEX_DIR.mkdir(exist_ok=True)

        # Load documents using WeChatHistoryReader from history_data
        from history_data.wechat_history import WeChatHistoryReader
        reader = WeChatHistoryReader()

        documents = reader.load_data(
            wechat_export_dir=export_dir,
            max_count=max_count,
            concatenate_messages=False  # Disable concatenation: one message per document
        )

        if not documents:
            print("No documents loaded. Exiting.")
            return None

        print(f"Loaded {len(documents)} chat documents")

        # Split documents into chunks of at most 256 tokens
        text_splitter = SentenceSplitter(chunk_size=256, chunk_overlap=25)

        # Convert Documents to text strings and chunk them
        all_texts = []
        for doc in documents:
            nodes = text_splitter.get_nodes_from_documents([doc])
            for node in nodes:
                all_texts.append(node.get_content())

        print(f"Created {len(all_texts)} text chunks from {len(documents)} documents")

        print("\n[PHASE 1] Building LEANN index...")

        # Use the HNSW backend for better macOS compatibility
        builder = LeannBuilder(
            backend_name="hnsw",
            embedding_model="mlx-community/Qwen3-Embedding-0.6B-4bit-DWQ",  # MLX-optimized model
            graph_degree=32,
            complexity=64,
            is_compact=True,
            is_recompute=True,
            num_threads=1  # Force single-threaded mode
        )

        print(f"Adding {len(all_texts)} chat chunks to index...")
        for chunk_text in all_texts:
            builder.add_text(chunk_text)

        builder.build_index(index_path)
        print(f"\nLEANN index built at {index_path}!")
    else:
        print(f"--- Using existing index at {INDEX_DIR} ---")

    return index_path
async def query_leann_index(index_path: str, query: str):
    """
    Query the LEANN index.

    Args:
        index_path: Path to the LEANN index
        query: The query string
    """
    print("\n[PHASE 2] Starting Leann chat session...")
    chat = LeannChat(index_path=index_path)

    print(f"You: {query}")
    chat_response = chat.ask(
        query,
        top_k=5,
        recompute_beighbor_embeddings=True,
        complexity=32,
        beam_width=1,
        llm_config={
            "type": "openai",
            "model": "gpt-4o",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        llm_kwargs={
            "temperature": 0.0,
            "max_tokens": 1000
        }
    )
    print(f"Leann: {chat_response}")
async def main():
    """Main function with integrated WeChat export functionality."""

    # Initialize the WeChat reader with export capabilities
    from history_data.wechat_history import WeChatHistoryReader
    reader = WeChatHistoryReader()

    # Find existing exports or create new ones using the centralized method
    export_dirs = reader.find_or_export_wechat_data("./wechat_export_direct")

    if not export_dirs:
        print("Failed to find or export WeChat data. Exiting.")
        return

    INDEX_DIR = Path("./wechat_history_index_leann_test")
    INDEX_PATH = str(INDEX_DIR / "wechat_history.leann")

    # Create or load the LEANN index from all sources
    index_path = create_leann_index_from_multiple_wechat_exports(export_dirs, INDEX_PATH, max_count=5000)

    if index_path:
        # Example queries
        queries = [
            # "I want to buy a Magic Johnson jersey; show me some related chat logs."
            "我想买魔术师约翰逊的球衣,给我一些对应聊天记录?",
        ]

        for query in queries:
            print("\n" + "=" * 60)
            await query_leann_index(index_path, query)


if __name__ == "__main__":
    asyncio.run(main())
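For retrieval without the chat loop, a minimal sketch using the LeannSearcher class from the same leann.api module (the search signature below is an assumption, and the index path matches main() above):

from leann.api import LeannSearcher

searcher = LeannSearcher(index_path="./wechat_history_index_leann_test/wechat_history.leann")
# Assumed signature: returns scored passages instead of an LLM-generated answer.
results = searcher.search("Magic Johnson jersey", top_k=5)
for result in results:
    print(result)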
Submodule packages/leann-backend-diskann/third_party/DiskANN updated: 2dcf156553...c7a9d681cb
Submodule packages/leann-backend-hnsw/third_party/faiss updated: 2547df4377...2365db59a7
115	packages/wechat-exporter/main.py	Normal file
@@ -0,0 +1,115 @@
import json
import sqlite3
import xml.etree.ElementTree as ET
from pathlib import Path

import requests
import typer
from tqdm import tqdm
from typing_extensions import Annotated

app = typer.Typer()


def get_safe_path(s: str) -> str:
    """
    Remove invalid characters to sanitize a path.

    :param s: str to sanitize
    :returns: sanitized str
    """
    ban_chars = "\\/:*?\"'<>|$\r\n"
    for ch in ban_chars:
        s = s.replace(ch, "")
    return s
def process_history(history: str):
    """Unwrap quoted-reply XML into a title/quoted dict; pass plain text through."""
    if history.startswith("<?xml") or history.startswith("<msg>"):
        try:
            root = ET.fromstring(history)
            title = root.find('.//title').text if root.find('.//title') is not None else None
            quoted = root.find('.//refermsg/content').text if root.find('.//refermsg/content') is not None else None
            if title and quoted:
                return {
                    "title": title,
                    "quoted": process_history(quoted)
                }
            if title:
                return title
        except Exception:
            return history
    return history


def get_message(history: dict | str):
    """Return the display text for a processed history entry."""
    if isinstance(history, dict):
        if 'title' in history:
            return history['title']
    else:
        return history
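# Example of the quoted-reply XML process_history handles (shape assumed from the
# fields read above, not from a real export):
#   <msg><appmsg><title>sounds good</title>
#     <refermsg><content>dinner at 7?</content></refermsg></appmsg></msg>
# process_history(...) -> {"title": "sounds good", "quoted": "dinner at 7?"}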
def export_chathistory(user_id: str):
    """Fetch and normalize the full chat log for one contact from the local WeChatTweak API."""
    res = requests.get("http://localhost:48065/wechat/chatlog", params={
        "userId": user_id,
        "count": 100000
    }).json()
    for i in range(len(res['chatLogs'])):
        res['chatLogs'][i]['content'] = process_history(res['chatLogs'][i]['content'])
        res['chatLogs'][i]['message'] = get_message(res['chatLogs'][i]['content'])
    return res['chatLogs']
@app.command()
def export_all(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")]):
    """
    Export all users' chat history to json files.
    """
    if not dest.is_dir():
        if not dest.exists():
            inp = typer.prompt("Destination path does not exist, create it? (y/n)")
            if inp.lower() == 'y':
                dest.mkdir(parents=True)
            else:
                typer.echo("Aborted.", err=True)
                return
        else:
            typer.echo("Destination path is not a directory!", err=True)
            return
    all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()

    exported_count = 0
    for user in tqdm(all_users):
        try:
            usr_chatlog = export_chathistory(user['arg'])

            # Only write file if there are messages
            if len(usr_chatlog) > 0:
                out_path = dest / get_safe_path((user['title'] or "") + "-" + user['arg'] + '.json')
                with open(out_path, 'w', encoding='utf-8') as f:
                    json.dump(usr_chatlog, f, ensure_ascii=False, indent=2)
                exported_count += 1
        except Exception as e:
            print(f"Error exporting {user.get('title', 'Unknown')}: {e}")
            continue

    print(f"Exported {exported_count} users' chat history to {dest} in json.")
@app.command()
def export_sqlite(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")] = Path("chatlog.db")):
    """
    Export all users' chat history to a sqlite database.
    """
    connection = sqlite3.connect(dest)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS chatlog (id INTEGER PRIMARY KEY AUTOINCREMENT, with_id TEXT, from_user TEXT, to_user TEXT, message TEXT, timest DATETIME, auxiliary TEXT)")
    cursor.execute("CREATE INDEX IF NOT EXISTS chatlog_with_id_index ON chatlog (with_id)")
    cursor.execute("CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, name TEXT)")

    all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()
    for user in tqdm(all_users):
        cursor.execute("INSERT OR IGNORE INTO users (id, name) VALUES (?, ?)", (user['arg'], user['title']))
        usr_chatlog = export_chathistory(user['arg'])
        for msg in usr_chatlog:
            cursor.execute("INSERT INTO chatlog (with_id, from_user, to_user, message, timest, auxiliary) VALUES (?, ?, ?, ?, ?, ?)", (user['arg'], msg['fromUser'], msg['toUser'], msg['message'], msg['createTime'], str(msg['content'])))
        connection.commit()
    connection.close()


if __name__ == "__main__":
    app()
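The commands above are run as `python main.py export-all ./wechat_export_test` or `python main.py export-sqlite chatlog.db` once the WeChatTweak API is listening on port 48065. They can also be exercised programmatically; a minimal sketch using Typer's test runner (the destination directory is illustrative):

from typer.testing import CliRunner

from main import app  # the Typer app defined above

runner = CliRunner()
# Requires WeChat running with the WeChatTweak API on localhost:48065.
result = runner.invoke(app, ["export-all", "./wechat_export_test"])
print(result.output)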
BIN	packages/wechat-exporter/wechattweak-cli	Executable file
Binary file not shown.