343 lines
11 KiB
Python
343 lines
11 KiB
Python
"""
|
|
iMessage data reader.
|
|
|
|
Reads and processes iMessage conversation data from the macOS Messages database.
|
|
"""
|
|
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from llama_index.core import Document
|
|
from llama_index.core.readers.base import BaseReader
|
|
|
|
|
|
class IMessageReader(BaseReader):
|
|
"""
|
|
iMessage data reader.
|
|
|
|
Reads iMessage conversation data from the macOS Messages database (chat.db).
|
|
Processes conversations into structured documents with metadata.
|
|
"""
|
|
|
|
def __init__(self, concatenate_conversations: bool = True) -> None:
|
|
"""
|
|
Initialize.
|
|
|
|
Args:
|
|
concatenate_conversations: Whether to concatenate messages within conversations for better context
|
|
"""
|
|
self.concatenate_conversations = concatenate_conversations
|
|
|
|
def _get_default_chat_db_path(self) -> Path:
|
|
"""
|
|
Get the default path to the iMessage chat database.
|
|
|
|
Returns:
|
|
Path to the chat.db file
|
|
"""
|
|
home = Path.home()
|
|
return home / "Library" / "Messages" / "chat.db"
|
|
|
|
def _convert_cocoa_timestamp(self, cocoa_timestamp: int) -> str:
|
|
"""
|
|
Convert Cocoa timestamp to readable format.
|
|
|
|
Args:
|
|
cocoa_timestamp: Timestamp in Cocoa format (nanoseconds since 2001-01-01)
|
|
|
|
Returns:
|
|
Formatted timestamp string
|
|
"""
|
|
if cocoa_timestamp == 0:
|
|
return "Unknown"
|
|
|
|
try:
|
|
# Cocoa timestamp is nanoseconds since 2001-01-01 00:00:00 UTC
|
|
# Convert to seconds and add to Unix epoch
|
|
cocoa_epoch = datetime(2001, 1, 1)
|
|
unix_timestamp = cocoa_timestamp / 1_000_000_000 # Convert nanoseconds to seconds
|
|
message_time = cocoa_epoch.timestamp() + unix_timestamp
|
|
return datetime.fromtimestamp(message_time).strftime("%Y-%m-%d %H:%M:%S")
|
|
except (ValueError, OSError):
|
|
return "Unknown"
|
|
|
|
def _get_contact_name(self, handle_id: str) -> str:
|
|
"""
|
|
Get a readable contact name from handle ID.
|
|
|
|
Args:
|
|
handle_id: The handle ID (phone number or email)
|
|
|
|
Returns:
|
|
Formatted contact name
|
|
"""
|
|
if not handle_id:
|
|
return "Unknown"
|
|
|
|
# Clean up phone numbers and emails for display
|
|
if "@" in handle_id:
|
|
return handle_id # Email address
|
|
elif handle_id.startswith("+"):
|
|
return handle_id # International phone number
|
|
else:
|
|
# Try to format as phone number
|
|
digits = "".join(filter(str.isdigit, handle_id))
|
|
if len(digits) == 10:
|
|
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
|
|
elif len(digits) == 11 and digits[0] == "1":
|
|
return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
|
|
else:
|
|
return handle_id
|
|
|
|
def _read_messages_from_db(self, db_path: Path) -> list[dict]:
|
|
"""
|
|
Read messages from the iMessage database.
|
|
|
|
Args:
|
|
db_path: Path to the chat.db file
|
|
|
|
Returns:
|
|
List of message dictionaries
|
|
"""
|
|
if not db_path.exists():
|
|
print(f"iMessage database not found at: {db_path}")
|
|
return []
|
|
|
|
try:
|
|
# Connect to the database
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
# Query to get messages with chat and handle information
|
|
query = """
|
|
SELECT
|
|
m.ROWID as message_id,
|
|
m.text,
|
|
m.date,
|
|
m.is_from_me,
|
|
m.service,
|
|
c.chat_identifier,
|
|
c.display_name as chat_display_name,
|
|
h.id as handle_id,
|
|
c.ROWID as chat_id
|
|
FROM message m
|
|
LEFT JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
|
|
LEFT JOIN chat c ON cmj.chat_id = c.ROWID
|
|
LEFT JOIN handle h ON m.handle_id = h.ROWID
|
|
WHERE m.text IS NOT NULL AND m.text != ''
|
|
ORDER BY c.ROWID, m.date
|
|
"""
|
|
|
|
cursor.execute(query)
|
|
rows = cursor.fetchall()
|
|
|
|
messages = []
|
|
for row in rows:
|
|
(
|
|
message_id,
|
|
text,
|
|
date,
|
|
is_from_me,
|
|
service,
|
|
chat_identifier,
|
|
chat_display_name,
|
|
handle_id,
|
|
chat_id,
|
|
) = row
|
|
|
|
message = {
|
|
"message_id": message_id,
|
|
"text": text,
|
|
"timestamp": self._convert_cocoa_timestamp(date),
|
|
"is_from_me": bool(is_from_me),
|
|
"service": service or "iMessage",
|
|
"chat_identifier": chat_identifier or "Unknown",
|
|
"chat_display_name": chat_display_name or "Unknown Chat",
|
|
"handle_id": handle_id or "Unknown",
|
|
"contact_name": self._get_contact_name(handle_id or ""),
|
|
"chat_id": chat_id,
|
|
}
|
|
messages.append(message)
|
|
|
|
conn.close()
|
|
print(f"Found {len(messages)} messages in database")
|
|
return messages
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"Error reading iMessage database: {e}")
|
|
return []
|
|
except Exception as e:
|
|
print(f"Unexpected error reading iMessage database: {e}")
|
|
return []
|
|
|
|
def _group_messages_by_chat(self, messages: list[dict]) -> dict[int, list[dict]]:
|
|
"""
|
|
Group messages by chat ID.
|
|
|
|
Args:
|
|
messages: List of message dictionaries
|
|
|
|
Returns:
|
|
Dictionary mapping chat_id to list of messages
|
|
"""
|
|
chats = {}
|
|
for message in messages:
|
|
chat_id = message["chat_id"]
|
|
if chat_id not in chats:
|
|
chats[chat_id] = []
|
|
chats[chat_id].append(message)
|
|
|
|
return chats
|
|
|
|
def _create_concatenated_content(self, chat_id: int, messages: list[dict]) -> str:
|
|
"""
|
|
Create concatenated content from chat messages.
|
|
|
|
Args:
|
|
chat_id: The chat ID
|
|
messages: List of messages in the chat
|
|
|
|
Returns:
|
|
Concatenated text content
|
|
"""
|
|
if not messages:
|
|
return ""
|
|
|
|
# Get chat info from first message
|
|
first_msg = messages[0]
|
|
chat_name = first_msg["chat_display_name"]
|
|
chat_identifier = first_msg["chat_identifier"]
|
|
|
|
# Build message content
|
|
message_parts = []
|
|
for message in messages:
|
|
timestamp = message["timestamp"]
|
|
is_from_me = message["is_from_me"]
|
|
text = message["text"]
|
|
contact_name = message["contact_name"]
|
|
|
|
if is_from_me:
|
|
prefix = "[You]"
|
|
else:
|
|
prefix = f"[{contact_name}]"
|
|
|
|
if timestamp != "Unknown":
|
|
prefix += f" ({timestamp})"
|
|
|
|
message_parts.append(f"{prefix}: {text}")
|
|
|
|
concatenated_text = "\n\n".join(message_parts)
|
|
|
|
doc_content = f"""Chat: {chat_name}
|
|
Identifier: {chat_identifier}
|
|
Messages ({len(messages)} messages):
|
|
|
|
{concatenated_text}
|
|
"""
|
|
return doc_content
|
|
|
|
def _create_individual_content(self, message: dict) -> str:
|
|
"""
|
|
Create content for individual message.
|
|
|
|
Args:
|
|
message: Message dictionary
|
|
|
|
Returns:
|
|
Formatted message content
|
|
"""
|
|
timestamp = message["timestamp"]
|
|
is_from_me = message["is_from_me"]
|
|
text = message["text"]
|
|
contact_name = message["contact_name"]
|
|
chat_name = message["chat_display_name"]
|
|
|
|
sender = "You" if is_from_me else contact_name
|
|
|
|
return f"""Message from {sender} in chat "{chat_name}"
|
|
Time: {timestamp}
|
|
Content: {text}
|
|
"""
|
|
|
|
def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
|
|
"""
|
|
Load iMessage data and return as documents.
|
|
|
|
Args:
|
|
input_dir: Optional path to directory containing chat.db file.
|
|
If not provided, uses default macOS location.
|
|
**load_kwargs: Additional arguments (unused)
|
|
|
|
Returns:
|
|
List of Document objects containing iMessage data
|
|
"""
|
|
docs = []
|
|
|
|
# Determine database path
|
|
if input_dir:
|
|
db_path = Path(input_dir) / "chat.db"
|
|
else:
|
|
db_path = self._get_default_chat_db_path()
|
|
|
|
print(f"Reading iMessage database from: {db_path}")
|
|
|
|
# Read messages from database
|
|
messages = self._read_messages_from_db(db_path)
|
|
if not messages:
|
|
return docs
|
|
|
|
if self.concatenate_conversations:
|
|
# Group messages by chat and create concatenated documents
|
|
chats = self._group_messages_by_chat(messages)
|
|
|
|
for chat_id, chat_messages in chats.items():
|
|
if not chat_messages:
|
|
continue
|
|
|
|
content = self._create_concatenated_content(chat_id, chat_messages)
|
|
|
|
# Create metadata
|
|
first_msg = chat_messages[0]
|
|
last_msg = chat_messages[-1]
|
|
|
|
metadata = {
|
|
"source": "iMessage",
|
|
"chat_id": chat_id,
|
|
"chat_name": first_msg["chat_display_name"],
|
|
"chat_identifier": first_msg["chat_identifier"],
|
|
"message_count": len(chat_messages),
|
|
"first_message_date": first_msg["timestamp"],
|
|
"last_message_date": last_msg["timestamp"],
|
|
"participants": list(
|
|
{msg["contact_name"] for msg in chat_messages if not msg["is_from_me"]}
|
|
),
|
|
}
|
|
|
|
doc = Document(text=content, metadata=metadata)
|
|
docs.append(doc)
|
|
|
|
else:
|
|
# Create individual documents for each message
|
|
for message in messages:
|
|
content = self._create_individual_content(message)
|
|
|
|
metadata = {
|
|
"source": "iMessage",
|
|
"message_id": message["message_id"],
|
|
"chat_id": message["chat_id"],
|
|
"chat_name": message["chat_display_name"],
|
|
"chat_identifier": message["chat_identifier"],
|
|
"timestamp": message["timestamp"],
|
|
"is_from_me": message["is_from_me"],
|
|
"contact_name": message["contact_name"],
|
|
"service": message["service"],
|
|
}
|
|
|
|
doc = Document(text=content, metadata=metadata)
|
|
docs.append(doc)
|
|
|
|
print(f"Created {len(docs)} documents from iMessage data")
|
|
return docs
|