# Commit summary:
# - Implement SlackMCPReader for connecting to Slack MCP servers
# - Implement TwitterMCPReader for connecting to Twitter MCP servers
# - Add SlackRAG and TwitterRAG applications with full CLI support
# - Support live data fetching via Model Context Protocol (MCP)
# - Add comprehensive documentation and usage examples
# - Include connection testing capabilities with --test-connection flag
# - Add standalone tests for core functionality
# - Update README with detailed MCP integration guide
# Resolves #36
#
# Listing metadata: 335 lines, 13 KiB, Python
#!/usr/bin/env python3
"""
Slack MCP Reader for LEANN

This module provides functionality to connect to Slack MCP servers and fetch message data
for indexing in LEANN. It supports various Slack MCP server implementations and provides
flexible message processing options.
"""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class SlackMCPReader:
    """
    Reader for Slack data via MCP (Model Context Protocol) servers.

    This class spawns a Slack MCP server as a subprocess, exchanges
    newline-delimited JSON-RPC 2.0 messages with it over stdin/stdout, and
    converts the returned messages into text chunks suitable for LEANN
    indexing.

    It can be driven through ``read_slack_data()`` (which starts and stops the
    server itself when none is running) or used as an async context manager.
    """

    # Substrings that mark a tool name as a likely message-fetching tool.
    _MESSAGE_TOOL_KEYWORDS = ("message", "history", "channel", "conversation")

    # Candidate argument names for the channel, in order of preference.
    _CHANNEL_PARAM_NAMES = ("channel", "channel_id", "channel_name")

    # asyncio's default StreamReader limit (64 KiB) makes readline() fail on a
    # single-line JSON response carrying many messages, so raise it.
    _STREAM_LIMIT = 16 * 1024 * 1024

    def __init__(
        self,
        mcp_server_command: str,
        workspace_name: Optional[str] = None,
        concatenate_conversations: bool = True,
        max_messages_per_conversation: int = 100,
    ):
        """
        Initialize the Slack MCP Reader.

        Args:
            mcp_server_command: Command to start the MCP server (e.g., 'slack-mcp-server')
            workspace_name: Optional workspace name, written into chunk headers
            concatenate_conversations: Whether to group messages by channel/thread
            max_messages_per_conversation: Maximum messages to include per conversation
        """
        self.mcp_server_command = mcp_server_command
        self.workspace_name = workspace_name
        self.concatenate_conversations = concatenate_conversations
        self.max_messages_per_conversation = max_messages_per_conversation
        # Handle of the running server subprocess; None while no server is up.
        self.mcp_process = None

    async def start_mcp_server(self):
        """
        Start the MCP server process.

        The command string is split on whitespace and executed directly
        (no shell), with stdin/stdout/stderr connected to pipes.

        Raises:
            ValueError: If the configured command is empty.
            Exception: Any error raised while spawning the process is logged
                and re-raised unchanged.
        """
        try:
            argv = self.mcp_server_command.split()
            if not argv:
                raise ValueError("MCP server command is empty")

            # NOTE(review): stderr is piped but never read by this class; a
            # server that logs heavily to stderr could stall once the pipe
            # buffer fills - confirm whether it should be drained or redirected.
            self.mcp_process = await asyncio.create_subprocess_exec(
                *argv,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                limit=self._STREAM_LIMIT,
            )
            logger.info("Started MCP server: %s", self.mcp_server_command)
        except Exception as e:
            logger.error("Failed to start MCP server: %s", e)
            raise

    async def stop_mcp_server(self):
        """
        Stop the MCP server process, if one is running.

        Safe to call repeatedly and safe to call after the process has already
        exited. The process handle is cleared so the reader can be restarted.
        """
        process = self.mcp_process
        if process is None:
            return

        # Clear the handle first so the reader is left in a consistent
        # "not started" state even if waiting on the process raises.
        self.mcp_process = None

        if process.returncode is None:
            try:
                process.terminate()
            except ProcessLookupError:
                # The process exited between the returncode check and the signal.
                pass
        await process.wait()
        logger.info("Stopped MCP server")

    async def _write_mcp_message(self, message: Dict[str, Any]) -> None:
        """Serialize one JSON-RPC message as a single line and write it to the server's stdin."""
        if not self.mcp_process:
            raise RuntimeError("MCP server not started")

        payload = json.dumps(message) + "\n"
        self.mcp_process.stdin.write(payload.encode())
        await self.mcp_process.stdin.drain()

    async def send_mcp_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a request to the MCP server and return the matching response.

        Lines read from the server that are JSON-RPC notifications or
        server-initiated requests (they carry a ``method`` key), or responses
        to a different request id, are skipped rather than mistaken for the
        reply.

        Args:
            request: JSON-RPC request object; its ``id`` is used to match the response.

        Returns:
            The decoded JSON-RPC response object.

        Raises:
            RuntimeError: If the server is not started or closes its stdout.
            json.JSONDecodeError: If the server emits a line that is not JSON.
        """
        await self._write_mcp_message(request)

        expected_id = request.get("id")
        while True:
            response_line = await self.mcp_process.stdout.readline()
            if not response_line:
                raise RuntimeError("No response from MCP server")

            message = json.loads(response_line.decode().strip())
            if not isinstance(message, dict) or "method" in message:
                continue  # notification / server-side request, not our reply

            response_id = message.get("id")
            # An id of None covers error replies the server could not
            # attribute to a request; the str() comparison tolerates servers
            # that echo a numeric id back as a string.
            if (
                expected_id is None
                or response_id is None
                or response_id == expected_id
                or str(response_id) == str(expected_id)
            ):
                return message

    async def initialize_mcp_connection(self):
        """
        Initialize the MCP connection.

        Sends the ``initialize`` request and, on success, the
        ``notifications/initialized`` notification that the MCP lifecycle
        requires before any other request is issued.

        Raises:
            RuntimeError: If the server answers the handshake with an error.
        """
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
            },
        }

        response = await self.send_mcp_request(init_request)
        if "error" in response:
            raise RuntimeError(f"MCP initialization failed: {response['error']}")

        # Notifications have no id and get no reply, so this must not go
        # through send_mcp_request (which would wait for a response).
        await self._write_mcp_message({"jsonrpc": "2.0", "method": "notifications/initialized"})

        logger.info("MCP connection initialized successfully")

    async def list_available_tools(self) -> List[Dict[str, Any]]:
        """
        List available tools from the MCP server.

        Returns:
            The ``tools`` list from the ``tools/list`` result (empty if absent).

        Raises:
            RuntimeError: If the server answers with an error.
        """
        list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}

        response = await self.send_mcp_request(list_request)
        if "error" in response:
            raise RuntimeError(f"Failed to list tools: {response['error']}")

        return (response.get("result") or {}).get("tools") or []

    def _channel_param_name(self, tool: Dict[str, Any]) -> str:
        """Pick the channel argument name the tool declares in its input schema, defaulting to 'channel'."""
        schema = tool.get("inputSchema")
        properties = schema.get("properties") if isinstance(schema, dict) else None
        if isinstance(properties, dict):
            for candidate in self._CHANNEL_PARAM_NAMES:
                if candidate in properties:
                    return candidate
        return self._CHANNEL_PARAM_NAMES[0]

    @staticmethod
    def _extract_messages(result: Dict[str, Any], channel: Optional[str]) -> List[Any]:
        """Normalize a ``tools/call`` result into a list of messages."""
        if not result:
            # Nothing came back; do not fabricate an empty message.
            return []

        content_items = result.get("content")
        if not isinstance(content_items, list):
            # Direct message format: either a "messages" list or the result
            # object itself is treated as a single message.
            messages = result.get("messages", [result])
            return messages if isinstance(messages, list) else [messages]

        first_item = content_items[0] if content_items else {}
        if not (isinstance(first_item, dict) and "text" in first_item):
            return content_items

        raw_text = first_item["text"]
        try:
            parsed = json.loads(raw_text)
        except (json.JSONDecodeError, TypeError):
            parsed = None

        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            # Mirror the direct format: unwrap a "messages" list if present,
            # otherwise the object itself is one message.
            nested = parsed.get("messages")
            return nested if isinstance(nested, list) else [parsed]

        # Not JSON (or a bare JSON scalar): treat as one plain-text message.
        return [{"text": raw_text, "channel": channel or "unknown"}]

    async def fetch_slack_messages(
        self, channel: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Fetch Slack messages using MCP tools.

        The first tool whose name contains one of the message-related keywords
        is called. This is a generic heuristic; specific MCP servers may use
        different tool names (e.g. 'get_messages', 'list_messages',
        'fetch_channel_history').

        Args:
            channel: Optional channel name to filter messages
            limit: Maximum number of messages to fetch

        Returns:
            List of message dictionaries

        Raises:
            RuntimeError: If no suitable tool exists, the call returns a
                JSON-RPC error, or the tool result is flagged ``isError``.
        """
        tools = await self.list_available_tools()

        # Look for a tool that can fetch messages
        message_tool = next(
            (
                tool
                for tool in tools
                if any(
                    keyword in str(tool.get("name") or "").lower()
                    for keyword in self._MESSAGE_TOOL_KEYWORDS
                )
            ),
            None,
        )
        if not message_tool:
            raise RuntimeError("No message fetching tool found in MCP server")

        # Prepare tool call parameters
        tool_params = {"limit": limit}
        if channel:
            tool_params[self._channel_param_name(message_tool)] = channel

        fetch_request = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {"name": message_tool["name"], "arguments": tool_params},
        }

        response = await self.send_mcp_request(fetch_request)
        if "error" in response:
            raise RuntimeError(f"Failed to fetch messages: {response['error']}")

        result = response.get("result") or {}
        if result.get("isError"):
            # Tool-level failures arrive as a normal result flagged isError;
            # surface them instead of indexing the error text as a message.
            raise RuntimeError(f"Failed to fetch messages: {result.get('content')}")

        # Extract messages from response - format may vary by MCP server
        return self._extract_messages(result, channel)

    def _format_message(self, message: Dict[str, Any]) -> str:
        """
        Format a single message for indexing.

        Produces up to four lines (channel, user, time, message text); a line
        is omitted when its value is empty. Epoch timestamps (numbers, or
        strings containing a '.') are rendered in local time as
        ``YYYY-MM-DD HH:MM:SS``; anything else is kept verbatim.
        """
        import datetime

        text = message.get("text", "")
        user = message.get("user", message.get("username", "Unknown"))
        channel = message.get("channel", message.get("channel_name", "Unknown"))
        timestamp = message.get("ts", message.get("timestamp", ""))

        # Format timestamp if available
        formatted_time = ""
        if timestamp:
            looks_like_epoch = isinstance(timestamp, (int, float)) or (
                isinstance(timestamp, str) and "." in timestamp
            )
            if looks_like_epoch:
                try:
                    moment = datetime.datetime.fromtimestamp(float(timestamp))
                    formatted_time = moment.strftime("%Y-%m-%d %H:%M:%S")
                except (ValueError, TypeError, OverflowError, OSError):
                    # Non-numeric or out-of-range value: keep it as-is.
                    formatted_time = str(timestamp)
            else:
                formatted_time = str(timestamp)

        # Build formatted message
        parts = []
        if channel:
            parts.append(f"Channel: #{channel}")
        if user:
            parts.append(f"User: {user}")
        if formatted_time:
            parts.append(f"Time: {formatted_time}")
        if text:
            parts.append(f"Message: {text}")

        return "\n".join(parts)

    def _create_concatenated_content(self, messages: List[Dict[str, Any]], channel: str) -> str:
        """
        Create concatenated content from multiple messages in a channel.

        Messages are ordered by timestamp when all timestamps are numeric,
        trimmed to the most recent ``max_messages_per_conversation`` entries,
        and rendered under a header naming the channel and workspace. The
        caller's list is not modified.

        Returns:
            The concatenated text, or an empty string if there are no messages.
        """
        if not messages:
            return ""

        # Sort a copy by timestamp if available
        try:
            ordered = sorted(messages, key=lambda x: float(x.get("ts", x.get("timestamp", 0))))
        except (ValueError, TypeError):
            ordered = list(messages)  # Keep original order if timestamps aren't numeric

        # Limit messages per conversation, keeping the most recent ones.
        # Slicing from an explicit start index (not [-limit:]) keeps a limit
        # of 0 from selecting every message.
        keep = max(self.max_messages_per_conversation, 0)
        if len(ordered) > keep:
            ordered = ordered[len(ordered) - keep :]

        # Create header
        content_parts = [
            f"Slack Channel: #{channel}",
            f"Message Count: {len(ordered)}",
            f"Workspace: {self.workspace_name or 'Unknown'}",
            "=" * 50,
            "",
        ]

        # Add messages
        for message in ordered:
            formatted_msg = self._format_message(message)
            if formatted_msg.strip():
                content_parts.append(formatted_msg)
                content_parts.append("-" * 30)
                content_parts.append("")

        return "\n".join(content_parts)

    def _messages_to_texts(
        self, messages: List[Dict[str, Any]], channel: Optional[str] = None
    ) -> List[str]:
        """Turn fetched messages into index-ready chunks, honoring ``concatenate_conversations``."""
        if not messages:
            return []

        if not self.concatenate_conversations:
            # Process individual messages
            formatted = (self._format_message(message) for message in messages)
            return [chunk for chunk in formatted if chunk.strip()]

        if channel is not None:
            grouped = {channel: messages}
        else:
            # Channel unknown up front: group by what each message reports.
            grouped = {}
            for message in messages:
                key = message.get("channel", message.get("channel_name", "general"))
                grouped.setdefault(key, []).append(message)

        chunks = (
            self._create_concatenated_content(channel_msgs, name)
            for name, channel_msgs in grouped.items()
        )
        return [chunk for chunk in chunks if chunk.strip()]

    async def read_slack_data(
        self, channels: Optional[List[str]] = None, limit: int = 1000
    ) -> List[str]:
        """
        Read Slack data and return formatted text chunks.

        If no server is running, one is started, initialized, and stopped
        again before returning. If a server is already running (for example
        inside ``async with``), it is reused and left running.

        Fetch failures are logged and skipped rather than raised, so the
        result may be partial or empty.

        Args:
            channels: Optional list of channel names to fetch. If None, a single
                fetch without a channel filter is issued and messages are grouped
                by the channel each one reports.
            limit: Maximum number of messages requested per fetch.

        Returns:
            List of formatted text chunks ready for LEANN indexing
        """
        # Only tear down a server this call started; starting a second one
        # over an open connection would orphan the first process.
        owns_connection = self.mcp_process is None
        try:
            if owns_connection:
                await self.start_mcp_server()
                await self.initialize_mcp_connection()

            all_texts = []

            if channels:
                # Fetch specific channels
                for channel in channels:
                    try:
                        messages = await self.fetch_slack_messages(channel=channel, limit=limit)
                        all_texts.extend(self._messages_to_texts(messages, channel))
                    except Exception as e:
                        # Best effort: one bad channel must not abort the rest.
                        logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
                        continue
            else:
                # Fetch from all available channels/conversations.
                # This is a simplified approach - real implementation would need to
                # discover available channels first
                try:
                    messages = await self.fetch_slack_messages(limit=limit)
                    all_texts.extend(self._messages_to_texts(messages))
                except Exception as e:
                    logger.error(f"Failed to fetch messages: {e}")

            return all_texts

        finally:
            if owns_connection:
                await self.stop_mcp_server()

    async def __aenter__(self):
        """Async context manager entry: start the server and perform the MCP handshake."""
        await self.start_mcp_server()
        try:
            await self.initialize_mcp_connection()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so clean up here
            # to avoid leaking the subprocess.
            await self.stop_mcp_server()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: stop the server."""
        await self.stop_mcp_server()