Compare commits

..

12 Commits

Author SHA1 Message Date
Andy Lee
80330f8d97 fix: remove whitespace from blank line
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 00:04:32 +00:00
Andy Lee
4772a5bb18 feat: add process group management to prevent hanging subprocesses
- Add start_new_session=True to subprocess.Popen for better isolation
- Use os.killpg() to terminate entire process groups instead of single processes
- Import signal module for SIGTERM/SIGKILL handling
- This ensures child processes of embedding servers are also cleaned up

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 22:11:12 +00:00
Andy Lee
3d67205670 fix: remove Chinese comments to pass ruff check
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 08:31:21 +00:00
Andy Lee
4de709ad4b feat: add ZMQ timeout configurations to prevent hanging
- Add RCVTIMEO (300s) to prevent recv operations from hanging indefinitely
- Add SNDTIMEO (300s) to prevent send operations from hanging indefinitely
- Add IMMEDIATE mode to avoid message queue blocking
- Applied to both api.py and searcher_base.py ZMQ socket connections

This ensures ZMQ operations timeout gracefully instead of hanging the process
when embedding servers become unresponsive.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 08:30:02 +00:00
Andy Lee
48c82ee3e3 fix: remove strict parameter from zip() for Python 3.9 compatibility
The strict parameter for zip() was added in Python 3.10.
Remove it to support Python 3.9.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:58:42 +00:00
Andy Lee
6d1ac4a503 fix: use Python 3.9 compatible builtin generics
- Convert List[str] to list[str], Dict[str, Any] to dict[str, Any], etc.
- Use ruff --unsafe-fixes to automatically apply all type annotation updates
- Remove deprecated typing imports (List, Dict, Tuple) where no longer needed
- Keep Optional[str] syntax (union operator | not supported in Python 3.9)

Now all type annotations are Python 3.9 compatible with modern builtin generics.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:38:33 +00:00
Andy Lee
ffba435252 fix: Python 3.9 compatibility - replace union types and builtin generics
- Replace 'str | None' with 'Optional[str]'
- Replace 'list[str]' with 'List[str]'
- Replace 'dict[' with 'Dict['
- Replace 'tuple[' with 'Tuple['
- Add missing typing imports (List, Dict, Tuple)

Fixes TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:29:46 +00:00
Andy Lee
728fa42ad5 style: run ruff format on modified files
- Format diskann_backend.py and conftest.py according to ruff standards

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:11:16 +00:00
Andy Lee
bce8aca3fa fix: ensure newline at end of conftest.py for ruff compliance 2025-08-09 23:56:18 +00:00
Andy Lee
f4e41e4353 style: fix ruff formatting issues in conftest.py
- Fix import sorting and organization
- Remove trailing whitespace
- Add proper newline at end of file

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-09 23:53:31 +00:00
Andy Lee
75c7b047d7 Merge branch 'main' into fix/clean-hang-solution 2025-08-09 16:49:51 -07:00
Andy Lee
490329dc66 fix: clean and simple hang prevention solution
This commit provides a minimal, focused fix for CI hanging issues by addressing the root causes:

**Key Changes:**

1. **ZMQ Resource Management:**
   - Remove `context.term()` calls that were causing hangs
   - Add `socket.setsockopt(zmq.LINGER, 0)` to prevent blocking on close
   - Keep socket operations simple with default timeouts (no artificial limits)

2. **Process Cleanup:**
   - Add timeout (1s) to final `process.wait()` in embedding server manager
   - Prevent infinite waiting that was causing CI hangs

3. **Resource Cleanup Methods:**
   - Add simple `cleanup()` methods to searchers and API classes
   - Focus on C++ object destruction for DiskANN backend
   - Avoid complex cleanup logic that could introduce new issues

4. **Basic Test Safety:**
   - Simple pytest-timeout configuration (300s)
   - Basic test session cleanup using psutil
   - Minimal conftest.py without complex logic

**Philosophy:**
This solution avoids the complex multi-layered fixes from the previous PR chain.
Instead, it targets the specific root causes:
- ZMQ context termination blocking
- Process wait() without timeout
- C++ resource leaks in backends

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-09 23:45:18 +00:00
26 changed files with 3604 additions and 4896 deletions

View File

@@ -5,16 +5,7 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
inputs:
debug_enabled:
type: boolean
description: 'Run with tmate debugging enabled (SSH access to runner)'
required: false
default: false
jobs:
build:
uses: ./.github/workflows/build-reusable.yml
with:
debug_enabled: ${{ github.event_name == 'workflow_dispatch' && inputs.debug_enabled || false }}

View File

@@ -8,11 +8,6 @@ on:
required: false
type: string
default: ''
debug_enabled:
description: 'Enable tmate debugging session for troubleshooting'
required: false
type: boolean
default: false
jobs:
lint:
@@ -228,48 +223,7 @@ jobs:
python -c "from leann_backend_diskann import _diskannpy; print('_diskannpy imported successfully')" || echo "Failed to import _diskannpy"
ls -la $(python -c "import leann_backend_diskann; import os; print(os.path.dirname(leann_backend_diskann.__file__))" 2>/dev/null) 2>/dev/null || echo "Failed to list module directory"
# Extra debugging for Python 3.13
if [[ "${{ matrix.python }}" == "3.13" ]]; then
echo "=== Python 3.13 Debug Info ==="
echo "Python version details:"
python --version
python -c "import sys; print(f'sys.version_info: {sys.version_info}')"
echo "Pytest version:"
python -m pytest --version
echo "Testing basic pytest collection:"
if [[ "$RUNNER_OS" == "Linux" ]]; then
timeout --signal=INT 10 python -m pytest --collect-only tests/test_ci_minimal.py -v || echo "Collection timed out or failed"
else
# No timeout on macOS/Windows
python -m pytest --collect-only tests/test_ci_minimal.py -v || echo "Collection failed"
fi
echo "Testing single simple test:"
if [[ "$RUNNER_OS" == "Linux" ]]; then
timeout --signal=INT 10 python -m pytest tests/test_ci_minimal.py::test_package_imports --full-trace -v || echo "Simple test timed out or failed"
else
# No timeout on macOS/Windows
python -m pytest tests/test_ci_minimal.py::test_package_imports --full-trace -v || echo "Simple test failed"
fi
fi
# Enable tmate debugging session if requested
- name: Setup tmate session for debugging
if: ${{ inputs.debug_enabled }}
uses: mxschmitt/action-tmate@v3
with:
detached: true
timeout-minutes: 30
limit-access-to-actor: true
- name: Run tests with pytest
# Timeout hierarchy:
# 1. Individual test timeout: 20s (see pyproject.toml markers)
# 2. Pytest session timeout: 300s (see pyproject.toml [tool.pytest.ini_options])
# 3. Outer shell timeout: 360s (300s + 60s buffer for cleanup)
# 4. GitHub Actions job timeout: 6 hours (default)
env:
CI: true # Mark as CI environment to skip memory-intensive tests
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
@@ -282,165 +236,8 @@ jobs:
# Activate virtual environment
source .venv/bin/activate || source .venv/Scripts/activate
# Define comprehensive diagnostic function
diag() {
echo "===== COMPREHENSIVE DIAGNOSTICS BEGIN ====="
date
echo ""
echo "### Current Shell Info ###"
echo "Shell PID: $$"
echo "Shell PPID: $PPID"
echo "Current directory: $(pwd)"
echo ""
echo "### Process Tree (full) ###"
pstree -ap 2>/dev/null || ps auxf || true
echo ""
echo "### All Python/Pytest Processes ###"
ps -ef | grep -E 'python|pytest' | grep -v grep || true
echo ""
echo "### Embedding Server Processes ###"
ps -ef | grep -E 'embedding|zmq|diskann' | grep -v grep || true
echo ""
echo "### Network Listeners ###"
ss -ltnp 2>/dev/null || netstat -ltn 2>/dev/null || true
echo ""
echo "### Open File Descriptors (lsof) ###"
lsof -p $$ 2>/dev/null | head -20 || true
echo ""
echo "### Zombie Processes ###"
ps aux | grep '<defunct>' || echo "No zombie processes"
echo ""
echo "### Current Jobs ###"
jobs -l || true
echo ""
echo "### /proc/PID/fd for current shell ###"
ls -la /proc/$$/fd 2>/dev/null || true
echo ""
echo "===== COMPREHENSIVE DIAGNOSTICS END ====="
}
# Enable verbose logging for debugging
export PYTHONUNBUFFERED=1
export PYTEST_CURRENT_TEST=1
# Run all tests with extensive logging
if [[ "$RUNNER_OS" == "Linux" ]]; then
echo "🚀 Starting Linux test execution with timeout..."
echo "Current time: $(date)"
echo "Shell PID: $$"
echo "Python: $(python --version)"
echo "Pytest: $(pytest --version)"
# Show environment variables for debugging
echo "📦 Environment variables:"
env | grep -E "PYTHON|PYTEST|CI|RUNNER" | sort
# Set trap for diagnostics
trap diag INT TERM EXIT
echo "📋 Pre-test diagnostics:"
ps -ef | grep -E 'python|pytest' | grep -v grep || echo "No python/pytest processes before test"
# Check for any listening ports before test
echo "🔌 Pre-test network state:"
ss -ltn 2>/dev/null | grep -E "555[0-9]|556[0-9]" || echo "No embedding server ports open"
# Set timeouts - outer must be larger than pytest's internal timeout
# IMPORTANT: Keep PYTEST_TIMEOUT_SEC in sync with pyproject.toml [tool.pytest.ini_options] timeout
PYTEST_TIMEOUT_SEC=${PYTEST_TIMEOUT_SEC:-300} # Default 300s, matches pyproject.toml
BUFFER_SEC=${TIMEOUT_BUFFER_SEC:-60} # Buffer for cleanup after pytest timeout
OUTER_TIMEOUT_SEC=${OUTER_TIMEOUT_SEC:-$((PYTEST_TIMEOUT_SEC + BUFFER_SEC))}
echo "⏰ Timeout configuration:"
echo " - Pytest internal timeout: ${PYTEST_TIMEOUT_SEC}s (from pyproject.toml)"
echo " - Cleanup buffer: ${BUFFER_SEC}s"
echo " - Outer shell timeout: ${OUTER_TIMEOUT_SEC}s (${PYTEST_TIMEOUT_SEC}s + ${BUFFER_SEC}s buffer)"
echo " - This ensures pytest can complete its own timeout handling and cleanup"
echo "🏃 Running pytest with ${OUTER_TIMEOUT_SEC}s outer timeout..."
# Export for inner shell
export PYTEST_TIMEOUT_SEC OUTER_TIMEOUT_SEC BUFFER_SEC
timeout --preserve-status --signal=INT --kill-after=10 ${OUTER_TIMEOUT_SEC} bash -c '
echo "⏱️ Pytest starting at: $(date)"
echo "Running command: pytest tests/ -vv --maxfail=3 --tb=short --capture=no"
# Run pytest with maximum verbosity and no output capture
pytest tests/ -vv --maxfail=3 --tb=short --capture=no --log-cli-level=DEBUG 2>&1 | tee pytest.log
PYTEST_EXIT=${PIPESTATUS[0]}
echo "✅ Pytest finished at: $(date) with exit code: $PYTEST_EXIT"
echo "Last 20 lines of pytest output:"
tail -20 pytest.log || true
# Immediately check for leftover processes
echo "🔍 Post-pytest process check:"
ps -ef | grep -E "python|pytest|embedding" | grep -v grep || echo "No leftover processes"
# Clean up any children before exit
echo "🧹 Cleaning up child processes..."
pkill -TERM -P $$ 2>/dev/null || true
sleep 0.5
pkill -KILL -P $$ 2>/dev/null || true
echo "📊 Final check before exit:"
ps -ef | grep -E "python|pytest|embedding" | grep -v grep || echo "All clean"
exit $PYTEST_EXIT
'
EXIT_CODE=$?
echo "🔚 Timeout command exited with code: $EXIT_CODE"
if [ $EXIT_CODE -eq 124 ]; then
echo "⚠️ TIMEOUT TRIGGERED - Tests took more than ${OUTER_TIMEOUT_SEC} seconds!"
echo "📸 Capturing full diagnostics..."
diag
# Run diagnostic script if available
if [ -f scripts/diagnose_hang.sh ]; then
echo "🔍 Running diagnostic script..."
bash scripts/diagnose_hang.sh || true
fi
# More aggressive cleanup
echo "💀 Killing all Python processes owned by runner..."
pkill -9 -u runner python || true
pkill -9 -u runner pytest || true
elif [ $EXIT_CODE -ne 0 ]; then
echo "❌ Tests failed with exit code: $EXIT_CODE"
else
echo "✅ All tests passed!"
fi
# Always show final state
echo "📍 Final state check:"
ps -ef | grep -E 'python|pytest|embedding' | grep -v grep || echo "No Python processes remaining"
exit $EXIT_CODE
else
# For macOS/Windows, run without GNU timeout
echo "🚀 Running tests on $RUNNER_OS..."
pytest tests/ -vv --maxfail=3 --tb=short --capture=no --log-cli-level=INFO
fi
# Provide tmate session on test failure for debugging
- name: Setup tmate session on failure
if: ${{ failure() && (inputs.debug_enabled || contains(github.event.head_commit.message, '[debug]')) }}
uses: mxschmitt/action-tmate@v3
with:
timeout-minutes: 30
limit-access-to-actor: true
# Run all tests
pytest tests/
- name: Run sanity checks (optional)
run: |

View File

@@ -97,7 +97,6 @@ uv sync
</details>
## Quick Start
Our declarative API makes RAG as easy as writing a config file.
@@ -189,7 +188,7 @@ All RAG examples share these common parameters. **Interactive mode** is availabl
--force-rebuild # Force rebuild index even if it exists
# Embedding Parameters
--embedding-model MODEL # e.g., facebook/contriever, text-embedding-3-small, nomic-embed-text, mlx-community/Qwen3-Embedding-0.6B-8bit or nomic-embed-text
--embedding-model MODEL # e.g., facebook/contriever, text-embedding-3-small, nomic-embed-text, or mlx-community/multilingual-e5-base-mlx
--embedding-mode MODE # sentence-transformers, openai, mlx, or ollama
# LLM Parameters (Text generation models)

View File

@@ -178,9 +178,6 @@ class BaseRAGExample(ABC):
config["host"] = args.llm_host
elif args.llm == "hf":
config["model"] = args.llm_model or "Qwen/Qwen2.5-1.5B-Instruct"
elif args.llm == "simulated":
# Simulated LLM doesn't need additional configuration
pass
return config

View File

@@ -236,15 +236,9 @@ python apps/document_rag.py --query "What are the main techniques LEANN explores
3. **Use MLX on Apple Silicon** (optional optimization):
```bash
--embedding-mode mlx --embedding-model mlx-community/Qwen3-Embedding-0.6B-8bit
--embedding-mode mlx --embedding-model mlx-community/multilingual-e5-base-mlx
```
MLX might not be the best choice, as we tested and found that it only offers 1.3x acceleration compared to HF, so maybe using ollama is a better choice for embedding generation
4. **Use Ollama**
```bash
--embedding-mode ollama --embedding-model nomic-embed-text
```
To discover additional embedding models in ollama, check out https://ollama.com/search?c=embedding or read more about embedding models at https://ollama.com/blog/embedding-models, please do check the model size that works best for you
### If Search Quality is Poor
1. **Increase retrieval count**:

View File

@@ -100,7 +100,6 @@ def create_diskann_embedding_server(
socket = context.socket(
zmq.REP
) # REP socket for both BaseSearcher and DiskANN C++ REQ clients
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}")

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-diskann"
version = "0.2.7"
dependencies = ["leann-core==0.2.7", "numpy", "protobuf>=3.19.0"]
version = "0.2.5"
dependencies = ["leann-core==0.2.5", "numpy", "protobuf>=3.19.0"]
[tool.scikit-build]
# Key: simplified CMake path

View File

@@ -1,6 +1,5 @@
import argparse
import gc # Import garbage collector interface
import logging
import os
import struct
import sys
@@ -8,12 +7,6 @@ import time
import numpy as np
# Set up logging to avoid print buffer issues
logger = logging.getLogger(__name__)
LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
log_level = getattr(logging, LOG_LEVEL, logging.WARNING)
logger.setLevel(log_level)
# --- FourCCs (add more if needed) ---
INDEX_HNSW_FLAT_FOURCC = int.from_bytes(b"IHNf", "little")
# Add other HNSW fourccs if you expect different storage types inside HNSW
@@ -250,12 +243,6 @@ def convert_hnsw_graph_to_csr(input_filename, output_filename, prune_embeddings=
output_filename: Output CSR index file
prune_embeddings: Whether to prune embedding storage (write NULL storage marker)
"""
# Disable buffering for print statements to avoid deadlock in CI/pytest
import functools
global print
print = functools.partial(print, flush=True)
print(f"Starting conversion: {input_filename} -> {output_filename}")
start_time = time.time()
original_hnsw_data = {}

View File

@@ -245,25 +245,3 @@ class HNSWSearcher(BaseSearcher):
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
return {"labels": string_labels, "distances": distances}
def cleanup(self):
"""Cleanup HNSW-specific resources including C++ ZMQ connections."""
# Call parent cleanup first
super().cleanup()
# Additional cleanup for C++ side ZMQ connections
# The ZmqDistanceComputer in C++ uses ZMQ connections that need cleanup
try:
# Delete the index to trigger C++ destructors
if hasattr(self, "index"):
del self.index
except Exception:
pass
# Force garbage collection to ensure C++ objects are destroyed
try:
import gc
gc.collect()
except Exception:
pass

View File

@@ -92,7 +92,6 @@ def create_hnsw_embedding_server(
"""ZMQ server thread"""
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"HNSW ZMQ server listening on port {zmq_port}")

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-hnsw"
version = "0.2.7"
version = "0.2.5"
description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
dependencies = [
"leann-core==0.2.7",
"leann-core==0.2.5",
"numpy",
"pyzmq>=23.0.0",
"msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann-core"
version = "0.2.7"
version = "0.2.5"
description = "Core API and plugin system for LEANN"
readme = "README.md"
requires-python = ">=3.9"
@@ -31,8 +31,6 @@ dependencies = [
"PyPDF2>=3.0.0",
"pymupdf>=1.23.0",
"pdfplumber>=0.10.0",
"nbconvert>=7.0.0", # For .ipynb file support
"gitignore-parser>=0.1.12", # For proper .gitignore handling
"mlx>=0.26.3; sys_platform == 'darwin'",
"mlx-lm>=0.26.0; sys_platform == 'darwin'",
]

View File

@@ -88,9 +88,9 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
socket.setsockopt(zmq.RCVTIMEO, 1000) # 1s timeout on receive
socket.setsockopt(zmq.SNDTIMEO, 1000) # 1s timeout on send
socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection
socket.setsockopt(zmq.RCVTIMEO, 300000)
socket.setsockopt(zmq.SNDTIMEO, 300000)
socket.setsockopt(zmq.IMMEDIATE, 1)
socket.connect(f"tcp://localhost:{port}")
try:
@@ -105,8 +105,8 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
# Convert back to numpy array
embeddings = np.array(embeddings_list, dtype=np.float32)
finally:
socket.close(linger=0)
context.term()
socket.close()
# Don't call context.term() - this was causing hangs
return embeddings
@@ -577,7 +577,6 @@ class LeannSearcher:
enriched_results = []
if "labels" in results and "distances" in results:
logger.info(f" Processing {len(results['labels'][0])} passage IDs:")
# Python 3.9 does not support zip(strict=...); lengths are expected to match
for i, (string_id, dist) in enumerate(
zip(results["labels"][0], results["distances"][0])
):
@@ -605,38 +604,17 @@ class LeannSearcher:
)
except KeyError:
RED = "\033[91m"
RESET = "\033[0m"
logger.error(
f" {RED}{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}"
)
# Define color codes outside the loop for final message
GREEN = "\033[92m"
RESET = "\033[0m"
logger.info(f" {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
return enriched_results
def cleanup(self):
"""Explicitly cleanup embedding server and ZMQ resources.
This method should be called after you're done using the searcher,
especially in test environments or batch processing scenarios.
"""
# Stop embedding server
if hasattr(self.backend_impl, "embedding_server_manager"):
self.backend_impl.embedding_server_manager.stop_server()
# Set ZMQ linger but don't terminate global context
try:
import zmq
# Just set linger on the global instance
ctx = zmq.Context.instance()
ctx.linger = 0
# NEVER call ctx.term() or destroy() on the global instance
# That would block waiting for all sockets to close
except Exception:
pass
"""Cleanup embedding server and other resources."""
if hasattr(self.backend_impl, "cleanup"):
self.backend_impl.cleanup()
class LeannChat:
@@ -707,12 +685,3 @@ class LeannChat:
except (KeyboardInterrupt, EOFError):
print("\nGoodbye!")
break
def cleanup(self):
"""Explicitly cleanup embedding server resources.
This method should be called after you're done using the chat interface,
especially in test environments or batch processing scenarios.
"""
if hasattr(self.searcher, "cleanup"):
self.searcher.cleanup()

View File

@@ -1,6 +1,7 @@
import argparse
import asyncio
from pathlib import Path
from typing import Optional
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
@@ -86,9 +87,7 @@ Examples:
# Build command
build_parser = subparsers.add_parser("build", help="Build document index")
build_parser.add_argument(
"index_name", nargs="?", help="Index name (default: current directory name)"
)
build_parser.add_argument("index_name", help="Index name")
build_parser.add_argument(
"--docs", type=str, default=".", help="Documents directory (default: current directory)"
)
@@ -203,37 +202,6 @@ Examples:
with open(global_registry, "w") as f:
json.dump(projects, f, indent=2)
def _build_gitignore_parser(self, docs_dir: str):
"""Build gitignore parser using gitignore-parser library."""
from gitignore_parser import parse_gitignore
# Try to parse the root .gitignore
gitignore_path = Path(docs_dir) / ".gitignore"
if gitignore_path.exists():
try:
# gitignore-parser automatically handles all subdirectory .gitignore files!
matches = parse_gitignore(str(gitignore_path))
print(f"📋 Loaded .gitignore from {docs_dir} (includes all subdirectories)")
return matches
except Exception as e:
print(f"Warning: Could not parse .gitignore: {e}")
else:
print("📋 No .gitignore found")
# Fallback: basic pattern matching for essential files
essential_patterns = {".git", ".DS_Store", "__pycache__", "node_modules", ".venv", "venv"}
def basic_matches(file_path):
path_parts = Path(file_path).parts
return any(part in essential_patterns for part in path_parts)
return basic_matches
def _should_exclude_file(self, relative_path: Path, gitignore_matches) -> bool:
"""Check if a file should be excluded using gitignore parser."""
return gitignore_matches(str(relative_path))
def list_indexes(self):
print("Stored LEANN indexes:")
@@ -310,54 +278,39 @@ Examples:
print(f' leann search {example_name} "your query"')
print(f" leann ask {example_name} --interactive")
def load_documents(self, docs_dir: str, custom_file_types: str | None = None):
def load_documents(self, docs_dir: str, custom_file_types: Optional[str] = None):
print(f"Loading documents from {docs_dir}...")
if custom_file_types:
print(f"Using custom file types: {custom_file_types}")
# Build gitignore parser
gitignore_matches = self._build_gitignore_parser(docs_dir)
# Try to use better PDF parsers first, but only if PDFs are requested
# Try to use better PDF parsers first
documents = []
docs_path = Path(docs_dir)
# Check if we should process PDFs
should_process_pdfs = custom_file_types is None or ".pdf" in custom_file_types
for file_path in docs_path.rglob("*.pdf"):
print(f"Processing PDF: {file_path}")
if should_process_pdfs:
for file_path in docs_path.rglob("*.pdf"):
# Check if file matches any exclude pattern
relative_path = file_path.relative_to(docs_path)
if self._should_exclude_file(relative_path, gitignore_matches):
continue
# Try PyMuPDF first (best quality)
text = extract_pdf_text_with_pymupdf(str(file_path))
if text is None:
# Try pdfplumber
text = extract_pdf_text_with_pdfplumber(str(file_path))
print(f"Processing PDF: {file_path}")
if text:
# Create a simple document structure
from llama_index.core import Document
# Try PyMuPDF first (best quality)
text = extract_pdf_text_with_pymupdf(str(file_path))
if text is None:
# Try pdfplumber
text = extract_pdf_text_with_pdfplumber(str(file_path))
if text:
# Create a simple document structure
from llama_index.core import Document
doc = Document(text=text, metadata={"source": str(file_path)})
documents.append(doc)
else:
# Fallback to default reader
print(f"Using default reader for {file_path}")
try:
default_docs = SimpleDirectoryReader(
str(file_path.parent),
filename_as_id=True,
required_exts=[file_path.suffix],
).load_data()
documents.extend(default_docs)
except Exception as e:
print(f"Warning: Could not process {file_path}: {e}")
doc = Document(text=text, metadata={"source": str(file_path)})
documents.append(doc)
else:
# Fallback to default reader
print(f"Using default reader for {file_path}")
default_docs = SimpleDirectoryReader(
str(file_path.parent),
filename_as_id=True,
required_exts=[file_path.suffix],
).load_data()
documents.extend(default_docs)
# Load other file types with default reader
if custom_file_types:
@@ -423,34 +376,13 @@ Examples:
]
# Try to load other file types, but don't fail if none are found
try:
# Create a custom file filter function using our PathSpec
def file_filter(file_path: str) -> bool:
"""Return True if file should be included (not excluded)"""
try:
docs_path_obj = Path(docs_dir)
file_path_obj = Path(file_path)
relative_path = file_path_obj.relative_to(docs_path_obj)
return not self._should_exclude_file(relative_path, gitignore_matches)
except (ValueError, OSError):
return True # Include files that can't be processed
other_docs = SimpleDirectoryReader(
docs_dir,
recursive=True,
encoding="utf-8",
required_exts=code_extensions,
file_extractor={}, # Use default extractors
filename_as_id=True,
).load_data(show_progress=True)
# Filter documents after loading based on gitignore rules
filtered_docs = []
for doc in other_docs:
file_path = doc.metadata.get("file_path", "")
if file_filter(file_path):
filtered_docs.append(doc)
documents.extend(filtered_docs)
documents.extend(other_docs)
except ValueError as e:
if "No files found" in str(e):
print("No additional files found for other supported types.")
@@ -523,13 +455,7 @@ Examples:
async def build_index(self, args):
docs_dir = args.docs
# Use current directory name if index_name not provided
if args.index_name:
index_name = args.index_name
else:
index_name = Path.cwd().name
print(f"Using current directory name as index: '{index_name}'")
index_name = args.index_name
index_dir = self.indexes_dir / index_name
index_path = self.get_index_path(index_name)

View File

@@ -305,24 +305,14 @@ class EmbeddingServerManager:
project_root = Path(__file__).parent.parent.parent.parent.parent
logger.info(f"Command: {' '.join(command)}")
# In CI environment, redirect output to avoid buffer deadlock
# Embedding servers use many print statements that can fill buffers
is_ci = os.environ.get("CI") == "true"
if is_ci:
stdout_target = subprocess.DEVNULL
stderr_target = subprocess.DEVNULL
logger.info("CI environment detected, redirecting embedding server output to DEVNULL")
else:
stdout_target = None # Direct to console for visible logs
stderr_target = None # Direct to console for visible logs
# IMPORTANT: Use a new session so we can manage the whole process group reliably
# Let server output go directly to console
# The server will respect LEANN_LOG_LEVEL environment variable
self.server_process = subprocess.Popen(
command,
cwd=project_root,
stdout=stdout_target,
stderr=stderr_target,
start_new_session=True,
stdout=None, # Direct to console
stderr=None, # Direct to console
start_new_session=True, # Create new process group for better cleanup
)
self.server_port = port
logger.info(f"Server process started with PID: {self.server_process.pid}")
@@ -364,7 +354,8 @@ class EmbeddingServerManager:
logger.info(
f"Terminating server process (PID: {self.server_process.pid}) for backend {self.backend_module_name}..."
)
# Try terminating the whole process group first (POSIX)
# Try terminating the whole process group first
try:
pgid = os.getpgid(self.server_process.pid)
os.killpg(pgid, signal.SIGTERM)
@@ -379,10 +370,12 @@ class EmbeddingServerManager:
logger.warning(
f"Server process {self.server_process.pid} did not terminate gracefully within 3 seconds, killing it."
)
# Try killing the whole process group
try:
pgid = os.getpgid(self.server_process.pid)
os.killpg(pgid, signal.SIGKILL)
except Exception:
# Fallback to killing just the process
self.server_process.kill()
try:
self.server_process.wait(timeout=2)
@@ -393,21 +386,28 @@ class EmbeddingServerManager:
)
# Don't hang indefinitely
# Clean up process resources without waiting
# The process should already be terminated/killed above
# Don't wait here as it can hang CI indefinitely
# Clean up process resources to prevent resource tracker warnings
try:
self.server_process.wait(timeout=1) # Give it one final chance with timeout
except subprocess.TimeoutExpired:
logger.warning(
f"Process {self.server_process.pid} still hanging after all kill attempts"
)
# Don't wait indefinitely - just abandon it
except Exception:
pass
self.server_process = None
def _launch_server_process_colab(self, command: list, port: int) -> None:
"""Launch the server process with Colab-specific settings."""
logger.info(f"Colab Command: {' '.join(command)}")
# In Colab, redirect to DEVNULL to avoid pipe blocking
# PIPE without reading can cause hangs
# In Colab, we need to be more careful about process management
self.server_process = subprocess.Popen(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
self.server_port = port

View File

@@ -25,61 +25,32 @@ def handle_request(request):
"tools": [
{
"name": "leann_search",
"description": """🔍 Search code using natural language - like having a coding assistant who knows your entire codebase!
🎯 **Perfect for**:
- "How does authentication work?" → finds auth-related code
- "Error handling patterns" → locates try-catch blocks and error logic
- "Database connection setup" → finds DB initialization code
- "API endpoint definitions" → locates route handlers
- "Configuration management" → finds config files and usage
💡 **Pro tip**: Use this before making any changes to understand existing patterns and conventions.""",
"description": "Search LEANN index",
"inputSchema": {
"type": "object",
"properties": {
"index_name": {
"type": "string",
"description": "Name of the LEANN index to search. Use 'leann_list' first to see available indexes.",
},
"query": {
"type": "string",
"description": "Search query - can be natural language (e.g., 'how to handle errors') or technical terms (e.g., 'async function definition')",
},
"top_k": {
"type": "integer",
"default": 5,
"minimum": 1,
"maximum": 20,
"description": "Number of search results to return. Use 5-10 for focused results, 15-20 for comprehensive exploration.",
},
"complexity": {
"type": "integer",
"default": 32,
"minimum": 16,
"maximum": 128,
"description": "Search complexity level. Use 16-32 for fast searches (recommended), 64+ for higher precision when needed.",
},
"index_name": {"type": "string"},
"query": {"type": "string"},
"top_k": {"type": "integer", "default": 5},
},
"required": ["index_name", "query"],
},
},
{
"name": "leann_status",
"description": "📊 Check the health and stats of your code indexes - like a medical checkup for your codebase knowledge!",
"name": "leann_ask",
"description": "Ask question using LEANN RAG",
"inputSchema": {
"type": "object",
"properties": {
"index_name": {
"type": "string",
"description": "Optional: Name of specific index to check. If not provided, shows status of all indexes.",
}
"index_name": {"type": "string"},
"question": {"type": "string"},
},
"required": ["index_name", "question"],
},
},
{
"name": "leann_list",
"description": "📋 Show all your indexed codebases - your personal code library! Use this to see what's available for search.",
"description": "List all LEANN indexes",
"inputSchema": {"type": "object", "properties": {}},
},
]
@@ -92,41 +63,19 @@ def handle_request(request):
try:
if tool_name == "leann_search":
# Validate required parameters
if not args.get("index_name") or not args.get("query"):
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"content": [
{
"type": "text",
"text": "Error: Both index_name and query are required",
}
]
},
}
# Build simplified command
cmd = [
"leann",
"search",
args["index_name"],
args["query"],
"--recompute-embeddings",
f"--top-k={args.get('top_k', 5)}",
f"--complexity={args.get('complexity', 32)}",
]
result = subprocess.run(cmd, capture_output=True, text=True)
elif tool_name == "leann_status":
if args.get("index_name"):
# Check specific index status - for now, we'll use leann list and filter
result = subprocess.run(["leann", "list"], capture_output=True, text=True)
# We could enhance this to show more detailed status per index
else:
# Show all indexes status
result = subprocess.run(["leann", "list"], capture_output=True, text=True)
elif tool_name == "leann_ask":
cmd = f'echo "{args["question"]}" | leann ask {args["index_name"]} --recompute-embeddings --llm ollama --model qwen3:8b'
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
elif tool_name == "leann_list":
result = subprocess.run(["leann", "list"], capture_output=True, text=True)

View File

@@ -138,9 +138,9 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 second timeout
socket.setsockopt(zmq.SNDTIMEO, 5000) # 5 second timeout
socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection
socket.setsockopt(zmq.RCVTIMEO, 300000)
socket.setsockopt(zmq.SNDTIMEO, 300000)
socket.setsockopt(zmq.IMMEDIATE, 1)
socket.connect(f"tcp://localhost:{zmq_port}")
# Send embedding request
@@ -162,9 +162,8 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
raise RuntimeError(f"Failed to compute embeddings via server: {e}")
finally:
if socket:
socket.close(linger=0)
if context:
context.term()
socket.close()
# Don't call context.term() - this was causing hangs
@abstractmethod
def search(
@@ -199,22 +198,10 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
pass
def cleanup(self):
"""Cleanup resources including embedding server and ZMQ connections."""
# Stop embedding server
"""Cleanup resources including embedding server."""
if hasattr(self, "embedding_server_manager"):
self.embedding_server_manager.stop_server()
# Set ZMQ linger but don't terminate global context
try:
import zmq
# Just set linger on the global instance
ctx = zmq.Context.instance()
ctx.linger = 0
# NEVER call ctx.term() on the global instance
except Exception:
pass
def __del__(self):
"""Ensures resources are cleaned up when the searcher is destroyed."""
try:

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann"
version = "0.2.7"
version = "0.2.5"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -44,16 +44,13 @@ dependencies = [
"mlx-lm>=0.26.0; sys_platform == 'darwin'",
"psutil>=5.8.0",
"pybind11>=3.0.0",
"pathspec>=0.12.1",
"nbconvert>=7.16.6",
"gitignore-parser>=0.1.12",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3.0", # Minimum version for Python 3.13 support
"pytest-cov>=5.0",
"pytest-xdist>=3.5", # For parallel test execution
"pytest>=7.0",
"pytest-cov>=4.0",
"pytest-xdist>=3.0", # For parallel test execution
"black>=23.0",
"ruff==0.12.7", # Fixed version to ensure consistent formatting across all environments
"matplotlib",
@@ -62,10 +59,8 @@ dev = [
]
test = [
"pytest>=8.3.0", # Minimum version for Python 3.13 support
"pytest-timeout>=2.3",
"anyio>=4.0", # For async test support (includes pytest plugin)
"psutil>=5.9.0", # For process cleanup in tests
"pytest>=7.0",
"pytest-timeout>=2.0", # Simple timeout protection for CI
"llama-index-core>=0.12.0",
"llama-index-readers-file>=0.4.0",
"python-dotenv>=1.0.0",
@@ -157,8 +152,7 @@ markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"openai: marks tests that require OpenAI API key",
]
timeout = 300 # Reduced from 600s (10min) to 300s (5min) for CI safety
timeout_method = "thread" # Use thread method to avoid non-daemon thread issues
timeout = 300 # Simple timeout for CI safety (5 minutes)
addopts = [
"-v",
"--tb=short",

View File

@@ -1,103 +0,0 @@
#!/bin/bash
# Diagnostic script for debugging CI hangs
echo "========================================="
echo " CI HANG DIAGNOSTIC SCRIPT"
echo "========================================="
echo ""
echo "📅 Current time: $(date)"
echo "🖥️ Hostname: $(hostname)"
echo "👤 User: $(whoami)"
echo "📂 Working directory: $(pwd)"
echo ""
echo "=== PYTHON ENVIRONMENT ==="
python --version 2>&1 || echo "Python not found"
pip list 2>&1 | head -20 || echo "pip not available"
echo ""
echo "=== PROCESS INFORMATION ==="
echo "Current shell PID: $$"
echo "Parent PID: $PPID"
echo ""
echo "All Python processes:"
ps aux | grep -E "[p]ython" || echo "No Python processes"
echo ""
echo "All pytest processes:"
ps aux | grep -E "[p]ytest" || echo "No pytest processes"
echo ""
echo "Embedding server processes:"
ps aux | grep -E "[e]mbedding_server" || echo "No embedding server processes"
echo ""
echo "Zombie processes:"
ps aux | grep "<defunct>" || echo "No zombie processes"
echo ""
echo "=== NETWORK INFORMATION ==="
echo "Network listeners on typical embedding server ports:"
ss -ltn 2>/dev/null | grep -E ":555[0-9]|:556[0-9]" || netstat -ltn 2>/dev/null | grep -E ":555[0-9]|:556[0-9]" || echo "No listeners on embedding ports"
echo ""
echo "All network listeners:"
ss -ltn 2>/dev/null | head -20 || netstat -ltn 2>/dev/null | head -20 || echo "Cannot get network info"
echo ""
echo "=== FILE DESCRIPTORS ==="
echo "Open files for current shell:"
lsof -p $$ 2>/dev/null | head -20 || echo "lsof not available"
echo ""
if [ -d "/proc/$$" ]; then
echo "File descriptors for current shell (/proc/$$/fd):"
ls -la /proc/$$/fd 2>/dev/null | head -20 || echo "Cannot access /proc/$$/fd"
echo ""
fi
echo "=== SYSTEM RESOURCES ==="
echo "Memory usage:"
free -h 2>/dev/null || vm_stat 2>/dev/null || echo "Cannot get memory info"
echo ""
echo "Disk usage:"
df -h . 2>/dev/null || echo "Cannot get disk info"
echo ""
echo "CPU info:"
nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo "Cannot get CPU info"
echo ""
echo "=== PYTHON SPECIFIC CHECKS ==="
python -c "
import sys
import os
print(f'Python executable: {sys.executable}')
print(f'Python path: {sys.path[:3]}...')
print(f'Environment PYTHONPATH: {os.environ.get(\"PYTHONPATH\", \"Not set\")}')
print(f'Site packages: {[p for p in sys.path if \"site-packages\" in p][:2]}')
" 2>&1 || echo "Cannot run Python diagnostics"
echo ""
echo "=== ZMQ SPECIFIC CHECKS ==="
python -c "
try:
import zmq
print(f'ZMQ version: {zmq.zmq_version()}')
print(f'PyZMQ version: {zmq.pyzmq_version()}')
ctx = zmq.Context.instance()
print(f'ZMQ context instance: {ctx}')
except Exception as e:
print(f'ZMQ check failed: {e}')
" 2>&1 || echo "Cannot check ZMQ"
echo ""
echo "=== PYTEST CHECK ==="
pytest --version 2>&1 || echo "pytest not found"
echo ""
echo "=== END OF DIAGNOSTICS ==="
echo "Generated at: $(date)"

View File

@@ -1,301 +1,41 @@
"""Global test configuration and cleanup fixtures."""
"""Pytest configuration and fixtures for LEANN tests."""
import faulthandler
import os
import signal
import time
from collections.abc import Generator
import pytest
# Enable faulthandler to dump stack traces
faulthandler.enable()
@pytest.fixture(scope="session", autouse=True)
def _ci_backtraces():
"""Dump stack traces before CI timeout to diagnose hanging."""
if os.getenv("CI") == "true":
# Dump stack traces 10s before the 180s timeout
faulthandler.dump_traceback_later(170, repeat=True)
yield
faulthandler.cancel_dump_traceback_later()
@pytest.fixture(scope="session", autouse=True)
def global_test_cleanup() -> Generator:
"""Global cleanup fixture that runs after all tests.
This ensures all ZMQ connections and child processes are properly cleaned up,
preventing the test runner from hanging on exit.
"""
yield
# Cleanup after all tests
print("\n🧹 Running global test cleanup...")
# 1. Force cleanup of any LeannSearcher instances
try:
import gc
# Force garbage collection to trigger __del__ methods
gc.collect()
time.sleep(0.2)
except Exception:
pass
# 2. Set ZMQ linger but DON'T term Context.instance()
# Terminating the global instance can block if other code still has sockets
try:
import zmq
# Just set linger on the global instance, don't terminate it
ctx = zmq.Context.instance()
ctx.linger = 0
# Do NOT call ctx.term() or ctx.destroy() on the global instance!
# That would block waiting for all sockets to close
except Exception:
pass
# Kill any leftover child processes (including grandchildren)
try:
import psutil
current_process = psutil.Process()
# Get ALL descendants recursively
children = current_process.children(recursive=True)
if children:
print(f"\n⚠️ Cleaning up {len(children)} leftover child processes...")
# First try to terminate gracefully
for child in children:
try:
print(f" Terminating {child.pid} ({child.name()})")
child.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Wait a bit for processes to terminate
gone, alive = psutil.wait_procs(children, timeout=2)
# Force kill any remaining processes
for child in alive:
try:
print(f" Force killing process {child.pid} ({child.name()})")
child.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Final wait to ensure cleanup
psutil.wait_procs(alive, timeout=1)
except ImportError:
# psutil not installed, try basic process cleanup
try:
# Send SIGTERM to all child processes
os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
except Exception:
pass
except Exception as e:
print(f"Warning: Error during process cleanup: {e}")
# List and clean up remaining threads
try:
import threading
threads = [t for t in threading.enumerate() if t is not threading.main_thread()]
if threads:
print(f"\n⚠️ {len(threads)} non-main threads still running:")
for t in threads:
print(f" - {t.name} (daemon={t.daemon})")
# Force cleanup of pytest-timeout threads that block exit
if "pytest_timeout" in t.name and not t.daemon:
print(f" 🔧 Converting pytest-timeout thread to daemon: {t.name}")
try:
t.daemon = True
print(" ✓ Converted to daemon thread")
except Exception as e:
print(f" ✗ Failed: {e}")
# Check if only daemon threads remain
non_daemon = [
t for t in threading.enumerate() if t is not threading.main_thread() and not t.daemon
]
if non_daemon:
print(f"\n⚠️ {len(non_daemon)} non-daemon threads still blocking exit")
# Force exit in CI to prevent hanging
if os.environ.get("CI") == "true":
print("🔨 Forcing exit in CI environment...")
os._exit(0)
except Exception as e:
print(f"Thread cleanup error: {e}")
@pytest.fixture
def auto_cleanup_searcher():
"""Fixture that automatically cleans up LeannSearcher instances."""
searchers = []
def register(searcher):
"""Register a searcher for cleanup."""
searchers.append(searcher)
return searcher
yield register
# Cleanup all registered searchers
for searcher in searchers:
try:
searcher.cleanup()
except Exception:
pass
# Force garbage collection
import gc
gc.collect()
time.sleep(0.1)
@pytest.fixture(scope="session", autouse=True)
def _reap_children():
"""Reap all child processes at session end as a safety net."""
yield
# Final aggressive cleanup
try:
import psutil
me = psutil.Process()
kids = me.children(recursive=True)
for p in kids:
try:
p.terminate()
except Exception:
pass
_, alive = psutil.wait_procs(kids, timeout=2)
for p in alive:
try:
p.kill()
except Exception:
pass
except Exception:
pass
@pytest.fixture(autouse=True)
def cleanup_after_each_test():
"""Cleanup after each test to prevent resource leaks."""
def test_environment():
"""Set up test environment variables."""
# Mark as test environment to skip memory-intensive operations
os.environ["CI"] = "true"
yield
# Force garbage collection to trigger any __del__ methods
import gc
gc.collect()
@pytest.fixture(scope="session", autouse=True)
def cleanup_session():
"""Session-level cleanup to ensure no hanging processes."""
yield
# Give a moment for async cleanup
time.sleep(0.1)
def pytest_configure(config):
"""Configure pytest with better timeout handling."""
# Set default timeout method to thread if not specified
if not config.getoption("--timeout-method", None):
config.option.timeout_method = "thread"
# Add more logging
print(f"🔧 Pytest configured at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Python version: {os.sys.version}")
print(f" Platform: {os.sys.platform}")
def pytest_sessionstart(session):
"""Called after the Session object has been created."""
print(f"🏁 Pytest session starting at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Session ID: {id(session)}")
# Show initial process state
# Basic cleanup after all tests
try:
import os
import psutil
current = psutil.Process()
print(f" Current PID: {current.pid}")
print(f" Parent PID: {current.ppid()}")
children = current.children(recursive=True)
if children:
print(f" ⚠️ Already have {len(children)} child processes at start!")
current_process = psutil.Process(os.getpid())
children = current_process.children(recursive=True)
for child in children:
try:
child.terminate()
except psutil.NoSuchProcess:
pass
# Give them time to terminate gracefully
psutil.wait_procs(children, timeout=3)
except Exception:
# Don't fail tests due to cleanup errors
pass
def pytest_sessionfinish(session, exitstatus):
"""Called after whole test run finished."""
print(f"🏁 Pytest session finishing at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Exit status: {exitstatus}")
# Aggressive cleanup before pytest exits
print("🧹 Starting aggressive cleanup...")
# First, clean up child processes
try:
import psutil
current = psutil.Process()
children = current.children(recursive=True)
if children:
print(f" Found {len(children)} child processes to clean up:")
for child in children:
try:
print(f" - PID {child.pid}: {child.name()} (status: {child.status()})")
child.terminate()
except Exception as e:
print(f" - Failed to terminate {child.pid}: {e}")
# Wait briefly then kill
time.sleep(0.5)
_, alive = psutil.wait_procs(children, timeout=1)
for child in alive:
try:
print(f" - Force killing {child.pid}")
child.kill()
except Exception:
pass
else:
print(" No child processes found")
except Exception as e:
print(f" Process cleanup error: {e}")
# Second, clean up problematic threads
try:
import threading
threads = [t for t in threading.enumerate() if t is not threading.main_thread()]
if threads:
print(f" Found {len(threads)} non-main threads:")
for t in threads:
print(f" - {t.name} (daemon={t.daemon})")
# Convert pytest-timeout threads to daemon so they don't block exit
if "pytest_timeout" in t.name and not t.daemon:
try:
t.daemon = True
print(" ✓ Converted to daemon")
except Exception:
pass
# Force exit if non-daemon threads remain in CI
non_daemon = [
t for t in threading.enumerate() if t is not threading.main_thread() and not t.daemon
]
if non_daemon and os.environ.get("CI") == "true":
print(f" ⚠️ {len(non_daemon)} non-daemon threads remain, forcing exit...")
os._exit(exitstatus or 0)
except Exception as e:
print(f" Thread cleanup error: {e}")
print(f"✅ Pytest exiting at {time.strftime('%Y-%m-%d %H:%M:%S')}")

View File

@@ -7,7 +7,6 @@ import tempfile
from pathlib import Path
import pytest
from test_timeout import ci_timeout
def test_imports():
@@ -20,7 +19,6 @@ def test_imports():
os.environ.get("CI") == "true", reason="Skip model tests in CI to avoid MPS memory issues"
)
@pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
@ci_timeout(120) # 2 minute timeout for backend tests
def test_backend_basic(backend_name):
"""Test basic functionality for each backend."""
from leann.api import LeannBuilder, LeannSearcher, SearchResult
@@ -70,7 +68,6 @@ def test_backend_basic(backend_name):
@pytest.mark.skipif(
os.environ.get("CI") == "true", reason="Skip model tests in CI to avoid MPS memory issues"
)
@ci_timeout(180) # 3 minute timeout for large index test
def test_large_index():
"""Test with larger dataset."""
from leann.api import LeannBuilder, LeannSearcher

View File

@@ -9,7 +9,6 @@ import tempfile
from pathlib import Path
import pytest
from test_timeout import ci_timeout
@pytest.fixture
@@ -59,10 +58,6 @@ def test_document_rag_simulated(test_data_dir):
@pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="OpenAI API key not available")
@pytest.mark.skipif(
os.environ.get("CI") == "true", reason="Skip OpenAI embedding tests in CI to avoid hanging"
)
@ci_timeout(60) # 60 second timeout to avoid hanging on OpenAI API calls
def test_document_rag_openai(test_data_dir):
"""Test document_rag with OpenAI embeddings."""
with tempfile.TemporaryDirectory() as temp_dir:

View File

@@ -8,11 +8,9 @@ import tempfile
from pathlib import Path
import pytest
from test_timeout import ci_timeout
@pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
@ci_timeout(90) # 90 second timeout for this comprehensive test
def test_readme_basic_example(backend_name):
"""Test the basic example from README.md with both backends."""
# Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
@@ -81,7 +79,6 @@ def test_readme_imports():
assert callable(LeannChat)
@ci_timeout(60) # 60 second timeout
def test_backend_options():
"""Test different backend options mentioned in documentation."""
# Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
@@ -118,7 +115,6 @@ def test_backend_options():
@pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
@ci_timeout(75) # 75 second timeout for LLM tests
def test_llm_config_simulated(backend_name):
"""Test simulated LLM configuration option with both backends."""
# Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2

View File

@@ -1,129 +0,0 @@
"""
Test timeout utilities for CI environments.
"""
import functools
import os
import signal
import sys
from typing import Any, Callable
def timeout_test(seconds: int = 30):
"""
Decorator to add timeout to test functions, especially useful in CI environments.
Args:
seconds: Timeout in seconds (default: 30)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Only apply timeout in CI environment
if os.environ.get("CI") != "true":
return func(*args, **kwargs)
# Set up timeout handler
def timeout_handler(signum, frame):
print(f"\n❌ Test {func.__name__} timed out after {seconds} seconds in CI!")
print("This usually indicates a hanging process or infinite loop.")
# Try to cleanup any hanging processes
try:
import subprocess
subprocess.run(
["pkill", "-f", "embedding_server"], capture_output=True, timeout=2
)
subprocess.run(
["pkill", "-f", "hnsw_embedding"], capture_output=True, timeout=2
)
except Exception:
pass
# Exit with timeout code
sys.exit(124) # Standard timeout exit code
# Set signal handler and alarm
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
signal.alarm(0) # Cancel alarm
return result
except Exception:
signal.alarm(0) # Cancel alarm on exception
raise
finally:
# Restore original handler
signal.signal(signal.SIGALRM, old_handler)
return wrapper
return decorator
def ci_timeout(seconds: int = 60):
"""
Timeout decorator specifically for CI environments.
Uses threading for more reliable timeout handling.
Args:
seconds: Timeout in seconds (default: 60)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Only apply in CI
if os.environ.get("CI") != "true":
return func(*args, **kwargs)
import threading
result = [None]
exception = [None]
finished = threading.Event()
def target():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
exception[0] = e
finally:
finished.set()
# Start function in thread
thread = threading.Thread(target=target, daemon=True)
thread.start()
# Wait for completion or timeout
if not finished.wait(timeout=seconds):
print(f"\n💥 CI TIMEOUT: Test {func.__name__} exceeded {seconds}s limit!")
print("This usually indicates hanging embedding servers or infinite loops.")
# Try to cleanup embedding servers
try:
import subprocess
subprocess.run(
["pkill", "-9", "-f", "embedding_server"], capture_output=True, timeout=2
)
subprocess.run(
["pkill", "-9", "-f", "hnsw_embedding"], capture_output=True, timeout=2
)
print("Attempted to kill hanging embedding servers.")
except Exception as e:
print(f"Cleanup failed: {e}")
# Raise TimeoutError instead of sys.exit for better pytest integration
raise TimeoutError(f"Test {func.__name__} timed out after {seconds} seconds")
if exception[0]:
raise exception[0]
return result[0]
return wrapper
return decorator

7322
uv.lock generated
View File

File diff suppressed because it is too large Load Diff