From 5c7210d6d1e9470c910e14e4181b157b2f0d962b Mon Sep 17 00:00:00 2001 From: aakash Date: Fri, 3 Oct 2025 19:57:51 -0700 Subject: [PATCH] feat: Add MCP integration support for Slack and Twitter - Implement SlackMCPReader for connecting to Slack MCP servers - Implement TwitterMCPReader for connecting to Twitter MCP servers - Add SlackRAG and TwitterRAG applications with full CLI support - Support live data fetching via Model Context Protocol (MCP) - Add comprehensive documentation and usage examples - Include connection testing capabilities with --test-connection flag - Add standalone tests for core functionality - Update README with detailed MCP integration guide Resolves #36 --- README.md | 119 ++++++++- apps/slack_data/__init__.py | 1 + apps/slack_data/slack_mcp_reader.py | 334 ++++++++++++++++++++++++ apps/slack_rag.py | 204 +++++++++++++++ apps/twitter_data/__init__.py | 1 + apps/twitter_data/twitter_mcp_reader.py | 295 +++++++++++++++++++++ apps/twitter_rag.py | 190 ++++++++++++++ examples/mcp_integration_demo.py | 181 +++++++++++++ tests/test_mcp_integration.py | 209 +++++++++++++++ tests/test_mcp_standalone.py | 225 ++++++++++++++++ 10 files changed, 1758 insertions(+), 1 deletion(-) create mode 100644 apps/slack_data/__init__.py create mode 100644 apps/slack_data/slack_mcp_reader.py create mode 100644 apps/slack_rag.py create mode 100644 apps/twitter_data/__init__.py create mode 100644 apps/twitter_data/twitter_mcp_reader.py create mode 100644 apps/twitter_rag.py create mode 100644 examples/mcp_integration_demo.py create mode 100644 tests/test_mcp_integration.py create mode 100644 tests/test_mcp_standalone.py diff --git a/README.md b/README.md index 929e755..dbf2f67 100755 --- a/README.md +++ b/README.md @@ -176,7 +176,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1) ## RAG on Everything! 
-LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and more. +LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and **live data from any platform through MCP (Model Context Protocol) servers** - including Slack, Twitter, and more. @@ -709,6 +709,123 @@ Once your iMessage conversations are indexed, you can search with queries like: +### 🔌 MCP Integration: RAG on Live Data from Any Platform! + +**NEW!** Connect to live data sources through the Model Context Protocol (MCP). LEANN now supports real-time RAG on platforms like Slack, Twitter, and more through standardized MCP servers. + +**Key Benefits:** +- 🔄 **Live Data Access**: Fetch real-time data without manual exports +- 🔌 **Standardized Protocol**: Use any MCP-compatible server +- 🚀 **Easy Extension**: Add new platforms with minimal code +- 🔒 **Secure Access**: MCP servers handle authentication + +
+💬 Slack Messages: Search Your Team Conversations + +Transform your Slack workspace into a searchable knowledge base! Find discussions, decisions, and shared knowledge across all your channels. + +```bash +# Test MCP server connection +python -m apps.slack_rag --mcp-server "slack-mcp-server" --test-connection + +# Index and search Slack messages +python -m apps.slack_rag \ + --mcp-server "slack-mcp-server" \ + --workspace-name "my-team" \ + --channels general dev-team random \ + --query "What did we decide about the product launch?" +``` + +**Setup Requirements:** +1. Install a Slack MCP server (e.g., `npm install -g slack-mcp-server`) +2. Configure Slack API credentials: + ```bash + export SLACK_BOT_TOKEN="xoxb-your-bot-token" + export SLACK_APP_TOKEN="xapp-your-app-token" + ``` +3. Test connection with `--test-connection` flag + +**Arguments:** +- `--mcp-server`: Command to start the Slack MCP server +- `--workspace-name`: Slack workspace name for organization +- `--channels`: Specific channels to index (optional) +- `--concatenate-conversations`: Group messages by channel (default: true) +- `--max-messages-per-channel`: Limit messages per channel (default: 100) + +
+ +
+๐Ÿฆ Twitter Bookmarks: Your Personal Tweet Library + +Search through your Twitter bookmarks! Find that perfect article, thread, or insight you saved for later. + +```bash +# Test MCP server connection +python -m apps.twitter_rag --mcp-server "twitter-mcp-server" --test-connection + +# Index and search Twitter bookmarks +python -m apps.twitter_rag \ + --mcp-server "twitter-mcp-server" \ + --max-bookmarks 1000 \ + --query "What AI articles did I bookmark about machine learning?" +``` + +**Setup Requirements:** +1. Install a Twitter MCP server (e.g., `npm install -g twitter-mcp-server`) +2. Configure Twitter API credentials: + ```bash + export TWITTER_API_KEY="your-api-key" + export TWITTER_API_SECRET="your-api-secret" + export TWITTER_ACCESS_TOKEN="your-access-token" + export TWITTER_ACCESS_TOKEN_SECRET="your-access-token-secret" + ``` +3. Test connection with `--test-connection` flag + +**Arguments:** +- `--mcp-server`: Command to start the Twitter MCP server +- `--username`: Filter bookmarks by username (optional) +- `--max-bookmarks`: Maximum bookmarks to fetch (default: 1000) +- `--no-tweet-content`: Exclude tweet content, only metadata +- `--no-metadata`: Exclude engagement metadata + +
+ +
+💡 Click to expand: Example queries you can try + +**Slack Queries:** +- "What did the team discuss about the project deadline?" +- "Find messages about the new feature launch" +- "Show me conversations about budget planning" +- "What decisions were made in the dev-team channel?" + +**Twitter Queries:** +- "What AI articles did I bookmark last month?" +- "Find tweets about machine learning techniques" +- "Show me bookmarked threads about startup advice" +- "What Python tutorials did I save?" + +
+ +
+🔧 Adding New MCP Platforms + +Want to add support for other platforms? LEANN's MCP integration is designed for easy extension: + +1. **Find or create an MCP server** for your platform +2. **Create a reader class** following the pattern in `apps/slack_data/slack_mcp_reader.py` +3. **Create a RAG application** following the pattern in `apps/slack_rag.py` +4. **Test and contribute** back to the community! + +**Popular MCP servers to explore:** +- GitHub repositories and issues +- Discord messages +- Notion pages +- Google Drive documents +- And many more in the MCP ecosystem! + +
 ### 🚀 Claude Code Integration: Transform Your Development Workflow!
#!/usr/bin/env python3
"""
Slack MCP Reader for LEANN

This module provides functionality to connect to Slack MCP servers and fetch message data
for indexing in LEANN. It supports various Slack MCP server implementations and provides
flexible message processing options.
"""

import asyncio
import contextlib
import datetime
import itertools
import json
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class SlackMCPReader:
    """
    Reader for Slack data via MCP (Model Context Protocol) servers.

    The reader starts the MCP server as a subprocess, exchanges newline-delimited
    JSON-RPC messages with it over stdin/stdout, and converts the returned
    messages into plain-text chunks suitable for LEANN indexing.
    """

    # asyncio's default StreamReader limit is 64 KiB per line; one tool result
    # holding hundreds of messages is a single, much longer line.
    _STREAM_LIMIT_BYTES = 16 * 1024 * 1024
    # Grace period between terminate() and kill() when stopping the server.
    _SHUTDOWN_TIMEOUT_SECONDS = 5.0
    # Number of messages requested per tool call; trimming to
    # max_messages_per_conversation happens later, while formatting.
    _FETCH_LIMIT = 1000
    # Tool-name keywords, most specific first, used to find a read tool.
    _TOOL_KEYWORDS = ("history", "message", "conversation", "channel")
    # Tool names containing these verbs modify the workspace and are never called.
    _WRITE_VERBS = ("post", "send", "add", "create", "delete", "update", "reply", "react")
    # Argument names commonly used by servers to select a channel.
    _CHANNEL_PARAM_NAMES = ("channel", "channel_id", "channel_name")
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(
        self,
        mcp_server_command: str,
        workspace_name: Optional[str] = None,
        concatenate_conversations: bool = True,
        max_messages_per_conversation: int = 100,
    ):
        """
        Initialize the Slack MCP Reader.

        Args:
            mcp_server_command: Command to start the MCP server (e.g., 'slack-mcp-server')
            workspace_name: Optional workspace name, shown in the chunk header
            concatenate_conversations: Whether to group messages by channel/thread
            max_messages_per_conversation: Maximum messages to include per conversation
                (a non-positive value disables trimming)
        """
        self.mcp_server_command = mcp_server_command
        self.workspace_name = workspace_name
        self.concatenate_conversations = concatenate_conversations
        self.max_messages_per_conversation = max_messages_per_conversation
        self.mcp_process = None
        # Unique JSON-RPC ids so a late reply can never be mistaken for the
        # answer to a newer request.
        self._request_ids = itertools.count(1)

    async def start_mcp_server(self):
        """Start the MCP server process."""
        try:
            self.mcp_process = await asyncio.create_subprocess_exec(
                *self.mcp_server_command.split(),
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                # stderr is never read by this class; an unread PIPE can fill
                # up and block the server, so discard it instead.
                stderr=asyncio.subprocess.DEVNULL,
                limit=self._STREAM_LIMIT_BYTES,
            )
            logger.info(f"Started MCP server: {self.mcp_server_command}")
        except Exception as e:
            logger.error(f"Failed to start MCP server: {e}")
            raise

    async def stop_mcp_server(self):
        """Stop the MCP server process; safe to call when it is not running."""
        process, self.mcp_process = self.mcp_process, None
        if process is None:
            return

        if process.returncode is None:
            # The process may exit between the check above and the signal.
            with contextlib.suppress(ProcessLookupError):
                process.terminate()
            try:
                await asyncio.wait_for(
                    process.wait(), timeout=self._SHUTDOWN_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError:
                with contextlib.suppress(ProcessLookupError):
                    process.kill()
                await process.wait()
        logger.info("Stopped MCP server")

    async def _write_message(self, message: Dict[str, Any]) -> None:
        """Write one newline-terminated JSON message to the server's stdin."""
        payload = json.dumps(message) + "\n"
        self.mcp_process.stdin.write(payload.encode())
        await self.mcp_process.stdin.drain()

    async def send_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a request to the MCP server and return the matching response.

        Lines that are blank, not JSON, server notifications/requests, or
        replies to a different id are skipped.

        Raises:
            RuntimeError: If the server is not started or closes its stdout.
        """
        if not self.mcp_process:
            raise RuntimeError("MCP server not started")

        await self._write_message(request)
        expected_id = request.get("id")

        while True:
            response_line = await self.mcp_process.stdout.readline()
            if not response_line:
                raise RuntimeError("No response from MCP server")

            raw = response_line.decode().strip()
            if not raw:
                continue
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                logger.debug("Skipping non-JSON line from MCP server: %r", raw[:200])
                continue
            if not isinstance(message, dict) or "method" in message:
                continue  # notification or server-initiated request
            if expected_id is not None and message.get("id") != expected_id:
                continue  # reply to some other request
            return message

    async def initialize_mcp_connection(self):
        """Perform the MCP handshake: `initialize`, then `notifications/initialized`."""
        init_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
            },
        }

        response = await self.send_mcp_request(init_request)
        if "error" in response:
            raise RuntimeError(f"MCP initialization failed: {response['error']}")

        # The protocol requires this notification before normal requests.
        await self._write_message({"jsonrpc": "2.0", "method": "notifications/initialized"})
        logger.info("MCP connection initialized successfully")

    async def list_available_tools(self) -> List[Dict[str, Any]]:
        """List available tools from the MCP server."""
        list_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "tools/list",
            "params": {},
        }

        response = await self.send_mcp_request(list_request)
        if "error" in response:
            raise RuntimeError(f"Failed to list tools: {response['error']}")

        return (response.get("result") or {}).get("tools", [])

    @classmethod
    def _select_message_tool(cls, tools: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Return the most likely read-only message tool, or None if there is none."""
        readable = []
        for tool in tools:
            if not isinstance(tool, dict):
                continue
            name = str(tool.get("name", "")).lower()
            if not any(verb in name for verb in cls._WRITE_VERBS):
                readable.append((name, tool))

        for keyword in cls._TOOL_KEYWORDS:
            for name, tool in readable:
                if keyword in name:
                    return tool
        return None

    @classmethod
    def _channel_param_name(cls, tool: Dict[str, Any]) -> str:
        """Return the channel argument name declared by the tool's input schema."""
        properties = (tool.get("inputSchema") or {}).get("properties") or {}
        for candidate in cls._CHANNEL_PARAM_NAMES:
            if candidate in properties:
                return candidate
        return cls._CHANNEL_PARAM_NAMES[0]

    @staticmethod
    def _extract_messages(result: Any, channel: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Normalize a `tools/call` result into a list of message dictionaries.

        Handles a `content` list whose text items hold JSON or plain text, a
        top-level `messages` list, and a bare result object. Non-dict entries
        are wrapped as `{"text": ..., "channel": ...}`.
        """
        fallback_channel = channel or "unknown"

        def as_message(item: Any) -> Dict[str, Any]:
            if isinstance(item, dict):
                return item
            return {"text": str(item), "channel": fallback_channel}

        def unwrap(payload: Any) -> List[Dict[str, Any]]:
            if isinstance(payload, dict) and isinstance(payload.get("messages"), list):
                payload = payload["messages"]
            if isinstance(payload, list):
                return [as_message(item) for item in payload]
            return [as_message(payload)]

        if not isinstance(result, dict):
            return unwrap(result)

        content = result.get("content")
        if not isinstance(content, list):
            # Direct message format
            return unwrap(result.get("messages", [result]))

        messages: List[Dict[str, Any]] = []
        for item in content:
            if isinstance(item, dict) and "text" in item:
                try:
                    payload = json.loads(item["text"])
                except (json.JSONDecodeError, TypeError):
                    # If not JSON, treat as plain text
                    payload = {"text": item["text"], "channel": fallback_channel}
                messages.extend(unwrap(payload))
            else:
                messages.append(as_message(item))
        return messages

    async def fetch_slack_messages(
        self, channel: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Fetch Slack messages using MCP tools.

        Args:
            channel: Optional channel name to filter messages
            limit: Maximum number of messages to fetch

        Returns:
            List of message dictionaries

        Raises:
            RuntimeError: If no suitable tool exists or the call fails.
        """
        tools = await self.list_available_tools()
        message_tool = self._select_message_tool(tools)
        if not message_tool:
            raise RuntimeError("No message fetching tool found in MCP server")

        tool_params: Dict[str, Any] = {"limit": limit}
        if channel:
            tool_params[self._channel_param_name(message_tool)] = channel

        fetch_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "tools/call",
            "params": {"name": message_tool["name"], "arguments": tool_params},
        }

        response = await self.send_mcp_request(fetch_request)
        if "error" in response:
            raise RuntimeError(f"Failed to fetch messages: {response['error']}")

        result = response.get("result") or {}
        if isinstance(result, dict) and result.get("isError"):
            # Tool-level failures arrive as a normal result flagged isError;
            # their text must not be indexed as if it were a message.
            details = " ".join(
                str(item.get("text", ""))
                for item in result.get("content") or []
                if isinstance(item, dict)
            ).strip()
            raise RuntimeError(
                f"Failed to fetch messages: {details or 'tool reported an error'}"
            )

        return self._extract_messages(result, channel)

    @classmethod
    def _format_timestamp(cls, timestamp: Any) -> str:
        """
        Render an epoch timestamp (number or numeric string) in local time.

        Values that are empty return "", and values that cannot be read as an
        epoch are returned as their string form.
        """
        if not timestamp:
            return ""
        if isinstance(timestamp, str):
            if not timestamp.replace(".", "", 1).isdigit():
                return timestamp
        elif not isinstance(timestamp, (int, float)):
            return str(timestamp)

        try:
            moment = datetime.datetime.fromtimestamp(float(timestamp))
        except (ValueError, TypeError, OverflowError, OSError):
            return str(timestamp)
        return moment.strftime(cls._TIMESTAMP_FORMAT)

    def _format_message(self, message: Dict[str, Any]) -> str:
        """Format a single message for indexing."""
        text = message.get("text", "")
        user = message.get("user", message.get("username", "Unknown"))
        channel = message.get("channel", message.get("channel_name", "Unknown"))
        timestamp = message.get("ts", message.get("timestamp", ""))
        formatted_time = self._format_timestamp(timestamp)

        # Build formatted message
        parts = []
        if channel:
            parts.append(f"Channel: #{channel}")
        if user:
            parts.append(f"User: {user}")
        if formatted_time:
            parts.append(f"Time: {formatted_time}")
        if text:
            parts.append(f"Message: {text}")

        return "\n".join(parts)

    def _create_concatenated_content(self, messages: List[Dict[str, Any]], channel: str) -> str:
        """
        Create concatenated content from multiple messages in a channel.

        Messages are ordered oldest first when every timestamp is numeric and
        only the newest `max_messages_per_conversation` are kept. The caller's
        list is left unmodified.
        """
        if not messages:
            return ""

        try:
            ordered = sorted(
                messages, key=lambda m: float(m.get("ts", m.get("timestamp", 0)))
            )
        except (ValueError, TypeError):
            ordered = list(messages)  # Keep original order if timestamps aren't numeric

        limit = self.max_messages_per_conversation
        if limit > 0 and len(ordered) > limit:
            ordered = ordered[-limit:]

        content_parts = [
            f"Slack Channel: #{channel}",
            f"Message Count: {len(ordered)}",
            f"Workspace: {self.workspace_name or 'Unknown'}",
            "=" * 50,
            "",
        ]

        for message in ordered:
            formatted_msg = self._format_message(message)
            if formatted_msg.strip():
                content_parts.append(formatted_msg)
                content_parts.append("-" * 30)
                content_parts.append("")

        return "\n".join(content_parts)

    def _messages_to_texts(self, messages: List[Dict[str, Any]], channel: str) -> List[str]:
        """Turn one channel's messages into index chunks per the configured mode."""
        if self.concatenate_conversations:
            text_content = self._create_concatenated_content(messages, channel)
            return [text_content] if text_content.strip() else []

        formatted = (self._format_message(message) for message in messages)
        return [text for text in formatted if text.strip()]

    async def read_slack_data(self, channels: Optional[List[str]] = None) -> List[str]:
        """
        Read Slack data and return formatted text chunks.

        Args:
            channels: Optional list of channel names to fetch. If None, a single
                unfiltered fetch is made and messages are grouped by their own
                channel field.

        Returns:
            List of formatted text chunks ready for LEANN indexing
        """
        try:
            await self.start_mcp_server()
            await self.initialize_mcp_connection()

            all_texts: List[str] = []

            if channels:
                for channel in channels:
                    try:
                        messages = await self.fetch_slack_messages(
                            channel=channel, limit=self._FETCH_LIMIT
                        )
                        if messages:
                            all_texts.extend(self._messages_to_texts(messages, channel))
                    except Exception as e:
                        # Best effort: one bad channel must not abort the others.
                        logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
                        continue
            else:
                # This is a simplified approach - real implementation would need to
                # discover available channels first
                try:
                    messages = await self.fetch_slack_messages(limit=self._FETCH_LIMIT)
                    if messages and self.concatenate_conversations:
                        channel_messages: Dict[Any, List[Dict[str, Any]]] = {}
                        for message in messages:
                            channel = message.get(
                                "channel", message.get("channel_name", "general")
                            )
                            channel_messages.setdefault(channel, []).append(message)

                        for channel, msgs in channel_messages.items():
                            all_texts.extend(self._messages_to_texts(msgs, channel))
                    elif messages:
                        all_texts.extend(self._messages_to_texts(messages, ""))
                except Exception as e:
                    logger.error(f"Failed to fetch messages: {e}")

            return all_texts

        finally:
            await self.stop_mcp_server()

    async def __aenter__(self):
        """Async context manager entry: start the server and complete the handshake."""
        await self.start_mcp_server()
        try:
            await self.initialize_mcp_connection()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so clean up here.
            await self.stop_mcp_server()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.stop_mcp_server()
If not specified, fetches from all available channels", + ) + + parser.add_argument( + "--concatenate-conversations", + action="store_true", + default=True, + help="Group messages by channel/thread for better context (default: True)", + ) + + parser.add_argument( + "--no-concatenate-conversations", + action="store_true", + help="Process individual messages instead of grouping by channel", + ) + + parser.add_argument( + "--max-messages-per-channel", + type=int, + default=100, + help="Maximum number of messages to include per channel (default: 100)", + ) + + parser.add_argument( + "--test-connection", + action="store_true", + help="Test MCP server connection and list available tools without indexing", + ) + + async def test_mcp_connection(self, args) -> bool: + """Test the MCP server connection and display available tools.""" + print(f"Testing connection to MCP server: {args.mcp_server}") + + try: + reader = SlackMCPReader( + mcp_server_command=args.mcp_server, + workspace_name=args.workspace_name, + concatenate_conversations=not args.no_concatenate_conversations, + max_messages_per_conversation=args.max_messages_per_channel, + ) + + async with reader: + tools = await reader.list_available_tools() + + print("\nโœ… Successfully connected to MCP server!") + print(f"Available tools ({len(tools)}):") + + for i, tool in enumerate(tools, 1): + name = tool.get("name", "Unknown") + description = tool.get("description", "No description available") + print(f"\n{i}. {name}") + print( + f" Description: {description[:100]}{'...' if len(description) > 100 else ''}" + ) + + # Show input schema if available + schema = tool.get("inputSchema", {}) + if schema.get("properties"): + props = list(schema["properties"].keys())[:3] # Show first 3 properties + print( + f" Parameters: {', '.join(props)}{'...' 
if len(schema['properties']) > 3 else ''}" + ) + + return True + + except Exception as e: + print(f"\nโŒ Failed to connect to MCP server: {e}") + print("\nTroubleshooting tips:") + print("1. Make sure the MCP server is installed and accessible") + print("2. Check if the server command is correct") + print("3. Ensure you have proper authentication/credentials configured") + print("4. Try running the MCP server command directly to test it") + return False + + async def load_data(self, args) -> List[str]: + """Load Slack messages via MCP server.""" + print(f"Connecting to Slack MCP server: {args.mcp_server}") + + if args.workspace_name: + print(f"Workspace: {args.workspace_name}") + + if args.channels: + print(f"Channels: {', '.join(args.channels)}") + else: + print("Fetching from all available channels") + + concatenate = not args.no_concatenate_conversations + print( + f"Processing mode: {'Concatenated conversations' if concatenate else 'Individual messages'}" + ) + + try: + reader = SlackMCPReader( + mcp_server_command=args.mcp_server, + workspace_name=args.workspace_name, + concatenate_conversations=concatenate, + max_messages_per_conversation=args.max_messages_per_channel, + ) + + texts = await reader.read_slack_data(channels=args.channels) + + if not texts: + print("โŒ No messages found! This could mean:") + print("- The MCP server couldn't fetch messages") + print("- The specified channels don't exist or are empty") + print("- Authentication issues with the Slack workspace") + return [] + + print(f"โœ… Successfully loaded {len(texts)} text chunks from Slack") + + # Show sample of what was loaded + if texts: + sample_text = texts[0][:200] + "..." 
#!/usr/bin/env python3
"""
Twitter MCP Reader for LEANN

This module provides functionality to connect to Twitter MCP servers and fetch bookmark data
for indexing in LEANN. It supports various Twitter MCP server implementations and provides
flexible bookmark processing options.
"""

import asyncio
import contextlib
import datetime
import itertools
import json
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class TwitterMCPReader:
    """
    Reader for Twitter bookmark data via MCP (Model Context Protocol) servers.

    The reader starts the MCP server as a subprocess, exchanges newline-delimited
    JSON-RPC messages with it over stdin/stdout, and converts the returned
    bookmarks into plain-text chunks suitable for LEANN indexing.
    """

    # asyncio's default StreamReader limit is 64 KiB per line; one tool result
    # holding many bookmarks is a single, much longer line.
    _STREAM_LIMIT_BYTES = 16 * 1024 * 1024
    # Grace period between terminate() and kill() when stopping the server.
    _SHUTDOWN_TIMEOUT_SECONDS = 5.0
    # Tool-name keywords that identify a bookmark tool.
    _TOOL_KEYWORDS = ("bookmark", "saved", "favorite")
    # Tool names containing these modify the account and are never called.
    _WRITE_VERBS = (
        "add", "create", "delete", "remove", "post", "send", "update", "unbookmark", "unsave",
    )
    # Among the remaining candidates, names with these verbs are preferred.
    _READ_VERBS = ("get", "list", "fetch")
    # Keys under which a JSON object may wrap the list of tweets.
    _COLLECTION_KEYS = ("bookmarks", "tweets", "data")
    # Keys tried, in order, to label an author/hashtag/mention given as an object.
    _LABEL_KEYS = ("username", "screen_name", "tag", "text", "name")

    def __init__(
        self,
        mcp_server_command: str,
        username: Optional[str] = None,
        include_tweet_content: bool = True,
        include_metadata: bool = True,
        max_bookmarks: int = 1000,
    ):
        """
        Initialize the Twitter MCP Reader.

        Args:
            mcp_server_command: Command to start the MCP server (e.g., 'twitter-mcp-server')
            username: Optional Twitter username to filter bookmarks
            include_tweet_content: Whether to include full tweet content
            include_metadata: Whether to include tweet metadata (likes, retweets, etc.)
            max_bookmarks: Maximum number of bookmarks to fetch
        """
        self.mcp_server_command = mcp_server_command
        self.username = username
        self.include_tweet_content = include_tweet_content
        self.include_metadata = include_metadata
        self.max_bookmarks = max_bookmarks
        self.mcp_process = None
        # Unique JSON-RPC ids so a late reply can never be mistaken for the
        # answer to a newer request.
        self._request_ids = itertools.count(1)

    async def start_mcp_server(self):
        """Start the MCP server process."""
        try:
            self.mcp_process = await asyncio.create_subprocess_exec(
                *self.mcp_server_command.split(),
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                # stderr is never read by this class; an unread PIPE can fill
                # up and block the server, so discard it instead.
                stderr=asyncio.subprocess.DEVNULL,
                limit=self._STREAM_LIMIT_BYTES,
            )
            logger.info(f"Started MCP server: {self.mcp_server_command}")
        except Exception as e:
            logger.error(f"Failed to start MCP server: {e}")
            raise

    async def stop_mcp_server(self):
        """Stop the MCP server process; safe to call when it is not running."""
        process, self.mcp_process = self.mcp_process, None
        if process is None:
            return

        if process.returncode is None:
            # The process may exit between the check above and the signal.
            with contextlib.suppress(ProcessLookupError):
                process.terminate()
            try:
                await asyncio.wait_for(
                    process.wait(), timeout=self._SHUTDOWN_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError:
                with contextlib.suppress(ProcessLookupError):
                    process.kill()
                await process.wait()
        logger.info("Stopped MCP server")

    async def _write_message(self, message: Dict[str, Any]) -> None:
        """Write one newline-terminated JSON message to the server's stdin."""
        payload = json.dumps(message) + "\n"
        self.mcp_process.stdin.write(payload.encode())
        await self.mcp_process.stdin.drain()

    async def send_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a request to the MCP server and return the matching response.

        Lines that are blank, not JSON, server notifications/requests, or
        replies to a different id are skipped.

        Raises:
            RuntimeError: If the server is not started or closes its stdout.
        """
        if not self.mcp_process:
            raise RuntimeError("MCP server not started")

        await self._write_message(request)
        expected_id = request.get("id")

        while True:
            response_line = await self.mcp_process.stdout.readline()
            if not response_line:
                raise RuntimeError("No response from MCP server")

            raw = response_line.decode().strip()
            if not raw:
                continue
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                logger.debug("Skipping non-JSON line from MCP server: %r", raw[:200])
                continue
            if not isinstance(message, dict) or "method" in message:
                continue  # notification or server-initiated request
            if expected_id is not None and message.get("id") != expected_id:
                continue  # reply to some other request
            return message

    async def initialize_mcp_connection(self):
        """Perform the MCP handshake: `initialize`, then `notifications/initialized`."""
        init_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "leann-twitter-reader", "version": "1.0.0"},
            },
        }

        response = await self.send_mcp_request(init_request)
        if "error" in response:
            raise RuntimeError(f"MCP initialization failed: {response['error']}")

        # The protocol requires this notification before normal requests.
        await self._write_message({"jsonrpc": "2.0", "method": "notifications/initialized"})
        logger.info("MCP connection initialized successfully")

    async def list_available_tools(self) -> List[Dict[str, Any]]:
        """List available tools from the MCP server."""
        list_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "tools/list",
            "params": {},
        }

        response = await self.send_mcp_request(list_request)
        if "error" in response:
            raise RuntimeError(f"Failed to list tools: {response['error']}")

        return (response.get("result") or {}).get("tools", [])

    @classmethod
    def _select_bookmark_tool(cls, tools: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Return the most likely read-only bookmark tool, or None if there is none."""
        candidates = []
        for tool in tools:
            if not isinstance(tool, dict):
                continue
            name = str(tool.get("name", "")).lower()
            if not any(keyword in name for keyword in cls._TOOL_KEYWORDS):
                continue
            if any(verb in name for verb in cls._WRITE_VERBS):
                continue
            candidates.append((name, tool))

        for name, tool in candidates:
            if any(verb in name for verb in cls._READ_VERBS):
                return tool
        return candidates[0][1] if candidates else None

    @classmethod
    def _extract_bookmarks(cls, result: Any) -> List[Dict[str, Any]]:
        """
        Normalize a `tools/call` result into a list of bookmark dictionaries.

        Handles a `content` list whose text items hold JSON or plain text, a
        top-level `bookmarks`/`tweets` list, and a bare result object. Non-dict
        entries are wrapped as `{"text": ..., "source": "twitter"}`.
        """

        def as_bookmark(item: Any) -> Dict[str, Any]:
            if isinstance(item, dict):
                return item
            return {"text": str(item), "source": "twitter"}

        def unwrap(payload: Any) -> List[Dict[str, Any]]:
            if isinstance(payload, dict):
                for key in cls._COLLECTION_KEYS:
                    if isinstance(payload.get(key), list):
                        payload = payload[key]
                        break
            if isinstance(payload, list):
                return [as_bookmark(item) for item in payload]
            return [as_bookmark(payload)]

        if not isinstance(result, dict):
            return unwrap(result)

        content = result.get("content")
        if not isinstance(content, list):
            return unwrap(result.get("bookmarks", result.get("tweets", [result])))

        bookmarks: List[Dict[str, Any]] = []
        for item in content:
            if isinstance(item, dict) and "text" in item:
                try:
                    payload = json.loads(item["text"])
                except (json.JSONDecodeError, TypeError):
                    # If not JSON, treat as plain text
                    payload = {"text": item["text"], "source": "twitter"}
                bookmarks.extend(unwrap(payload))
            else:
                bookmarks.append(as_bookmark(item))
        return bookmarks

    async def fetch_twitter_bookmarks(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Fetch Twitter bookmarks using MCP tools.

        Args:
            limit: Maximum number of bookmarks to fetch (defaults to `max_bookmarks`)

        Returns:
            List of bookmark dictionaries, truncated to the effective limit

        Raises:
            RuntimeError: If no suitable tool exists or the call fails.
        """
        tools = await self.list_available_tools()
        bookmark_tool = self._select_bookmark_tool(tools)
        if not bookmark_tool:
            raise RuntimeError("No bookmark fetching tool found in MCP server")

        effective_limit = limit or self.max_bookmarks
        tool_params: Dict[str, Any] = {}
        if effective_limit:
            tool_params["limit"] = effective_limit
        if self.username:
            tool_params["username"] = self.username

        fetch_request = {
            "jsonrpc": "2.0",
            "id": next(self._request_ids),
            "method": "tools/call",
            "params": {"name": bookmark_tool["name"], "arguments": tool_params},
        }

        response = await self.send_mcp_request(fetch_request)
        if "error" in response:
            raise RuntimeError(f"Failed to fetch bookmarks: {response['error']}")

        result = response.get("result") or {}
        if isinstance(result, dict) and result.get("isError"):
            # Tool-level failures arrive as a normal result flagged isError;
            # their text must not be indexed as if it were a bookmark.
            details = " ".join(
                str(item.get("text", ""))
                for item in result.get("content") or []
                if isinstance(item, dict)
            ).strip()
            raise RuntimeError(
                f"Failed to fetch bookmarks: {details or 'tool reported an error'}"
            )

        bookmarks = self._extract_bookmarks(result)
        if effective_limit and effective_limit > 0:
            # A server may ignore the requested limit; enforce it locally too.
            bookmarks = bookmarks[:effective_limit]
        return bookmarks

    @classmethod
    def _label(cls, value: Any) -> str:
        """Return a display string for a value that may be a string or an object."""
        if isinstance(value, dict):
            for key in cls._LABEL_KEYS:
                if value.get(key):
                    return str(value[key])
        return str(value)

    @classmethod
    def _resolve_author(cls, bookmark: Dict[str, Any]) -> Any:
        """Pick the author from `author`, `username` or `user`, in that order."""
        for key in ("author", "username"):
            if key in bookmark:
                author = bookmark[key]
                return cls._label(author) if isinstance(author, dict) else author

        user = bookmark.get("user")
        if isinstance(user, dict):
            return user.get("username", "Unknown")
        return user or "Unknown"

    @staticmethod
    def _format_timestamp(timestamp: Any) -> str:
        """Render ISO-8601 strings as 'YYYY-MM-DD HH:MM:SS'; pass anything else through."""
        if isinstance(timestamp, str) and "T" in timestamp:
            try:
                moment = datetime.datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            except ValueError:
                return timestamp
            return moment.strftime("%Y-%m-%d %H:%M:%S")
        return str(timestamp)

    def _format_bookmark(self, bookmark: Dict[str, Any]) -> str:
        """Format a single bookmark for indexing."""
        text = bookmark.get("text", bookmark.get("content", ""))
        author = self._resolve_author(bookmark)
        timestamp = bookmark.get("created_at", bookmark.get("timestamp", ""))
        url = bookmark.get("url", bookmark.get("tweet_url", ""))

        likes = bookmark.get("likes", bookmark.get("favorite_count", 0))
        retweets = bookmark.get("retweets", bookmark.get("retweet_count", 0))
        replies = bookmark.get("replies", bookmark.get("reply_count", 0))

        parts = ["=== Twitter Bookmark ==="]

        if author:
            parts.append(f"Author: @{author}")
        if timestamp:
            parts.append(f"Date: {self._format_timestamp(timestamp)}")
        if url:
            parts.append(f"URL: {url}")

        # Tweet content
        if text and self.include_tweet_content:
            parts.append("")
            parts.append("Content:")
            parts.append(text)

        # Metadata
        if self.include_metadata and any([likes, retweets, replies]):
            parts.append("")
            parts.append("Engagement:")
            if likes:
                parts.append(f" Likes: {likes}")
            if retweets:
                parts.append(f" Retweets: {retweets}")
            if replies:
                parts.append(f" Replies: {replies}")

        # Hashtags and mentions may arrive as strings or as entity objects.
        hashtags = bookmark.get("hashtags") or []
        mentions = bookmark.get("mentions") or []

        if hashtags or mentions:
            parts.append("")
            if hashtags:
                parts.append(f"Hashtags: {', '.join(self._label(tag) for tag in hashtags)}")
            if mentions:
                parts.append(f"Mentions: {', '.join(self._label(name) for name in mentions)}")

        return "\n".join(parts)

    async def read_twitter_bookmarks(self) -> List[str]:
        """
        Read Twitter bookmark data and return formatted text chunks.

        Returns:
            List of formatted text chunks ready for LEANN indexing
        """
        try:
            await self.start_mcp_server()
            await self.initialize_mcp_connection()

            print(f"Fetching up to {self.max_bookmarks} bookmarks...")
            if self.username:
                print(f"Filtering for user: @{self.username}")

            bookmarks = await self.fetch_twitter_bookmarks()

            if not bookmarks:
                print("No bookmarks found")
                return []

            print(f"Processing {len(bookmarks)} bookmarks...")

            all_texts = []
            processed_count = 0

            for bookmark in bookmarks:
                try:
                    formatted_bookmark = self._format_bookmark(bookmark)
                    if formatted_bookmark.strip():
                        all_texts.append(formatted_bookmark)
                        processed_count += 1
                except Exception as e:
                    # Best effort: one malformed bookmark must not abort the run.
                    logger.warning(f"Failed to format bookmark: {e}")
                    continue

            print(f"Successfully processed {processed_count} bookmarks")
            return all_texts

        finally:
            await self.stop_mcp_server()

    async def __aenter__(self):
        """Async context manager entry: start the server and complete the handshake."""
        await self.start_mcp_server()
        try:
            await self.initialize_mcp_connection()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so clean up here.
            await self.stop_mcp_server()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.stop_mcp_server()
class TwitterMCPRAG(BaseRAGExample):
    """
    RAG application for Twitter bookmarks via MCP servers.

    Adds the Twitter-specific command-line flags, a ``--test-connection`` mode
    that only lists the tools exposed by the MCP server, and ``load_data``,
    which returns the formatted bookmark texts produced by ``TwitterMCPReader``.
    Everything else is delegated to ``BaseRAGExample.run`` (defined outside
    this file).
    """

    def __init__(self):
        # NOTE(review): BaseRAGExample is not visible from this file; confirm
        # that its __init__ really takes no required arguments.
        super().__init__()
        self.default_index_name = "twitter_bookmarks"

    def _add_specific_arguments(self, parser: argparse.ArgumentParser):
        """Add Twitter MCP-specific arguments.

        Args:
            parser: The argument parser to extend in place.
        """
        parser.add_argument(
            "--mcp-server",
            type=str,
            required=True,
            help="Command to start the Twitter MCP server (e.g., 'twitter-mcp-server' or 'npx twitter-mcp-server')",
        )

        parser.add_argument(
            "--username",
            type=str,
            help="Twitter username to filter bookmarks (without @)",
        )

        parser.add_argument(
            "--max-bookmarks",
            type=int,
            default=1000,
            help="Maximum number of bookmarks to fetch (default: 1000)",
        )

        parser.add_argument(
            "--no-tweet-content",
            action="store_true",
            help="Exclude tweet content, only include metadata",
        )

        parser.add_argument(
            "--no-metadata",
            action="store_true",
            help="Exclude engagement metadata (likes, retweets, etc.)",
        )

        parser.add_argument(
            "--test-connection",
            action="store_true",
            help="Test MCP server connection and list available tools without indexing",
        )

    @staticmethod
    def _build_reader(args):
        """Create a TwitterMCPReader configured from the parsed CLI arguments."""
        return TwitterMCPReader(
            mcp_server_command=args.mcp_server,
            username=args.username,
            # The CLI exposes negative flags; the reader takes positive ones.
            include_tweet_content=not args.no_tweet_content,
            include_metadata=not args.no_metadata,
            max_bookmarks=args.max_bookmarks,
        )

    async def test_mcp_connection(self, args) -> bool:
        """Test the MCP server connection and display available tools.

        Args:
            args: Parsed command-line arguments (``mcp_server``, ``username``,
                ``no_tweet_content``, ``no_metadata``, ``max_bookmarks``).

        Returns:
            True when the server could be started and its tools were listed,
            False when any exception occurred (troubleshooting tips are
            printed in that case).
        """
        print(f"Testing connection to MCP server: {args.mcp_server}")

        try:
            reader = self._build_reader(args)

            async with reader:
                tools = await reader.list_available_tools()

                print("\n✅ Successfully connected to MCP server!")
                print(f"Available tools ({len(tools)}):")

                for i, tool in enumerate(tools, 1):
                    # ``or`` covers both a missing key and an explicit null, so
                    # a sparsely described tool is not reported as a
                    # connection failure by the except clause below.
                    name = tool.get("name") or "Unknown"
                    description = tool.get("description") or "No description available"
                    print(f"\n{i}. {name}")
                    print(f" Description: {description[:100]}{'...' if len(description) > 100 else ''}")

                    # Show input schema if available (first 3 properties only)
                    properties = (tool.get("inputSchema") or {}).get("properties") or {}
                    if properties:
                        props = list(properties)[:3]
                        print(f" Parameters: {', '.join(props)}{'...' if len(properties) > 3 else ''}")

            return True

        except Exception as e:
            # Top-level boundary of the connection test: report and signal
            # failure through the return value instead of raising.
            print(f"\n❌ Failed to connect to MCP server: {e}")
            print("\nTroubleshooting tips:")
            print("1. Make sure the Twitter MCP server is installed and accessible")
            print("2. Check if the server command is correct")
            print("3. Ensure you have proper Twitter API credentials configured")
            print("4. Verify your Twitter account has bookmarks to fetch")
            print("5. Try running the MCP server command directly to test it")
            return False

    async def load_data(self, args) -> List[str]:
        """Load Twitter bookmarks via MCP server.

        Args:
            args: Parsed command-line arguments (same fields as
                ``test_mcp_connection``).

        Returns:
            The formatted bookmark texts, or an empty list when the reader
            returned nothing.

        Raises:
            Exception: Any error raised while reading is re-raised after
                printing a list of likely causes.
        """
        print(f"Connecting to Twitter MCP server: {args.mcp_server}")

        if args.username:
            print(f"Username filter: @{args.username}")

        print(f"Max bookmarks: {args.max_bookmarks}")
        print(f"Include tweet content: {not args.no_tweet_content}")
        print(f"Include metadata: {not args.no_metadata}")

        try:
            reader = self._build_reader(args)
            texts = await reader.read_twitter_bookmarks()

            if not texts:
                print("❌ No bookmarks found! This could mean:")
                print("- You don't have any bookmarks on Twitter")
                print("- The MCP server couldn't access your bookmarks")
                print("- Authentication issues with Twitter API")
                print("- The username filter didn't match any bookmarks")
                return []

            print(f"✅ Successfully loaded {len(texts)} bookmarks from Twitter")

            # Show sample of what was loaded (texts is non-empty here)
            first = texts[0]
            sample_text = first[:300] + "..." if len(first) > 300 else first
            print("\nSample bookmark:")
            print("-" * 50)
            print(sample_text)
            print("-" * 50)

            return texts

        except Exception as e:
            print(f"❌ Error loading Twitter bookmarks: {e}")
            print("\nThis might be due to:")
            print("- MCP server connection issues")
            print("- Twitter API authentication problems")
            print("- Network connectivity issues")
            print("- Rate limiting from Twitter API")
            raise

    async def run(self):
        """Main entry point with MCP connection testing.

        With ``--test-connection`` only the connection test is performed and
        the method returns without indexing; otherwise the call is forwarded
        to ``BaseRAGExample.run``.
        """
        args = self.parser.parse_args()

        # Test connection if requested
        if args.test_connection:
            success = await self.test_mcp_connection(args)
            if not success:
                return
            print("\n🎉 MCP server is working! You can now run without --test-connection to start indexing.")
            return

        # Run the standard RAG pipeline
        await super().run()
async def demo_slack_mcp():
    """Demonstrate Slack MCP integration.

    Prints an example configuration, the matching command line and a few
    sample queries. No MCP server is started or contacted.
    """
    print("=" * 60)
    print("🔥 Slack MCP RAG Demo")
    print("=" * 60)

    print("\n1. Testing Slack MCP server connection...")

    # This would typically use a real MCP server command
    # For demo purposes, we show what the command would look like

    # Simulate command line arguments for testing
    class MockArgs:
        mcp_server = "slack-mcp-server"  # This would be the actual MCP server command
        workspace_name = "my-workspace"
        channels = ["general", "random", "dev-team"]
        no_concatenate_conversations = False
        max_messages_per_channel = 50
        test_connection = True

    print(f"MCP Server Command: {MockArgs.mcp_server}")
    print(f"Workspace: {MockArgs.workspace_name}")
    print(f"Channels: {', '.join(MockArgs.channels)}")

    # In a real scenario, you would construct the app and run:
    # success = await SlackMCPRAG().test_mcp_connection(MockArgs)
    # The app is deliberately not instantiated here: this demo only prints.

    print("\n📝 Example usage:")
    print("python -m apps.slack_rag \\")
    print(" --mcp-server 'slack-mcp-server' \\")
    print(" --workspace-name 'my-team' \\")
    print(" --channels general dev-team \\")
    print(" --test-connection")

    print("\n🔍 After indexing, you could query:")
    print("- 'What did the team discuss about the project deadline?'")
    print("- 'Find messages about the new feature launch'")
    print("- 'Show me conversations about budget planning'")
async def show_mcp_server_setup():
    """Show how to set up MCP servers.

    Prints example install commands, the environment variables to export and
    a smoke-test command, first for a Slack MCP server and then for a Twitter
    MCP server. Nothing is installed or executed.
    """
    print("\n" + "=" * 60)
    print("⚙️ MCP Server Setup Guide")
    print("=" * 60)

    print("\n🔧 Setting up Slack MCP Server:")
    print("1. Install a Slack MCP server (example commands):")
    print(" npm install -g slack-mcp-server")
    print(" # OR")
    print(" pip install slack-mcp-server")

    print("\n2. Configure Slack credentials:")
    print(" export SLACK_BOT_TOKEN='xoxb-your-bot-token'")
    print(" export SLACK_APP_TOKEN='xapp-your-app-token'")

    print("\n3. Test the server:")
    print(" slack-mcp-server --help")

    print("\n🔧 Setting up Twitter MCP Server:")
    print("1. Install a Twitter MCP server:")
    print(" npm install -g twitter-mcp-server")
    print(" # OR")
    print(" pip install twitter-mcp-server")

    print("\n2. Configure Twitter API credentials:")
    print(" export TWITTER_API_KEY='your-api-key'")
    print(" export TWITTER_API_SECRET='your-api-secret'")
    print(" export TWITTER_ACCESS_TOKEN='your-access-token'")
    print(" export TWITTER_ACCESS_TOKEN_SECRET='your-access-token-secret'")

    print("\n3. Test the server:")
    print(" twitter-mcp-server --help")
def test_twitter_reader_initialization():
    """Test that TwitterMCPReader can be initialized with various parameters.

    Checks the defaults set by the one-argument constructor and that explicit
    keyword arguments are stored on the instance unchanged.
    """
    print("Testing TwitterMCPReader initialization...")

    # Test basic initialization
    reader = TwitterMCPReader("twitter-mcp-server")
    assert reader.mcp_server_command == "twitter-mcp-server"
    # Identity checks: ``== True`` would also accept 1 or 1.0.
    assert reader.include_tweet_content is True
    assert reader.include_metadata is True
    assert reader.max_bookmarks == 1000

    # Test with custom parameters
    reader = TwitterMCPReader(
        "custom-twitter-server",
        username="testuser",
        include_tweet_content=False,
        include_metadata=False,
        max_bookmarks=500,
    )
    assert reader.username == "testuser"
    assert reader.include_tweet_content is False
    assert reader.include_metadata is False
    assert reader.max_bookmarks == 500

    print("✅ TwitterMCPReader initialization tests passed")
def test_twitter_bookmark_formatting():
    """Check the text produced by ``TwitterMCPReader._format_bookmark``.

    A fully populated bookmark and a text-only bookmark are rendered, and the
    result is searched for the fragments each one is expected to contain.
    """
    print("Testing Twitter bookmark formatting...")

    bookmark_reader = TwitterMCPReader("twitter-mcp-server")

    # Bookmark carrying author, timestamp, URL and engagement counters
    rich_bookmark = {
        "text": "This is a great article about AI!",
        "author": "ai_researcher",
        "created_at": "2024-01-01T12:00:00Z",
        "url": "https://twitter.com/ai_researcher/status/123456789",
        "likes": 42,
        "retweets": 15,
    }
    rendered = bookmark_reader._format_bookmark(rich_bookmark)
    for fragment in (
        "=== Twitter Bookmark ===",
        "Author: @ai_researcher",
        "Content:",
        "This is a great article about AI!",
        "URL: https://twitter.com",
        "Likes: 42",
        "Retweets: 15",
    ):
        assert fragment in rendered

    # Bookmark with nothing but the tweet text
    rendered = bookmark_reader._format_bookmark({"text": "Simple tweet"})
    for fragment in ("=== Twitter Bookmark ===", "Simple tweet"):
        assert fragment in rendered

    print("โœ… Twitter bookmark formatting tests passed")
def main():
    """Run all tests.

    Calls every test function of this script in order. On the first failure
    the error and its traceback are printed and the process exits with
    status 1; otherwise a success summary with next steps is printed.
    """
    print("🧪 Running MCP Integration Tests")
    print("=" * 50)

    try:
        test_slack_reader_initialization()
        test_twitter_reader_initialization()
        test_slack_message_formatting()
        test_twitter_bookmark_formatting()
        test_slack_rag_initialization()
        test_twitter_rag_initialization()
        test_concatenated_content_creation()
    except Exception as e:
        print(f"\n❌ Test failed: {e}")
        # A failing bare ``assert`` has an empty message, so the line above
        # alone would not say which check failed; the traceback does.
        import traceback

        traceback.print_exc()
        sys.exit(1)
    else:
        print("\n" + "=" * 50)
        print("🎉 All tests passed! MCP integration is working correctly.")
        print("\nNext steps:")
        print("1. Install actual MCP servers for Slack and Twitter")
        print("2. Configure API credentials")
        print("3. Test with --test-connection flag")
        print("4. Start indexing your live data!")
def test_slack_reader_basic():
    """Test basic SlackMCPReader functionality without async operations.

    Covers the constructor defaults, ``_format_message`` for a single message
    and ``_create_concatenated_content`` for a two-message channel.
    """
    print("Testing SlackMCPReader basic functionality...")

    # Import and test initialization
    from apps.slack_data.slack_mcp_reader import SlackMCPReader

    reader = SlackMCPReader("slack-mcp-server")
    assert reader.mcp_server_command == "slack-mcp-server"
    # Identity check: ``== True`` would also accept 1 or 1.0.
    assert reader.concatenate_conversations is True

    # Test message formatting
    message = {
        "text": "Hello team! How's the project going?",
        "user": "john_doe",
        "channel": "general",
        "ts": "1234567890.123456",
    }

    formatted = reader._format_message(message)
    assert "Channel: #general" in formatted
    assert "User: john_doe" in formatted
    assert "Message: Hello team!" in formatted

    # Test concatenated content creation
    messages = [
        {"text": "First message", "user": "alice", "ts": "1000"},
        {"text": "Second message", "user": "bob", "ts": "2000"},
    ]

    content = reader._create_concatenated_content(messages, "dev-team")
    assert "Slack Channel: #dev-team" in content
    assert "Message Count: 2" in content
    assert "First message" in content
    assert "Second message" in content

    print("✅ SlackMCPReader basic tests passed")
def test_mcp_request_format():
    """Test MCP request formatting.

    Builds an ``initialize`` and a ``tools/list`` JSON-RPC request by hand and
    checks that each survives a JSON encode/decode round trip unchanged.

    NOTE(review): the requests are literals written in this test; the request
    builders of the reader classes are not exercised here.
    """
    print("Testing MCP request formatting...")

    # Test initialization request format
    init_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
        },
    }

    # Verify it's valid JSON and that nothing is lost or altered on the way
    parsed = json.loads(json.dumps(init_request))
    assert parsed == init_request
    assert parsed["jsonrpc"] == "2.0"
    assert parsed["method"] == "initialize"
    assert parsed["params"]["protocolVersion"] == "2024-11-05"

    # Test tools/list request
    list_request = {
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/list",
        "params": {},
    }

    parsed = json.loads(json.dumps(list_request))
    assert parsed == list_request
    assert parsed["method"] == "tools/list"

    print("✅ MCP request formatting tests passed")
"Starting now", "user": "charlie", "ts": "1002.789"} + ] + + content = slack_reader._create_concatenated_content(messages_with_timestamps, "meetings") + assert "Meeting in 5 minutes" in content + assert "On my way!" in content + assert "Starting now" in content + + # Test Twitter bookmark processing with engagement data + twitter_reader = TwitterMCPReader("test-server", include_metadata=True) + + high_engagement_bookmark = { + "text": "Thread about startup lessons learned ๐Ÿงต", + "author": "startup_founder", + "likes": 1250, + "retweets": 340, + "replies": 89 + } + + formatted = twitter_reader._format_bookmark(high_engagement_bookmark) + assert "Thread about startup lessons learned" in formatted + assert "Likes: 1250" in formatted + assert "Retweets: 340" in formatted + assert "Replies: 89" in formatted + + # Test with metadata disabled + twitter_reader_no_meta = TwitterMCPReader("test-server", include_metadata=False) + formatted_no_meta = twitter_reader_no_meta._format_bookmark(high_engagement_bookmark) + assert "Thread about startup lessons learned" in formatted_no_meta + assert "Likes:" not in formatted_no_meta + assert "Retweets:" not in formatted_no_meta + + print("โœ… Data processing tests passed") + + +def main(): + """Run all standalone tests.""" + print("๐Ÿงช Running MCP Integration Standalone Tests") + print("=" * 60) + print("Testing core functionality without LEANN dependencies...") + print() + + try: + test_slack_reader_basic() + test_twitter_reader_basic() + test_mcp_request_format() + test_data_processing() + + print("\n" + "=" * 60) + print("๐ŸŽ‰ All standalone tests passed!") + print("\nโœจ MCP Integration Summary:") + print("- SlackMCPReader: Ready for Slack message processing") + print("- TwitterMCPReader: Ready for Twitter bookmark processing") + print("- MCP Protocol: Properly formatted JSON-RPC requests") + print("- Data Processing: Handles various message/bookmark formats") + + print("\n๐Ÿš€ Next Steps:") + print("1. 
Install MCP servers: npm install -g slack-mcp-server twitter-mcp-server") + print("2. Configure API credentials for Slack and Twitter") + print("3. Test connections: python -m apps.slack_rag --test-connection") + print("4. Start indexing live data from your platforms!") + + print("\n๐Ÿ“– Documentation:") + print("- Check README.md for detailed setup instructions") + print("- Run examples/mcp_integration_demo.py for usage examples") + print("- Explore apps/slack_rag.py and apps/twitter_rag.py for implementation details") + + except Exception as e: + print(f"\nโŒ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main()