Compare commits
11 Commits
fix/twitte
...
feature/op
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c8801480d | ||
|
|
d226f72bc0 | ||
|
|
45b87ce128 | ||
|
|
585ef7785d | ||
|
|
abf312d998 | ||
|
|
5073f312b6 | ||
|
|
76e16338ca | ||
|
|
d6a3c2821c | ||
|
|
ab251ab751 | ||
|
|
28085f6f04 | ||
|
|
6495833887 |
21
README.md
21
README.md
@@ -781,7 +781,7 @@ Once your iMessage conversations are indexed, you can search with queries like:
|
||||
|
||||
### MCP Integration: RAG on Live Data from Any Platform
|
||||
|
||||
**NEW!** Connect to live data sources through the Model Context Protocol (MCP). LEANN now supports real-time RAG on platforms like Slack, Twitter, and more through standardized MCP servers.
|
||||
Connect to live data sources through the Model Context Protocol (MCP). LEANN now supports real-time RAG on platforms like Slack, Twitter, and more through standardized MCP servers.
|
||||
|
||||
**Key Benefits:**
|
||||
- **Live Data Access**: Fetch real-time data without manual exports
|
||||
@@ -805,18 +805,17 @@ python -m apps.slack_rag \
|
||||
--query "What did we decide about the product launch?"
|
||||
```
|
||||
|
||||
**Setup Requirements:**
|
||||
**📖 Comprehensive Setup Guide**: For detailed setup instructions, troubleshooting common issues (like "users cache is not ready yet"), and advanced configuration options, see our [**Slack Setup Guide**](docs/slack-setup-guide.md).
|
||||
|
||||
**Quick Setup:**
|
||||
1. Install a Slack MCP server (e.g., `npm install -g slack-mcp-server`)
|
||||
2. Create a Slack App and get API credentials:
|
||||
- Go to [api.slack.com/apps](https://api.slack.com/apps) and create a new app
|
||||
- Under "OAuth & Permissions", add these Bot Token Scopes: `channels:read`, `channels:history`, `groups:read`, `groups:history`, `im:read`, `im:history`, `mpim:read`, `mpim:history`
|
||||
- Install the app to your workspace and copy the "Bot User OAuth Token" (starts with `xoxb-`)
|
||||
- Under "App-Level Tokens", create a token with `connections:write` scope (starts with `xapp-`)
|
||||
2. Create a Slack App and get API credentials (see detailed guide above)
|
||||
3. Set environment variables:
|
||||
```bash
|
||||
export SLACK_BOT_TOKEN="xoxb-your-bot-token"
|
||||
export SLACK_APP_TOKEN="xapp-your-app-token"
|
||||
export SLACK_APP_TOKEN="xapp-your-app-token" # Optional
|
||||
```
|
||||
3. Test connection with `--test-connection` flag
|
||||
4. Test connection with `--test-connection` flag
|
||||
|
||||
**Arguments:**
|
||||
- `--mcp-server`: Command to start the Slack MCP server
|
||||
@@ -824,6 +823,8 @@ python -m apps.slack_rag \
|
||||
- `--channels`: Specific channels to index (optional)
|
||||
- `--concatenate-conversations`: Group messages by channel (default: true)
|
||||
- `--max-messages-per-channel`: Limit messages per channel (default: 100)
|
||||
- `--max-retries`: Maximum retries for cache sync issues (default: 5)
|
||||
- `--retry-delay`: Initial delay between retries in seconds (default: 2.0)
|
||||
|
||||
#### 🐦 Twitter Bookmarks: Your Personal Tweet Library
|
||||
|
||||
@@ -925,7 +926,7 @@ Want to add support for other platforms? LEANN's MCP integration is designed for
|
||||
### 🚀 Claude Code Integration: Transform Your Development Workflow!
|
||||
|
||||
<details>
|
||||
<summary><strong>NEW!! AST‑Aware Code Chunking</strong></summary>
|
||||
<summary><strong>AST‑Aware Code Chunking</strong></summary>
|
||||
|
||||
LEANN features intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript, improving code understanding compared to text-based chunking.
|
||||
|
||||
|
||||
@@ -10,9 +10,39 @@ from typing import Any
|
||||
|
||||
import dotenv
|
||||
from leann.api import LeannBuilder, LeannChat
|
||||
from leann.interactive_utils import create_rag_session
|
||||
|
||||
# Optional import: older PyPI builds may not include interactive_utils
|
||||
try:
|
||||
from leann.interactive_utils import create_rag_session
|
||||
except ImportError:
|
||||
|
||||
def create_rag_session(app_name: str, data_description: str):
|
||||
class _SimpleSession:
|
||||
def run_interactive_loop(self, handler):
|
||||
print(f"Interactive session for {app_name}: {data_description}")
|
||||
print("Interactive mode not available in this build")
|
||||
|
||||
return _SimpleSession()
|
||||
|
||||
|
||||
from leann.registry import register_project_directory
|
||||
from leann.settings import resolve_ollama_host, resolve_openai_api_key, resolve_openai_base_url
|
||||
|
||||
# Optional import: older PyPI builds may not include settings
|
||||
try:
|
||||
from leann.settings import resolve_ollama_host, resolve_openai_api_key, resolve_openai_base_url
|
||||
except ImportError:
|
||||
# Minimal fallbacks if settings helpers are unavailable
|
||||
import os
|
||||
|
||||
def resolve_ollama_host(value: str | None) -> str | None:
|
||||
return value or os.getenv("LEANN_OLLAMA_HOST") or os.getenv("OLLAMA_HOST")
|
||||
|
||||
def resolve_openai_api_key(value: str | None) -> str | None:
|
||||
return value or os.getenv("OPENAI_API_KEY")
|
||||
|
||||
def resolve_openai_base_url(value: str | None) -> str | None:
|
||||
return value or os.getenv("OPENAI_BASE_URL")
|
||||
|
||||
|
||||
dotenv.load_dotenv()
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ class SlackMCPReader:
|
||||
workspace_name: Optional[str] = None,
|
||||
concatenate_conversations: bool = True,
|
||||
max_messages_per_conversation: int = 100,
|
||||
max_retries: int = 5,
|
||||
retry_delay: float = 2.0,
|
||||
):
|
||||
"""
|
||||
Initialize the Slack MCP Reader.
|
||||
@@ -38,11 +40,15 @@ class SlackMCPReader:
|
||||
workspace_name: Optional workspace name to filter messages
|
||||
concatenate_conversations: Whether to group messages by channel/thread
|
||||
max_messages_per_conversation: Maximum messages to include per conversation
|
||||
max_retries: Maximum number of retries for failed operations
|
||||
retry_delay: Initial delay between retries in seconds
|
||||
"""
|
||||
self.mcp_server_command = mcp_server_command
|
||||
self.workspace_name = workspace_name
|
||||
self.concatenate_conversations = concatenate_conversations
|
||||
self.max_messages_per_conversation = max_messages_per_conversation
|
||||
self.max_retries = max_retries
|
||||
self.retry_delay = retry_delay
|
||||
self.mcp_process = None
|
||||
|
||||
async def start_mcp_server(self):
|
||||
@@ -110,11 +116,73 @@ class SlackMCPReader:
|
||||
|
||||
return response.get("result", {}).get("tools", [])
|
||||
|
||||
def _is_cache_sync_error(self, error: dict) -> bool:
|
||||
"""Check if the error is related to users cache not being ready."""
|
||||
if isinstance(error, dict):
|
||||
message = error.get("message", "").lower()
|
||||
return (
|
||||
"users cache is not ready" in message or "sync process is still running" in message
|
||||
)
|
||||
return False
|
||||
|
||||
async def _retry_with_backoff(self, func, *args, **kwargs):
|
||||
"""Retry a function with exponential backoff, especially for cache sync issues."""
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(self.max_retries + 1):
|
||||
try:
|
||||
return await func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
last_exception = e
|
||||
|
||||
# Check if this is a cache sync error
|
||||
error_dict = {}
|
||||
if hasattr(e, "args") and e.args and isinstance(e.args[0], dict):
|
||||
error_dict = e.args[0]
|
||||
elif "Failed to fetch messages" in str(e):
|
||||
# Try to extract error from the exception message
|
||||
import re
|
||||
|
||||
match = re.search(r"'error':\s*(\{[^}]+\})", str(e))
|
||||
if match:
|
||||
try:
|
||||
error_dict = eval(match.group(1))
|
||||
except (ValueError, SyntaxError, NameError):
|
||||
pass
|
||||
else:
|
||||
# Try alternative format
|
||||
match = re.search(r"Failed to fetch messages:\s*(\{[^}]+\})", str(e))
|
||||
if match:
|
||||
try:
|
||||
error_dict = eval(match.group(1))
|
||||
except (ValueError, SyntaxError, NameError):
|
||||
pass
|
||||
|
||||
if self._is_cache_sync_error(error_dict):
|
||||
if attempt < self.max_retries:
|
||||
delay = self.retry_delay * (2**attempt) # Exponential backoff
|
||||
logger.info(
|
||||
f"Cache sync not ready, waiting {delay:.1f}s before retry {attempt + 1}/{self.max_retries}"
|
||||
)
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cache sync still not ready after {self.max_retries} retries, giving up"
|
||||
)
|
||||
break
|
||||
else:
|
||||
# Not a cache sync error, don't retry
|
||||
break
|
||||
|
||||
# If we get here, all retries failed or it's not a retryable error
|
||||
raise last_exception
|
||||
|
||||
async def fetch_slack_messages(
|
||||
self, channel: Optional[str] = None, limit: int = 100
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Fetch Slack messages using MCP tools.
|
||||
Fetch Slack messages using MCP tools with retry logic for cache sync issues.
|
||||
|
||||
Args:
|
||||
channel: Optional channel name to filter messages
|
||||
@@ -123,32 +191,59 @@ class SlackMCPReader:
|
||||
Returns:
|
||||
List of message dictionaries
|
||||
"""
|
||||
return await self._retry_with_backoff(self._fetch_slack_messages_impl, channel, limit)
|
||||
|
||||
async def _fetch_slack_messages_impl(
|
||||
self, channel: Optional[str] = None, limit: int = 100
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Internal implementation of fetch_slack_messages without retry logic.
|
||||
"""
|
||||
# This is a generic implementation - specific MCP servers may have different tool names
|
||||
# Common tool names might be: 'get_messages', 'list_messages', 'fetch_channel_history'
|
||||
|
||||
tools = await self.list_available_tools()
|
||||
logger.info(f"Available tools: {[tool.get('name') for tool in tools]}")
|
||||
message_tool = None
|
||||
|
||||
# Look for a tool that can fetch messages
|
||||
# Look for a tool that can fetch messages - prioritize conversations_history
|
||||
message_tool = None
|
||||
|
||||
# First, try to find conversations_history specifically
|
||||
for tool in tools:
|
||||
tool_name = tool.get("name", "").lower()
|
||||
if any(
|
||||
keyword in tool_name
|
||||
for keyword in ["message", "history", "channel", "conversation"]
|
||||
):
|
||||
if "conversations_history" in tool_name:
|
||||
message_tool = tool
|
||||
logger.info(f"Found conversations_history tool: {tool}")
|
||||
break
|
||||
|
||||
# If not found, look for other message-fetching tools
|
||||
if not message_tool:
|
||||
for tool in tools:
|
||||
tool_name = tool.get("name", "").lower()
|
||||
if any(
|
||||
keyword in tool_name
|
||||
for keyword in ["conversations_search", "message", "history"]
|
||||
):
|
||||
message_tool = tool
|
||||
break
|
||||
|
||||
if not message_tool:
|
||||
raise RuntimeError("No message fetching tool found in MCP server")
|
||||
|
||||
# Prepare tool call parameters
|
||||
tool_params = {"limit": limit}
|
||||
tool_params = {"limit": "180d"} # Use 180 days to get older messages
|
||||
if channel:
|
||||
# Try common parameter names for channel specification
|
||||
for param_name in ["channel", "channel_id", "channel_name"]:
|
||||
tool_params[param_name] = channel
|
||||
break
|
||||
# For conversations_history, use channel_id parameter
|
||||
if message_tool["name"] == "conversations_history":
|
||||
tool_params["channel_id"] = channel
|
||||
else:
|
||||
# Try common parameter names for channel specification
|
||||
for param_name in ["channel", "channel_id", "channel_name"]:
|
||||
tool_params[param_name] = channel
|
||||
break
|
||||
|
||||
logger.info(f"Tool parameters: {tool_params}")
|
||||
|
||||
fetch_request = {
|
||||
"jsonrpc": "2.0",
|
||||
@@ -170,8 +265,8 @@ class SlackMCPReader:
|
||||
try:
|
||||
messages = json.loads(content["text"])
|
||||
except json.JSONDecodeError:
|
||||
# If not JSON, treat as plain text
|
||||
messages = [{"text": content["text"], "channel": channel or "unknown"}]
|
||||
# If not JSON, try to parse as CSV format (Slack MCP server format)
|
||||
messages = self._parse_csv_messages(content["text"], channel)
|
||||
else:
|
||||
messages = result["content"]
|
||||
else:
|
||||
@@ -180,6 +275,56 @@ class SlackMCPReader:
|
||||
|
||||
return messages if isinstance(messages, list) else [messages]
|
||||
|
||||
def _parse_csv_messages(self, csv_text: str, channel: str) -> list[dict[str, Any]]:
|
||||
"""Parse CSV format messages from Slack MCP server."""
|
||||
import csv
|
||||
import io
|
||||
|
||||
messages = []
|
||||
try:
|
||||
# Split by lines and process each line as a CSV row
|
||||
lines = csv_text.strip().split("\n")
|
||||
if not lines:
|
||||
return messages
|
||||
|
||||
# Skip header line if it exists
|
||||
start_idx = 0
|
||||
if lines[0].startswith("MsgID,UserID,UserName"):
|
||||
start_idx = 1
|
||||
|
||||
for line in lines[start_idx:]:
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
# Parse CSV line
|
||||
reader = csv.reader(io.StringIO(line))
|
||||
try:
|
||||
row = next(reader)
|
||||
if len(row) >= 7: # Ensure we have enough columns
|
||||
message = {
|
||||
"ts": row[0],
|
||||
"user": row[1],
|
||||
"username": row[2],
|
||||
"real_name": row[3],
|
||||
"channel": row[4],
|
||||
"thread_ts": row[5],
|
||||
"text": row[6],
|
||||
"time": row[7] if len(row) > 7 else "",
|
||||
"reactions": row[8] if len(row) > 8 else "",
|
||||
"cursor": row[9] if len(row) > 9 else "",
|
||||
}
|
||||
messages.append(message)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to parse CSV line: {line[:100]}... Error: {e}")
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to parse CSV messages: {e}")
|
||||
# Fallback: treat entire text as one message
|
||||
messages = [{"text": csv_text, "channel": channel or "unknown"}]
|
||||
|
||||
return messages
|
||||
|
||||
def _format_message(self, message: dict[str, Any]) -> str:
|
||||
"""Format a single message for indexing."""
|
||||
text = message.get("text", "")
|
||||
@@ -251,6 +396,40 @@ class SlackMCPReader:
|
||||
|
||||
return "\n".join(content_parts)
|
||||
|
||||
async def get_all_channels(self) -> list[str]:
|
||||
"""Get list of all available channels."""
|
||||
try:
|
||||
channels_list_request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": 4,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "channels_list", "arguments": {}},
|
||||
}
|
||||
channels_response = await self.send_mcp_request(channels_list_request)
|
||||
if "result" in channels_response:
|
||||
result = channels_response["result"]
|
||||
if "content" in result and isinstance(result["content"], list):
|
||||
content = result["content"][0] if result["content"] else {}
|
||||
if "text" in content:
|
||||
# Parse the channels from the response
|
||||
channels = []
|
||||
lines = content["text"].split("\n")
|
||||
for line in lines:
|
||||
if line.strip() and ("#" in line or "C" in line[:10]):
|
||||
# Extract channel ID or name
|
||||
parts = line.split()
|
||||
for part in parts:
|
||||
if part.startswith("C") and len(part) > 5:
|
||||
channels.append(part)
|
||||
elif part.startswith("#"):
|
||||
channels.append(part[1:]) # Remove #
|
||||
logger.info(f"Found {len(channels)} channels: {channels}")
|
||||
return channels
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get channels list: {e}")
|
||||
return []
|
||||
|
||||
async def read_slack_data(self, channels: Optional[list[str]] = None) -> list[str]:
|
||||
"""
|
||||
Read Slack data and return formatted text chunks.
|
||||
@@ -287,36 +466,33 @@ class SlackMCPReader:
|
||||
logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
|
||||
continue
|
||||
else:
|
||||
# Fetch from all available channels/conversations
|
||||
# This is a simplified approach - real implementation would need to
|
||||
# discover available channels first
|
||||
try:
|
||||
messages = await self.fetch_slack_messages(limit=1000)
|
||||
if messages:
|
||||
# Group messages by channel if concatenating
|
||||
if self.concatenate_conversations:
|
||||
channel_messages = {}
|
||||
for message in messages:
|
||||
channel = message.get(
|
||||
"channel", message.get("channel_name", "general")
|
||||
)
|
||||
if channel not in channel_messages:
|
||||
channel_messages[channel] = []
|
||||
channel_messages[channel].append(message)
|
||||
# Fetch from all available channels
|
||||
logger.info("Fetching from all available channels...")
|
||||
all_channels = await self.get_all_channels()
|
||||
|
||||
# Create concatenated content for each channel
|
||||
for channel, msgs in channel_messages.items():
|
||||
text_content = self._create_concatenated_content(msgs, channel)
|
||||
if not all_channels:
|
||||
# Fallback to common channel names if we can't get the list
|
||||
all_channels = ["general", "random", "announcements", "C0GN5BX0F"]
|
||||
logger.info(f"Using fallback channels: {all_channels}")
|
||||
|
||||
for channel in all_channels:
|
||||
try:
|
||||
logger.info(f"Searching channel: {channel}")
|
||||
messages = await self.fetch_slack_messages(channel=channel, limit=1000)
|
||||
if messages:
|
||||
if self.concatenate_conversations:
|
||||
text_content = self._create_concatenated_content(messages, channel)
|
||||
if text_content.strip():
|
||||
all_texts.append(text_content)
|
||||
else:
|
||||
# Process individual messages
|
||||
for message in messages:
|
||||
formatted_msg = self._format_message(message)
|
||||
if formatted_msg.strip():
|
||||
all_texts.append(formatted_msg)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch messages: {e}")
|
||||
else:
|
||||
# Process individual messages
|
||||
for message in messages:
|
||||
formatted_msg = self._format_message(message)
|
||||
if formatted_msg.strip():
|
||||
all_texts.append(formatted_msg)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch messages from channel {channel}: {e}")
|
||||
continue
|
||||
|
||||
return all_texts
|
||||
|
||||
|
||||
@@ -78,6 +78,20 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
help="Test MCP server connection and list available tools without indexing",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--max-retries",
|
||||
type=int,
|
||||
default=5,
|
||||
help="Maximum number of retries for failed operations (default: 5)",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--retry-delay",
|
||||
type=float,
|
||||
default=2.0,
|
||||
help="Initial delay between retries in seconds (default: 2.0)",
|
||||
)
|
||||
|
||||
async def test_mcp_connection(self, args) -> bool:
|
||||
"""Test the MCP server connection and display available tools."""
|
||||
print(f"Testing connection to MCP server: {args.mcp_server}")
|
||||
@@ -88,12 +102,14 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
workspace_name=args.workspace_name,
|
||||
concatenate_conversations=not args.no_concatenate_conversations,
|
||||
max_messages_per_conversation=args.max_messages_per_channel,
|
||||
max_retries=args.max_retries,
|
||||
retry_delay=args.retry_delay,
|
||||
)
|
||||
|
||||
async with reader:
|
||||
tools = await reader.list_available_tools()
|
||||
|
||||
print("\n✅ Successfully connected to MCP server!")
|
||||
print("Successfully connected to MCP server!")
|
||||
print(f"Available tools ({len(tools)}):")
|
||||
|
||||
for i, tool in enumerate(tools, 1):
|
||||
@@ -115,7 +131,7 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Failed to connect to MCP server: {e}")
|
||||
print(f"Failed to connect to MCP server: {e}")
|
||||
print("\nTroubleshooting tips:")
|
||||
print("1. Make sure the MCP server is installed and accessible")
|
||||
print("2. Check if the server command is correct")
|
||||
@@ -130,8 +146,11 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
if args.workspace_name:
|
||||
print(f"Workspace: {args.workspace_name}")
|
||||
|
||||
if args.channels:
|
||||
print(f"Channels: {', '.join(args.channels)}")
|
||||
# Filter out empty strings from channels
|
||||
channels = [ch for ch in args.channels if ch.strip()] if args.channels else None
|
||||
|
||||
if channels:
|
||||
print(f"Channels: {', '.join(channels)}")
|
||||
else:
|
||||
print("Fetching from all available channels")
|
||||
|
||||
@@ -146,18 +165,20 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
workspace_name=args.workspace_name,
|
||||
concatenate_conversations=concatenate,
|
||||
max_messages_per_conversation=args.max_messages_per_channel,
|
||||
max_retries=args.max_retries,
|
||||
retry_delay=args.retry_delay,
|
||||
)
|
||||
|
||||
texts = await reader.read_slack_data(channels=args.channels)
|
||||
texts = await reader.read_slack_data(channels=channels)
|
||||
|
||||
if not texts:
|
||||
print("❌ No messages found! This could mean:")
|
||||
print("No messages found! This could mean:")
|
||||
print("- The MCP server couldn't fetch messages")
|
||||
print("- The specified channels don't exist or are empty")
|
||||
print("- Authentication issues with the Slack workspace")
|
||||
return []
|
||||
|
||||
print(f"✅ Successfully loaded {len(texts)} text chunks from Slack")
|
||||
print(f"Successfully loaded {len(texts)} text chunks from Slack")
|
||||
|
||||
# Show sample of what was loaded
|
||||
if texts:
|
||||
@@ -170,7 +191,7 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
return texts
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error loading Slack data: {e}")
|
||||
print(f"Error loading Slack data: {e}")
|
||||
print("\nThis might be due to:")
|
||||
print("- MCP server connection issues")
|
||||
print("- Authentication problems")
|
||||
@@ -188,7 +209,7 @@ class SlackMCPRAG(BaseRAGExample):
|
||||
if not success:
|
||||
return
|
||||
print(
|
||||
"\n🎉 MCP server is working! You can now run without --test-connection to start indexing."
|
||||
"MCP server is working! You can now run without --test-connection to start indexing."
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
395
docs/slack-setup-guide.md
Normal file
395
docs/slack-setup-guide.md
Normal file
@@ -0,0 +1,395 @@
|
||||
# Slack Integration Setup Guide
|
||||
|
||||
This guide provides step-by-step instructions for setting up Slack integration with LEANN.
|
||||
|
||||
## Overview
|
||||
|
||||
LEANN's Slack integration uses MCP (Model Context Protocol) servers to fetch and index your Slack messages for RAG (Retrieval-Augmented Generation). This allows you to search through your Slack conversations using natural language queries.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
1. **Slack Workspace Access**: You need admin or owner permissions in your Slack workspace to create apps and configure OAuth tokens.
|
||||
|
||||
2. **Slack MCP Server**: Install a Slack MCP server (e.g., `slack-mcp-server` via npm)
|
||||
|
||||
3. **LEANN**: Ensure you have LEANN installed and working
|
||||
|
||||
## Step 1: Create a Slack App
|
||||
|
||||
### 1.1 Go to Slack API Dashboard
|
||||
|
||||
1. Visit [https://api.slack.com/apps](https://api.slack.com/apps)
|
||||
2. Click **"Create New App"**
|
||||
3. Choose **"From scratch"**
|
||||
4. Enter your app name (e.g., "LEANN Slack Integration")
|
||||
5. Select your workspace
|
||||
6. Click **"Create App"**
|
||||
|
||||
### 1.2 Configure App Permissions
|
||||
|
||||
#### Token Scopes
|
||||
|
||||
1. In your app dashboard, go to **"OAuth & Permissions"** in the left sidebar
|
||||
2. Scroll down to **"Scopes"** section
|
||||
3. Under **"Bot Token Scopes & OAuth Scope"**, click **"Add an OAuth Scope"**
|
||||
4. Add the following scopes:
|
||||
- `channels:read` - Read public channel information
|
||||
- `channels:history` - Read messages in public channels
|
||||
- `groups:read` - Read private channel information
|
||||
- `groups:history` - Read messages in private channels
|
||||
- `im:read` - Read direct message information
|
||||
- `im:history` - Read direct messages
|
||||
- `mpim:read` - Read group direct message information
|
||||
- `mpim:history` - Read group direct messages
|
||||
- `users:read` - Read user information
|
||||
- `team:read` - Read workspace information
|
||||
|
||||
#### App-Level Tokens (Optional)
|
||||
|
||||
Some MCP servers may require app-level tokens:
|
||||
|
||||
1. Go to **"Basic Information"** in the left sidebar
|
||||
2. Scroll down to **"App-Level Tokens"**
|
||||
3. Click **"Generate Token and Scopes"**
|
||||
4. Enter a name (e.g., "LEANN Integration")
|
||||
5. Add the `connections:write` scope
|
||||
6. Click **"Generate"**
|
||||
7. Copy the token (starts with `xapp-`)
|
||||
|
||||
### 1.3 Install App to Workspace
|
||||
|
||||
1. Go to **"OAuth & Permissions"** in the left sidebar
|
||||
2. Click **"Install to Workspace"**
|
||||
3. Review the permissions and click **"Allow"**
|
||||
4. Copy the **"Bot User OAuth Token"** (starts with `xoxb-`)
|
||||
5. Copy the **"User OAuth Token"** (starts with `xoxp-`)
|
||||
|
||||
## Step 2: Install Slack MCP Server
|
||||
|
||||
### Option A: Using npm (Recommended)
|
||||
|
||||
```bash
|
||||
# Install globally
|
||||
npm install -g slack-mcp-server
|
||||
|
||||
# Or install locally
|
||||
npm install slack-mcp-server
|
||||
```
|
||||
|
||||
### Option B: Using npx (No installation required)
|
||||
|
||||
```bash
|
||||
# Use directly without installation
|
||||
npx slack-mcp-server
|
||||
```
|
||||
|
||||
## Step 3: Install and Configure Ollama (for Real LLM Responses)
|
||||
|
||||
### 3.1 Install Ollama
|
||||
|
||||
```bash
|
||||
# Install Ollama using Homebrew (macOS)
|
||||
brew install ollama
|
||||
|
||||
# Or download from https://ollama.ai/
|
||||
```
|
||||
|
||||
### 3.2 Start Ollama Service
|
||||
|
||||
```bash
|
||||
# Start Ollama as a service
|
||||
brew services start ollama
|
||||
|
||||
# Or start manually
|
||||
ollama serve
|
||||
```
|
||||
|
||||
### 3.3 Pull a Model
|
||||
|
||||
```bash
|
||||
# Pull a lightweight model for testing
|
||||
ollama pull llama3.2:1b
|
||||
|
||||
# Verify the model is available
|
||||
ollama list
|
||||
```
|
||||
|
||||
## Step 4: Configure Environment Variables
|
||||
|
||||
Create a `.env` file or set environment variables:
|
||||
|
||||
```bash
|
||||
# Required: User OAuth Token
|
||||
SLACK_OAUTH_TOKEN=xoxp-your-user-oauth-token-here
|
||||
|
||||
# Optional: App-Level Token (if your MCP server requires it)
|
||||
SLACK_APP_TOKEN=xapp-your-app-token-here
|
||||
|
||||
# Optional: Workspace-specific settings
|
||||
SLACK_WORKSPACE_ID=T1234567890 # Your workspace ID (optional)
|
||||
```
|
||||
|
||||
## Step 5: Test the Setup
|
||||
|
||||
### 5.1 Test MCP Server Connection
|
||||
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--test-connection \
|
||||
--workspace-name "Your Workspace Name"
|
||||
```
|
||||
|
||||
This will test the connection and list available tools without indexing any data.
|
||||
|
||||
### 5.2 Index a Specific Channel
|
||||
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "Your Workspace Name" \
|
||||
--channels general \
|
||||
--query "What did we discuss about the project?"
|
||||
```
|
||||
|
||||
### 5.3 Real RAG Query Examples
|
||||
|
||||
This section demonstrates successful Slack RAG integration queries against the Sky Lab Computing workspace's "random" channel. The system successfully retrieves actual conversation messages and performs semantic search with high relevance scores, including finding specific research paper announcements and technical discussions.
|
||||
|
||||
### Example 1: Advisor Models Query
|
||||
|
||||
**Query:** "train black-box models to adopt to your personal data"
|
||||
|
||||
This query demonstrates the system's ability to find specific research announcements about training black-box models for personal data adaptation.
|
||||
|
||||

|
||||
|
||||

|
||||
|
||||

|
||||
|
||||
### Example 2: Barbarians at the Gate Query
|
||||
|
||||
**Query:** "AI-driven research systems ADRS"
|
||||
|
||||
This query demonstrates the system's ability to find specific research announcements about AI-driven research systems and algorithm discovery.
|
||||
|
||||

|
||||
|
||||

|
||||
|
||||

|
||||
|
||||
### Prerequisites
|
||||
|
||||
- Bot is installed in the Sky Lab Computing workspace and invited to the target channel (run `/invite @YourBotName` in the channel if needed)
|
||||
- Bot token available and exported in the same terminal session
|
||||
|
||||
### Commands
|
||||
|
||||
1) Set the workspace token for this shell
|
||||
|
||||
```bash
|
||||
export SLACK_MCP_XOXP_TOKEN="xoxp-***-redacted-***"
|
||||
```
|
||||
|
||||
2) Run queries against the "random" channel by channel ID (C0GN5BX0F)
|
||||
|
||||
**Advisor Models Query:**
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "Sky Lab Computing" \
|
||||
--channels C0GN5BX0F \
|
||||
--max-messages-per-channel 100000 \
|
||||
--query "train black-box models to adopt to your personal data" \
|
||||
--llm ollama \
|
||||
--llm-model "llama3.2:1b" \
|
||||
--llm-host "http://localhost:11434" \
|
||||
--no-concatenate-conversations
|
||||
```
|
||||
|
||||
**Barbarians at the Gate Query:**
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "Sky Lab Computing" \
|
||||
--channels C0GN5BX0F \
|
||||
--max-messages-per-channel 100000 \
|
||||
--query "AI-driven research systems ADRS" \
|
||||
--llm ollama \
|
||||
--llm-model "llama3.2:1b" \
|
||||
--llm-host "http://localhost:11434" \
|
||||
--no-concatenate-conversations
|
||||
```
|
||||
|
||||
These examples demonstrate the system's ability to find and retrieve specific research announcements and technical discussions from the conversation history, showcasing the power of semantic search in Slack data.
|
||||
|
||||
3) Optional: Ask a broader question
|
||||
|
||||
```bash
|
||||
python test_channel_by_id_or_name.py \
|
||||
--channel-id C0GN5BX0F \
|
||||
--workspace-name "Sky Lab Computing" \
|
||||
--query "What is LEANN about?"
|
||||
```
|
||||
|
||||
Notes:
|
||||
- If you see `not_in_channel`, invite the bot to the channel and re-run.
|
||||
- If you see `channel_not_found`, confirm the channel ID and workspace.
|
||||
- Deep search via server-side “search” tools may require additional Slack scopes; the example above performs client-side filtering over retrieved history.
|
||||
|
||||
## Common Issues and Solutions
|
||||
|
||||
### Issue 1: "users cache is not ready yet" Error
|
||||
|
||||
**Problem**: You see this warning:
|
||||
```
|
||||
WARNING - Failed to fetch messages from channel random: Failed to fetch messages: {'code': -32603, 'message': 'users cache is not ready yet, sync process is still running... please wait'}
|
||||
```
|
||||
|
||||
**Solution**: This is a common timing issue. The LEANN integration now includes automatic retry logic:
|
||||
|
||||
1. **Wait and Retry**: The system will automatically retry with exponential backoff (2s, 4s, 8s, etc.)
|
||||
2. **Increase Retry Parameters**: If needed, you can customize retry behavior:
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--max-retries 10 \
|
||||
--retry-delay 3.0 \
|
||||
--channels general \
|
||||
--query "Your query here"
|
||||
```
|
||||
3. **Keep MCP Server Running**: Start the MCP server separately and keep it running:
|
||||
```bash
|
||||
# Terminal 1: Start MCP server
|
||||
slack-mcp-server
|
||||
|
||||
# Terminal 2: Run LEANN (it will connect to the running server)
|
||||
python -m apps.slack_rag --mcp-server "slack-mcp-server" --channels general --query "test"
|
||||
```
|
||||
|
||||
### Issue 2: "No message fetching tool found"
|
||||
|
||||
**Problem**: The MCP server doesn't have the expected tools.
|
||||
|
||||
**Solution**:
|
||||
1. Check if your MCP server is properly installed and configured
|
||||
2. Verify your Slack tokens are correct
|
||||
3. Try a different MCP server implementation
|
||||
4. Check the MCP server documentation for required configuration
|
||||
|
||||
### Issue 3: Permission Denied Errors
|
||||
|
||||
**Problem**: You get permission errors when trying to access channels.
|
||||
|
||||
**Solutions**:
|
||||
1. **Check Bot Permissions**: Ensure your bot has been added to the channels you want to access
|
||||
2. **Verify Token Scopes**: Make sure you have all required scopes configured
|
||||
3. **Channel Access**: For private channels, the bot needs to be explicitly invited
|
||||
4. **Workspace Permissions**: Ensure your Slack app has the necessary workspace permissions
|
||||
|
||||
### Issue 4: Empty Results
|
||||
|
||||
**Problem**: No messages are returned even though the channel has messages.
|
||||
|
||||
**Solutions**:
|
||||
1. **Check Channel Names**: Ensure channel names are correct (without the # symbol)
|
||||
2. **Verify Bot Access**: Make sure the bot can access the channels
|
||||
3. **Check Date Ranges**: Some MCP servers have limitations on message history
|
||||
4. **Increase Message Limits**: Try increasing the message limit:
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--channels general \
|
||||
--max-messages-per-channel 1000 \
|
||||
--query "test"
|
||||
```
|
||||
|
||||
## Advanced Configuration
|
||||
|
||||
### Custom MCP Server Commands
|
||||
|
||||
If you need to pass additional parameters to your MCP server:
|
||||
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server --token-file /path/to/tokens.json" \
|
||||
--workspace-name "Your Workspace" \
|
||||
--channels general \
|
||||
--query "Your query"
|
||||
```
|
||||
|
||||
### Multiple Workspaces
|
||||
|
||||
To work with multiple Slack workspaces, you can:
|
||||
|
||||
1. Create separate apps for each workspace
|
||||
2. Use different environment variables
|
||||
3. Run separate instances with different configurations
|
||||
|
||||
### Performance Optimization
|
||||
|
||||
For better performance with large workspaces:
|
||||
|
||||
```bash
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "Your Workspace" \
|
||||
--max-messages-per-channel 500 \
|
||||
--no-concatenate-conversations \
|
||||
--query "Your query"
|
||||
```
|
||||
---
|
||||
|
||||
## Troubleshooting Checklist
|
||||
|
||||
- [ ] Slack app created with proper permissions
|
||||
- [ ] Bot token (xoxb-) copied correctly
|
||||
- [ ] App-level token (xapp-) created if needed
|
||||
- [ ] MCP server installed and accessible
|
||||
- [ ] Ollama installed and running (`brew services start ollama`)
|
||||
- [ ] Ollama model pulled (`ollama pull llama3.2:1b`)
|
||||
- [ ] Environment variables set correctly
|
||||
- [ ] Bot invited to relevant channels
|
||||
- [ ] Channel names specified without # symbol
|
||||
- [ ] Sufficient retry attempts configured
|
||||
- [ ] Network connectivity to Slack APIs
|
||||
|
||||
## Getting Help
|
||||
|
||||
If you continue to have issues:
|
||||
|
||||
1. **Check Logs**: Look for detailed error messages in the console output
|
||||
2. **Test MCP Server**: Use `--test-connection` to verify the MCP server is working
|
||||
3. **Verify Tokens**: Double-check that your Slack tokens are valid and have the right scopes
|
||||
4. **Check Ollama**: Ensure Ollama is running (`ollama serve`) and the model is available (`ollama list`)
|
||||
5. **Community Support**: Reach out to the LEANN community for help
|
||||
|
||||
## Example Commands
|
||||
|
||||
### Basic Usage
|
||||
```bash
|
||||
# Test connection
|
||||
python -m apps.slack_rag --mcp-server "slack-mcp-server" --test-connection
|
||||
|
||||
# Index specific channels
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "My Company" \
|
||||
--channels general random \
|
||||
--query "What did we decide about the project timeline?"
|
||||
```
|
||||
|
||||
### Advanced Usage
|
||||
```bash
|
||||
# With custom retry settings
|
||||
python -m apps.slack_rag \
|
||||
--mcp-server "slack-mcp-server" \
|
||||
--workspace-name "My Company" \
|
||||
--channels general \
|
||||
--max-retries 10 \
|
||||
--retry-delay 5.0 \
|
||||
--max-messages-per-channel 2000 \
|
||||
--query "Show me all decisions made in the last month"
|
||||
```
|
||||
BIN
docs/videos/slack_integration_1.1.png
Normal file
BIN
docs/videos/slack_integration_1.1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 445 KiB |
BIN
docs/videos/slack_integration_1.2.png
Normal file
BIN
docs/videos/slack_integration_1.2.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 508 KiB |
BIN
docs/videos/slack_integration_1.3.png
Normal file
BIN
docs/videos/slack_integration_1.3.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 437 KiB |
BIN
docs/videos/slack_integration_2.1.png
Normal file
BIN
docs/videos/slack_integration_2.1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 474 KiB |
BIN
docs/videos/slack_integration_2.2.png
Normal file
BIN
docs/videos/slack_integration_2.2.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 501 KiB |
BIN
docs/videos/slack_integration_2.3.png
Normal file
BIN
docs/videos/slack_integration_2.3.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 454 KiB |
@@ -29,12 +29,25 @@ if(APPLE)
|
||||
set(CMAKE_OSX_DEPLOYMENT_TARGET "11.0" CACHE STRING "Minimum macOS version")
|
||||
endif()
|
||||
|
||||
# Use system ZeroMQ instead of building from source
|
||||
# Find ZMQ using pkg-config with IMPORTED_TARGET for automatic target creation
|
||||
find_package(PkgConfig REQUIRED)
|
||||
pkg_check_modules(ZMQ REQUIRED libzmq)
|
||||
|
||||
# On ARM64 macOS, ensure pkg-config finds ARM64 Homebrew packages first
|
||||
if(APPLE AND CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|arm64")
|
||||
set(ENV{PKG_CONFIG_PATH} "/opt/homebrew/lib/pkgconfig:/opt/homebrew/share/pkgconfig:$ENV{PKG_CONFIG_PATH}")
|
||||
endif()
|
||||
|
||||
pkg_check_modules(ZMQ REQUIRED IMPORTED_TARGET libzmq)
|
||||
|
||||
# This creates PkgConfig::ZMQ target automatically with correct properties
|
||||
if(TARGET PkgConfig::ZMQ)
|
||||
message(STATUS "Found and configured ZMQ target: PkgConfig::ZMQ")
|
||||
else()
|
||||
message(FATAL_ERROR "pkg_check_modules did not create IMPORTED target for ZMQ.")
|
||||
endif()
|
||||
|
||||
# Add cppzmq headers
|
||||
include_directories(third_party/cppzmq)
|
||||
include_directories(SYSTEM third_party/cppzmq)
|
||||
|
||||
# Configure msgpack-c - disable boost dependency
|
||||
set(MSGPACK_USE_BOOST OFF CACHE BOOL "" FORCE)
|
||||
|
||||
@@ -1236,6 +1236,17 @@ class LeannChat:
|
||||
"Please provide the best answer you can based on this context and your knowledge."
|
||||
)
|
||||
|
||||
print("The context provided to the LLM is:")
|
||||
print(f"{'Relevance':<10} | {'Chunk id':<10} | {'Content':<60} | {'Source':<80}")
|
||||
print("-" * 150)
|
||||
for r in results:
|
||||
chunk_relevance = f"{r.score:.3f}"
|
||||
chunk_id = r.id
|
||||
chunk_content = r.text[:60]
|
||||
chunk_source = r.metadata.get("source", "")[:80]
|
||||
print(
|
||||
f"{chunk_relevance:<10} | {chunk_id:<10} | {chunk_content:<60} | {chunk_source:<80}"
|
||||
)
|
||||
ask_time = time.time()
|
||||
ans = self.llm.ask(prompt, **llm_kwargs)
|
||||
ask_time = time.time() - ask_time
|
||||
|
||||
@@ -834,6 +834,11 @@ class OpenAIChat(LLMInterface):
|
||||
|
||||
try:
|
||||
response = self.client.chat.completions.create(**params)
|
||||
print(
|
||||
f"Total tokens = {response.usage.total_tokens}, prompt tokens = {response.usage.prompt_tokens}, completion tokens = {response.usage.completion_tokens}"
|
||||
)
|
||||
if response.choices[0].finish_reason == "length":
|
||||
print("The query is exceeding the maximum allowed number of tokens")
|
||||
return response.choices[0].message.content.strip()
|
||||
except Exception as e:
|
||||
logger.error(f"Error communicating with OpenAI: {e}")
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
@@ -106,7 +107,7 @@ Examples:
|
||||
help="Documents directories and/or files (default: current directory)",
|
||||
)
|
||||
build_parser.add_argument(
|
||||
"--backend",
|
||||
"--backend-name",
|
||||
type=str,
|
||||
default="hnsw",
|
||||
choices=["hnsw", "diskann"],
|
||||
@@ -254,6 +255,11 @@ Examples:
|
||||
action="store_true",
|
||||
help="Non-interactive mode: automatically select index without prompting",
|
||||
)
|
||||
search_parser.add_argument(
|
||||
"--show-metadata",
|
||||
action="store_true",
|
||||
help="Display file paths and metadata in search results",
|
||||
)
|
||||
|
||||
# Ask command
|
||||
ask_parser = subparsers.add_parser("ask", help="Ask questions")
|
||||
@@ -1186,6 +1192,7 @@ Examples:
|
||||
for doc in other_docs:
|
||||
file_path = doc.metadata.get("file_path", "")
|
||||
if file_filter(file_path):
|
||||
doc.metadata["source"] = file_path
|
||||
filtered_docs.append(doc)
|
||||
|
||||
documents.extend(filtered_docs)
|
||||
@@ -1261,7 +1268,7 @@ Examples:
|
||||
from .chunking_utils import create_text_chunks
|
||||
|
||||
# Use enhanced chunking with AST support
|
||||
all_texts = create_text_chunks(
|
||||
chunk_texts = create_text_chunks(
|
||||
documents,
|
||||
chunk_size=self.node_parser.chunk_size,
|
||||
chunk_overlap=self.node_parser.chunk_overlap,
|
||||
@@ -1272,6 +1279,14 @@ Examples:
|
||||
ast_fallback_traditional=getattr(args, "ast_fallback_traditional", True),
|
||||
)
|
||||
|
||||
# Note: AST chunking currently returns plain text chunks without metadata
|
||||
# We preserve basic file info by associating chunks with their source documents
|
||||
# For better metadata preservation, documents list order should be maintained
|
||||
for chunk_text in chunk_texts:
|
||||
# TODO: Enhance create_text_chunks to return metadata alongside text
|
||||
# For now, we store chunks with empty metadata
|
||||
all_texts.append({"text": chunk_text, "metadata": {}})
|
||||
|
||||
except ImportError as e:
|
||||
print(
|
||||
f"⚠️ AST chunking utilities not available in package ({e}), falling back to traditional chunking"
|
||||
@@ -1283,14 +1298,27 @@ Examples:
|
||||
for doc in tqdm(documents, desc="Chunking documents", unit="doc"):
|
||||
# Check if this is a code file based on source path
|
||||
source_path = doc.metadata.get("source", "")
|
||||
file_path = doc.metadata.get("file_path", "")
|
||||
is_code_file = any(source_path.endswith(ext) for ext in code_file_exts)
|
||||
|
||||
# Extract metadata to preserve with chunks
|
||||
chunk_metadata = {
|
||||
"file_path": file_path or source_path,
|
||||
"file_name": doc.metadata.get("file_name", ""),
|
||||
}
|
||||
|
||||
# Add optional metadata if available
|
||||
if "creation_date" in doc.metadata:
|
||||
chunk_metadata["creation_date"] = doc.metadata["creation_date"]
|
||||
if "last_modified_date" in doc.metadata:
|
||||
chunk_metadata["last_modified_date"] = doc.metadata["last_modified_date"]
|
||||
|
||||
# Use appropriate parser based on file type
|
||||
parser = self.code_parser if is_code_file else self.node_parser
|
||||
nodes = parser.get_nodes_from_documents([doc])
|
||||
|
||||
for node in nodes:
|
||||
all_texts.append(node.get_content())
|
||||
all_texts.append({"text": node.get_content(), "metadata": chunk_metadata})
|
||||
|
||||
print(f"Loaded {len(documents)} documents, {len(all_texts)} chunks")
|
||||
return all_texts
|
||||
@@ -1365,7 +1393,7 @@ Examples:
|
||||
|
||||
index_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"Building index '{index_name}' with {args.backend} backend...")
|
||||
print(f"Building index '{index_name}' with {args.backend_name} backend...")
|
||||
|
||||
embedding_options: dict[str, Any] = {}
|
||||
if args.embedding_mode == "ollama":
|
||||
@@ -1377,7 +1405,7 @@ Examples:
|
||||
embedding_options["api_key"] = resolved_embedding_key
|
||||
|
||||
builder = LeannBuilder(
|
||||
backend_name=args.backend,
|
||||
backend_name=args.backend_name,
|
||||
embedding_model=args.embedding_model,
|
||||
embedding_mode=args.embedding_mode,
|
||||
embedding_options=embedding_options or None,
|
||||
@@ -1388,8 +1416,8 @@ Examples:
|
||||
num_threads=args.num_threads,
|
||||
)
|
||||
|
||||
for chunk_text in all_texts:
|
||||
builder.add_text(chunk_text)
|
||||
for chunk in all_texts:
|
||||
builder.add_text(chunk["text"], metadata=chunk["metadata"])
|
||||
|
||||
builder.build_index(index_path)
|
||||
print(f"Index built at {index_path}")
|
||||
@@ -1510,7 +1538,25 @@ Examples:
|
||||
print(f"Search results for '{query}' (top {len(results)}):")
|
||||
for i, result in enumerate(results, 1):
|
||||
print(f"{i}. Score: {result.score:.3f}")
|
||||
|
||||
# Display metadata if flag is set
|
||||
if args.show_metadata and result.metadata:
|
||||
file_path = result.metadata.get("file_path", "")
|
||||
if file_path:
|
||||
print(f" 📄 File: {file_path}")
|
||||
|
||||
file_name = result.metadata.get("file_name", "")
|
||||
if file_name and file_name != file_path:
|
||||
print(f" 📝 Name: {file_name}")
|
||||
|
||||
# Show timestamps if available
|
||||
if "creation_date" in result.metadata:
|
||||
print(f" 🕐 Created: {result.metadata['creation_date']}")
|
||||
if "last_modified_date" in result.metadata:
|
||||
print(f" 🕑 Modified: {result.metadata['last_modified_date']}")
|
||||
|
||||
print(f" {result.text[:200]}...")
|
||||
print(f" Source: {result.metadata.get('source', '')}")
|
||||
print()
|
||||
|
||||
async def ask_questions(self, args):
|
||||
@@ -1542,6 +1588,7 @@ Examples:
|
||||
llm_kwargs["thinking_budget"] = args.thinking_budget
|
||||
|
||||
def _ask_once(prompt: str) -> None:
|
||||
query_start_time = time.time()
|
||||
response = chat.ask(
|
||||
prompt,
|
||||
top_k=args.top_k,
|
||||
@@ -1552,7 +1599,9 @@ Examples:
|
||||
pruning_strategy=args.pruning_strategy,
|
||||
llm_kwargs=llm_kwargs,
|
||||
)
|
||||
query_completion_time = time.time() - query_start_time
|
||||
print(f"LEANN: {response}")
|
||||
print(f"The query took {query_completion_time:.3f} seconds to finish")
|
||||
|
||||
initial_query = (args.query or "").strip()
|
||||
|
||||
|
||||
@@ -574,9 +574,10 @@ def compute_embeddings_ollama(
|
||||
host: Optional[str] = None,
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Compute embeddings using Ollama API with simplified batch processing.
|
||||
Compute embeddings using Ollama API with true batch processing.
|
||||
|
||||
Uses batch size of 32 for MPS/CPU and 128 for CUDA to optimize performance.
|
||||
Uses the /api/embed endpoint which supports batch inputs.
|
||||
Batch size: 32 for MPS/CPU, 128 for CUDA to optimize performance.
|
||||
|
||||
Args:
|
||||
texts: List of texts to compute embeddings for
|
||||
@@ -681,11 +682,11 @@ def compute_embeddings_ollama(
|
||||
logger.info(f"Resolved model name '{model_name}' to '{resolved_model_name}'")
|
||||
model_name = resolved_model_name
|
||||
|
||||
# Verify the model supports embeddings by testing it
|
||||
# Verify the model supports embeddings by testing it with /api/embed
|
||||
try:
|
||||
test_response = requests.post(
|
||||
f"{resolved_host}/api/embeddings",
|
||||
json={"model": model_name, "prompt": "test"},
|
||||
f"{resolved_host}/api/embed",
|
||||
json={"model": model_name, "input": "test"},
|
||||
timeout=10,
|
||||
)
|
||||
if test_response.status_code != 200:
|
||||
@@ -717,56 +718,55 @@ def compute_embeddings_ollama(
|
||||
# If torch is not available, use conservative batch size
|
||||
batch_size = 32
|
||||
|
||||
logger.info(f"Using batch size: {batch_size}")
|
||||
logger.info(f"Using batch size: {batch_size} for true batch processing")
|
||||
|
||||
def get_batch_embeddings(batch_texts):
|
||||
"""Get embeddings for a batch of texts."""
|
||||
all_embeddings = []
|
||||
failed_indices = []
|
||||
"""Get embeddings for a batch of texts using /api/embed endpoint."""
|
||||
max_retries = 3
|
||||
retry_count = 0
|
||||
|
||||
for i, text in enumerate(batch_texts):
|
||||
max_retries = 3
|
||||
retry_count = 0
|
||||
# Truncate very long texts to avoid API issues
|
||||
truncated_texts = [text[:8000] if len(text) > 8000 else text for text in batch_texts]
|
||||
|
||||
# Truncate very long texts to avoid API issues
|
||||
truncated_text = text[:8000] if len(text) > 8000 else text
|
||||
while retry_count < max_retries:
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{resolved_host}/api/embeddings",
|
||||
json={"model": model_name, "prompt": truncated_text},
|
||||
timeout=30,
|
||||
while retry_count < max_retries:
|
||||
try:
|
||||
# Use /api/embed endpoint with "input" parameter for batch processing
|
||||
response = requests.post(
|
||||
f"{resolved_host}/api/embed",
|
||||
json={"model": model_name, "input": truncated_texts},
|
||||
timeout=60, # Increased timeout for batch processing
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
batch_embeddings = result.get("embeddings")
|
||||
|
||||
if batch_embeddings is None:
|
||||
raise ValueError("No embeddings returned from API")
|
||||
|
||||
if not isinstance(batch_embeddings, list):
|
||||
raise ValueError(f"Invalid embeddings format: {type(batch_embeddings)}")
|
||||
|
||||
if len(batch_embeddings) != len(batch_texts):
|
||||
raise ValueError(
|
||||
f"Mismatch: requested {len(batch_texts)} embeddings, got {len(batch_embeddings)}"
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
embedding = result.get("embedding")
|
||||
return batch_embeddings, []
|
||||
|
||||
if embedding is None:
|
||||
raise ValueError(f"No embedding returned for text {i}")
|
||||
except requests.exceptions.Timeout:
|
||||
retry_count += 1
|
||||
if retry_count >= max_retries:
|
||||
logger.warning(f"Timeout for batch after {max_retries} retries")
|
||||
return None, list(range(len(batch_texts)))
|
||||
|
||||
if not isinstance(embedding, list) or len(embedding) == 0:
|
||||
raise ValueError(f"Invalid embedding format for text {i}")
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
if retry_count >= max_retries:
|
||||
logger.error(f"Failed to get embeddings for batch: {e}")
|
||||
return None, list(range(len(batch_texts)))
|
||||
|
||||
all_embeddings.append(embedding)
|
||||
break
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
retry_count += 1
|
||||
if retry_count >= max_retries:
|
||||
logger.warning(f"Timeout for text {i} after {max_retries} retries")
|
||||
failed_indices.append(i)
|
||||
all_embeddings.append(None)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
if retry_count >= max_retries:
|
||||
logger.error(f"Failed to get embedding for text {i}: {e}")
|
||||
failed_indices.append(i)
|
||||
all_embeddings.append(None)
|
||||
break
|
||||
return all_embeddings, failed_indices
|
||||
return None, list(range(len(batch_texts)))
|
||||
|
||||
# Process texts in batches
|
||||
all_embeddings = []
|
||||
@@ -784,7 +784,7 @@ def compute_embeddings_ollama(
|
||||
num_batches = (len(texts) + batch_size - 1) // batch_size
|
||||
|
||||
if show_progress:
|
||||
batch_iterator = tqdm(range(num_batches), desc="Computing Ollama embeddings")
|
||||
batch_iterator = tqdm(range(num_batches), desc="Computing Ollama embeddings (batched)")
|
||||
else:
|
||||
batch_iterator = range(num_batches)
|
||||
|
||||
@@ -795,10 +795,14 @@ def compute_embeddings_ollama(
|
||||
|
||||
batch_embeddings, batch_failed = get_batch_embeddings(batch_texts)
|
||||
|
||||
# Adjust failed indices to global indices
|
||||
global_failed = [start_idx + idx for idx in batch_failed]
|
||||
all_failed_indices.extend(global_failed)
|
||||
all_embeddings.extend(batch_embeddings)
|
||||
if batch_embeddings is not None:
|
||||
all_embeddings.extend(batch_embeddings)
|
||||
else:
|
||||
# Entire batch failed, add None placeholders
|
||||
all_embeddings.extend([None] * len(batch_texts))
|
||||
# Adjust failed indices to global indices
|
||||
global_failed = [start_idx + idx for idx in batch_failed]
|
||||
all_failed_indices.extend(global_failed)
|
||||
|
||||
# Handle failed embeddings
|
||||
if all_failed_indices:
|
||||
|
||||
@@ -60,6 +60,11 @@ def handle_request(request):
|
||||
"maximum": 128,
|
||||
"description": "Search complexity level. Use 16-32 for fast searches (recommended), 64+ for higher precision when needed.",
|
||||
},
|
||||
"show_metadata": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
"description": "Include file paths and metadata in search results. Useful for understanding which files contain the results.",
|
||||
},
|
||||
},
|
||||
"required": ["index_name", "query"],
|
||||
},
|
||||
@@ -104,6 +109,8 @@ def handle_request(request):
|
||||
f"--complexity={args.get('complexity', 32)}",
|
||||
"--non-interactive",
|
||||
]
|
||||
if args.get("show_metadata", False):
|
||||
cmd.append("--show-metadata")
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
elif tool_name == "leann_list":
|
||||
|
||||
Reference in New Issue
Block a user