diff --git a/README.md b/README.md
index a4b81ce..6cbf20f 100755
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@ LEANN is an innovative vector database that democratizes personal AI. Transform
LEANN achieves this through *graph-based selective recomputation* with *high-degree preserving pruning*, computing embeddings on-demand instead of storing them all. [Illustration Fig ↓](#️-architecture--how-it-works) | [Paper →](https://arxiv.org/abs/2506.08276)
-**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
+**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantically search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[live data](#mcp-integration-rag-on-live-data-from-any-platform)** ([Slack](#slack-messages-search-your-team-conversations), [Twitter](#twitter-bookmarks-your-personal-tweet-library)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\*, or external knowledge bases (e.g., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
\* Claude Code only supports basic `grep`-style keyword search. **LEANN** is a drop-in **semantic search MCP service fully compatible with Claude Code**, unlocking intelligent retrieval without changing your workflow. 🔥 Check out [the easy setup →](packages/leann-mcp/README.md)
@@ -72,8 +72,9 @@ uv venv
source .venv/bin/activate
uv pip install leann
```
+
+> Low-resource? See "Low-resource setups" in the [Configuration Guide](docs/configuration-guide.md#low-resource-setups).
@@ -176,7 +177,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1)
## RAG on Everything!
-LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and more.
+LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and **live data from any platform via MCP (Model Context Protocol) servers**, such as Slack and Twitter.
@@ -774,6 +775,155 @@ Once your iMessage conversations are indexed, you can search with queries like:
+### MCP Integration: RAG on Live Data from Any Platform
+
+**NEW!** Connect to live data sources through the Model Context Protocol (MCP). LEANN now supports real-time RAG on platforms like Slack, Twitter, and more through standardized MCP servers.
+
+**Key Benefits:**
+- **Live Data Access**: Fetch real-time data without manual exports
+- **Standardized Protocol**: Use any MCP-compatible server
+- **Easy Extension**: Add new platforms with minimal code
+- **Secure Access**: MCP servers handle authentication
+
+
+#### Slack Messages: Search Your Team Conversations
+
+Transform your Slack workspace into a searchable knowledge base! Find discussions, decisions, and shared knowledge across all your channels.
+
+```bash
+# Test MCP server connection
+python -m apps.slack_rag --mcp-server "slack-mcp-server" --test-connection
+
+# Index and search Slack messages
+python -m apps.slack_rag \
+ --mcp-server "slack-mcp-server" \
+ --workspace-name "my-team" \
+ --channels general dev-team random \
+ --query "What did we decide about the product launch?"
+```
+
+**Setup Requirements:**
+1. Install a Slack MCP server (e.g., `npm install -g slack-mcp-server`)
+2. Create a Slack App and get API credentials:
+ - Go to [api.slack.com/apps](https://api.slack.com/apps) and create a new app
+ - Under "OAuth & Permissions", add these Bot Token Scopes: `channels:read`, `channels:history`, `groups:read`, `groups:history`, `im:read`, `im:history`, `mpim:read`, `mpim:history`
+ - Install the app to your workspace and copy the "Bot User OAuth Token" (starts with `xoxb-`)
+ - Under "App-Level Tokens", create a token with `connections:write` scope (starts with `xapp-`)
+ ```bash
+ export SLACK_BOT_TOKEN="xoxb-your-bot-token"
+ export SLACK_APP_TOKEN="xapp-your-app-token"
+ ```
+3. Test connection with `--test-connection` flag
+
+**Arguments:**
+- `--mcp-server`: Command to start the Slack MCP server
+- `--workspace-name`: Slack workspace name for organization
+- `--channels`: Specific channels to index (optional)
+- `--concatenate-conversations`: Group messages by channel (default: true)
+- `--max-messages-per-channel`: Limit messages per channel (default: 100)
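Under the hood, the reader speaks JSON-RPC 2.0 with the MCP server over stdin/stdout, one newline-terminated message per line. A minimal sketch of that framing (the method names follow the MCP spec; the tools/list response here is a canned example, not real server output):

```python
import json

def make_request(req_id: int, method: str, params: dict) -> bytes:
    """Frame a JSON-RPC 2.0 request as a single newline-terminated line,
    the way the MCP readers in this PR write to the server's stdin."""
    request = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
    return (json.dumps(request) + "\n").encode()

# First message every session sends: the MCP initialize handshake
init = make_request(
    1,
    "initialize",
    {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
    },
)
print(init.decode().strip())

# Parsing a canned tools/list response line read back from the server's stdout
response_line = b'{"jsonrpc": "2.0", "id": 2, "result": {"tools": [{"name": "get_messages"}]}}\n'
response = json.loads(response_line.decode().strip())
print([t["name"] for t in response.get("result", {}).get("tools", [])])  # ['get_messages']
```

This is exactly the request/response cycle `SlackMCPReader.send_mcp_request` runs against the spawned server process.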
+
+
+
+
+#### Twitter Bookmarks: Your Personal Tweet Library
+
+Search through your Twitter bookmarks! Find that perfect article, thread, or insight you saved for later.
+
+```bash
+# Test MCP server connection
+python -m apps.twitter_rag --mcp-server "twitter-mcp-server" --test-connection
+
+# Index and search Twitter bookmarks
+python -m apps.twitter_rag \
+ --mcp-server "twitter-mcp-server" \
+ --max-bookmarks 1000 \
+ --query "What AI articles did I bookmark about machine learning?"
+```
+
+**Setup Requirements:**
+1. Install a Twitter MCP server (e.g., `npm install -g twitter-mcp-server`)
+2. Get Twitter API credentials:
+ - Apply for a Twitter Developer Account at [developer.twitter.com](https://developer.twitter.com)
+ - Create a new app in the Twitter Developer Portal
+ - Generate API keys and access tokens with "Read" permissions
+ - For bookmarks access, you may need Twitter API v2 with appropriate scopes
+ ```bash
+ export TWITTER_API_KEY="your-api-key"
+ export TWITTER_API_SECRET="your-api-secret"
+ export TWITTER_ACCESS_TOKEN="your-access-token"
+ export TWITTER_ACCESS_TOKEN_SECRET="your-access-token-secret"
+ ```
+3. Test connection with `--test-connection` flag
+
+**Arguments:**
+- `--mcp-server`: Command to start the Twitter MCP server
+- `--username`: Filter bookmarks by username (optional)
+- `--max-bookmarks`: Maximum bookmarks to fetch (default: 1000)
+- `--no-tweet-content`: Exclude tweet content, only metadata
+- `--no-metadata`: Exclude engagement metadata
+
+
+
+
+#### 💡 Example queries you can try
+
+**Slack Queries:**
+- "What did the team discuss about the project deadline?"
+- "Find messages about the new feature launch"
+- "Show me conversations about budget planning"
+- "What decisions were made in the dev-team channel?"
+
+**Twitter Queries:**
+- "What AI articles did I bookmark last month?"
+- "Find tweets about machine learning techniques"
+- "Show me bookmarked threads about startup advice"
+- "What Python tutorials did I save?"
+
+
+
+
+#### 🔧 Using MCP with CLI Commands
+
+**Want to use MCP data with regular LEANN CLI?** You can combine MCP apps with CLI commands:
+
+```bash
+# Step 1: Use MCP app to fetch and index data
+python -m apps.slack_rag --mcp-server "slack-mcp-server" --workspace-name "my-team"
+
+# Step 2: The data is now indexed and available via CLI
+leann search slack_messages "project deadline"
+leann ask slack_messages "What decisions were made about the product launch?"
+
+# Same for Twitter bookmarks
+python -m apps.twitter_rag --mcp-server "twitter-mcp-server"
+leann search twitter_bookmarks "machine learning articles"
+```
+
+**MCP vs Manual Export:**
+- **MCP**: Live data, automatic updates, requires server setup
+- **Manual Export**: One-time setup, works offline, requires manual data export
+
+
+
+
+#### 🔧 Adding New MCP Platforms
+
+Want to add support for other platforms? LEANN's MCP integration is designed for easy extension:
+
+1. **Find or create an MCP server** for your platform
+2. **Create a reader class** following the pattern in `apps/slack_data/slack_mcp_reader.py`
+3. **Create a RAG application** following the pattern in `apps/slack_rag.py`
+4. **Test and contribute** back to the community!
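The core pattern worth reusing from the existing readers is keyword-based tool discovery: since different MCP servers name their tools differently, the reader scans `tools/list` for a name that looks right. A hedged sketch (the tool names below are hypothetical, as for a GitHub MCP server):

```python
from typing import Any, Optional

def find_tool(tools: list[dict[str, Any]], keywords: list[str]) -> Optional[dict[str, Any]]:
    """Pick the first MCP tool whose name suggests it can fetch the data
    you want, mirroring fetch_slack_messages()/fetch_twitter_bookmarks()."""
    for tool in tools:
        name = tool.get("name", "").lower()
        if any(keyword in name for keyword in keywords):
            return tool
    return None

# Hypothetical tool list a GitHub MCP server might advertise
tools = [{"name": "create_issue"}, {"name": "list_issue_comments"}]
tool = find_tool(tools, ["comment", "history", "discussion"])
print(tool["name"])  # list_issue_comments
```

Choose keywords specific enough to skip write-style tools (`create_issue` above) and match only the fetch-style tool your reader needs.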
+
+**Popular MCP servers to explore:**
+- GitHub repositories and issues
+- Discord messages
+- Notion pages
+- Google Drive documents
+- And many more in the MCP ecosystem!
+
+
+
### 🚀 Claude Code Integration: Transform Your Development Workflow!
@@ -805,7 +955,7 @@ Try our fully agentic pipeline with auto query rewriting, semantic search planni
**🔥 Ready to supercharge your coding?** [Complete Setup Guide →](packages/leann-mcp/README.md)
-## ๐ฅ๏ธ Command Line Interface
+## Command Line Interface
LEANN includes a powerful CLI for document processing and search. Perfect for quick document indexing and interactive chat.
@@ -1047,7 +1197,7 @@ MIT License - see [LICENSE](LICENSE) for details.
Core Contributors: [Yichuan Wang](https://yichuan-w.github.io/) & [Zhifei Li](https://github.com/andylizf).
-Active Contributors: [Gabriel Dehan](https://github.com/gabriel-dehan)
+Active Contributors: [Gabriel Dehan](https://github.com/gabriel-dehan), [Aakash Suresh](https://github.com/ASuresh0524)
We welcome more contributors! Feel free to open issues or submit PRs.
diff --git a/apps/slack_data/__init__.py b/apps/slack_data/__init__.py
new file mode 100644
index 0000000..2611c20
--- /dev/null
+++ b/apps/slack_data/__init__.py
@@ -0,0 +1 @@
+# Slack MCP data integration for LEANN
diff --git a/apps/slack_data/slack_mcp_reader.py b/apps/slack_data/slack_mcp_reader.py
new file mode 100644
index 0000000..e55820f
--- /dev/null
+++ b/apps/slack_data/slack_mcp_reader.py
@@ -0,0 +1,334 @@
+#!/usr/bin/env python3
+"""
+Slack MCP Reader for LEANN
+
+This module provides functionality to connect to Slack MCP servers and fetch message data
+for indexing in LEANN. It supports various Slack MCP server implementations and provides
+flexible message processing options.
+"""
+
+import asyncio
+import json
+import logging
+from typing import Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class SlackMCPReader:
+ """
+ Reader for Slack data via MCP (Model Context Protocol) servers.
+
+ This class connects to Slack MCP servers to fetch message data and convert it
+ into a format suitable for LEANN indexing.
+ """
+
+ def __init__(
+ self,
+ mcp_server_command: str,
+ workspace_name: Optional[str] = None,
+ concatenate_conversations: bool = True,
+ max_messages_per_conversation: int = 100,
+ ):
+ """
+ Initialize the Slack MCP Reader.
+
+ Args:
+ mcp_server_command: Command to start the MCP server (e.g., 'slack-mcp-server')
+ workspace_name: Optional workspace name to filter messages
+ concatenate_conversations: Whether to group messages by channel/thread
+ max_messages_per_conversation: Maximum messages to include per conversation
+ """
+ self.mcp_server_command = mcp_server_command
+ self.workspace_name = workspace_name
+ self.concatenate_conversations = concatenate_conversations
+ self.max_messages_per_conversation = max_messages_per_conversation
+ self.mcp_process = None
+
+ async def start_mcp_server(self):
+ """Start the MCP server process."""
+ try:
+ self.mcp_process = await asyncio.create_subprocess_exec(
+ *self.mcp_server_command.split(),
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ logger.info(f"Started MCP server: {self.mcp_server_command}")
+ except Exception as e:
+ logger.error(f"Failed to start MCP server: {e}")
+ raise
+
+ async def stop_mcp_server(self):
+ """Stop the MCP server process."""
+ if self.mcp_process:
+ self.mcp_process.terminate()
+ await self.mcp_process.wait()
+ logger.info("Stopped MCP server")
+
+ async def send_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]:
+ """Send a request to the MCP server and get response."""
+ if not self.mcp_process:
+ raise RuntimeError("MCP server not started")
+
+ request_json = json.dumps(request) + "\n"
+ self.mcp_process.stdin.write(request_json.encode())
+ await self.mcp_process.stdin.drain()
+
+ response_line = await self.mcp_process.stdout.readline()
+ if not response_line:
+ raise RuntimeError("No response from MCP server")
+
+ return json.loads(response_line.decode().strip())
+
+ async def initialize_mcp_connection(self):
+ """Initialize the MCP connection."""
+ init_request = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": "initialize",
+ "params": {
+ "protocolVersion": "2024-11-05",
+ "capabilities": {},
+ "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
+ },
+ }
+
+ response = await self.send_mcp_request(init_request)
+ if "error" in response:
+ raise RuntimeError(f"MCP initialization failed: {response['error']}")
+
+ logger.info("MCP connection initialized successfully")
+
+ async def list_available_tools(self) -> list[dict[str, Any]]:
+ """List available tools from the MCP server."""
+ list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
+
+ response = await self.send_mcp_request(list_request)
+ if "error" in response:
+ raise RuntimeError(f"Failed to list tools: {response['error']}")
+
+ return response.get("result", {}).get("tools", [])
+
+ async def fetch_slack_messages(
+ self, channel: Optional[str] = None, limit: int = 100
+ ) -> list[dict[str, Any]]:
+ """
+ Fetch Slack messages using MCP tools.
+
+ Args:
+ channel: Optional channel name to filter messages
+ limit: Maximum number of messages to fetch
+
+ Returns:
+ List of message dictionaries
+ """
+ # This is a generic implementation - specific MCP servers may have different tool names
+ # Common tool names might be: 'get_messages', 'list_messages', 'fetch_channel_history'
+
+ tools = await self.list_available_tools()
+ message_tool = None
+
+ # Look for a tool that can fetch messages
+ for tool in tools:
+ tool_name = tool.get("name", "").lower()
+ if any(
+ keyword in tool_name
+ for keyword in ["message", "history", "channel", "conversation"]
+ ):
+ message_tool = tool
+ break
+
+ if not message_tool:
+ raise RuntimeError("No message fetching tool found in MCP server")
+
+ # Prepare tool call parameters
+ tool_params = {"limit": limit}
+ if channel:
+            # "channel" is the most common parameter name; some servers expect
+            # "channel_id" or "channel_name" instead
+            tool_params["channel"] = channel
+
+ fetch_request = {
+ "jsonrpc": "2.0",
+ "id": 3,
+ "method": "tools/call",
+ "params": {"name": message_tool["name"], "arguments": tool_params},
+ }
+
+ response = await self.send_mcp_request(fetch_request)
+ if "error" in response:
+ raise RuntimeError(f"Failed to fetch messages: {response['error']}")
+
+ # Extract messages from response - format may vary by MCP server
+ result = response.get("result", {})
+ if "content" in result and isinstance(result["content"], list):
+ # Some MCP servers return content as a list
+ content = result["content"][0] if result["content"] else {}
+ if "text" in content:
+ try:
+ messages = json.loads(content["text"])
+ except json.JSONDecodeError:
+ # If not JSON, treat as plain text
+ messages = [{"text": content["text"], "channel": channel or "unknown"}]
+ else:
+ messages = result["content"]
+ else:
+ # Direct message format
+ messages = result.get("messages", [result])
+
+ return messages if isinstance(messages, list) else [messages]
+
+ def _format_message(self, message: dict[str, Any]) -> str:
+ """Format a single message for indexing."""
+ text = message.get("text", "")
+ user = message.get("user", message.get("username", "Unknown"))
+ channel = message.get("channel", message.get("channel_name", "Unknown"))
+ timestamp = message.get("ts", message.get("timestamp", ""))
+
+ # Format timestamp if available
+ formatted_time = ""
+ if timestamp:
+ try:
+ import datetime
+
+ if isinstance(timestamp, str) and "." in timestamp:
+ dt = datetime.datetime.fromtimestamp(float(timestamp))
+ formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
+ elif isinstance(timestamp, (int, float)):
+ dt = datetime.datetime.fromtimestamp(timestamp)
+ formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
+ else:
+ formatted_time = str(timestamp)
+ except (ValueError, TypeError):
+ formatted_time = str(timestamp)
+
+ # Build formatted message
+ parts = []
+ if channel:
+ parts.append(f"Channel: #{channel}")
+ if user:
+ parts.append(f"User: {user}")
+ if formatted_time:
+ parts.append(f"Time: {formatted_time}")
+ if text:
+ parts.append(f"Message: {text}")
+
+ return "\n".join(parts)
+
+ def _create_concatenated_content(self, messages: list[dict[str, Any]], channel: str) -> str:
+ """Create concatenated content from multiple messages in a channel."""
+ if not messages:
+ return ""
+
+ # Sort messages by timestamp if available
+ try:
+ messages.sort(key=lambda x: float(x.get("ts", x.get("timestamp", 0))))
+ except (ValueError, TypeError):
+ pass # Keep original order if timestamps aren't numeric
+
+ # Limit messages per conversation
+ if len(messages) > self.max_messages_per_conversation:
+ messages = messages[-self.max_messages_per_conversation :]
+
+ # Create header
+ content_parts = [
+ f"Slack Channel: #{channel}",
+ f"Message Count: {len(messages)}",
+ f"Workspace: {self.workspace_name or 'Unknown'}",
+ "=" * 50,
+ "",
+ ]
+
+ # Add messages
+ for message in messages:
+ formatted_msg = self._format_message(message)
+ if formatted_msg.strip():
+ content_parts.append(formatted_msg)
+ content_parts.append("-" * 30)
+ content_parts.append("")
+
+ return "\n".join(content_parts)
+
+ async def read_slack_data(self, channels: Optional[list[str]] = None) -> list[str]:
+ """
+ Read Slack data and return formatted text chunks.
+
+ Args:
+ channels: Optional list of channel names to fetch. If None, fetches from all available channels.
+
+ Returns:
+ List of formatted text chunks ready for LEANN indexing
+ """
+ try:
+ await self.start_mcp_server()
+ await self.initialize_mcp_connection()
+
+ all_texts = []
+
+ if channels:
+ # Fetch specific channels
+ for channel in channels:
+ try:
+ messages = await self.fetch_slack_messages(channel=channel, limit=1000)
+ if messages:
+ if self.concatenate_conversations:
+ text_content = self._create_concatenated_content(messages, channel)
+ if text_content.strip():
+ all_texts.append(text_content)
+ else:
+ # Process individual messages
+ for message in messages:
+ formatted_msg = self._format_message(message)
+ if formatted_msg.strip():
+ all_texts.append(formatted_msg)
+ except Exception as e:
+ logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
+ continue
+ else:
+ # Fetch from all available channels/conversations
+ # This is a simplified approach - real implementation would need to
+ # discover available channels first
+ try:
+ messages = await self.fetch_slack_messages(limit=1000)
+ if messages:
+ # Group messages by channel if concatenating
+ if self.concatenate_conversations:
+ channel_messages = {}
+ for message in messages:
+ channel = message.get(
+ "channel", message.get("channel_name", "general")
+ )
+ if channel not in channel_messages:
+ channel_messages[channel] = []
+ channel_messages[channel].append(message)
+
+ # Create concatenated content for each channel
+ for channel, msgs in channel_messages.items():
+ text_content = self._create_concatenated_content(msgs, channel)
+ if text_content.strip():
+ all_texts.append(text_content)
+ else:
+ # Process individual messages
+ for message in messages:
+ formatted_msg = self._format_message(message)
+ if formatted_msg.strip():
+ all_texts.append(formatted_msg)
+ except Exception as e:
+ logger.error(f"Failed to fetch messages: {e}")
+
+ return all_texts
+
+ finally:
+ await self.stop_mcp_server()
+
+ async def __aenter__(self):
+ """Async context manager entry."""
+ await self.start_mcp_server()
+ await self.initialize_mcp_connection()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ """Async context manager exit."""
+ await self.stop_mcp_server()
diff --git a/apps/slack_rag.py b/apps/slack_rag.py
new file mode 100644
index 0000000..c3cc0fc
--- /dev/null
+++ b/apps/slack_rag.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python3
+"""
+Slack RAG Application with MCP Support
+
+This application enables RAG (Retrieval-Augmented Generation) on Slack messages
+by connecting to Slack MCP servers to fetch live data and index it in LEANN.
+
+Usage:
+ python -m apps.slack_rag --mcp-server "slack-mcp-server" --query "What did the team discuss about the project?"
+"""
+
+import argparse
+import asyncio
+
+from apps.base_rag_example import BaseRAGExample
+from apps.slack_data.slack_mcp_reader import SlackMCPReader
+
+
+class SlackMCPRAG(BaseRAGExample):
+ """
+ RAG application for Slack messages via MCP servers.
+
+ This class provides a complete RAG pipeline for Slack data, including
+ MCP server connection, data fetching, indexing, and interactive chat.
+ """
+
+ def __init__(self):
+ super().__init__(
+ name="Slack MCP RAG",
+ description="RAG application for Slack messages via MCP servers",
+ default_index_name="slack_messages",
+ )
+
+ def _add_specific_arguments(self, parser: argparse.ArgumentParser):
+ """Add Slack MCP-specific arguments."""
+ parser.add_argument(
+ "--mcp-server",
+ type=str,
+ required=True,
+ help="Command to start the Slack MCP server (e.g., 'slack-mcp-server' or 'npx slack-mcp-server')",
+ )
+
+ parser.add_argument(
+ "--workspace-name",
+ type=str,
+ help="Slack workspace name for better organization and filtering",
+ )
+
+ parser.add_argument(
+ "--channels",
+ nargs="+",
+ help="Specific Slack channels to index (e.g., general random). If not specified, fetches from all available channels",
+ )
+
+ parser.add_argument(
+ "--concatenate-conversations",
+ action="store_true",
+ default=True,
+ help="Group messages by channel/thread for better context (default: True)",
+ )
+
+ parser.add_argument(
+ "--no-concatenate-conversations",
+ action="store_true",
+ help="Process individual messages instead of grouping by channel",
+ )
+
+ parser.add_argument(
+ "--max-messages-per-channel",
+ type=int,
+ default=100,
+ help="Maximum number of messages to include per channel (default: 100)",
+ )
+
+ parser.add_argument(
+ "--test-connection",
+ action="store_true",
+ help="Test MCP server connection and list available tools without indexing",
+ )
+
+ async def test_mcp_connection(self, args) -> bool:
+ """Test the MCP server connection and display available tools."""
+ print(f"Testing connection to MCP server: {args.mcp_server}")
+
+ try:
+ reader = SlackMCPReader(
+ mcp_server_command=args.mcp_server,
+ workspace_name=args.workspace_name,
+ concatenate_conversations=not args.no_concatenate_conversations,
+ max_messages_per_conversation=args.max_messages_per_channel,
+ )
+
+ async with reader:
+ tools = await reader.list_available_tools()
+
+            print("\n✅ Successfully connected to MCP server!")
+ print(f"Available tools ({len(tools)}):")
+
+ for i, tool in enumerate(tools, 1):
+ name = tool.get("name", "Unknown")
+ description = tool.get("description", "No description available")
+ print(f"\n{i}. {name}")
+ print(
+ f" Description: {description[:100]}{'...' if len(description) > 100 else ''}"
+ )
+
+ # Show input schema if available
+ schema = tool.get("inputSchema", {})
+ if schema.get("properties"):
+ props = list(schema["properties"].keys())[:3] # Show first 3 properties
+ print(
+ f" Parameters: {', '.join(props)}{'...' if len(schema['properties']) > 3 else ''}"
+ )
+
+ return True
+
+ except Exception as e:
+            print(f"\n❌ Failed to connect to MCP server: {e}")
+ print("\nTroubleshooting tips:")
+ print("1. Make sure the MCP server is installed and accessible")
+ print("2. Check if the server command is correct")
+ print("3. Ensure you have proper authentication/credentials configured")
+ print("4. Try running the MCP server command directly to test it")
+ return False
+
+ async def load_data(self, args) -> list[str]:
+ """Load Slack messages via MCP server."""
+ print(f"Connecting to Slack MCP server: {args.mcp_server}")
+
+ if args.workspace_name:
+ print(f"Workspace: {args.workspace_name}")
+
+ if args.channels:
+ print(f"Channels: {', '.join(args.channels)}")
+ else:
+ print("Fetching from all available channels")
+
+ concatenate = not args.no_concatenate_conversations
+ print(
+ f"Processing mode: {'Concatenated conversations' if concatenate else 'Individual messages'}"
+ )
+
+ try:
+ reader = SlackMCPReader(
+ mcp_server_command=args.mcp_server,
+ workspace_name=args.workspace_name,
+ concatenate_conversations=concatenate,
+ max_messages_per_conversation=args.max_messages_per_channel,
+ )
+
+ texts = await reader.read_slack_data(channels=args.channels)
+
+ if not texts:
+                print("❌ No messages found! This could mean:")
+ print("- The MCP server couldn't fetch messages")
+ print("- The specified channels don't exist or are empty")
+ print("- Authentication issues with the Slack workspace")
+ return []
+
+            print(f"✅ Successfully loaded {len(texts)} text chunks from Slack")
+
+ # Show sample of what was loaded
+ if texts:
+ sample_text = texts[0][:200] + "..." if len(texts[0]) > 200 else texts[0]
+ print("\nSample content:")
+ print("-" * 40)
+ print(sample_text)
+ print("-" * 40)
+
+ return texts
+
+ except Exception as e:
+            print(f"❌ Error loading Slack data: {e}")
+ print("\nThis might be due to:")
+ print("- MCP server connection issues")
+ print("- Authentication problems")
+ print("- Network connectivity issues")
+ print("- Incorrect channel names")
+ raise
+
+ async def run(self):
+ """Main entry point with MCP connection testing."""
+ args = self.parser.parse_args()
+
+ # Test connection if requested
+ if args.test_connection:
+ success = await self.test_mcp_connection(args)
+ if not success:
+ return
+ print(
+                "\n🎉 MCP server is working! You can now run without --test-connection to start indexing."
+ )
+ return
+
+ # Run the standard RAG pipeline
+ await super().run()
+
+
+async def main():
+ """Main entry point for the Slack MCP RAG application."""
+ app = SlackMCPRAG()
+ await app.run()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/apps/twitter_data/__init__.py b/apps/twitter_data/__init__.py
new file mode 100644
index 0000000..e0104f3
--- /dev/null
+++ b/apps/twitter_data/__init__.py
@@ -0,0 +1 @@
+# Twitter MCP data integration for LEANN
diff --git a/apps/twitter_data/twitter_mcp_reader.py b/apps/twitter_data/twitter_mcp_reader.py
new file mode 100644
index 0000000..a4eb909
--- /dev/null
+++ b/apps/twitter_data/twitter_mcp_reader.py
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+"""
+Twitter MCP Reader for LEANN
+
+This module provides functionality to connect to Twitter MCP servers and fetch bookmark data
+for indexing in LEANN. It supports various Twitter MCP server implementations and provides
+flexible bookmark processing options.
+"""
+
+import asyncio
+import json
+import logging
+from typing import Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class TwitterMCPReader:
+ """
+ Reader for Twitter bookmark data via MCP (Model Context Protocol) servers.
+
+ This class connects to Twitter MCP servers to fetch bookmark data and convert it
+ into a format suitable for LEANN indexing.
+ """
+
+ def __init__(
+ self,
+ mcp_server_command: str,
+ username: Optional[str] = None,
+ include_tweet_content: bool = True,
+ include_metadata: bool = True,
+ max_bookmarks: int = 1000,
+ ):
+ """
+ Initialize the Twitter MCP Reader.
+
+ Args:
+ mcp_server_command: Command to start the MCP server (e.g., 'twitter-mcp-server')
+ username: Optional Twitter username to filter bookmarks
+ include_tweet_content: Whether to include full tweet content
+ include_metadata: Whether to include tweet metadata (likes, retweets, etc.)
+ max_bookmarks: Maximum number of bookmarks to fetch
+ """
+ self.mcp_server_command = mcp_server_command
+ self.username = username
+ self.include_tweet_content = include_tweet_content
+ self.include_metadata = include_metadata
+ self.max_bookmarks = max_bookmarks
+ self.mcp_process = None
+
+ async def start_mcp_server(self):
+ """Start the MCP server process."""
+ try:
+ self.mcp_process = await asyncio.create_subprocess_exec(
+ *self.mcp_server_command.split(),
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ logger.info(f"Started MCP server: {self.mcp_server_command}")
+ except Exception as e:
+ logger.error(f"Failed to start MCP server: {e}")
+ raise
+
+ async def stop_mcp_server(self):
+ """Stop the MCP server process."""
+ if self.mcp_process:
+ self.mcp_process.terminate()
+ await self.mcp_process.wait()
+ logger.info("Stopped MCP server")
+
+ async def send_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]:
+ """Send a request to the MCP server and get response."""
+ if not self.mcp_process:
+ raise RuntimeError("MCP server not started")
+
+ request_json = json.dumps(request) + "\n"
+ self.mcp_process.stdin.write(request_json.encode())
+ await self.mcp_process.stdin.drain()
+
+ response_line = await self.mcp_process.stdout.readline()
+ if not response_line:
+ raise RuntimeError("No response from MCP server")
+
+ return json.loads(response_line.decode().strip())
+
+ async def initialize_mcp_connection(self):
+ """Initialize the MCP connection."""
+ init_request = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": "initialize",
+ "params": {
+ "protocolVersion": "2024-11-05",
+ "capabilities": {},
+ "clientInfo": {"name": "leann-twitter-reader", "version": "1.0.0"},
+ },
+ }
+
+ response = await self.send_mcp_request(init_request)
+ if "error" in response:
+ raise RuntimeError(f"MCP initialization failed: {response['error']}")
+
+ logger.info("MCP connection initialized successfully")
+
+ async def list_available_tools(self) -> list[dict[str, Any]]:
+ """List available tools from the MCP server."""
+ list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
+
+ response = await self.send_mcp_request(list_request)
+ if "error" in response:
+ raise RuntimeError(f"Failed to list tools: {response['error']}")
+
+ return response.get("result", {}).get("tools", [])
+
+ async def fetch_twitter_bookmarks(self, limit: Optional[int] = None) -> list[dict[str, Any]]:
+ """
+ Fetch Twitter bookmarks using MCP tools.
+
+ Args:
+ limit: Maximum number of bookmarks to fetch
+
+ Returns:
+ List of bookmark dictionaries
+ """
+ tools = await self.list_available_tools()
+ bookmark_tool = None
+
+ # Look for a tool that can fetch bookmarks
+ for tool in tools:
+ tool_name = tool.get("name", "").lower()
+ if any(keyword in tool_name for keyword in ["bookmark", "saved", "favorite"]):
+ bookmark_tool = tool
+ break
+
+ if not bookmark_tool:
+ raise RuntimeError("No bookmark fetching tool found in MCP server")
+
+ # Prepare tool call parameters
+ tool_params = {}
+ if limit or self.max_bookmarks:
+ tool_params["limit"] = limit or self.max_bookmarks
+ if self.username:
+ tool_params["username"] = self.username
+
+ fetch_request = {
+ "jsonrpc": "2.0",
+ "id": 3,
+ "method": "tools/call",
+ "params": {"name": bookmark_tool["name"], "arguments": tool_params},
+ }
+
+ response = await self.send_mcp_request(fetch_request)
+ if "error" in response:
+ raise RuntimeError(f"Failed to fetch bookmarks: {response['error']}")
+
+ # Extract bookmarks from response
+ result = response.get("result", {})
+ if "content" in result and isinstance(result["content"], list):
+ content = result["content"][0] if result["content"] else {}
+ if "text" in content:
+ try:
+ bookmarks = json.loads(content["text"])
+ except json.JSONDecodeError:
+ # If not JSON, treat as plain text
+ bookmarks = [{"text": content["text"], "source": "twitter"}]
+ else:
+ bookmarks = result["content"]
+ else:
+ bookmarks = result.get("bookmarks", result.get("tweets", [result]))
+
+ return bookmarks if isinstance(bookmarks, list) else [bookmarks]
+
+ def _format_bookmark(self, bookmark: dict[str, Any]) -> str:
+ """Format a single bookmark for indexing."""
+ # Extract tweet information
+ text = bookmark.get("text", bookmark.get("content", ""))
+ author = bookmark.get(
+ "author", bookmark.get("username", bookmark.get("user", {}).get("username", "Unknown"))
+ )
+ timestamp = bookmark.get("created_at", bookmark.get("timestamp", ""))
+ url = bookmark.get("url", bookmark.get("tweet_url", ""))
+
+ # Extract metadata if available
+ likes = bookmark.get("likes", bookmark.get("favorite_count", 0))
+ retweets = bookmark.get("retweets", bookmark.get("retweet_count", 0))
+ replies = bookmark.get("replies", bookmark.get("reply_count", 0))
+
+ # Build formatted bookmark
+ parts = []
+
+ # Header
+ parts.append("=== Twitter Bookmark ===")
+
+ if author:
+ parts.append(f"Author: @{author}")
+
+ if timestamp:
+ # Format timestamp if it's a standard format
+ try:
+ import datetime
+
+ if "T" in str(timestamp): # ISO format
+ dt = datetime.datetime.fromisoformat(str(timestamp).replace("Z", "+00:00"))
+ formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
+ else:
+ formatted_time = str(timestamp)
+ parts.append(f"Date: {formatted_time}")
+ except (ValueError, TypeError):
+ parts.append(f"Date: {timestamp}")
+
+ if url:
+ parts.append(f"URL: {url}")
+
+ # Tweet content
+ if text and self.include_tweet_content:
+ parts.append("")
+ parts.append("Content:")
+ parts.append(text)
+
+ # Metadata
+ if self.include_metadata and any([likes, retweets, replies]):
+ parts.append("")
+ parts.append("Engagement:")
+ if likes:
+ parts.append(f" Likes: {likes}")
+ if retweets:
+ parts.append(f" Retweets: {retweets}")
+ if replies:
+ parts.append(f" Replies: {replies}")
+
+ # Extract hashtags and mentions if available
+ hashtags = bookmark.get("hashtags", [])
+ mentions = bookmark.get("mentions", [])
+
+ if hashtags or mentions:
+ parts.append("")
+ if hashtags:
+ parts.append(f"Hashtags: {', '.join(hashtags)}")
+ if mentions:
+ parts.append(f"Mentions: {', '.join(mentions)}")
+
+ return "\n".join(parts)
+
+ async def read_twitter_bookmarks(self) -> list[str]:
+ """
+ Read Twitter bookmark data and return formatted text chunks.
+
+ Returns:
+ List of formatted text chunks ready for LEANN indexing
+ """
+ try:
+ await self.start_mcp_server()
+ await self.initialize_mcp_connection()
+
+ print(f"Fetching up to {self.max_bookmarks} bookmarks...")
+ if self.username:
+ print(f"Filtering for user: @{self.username}")
+
+ bookmarks = await self.fetch_twitter_bookmarks()
+
+ if not bookmarks:
+ print("No bookmarks found")
+ return []
+
+ print(f"Processing {len(bookmarks)} bookmarks...")
+
+ all_texts = []
+ processed_count = 0
+
+ for bookmark in bookmarks:
+ try:
+ formatted_bookmark = self._format_bookmark(bookmark)
+ if formatted_bookmark.strip():
+ all_texts.append(formatted_bookmark)
+ processed_count += 1
+ except Exception as e:
+ logger.warning(f"Failed to format bookmark: {e}")
+ continue
+
+ print(f"Successfully processed {processed_count} bookmarks")
+ return all_texts
+
+ finally:
+ await self.stop_mcp_server()
+
+ async def __aenter__(self):
+ """Async context manager entry."""
+ await self.start_mcp_server()
+ await self.initialize_mcp_connection()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ """Async context manager exit."""
+ await self.stop_mcp_server()
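The reader above talks to the MCP server over the subprocess's stdin/stdout, one JSON object per line (JSON-RPC 2.0). A minimal sketch of that framing, separated from the subprocess plumbing — the `get_bookmarks` tool name and the canned response are illustrative, not from any real server:

```python
import json

def build_request(req_id, method, params=None):
    # One JSON-RPC 2.0 request per line, as written to the server's stdin
    msg = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params or {}}
    return (json.dumps(msg) + "\n").encode()

def parse_response(line):
    # Decode one response line from the server's stdout; surface JSON-RPC errors
    resp = json.loads(line.decode().strip())
    if "error" in resp:
        raise RuntimeError(f"MCP error: {resp['error']}")
    return resp.get("result", {})

# Round trip against a canned tools/list response
request = build_request(2, "tools/list")
canned = b'{"jsonrpc": "2.0", "id": 2, "result": {"tools": [{"name": "get_bookmarks"}]}}\n'
tools = parse_response(canned).get("tools", [])
```

The same framing backs `initialize`, `tools/list`, and `tools/call` in the reader; only the `method` and `params` change.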
diff --git a/apps/twitter_rag.py b/apps/twitter_rag.py
new file mode 100644
index 0000000..a7fd3a4
--- /dev/null
+++ b/apps/twitter_rag.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+"""
+Twitter RAG Application with MCP Support
+
+This application enables RAG (Retrieval-Augmented Generation) on Twitter bookmarks
+by connecting to Twitter MCP servers to fetch live data and index it in LEANN.
+
+Usage:
+ python -m apps.twitter_rag --mcp-server "twitter-mcp-server" --query "What articles did I bookmark about AI?"
+"""
+
+import argparse
+import asyncio
+
+from apps.base_rag_example import BaseRAGExample
+from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
+
+
+class TwitterMCPRAG(BaseRAGExample):
+ """
+ RAG application for Twitter bookmarks via MCP servers.
+
+ This class provides a complete RAG pipeline for Twitter bookmark data, including
+ MCP server connection, data fetching, indexing, and interactive chat.
+ """
+
+ def __init__(self):
+ super().__init__(
+ name="Twitter MCP RAG",
+ description="RAG application for Twitter bookmarks via MCP servers",
+ default_index_name="twitter_bookmarks",
+ )
+
+ def _add_specific_arguments(self, parser: argparse.ArgumentParser):
+ """Add Twitter MCP-specific arguments."""
+ parser.add_argument(
+ "--mcp-server",
+ type=str,
+ required=True,
+ help="Command to start the Twitter MCP server (e.g., 'twitter-mcp-server' or 'npx twitter-mcp-server')",
+ )
+
+ parser.add_argument(
+ "--username", type=str, help="Twitter username to filter bookmarks (without @)"
+ )
+
+ parser.add_argument(
+ "--max-bookmarks",
+ type=int,
+ default=1000,
+ help="Maximum number of bookmarks to fetch (default: 1000)",
+ )
+
+ parser.add_argument(
+ "--no-tweet-content",
+ action="store_true",
+ help="Exclude tweet content, only include metadata",
+ )
+
+ parser.add_argument(
+ "--no-metadata",
+ action="store_true",
+ help="Exclude engagement metadata (likes, retweets, etc.)",
+ )
+
+ parser.add_argument(
+ "--test-connection",
+ action="store_true",
+ help="Test MCP server connection and list available tools without indexing",
+ )
+
+ async def test_mcp_connection(self, args) -> bool:
+ """Test the MCP server connection and display available tools."""
+ print(f"Testing connection to MCP server: {args.mcp_server}")
+
+ try:
+ reader = TwitterMCPReader(
+ mcp_server_command=args.mcp_server,
+ username=args.username,
+ include_tweet_content=not args.no_tweet_content,
+ include_metadata=not args.no_metadata,
+ max_bookmarks=args.max_bookmarks,
+ )
+
+ async with reader:
+ tools = await reader.list_available_tools()
+
+ print("\n✅ Successfully connected to MCP server!")
+ print(f"Available tools ({len(tools)}):")
+
+ for i, tool in enumerate(tools, 1):
+ name = tool.get("name", "Unknown")
+ description = tool.get("description", "No description available")
+ print(f"\n{i}. {name}")
+ print(
+ f" Description: {description[:100]}{'...' if len(description) > 100 else ''}"
+ )
+
+ # Show input schema if available
+ schema = tool.get("inputSchema", {})
+ if schema.get("properties"):
+ props = list(schema["properties"].keys())[:3] # Show first 3 properties
+ print(
+ f" Parameters: {', '.join(props)}{'...' if len(schema['properties']) > 3 else ''}"
+ )
+
+ return True
+
+ except Exception as e:
+ print(f"\n❌ Failed to connect to MCP server: {e}")
+ print("\nTroubleshooting tips:")
+ print("1. Make sure the Twitter MCP server is installed and accessible")
+ print("2. Check if the server command is correct")
+ print("3. Ensure you have proper Twitter API credentials configured")
+ print("4. Verify your Twitter account has bookmarks to fetch")
+ print("5. Try running the MCP server command directly to test it")
+ return False
+
+ async def load_data(self, args) -> list[str]:
+ """Load Twitter bookmarks via MCP server."""
+ print(f"Connecting to Twitter MCP server: {args.mcp_server}")
+
+ if args.username:
+ print(f"Username filter: @{args.username}")
+
+ print(f"Max bookmarks: {args.max_bookmarks}")
+ print(f"Include tweet content: {not args.no_tweet_content}")
+ print(f"Include metadata: {not args.no_metadata}")
+
+ try:
+ reader = TwitterMCPReader(
+ mcp_server_command=args.mcp_server,
+ username=args.username,
+ include_tweet_content=not args.no_tweet_content,
+ include_metadata=not args.no_metadata,
+ max_bookmarks=args.max_bookmarks,
+ )
+
+ texts = await reader.read_twitter_bookmarks()
+
+ if not texts:
+ print("❌ No bookmarks found! This could mean:")
+ print("- You don't have any bookmarks on Twitter")
+ print("- The MCP server couldn't access your bookmarks")
+ print("- Authentication issues with Twitter API")
+ print("- The username filter didn't match any bookmarks")
+ return []
+
+ print(f"✅ Successfully loaded {len(texts)} bookmarks from Twitter")
+
+ # Show sample of what was loaded
+ if texts:
+ sample_text = texts[0][:300] + "..." if len(texts[0]) > 300 else texts[0]
+ print("\nSample bookmark:")
+ print("-" * 50)
+ print(sample_text)
+ print("-" * 50)
+
+ return texts
+
+ except Exception as e:
+ print(f"❌ Error loading Twitter bookmarks: {e}")
+ print("\nThis might be due to:")
+ print("- MCP server connection issues")
+ print("- Twitter API authentication problems")
+ print("- Network connectivity issues")
+ print("- Rate limiting from Twitter API")
+ raise
+
+ async def run(self):
+ """Main entry point with MCP connection testing."""
+ args = self.parser.parse_args()
+
+ # Test connection if requested
+ if args.test_connection:
+ success = await self.test_mcp_connection(args)
+ if not success:
+ return
+ print(
+ "\n🎉 MCP server is working! You can now run without --test-connection to start indexing."
+ )
+ return
+
+ # Run the standard RAG pipeline
+ await super().run()
+
+
+async def main():
+ """Main entry point for the Twitter MCP RAG application."""
+ app = TwitterMCPRAG()
+ await app.run()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
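`fetch_twitter_bookmarks` above does not hard-code a tool name; it scans the `tools/list` output for bookmark-like names. That heuristic in isolation — the tool names here are made up for the example:

```python
def find_bookmark_tool(tools):
    # Return the first tool whose name suggests bookmark access, else None
    for tool in tools:
        name = tool.get("name", "").lower()
        if any(keyword in name for keyword in ("bookmark", "saved", "favorite")):
            return tool
    return None

available = [{"name": "search_tweets"}, {"name": "get_saved_posts"}]
selected = find_bookmark_tool(available)
```

Substring matching keeps the reader compatible with differently-named servers, at the cost of occasionally picking an unexpected tool; the `--test-connection` flag exists precisely to inspect what a given server actually exposes.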
diff --git a/examples/mcp_integration_demo.py b/examples/mcp_integration_demo.py
new file mode 100644
index 0000000..6fc4cfc
--- /dev/null
+++ b/examples/mcp_integration_demo.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+"""
+MCP Integration Examples for LEANN
+
+This script demonstrates how to use LEANN with different MCP servers for
+RAG on various platforms like Slack and Twitter.
+
+Examples:
+1. Slack message RAG via MCP
+2. Twitter bookmark RAG via MCP
+3. Testing MCP server connections
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+
+# Add the parent directory to the path so we can import from apps
+sys.path.append(str(Path(__file__).parent.parent))
+
+
+async def demo_slack_mcp():
+ """Demonstrate Slack MCP integration."""
+ print("=" * 60)
+ print("🔥 Slack MCP RAG Demo")
+ print("=" * 60)
+
+ print("\n1. Testing Slack MCP server connection...")
+
+ # This would typically use a real MCP server command
+ # For demo purposes, we show what the command would look like
+ # slack_app = SlackMCPRAG() # Would be used for actual testing
+
+ # Simulate command line arguments for testing
+ class MockArgs:
+ mcp_server = "slack-mcp-server" # This would be the actual MCP server command
+ workspace_name = "my-workspace"
+ channels = ["general", "random", "dev-team"]
+ no_concatenate_conversations = False
+ max_messages_per_channel = 50
+ test_connection = True
+
+ print(f"MCP Server Command: {MockArgs.mcp_server}")
+ print(f"Workspace: {MockArgs.workspace_name}")
+ print(f"Channels: {', '.join(MockArgs.channels)}")
+
+ # In a real scenario, you would run:
+ # success = await slack_app.test_mcp_connection(MockArgs)
+
+ print("\n📝 Example usage:")
+ print("python -m apps.slack_rag \\")
+ print(" --mcp-server 'slack-mcp-server' \\")
+ print(" --workspace-name 'my-team' \\")
+ print(" --channels general dev-team \\")
+ print(" --test-connection")
+
+ print("\n💡 After indexing, you could query:")
+ print("- 'What did the team discuss about the project deadline?'")
+ print("- 'Find messages about the new feature launch'")
+ print("- 'Show me conversations about budget planning'")
+
+
+async def demo_twitter_mcp():
+ """Demonstrate Twitter MCP integration."""
+ print("\n" + "=" * 60)
+ print("🐦 Twitter MCP RAG Demo")
+ print("=" * 60)
+
+ print("\n1. Testing Twitter MCP server connection...")
+
+ # twitter_app = TwitterMCPRAG() # Would be used for actual testing
+
+ class MockArgs:
+ mcp_server = "twitter-mcp-server"
+ username = None # Fetch all bookmarks
+ max_bookmarks = 500
+ no_tweet_content = False
+ no_metadata = False
+ test_connection = True
+
+ print(f"MCP Server Command: {MockArgs.mcp_server}")
+ print(f"Max Bookmarks: {MockArgs.max_bookmarks}")
+ print(f"Include Content: {not MockArgs.no_tweet_content}")
+ print(f"Include Metadata: {not MockArgs.no_metadata}")
+
+ print("\n📝 Example usage:")
+ print("python -m apps.twitter_rag \\")
+ print(" --mcp-server 'twitter-mcp-server' \\")
+ print(" --max-bookmarks 1000 \\")
+ print(" --test-connection")
+
+ print("\n💡 After indexing, you could query:")
+ print("- 'What AI articles did I bookmark last month?'")
+ print("- 'Find tweets about machine learning techniques'")
+ print("- 'Show me bookmarked threads about startup advice'")
+
+
+async def show_mcp_server_setup():
+ """Show how to set up MCP servers."""
+ print("\n" + "=" * 60)
+ print("⚙️ MCP Server Setup Guide")
+ print("=" * 60)
+
+ print("\n🔧 Setting up Slack MCP Server:")
+ print("1. Install a Slack MCP server (example commands):")
+ print(" npm install -g slack-mcp-server")
+ print(" # OR")
+ print(" pip install slack-mcp-server")
+
+ print("\n2. Configure Slack credentials:")
+ print(" export SLACK_BOT_TOKEN='xoxb-your-bot-token'")
+ print(" export SLACK_APP_TOKEN='xapp-your-app-token'")
+
+ print("\n3. Test the server:")
+ print(" slack-mcp-server --help")
+
+ print("\n🔧 Setting up Twitter MCP Server:")
+ print("1. Install a Twitter MCP server:")
+ print(" npm install -g twitter-mcp-server")
+ print(" # OR")
+ print(" pip install twitter-mcp-server")
+
+ print("\n2. Configure Twitter API credentials:")
+ print(" export TWITTER_API_KEY='your-api-key'")
+ print(" export TWITTER_API_SECRET='your-api-secret'")
+ print(" export TWITTER_ACCESS_TOKEN='your-access-token'")
+ print(" export TWITTER_ACCESS_TOKEN_SECRET='your-access-token-secret'")
+
+ print("\n3. Test the server:")
+ print(" twitter-mcp-server --help")
+
+
+async def show_integration_benefits():
+ """Show the benefits of MCP integration."""
+ print("\n" + "=" * 60)
+ print("🌟 Benefits of MCP Integration")
+ print("=" * 60)
+
+ benefits = [
+ ("🔄 Live Data Access", "Fetch real-time data from platforms without manual exports"),
+ ("🔌 Standardized Protocol", "Use any MCP-compatible server with minimal code changes"),
+ ("🧩 Easy Extension", "Add new platforms by implementing MCP readers"),
+ ("🔐 Secure Access", "MCP servers handle authentication and API management"),
+ ("📊 Rich Metadata", "Access full platform metadata (timestamps, engagement, etc.)"),
+ ("⚡ Efficient Processing", "Stream data directly into LEANN without intermediate files"),
+ ]
+
+ for title, description in benefits:
+ print(f"\n{title}")
+ print(f" {description}")
+
+
+async def main():
+ """Main demo function."""
+ print("🎯 LEANN MCP Integration Examples")
+ print("This demo shows how to integrate LEANN with MCP servers for various platforms.")
+
+ await demo_slack_mcp()
+ await demo_twitter_mcp()
+ await show_mcp_server_setup()
+ await show_integration_benefits()
+
+ print("\n" + "=" * 60)
+ print("✨ Next Steps")
+ print("=" * 60)
+ print("1. Install and configure MCP servers for your platforms")
+ print("2. Test connections using --test-connection flag")
+ print("3. Run indexing to build your RAG knowledge base")
+ print("4. Start querying your personal data!")
+
+ print("\n📚 For more information:")
+ print("- Check the README for detailed setup instructions")
+ print("- Look at the apps/slack_rag.py and apps/twitter_rag.py for implementation details")
+ print("- Explore other MCP servers for additional platforms")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/packages/leann-core/src/leann/embedding_compute.py b/packages/leann-core/src/leann/embedding_compute.py
index a01bd3b..06fba3d 100644
--- a/packages/leann-core/src/leann/embedding_compute.py
+++ b/packages/leann-core/src/leann/embedding_compute.py
@@ -183,32 +183,73 @@ def compute_embeddings_sentence_transformers(
}
try:
- # Try local loading first
- model_kwargs["local_files_only"] = True
- tokenizer_kwargs["local_files_only"] = True
+ # Try loading with advanced parameters first (newer versions)
+ local_model_kwargs = model_kwargs.copy()
+ local_tokenizer_kwargs = tokenizer_kwargs.copy()
+ local_model_kwargs["local_files_only"] = True
+ local_tokenizer_kwargs["local_files_only"] = True
model = SentenceTransformer(
model_name,
device=device,
- model_kwargs=model_kwargs,
- tokenizer_kwargs=tokenizer_kwargs,
+ model_kwargs=local_model_kwargs,
+ tokenizer_kwargs=local_tokenizer_kwargs,
local_files_only=True,
)
logger.info("Model loaded successfully! (local + optimized)")
+ except TypeError as e:
+ if "model_kwargs" in str(e) or "tokenizer_kwargs" in str(e):
+ logger.warning(
+ f"Advanced parameters not supported ({e}), using basic initialization..."
+ )
+ # Fallback to basic initialization for older versions
+ try:
+ model = SentenceTransformer(
+ model_name,
+ device=device,
+ local_files_only=True,
+ )
+ logger.info("Model loaded successfully! (local + basic)")
+ except Exception as e2:
+ logger.warning(f"Local loading failed ({e2}), trying network download...")
+ model = SentenceTransformer(
+ model_name,
+ device=device,
+ local_files_only=False,
+ )
+ logger.info("Model loaded successfully! (network + basic)")
+ else:
+ raise
except Exception as e:
logger.warning(f"Local loading failed ({e}), trying network download...")
- # Fallback to network loading
- model_kwargs["local_files_only"] = False
- tokenizer_kwargs["local_files_only"] = False
+ # Fallback to network loading with advanced parameters
+ try:
+ network_model_kwargs = model_kwargs.copy()
+ network_tokenizer_kwargs = tokenizer_kwargs.copy()
+ network_model_kwargs["local_files_only"] = False
+ network_tokenizer_kwargs["local_files_only"] = False
- model = SentenceTransformer(
- model_name,
- device=device,
- model_kwargs=model_kwargs,
- tokenizer_kwargs=tokenizer_kwargs,
- local_files_only=False,
- )
- logger.info("Model loaded successfully! (network + optimized)")
+ model = SentenceTransformer(
+ model_name,
+ device=device,
+ model_kwargs=network_model_kwargs,
+ tokenizer_kwargs=network_tokenizer_kwargs,
+ local_files_only=False,
+ )
+ logger.info("Model loaded successfully! (network + optimized)")
+ except TypeError as e2:
+ if "model_kwargs" in str(e2) or "tokenizer_kwargs" in str(e2):
+ logger.warning(
+ f"Advanced parameters not supported ({e2}), using basic network loading..."
+ )
+ model = SentenceTransformer(
+ model_name,
+ device=device,
+ local_files_only=False,
+ )
+ logger.info("Model loaded successfully! (network + basic)")
+ else:
+ raise
# Apply additional optimizations based on mode
if use_fp16 and device in ["cuda", "mps"]:
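The nested try/except introduced above is a capability probe: a `TypeError` that names `model_kwargs` or `tokenizer_kwargs` means the installed sentence-transformers version predates those parameters, not that loading actually failed. A sketch of the pattern on its own — `old_api` is a hypothetical stand-in for the older constructor, not a real library call:

```python
def load_with_fallback(loader, **advanced_kwargs):
    # Try version-dependent keyword arguments first; if the installed version
    # rejects one by name, retry with the basic signature instead of failing.
    try:
        return loader(**advanced_kwargs)
    except TypeError as e:
        if any(name in str(e) for name in advanced_kwargs):
            return loader()
        raise  # a TypeError raised inside the loader is a real error

def old_api(**kwargs):
    # Hypothetical older constructor that predates model_kwargs/tokenizer_kwargs
    if kwargs:
        raise TypeError(f"unexpected keyword argument '{next(iter(kwargs))}'")
    return "basic"

mode = load_with_fallback(old_api, tokenizer_kwargs={"local_files_only": True})
```

Checking the kwarg name in the message keeps genuine `TypeError`s (e.g. from inside model construction) from being silently swallowed by the fallback path.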
diff --git a/tests/test_mcp_integration.py b/tests/test_mcp_integration.py
new file mode 100644
index 0000000..99a5cdb
--- /dev/null
+++ b/tests/test_mcp_integration.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+"""
+Test script for MCP integration implementations.
+
+This script tests the basic functionality of the MCP readers and RAG applications
+without requiring actual MCP servers to be running.
+"""
+
+import sys
+from pathlib import Path
+
+# Add the parent directory to the path so we can import from apps
+sys.path.append(str(Path(__file__).parent.parent))
+
+from apps.slack_data.slack_mcp_reader import SlackMCPReader
+from apps.slack_rag import SlackMCPRAG
+from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
+from apps.twitter_rag import TwitterMCPRAG
+
+
+def test_slack_reader_initialization():
+ """Test that SlackMCPReader can be initialized with various parameters."""
+ print("Testing SlackMCPReader initialization...")
+
+ # Test basic initialization
+ reader = SlackMCPReader("slack-mcp-server")
+ assert reader.mcp_server_command == "slack-mcp-server"
+ assert reader.concatenate_conversations
+ assert reader.max_messages_per_conversation == 100
+
+ # Test with custom parameters
+ reader = SlackMCPReader(
+ "custom-slack-server",
+ workspace_name="test-workspace",
+ concatenate_conversations=False,
+ max_messages_per_conversation=50,
+ )
+ assert reader.workspace_name == "test-workspace"
+ assert not reader.concatenate_conversations
+ assert reader.max_messages_per_conversation == 50
+
+ print("✅ SlackMCPReader initialization tests passed")
+
+
+def test_twitter_reader_initialization():
+ """Test that TwitterMCPReader can be initialized with various parameters."""
+ print("Testing TwitterMCPReader initialization...")
+
+ # Test basic initialization
+ reader = TwitterMCPReader("twitter-mcp-server")
+ assert reader.mcp_server_command == "twitter-mcp-server"
+ assert reader.include_tweet_content
+ assert reader.include_metadata
+ assert reader.max_bookmarks == 1000
+
+ # Test with custom parameters
+ reader = TwitterMCPReader(
+ "custom-twitter-server",
+ username="testuser",
+ include_tweet_content=False,
+ include_metadata=False,
+ max_bookmarks=500,
+ )
+ assert reader.username == "testuser"
+ assert not reader.include_tweet_content
+ assert not reader.include_metadata
+ assert reader.max_bookmarks == 500
+
+ print("✅ TwitterMCPReader initialization tests passed")
+
+
+def test_slack_message_formatting():
+ """Test Slack message formatting functionality."""
+ print("Testing Slack message formatting...")
+
+ reader = SlackMCPReader("slack-mcp-server")
+
+ # Test basic message formatting
+ message = {
+ "text": "Hello, world!",
+ "user": "john_doe",
+ "channel": "general",
+ "ts": "1234567890.123456",
+ }
+
+ formatted = reader._format_message(message)
+ assert "Channel: #general" in formatted
+ assert "User: john_doe" in formatted
+ assert "Message: Hello, world!" in formatted
+ assert "Time:" in formatted
+
+ # Test with missing fields
+ message = {"text": "Simple message"}
+ formatted = reader._format_message(message)
+ assert "Message: Simple message" in formatted
+
+ print("✅ Slack message formatting tests passed")
+
+
+def test_twitter_bookmark_formatting():
+ """Test Twitter bookmark formatting functionality."""
+ print("Testing Twitter bookmark formatting...")
+
+ reader = TwitterMCPReader("twitter-mcp-server")
+
+ # Test basic bookmark formatting
+ bookmark = {
+ "text": "This is a great article about AI!",
+ "author": "ai_researcher",
+ "created_at": "2024-01-01T12:00:00Z",
+ "url": "https://twitter.com/ai_researcher/status/123456789",
+ "likes": 42,
+ "retweets": 15,
+ }
+
+ formatted = reader._format_bookmark(bookmark)
+ assert "=== Twitter Bookmark ===" in formatted
+ assert "Author: @ai_researcher" in formatted
+ assert "Content:" in formatted
+ assert "This is a great article about AI!" in formatted
+ assert "URL: https://twitter.com" in formatted
+ assert "Likes: 42" in formatted
+ assert "Retweets: 15" in formatted
+
+ # Test with minimal data
+ bookmark = {"text": "Simple tweet"}
+ formatted = reader._format_bookmark(bookmark)
+ assert "=== Twitter Bookmark ===" in formatted
+ assert "Simple tweet" in formatted
+
+ print("✅ Twitter bookmark formatting tests passed")
+
+
+def test_slack_rag_initialization():
+ """Test that SlackMCPRAG can be initialized."""
+ print("Testing SlackMCPRAG initialization...")
+
+ app = SlackMCPRAG()
+ assert app.default_index_name == "slack_messages"
+ assert hasattr(app, "parser")
+
+ print("✅ SlackMCPRAG initialization tests passed")
+
+
+def test_twitter_rag_initialization():
+ """Test that TwitterMCPRAG can be initialized."""
+ print("Testing TwitterMCPRAG initialization...")
+
+ app = TwitterMCPRAG()
+ assert app.default_index_name == "twitter_bookmarks"
+ assert hasattr(app, "parser")
+
+ print("✅ TwitterMCPRAG initialization tests passed")
+
+
+def test_concatenated_content_creation():
+ """Test creation of concatenated content from multiple messages."""
+ print("Testing concatenated content creation...")
+
+ reader = SlackMCPReader("slack-mcp-server", workspace_name="test-workspace")
+
+ messages = [
+ {"text": "First message", "user": "alice", "ts": "1000"},
+ {"text": "Second message", "user": "bob", "ts": "2000"},
+ {"text": "Third message", "user": "charlie", "ts": "3000"},
+ ]
+
+ content = reader._create_concatenated_content(messages, "general")
+
+ assert "Slack Channel: #general" in content
+ assert "Message Count: 3" in content
+ assert "Workspace: test-workspace" in content
+ assert "First message" in content
+ assert "Second message" in content
+ assert "Third message" in content
+
+ print("✅ Concatenated content creation tests passed")
+
+
+def main():
+ """Run all tests."""
+ print("🧪 Running MCP Integration Tests")
+ print("=" * 50)
+
+ try:
+ test_slack_reader_initialization()
+ test_twitter_reader_initialization()
+ test_slack_message_formatting()
+ test_twitter_bookmark_formatting()
+ test_slack_rag_initialization()
+ test_twitter_rag_initialization()
+ test_concatenated_content_creation()
+
+ print("\n" + "=" * 50)
+ print("🎉 All tests passed! MCP integration is working correctly.")
+ print("\nNext steps:")
+ print("1. Install actual MCP servers for Slack and Twitter")
+ print("2. Configure API credentials")
+ print("3. Test with --test-connection flag")
+ print("4. Start indexing your live data!")
+
+ except Exception as e:
+ print(f"\n❌ Test failed: {e}")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/test_mcp_standalone.py b/tests/test_mcp_standalone.py
new file mode 100644
index 0000000..c6c6ccd
--- /dev/null
+++ b/tests/test_mcp_standalone.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+"""
+Standalone test script for MCP integration implementations.
+
+This script tests the basic functionality of the MCP readers
+without requiring LEANN core dependencies.
+"""
+
+import json
+import sys
+from pathlib import Path
+
+# Add the parent directory to the path so we can import from apps
+sys.path.append(str(Path(__file__).parent.parent))
+
+
+def test_slack_reader_basic():
+ """Test basic SlackMCPReader functionality without async operations."""
+ print("Testing SlackMCPReader basic functionality...")
+
+ # Import and test initialization
+ from apps.slack_data.slack_mcp_reader import SlackMCPReader
+
+ reader = SlackMCPReader("slack-mcp-server")
+ assert reader.mcp_server_command == "slack-mcp-server"
+ assert reader.concatenate_conversations
+
+ # Test message formatting
+ message = {
+ "text": "Hello team! How's the project going?",
+ "user": "john_doe",
+ "channel": "general",
+ "ts": "1234567890.123456",
+ }
+
+ formatted = reader._format_message(message)
+ assert "Channel: #general" in formatted
+ assert "User: john_doe" in formatted
+ assert "Message: Hello team!" in formatted
+
+ # Test concatenated content creation
+ messages = [
+ {"text": "First message", "user": "alice", "ts": "1000"},
+ {"text": "Second message", "user": "bob", "ts": "2000"},
+ ]
+
+ content = reader._create_concatenated_content(messages, "dev-team")
+ assert "Slack Channel: #dev-team" in content
+ assert "Message Count: 2" in content
+ assert "First message" in content
+ assert "Second message" in content
+
+ print("✅ SlackMCPReader basic tests passed")
+
+
+def test_twitter_reader_basic():
+ """Test basic TwitterMCPReader functionality."""
+ print("Testing TwitterMCPReader basic functionality...")
+
+ from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
+
+ reader = TwitterMCPReader("twitter-mcp-server")
+ assert reader.mcp_server_command == "twitter-mcp-server"
+ assert reader.include_tweet_content
+ assert reader.max_bookmarks == 1000
+
+ # Test bookmark formatting
+ bookmark = {
+ "text": "Amazing article about the future of AI! Must read for everyone interested in tech.",
+ "author": "tech_guru",
+ "created_at": "2024-01-15T14:30:00Z",
+ "url": "https://twitter.com/tech_guru/status/123456789",
+ "likes": 156,
+ "retweets": 42,
+ "replies": 23,
+ "hashtags": ["AI", "tech", "future"],
+ "mentions": ["@openai", "@anthropic"],
+ }
+
+ formatted = reader._format_bookmark(bookmark)
+ assert "=== Twitter Bookmark ===" in formatted
+ assert "Author: @tech_guru" in formatted
+ assert "Amazing article about the future of AI!" in formatted
+ assert "Likes: 156" in formatted
+ assert "Retweets: 42" in formatted
+ assert "Hashtags: AI, tech, future" in formatted
+ assert "Mentions: @openai, @anthropic" in formatted
+
+ # Test with minimal data
+ simple_bookmark = {"text": "Short tweet", "author": "user123"}
+ formatted_simple = reader._format_bookmark(simple_bookmark)
+ assert "=== Twitter Bookmark ===" in formatted_simple
+ assert "Short tweet" in formatted_simple
+ assert "Author: @user123" in formatted_simple
+
+ print("✅ TwitterMCPReader basic tests passed")
+
+
+def test_mcp_request_format():
+ """Test MCP request formatting."""
+ print("Testing MCP request formatting...")
+
+ # Test initialization request format
+ init_request = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": "initialize",
+ "params": {
+ "protocolVersion": "2024-11-05",
+ "capabilities": {},
+ "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
+ },
+ }
+
+ # Verify it's valid JSON
+ json_str = json.dumps(init_request)
+ parsed = json.loads(json_str)
+ assert parsed["jsonrpc"] == "2.0"
+ assert parsed["method"] == "initialize"
+ assert parsed["params"]["protocolVersion"] == "2024-11-05"
+
+ # Test tools/list request
+ list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
+
+ json_str = json.dumps(list_request)
+ parsed = json.loads(json_str)
+ assert parsed["method"] == "tools/list"
+
+ print("✅ MCP request formatting tests passed")
+
+
+def test_data_processing():
+ """Test data processing capabilities."""
+ print("Testing data processing capabilities...")
+
+ from apps.slack_data.slack_mcp_reader import SlackMCPReader
+ from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
+
+ # Test Slack message processing with various formats
+ slack_reader = SlackMCPReader("test-server")
+
+ messages_with_timestamps = [
+ {"text": "Meeting in 5 minutes", "user": "alice", "ts": "1000.123"},
+ {"text": "On my way!", "user": "bob", "ts": "1001.456"},
+ {"text": "Starting now", "user": "charlie", "ts": "1002.789"},
+ ]
+
+ content = slack_reader._create_concatenated_content(messages_with_timestamps, "meetings")
+ assert "Meeting in 5 minutes" in content
+ assert "On my way!" in content
+ assert "Starting now" in content
+
+ # Test Twitter bookmark processing with engagement data
+ twitter_reader = TwitterMCPReader("test-server", include_metadata=True)
+
+ high_engagement_bookmark = {
+ "text": "Thread about startup lessons learned 🧵",
+ "author": "startup_founder",
+ "likes": 1250,
+ "retweets": 340,
+ "replies": 89,
+ }
+
+ formatted = twitter_reader._format_bookmark(high_engagement_bookmark)
+ assert "Thread about startup lessons learned" in formatted
+ assert "Likes: 1250" in formatted
+ assert "Retweets: 340" in formatted
+ assert "Replies: 89" in formatted
+
+ # Test with metadata disabled
+ twitter_reader_no_meta = TwitterMCPReader("test-server", include_metadata=False)
+ formatted_no_meta = twitter_reader_no_meta._format_bookmark(high_engagement_bookmark)
+ assert "Thread about startup lessons learned" in formatted_no_meta
+ assert "Likes:" not in formatted_no_meta
+ assert "Retweets:" not in formatted_no_meta
+
+ print("✅ Data processing tests passed")
+
+
+def main():
+ """Run all standalone tests."""
+ print("🧪 Running MCP Integration Standalone Tests")
+ print("=" * 60)
+ print("Testing core functionality without LEANN dependencies...")
+ print()
+
+ try:
+ test_slack_reader_basic()
+ test_twitter_reader_basic()
+ test_mcp_request_format()
+ test_data_processing()
+
+ print("\n" + "=" * 60)
+ print("🎉 All standalone tests passed!")
+ print("\n✨ MCP Integration Summary:")
+ print("- SlackMCPReader: Ready for Slack message processing")
+ print("- TwitterMCPReader: Ready for Twitter bookmark processing")
+ print("- MCP Protocol: Properly formatted JSON-RPC requests")
+ print("- Data Processing: Handles various message/bookmark formats")
+
+ print("\n🚀 Next Steps:")
+ print("1. Install MCP servers: npm install -g slack-mcp-server twitter-mcp-server")
+ print("2. Configure API credentials for Slack and Twitter")
+ print("3. Test connections: python -m apps.slack_rag --test-connection")
+ print("4. Start indexing live data from your platforms!")
+
+ print("\n📚 Documentation:")
+ print("- Check README.md for detailed setup instructions")
+ print("- Run examples/mcp_integration_demo.py for usage examples")
+ print("- Explore apps/slack_rag.py and apps/twitter_rag.py for implementation details")
+
+ except Exception as e:
+ print(f"\n❌ Test failed: {e}")
+ import traceback
+
+ traceback.print_exc()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()