Compare commits

..

1 Commits

Author SHA1 Message Date
aakash
9a9dc40f8f security: Enhance Hugging Face model loading security - resolves #136
BREAKING CHANGE: trust_remote_code now defaults to False for security

- Set trust_remote_code=False by default in HFChat class
- Add explicit trust_remote_code parameter to HFChat.__init__()
- Add security warning when trust_remote_code=True is used
- Update get_llm() function to support trust_remote_code parameter
- Update benchmark utilities (load_hf_model, load_vllm_model, load_qwen_vl_model)
- Add comprehensive documentation for security implications

Security Benefits:
- Prevents arbitrary code execution from compromised model repositories
- Requires explicit opt-in for models that need remote code execution
- Shows clear warnings when security is reduced
- Follows security-by-default principle

Migration Guide:
- Most users: No changes needed (more secure by default)
- Users with models requiring remote code: Add trust_remote_code=True explicitly
- Config users: Add 'trust_remote_code': true to LLM config if needed

Fixes #136
2025-10-07 01:24:44 -07:00
19 changed files with 26 additions and 2477 deletions

165
README.md
View File

@@ -8,12 +8,8 @@
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%26%20Arch%20%26%20WSL%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="MIT License">
<img src="https://img.shields.io/badge/MCP-Native%20Integration-blue" alt="MCP Integration">
<a href="https://join.slack.com/t/leann-e2u9779/shared_invite/zt-3ckd2f6w1-OX08~NN4gkWhh10PRVBj1Q">
<img src="https://img.shields.io/badge/Slack-Join-4A154B?logo=slack&logoColor=white" alt="Join Slack">
</a>
<a href="assets/wechat_user_group.JPG" title="Join WeChat group">
<img src="https://img.shields.io/badge/WeChat-Join-2DC100?logo=wechat&logoColor=white" alt="Join WeChat group">
</a>
<a href="https://join.slack.com/t/leann-e2u9779/shared_invite/zt-3ckd2f6w1-OX08~NN4gkWhh10PRVBj1Q"><img src="https://img.shields.io/badge/Slack-Join-4A154B?logo=slack&logoColor=white" alt="Join Slack">
<a href="assets/wechat_user_group.JPG" title="Join WeChat group"><img src="https://img.shields.io/badge/WeChat-Join-2DC100?logo=wechat&logoColor=white" alt="Join WeChat group"></a>
</p>
<h2 align="center" tabindex="-1" class="heading-element" dir="auto">
@@ -24,7 +20,7 @@ LEANN is an innovative vector database that democratizes personal AI. Transform
LEANN achieves this through *graph-based selective recomputation* with *high-degree preserving pruning*, computing embeddings on-demand instead of storing them all. [Illustration Fig →](#-architecture--how-it-works) | [Paper →](https://arxiv.org/abs/2506.08276)
**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[live data](#mcp-integration-rag-on-live-data-from-any-platform)** ([Slack](#mcp-integration-rag-on-live-data-from-any-platform), [Twitter](#mcp-integration-rag-on-live-data-from-any-platform)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
**Ready to RAG Everything?** Transform your laptop into a personal AI assistant that can semantic search your **[file system](#-personal-data-manager-process-any-documents-pdf-txt-md)**, **[emails](#-your-personal-email-secretary-rag-on-apple-mail)**, **[browser history](#-time-machine-for-the-web-rag-your-entire-browser-history)**, **[chat history](#-wechat-detective-unlock-your-golden-memories)** ([WeChat](#-wechat-detective-unlock-your-golden-memories), [iMessage](#-imessage-history-your-personal-conversation-archive)), **[agent memory](#-chatgpt-chat-history-your-personal-ai-conversation-archive)** ([ChatGPT](#-chatgpt-chat-history-your-personal-ai-conversation-archive), [Claude](#-claude-chat-history-your-personal-ai-conversation-archive)), **[codebase](#-claude-code-integration-transform-your-development-workflow)**\* , or external knowledge bases (i.e., 60M documents) - all on your laptop, with zero cloud costs and complete privacy.
\* Claude Code only supports basic `grep`-style keyword search. **LEANN** is a drop-in **semantic search MCP service fully compatible with Claude Code**, unlocking intelligent retrieval without changing your workflow. 🔥 Check out [the easy setup →](packages/leann-mcp/README.md)
@@ -76,9 +72,8 @@ uv venv
source .venv/bin/activate
uv pip install leann
```
<!--
> Low-resource? See "Low-resource setups" in the [Configuration Guide](docs/configuration-guide.md#low-resource-setups). -->
> Low-resource? See Low-resource setups in the [Configuration Guide](docs/configuration-guide.md#low-resource-setups). -->
<details>
<summary>
@@ -181,7 +176,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1)
## RAG on Everything!
LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and **live data from any platform through MCP (Model Context Protocol) servers** - including Slack, Twitter, and more.
LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, ChatGPT conversations, Claude conversations, iMessage conversations, and more.
@@ -779,154 +774,10 @@ Once your iMessage conversations are indexed, you can search with queries like:
</details>
### MCP Integration: RAG on Live Data from Any Platform
Connect to live data sources through the Model Context Protocol (MCP). LEANN now supports real-time RAG on platforms like Slack, Twitter, and more through standardized MCP servers.
**Key Benefits:**
- **Live Data Access**: Fetch real-time data without manual exports
- **Standardized Protocol**: Use any MCP-compatible server
- **Easy Extension**: Add new platforms with minimal code
- **Secure Access**: MCP servers handle authentication
#### 💬 Slack Messages: Search Your Team Conversations
Transform your Slack workspace into a searchable knowledge base! Find discussions, decisions, and shared knowledge across all your channels.
```bash
# Test MCP server connection
python -m apps.slack_rag --mcp-server "slack-mcp-server" --test-connection
# Index and search Slack messages
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "my-team" \
--channels general dev-team random \
--query "What did we decide about the product launch?"
```
**📖 Comprehensive Setup Guide**: For detailed setup instructions, troubleshooting common issues (like "users cache is not ready yet"), and advanced configuration options, see our [**Slack Setup Guide**](docs/slack-setup-guide.md).
**Quick Setup:**
1. Install a Slack MCP server (e.g., `npm install -g slack-mcp-server`)
2. Create a Slack App and get API credentials (see detailed guide above)
3. Set environment variables:
```bash
export SLACK_BOT_TOKEN="xoxb-your-bot-token"
export SLACK_APP_TOKEN="xapp-your-app-token" # Optional
```
4. Test connection with `--test-connection` flag
**Arguments:**
- `--mcp-server`: Command to start the Slack MCP server
- `--workspace-name`: Slack workspace name for organization
- `--channels`: Specific channels to index (optional)
- `--concatenate-conversations`: Group messages by channel (default: true)
- `--max-messages-per-channel`: Limit messages per channel (default: 100)
- `--max-retries`: Maximum retries for cache sync issues (default: 5)
- `--retry-delay`: Initial delay between retries in seconds (default: 2.0)
#### 🐦 Twitter Bookmarks: Your Personal Tweet Library
Search through your Twitter bookmarks! Find that perfect article, thread, or insight you saved for later.
```bash
# Test MCP server connection
python -m apps.twitter_rag --mcp-server "twitter-mcp-server" --test-connection
# Index and search Twitter bookmarks
python -m apps.twitter_rag \
--mcp-server "twitter-mcp-server" \
--max-bookmarks 1000 \
--query "What AI articles did I bookmark about machine learning?"
```
**Setup Requirements:**
1. Install a Twitter MCP server (e.g., `npm install -g twitter-mcp-server`)
2. Get Twitter API credentials:
- Apply for a Twitter Developer Account at [developer.twitter.com](https://developer.twitter.com)
- Create a new app in the Twitter Developer Portal
- Generate API keys and access tokens with "Read" permissions
- For bookmarks access, you may need Twitter API v2 with appropriate scopes
```bash
export TWITTER_API_KEY="your-api-key"
export TWITTER_API_SECRET="your-api-secret"
export TWITTER_ACCESS_TOKEN="your-access-token"
export TWITTER_ACCESS_TOKEN_SECRET="your-access-token-secret"
```
3. Test connection with `--test-connection` flag
**Arguments:**
- `--mcp-server`: Command to start the Twitter MCP server
- `--username`: Filter bookmarks by username (optional)
- `--max-bookmarks`: Maximum bookmarks to fetch (default: 1000)
- `--no-tweet-content`: Exclude tweet content, only metadata
- `--no-metadata`: Exclude engagement metadata
</details>
<details>
<summary><strong>💡 Click to expand: Example queries you can try</strong></summary>
**Slack Queries:**
- "What did the team discuss about the project deadline?"
- "Find messages about the new feature launch"
- "Show me conversations about budget planning"
- "What decisions were made in the dev-team channel?"
**Twitter Queries:**
- "What AI articles did I bookmark last month?"
- "Find tweets about machine learning techniques"
- "Show me bookmarked threads about startup advice"
- "What Python tutorials did I save?"
</details>
<summary><strong>🔧 Using MCP with CLI Commands</strong></summary>
**Want to use MCP data with regular LEANN CLI?** You can combine MCP apps with CLI commands:
```bash
# Step 1: Use MCP app to fetch and index data
python -m apps.slack_rag --mcp-server "slack-mcp-server" --workspace-name "my-team"
# Step 2: The data is now indexed and available via CLI
leann search slack_messages "project deadline"
leann ask slack_messages "What decisions were made about the product launch?"
# Same for Twitter bookmarks
python -m apps.twitter_rag --mcp-server "twitter-mcp-server"
leann search twitter_bookmarks "machine learning articles"
```
**MCP vs Manual Export:**
- **MCP**: Live data, automatic updates, requires server setup
- **Manual Export**: One-time setup, works offline, requires manual data export
</details>
<details>
<summary><strong>🔧 Adding New MCP Platforms</strong></summary>
Want to add support for other platforms? LEANN's MCP integration is designed for easy extension:
1. **Find or create an MCP server** for your platform
2. **Create a reader class** following the pattern in `apps/slack_data/slack_mcp_reader.py`
3. **Create a RAG application** following the pattern in `apps/slack_rag.py`
4. **Test and contribute** back to the community!
**Popular MCP servers to explore:**
- GitHub repositories and issues
- Discord messages
- Notion pages
- Google Drive documents
- And many more in the MCP ecosystem!
</details>
### 🚀 Claude Code Integration: Transform Your Development Workflow!
<details>
<summary><strong>ASTAware Code Chunking</strong></summary>
<summary><strong>NEW!! ASTAware Code Chunking</strong></summary>
LEANN features intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript, improving code understanding compared to text-based chunking.
@@ -954,7 +805,7 @@ Try our fully agentic pipeline with auto query rewriting, semantic search planni
**🔥 Ready to supercharge your coding?** [Complete Setup Guide →](packages/leann-mcp/README.md)
## Command Line Interface
## 🖥️ Command Line Interface
LEANN includes a powerful CLI for document processing and search. Perfect for quick document indexing and interactive chat.
@@ -1196,7 +1047,7 @@ MIT License - see [LICENSE](LICENSE) for details.
Core Contributors: [Yichuan Wang](https://yichuan-w.github.io/) & [Zhifei Li](https://github.com/andylizf).
Active Contributors: [Gabriel Dehan](https://github.com/gabriel-dehan), [Aakash Suresh](https://github.com/ASuresh0524)
Active Contributors: [Gabriel Dehan](https://github.com/gabriel-dehan)
We welcome more contributors! Feel free to open issues or submit PRs.

View File

@@ -10,39 +10,9 @@ from typing import Any
import dotenv
from leann.api import LeannBuilder, LeannChat
# Optional import: older PyPI builds may not include interactive_utils
try:
from leann.interactive_utils import create_rag_session
except ImportError:
def create_rag_session(app_name: str, data_description: str):
class _SimpleSession:
def run_interactive_loop(self, handler):
print(f"Interactive session for {app_name}: {data_description}")
print("Interactive mode not available in this build")
return _SimpleSession()
from leann.interactive_utils import create_rag_session
from leann.registry import register_project_directory
# Optional import: older PyPI builds may not include settings
try:
from leann.settings import resolve_ollama_host, resolve_openai_api_key, resolve_openai_base_url
except ImportError:
# Minimal fallbacks if settings helpers are unavailable
import os
def resolve_ollama_host(value: str | None) -> str | None:
return value or os.getenv("LEANN_OLLAMA_HOST") or os.getenv("OLLAMA_HOST")
def resolve_openai_api_key(value: str | None) -> str | None:
return value or os.getenv("OPENAI_API_KEY")
def resolve_openai_base_url(value: str | None) -> str | None:
return value or os.getenv("OPENAI_BASE_URL")
from leann.settings import resolve_ollama_host, resolve_openai_api_key, resolve_openai_base_url
dotenv.load_dotenv()

View File

@@ -1 +0,0 @@
# Slack MCP data integration for LEANN

View File

@@ -1,510 +0,0 @@
#!/usr/bin/env python3
"""
Slack MCP Reader for LEANN
This module provides functionality to connect to Slack MCP servers and fetch message data
for indexing in LEANN. It supports various Slack MCP server implementations and provides
flexible message processing options.
"""
import asyncio
import json
import logging
from typing import Any, Optional
logger = logging.getLogger(__name__)
class SlackMCPReader:
"""
Reader for Slack data via MCP (Model Context Protocol) servers.
This class connects to Slack MCP servers to fetch message data and convert it
into a format suitable for LEANN indexing.
"""
def __init__(
self,
mcp_server_command: str,
workspace_name: Optional[str] = None,
concatenate_conversations: bool = True,
max_messages_per_conversation: int = 100,
max_retries: int = 5,
retry_delay: float = 2.0,
):
"""
Initialize the Slack MCP Reader.
Args:
mcp_server_command: Command to start the MCP server (e.g., 'slack-mcp-server')
workspace_name: Optional workspace name to filter messages
concatenate_conversations: Whether to group messages by channel/thread
max_messages_per_conversation: Maximum messages to include per conversation
max_retries: Maximum number of retries for failed operations
retry_delay: Initial delay between retries in seconds
"""
self.mcp_server_command = mcp_server_command
self.workspace_name = workspace_name
self.concatenate_conversations = concatenate_conversations
self.max_messages_per_conversation = max_messages_per_conversation
self.max_retries = max_retries
self.retry_delay = retry_delay
self.mcp_process = None
async def start_mcp_server(self):
"""Start the MCP server process."""
try:
self.mcp_process = await asyncio.create_subprocess_exec(
*self.mcp_server_command.split(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
logger.info(f"Started MCP server: {self.mcp_server_command}")
except Exception as e:
logger.error(f"Failed to start MCP server: {e}")
raise
async def stop_mcp_server(self):
"""Stop the MCP server process."""
if self.mcp_process:
self.mcp_process.terminate()
await self.mcp_process.wait()
logger.info("Stopped MCP server")
async def send_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]:
"""Send a request to the MCP server and get response."""
if not self.mcp_process:
raise RuntimeError("MCP server not started")
request_json = json.dumps(request) + "\n"
self.mcp_process.stdin.write(request_json.encode())
await self.mcp_process.stdin.drain()
response_line = await self.mcp_process.stdout.readline()
if not response_line:
raise RuntimeError("No response from MCP server")
return json.loads(response_line.decode().strip())
async def initialize_mcp_connection(self):
"""Initialize the MCP connection."""
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
},
}
response = await self.send_mcp_request(init_request)
if "error" in response:
raise RuntimeError(f"MCP initialization failed: {response['error']}")
logger.info("MCP connection initialized successfully")
async def list_available_tools(self) -> list[dict[str, Any]]:
"""List available tools from the MCP server."""
list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
response = await self.send_mcp_request(list_request)
if "error" in response:
raise RuntimeError(f"Failed to list tools: {response['error']}")
return response.get("result", {}).get("tools", [])
def _is_cache_sync_error(self, error: dict) -> bool:
"""Check if the error is related to users cache not being ready."""
if isinstance(error, dict):
message = error.get("message", "").lower()
return (
"users cache is not ready" in message or "sync process is still running" in message
)
return False
async def _retry_with_backoff(self, func, *args, **kwargs):
"""Retry a function with exponential backoff, especially for cache sync issues."""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
# Check if this is a cache sync error
error_dict = {}
if hasattr(e, "args") and e.args and isinstance(e.args[0], dict):
error_dict = e.args[0]
elif "Failed to fetch messages" in str(e):
# Try to extract error from the exception message
import re
match = re.search(r"'error':\s*(\{[^}]+\})", str(e))
if match:
try:
error_dict = eval(match.group(1))
except (ValueError, SyntaxError, NameError):
pass
else:
# Try alternative format
match = re.search(r"Failed to fetch messages:\s*(\{[^}]+\})", str(e))
if match:
try:
error_dict = eval(match.group(1))
except (ValueError, SyntaxError, NameError):
pass
if self._is_cache_sync_error(error_dict):
if attempt < self.max_retries:
delay = self.retry_delay * (2**attempt) # Exponential backoff
logger.info(
f"Cache sync not ready, waiting {delay:.1f}s before retry {attempt + 1}/{self.max_retries}"
)
await asyncio.sleep(delay)
continue
else:
logger.warning(
f"Cache sync still not ready after {self.max_retries} retries, giving up"
)
break
else:
# Not a cache sync error, don't retry
break
# If we get here, all retries failed or it's not a retryable error
raise last_exception
async def fetch_slack_messages(
self, channel: Optional[str] = None, limit: int = 100
) -> list[dict[str, Any]]:
"""
Fetch Slack messages using MCP tools with retry logic for cache sync issues.
Args:
channel: Optional channel name to filter messages
limit: Maximum number of messages to fetch
Returns:
List of message dictionaries
"""
return await self._retry_with_backoff(self._fetch_slack_messages_impl, channel, limit)
async def _fetch_slack_messages_impl(
self, channel: Optional[str] = None, limit: int = 100
) -> list[dict[str, Any]]:
"""
Internal implementation of fetch_slack_messages without retry logic.
"""
# This is a generic implementation - specific MCP servers may have different tool names
# Common tool names might be: 'get_messages', 'list_messages', 'fetch_channel_history'
tools = await self.list_available_tools()
logger.info(f"Available tools: {[tool.get('name') for tool in tools]}")
message_tool = None
# Look for a tool that can fetch messages - prioritize conversations_history
message_tool = None
# First, try to find conversations_history specifically
for tool in tools:
tool_name = tool.get("name", "").lower()
if "conversations_history" in tool_name:
message_tool = tool
logger.info(f"Found conversations_history tool: {tool}")
break
# If not found, look for other message-fetching tools
if not message_tool:
for tool in tools:
tool_name = tool.get("name", "").lower()
if any(
keyword in tool_name
for keyword in ["conversations_search", "message", "history"]
):
message_tool = tool
break
if not message_tool:
raise RuntimeError("No message fetching tool found in MCP server")
# Prepare tool call parameters
tool_params = {"limit": "180d"} # Use 180 days to get older messages
if channel:
# For conversations_history, use channel_id parameter
if message_tool["name"] == "conversations_history":
tool_params["channel_id"] = channel
else:
# Try common parameter names for channel specification
for param_name in ["channel", "channel_id", "channel_name"]:
tool_params[param_name] = channel
break
logger.info(f"Tool parameters: {tool_params}")
fetch_request = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": message_tool["name"], "arguments": tool_params},
}
response = await self.send_mcp_request(fetch_request)
if "error" in response:
raise RuntimeError(f"Failed to fetch messages: {response['error']}")
# Extract messages from response - format may vary by MCP server
result = response.get("result", {})
if "content" in result and isinstance(result["content"], list):
# Some MCP servers return content as a list
content = result["content"][0] if result["content"] else {}
if "text" in content:
try:
messages = json.loads(content["text"])
except json.JSONDecodeError:
# If not JSON, try to parse as CSV format (Slack MCP server format)
messages = self._parse_csv_messages(content["text"], channel)
else:
messages = result["content"]
else:
# Direct message format
messages = result.get("messages", [result])
return messages if isinstance(messages, list) else [messages]
def _parse_csv_messages(self, csv_text: str, channel: str) -> list[dict[str, Any]]:
"""Parse CSV format messages from Slack MCP server."""
import csv
import io
messages = []
try:
# Split by lines and process each line as a CSV row
lines = csv_text.strip().split("\n")
if not lines:
return messages
# Skip header line if it exists
start_idx = 0
if lines[0].startswith("MsgID,UserID,UserName"):
start_idx = 1
for line in lines[start_idx:]:
if not line.strip():
continue
# Parse CSV line
reader = csv.reader(io.StringIO(line))
try:
row = next(reader)
if len(row) >= 7: # Ensure we have enough columns
message = {
"ts": row[0],
"user": row[1],
"username": row[2],
"real_name": row[3],
"channel": row[4],
"thread_ts": row[5],
"text": row[6],
"time": row[7] if len(row) > 7 else "",
"reactions": row[8] if len(row) > 8 else "",
"cursor": row[9] if len(row) > 9 else "",
}
messages.append(message)
except Exception as e:
logger.warning(f"Failed to parse CSV line: {line[:100]}... Error: {e}")
continue
except Exception as e:
logger.warning(f"Failed to parse CSV messages: {e}")
# Fallback: treat entire text as one message
messages = [{"text": csv_text, "channel": channel or "unknown"}]
return messages
def _format_message(self, message: dict[str, Any]) -> str:
"""Format a single message for indexing."""
text = message.get("text", "")
user = message.get("user", message.get("username", "Unknown"))
channel = message.get("channel", message.get("channel_name", "Unknown"))
timestamp = message.get("ts", message.get("timestamp", ""))
# Format timestamp if available
formatted_time = ""
if timestamp:
try:
import datetime
if isinstance(timestamp, str) and "." in timestamp:
dt = datetime.datetime.fromtimestamp(float(timestamp))
formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(timestamp, (int, float)):
dt = datetime.datetime.fromtimestamp(timestamp)
formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
formatted_time = str(timestamp)
except (ValueError, TypeError):
formatted_time = str(timestamp)
# Build formatted message
parts = []
if channel:
parts.append(f"Channel: #{channel}")
if user:
parts.append(f"User: {user}")
if formatted_time:
parts.append(f"Time: {formatted_time}")
if text:
parts.append(f"Message: {text}")
return "\n".join(parts)
def _create_concatenated_content(self, messages: list[dict[str, Any]], channel: str) -> str:
"""Create concatenated content from multiple messages in a channel."""
if not messages:
return ""
# Sort messages by timestamp if available
try:
messages.sort(key=lambda x: float(x.get("ts", x.get("timestamp", 0))))
except (ValueError, TypeError):
pass # Keep original order if timestamps aren't numeric
# Limit messages per conversation
if len(messages) > self.max_messages_per_conversation:
messages = messages[-self.max_messages_per_conversation :]
# Create header
content_parts = [
f"Slack Channel: #{channel}",
f"Message Count: {len(messages)}",
f"Workspace: {self.workspace_name or 'Unknown'}",
"=" * 50,
"",
]
# Add messages
for message in messages:
formatted_msg = self._format_message(message)
if formatted_msg.strip():
content_parts.append(formatted_msg)
content_parts.append("-" * 30)
content_parts.append("")
return "\n".join(content_parts)
async def get_all_channels(self) -> list[str]:
"""Get list of all available channels."""
try:
channels_list_request = {
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": {"name": "channels_list", "arguments": {}},
}
channels_response = await self.send_mcp_request(channels_list_request)
if "result" in channels_response:
result = channels_response["result"]
if "content" in result and isinstance(result["content"], list):
content = result["content"][0] if result["content"] else {}
if "text" in content:
# Parse the channels from the response
channels = []
lines = content["text"].split("\n")
for line in lines:
if line.strip() and ("#" in line or "C" in line[:10]):
# Extract channel ID or name
parts = line.split()
for part in parts:
if part.startswith("C") and len(part) > 5:
channels.append(part)
elif part.startswith("#"):
channels.append(part[1:]) # Remove #
logger.info(f"Found {len(channels)} channels: {channels}")
return channels
return []
except Exception as e:
logger.warning(f"Failed to get channels list: {e}")
return []
async def read_slack_data(self, channels: Optional[list[str]] = None) -> list[str]:
"""
Read Slack data and return formatted text chunks.
Args:
channels: Optional list of channel names to fetch. If None, fetches from all available channels.
Returns:
List of formatted text chunks ready for LEANN indexing
"""
try:
await self.start_mcp_server()
await self.initialize_mcp_connection()
all_texts = []
if channels:
# Fetch specific channels
for channel in channels:
try:
messages = await self.fetch_slack_messages(channel=channel, limit=1000)
if messages:
if self.concatenate_conversations:
text_content = self._create_concatenated_content(messages, channel)
if text_content.strip():
all_texts.append(text_content)
else:
# Process individual messages
for message in messages:
formatted_msg = self._format_message(message)
if formatted_msg.strip():
all_texts.append(formatted_msg)
except Exception as e:
logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
continue
else:
# Fetch from all available channels
logger.info("Fetching from all available channels...")
all_channels = await self.get_all_channels()
if not all_channels:
# Fallback to common channel names if we can't get the list
all_channels = ["general", "random", "announcements", "C0GN5BX0F"]
logger.info(f"Using fallback channels: {all_channels}")
for channel in all_channels:
try:
logger.info(f"Searching channel: {channel}")
messages = await self.fetch_slack_messages(channel=channel, limit=1000)
if messages:
if self.concatenate_conversations:
text_content = self._create_concatenated_content(messages, channel)
if text_content.strip():
all_texts.append(text_content)
else:
# Process individual messages
for message in messages:
formatted_msg = self._format_message(message)
if formatted_msg.strip():
all_texts.append(formatted_msg)
except Exception as e:
logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
continue
return all_texts
finally:
await self.stop_mcp_server()
async def __aenter__(self):
"""Async context manager entry."""
await self.start_mcp_server()
await self.initialize_mcp_connection()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.stop_mcp_server()

View File

@@ -1,227 +0,0 @@
#!/usr/bin/env python3
"""
Slack RAG Application with MCP Support
This application enables RAG (Retrieval-Augmented Generation) on Slack messages
by connecting to Slack MCP servers to fetch live data and index it in LEANN.
Usage:
python -m apps.slack_rag --mcp-server "slack-mcp-server" --query "What did the team discuss about the project?"
"""
import argparse
import asyncio
from apps.base_rag_example import BaseRAGExample
from apps.slack_data.slack_mcp_reader import SlackMCPReader
class SlackMCPRAG(BaseRAGExample):
"""
RAG application for Slack messages via MCP servers.
This class provides a complete RAG pipeline for Slack data, including
MCP server connection, data fetching, indexing, and interactive chat.
"""
def __init__(self):
super().__init__(
name="Slack MCP RAG",
description="RAG application for Slack messages via MCP servers",
default_index_name="slack_messages",
)
def _add_specific_arguments(self, parser: argparse.ArgumentParser):
"""Add Slack MCP-specific arguments."""
parser.add_argument(
"--mcp-server",
type=str,
required=True,
help="Command to start the Slack MCP server (e.g., 'slack-mcp-server' or 'npx slack-mcp-server')",
)
parser.add_argument(
"--workspace-name",
type=str,
help="Slack workspace name for better organization and filtering",
)
parser.add_argument(
"--channels",
nargs="+",
help="Specific Slack channels to index (e.g., general random). If not specified, fetches from all available channels",
)
parser.add_argument(
"--concatenate-conversations",
action="store_true",
default=True,
help="Group messages by channel/thread for better context (default: True)",
)
parser.add_argument(
"--no-concatenate-conversations",
action="store_true",
help="Process individual messages instead of grouping by channel",
)
parser.add_argument(
"--max-messages-per-channel",
type=int,
default=100,
help="Maximum number of messages to include per channel (default: 100)",
)
parser.add_argument(
"--test-connection",
action="store_true",
help="Test MCP server connection and list available tools without indexing",
)
parser.add_argument(
"--max-retries",
type=int,
default=5,
help="Maximum number of retries for failed operations (default: 5)",
)
parser.add_argument(
"--retry-delay",
type=float,
default=2.0,
help="Initial delay between retries in seconds (default: 2.0)",
)
async def test_mcp_connection(self, args) -> bool:
"""Test the MCP server connection and display available tools."""
print(f"Testing connection to MCP server: {args.mcp_server}")
try:
reader = SlackMCPReader(
mcp_server_command=args.mcp_server,
workspace_name=args.workspace_name,
concatenate_conversations=not args.no_concatenate_conversations,
max_messages_per_conversation=args.max_messages_per_channel,
max_retries=args.max_retries,
retry_delay=args.retry_delay,
)
async with reader:
tools = await reader.list_available_tools()
print("Successfully connected to MCP server!")
print(f"Available tools ({len(tools)}):")
for i, tool in enumerate(tools, 1):
name = tool.get("name", "Unknown")
description = tool.get("description", "No description available")
print(f"\n{i}. {name}")
print(
f" Description: {description[:100]}{'...' if len(description) > 100 else ''}"
)
# Show input schema if available
schema = tool.get("inputSchema", {})
if schema.get("properties"):
props = list(schema["properties"].keys())[:3] # Show first 3 properties
print(
f" Parameters: {', '.join(props)}{'...' if len(schema['properties']) > 3 else ''}"
)
return True
except Exception as e:
print(f"Failed to connect to MCP server: {e}")
print("\nTroubleshooting tips:")
print("1. Make sure the MCP server is installed and accessible")
print("2. Check if the server command is correct")
print("3. Ensure you have proper authentication/credentials configured")
print("4. Try running the MCP server command directly to test it")
return False
async def load_data(self, args) -> list[str]:
"""Load Slack messages via MCP server."""
print(f"Connecting to Slack MCP server: {args.mcp_server}")
if args.workspace_name:
print(f"Workspace: {args.workspace_name}")
# Filter out empty strings from channels
channels = [ch for ch in args.channels if ch.strip()] if args.channels else None
if channels:
print(f"Channels: {', '.join(channels)}")
else:
print("Fetching from all available channels")
concatenate = not args.no_concatenate_conversations
print(
f"Processing mode: {'Concatenated conversations' if concatenate else 'Individual messages'}"
)
try:
reader = SlackMCPReader(
mcp_server_command=args.mcp_server,
workspace_name=args.workspace_name,
concatenate_conversations=concatenate,
max_messages_per_conversation=args.max_messages_per_channel,
max_retries=args.max_retries,
retry_delay=args.retry_delay,
)
texts = await reader.read_slack_data(channels=channels)
if not texts:
print("No messages found! This could mean:")
print("- The MCP server couldn't fetch messages")
print("- The specified channels don't exist or are empty")
print("- Authentication issues with the Slack workspace")
return []
print(f"Successfully loaded {len(texts)} text chunks from Slack")
# Show sample of what was loaded
if texts:
sample_text = texts[0][:200] + "..." if len(texts[0]) > 200 else texts[0]
print("\nSample content:")
print("-" * 40)
print(sample_text)
print("-" * 40)
return texts
except Exception as e:
print(f"Error loading Slack data: {e}")
print("\nThis might be due to:")
print("- MCP server connection issues")
print("- Authentication problems")
print("- Network connectivity issues")
print("- Incorrect channel names")
raise
async def run(self):
"""Main entry point with MCP connection testing."""
args = self.parser.parse_args()
# Test connection if requested
if args.test_connection:
success = await self.test_mcp_connection(args)
if not success:
return
print(
"MCP server is working! You can now run without --test-connection to start indexing."
)
return
# Run the standard RAG pipeline
await super().run()
async def main():
"""Main entry point for the Slack MCP RAG application."""
app = SlackMCPRAG()
await app.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1 +0,0 @@
# Twitter MCP data integration for LEANN

View File

@@ -1,295 +0,0 @@
#!/usr/bin/env python3
"""
Twitter MCP Reader for LEANN
This module provides functionality to connect to Twitter MCP servers and fetch bookmark data
for indexing in LEANN. It supports various Twitter MCP server implementations and provides
flexible bookmark processing options.
"""
import asyncio
import json
import logging
from typing import Any, Optional
logger = logging.getLogger(__name__)
class TwitterMCPReader:
"""
Reader for Twitter bookmark data via MCP (Model Context Protocol) servers.
This class connects to Twitter MCP servers to fetch bookmark data and convert it
into a format suitable for LEANN indexing.
"""
def __init__(
self,
mcp_server_command: str,
username: Optional[str] = None,
include_tweet_content: bool = True,
include_metadata: bool = True,
max_bookmarks: int = 1000,
):
"""
Initialize the Twitter MCP Reader.
Args:
mcp_server_command: Command to start the MCP server (e.g., 'twitter-mcp-server')
username: Optional Twitter username to filter bookmarks
include_tweet_content: Whether to include full tweet content
include_metadata: Whether to include tweet metadata (likes, retweets, etc.)
max_bookmarks: Maximum number of bookmarks to fetch
"""
self.mcp_server_command = mcp_server_command
self.username = username
self.include_tweet_content = include_tweet_content
self.include_metadata = include_metadata
self.max_bookmarks = max_bookmarks
self.mcp_process = None
async def start_mcp_server(self):
"""Start the MCP server process."""
try:
self.mcp_process = await asyncio.create_subprocess_exec(
*self.mcp_server_command.split(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
logger.info(f"Started MCP server: {self.mcp_server_command}")
except Exception as e:
logger.error(f"Failed to start MCP server: {e}")
raise
async def stop_mcp_server(self):
"""Stop the MCP server process."""
if self.mcp_process:
self.mcp_process.terminate()
await self.mcp_process.wait()
logger.info("Stopped MCP server")
async def send_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]:
"""Send a request to the MCP server and get response."""
if not self.mcp_process:
raise RuntimeError("MCP server not started")
request_json = json.dumps(request) + "\n"
self.mcp_process.stdin.write(request_json.encode())
await self.mcp_process.stdin.drain()
response_line = await self.mcp_process.stdout.readline()
if not response_line:
raise RuntimeError("No response from MCP server")
return json.loads(response_line.decode().strip())
async def initialize_mcp_connection(self):
"""Initialize the MCP connection."""
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "leann-twitter-reader", "version": "1.0.0"},
},
}
response = await self.send_mcp_request(init_request)
if "error" in response:
raise RuntimeError(f"MCP initialization failed: {response['error']}")
logger.info("MCP connection initialized successfully")
async def list_available_tools(self) -> list[dict[str, Any]]:
"""List available tools from the MCP server."""
list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
response = await self.send_mcp_request(list_request)
if "error" in response:
raise RuntimeError(f"Failed to list tools: {response['error']}")
return response.get("result", {}).get("tools", [])
async def fetch_twitter_bookmarks(self, limit: Optional[int] = None) -> list[dict[str, Any]]:
"""
Fetch Twitter bookmarks using MCP tools.
Args:
limit: Maximum number of bookmarks to fetch
Returns:
List of bookmark dictionaries
"""
tools = await self.list_available_tools()
bookmark_tool = None
# Look for a tool that can fetch bookmarks
for tool in tools:
tool_name = tool.get("name", "").lower()
if any(keyword in tool_name for keyword in ["bookmark", "saved", "favorite"]):
bookmark_tool = tool
break
if not bookmark_tool:
raise RuntimeError("No bookmark fetching tool found in MCP server")
# Prepare tool call parameters
tool_params = {}
if limit or self.max_bookmarks:
tool_params["limit"] = limit or self.max_bookmarks
if self.username:
tool_params["username"] = self.username
fetch_request = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": bookmark_tool["name"], "arguments": tool_params},
}
response = await self.send_mcp_request(fetch_request)
if "error" in response:
raise RuntimeError(f"Failed to fetch bookmarks: {response['error']}")
# Extract bookmarks from response
result = response.get("result", {})
if "content" in result and isinstance(result["content"], list):
content = result["content"][0] if result["content"] else {}
if "text" in content:
try:
bookmarks = json.loads(content["text"])
except json.JSONDecodeError:
# If not JSON, treat as plain text
bookmarks = [{"text": content["text"], "source": "twitter"}]
else:
bookmarks = result["content"]
else:
bookmarks = result.get("bookmarks", result.get("tweets", [result]))
return bookmarks if isinstance(bookmarks, list) else [bookmarks]
def _format_bookmark(self, bookmark: dict[str, Any]) -> str:
"""Format a single bookmark for indexing."""
# Extract tweet information
text = bookmark.get("text", bookmark.get("content", ""))
author = bookmark.get(
"author", bookmark.get("username", bookmark.get("user", {}).get("username", "Unknown"))
)
timestamp = bookmark.get("created_at", bookmark.get("timestamp", ""))
url = bookmark.get("url", bookmark.get("tweet_url", ""))
# Extract metadata if available
likes = bookmark.get("likes", bookmark.get("favorite_count", 0))
retweets = bookmark.get("retweets", bookmark.get("retweet_count", 0))
replies = bookmark.get("replies", bookmark.get("reply_count", 0))
# Build formatted bookmark
parts = []
# Header
parts.append("=== Twitter Bookmark ===")
if author:
parts.append(f"Author: @{author}")
if timestamp:
# Format timestamp if it's a standard format
try:
import datetime
if "T" in str(timestamp): # ISO format
dt = datetime.datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
formatted_time = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
formatted_time = str(timestamp)
parts.append(f"Date: {formatted_time}")
except (ValueError, TypeError):
parts.append(f"Date: {timestamp}")
if url:
parts.append(f"URL: {url}")
# Tweet content
if text and self.include_tweet_content:
parts.append("")
parts.append("Content:")
parts.append(text)
# Metadata
if self.include_metadata and any([likes, retweets, replies]):
parts.append("")
parts.append("Engagement:")
if likes:
parts.append(f" Likes: {likes}")
if retweets:
parts.append(f" Retweets: {retweets}")
if replies:
parts.append(f" Replies: {replies}")
# Extract hashtags and mentions if available
hashtags = bookmark.get("hashtags", [])
mentions = bookmark.get("mentions", [])
if hashtags or mentions:
parts.append("")
if hashtags:
parts.append(f"Hashtags: {', '.join(hashtags)}")
if mentions:
parts.append(f"Mentions: {', '.join(mentions)}")
return "\n".join(parts)
async def read_twitter_bookmarks(self) -> list[str]:
"""
Read Twitter bookmark data and return formatted text chunks.
Returns:
List of formatted text chunks ready for LEANN indexing
"""
try:
await self.start_mcp_server()
await self.initialize_mcp_connection()
print(f"Fetching up to {self.max_bookmarks} bookmarks...")
if self.username:
print(f"Filtering for user: @{self.username}")
bookmarks = await self.fetch_twitter_bookmarks()
if not bookmarks:
print("No bookmarks found")
return []
print(f"Processing {len(bookmarks)} bookmarks...")
all_texts = []
processed_count = 0
for bookmark in bookmarks:
try:
formatted_bookmark = self._format_bookmark(bookmark)
if formatted_bookmark.strip():
all_texts.append(formatted_bookmark)
processed_count += 1
except Exception as e:
logger.warning(f"Failed to format bookmark: {e}")
continue
print(f"Successfully processed {processed_count} bookmarks")
return all_texts
finally:
await self.stop_mcp_server()
async def __aenter__(self):
"""Async context manager entry."""
await self.start_mcp_server()
await self.initialize_mcp_connection()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.stop_mcp_server()

View File

@@ -1,195 +0,0 @@
#!/usr/bin/env python3
"""
Twitter RAG Application with MCP Support
This application enables RAG (Retrieval-Augmented Generation) on Twitter bookmarks
by connecting to Twitter MCP servers to fetch live data and index it in LEANN.
Usage:
python -m apps.twitter_rag --mcp-server "twitter-mcp-server" --query "What articles did I bookmark about AI?"
"""
import argparse
import asyncio
from apps.base_rag_example import BaseRAGExample
from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
class TwitterMCPRAG(BaseRAGExample):
"""
RAG application for Twitter bookmarks via MCP servers.
This class provides a complete RAG pipeline for Twitter bookmark data, including
MCP server connection, data fetching, indexing, and interactive chat.
"""
def __init__(self):
super().__init__(
name="Twitter MCP RAG",
description="RAG application for Twitter bookmarks via MCP servers",
default_index_name="twitter_bookmarks",
)
def _add_specific_arguments(self, parser: argparse.ArgumentParser):
"""Add Twitter MCP-specific arguments."""
parser.add_argument(
"--mcp-server",
type=str,
required=True,
help="Command to start the Twitter MCP server (e.g., 'twitter-mcp-server' or 'npx twitter-mcp-server')",
)
parser.add_argument(
"--username", type=str, help="Twitter username to filter bookmarks (without @)"
)
parser.add_argument(
"--max-bookmarks",
type=int,
default=1000,
help="Maximum number of bookmarks to fetch (default: 1000)",
)
parser.add_argument(
"--no-tweet-content",
action="store_true",
help="Exclude tweet content, only include metadata",
)
parser.add_argument(
"--no-metadata",
action="store_true",
help="Exclude engagement metadata (likes, retweets, etc.)",
)
parser.add_argument(
"--test-connection",
action="store_true",
help="Test MCP server connection and list available tools without indexing",
)
async def test_mcp_connection(self, args) -> bool:
"""Test the MCP server connection and display available tools."""
print(f"Testing connection to MCP server: {args.mcp_server}")
try:
reader = TwitterMCPReader(
mcp_server_command=args.mcp_server,
username=args.username,
include_tweet_content=not args.no_tweet_content,
include_metadata=not args.no_metadata,
max_bookmarks=args.max_bookmarks,
)
async with reader:
tools = await reader.list_available_tools()
print("\n✅ Successfully connected to MCP server!")
print(f"Available tools ({len(tools)}):")
for i, tool in enumerate(tools, 1):
name = tool.get("name", "Unknown")
description = tool.get("description", "No description available")
print(f"\n{i}. {name}")
print(
f" Description: {description[:100]}{'...' if len(description) > 100 else ''}"
)
# Show input schema if available
schema = tool.get("inputSchema", {})
if schema.get("properties"):
props = list(schema["properties"].keys())[:3] # Show first 3 properties
print(
f" Parameters: {', '.join(props)}{'...' if len(schema['properties']) > 3 else ''}"
)
return True
except Exception as e:
print(f"\n❌ Failed to connect to MCP server: {e}")
print("\nTroubleshooting tips:")
print("1. Make sure the Twitter MCP server is installed and accessible")
print("2. Check if the server command is correct")
print("3. Ensure you have proper Twitter API credentials configured")
print("4. Verify your Twitter account has bookmarks to fetch")
print("5. Try running the MCP server command directly to test it")
return False
async def load_data(self, args) -> list[str]:
"""Load Twitter bookmarks via MCP server."""
print(f"Connecting to Twitter MCP server: {args.mcp_server}")
if args.username:
print(f"Username filter: @{args.username}")
print(f"Max bookmarks: {args.max_bookmarks}")
print(f"Include tweet content: {not args.no_tweet_content}")
print(f"Include metadata: {not args.no_metadata}")
try:
reader = TwitterMCPReader(
mcp_server_command=args.mcp_server,
username=args.username,
include_tweet_content=not args.no_tweet_content,
include_metadata=not args.no_metadata,
max_bookmarks=args.max_bookmarks,
)
texts = await reader.read_twitter_bookmarks()
if not texts:
print("❌ No bookmarks found! This could mean:")
print("- You don't have any bookmarks on Twitter")
print("- The MCP server couldn't access your bookmarks")
print("- Authentication issues with Twitter API")
print("- The username filter didn't match any bookmarks")
return []
print(f"✅ Successfully loaded {len(texts)} bookmarks from Twitter")
# Show sample of what was loaded
if texts:
sample_text = texts[0][:300] + "..." if len(texts[0]) > 300 else texts[0]
print("\nSample bookmark:")
print("-" * 50)
print(sample_text)
print("-" * 50)
return texts
except Exception as e:
print(f"❌ Error loading Twitter bookmarks: {e}")
print("\nThis might be due to:")
print("- MCP server connection issues")
print("- Twitter API authentication problems")
print("- Network connectivity issues")
print("- Rate limiting from Twitter API")
raise
async def run(self):
"""Main entry point with MCP connection testing."""
args = self.parser.parse_args()
# Test connection if requested
if args.test_connection:
success = await self.test_mcp_connection(args)
if not success:
return
print(
"\n🎉 MCP server is working! You can now run without --test-connection to start indexing."
)
return
# Run the standard RAG pipeline
await super().run()
async def main():
"""Main entry point for the Twitter MCP RAG application."""
app = TwitterMCPRAG()
await app.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,395 +0,0 @@
# Slack Integration Setup Guide
This guide provides step-by-step instructions for setting up Slack integration with LEANN.
## Overview
LEANN's Slack integration uses MCP (Model Context Protocol) servers to fetch and index your Slack messages for RAG (Retrieval-Augmented Generation). This allows you to search through your Slack conversations using natural language queries.
## Prerequisites
1. **Slack Workspace Access**: You need admin or owner permissions in your Slack workspace to create apps and configure OAuth tokens.
2. **Slack MCP Server**: Install a Slack MCP server (e.g., `slack-mcp-server` via npm)
3. **LEANN**: Ensure you have LEANN installed and working
## Step 1: Create a Slack App
### 1.1 Go to Slack API Dashboard
1. Visit [https://api.slack.com/apps](https://api.slack.com/apps)
2. Click **"Create New App"**
3. Choose **"From scratch"**
4. Enter your app name (e.g., "LEANN Slack Integration")
5. Select your workspace
6. Click **"Create App"**
### 1.2 Configure App Permissions
#### Token Scopes
1. In your app dashboard, go to **"OAuth & Permissions"** in the left sidebar
2. Scroll down to **"Scopes"** section
3. Under **"Bot Token Scopes & OAuth Scope"**, click **"Add an OAuth Scope"**
4. Add the following scopes:
- `channels:read` - Read public channel information
- `channels:history` - Read messages in public channels
- `groups:read` - Read private channel information
- `groups:history` - Read messages in private channels
- `im:read` - Read direct message information
- `im:history` - Read direct messages
- `mpim:read` - Read group direct message information
- `mpim:history` - Read group direct messages
- `users:read` - Read user information
- `team:read` - Read workspace information
#### App-Level Tokens (Optional)
Some MCP servers may require app-level tokens:
1. Go to **"Basic Information"** in the left sidebar
2. Scroll down to **"App-Level Tokens"**
3. Click **"Generate Token and Scopes"**
4. Enter a name (e.g., "LEANN Integration")
5. Add the `connections:write` scope
6. Click **"Generate"**
7. Copy the token (starts with `xapp-`)
### 1.3 Install App to Workspace
1. Go to **"OAuth & Permissions"** in the left sidebar
2. Click **"Install to Workspace"**
3. Review the permissions and click **"Allow"**
4. Copy the **"Bot User OAuth Token"** (starts with `xoxb-`)
5. Copy the **"User OAuth Token"** (starts with `xoxp-`)
## Step 2: Install Slack MCP Server
### Option A: Using npm (Recommended)
```bash
# Install globally
npm install -g slack-mcp-server
# Or install locally
npm install slack-mcp-server
```
### Option B: Using npx (No installation required)
```bash
# Use directly without installation
npx slack-mcp-server
```
## Step 3: Install and Configure Ollama (for Real LLM Responses)
### 3.1 Install Ollama
```bash
# Install Ollama using Homebrew (macOS)
brew install ollama
# Or download from https://ollama.ai/
```
### 3.2 Start Ollama Service
```bash
# Start Ollama as a service
brew services start ollama
# Or start manually
ollama serve
```
### 3.3 Pull a Model
```bash
# Pull a lightweight model for testing
ollama pull llama3.2:1b
# Verify the model is available
ollama list
```
## Step 4: Configure Environment Variables
Create a `.env` file or set environment variables:
```bash
# Required: User OAuth Token
SLACK_OAUTH_TOKEN=xoxp-your-user-oauth-token-here
# Optional: App-Level Token (if your MCP server requires it)
SLACK_APP_TOKEN=xapp-your-app-token-here
# Optional: Workspace-specific settings
SLACK_WORKSPACE_ID=T1234567890 # Your workspace ID (optional)
```
## Step 5: Test the Setup
### 5.1 Test MCP Server Connection
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--test-connection \
--workspace-name "Your Workspace Name"
```
This will test the connection and list available tools without indexing any data.
### 5.2 Index a Specific Channel
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "Your Workspace Name" \
--channels general \
--query "What did we discuss about the project?"
```
### 5.3 Real RAG Query Examples
This section demonstrates successful Slack RAG integration queries against the Sky Lab Computing workspace's "random" channel. The system successfully retrieves actual conversation messages and performs semantic search with high relevance scores, including finding specific research paper announcements and technical discussions.
### Example 1: Advisor Models Query
**Query:** "train black-box models to adopt to your personal data"
This query demonstrates the system's ability to find specific research announcements about training black-box models for personal data adaptation.
![Advisor Models Query - Command Setup](videos/slack_integration_1.1.png)
![Advisor Models Query - Search Results](videos/slack_integration_1.2.png)
![Advisor Models Query - LLM Response](videos/slack_integration_1.3.png)
### Example 2: Barbarians at the Gate Query
**Query:** "AI-driven research systems ADRS"
This query demonstrates the system's ability to find specific research announcements about AI-driven research systems and algorithm discovery.
![Barbarians Query - Command Setup](videos/slack_integration_2.1.png)
![Barbarians Query - Search Results](videos/slack_integration_2.2.png)
![Barbarians Query - LLM Response](videos/slack_integration_2.3.png)
### Prerequisites
- Bot is installed in the Sky Lab Computing workspace and invited to the target channel (run `/invite @YourBotName` in the channel if needed)
- Bot token available and exported in the same terminal session
### Commands
1) Set the workspace token for this shell
```bash
export SLACK_MCP_XOXP_TOKEN="xoxp-***-redacted-***"
```
2) Run queries against the "random" channel by channel ID (C0GN5BX0F)
**Advisor Models Query:**
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "Sky Lab Computing" \
--channels C0GN5BX0F \
--max-messages-per-channel 100000 \
--query "train black-box models to adopt to your personal data" \
--llm ollama \
--llm-model "llama3.2:1b" \
--llm-host "http://localhost:11434" \
--no-concatenate-conversations
```
**Barbarians at the Gate Query:**
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "Sky Lab Computing" \
--channels C0GN5BX0F \
--max-messages-per-channel 100000 \
--query "AI-driven research systems ADRS" \
--llm ollama \
--llm-model "llama3.2:1b" \
--llm-host "http://localhost:11434" \
--no-concatenate-conversations
```
These examples demonstrate the system's ability to find and retrieve specific research announcements and technical discussions from the conversation history, showcasing the power of semantic search in Slack data.
3) Optional: Ask a broader question
```bash
python test_channel_by_id_or_name.py \
--channel-id C0GN5BX0F \
--workspace-name "Sky Lab Computing" \
--query "What is LEANN about?"
```
Notes:
- If you see `not_in_channel`, invite the bot to the channel and re-run.
- If you see `channel_not_found`, confirm the channel ID and workspace.
- Deep search via server-side “search” tools may require additional Slack scopes; the example above performs client-side filtering over retrieved history.
## Common Issues and Solutions
### Issue 1: "users cache is not ready yet" Error
**Problem**: You see this warning:
```
WARNING - Failed to fetch messages from channel random: Failed to fetch messages: {'code': -32603, 'message': 'users cache is not ready yet, sync process is still running... please wait'}
```
**Solution**: This is a common timing issue. The LEANN integration now includes automatic retry logic:
1. **Wait and Retry**: The system will automatically retry with exponential backoff (2s, 4s, 8s, etc.)
2. **Increase Retry Parameters**: If needed, you can customize retry behavior:
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--max-retries 10 \
--retry-delay 3.0 \
--channels general \
--query "Your query here"
```
3. **Keep MCP Server Running**: Start the MCP server separately and keep it running:
```bash
# Terminal 1: Start MCP server
slack-mcp-server
# Terminal 2: Run LEANN (it will connect to the running server)
python -m apps.slack_rag --mcp-server "slack-mcp-server" --channels general --query "test"
```
### Issue 2: "No message fetching tool found"
**Problem**: The MCP server doesn't have the expected tools.
**Solution**:
1. Check if your MCP server is properly installed and configured
2. Verify your Slack tokens are correct
3. Try a different MCP server implementation
4. Check the MCP server documentation for required configuration
### Issue 3: Permission Denied Errors
**Problem**: You get permission errors when trying to access channels.
**Solutions**:
1. **Check Bot Permissions**: Ensure your bot has been added to the channels you want to access
2. **Verify Token Scopes**: Make sure you have all required scopes configured
3. **Channel Access**: For private channels, the bot needs to be explicitly invited
4. **Workspace Permissions**: Ensure your Slack app has the necessary workspace permissions
### Issue 4: Empty Results
**Problem**: No messages are returned even though the channel has messages.
**Solutions**:
1. **Check Channel Names**: Ensure channel names are correct (without the # symbol)
2. **Verify Bot Access**: Make sure the bot can access the channels
3. **Check Date Ranges**: Some MCP servers have limitations on message history
4. **Increase Message Limits**: Try increasing the message limit:
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--channels general \
--max-messages-per-channel 1000 \
--query "test"
```
## Advanced Configuration
### Custom MCP Server Commands
If you need to pass additional parameters to your MCP server:
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server --token-file /path/to/tokens.json" \
--workspace-name "Your Workspace" \
--channels general \
--query "Your query"
```
### Multiple Workspaces
To work with multiple Slack workspaces, you can:
1. Create separate apps for each workspace
2. Use different environment variables
3. Run separate instances with different configurations
### Performance Optimization
For better performance with large workspaces:
```bash
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "Your Workspace" \
--max-messages-per-channel 500 \
--no-concatenate-conversations \
--query "Your query"
```
---
## Troubleshooting Checklist
- [ ] Slack app created with proper permissions
- [ ] Bot token (xoxb-) copied correctly
- [ ] App-level token (xapp-) created if needed
- [ ] MCP server installed and accessible
- [ ] Ollama installed and running (`brew services start ollama`)
- [ ] Ollama model pulled (`ollama pull llama3.2:1b`)
- [ ] Environment variables set correctly
- [ ] Bot invited to relevant channels
- [ ] Channel names specified without # symbol
- [ ] Sufficient retry attempts configured
- [ ] Network connectivity to Slack APIs
## Getting Help
If you continue to have issues:
1. **Check Logs**: Look for detailed error messages in the console output
2. **Test MCP Server**: Use `--test-connection` to verify the MCP server is working
3. **Verify Tokens**: Double-check that your Slack tokens are valid and have the right scopes
4. **Check Ollama**: Ensure Ollama is running (`ollama serve`) and the model is available (`ollama list`)
5. **Community Support**: Reach out to the LEANN community for help
## Example Commands
### Basic Usage
```bash
# Test connection
python -m apps.slack_rag --mcp-server "slack-mcp-server" --test-connection
# Index specific channels
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "My Company" \
--channels general random \
--query "What did we decide about the project timeline?"
```
### Advanced Usage
```bash
# With custom retry settings
python -m apps.slack_rag \
--mcp-server "slack-mcp-server" \
--workspace-name "My Company" \
--channels general \
--max-retries 10 \
--retry-delay 5.0 \
--max-messages-per-channel 2000 \
--query "Show me all decisions made in the last month"
```

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 445 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 508 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 437 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 474 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 501 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 454 KiB

View File

@@ -1,178 +0,0 @@
#!/usr/bin/env python3
"""
MCP Integration Examples for LEANN
This script demonstrates how to use LEANN with different MCP servers for
RAG on various platforms like Slack and Twitter.
Examples:
1. Slack message RAG via MCP
2. Twitter bookmark RAG via MCP
3. Testing MCP server connections
"""
import asyncio
import sys
from pathlib import Path
# Add the parent directory to the path so we can import from apps
sys.path.append(str(Path(__file__).parent.parent))
async def demo_slack_mcp():
"""Demonstrate Slack MCP integration."""
print("=" * 60)
print("🔥 Slack MCP RAG Demo")
print("=" * 60)
print("\n1. Testing Slack MCP server connection...")
# This would typically use a real MCP server command
# For demo purposes, we show what the command would look like
# slack_app = SlackMCPRAG() # Would be used for actual testing
# Simulate command line arguments for testing
class MockArgs:
mcp_server = "slack-mcp-server" # This would be the actual MCP server command
workspace_name = "my-workspace"
channels = ["general", "random", "dev-team"]
no_concatenate_conversations = False
max_messages_per_channel = 50
test_connection = True
print(f"MCP Server Command: {MockArgs.mcp_server}")
print(f"Workspace: {MockArgs.workspace_name}")
print(f"Channels: {', '.join(MockArgs.channels)}")
# In a real scenario, you would run:
# success = await slack_app.test_mcp_connection(MockArgs)
print("\n📝 Example usage:")
print("python -m apps.slack_rag \\")
print(" --mcp-server 'slack-mcp-server' \\")
print(" --workspace-name 'my-team' \\")
print(" --channels general dev-team \\")
print(" --test-connection")
print("\n🔍 After indexing, you could query:")
print("- 'What did the team discuss about the project deadline?'")
print("- 'Find messages about the new feature launch'")
print("- 'Show me conversations about budget planning'")
async def demo_twitter_mcp():
"""Demonstrate Twitter MCP integration."""
print("\n" + "=" * 60)
print("🐦 Twitter MCP RAG Demo")
print("=" * 60)
print("\n1. Testing Twitter MCP server connection...")
# twitter_app = TwitterMCPRAG() # Would be used for actual testing
class MockArgs:
mcp_server = "twitter-mcp-server"
username = None # Fetch all bookmarks
max_bookmarks = 500
no_tweet_content = False
no_metadata = False
test_connection = True
print(f"MCP Server Command: {MockArgs.mcp_server}")
print(f"Max Bookmarks: {MockArgs.max_bookmarks}")
print(f"Include Content: {not MockArgs.no_tweet_content}")
print(f"Include Metadata: {not MockArgs.no_metadata}")
print("\n📝 Example usage:")
print("python -m apps.twitter_rag \\")
print(" --mcp-server 'twitter-mcp-server' \\")
print(" --max-bookmarks 1000 \\")
print(" --test-connection")
print("\n🔍 After indexing, you could query:")
print("- 'What AI articles did I bookmark last month?'")
print("- 'Find tweets about machine learning techniques'")
print("- 'Show me bookmarked threads about startup advice'")
async def show_mcp_server_setup():
"""Show how to set up MCP servers."""
print("\n" + "=" * 60)
print("⚙️ MCP Server Setup Guide")
print("=" * 60)
print("\n🔧 Setting up Slack MCP Server:")
print("1. Install a Slack MCP server (example commands):")
print(" npm install -g slack-mcp-server")
print(" # OR")
print(" pip install slack-mcp-server")
print("\n2. Configure Slack credentials:")
print(" export SLACK_BOT_TOKEN='xoxb-your-bot-token'")
print(" export SLACK_APP_TOKEN='xapp-your-app-token'")
print("\n3. Test the server:")
print(" slack-mcp-server --help")
print("\n🔧 Setting up Twitter MCP Server:")
print("1. Install a Twitter MCP server:")
print(" npm install -g twitter-mcp-server")
print(" # OR")
print(" pip install twitter-mcp-server")
print("\n2. Configure Twitter API credentials:")
print(" export TWITTER_API_KEY='your-api-key'")
print(" export TWITTER_API_SECRET='your-api-secret'")
print(" export TWITTER_ACCESS_TOKEN='your-access-token'")
print(" export TWITTER_ACCESS_TOKEN_SECRET='your-access-token-secret'")
print("\n3. Test the server:")
print(" twitter-mcp-server --help")
async def show_integration_benefits():
"""Show the benefits of MCP integration."""
print("\n" + "=" * 60)
print("🌟 Benefits of MCP Integration")
print("=" * 60)
benefits = [
("🔄 Live Data Access", "Fetch real-time data from platforms without manual exports"),
("🔌 Standardized Protocol", "Use any MCP-compatible server with minimal code changes"),
("🚀 Easy Extension", "Add new platforms by implementing MCP readers"),
("🔒 Secure Access", "MCP servers handle authentication and API management"),
("📊 Rich Metadata", "Access full platform metadata (timestamps, engagement, etc.)"),
("⚡ Efficient Processing", "Stream data directly into LEANN without intermediate files"),
]
for title, description in benefits:
print(f"\n{title}")
print(f" {description}")
async def main():
"""Main demo function."""
print("🎯 LEANN MCP Integration Examples")
print("This demo shows how to integrate LEANN with MCP servers for various platforms.")
await demo_slack_mcp()
await demo_twitter_mcp()
await show_mcp_server_setup()
await show_integration_benefits()
print("\n" + "=" * 60)
print("✨ Next Steps")
print("=" * 60)
print("1. Install and configure MCP servers for your platforms")
print("2. Test connections using --test-connection flag")
print("3. Run indexing to build your RAG knowledge base")
print("4. Start querying your personal data!")
print("\n📚 For more information:")
print("- Check the README for detailed setup instructions")
print("- Look at the apps/slack_rag.py and apps/twitter_rag.py for implementation details")
print("- Explore other MCP servers for additional platforms")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -183,73 +183,32 @@ def compute_embeddings_sentence_transformers(
}
try:
# Try loading with advanced parameters first (newer versions)
local_model_kwargs = model_kwargs.copy()
local_tokenizer_kwargs = tokenizer_kwargs.copy()
local_model_kwargs["local_files_only"] = True
local_tokenizer_kwargs["local_files_only"] = True
# Try local loading first
model_kwargs["local_files_only"] = True
tokenizer_kwargs["local_files_only"] = True
model = SentenceTransformer(
model_name,
device=device,
model_kwargs=local_model_kwargs,
tokenizer_kwargs=local_tokenizer_kwargs,
model_kwargs=model_kwargs,
tokenizer_kwargs=tokenizer_kwargs,
local_files_only=True,
)
logger.info("Model loaded successfully! (local + optimized)")
except TypeError as e:
if "model_kwargs" in str(e) or "tokenizer_kwargs" in str(e):
logger.warning(
f"Advanced parameters not supported ({e}), using basic initialization..."
)
# Fallback to basic initialization for older versions
try:
model = SentenceTransformer(
model_name,
device=device,
local_files_only=True,
)
logger.info("Model loaded successfully! (local + basic)")
except Exception as e2:
logger.warning(f"Local loading failed ({e2}), trying network download...")
model = SentenceTransformer(
model_name,
device=device,
local_files_only=False,
)
logger.info("Model loaded successfully! (network + basic)")
else:
raise
except Exception as e:
logger.warning(f"Local loading failed ({e}), trying network download...")
# Fallback to network loading with advanced parameters
try:
network_model_kwargs = model_kwargs.copy()
network_tokenizer_kwargs = tokenizer_kwargs.copy()
network_model_kwargs["local_files_only"] = False
network_tokenizer_kwargs["local_files_only"] = False
# Fallback to network loading
model_kwargs["local_files_only"] = False
tokenizer_kwargs["local_files_only"] = False
model = SentenceTransformer(
model_name,
device=device,
model_kwargs=network_model_kwargs,
tokenizer_kwargs=network_tokenizer_kwargs,
local_files_only=False,
)
logger.info("Model loaded successfully! (network + optimized)")
except TypeError as e2:
if "model_kwargs" in str(e2) or "tokenizer_kwargs" in str(e2):
logger.warning(
f"Advanced parameters not supported ({e2}), using basic network loading..."
)
model = SentenceTransformer(
model_name,
device=device,
local_files_only=False,
)
logger.info("Model loaded successfully! (network + basic)")
else:
raise
model = SentenceTransformer(
model_name,
device=device,
model_kwargs=model_kwargs,
tokenizer_kwargs=tokenizer_kwargs,
local_files_only=False,
)
logger.info("Model loaded successfully! (network + optimized)")
# Apply additional optimizations based on mode
if use_fp16 and device in ["cuda", "mps"]:

View File

@@ -1,208 +0,0 @@
#!/usr/bin/env python3
"""
Test script for MCP integration implementations.
This script tests the basic functionality of the MCP readers and RAG applications
without requiring actual MCP servers to be running.
"""
import sys
from pathlib import Path
# Add the parent directory to the path so we can import from apps
sys.path.append(str(Path(__file__).parent.parent))
from apps.slack_data.slack_mcp_reader import SlackMCPReader
from apps.slack_rag import SlackMCPRAG
from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
from apps.twitter_rag import TwitterMCPRAG
def test_slack_reader_initialization():
"""Test that SlackMCPReader can be initialized with various parameters."""
print("Testing SlackMCPReader initialization...")
# Test basic initialization
reader = SlackMCPReader("slack-mcp-server")
assert reader.mcp_server_command == "slack-mcp-server"
assert reader.concatenate_conversations
assert reader.max_messages_per_conversation == 100
# Test with custom parameters
reader = SlackMCPReader(
"custom-slack-server",
workspace_name="test-workspace",
concatenate_conversations=False,
max_messages_per_conversation=50,
)
assert reader.workspace_name == "test-workspace"
assert not reader.concatenate_conversations
assert reader.max_messages_per_conversation == 50
print("✅ SlackMCPReader initialization tests passed")
def test_twitter_reader_initialization():
"""Test that TwitterMCPReader can be initialized with various parameters."""
print("Testing TwitterMCPReader initialization...")
# Test basic initialization
reader = TwitterMCPReader("twitter-mcp-server")
assert reader.mcp_server_command == "twitter-mcp-server"
assert reader.include_tweet_content
assert reader.include_metadata
assert reader.max_bookmarks == 1000
# Test with custom parameters
reader = TwitterMCPReader(
"custom-twitter-server",
username="testuser",
include_tweet_content=False,
include_metadata=False,
max_bookmarks=500,
)
assert reader.username == "testuser"
assert not reader.include_tweet_content
assert not reader.include_metadata
assert reader.max_bookmarks == 500
print("✅ TwitterMCPReader initialization tests passed")
def test_slack_message_formatting():
"""Test Slack message formatting functionality."""
print("Testing Slack message formatting...")
reader = SlackMCPReader("slack-mcp-server")
# Test basic message formatting
message = {
"text": "Hello, world!",
"user": "john_doe",
"channel": "general",
"ts": "1234567890.123456",
}
formatted = reader._format_message(message)
assert "Channel: #general" in formatted
assert "User: john_doe" in formatted
assert "Message: Hello, world!" in formatted
assert "Time:" in formatted
# Test with missing fields
message = {"text": "Simple message"}
formatted = reader._format_message(message)
assert "Message: Simple message" in formatted
print("✅ Slack message formatting tests passed")
def test_twitter_bookmark_formatting():
"""Test Twitter bookmark formatting functionality."""
print("Testing Twitter bookmark formatting...")
reader = TwitterMCPReader("twitter-mcp-server")
# Test basic bookmark formatting
bookmark = {
"text": "This is a great article about AI!",
"author": "ai_researcher",
"created_at": "2024-01-01T12:00:00Z",
"url": "https://twitter.com/ai_researcher/status/123456789",
"likes": 42,
"retweets": 15,
}
formatted = reader._format_bookmark(bookmark)
assert "=== Twitter Bookmark ===" in formatted
assert "Author: @ai_researcher" in formatted
assert "Content:" in formatted
assert "This is a great article about AI!" in formatted
assert "URL: https://twitter.com" in formatted
assert "Likes: 42" in formatted
assert "Retweets: 15" in formatted
# Test with minimal data
bookmark = {"text": "Simple tweet"}
formatted = reader._format_bookmark(bookmark)
assert "=== Twitter Bookmark ===" in formatted
assert "Simple tweet" in formatted
print("✅ Twitter bookmark formatting tests passed")
def test_slack_rag_initialization():
"""Test that SlackMCPRAG can be initialized."""
print("Testing SlackMCPRAG initialization...")
app = SlackMCPRAG()
assert app.default_index_name == "slack_messages"
assert hasattr(app, "parser")
print("✅ SlackMCPRAG initialization tests passed")
def test_twitter_rag_initialization():
"""Test that TwitterMCPRAG can be initialized."""
print("Testing TwitterMCPRAG initialization...")
app = TwitterMCPRAG()
assert app.default_index_name == "twitter_bookmarks"
assert hasattr(app, "parser")
print("✅ TwitterMCPRAG initialization tests passed")
def test_concatenated_content_creation():
"""Test creation of concatenated content from multiple messages."""
print("Testing concatenated content creation...")
reader = SlackMCPReader("slack-mcp-server", workspace_name="test-workspace")
messages = [
{"text": "First message", "user": "alice", "ts": "1000"},
{"text": "Second message", "user": "bob", "ts": "2000"},
{"text": "Third message", "user": "charlie", "ts": "3000"},
]
content = reader._create_concatenated_content(messages, "general")
assert "Slack Channel: #general" in content
assert "Message Count: 3" in content
assert "Workspace: test-workspace" in content
assert "First message" in content
assert "Second message" in content
assert "Third message" in content
print("✅ Concatenated content creation tests passed")
def main():
"""Run all tests."""
print("🧪 Running MCP Integration Tests")
print("=" * 50)
try:
test_slack_reader_initialization()
test_twitter_reader_initialization()
test_slack_message_formatting()
test_twitter_bookmark_formatting()
test_slack_rag_initialization()
test_twitter_rag_initialization()
test_concatenated_content_creation()
print("\n" + "=" * 50)
print("🎉 All tests passed! MCP integration is working correctly.")
print("\nNext steps:")
print("1. Install actual MCP servers for Slack and Twitter")
print("2. Configure API credentials")
print("3. Test with --test-connection flag")
print("4. Start indexing your live data!")
except Exception as e:
print(f"\n❌ Test failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,221 +0,0 @@
#!/usr/bin/env python3
"""
Standalone test script for MCP integration implementations.
This script tests the basic functionality of the MCP readers
without requiring LEANN core dependencies.
"""
import json
import sys
from pathlib import Path
# Add the parent directory to the path so we can import from apps
sys.path.append(str(Path(__file__).parent.parent))
def test_slack_reader_basic():
"""Test basic SlackMCPReader functionality without async operations."""
print("Testing SlackMCPReader basic functionality...")
# Import and test initialization
from apps.slack_data.slack_mcp_reader import SlackMCPReader
reader = SlackMCPReader("slack-mcp-server")
assert reader.mcp_server_command == "slack-mcp-server"
assert reader.concatenate_conversations
# Test message formatting
message = {
"text": "Hello team! How's the project going?",
"user": "john_doe",
"channel": "general",
"ts": "1234567890.123456",
}
formatted = reader._format_message(message)
assert "Channel: #general" in formatted
assert "User: john_doe" in formatted
assert "Message: Hello team!" in formatted
# Test concatenated content creation
messages = [
{"text": "First message", "user": "alice", "ts": "1000"},
{"text": "Second message", "user": "bob", "ts": "2000"},
]
content = reader._create_concatenated_content(messages, "dev-team")
assert "Slack Channel: #dev-team" in content
assert "Message Count: 2" in content
assert "First message" in content
assert "Second message" in content
print("✅ SlackMCPReader basic tests passed")
def test_twitter_reader_basic():
"""Test basic TwitterMCPReader functionality."""
print("Testing TwitterMCPReader basic functionality...")
from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
reader = TwitterMCPReader("twitter-mcp-server")
assert reader.mcp_server_command == "twitter-mcp-server"
assert reader.include_tweet_content
assert reader.max_bookmarks == 1000
# Test bookmark formatting
bookmark = {
"text": "Amazing article about the future of AI! Must read for everyone interested in tech.",
"author": "tech_guru",
"created_at": "2024-01-15T14:30:00Z",
"url": "https://twitter.com/tech_guru/status/123456789",
"likes": 156,
"retweets": 42,
"replies": 23,
"hashtags": ["AI", "tech", "future"],
"mentions": ["@openai", "@anthropic"],
}
formatted = reader._format_bookmark(bookmark)
assert "=== Twitter Bookmark ===" in formatted
assert "Author: @tech_guru" in formatted
assert "Amazing article about the future of AI!" in formatted
assert "Likes: 156" in formatted
assert "Retweets: 42" in formatted
assert "Hashtags: AI, tech, future" in formatted
assert "Mentions: @openai, @anthropic" in formatted
# Test with minimal data
simple_bookmark = {"text": "Short tweet", "author": "user123"}
formatted_simple = reader._format_bookmark(simple_bookmark)
assert "=== Twitter Bookmark ===" in formatted_simple
assert "Short tweet" in formatted_simple
assert "Author: @user123" in formatted_simple
print("✅ TwitterMCPReader basic tests passed")
def test_mcp_request_format():
"""Test MCP request formatting."""
print("Testing MCP request formatting...")
# Test initialization request format
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "leann-slack-reader", "version": "1.0.0"},
},
}
# Verify it's valid JSON
json_str = json.dumps(init_request)
parsed = json.loads(json_str)
assert parsed["jsonrpc"] == "2.0"
assert parsed["method"] == "initialize"
assert parsed["params"]["protocolVersion"] == "2024-11-05"
# Test tools/list request
list_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
json_str = json.dumps(list_request)
parsed = json.loads(json_str)
assert parsed["method"] == "tools/list"
print("✅ MCP request formatting tests passed")
def test_data_processing():
"""Test data processing capabilities."""
print("Testing data processing capabilities...")
from apps.slack_data.slack_mcp_reader import SlackMCPReader
from apps.twitter_data.twitter_mcp_reader import TwitterMCPReader
# Test Slack message processing with various formats
slack_reader = SlackMCPReader("test-server")
messages_with_timestamps = [
{"text": "Meeting in 5 minutes", "user": "alice", "ts": "1000.123"},
{"text": "On my way!", "user": "bob", "ts": "1001.456"},
{"text": "Starting now", "user": "charlie", "ts": "1002.789"},
]
content = slack_reader._create_concatenated_content(messages_with_timestamps, "meetings")
assert "Meeting in 5 minutes" in content
assert "On my way!" in content
assert "Starting now" in content
# Test Twitter bookmark processing with engagement data
twitter_reader = TwitterMCPReader("test-server", include_metadata=True)
high_engagement_bookmark = {
"text": "Thread about startup lessons learned 🧵",
"author": "startup_founder",
"likes": 1250,
"retweets": 340,
"replies": 89,
}
formatted = twitter_reader._format_bookmark(high_engagement_bookmark)
assert "Thread about startup lessons learned" in formatted
assert "Likes: 1250" in formatted
assert "Retweets: 340" in formatted
assert "Replies: 89" in formatted
# Test with metadata disabled
twitter_reader_no_meta = TwitterMCPReader("test-server", include_metadata=False)
formatted_no_meta = twitter_reader_no_meta._format_bookmark(high_engagement_bookmark)
assert "Thread about startup lessons learned" in formatted_no_meta
assert "Likes:" not in formatted_no_meta
assert "Retweets:" not in formatted_no_meta
print("✅ Data processing tests passed")
def main():
"""Run all standalone tests."""
print("🧪 Running MCP Integration Standalone Tests")
print("=" * 60)
print("Testing core functionality without LEANN dependencies...")
print()
try:
test_slack_reader_basic()
test_twitter_reader_basic()
test_mcp_request_format()
test_data_processing()
print("\n" + "=" * 60)
print("🎉 All standalone tests passed!")
print("\n✨ MCP Integration Summary:")
print("- SlackMCPReader: Ready for Slack message processing")
print("- TwitterMCPReader: Ready for Twitter bookmark processing")
print("- MCP Protocol: Properly formatted JSON-RPC requests")
print("- Data Processing: Handles various message/bookmark formats")
print("\n🚀 Next Steps:")
print("1. Install MCP servers: npm install -g slack-mcp-server twitter-mcp-server")
print("2. Configure API credentials for Slack and Twitter")
print("3. Test connections: python -m apps.slack_rag --test-connection")
print("4. Start indexing live data from your platforms!")
print("\n📖 Documentation:")
print("- Check README.md for detailed setup instructions")
print("- Run examples/mcp_integration_demo.py for usage examples")
print("- Explore apps/slack_rag.py and apps/twitter_rag.py for implementation details")
except Exception as e:
print(f"\n❌ Test failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()