Merge pull request #163 from orbisai0security/fix-semgrep-python.lang.security.audit.eval-detected.eval-detected-apps-slack-data-slack-mc-b134f52c-f326

Fix: Unsafe Code Execution Function Could Allow External Code Injection in apps/slack_data/slack_mcp_reader.py
This commit is contained in:
Aakash Suresh
2025-11-13 14:35:44 -08:00
committed by GitHub
2 changed files with 6 additions and 5 deletions

View File

@@ -14,6 +14,6 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: lycheeverse/lychee-action@v2 - uses: lycheeverse/lychee-action@v2
with: with:
args: --no-progress --insecure --user-agent 'curl/7.68.0' README.md docs/ apps/ examples/ benchmarks/ args: --no-progress --insecure --user-agent 'curl/7.68.0' --exclude '.*api\.star-history\.com.*' --accept 200,201,202,203,204,205,206,207,208,226,300,301,302,303,304,305,306,307,308,503 README.md docs/ apps/ examples/ benchmarks/
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -7,6 +7,7 @@ for indexing in LEANN. It supports various Slack MCP server implementations and
flexible message processing options. flexible message processing options.
""" """
import ast
import asyncio import asyncio
import json import json
import logging import logging
@@ -146,16 +147,16 @@ class SlackMCPReader:
match = re.search(r"'error':\s*(\{[^}]+\})", str(e)) match = re.search(r"'error':\s*(\{[^}]+\})", str(e))
if match: if match:
try: try:
error_dict = eval(match.group(1)) error_dict = ast.literal_eval(match.group(1))
except (ValueError, SyntaxError, NameError): except (ValueError, SyntaxError):
pass pass
else: else:
# Try alternative format # Try alternative format
match = re.search(r"Failed to fetch messages:\s*(\{[^}]+\})", str(e)) match = re.search(r"Failed to fetch messages:\s*(\{[^}]+\})", str(e))
if match: if match:
try: try:
error_dict = eval(match.group(1)) error_dict = ast.literal_eval(match.group(1))
except (ValueError, SyntaxError, NameError): except (ValueError, SyntaxError):
pass pass
if self._is_cache_sync_error(error_dict): if self._is_cache_sync_error(error_dict):