Add Claude RAG support - resolves #100
- Implement ClaudeReader for parsing JSON exports from Claude - Add claude_rag.py following BaseRAGExample pattern - Support both concatenated conversations and individual messages - Handle multiple JSON formats and structures - Include comprehensive error handling and user guidance - Add metadata extraction (titles, timestamps, roles) - Integrate with existing LEANN chunking and embedding systems Features: ✅ JSON parsing from Claude exports ✅ ZIP file extraction support ✅ Multiple JSON format support (list, single object, wrapped) ✅ Conversation detection and structuring ✅ Message role identification (user/assistant) ✅ Metadata extraction and preservation ✅ Dual processing modes (concatenated/separate) ✅ Command-line interface with all LEANN options ✅ Comprehensive error handling ✅ Multiple input format support (.json, .zip, directories) Usage: python -m apps.claude_rag --export-path claude_export.json python -m apps.claude_rag --export-path claude_export.zip --query 'Python help'
This commit is contained in:
0
apps/claude_data/__init__.py
Normal file
0
apps/claude_data/__init__.py
Normal file
420
apps/claude_data/claude_reader.py
Normal file
420
apps/claude_data/claude_reader.py
Normal file
@@ -0,0 +1,420 @@
|
||||
"""
|
||||
Claude export data reader.
|
||||
|
||||
Reads and processes Claude conversation data from exported JSON files.
|
||||
"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from zipfile import ZipFile
|
||||
|
||||
from llama_index.core import Document
|
||||
from llama_index.core.readers.base import BaseReader
|
||||
|
||||
|
||||
class ClaudeReader(BaseReader):
|
||||
"""
|
||||
Claude export data reader.
|
||||
|
||||
Reads Claude conversation data from exported JSON files or zip archives.
|
||||
Processes conversations into structured documents with metadata.
|
||||
"""
|
||||
|
||||
def __init__(self, concatenate_conversations: bool = True) -> None:
|
||||
"""
|
||||
Initialize.
|
||||
|
||||
Args:
|
||||
concatenate_conversations: Whether to concatenate messages within conversations for better context
|
||||
"""
|
||||
self.concatenate_conversations = concatenate_conversations
|
||||
|
||||
def _extract_json_from_zip(self, zip_path: Path) -> list[str]:
|
||||
"""
|
||||
Extract JSON files from Claude export zip file.
|
||||
|
||||
Args:
|
||||
zip_path: Path to the Claude export zip file
|
||||
|
||||
Returns:
|
||||
List of JSON content strings, or empty list if not found
|
||||
"""
|
||||
json_contents = []
|
||||
try:
|
||||
with ZipFile(zip_path, "r") as zip_file:
|
||||
# Look for JSON files
|
||||
json_files = [f for f in zip_file.namelist() if f.endswith(".json")]
|
||||
|
||||
if not json_files:
|
||||
print(f"No JSON files found in {zip_path}")
|
||||
return []
|
||||
|
||||
print(f"Found {len(json_files)} JSON files in archive")
|
||||
|
||||
for json_file in json_files:
|
||||
with zip_file.open(json_file) as f:
|
||||
content = f.read().decode("utf-8", errors="ignore")
|
||||
json_contents.append(content)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error extracting JSON from zip {zip_path}: {e}")
|
||||
|
||||
return json_contents
|
||||
|
||||
def _parse_claude_json(self, json_content: str) -> list[dict]:
|
||||
"""
|
||||
Parse Claude JSON export to extract conversations.
|
||||
|
||||
Args:
|
||||
json_content: JSON content from Claude export
|
||||
|
||||
Returns:
|
||||
List of conversation dictionaries
|
||||
"""
|
||||
try:
|
||||
data = json.loads(json_content)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error parsing JSON: {e}")
|
||||
return []
|
||||
|
||||
conversations = []
|
||||
|
||||
# Handle different possible JSON structures
|
||||
if isinstance(data, list):
|
||||
# If data is a list of conversations
|
||||
for item in data:
|
||||
conversation = self._extract_conversation_from_json(item)
|
||||
if conversation:
|
||||
conversations.append(conversation)
|
||||
elif isinstance(data, dict):
|
||||
# Check for common structures
|
||||
if "conversations" in data:
|
||||
# Structure: {"conversations": [...]}
|
||||
for item in data["conversations"]:
|
||||
conversation = self._extract_conversation_from_json(item)
|
||||
if conversation:
|
||||
conversations.append(conversation)
|
||||
elif "messages" in data:
|
||||
# Single conversation with messages
|
||||
conversation = self._extract_conversation_from_json(data)
|
||||
if conversation:
|
||||
conversations.append(conversation)
|
||||
else:
|
||||
# Try to treat the whole object as a conversation
|
||||
conversation = self._extract_conversation_from_json(data)
|
||||
if conversation:
|
||||
conversations.append(conversation)
|
||||
|
||||
return conversations
|
||||
|
||||
def _extract_conversation_from_json(self, conv_data: dict) -> dict | None:
|
||||
"""
|
||||
Extract conversation data from a JSON object.
|
||||
|
||||
Args:
|
||||
conv_data: Dictionary containing conversation data
|
||||
|
||||
Returns:
|
||||
Dictionary with conversation data or None
|
||||
"""
|
||||
if not isinstance(conv_data, dict):
|
||||
return None
|
||||
|
||||
messages = []
|
||||
|
||||
# Look for messages in various possible structures
|
||||
message_sources = []
|
||||
if "messages" in conv_data:
|
||||
message_sources = conv_data["messages"]
|
||||
elif "chat" in conv_data:
|
||||
message_sources = conv_data["chat"]
|
||||
elif "conversation" in conv_data:
|
||||
message_sources = conv_data["conversation"]
|
||||
else:
|
||||
# If no clear message structure, try to extract from the object itself
|
||||
if "content" in conv_data and "role" in conv_data:
|
||||
message_sources = [conv_data]
|
||||
|
||||
for msg_data in message_sources:
|
||||
message = self._extract_message_from_json(msg_data)
|
||||
if message:
|
||||
messages.append(message)
|
||||
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Extract conversation metadata
|
||||
title = self._extract_title_from_conversation(conv_data, messages)
|
||||
timestamp = self._extract_timestamp_from_conversation(conv_data)
|
||||
|
||||
return {"title": title, "messages": messages, "timestamp": timestamp}
|
||||
|
||||
def _extract_message_from_json(self, msg_data: dict) -> dict | None:
|
||||
"""
|
||||
Extract message data from a JSON message object.
|
||||
|
||||
Args:
|
||||
msg_data: Dictionary containing message data
|
||||
|
||||
Returns:
|
||||
Dictionary with message data or None
|
||||
"""
|
||||
if not isinstance(msg_data, dict):
|
||||
return None
|
||||
|
||||
# Extract content from various possible fields
|
||||
content = ""
|
||||
content_fields = ["content", "text", "message", "body"]
|
||||
for field in content_fields:
|
||||
if msg_data.get(field):
|
||||
content = str(msg_data[field])
|
||||
break
|
||||
|
||||
if not content or len(content.strip()) < 3:
|
||||
return None
|
||||
|
||||
# Extract role (user/assistant/human/ai/claude)
|
||||
role = "mixed" # Default role
|
||||
role_fields = ["role", "sender", "from", "author", "type"]
|
||||
for field in role_fields:
|
||||
if msg_data.get(field):
|
||||
role_value = str(msg_data[field]).lower()
|
||||
if role_value in ["user", "human", "person"]:
|
||||
role = "user"
|
||||
elif role_value in ["assistant", "ai", "claude", "bot"]:
|
||||
role = "assistant"
|
||||
break
|
||||
|
||||
# Extract timestamp
|
||||
timestamp = self._extract_timestamp_from_message(msg_data)
|
||||
|
||||
return {"role": role, "content": content, "timestamp": timestamp}
|
||||
|
||||
def _extract_timestamp_from_message(self, msg_data: dict) -> str | None:
|
||||
"""Extract timestamp from message data."""
|
||||
timestamp_fields = ["timestamp", "created_at", "date", "time"]
|
||||
for field in timestamp_fields:
|
||||
if msg_data.get(field):
|
||||
return str(msg_data[field])
|
||||
return None
|
||||
|
||||
def _extract_timestamp_from_conversation(self, conv_data: dict) -> str | None:
|
||||
"""Extract timestamp from conversation data."""
|
||||
timestamp_fields = ["timestamp", "created_at", "date", "updated_at", "last_updated"]
|
||||
for field in timestamp_fields:
|
||||
if conv_data.get(field):
|
||||
return str(conv_data[field])
|
||||
return None
|
||||
|
||||
def _extract_title_from_conversation(self, conv_data: dict, messages: list) -> str:
|
||||
"""Extract or generate title for conversation."""
|
||||
# Try to find explicit title
|
||||
title_fields = ["title", "name", "subject", "topic"]
|
||||
for field in title_fields:
|
||||
if conv_data.get(field):
|
||||
return str(conv_data[field])
|
||||
|
||||
# Generate title from first user message
|
||||
for message in messages:
|
||||
if message.get("role") == "user":
|
||||
content = message.get("content", "")
|
||||
if content:
|
||||
# Use first 50 characters as title
|
||||
title = content[:50].strip()
|
||||
if len(content) > 50:
|
||||
title += "..."
|
||||
return title
|
||||
|
||||
return "Claude Conversation"
|
||||
|
||||
def _create_concatenated_content(self, conversation: dict) -> str:
|
||||
"""
|
||||
Create concatenated content from conversation messages.
|
||||
|
||||
Args:
|
||||
conversation: Dictionary containing conversation data
|
||||
|
||||
Returns:
|
||||
Formatted concatenated content
|
||||
"""
|
||||
title = conversation.get("title", "Claude Conversation")
|
||||
messages = conversation.get("messages", [])
|
||||
timestamp = conversation.get("timestamp", "Unknown")
|
||||
|
||||
# Build message content
|
||||
message_parts = []
|
||||
for i, message in enumerate(messages):
|
||||
role = message.get("role", "mixed")
|
||||
content = message.get("content", "")
|
||||
msg_timestamp = message.get("timestamp", "")
|
||||
|
||||
if role == "user":
|
||||
prefix = "[You]"
|
||||
elif role == "assistant":
|
||||
prefix = "[Claude]"
|
||||
else:
|
||||
prefix = "[Message]"
|
||||
|
||||
# Add timestamp if available
|
||||
if msg_timestamp:
|
||||
prefix += f" ({msg_timestamp})"
|
||||
|
||||
message_parts.append(f"{prefix}: {content}")
|
||||
|
||||
concatenated_text = "\n\n".join(message_parts)
|
||||
|
||||
# Create final document content
|
||||
doc_content = f"""Conversation: {title}
|
||||
Date: {timestamp}
|
||||
Messages ({len(messages)} messages):
|
||||
|
||||
{concatenated_text}
|
||||
"""
|
||||
return doc_content
|
||||
|
||||
def load_data(self, input_dir: str | None = None, **load_kwargs: Any) -> list[Document]:
|
||||
"""
|
||||
Load Claude export data.
|
||||
|
||||
Args:
|
||||
input_dir: Directory containing Claude export files or path to specific file
|
||||
**load_kwargs:
|
||||
max_count (int): Maximum number of conversations to process
|
||||
claude_export_path (str): Specific path to Claude export file/directory
|
||||
include_metadata (bool): Whether to include metadata in documents
|
||||
"""
|
||||
docs: list[Document] = []
|
||||
max_count = load_kwargs.get("max_count", -1)
|
||||
claude_export_path = load_kwargs.get("claude_export_path", input_dir)
|
||||
include_metadata = load_kwargs.get("include_metadata", True)
|
||||
|
||||
if not claude_export_path:
|
||||
print("No Claude export path provided")
|
||||
return docs
|
||||
|
||||
export_path = Path(claude_export_path)
|
||||
|
||||
if not export_path.exists():
|
||||
print(f"Claude export path not found: {export_path}")
|
||||
return docs
|
||||
|
||||
json_contents = []
|
||||
|
||||
# Handle different input types
|
||||
if export_path.is_file():
|
||||
if export_path.suffix.lower() == ".zip":
|
||||
# Extract JSON from zip file
|
||||
json_contents = self._extract_json_from_zip(export_path)
|
||||
elif export_path.suffix.lower() == ".json":
|
||||
# Read JSON file directly
|
||||
try:
|
||||
with open(export_path, encoding="utf-8", errors="ignore") as f:
|
||||
json_contents.append(f.read())
|
||||
except Exception as e:
|
||||
print(f"Error reading JSON file {export_path}: {e}")
|
||||
return docs
|
||||
else:
|
||||
print(f"Unsupported file type: {export_path.suffix}")
|
||||
return docs
|
||||
|
||||
elif export_path.is_dir():
|
||||
# Look for JSON files in directory
|
||||
json_files = list(export_path.glob("*.json"))
|
||||
zip_files = list(export_path.glob("*.zip"))
|
||||
|
||||
if json_files:
|
||||
print(f"Found {len(json_files)} JSON files in directory")
|
||||
for json_file in json_files:
|
||||
try:
|
||||
with open(json_file, encoding="utf-8", errors="ignore") as f:
|
||||
json_contents.append(f.read())
|
||||
except Exception as e:
|
||||
print(f"Error reading JSON file {json_file}: {e}")
|
||||
continue
|
||||
|
||||
if zip_files:
|
||||
print(f"Found {len(zip_files)} ZIP files in directory")
|
||||
for zip_file in zip_files:
|
||||
zip_contents = self._extract_json_from_zip(zip_file)
|
||||
json_contents.extend(zip_contents)
|
||||
|
||||
if not json_files and not zip_files:
|
||||
print(f"No JSON or ZIP files found in {export_path}")
|
||||
return docs
|
||||
|
||||
if not json_contents:
|
||||
print("No JSON content found to process")
|
||||
return docs
|
||||
|
||||
# Parse conversations from JSON content
|
||||
print("Parsing Claude conversations from JSON...")
|
||||
all_conversations = []
|
||||
for json_content in json_contents:
|
||||
conversations = self._parse_claude_json(json_content)
|
||||
all_conversations.extend(conversations)
|
||||
|
||||
if not all_conversations:
|
||||
print("No conversations found in JSON content")
|
||||
return docs
|
||||
|
||||
print(f"Found {len(all_conversations)} conversations")
|
||||
|
||||
# Process conversations into documents
|
||||
count = 0
|
||||
for conversation in all_conversations:
|
||||
if max_count > 0 and count >= max_count:
|
||||
break
|
||||
|
||||
if self.concatenate_conversations:
|
||||
# Create one document per conversation with concatenated messages
|
||||
doc_content = self._create_concatenated_content(conversation)
|
||||
|
||||
metadata = {}
|
||||
if include_metadata:
|
||||
metadata = {
|
||||
"title": conversation.get("title", "Claude Conversation"),
|
||||
"timestamp": conversation.get("timestamp", "Unknown"),
|
||||
"message_count": len(conversation.get("messages", [])),
|
||||
"source": "Claude Export",
|
||||
}
|
||||
|
||||
doc = Document(text=doc_content, metadata=metadata)
|
||||
docs.append(doc)
|
||||
count += 1
|
||||
|
||||
else:
|
||||
# Create separate documents for each message
|
||||
for message in conversation.get("messages", []):
|
||||
if max_count > 0 and count >= max_count:
|
||||
break
|
||||
|
||||
role = message.get("role", "mixed")
|
||||
content = message.get("content", "")
|
||||
msg_timestamp = message.get("timestamp", "")
|
||||
|
||||
if not content.strip():
|
||||
continue
|
||||
|
||||
# Create document content with context
|
||||
doc_content = f"""Conversation: {conversation.get("title", "Claude Conversation")}
|
||||
Role: {role}
|
||||
Timestamp: {msg_timestamp or conversation.get("timestamp", "Unknown")}
|
||||
Message: {content}
|
||||
"""
|
||||
|
||||
metadata = {}
|
||||
if include_metadata:
|
||||
metadata = {
|
||||
"conversation_title": conversation.get("title", "Claude Conversation"),
|
||||
"role": role,
|
||||
"timestamp": msg_timestamp or conversation.get("timestamp", "Unknown"),
|
||||
"source": "Claude Export",
|
||||
}
|
||||
|
||||
doc = Document(text=doc_content, metadata=metadata)
|
||||
docs.append(doc)
|
||||
count += 1
|
||||
|
||||
print(f"Created {len(docs)} documents from Claude export")
|
||||
return docs
|
||||
189
apps/claude_rag.py
Normal file
189
apps/claude_rag.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
Claude RAG example using the unified interface.
|
||||
Supports Claude export data from JSON files.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from base_rag_example import BaseRAGExample
|
||||
from chunking import create_text_chunks
|
||||
|
||||
from .claude_data.claude_reader import ClaudeReader
|
||||
|
||||
|
||||
class ClaudeRAG(BaseRAGExample):
|
||||
"""RAG example for Claude conversation data."""
|
||||
|
||||
def __init__(self):
|
||||
# Set default values BEFORE calling super().__init__
|
||||
self.max_items_default = -1 # Process all conversations by default
|
||||
self.embedding_model_default = (
|
||||
"sentence-transformers/all-MiniLM-L6-v2" # Fast 384-dim model
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
name="Claude",
|
||||
description="Process and query Claude conversation exports with LEANN",
|
||||
default_index_name="claude_conversations_index",
|
||||
)
|
||||
|
||||
def _add_specific_arguments(self, parser):
|
||||
"""Add Claude-specific arguments."""
|
||||
claude_group = parser.add_argument_group("Claude Parameters")
|
||||
claude_group.add_argument(
|
||||
"--export-path",
|
||||
type=str,
|
||||
default="./claude_export",
|
||||
help="Path to Claude export file (.json or .zip) or directory containing exports (default: ./claude_export)",
|
||||
)
|
||||
claude_group.add_argument(
|
||||
"--concatenate-conversations",
|
||||
action="store_true",
|
||||
default=True,
|
||||
help="Concatenate messages within conversations for better context (default: True)",
|
||||
)
|
||||
claude_group.add_argument(
|
||||
"--separate-messages",
|
||||
action="store_true",
|
||||
help="Process each message as a separate document (overrides --concatenate-conversations)",
|
||||
)
|
||||
claude_group.add_argument(
|
||||
"--chunk-size", type=int, default=512, help="Text chunk size (default: 512)"
|
||||
)
|
||||
claude_group.add_argument(
|
||||
"--chunk-overlap", type=int, default=128, help="Text chunk overlap (default: 128)"
|
||||
)
|
||||
|
||||
def _find_claude_exports(self, export_path: Path) -> list[Path]:
|
||||
"""
|
||||
Find Claude export files in the given path.
|
||||
|
||||
Args:
|
||||
export_path: Path to search for exports
|
||||
|
||||
Returns:
|
||||
List of paths to Claude export files
|
||||
"""
|
||||
export_files = []
|
||||
|
||||
if export_path.is_file():
|
||||
if export_path.suffix.lower() in [".zip", ".json"]:
|
||||
export_files.append(export_path)
|
||||
elif export_path.is_dir():
|
||||
# Look for zip and json files
|
||||
export_files.extend(export_path.glob("*.zip"))
|
||||
export_files.extend(export_path.glob("*.json"))
|
||||
|
||||
return export_files
|
||||
|
||||
async def load_data(self, args) -> list[str]:
|
||||
"""Load Claude export data and convert to text chunks."""
|
||||
export_path = Path(args.export_path)
|
||||
|
||||
if not export_path.exists():
|
||||
print(f"Claude export path not found: {export_path}")
|
||||
print(
|
||||
"Please ensure you have exported your Claude data and placed it in the correct location."
|
||||
)
|
||||
print("\nTo export your Claude data:")
|
||||
print("1. Open Claude in your browser")
|
||||
print("2. Look for export/download options in settings or conversation menu")
|
||||
print("3. Download the conversation data (usually in JSON format)")
|
||||
print("4. Place the file/directory at the specified path")
|
||||
print(
|
||||
"\nNote: Claude export methods may vary. Check Claude's help documentation for current instructions."
|
||||
)
|
||||
return []
|
||||
|
||||
# Find export files
|
||||
export_files = self._find_claude_exports(export_path)
|
||||
|
||||
if not export_files:
|
||||
print(f"No Claude export files (.json or .zip) found in: {export_path}")
|
||||
return []
|
||||
|
||||
print(f"Found {len(export_files)} Claude export files")
|
||||
|
||||
# Create reader with appropriate settings
|
||||
concatenate = args.concatenate_conversations and not args.separate_messages
|
||||
reader = ClaudeReader(concatenate_conversations=concatenate)
|
||||
|
||||
# Process each export file
|
||||
all_documents = []
|
||||
total_processed = 0
|
||||
|
||||
for i, export_file in enumerate(export_files):
|
||||
print(f"\nProcessing export file {i + 1}/{len(export_files)}: {export_file.name}")
|
||||
|
||||
try:
|
||||
# Apply max_items limit per file
|
||||
max_per_file = -1
|
||||
if args.max_items > 0:
|
||||
remaining = args.max_items - total_processed
|
||||
if remaining <= 0:
|
||||
break
|
||||
max_per_file = remaining
|
||||
|
||||
# Load conversations
|
||||
documents = reader.load_data(
|
||||
claude_export_path=str(export_file),
|
||||
max_count=max_per_file,
|
||||
include_metadata=True,
|
||||
)
|
||||
|
||||
if documents:
|
||||
all_documents.extend(documents)
|
||||
total_processed += len(documents)
|
||||
print(f"Processed {len(documents)} conversations from this file")
|
||||
else:
|
||||
print(f"No conversations loaded from {export_file}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing {export_file}: {e}")
|
||||
continue
|
||||
|
||||
if not all_documents:
|
||||
print("No conversations found to process!")
|
||||
print("\nTroubleshooting:")
|
||||
print("- Ensure the export file is a valid Claude export")
|
||||
print("- Check that the JSON file contains conversation data")
|
||||
print("- Try using a different export format or method")
|
||||
print("- Check Claude's documentation for current export procedures")
|
||||
return []
|
||||
|
||||
print(f"\nTotal conversations processed: {len(all_documents)}")
|
||||
print("Now starting to split into text chunks... this may take some time")
|
||||
|
||||
# Convert to text chunks
|
||||
all_texts = create_text_chunks(
|
||||
all_documents, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap
|
||||
)
|
||||
|
||||
print(f"Created {len(all_texts)} text chunks from {len(all_documents)} conversations")
|
||||
return all_texts
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
|
||||
# Example queries for Claude RAG
|
||||
print("\n🤖 Claude RAG Example")
|
||||
print("=" * 50)
|
||||
print("\nExample queries you can try:")
|
||||
print("- 'What did I ask Claude about Python programming?'")
|
||||
print("- 'Show me conversations about machine learning'")
|
||||
print("- 'Find discussions about code optimization'")
|
||||
print("- 'What advice did Claude give me about software design?'")
|
||||
print("- 'Search for conversations about debugging techniques'")
|
||||
print("\nTo get started:")
|
||||
print("1. Export your Claude conversation data")
|
||||
print("2. Place the JSON/ZIP file in ./claude_export/")
|
||||
print("3. Run this script to build your personal Claude knowledge base!")
|
||||
print("\nOr run without --query for interactive mode\n")
|
||||
|
||||
rag = ClaudeRAG()
|
||||
asyncio.run(rag.run())
|
||||
Reference in New Issue
Block a user