Compare commits

..

12 Commits

Author SHA1 Message Date
Andy Lee
80330f8d97 fix: remove whitespace from blank line
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 00:04:32 +00:00
Andy Lee
4772a5bb18 feat: add process group management to prevent hanging subprocesses
- Add start_new_session=True to subprocess.Popen for better isolation
- Use os.killpg() to terminate entire process groups instead of single processes
- Import signal module for SIGTERM/SIGKILL handling
- This ensures child processes of embedding servers are also cleaned up

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 22:11:12 +00:00
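The process-group pattern this commit describes can be sketched as a standalone example. This is an illustrative sketch, not the project's actual `EmbeddingServerManager` code; `start_worker`/`stop_worker` are hypothetical names, and `os.killpg` is POSIX-only:

```python
import os
import signal
import subprocess
import sys

def start_worker(command: list) -> subprocess.Popen:
    # start_new_session=True puts the child in its own session (and thus its
    # own process group), so the child and all of its descendants can be
    # signalled together later.
    return subprocess.Popen(command, start_new_session=True)

def stop_worker(proc: subprocess.Popen, grace: float = 5.0) -> None:
    """Terminate the worker's whole process group: SIGTERM, then SIGKILL."""
    try:
        pgid = os.getpgid(proc.pid)
    except ProcessLookupError:
        return  # process already gone
    os.killpg(pgid, signal.SIGTERM)
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)  # escalate if SIGTERM was ignored
        proc.wait()

# Example: a child that would otherwise outlive a plain terminate() of its parent.
p = start_worker([sys.executable, "-c", "import time; time.sleep(60)"])
stop_worker(p, grace=1.0)
print(p.poll() is not None)  # True: the whole group has exited
```

Killing the group rather than the single PID is what ensures grandchildren (e.g. workers forked by an embedding server) are also reaped.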
Andy Lee
3d67205670 fix: remove Chinese comments to pass ruff check
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 08:31:21 +00:00
Andy Lee
4de709ad4b feat: add ZMQ timeout configurations to prevent hanging
- Add RCVTIMEO (300s) to prevent recv operations from hanging indefinitely
- Add SNDTIMEO (300s) to prevent send operations from hanging indefinitely
- Add IMMEDIATE mode to avoid message queue blocking
- Applied to both api.py and searcher_base.py ZMQ socket connections

This ensures ZMQ operations time out gracefully instead of hanging the process
when embedding servers become unresponsive.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 08:30:02 +00:00
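The socket options named in this commit can be sketched with pyzmq. This is an illustrative sketch, not the project's `api.py`: `make_client` and port 5998 are assumptions, and the timeout is deliberately short here so the failure mode is visible without a server:

```python
import zmq

def make_client(port: int, timeout_ms: int = 300_000) -> zmq.Socket:
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)             # never block on close
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv raises zmq.Again on timeout
    sock.setsockopt(zmq.SNDTIMEO, timeout_ms)  # send raises zmq.Again on timeout
    sock.setsockopt(zmq.IMMEDIATE, 1)          # queue only to live connections
    sock.connect(f"tcp://localhost:{port}")
    return sock

# With no server listening and IMMEDIATE=1, a short send timeout surfaces
# as zmq.Again instead of blocking the process forever.
sock = make_client(5998, timeout_ms=100)
try:
    sock.send(b"ping")
    timed_out = False
except zmq.Again:
    timed_out = True
sock.close()
print(timed_out)  # True
```

With the commit's 300s values the same mechanism acts as a last-resort bound rather than a fast-fail.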
Andy Lee
48c82ee3e3 fix: remove strict parameter from zip() for Python 3.9 compatibility
The strict parameter for zip() was added in Python 3.10.
Remove it to support Python 3.9.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:58:42 +00:00
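On Python 3.9 the equivalent safety of `zip(strict=True)` can be recovered with an explicit length check, as a minimal sketch (`paired` is a hypothetical helper, not code from this repo):

```python
# zip(strict=True) was added in Python 3.10; on 3.9 an explicit length
# check gives the same mismatch protection without the keyword.
def paired(labels, distances):
    if len(labels) != len(distances):
        raise ValueError(f"length mismatch: {len(labels)} != {len(distances)}")
    return list(zip(labels, distances))

print(paired(["a", "b"], [0.1, 0.2]))  # [('a', 0.1), ('b', 0.2)]
```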
Andy Lee
6d1ac4a503 fix: use Python 3.9 compatible builtin generics
- Convert List[str] to list[str], Dict[str, Any] to dict[str, Any], etc.
- Use ruff --unsafe-fixes to automatically apply all type annotation updates
- Remove deprecated typing imports (List, Dict, Tuple) where no longer needed
- Keep Optional[str] syntax (union operator | not supported in Python 3.9)

Now all type annotations are Python 3.9 compatible with modern builtin generics.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:38:33 +00:00
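The resulting annotation style can be sketched as follows; `top_labels` is a hypothetical function for illustration. PEP 585 builtin generics (`list[str]`, `dict[str, float]`) are valid at runtime from Python 3.9, so the `typing.List`/`Dict` imports can be dropped, while `Optional` must stay because the `|` union operator is 3.10+:

```python
from typing import Optional

def top_labels(
    scores: dict[str, float],           # builtin generic: OK on 3.9 (PEP 585)
    k: int,
    default: Optional[str] = None,      # Optional kept: `str | None` is 3.10+
) -> list[str]:
    ranked = sorted(scores, key=scores.get, reverse=True)[:k]
    return ranked or ([default] if default is not None else [])

print(top_labels({"a": 0.9, "b": 0.5}, 1))  # ['a']
```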
Andy Lee
ffba435252 fix: Python 3.9 compatibility - replace union types and builtin generics
- Replace 'str | None' with 'Optional[str]'
- Replace 'list[str]' with 'List[str]'
- Replace 'dict[' with 'Dict['
- Replace 'tuple[' with 'Tuple['
- Add missing typing imports (List, Dict, Tuple)

Fixes TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:29:46 +00:00
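The runtime error this commit fixes comes from evaluating `str | None` on an interpreter older than 3.10, where `type.__or__` does not exist. A minimal sketch of the compatible spelling (`find` is a hypothetical example, not repo code):

```python
from typing import Optional, Union

# On Python 3.9, evaluating `str | None` raises
#   TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'
# so annotations are rewritten as Optional[str]. The two spellings denote
# the same type:
print(Optional[str] == Union[str, None])  # True

def find(name: str) -> Optional[str]:
    index = {"leann": "0.2.5"}
    return index.get(name)  # None when absent, matching Optional[str]

print(find("leann"), find("missing"))
```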
Andy Lee
728fa42ad5 style: run ruff format on modified files
- Format diskann_backend.py and conftest.py according to ruff standards

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-10 00:11:16 +00:00
Andy Lee
bce8aca3fa fix: ensure newline at end of conftest.py for ruff compliance 2025-08-09 23:56:18 +00:00
Andy Lee
f4e41e4353 style: fix ruff formatting issues in conftest.py
- Fix import sorting and organization
- Remove trailing whitespace
- Add proper newline at end of file

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-09 23:53:31 +00:00
Andy Lee
75c7b047d7 Merge branch 'main' into fix/clean-hang-solution 2025-08-09 16:49:51 -07:00
Andy Lee
490329dc66 fix: clean and simple hang prevention solution
This commit provides a minimal, focused fix for CI hanging issues by addressing the root causes:

**Key Changes:**

1. **ZMQ Resource Management:**
   - Remove `context.term()` calls that were causing hangs
   - Add `socket.setsockopt(zmq.LINGER, 0)` to prevent blocking on close
   - Keep socket operations simple with default timeouts (no artificial limits)

2. **Process Cleanup:**
   - Add timeout (1s) to final `process.wait()` in embedding server manager
   - Prevent infinite waiting that was causing CI hangs

3. **Resource Cleanup Methods:**
   - Add simple `cleanup()` methods to searchers and API classes
   - Focus on C++ object destruction for DiskANN backend
   - Avoid complex cleanup logic that could introduce new issues

4. **Basic Test Safety:**
   - Simple pytest-timeout configuration (300s)
   - Basic test session cleanup using psutil
   - Minimal conftest.py without complex logic

**Philosophy:**
This solution avoids the complex multi-layered fixes from the previous PR chain.
Instead, it targets the specific root causes:
- ZMQ context termination blocking
- Process wait() without timeout
- C++ resource leaks in backends

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-09 23:45:18 +00:00
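The ZMQ shutdown rules in this commit can be sketched in isolation. This is an illustrative sketch under stated assumptions (port 5999 with no server listening), not the project's actual cleanup code: set `LINGER` to 0 so `close()` returns immediately even with unsent messages queued, and never `term()` a shared context, since `term()` blocks until every socket on it is closed:

```python
import zmq

ctx = zmq.Context.instance()          # shared global context
sock = ctx.socket(zmq.REQ)
sock.setsockopt(zmq.LINGER, 0)        # drop pending messages on close
sock.connect("tcp://localhost:5999")  # hypothetical port; no server running
sock.send(b"never delivered")         # REQ queues the message to the pipe
sock.close()                          # returns at once thanks to LINGER=0
print(sock.closed)  # True
# Deliberately no ctx.term() here: terminating the shared instance would
# block until all of its sockets everywhere were closed - the hang this
# commit removes.
```

The same bounded-cleanup idea applies to the process side: the commit replaces an unbounded `process.wait()` with `process.wait(timeout=1)` so a hung child cannot stall CI.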
26 changed files with 3604 additions and 4896 deletions

View File

@@ -5,16 +5,7 @@ on:
   branches: [ main ]
 pull_request:
   branches: [ main ]
-workflow_dispatch:
-  inputs:
-    debug_enabled:
-      type: boolean
-      description: 'Run with tmate debugging enabled (SSH access to runner)'
-      required: false
-      default: false
 jobs:
   build:
     uses: ./.github/workflows/build-reusable.yml
-    with:
-      debug_enabled: ${{ github.event_name == 'workflow_dispatch' && inputs.debug_enabled || false }}

View File

@@ -8,11 +8,6 @@ on:
     required: false
     type: string
     default: ''
-  debug_enabled:
-    description: 'Enable tmate debugging session for troubleshooting'
-    required: false
-    type: boolean
-    default: false
 jobs:
   lint:
@@ -228,48 +223,7 @@ jobs:
           python -c "from leann_backend_diskann import _diskannpy; print('_diskannpy imported successfully')" || echo "Failed to import _diskannpy"
           ls -la $(python -c "import leann_backend_diskann; import os; print(os.path.dirname(leann_backend_diskann.__file__))" 2>/dev/null) 2>/dev/null || echo "Failed to list module directory"
-          # Extra debugging for Python 3.13
-          if [[ "${{ matrix.python }}" == "3.13" ]]; then
-            echo "=== Python 3.13 Debug Info ==="
-            echo "Python version details:"
-            python --version
-            python -c "import sys; print(f'sys.version_info: {sys.version_info}')"
-            echo "Pytest version:"
-            python -m pytest --version
-            echo "Testing basic pytest collection:"
-            if [[ "$RUNNER_OS" == "Linux" ]]; then
-              timeout --signal=INT 10 python -m pytest --collect-only tests/test_ci_minimal.py -v || echo "Collection timed out or failed"
-            else
-              # No timeout on macOS/Windows
-              python -m pytest --collect-only tests/test_ci_minimal.py -v || echo "Collection failed"
-            fi
-            echo "Testing single simple test:"
-            if [[ "$RUNNER_OS" == "Linux" ]]; then
-              timeout --signal=INT 10 python -m pytest tests/test_ci_minimal.py::test_package_imports --full-trace -v || echo "Simple test timed out or failed"
-            else
-              # No timeout on macOS/Windows
-              python -m pytest tests/test_ci_minimal.py::test_package_imports --full-trace -v || echo "Simple test failed"
-            fi
-          fi
-      # Enable tmate debugging session if requested
-      - name: Setup tmate session for debugging
-        if: ${{ inputs.debug_enabled }}
-        uses: mxschmitt/action-tmate@v3
-        with:
-          detached: true
-          timeout-minutes: 30
-          limit-access-to-actor: true
       - name: Run tests with pytest
-        # Timeout hierarchy:
-        # 1. Individual test timeout: 20s (see pyproject.toml markers)
-        # 2. Pytest session timeout: 300s (see pyproject.toml [tool.pytest.ini_options])
-        # 3. Outer shell timeout: 360s (300s + 60s buffer for cleanup)
-        # 4. GitHub Actions job timeout: 6 hours (default)
         env:
           CI: true  # Mark as CI environment to skip memory-intensive tests
           OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
@@ -282,165 +236,8 @@ jobs:
           # Activate virtual environment
           source .venv/bin/activate || source .venv/Scripts/activate
-          # Define comprehensive diagnostic function
-          diag() {
-            echo "===== COMPREHENSIVE DIAGNOSTICS BEGIN ====="
-            date
-            echo ""
-            echo "### Current Shell Info ###"
-            echo "Shell PID: $$"
-            echo "Shell PPID: $PPID"
-            echo "Current directory: $(pwd)"
-            echo ""
-            echo "### Process Tree (full) ###"
-            pstree -ap 2>/dev/null || ps auxf || true
-            echo ""
-            echo "### All Python/Pytest Processes ###"
-            ps -ef | grep -E 'python|pytest' | grep -v grep || true
-            echo ""
-            echo "### Embedding Server Processes ###"
-            ps -ef | grep -E 'embedding|zmq|diskann' | grep -v grep || true
-            echo ""
-            echo "### Network Listeners ###"
-            ss -ltnp 2>/dev/null || netstat -ltn 2>/dev/null || true
-            echo ""
-            echo "### Open File Descriptors (lsof) ###"
-            lsof -p $$ 2>/dev/null | head -20 || true
-            echo ""
-            echo "### Zombie Processes ###"
-            ps aux | grep '<defunct>' || echo "No zombie processes"
-            echo ""
-            echo "### Current Jobs ###"
-            jobs -l || true
-            echo ""
-            echo "### /proc/PID/fd for current shell ###"
-            ls -la /proc/$$/fd 2>/dev/null || true
-            echo ""
-            echo "===== COMPREHENSIVE DIAGNOSTICS END ====="
-          }
-          # Enable verbose logging for debugging
-          export PYTHONUNBUFFERED=1
-          export PYTEST_CURRENT_TEST=1
-          # Run all tests with extensive logging
-          if [[ "$RUNNER_OS" == "Linux" ]]; then
-            echo "🚀 Starting Linux test execution with timeout..."
-            echo "Current time: $(date)"
-            echo "Shell PID: $$"
-            echo "Python: $(python --version)"
-            echo "Pytest: $(pytest --version)"
-            # Show environment variables for debugging
-            echo "📦 Environment variables:"
-            env | grep -E "PYTHON|PYTEST|CI|RUNNER" | sort
-            # Set trap for diagnostics
-            trap diag INT TERM EXIT
-            echo "📋 Pre-test diagnostics:"
-            ps -ef | grep -E 'python|pytest' | grep -v grep || echo "No python/pytest processes before test"
-            # Check for any listening ports before test
-            echo "🔌 Pre-test network state:"
-            ss -ltn 2>/dev/null | grep -E "555[0-9]|556[0-9]" || echo "No embedding server ports open"
-            # Set timeouts - outer must be larger than pytest's internal timeout
-            # IMPORTANT: Keep PYTEST_TIMEOUT_SEC in sync with pyproject.toml [tool.pytest.ini_options] timeout
-            PYTEST_TIMEOUT_SEC=${PYTEST_TIMEOUT_SEC:-300}  # Default 300s, matches pyproject.toml
-            BUFFER_SEC=${TIMEOUT_BUFFER_SEC:-60}  # Buffer for cleanup after pytest timeout
-            OUTER_TIMEOUT_SEC=${OUTER_TIMEOUT_SEC:-$((PYTEST_TIMEOUT_SEC + BUFFER_SEC))}
-            echo "⏰ Timeout configuration:"
-            echo "  - Pytest internal timeout: ${PYTEST_TIMEOUT_SEC}s (from pyproject.toml)"
-            echo "  - Cleanup buffer: ${BUFFER_SEC}s"
-            echo "  - Outer shell timeout: ${OUTER_TIMEOUT_SEC}s (${PYTEST_TIMEOUT_SEC}s + ${BUFFER_SEC}s buffer)"
-            echo "  - This ensures pytest can complete its own timeout handling and cleanup"
-            echo "🏃 Running pytest with ${OUTER_TIMEOUT_SEC}s outer timeout..."
-            # Export for inner shell
-            export PYTEST_TIMEOUT_SEC OUTER_TIMEOUT_SEC BUFFER_SEC
-            timeout --preserve-status --signal=INT --kill-after=10 ${OUTER_TIMEOUT_SEC} bash -c '
-              echo "⏱️ Pytest starting at: $(date)"
-              echo "Running command: pytest tests/ -vv --maxfail=3 --tb=short --capture=no"
-              # Run pytest with maximum verbosity and no output capture
-              pytest tests/ -vv --maxfail=3 --tb=short --capture=no --log-cli-level=DEBUG 2>&1 | tee pytest.log
-              PYTEST_EXIT=${PIPESTATUS[0]}
-              echo "✅ Pytest finished at: $(date) with exit code: $PYTEST_EXIT"
-              echo "Last 20 lines of pytest output:"
-              tail -20 pytest.log || true
-              # Immediately check for leftover processes
-              echo "🔍 Post-pytest process check:"
-              ps -ef | grep -E "python|pytest|embedding" | grep -v grep || echo "No leftover processes"
-              # Clean up any children before exit
-              echo "🧹 Cleaning up child processes..."
-              pkill -TERM -P $$ 2>/dev/null || true
-              sleep 0.5
-              pkill -KILL -P $$ 2>/dev/null || true
-              echo "📊 Final check before exit:"
-              ps -ef | grep -E "python|pytest|embedding" | grep -v grep || echo "All clean"
-              exit $PYTEST_EXIT
-            '
-            EXIT_CODE=$?
-            echo "🔚 Timeout command exited with code: $EXIT_CODE"
-            if [ $EXIT_CODE -eq 124 ]; then
-              echo "⚠️ TIMEOUT TRIGGERED - Tests took more than ${OUTER_TIMEOUT_SEC} seconds!"
-              echo "📸 Capturing full diagnostics..."
-              diag
-              # Run diagnostic script if available
-              if [ -f scripts/diagnose_hang.sh ]; then
-                echo "🔍 Running diagnostic script..."
-                bash scripts/diagnose_hang.sh || true
-              fi
-              # More aggressive cleanup
-              echo "💀 Killing all Python processes owned by runner..."
-              pkill -9 -u runner python || true
-              pkill -9 -u runner pytest || true
-            elif [ $EXIT_CODE -ne 0 ]; then
-              echo "❌ Tests failed with exit code: $EXIT_CODE"
-            else
-              echo "✅ All tests passed!"
-            fi
-            # Always show final state
-            echo "📍 Final state check:"
-            ps -ef | grep -E 'python|pytest|embedding' | grep -v grep || echo "No Python processes remaining"
-            exit $EXIT_CODE
-          else
-            # For macOS/Windows, run without GNU timeout
-            echo "🚀 Running tests on $RUNNER_OS..."
-            pytest tests/ -vv --maxfail=3 --tb=short --capture=no --log-cli-level=INFO
-          fi
-      # Provide tmate session on test failure for debugging
-      - name: Setup tmate session on failure
-        if: ${{ failure() && (inputs.debug_enabled || contains(github.event.head_commit.message, '[debug]')) }}
-        uses: mxschmitt/action-tmate@v3
-        with:
-          timeout-minutes: 30
-          limit-access-to-actor: true
+          # Run all tests
+          pytest tests/
       - name: Run sanity checks (optional)
         run: |

View File

@@ -97,7 +97,6 @@ uv sync
 </details>
 ## Quick Start
 Our declarative API makes RAG as easy as writing a config file.
@@ -189,7 +188,7 @@ All RAG examples share these common parameters. **Interactive mode** is availabl
 --force-rebuild          # Force rebuild index even if it exists
 # Embedding Parameters
---embedding-model MODEL  # e.g., facebook/contriever, text-embedding-3-small, nomic-embed-text, mlx-community/Qwen3-Embedding-0.6B-8bit or nomic-embed-text
+--embedding-model MODEL  # e.g., facebook/contriever, text-embedding-3-small, nomic-embed-text, or mlx-community/multilingual-e5-base-mlx
 --embedding-mode MODE    # sentence-transformers, openai, mlx, or ollama
 # LLM Parameters (Text generation models)

View File

@@ -178,9 +178,6 @@ class BaseRAGExample(ABC):
             config["host"] = args.llm_host
         elif args.llm == "hf":
             config["model"] = args.llm_model or "Qwen/Qwen2.5-1.5B-Instruct"
-        elif args.llm == "simulated":
-            # Simulated LLM doesn't need additional configuration
-            pass
         return config

View File

@@ -236,15 +236,9 @@ python apps/document_rag.py --query "What are the main techniques LEANN explores
 3. **Use MLX on Apple Silicon** (optional optimization):
    ```bash
-   --embedding-mode mlx --embedding-model mlx-community/Qwen3-Embedding-0.6B-8bit
+   --embedding-mode mlx --embedding-model mlx-community/multilingual-e5-base-mlx
    ```
-   MLX might not be the best choice: in our tests it offered only about 1.3x acceleration over HF, so Ollama may be a better choice for embedding generation.
-4. **Use Ollama**
-   ```bash
-   --embedding-mode ollama --embedding-model nomic-embed-text
-   ```
-   To discover additional embedding models in Ollama, see https://ollama.com/search?c=embedding or https://ollama.com/blog/embedding-models, and check which model size works best for you.
 ### If Search Quality is Poor
 1. **Increase retrieval count**:

View File

@@ -100,7 +100,6 @@ def create_diskann_embedding_server(
         socket = context.socket(
             zmq.REP
         )  # REP socket for both BaseSearcher and DiskANN C++ REQ clients
-        socket.setsockopt(zmq.LINGER, 0)  # Don't block on close
         socket.bind(f"tcp://*:{zmq_port}")
         logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}")

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
 [project]
 name = "leann-backend-diskann"
-version = "0.2.7"
-dependencies = ["leann-core==0.2.7", "numpy", "protobuf>=3.19.0"]
+version = "0.2.5"
+dependencies = ["leann-core==0.2.5", "numpy", "protobuf>=3.19.0"]
 [tool.scikit-build]
 # Key: simplified CMake path

View File

@@ -1,6 +1,5 @@
 import argparse
 import gc  # Import garbage collector interface
-import logging
 import os
 import struct
 import sys
@@ -8,12 +7,6 @@ import time
 import numpy as np
-# Set up logging to avoid print buffer issues
-logger = logging.getLogger(__name__)
-LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
-log_level = getattr(logging, LOG_LEVEL, logging.WARNING)
-logger.setLevel(log_level)
 # --- FourCCs (add more if needed) ---
 INDEX_HNSW_FLAT_FOURCC = int.from_bytes(b"IHNf", "little")
 # Add other HNSW fourccs if you expect different storage types inside HNSW
@@ -250,12 +243,6 @@ def convert_hnsw_graph_to_csr(input_filename, output_filename, prune_embeddings=
         output_filename: Output CSR index file
         prune_embeddings: Whether to prune embedding storage (write NULL storage marker)
     """
-    # Disable buffering for print statements to avoid deadlock in CI/pytest
-    import functools
-    global print
-    print = functools.partial(print, flush=True)
     print(f"Starting conversion: {input_filename} -> {output_filename}")
     start_time = time.time()
     original_hnsw_data = {}

View File

@@ -245,25 +245,3 @@ class HNSWSearcher(BaseSearcher):
         string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
         return {"labels": string_labels, "distances": distances}
-    def cleanup(self):
-        """Cleanup HNSW-specific resources including C++ ZMQ connections."""
-        # Call parent cleanup first
-        super().cleanup()
-        # Additional cleanup for C++ side ZMQ connections
-        # The ZmqDistanceComputer in C++ uses ZMQ connections that need cleanup
-        try:
-            # Delete the index to trigger C++ destructors
-            if hasattr(self, "index"):
-                del self.index
-        except Exception:
-            pass
-        # Force garbage collection to ensure C++ objects are destroyed
-        try:
-            import gc
-            gc.collect()
-        except Exception:
-            pass

View File

@@ -92,7 +92,6 @@ def create_hnsw_embedding_server(
         """ZMQ server thread"""
         context = zmq.Context()
         socket = context.socket(zmq.REP)
-        socket.setsockopt(zmq.LINGER, 0)  # Don't block on close
         socket.bind(f"tcp://*:{zmq_port}")
         logger.info(f"HNSW ZMQ server listening on port {zmq_port}")

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
 [project]
 name = "leann-backend-hnsw"
-version = "0.2.7"
+version = "0.2.5"
 description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
 dependencies = [
-    "leann-core==0.2.7",
+    "leann-core==0.2.5",
     "numpy",
     "pyzmq>=23.0.0",
     "msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "leann-core"
-version = "0.2.7"
+version = "0.2.5"
 description = "Core API and plugin system for LEANN"
 readme = "README.md"
 requires-python = ">=3.9"
@@ -31,8 +31,6 @@ dependencies = [
     "PyPDF2>=3.0.0",
     "pymupdf>=1.23.0",
     "pdfplumber>=0.10.0",
-    "nbconvert>=7.0.0",  # For .ipynb file support
-    "gitignore-parser>=0.1.12",  # For proper .gitignore handling
     "mlx>=0.26.3; sys_platform == 'darwin'",
     "mlx-lm>=0.26.0; sys_platform == 'darwin'",
 ]

View File

@@ -88,9 +88,9 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
     context = zmq.Context()
     socket = context.socket(zmq.REQ)
     socket.setsockopt(zmq.LINGER, 0)  # Don't block on close
-    socket.setsockopt(zmq.RCVTIMEO, 1000)  # 1s timeout on receive
-    socket.setsockopt(zmq.SNDTIMEO, 1000)  # 1s timeout on send
-    socket.setsockopt(zmq.IMMEDIATE, 1)  # Don't wait for connection
+    socket.setsockopt(zmq.RCVTIMEO, 300000)
+    socket.setsockopt(zmq.SNDTIMEO, 300000)
+    socket.setsockopt(zmq.IMMEDIATE, 1)
     socket.connect(f"tcp://localhost:{port}")
     try:
@@ -105,8 +105,8 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
         # Convert back to numpy array
         embeddings = np.array(embeddings_list, dtype=np.float32)
     finally:
-        socket.close(linger=0)
-        context.term()
+        socket.close()
+        # Don't call context.term() - this was causing hangs
     return embeddings
@@ -577,7 +577,6 @@ class LeannSearcher:
         enriched_results = []
         if "labels" in results and "distances" in results:
             logger.info(f"  Processing {len(results['labels'][0])} passage IDs:")
-            # Python 3.9 does not support zip(strict=...); lengths are expected to match
             for i, (string_id, dist) in enumerate(
                 zip(results["labels"][0], results["distances"][0])
             ):
@@ -605,38 +604,17 @@ class LeannSearcher:
                 )
             except KeyError:
                 RED = "\033[91m"
-                RESET = "\033[0m"
                 logger.error(
                     f"  {RED}{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}"
                 )
-        # Define color codes outside the loop for final message
-        GREEN = "\033[92m"
-        RESET = "\033[0m"
         logger.info(f"  {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
         return enriched_results
     def cleanup(self):
-        """Explicitly cleanup embedding server and ZMQ resources.
-
-        This method should be called after you're done using the searcher,
-        especially in test environments or batch processing scenarios.
-        """
-        # Stop embedding server
-        if hasattr(self.backend_impl, "embedding_server_manager"):
-            self.backend_impl.embedding_server_manager.stop_server()
-        # Set ZMQ linger but don't terminate global context
-        try:
-            import zmq
-            # Just set linger on the global instance
-            ctx = zmq.Context.instance()
-            ctx.linger = 0
-            # NEVER call ctx.term() or destroy() on the global instance
-            # That would block waiting for all sockets to close
-        except Exception:
-            pass
+        """Cleanup embedding server and other resources."""
+        if hasattr(self.backend_impl, "cleanup"):
+            self.backend_impl.cleanup()
 class LeannChat:
@@ -707,12 +685,3 @@ class LeannChat:
             except (KeyboardInterrupt, EOFError):
                 print("\nGoodbye!")
                 break
-    def cleanup(self):
-        """Explicitly cleanup embedding server resources.
-
-        This method should be called after you're done using the chat interface,
-        especially in test environments or batch processing scenarios.
-        """
-        if hasattr(self.searcher, "cleanup"):
-            self.searcher.cleanup()

View File

@@ -1,6 +1,7 @@
 import argparse
 import asyncio
 from pathlib import Path
+from typing import Optional
 from llama_index.core import SimpleDirectoryReader
 from llama_index.core.node_parser import SentenceSplitter
@@ -86,9 +87,7 @@ Examples:
     # Build command
     build_parser = subparsers.add_parser("build", help="Build document index")
-    build_parser.add_argument(
-        "index_name", nargs="?", help="Index name (default: current directory name)"
-    )
+    build_parser.add_argument("index_name", help="Index name")
     build_parser.add_argument(
         "--docs", type=str, default=".", help="Documents directory (default: current directory)"
     )
@@ -203,37 +202,6 @@ Examples:
         with open(global_registry, "w") as f:
             json.dump(projects, f, indent=2)
-    def _build_gitignore_parser(self, docs_dir: str):
-        """Build gitignore parser using gitignore-parser library."""
-        from gitignore_parser import parse_gitignore
-        # Try to parse the root .gitignore
-        gitignore_path = Path(docs_dir) / ".gitignore"
-        if gitignore_path.exists():
-            try:
-                # gitignore-parser automatically handles all subdirectory .gitignore files!
-                matches = parse_gitignore(str(gitignore_path))
-                print(f"📋 Loaded .gitignore from {docs_dir} (includes all subdirectories)")
-                return matches
-            except Exception as e:
-                print(f"Warning: Could not parse .gitignore: {e}")
-        else:
-            print("📋 No .gitignore found")
-        # Fallback: basic pattern matching for essential files
-        essential_patterns = {".git", ".DS_Store", "__pycache__", "node_modules", ".venv", "venv"}
-        def basic_matches(file_path):
-            path_parts = Path(file_path).parts
-            return any(part in essential_patterns for part in path_parts)
-        return basic_matches
-    def _should_exclude_file(self, relative_path: Path, gitignore_matches) -> bool:
-        """Check if a file should be excluded using gitignore parser."""
-        return gitignore_matches(str(relative_path))
     def list_indexes(self):
         print("Stored LEANN indexes:")
@@ -310,54 +278,39 @@ Examples:
             print(f'  leann search {example_name} "your query"')
             print(f"  leann ask {example_name} --interactive")
-    def load_documents(self, docs_dir: str, custom_file_types: str | None = None):
+    def load_documents(self, docs_dir: str, custom_file_types: Optional[str] = None):
         print(f"Loading documents from {docs_dir}...")
         if custom_file_types:
             print(f"Using custom file types: {custom_file_types}")
-        # Build gitignore parser
-        gitignore_matches = self._build_gitignore_parser(docs_dir)
-        # Try to use better PDF parsers first, but only if PDFs are requested
+        # Try to use better PDF parsers first
         documents = []
         docs_path = Path(docs_dir)
-        # Check if we should process PDFs
-        should_process_pdfs = custom_file_types is None or ".pdf" in custom_file_types
-        if should_process_pdfs:
-            for file_path in docs_path.rglob("*.pdf"):
-                # Check if file matches any exclude pattern
-                relative_path = file_path.relative_to(docs_path)
-                if self._should_exclude_file(relative_path, gitignore_matches):
-                    continue
-                print(f"Processing PDF: {file_path}")
-                # Try PyMuPDF first (best quality)
-                text = extract_pdf_text_with_pymupdf(str(file_path))
-                if text is None:
-                    # Try pdfplumber
-                    text = extract_pdf_text_with_pdfplumber(str(file_path))
-                if text:
-                    # Create a simple document structure
-                    from llama_index.core import Document
-                    doc = Document(text=text, metadata={"source": str(file_path)})
-                    documents.append(doc)
-                else:
-                    # Fallback to default reader
-                    print(f"Using default reader for {file_path}")
-                    try:
-                        default_docs = SimpleDirectoryReader(
-                            str(file_path.parent),
-                            filename_as_id=True,
-                            required_exts=[file_path.suffix],
-                        ).load_data()
-                        documents.extend(default_docs)
-                    except Exception as e:
-                        print(f"Warning: Could not process {file_path}: {e}")
+        for file_path in docs_path.rglob("*.pdf"):
+            print(f"Processing PDF: {file_path}")
+            # Try PyMuPDF first (best quality)
+            text = extract_pdf_text_with_pymupdf(str(file_path))
+            if text is None:
+                # Try pdfplumber
+                text = extract_pdf_text_with_pdfplumber(str(file_path))
+            if text:
+                # Create a simple document structure
+                from llama_index.core import Document
+                doc = Document(text=text, metadata={"source": str(file_path)})
+                documents.append(doc)
+            else:
+                # Fallback to default reader
+                print(f"Using default reader for {file_path}")
+                default_docs = SimpleDirectoryReader(
+                    str(file_path.parent),
+                    filename_as_id=True,
+                    required_exts=[file_path.suffix],
+                ).load_data()
+                documents.extend(default_docs)
         # Load other file types with default reader
         if custom_file_types:
@@ -423,34 +376,13 @@ Examples:
         ]
         # Try to load other file types, but don't fail if none are found
         try:
-            # Create a custom file filter function using our PathSpec
-            def file_filter(file_path: str) -> bool:
-                """Return True if file should be included (not excluded)"""
-                try:
-                    docs_path_obj = Path(docs_dir)
-                    file_path_obj = Path(file_path)
-                    relative_path = file_path_obj.relative_to(docs_path_obj)
-                    return not self._should_exclude_file(relative_path, gitignore_matches)
-                except (ValueError, OSError):
-                    return True  # Include files that can't be processed
             other_docs = SimpleDirectoryReader(
                 docs_dir,
                 recursive=True,
                 encoding="utf-8",
                 required_exts=code_extensions,
-                file_extractor={},  # Use default extractors
-                filename_as_id=True,
             ).load_data(show_progress=True)
-            # Filter documents after loading based on gitignore rules
-            filtered_docs = []
-            for doc in other_docs:
-                file_path = doc.metadata.get("file_path", "")
-                if file_filter(file_path):
-                    filtered_docs.append(doc)
-            documents.extend(filtered_docs)
+            documents.extend(other_docs)
         except ValueError as e:
             if "No files found" in str(e):
                 print("No additional files found for other supported types.")
@@ -523,13 +455,7 @@ Examples:
     async def build_index(self, args):
         docs_dir = args.docs
-        # Use current directory name if index_name not provided
-        if args.index_name:
-            index_name = args.index_name
-        else:
-            index_name = Path.cwd().name
-            print(f"Using current directory name as index: '{index_name}'")
+        index_name = args.index_name
         index_dir = self.indexes_dir / index_name
         index_path = self.get_index_path(index_name)


@@ -305,24 +305,14 @@ class EmbeddingServerManager:
         project_root = Path(__file__).parent.parent.parent.parent.parent
         logger.info(f"Command: {' '.join(command)}")

-        # In CI environment, redirect output to avoid buffer deadlock
-        # Embedding servers use many print statements that can fill buffers
-        is_ci = os.environ.get("CI") == "true"
-        if is_ci:
-            stdout_target = subprocess.DEVNULL
-            stderr_target = subprocess.DEVNULL
-            logger.info("CI environment detected, redirecting embedding server output to DEVNULL")
-        else:
-            stdout_target = None  # Direct to console for visible logs
-            stderr_target = None  # Direct to console for visible logs
-
-        # IMPORTANT: Use a new session so we can manage the whole process group reliably
+        # Let server output go directly to console
+        # The server will respect LEANN_LOG_LEVEL environment variable
         self.server_process = subprocess.Popen(
             command,
             cwd=project_root,
-            stdout=stdout_target,
-            stderr=stderr_target,
-            start_new_session=True,
+            stdout=None,  # Direct to console
+            stderr=None,  # Direct to console
+            start_new_session=True,  # Create new process group for better cleanup
         )
         self.server_port = port
         logger.info(f"Server process started with PID: {self.server_process.pid}")
@@ -364,7 +354,8 @@ class EmbeddingServerManager:
             logger.info(
                 f"Terminating server process (PID: {self.server_process.pid}) for backend {self.backend_module_name}..."
             )
-            # Try terminating the whole process group first (POSIX)
+            # Try terminating the whole process group first
             try:
                 pgid = os.getpgid(self.server_process.pid)
                 os.killpg(pgid, signal.SIGTERM)
@@ -379,10 +370,12 @@ class EmbeddingServerManager:
                 logger.warning(
                     f"Server process {self.server_process.pid} did not terminate gracefully within 3 seconds, killing it."
                 )
+                # Try killing the whole process group
                 try:
                     pgid = os.getpgid(self.server_process.pid)
                     os.killpg(pgid, signal.SIGKILL)
                 except Exception:
+                    # Fallback to killing just the process
                     self.server_process.kill()
                 try:
                     self.server_process.wait(timeout=2)
@@ -393,21 +386,28 @@ class EmbeddingServerManager:
                 )
             # Don't hang indefinitely

-        # Clean up process resources without waiting
-        # The process should already be terminated/killed above
-        # Don't wait here as it can hang CI indefinitely
+        # Clean up process resources to prevent resource tracker warnings
+        try:
+            self.server_process.wait(timeout=1)  # Give it one final chance with timeout
+        except subprocess.TimeoutExpired:
+            logger.warning(
+                f"Process {self.server_process.pid} still hanging after all kill attempts"
+            )
+            # Don't wait indefinitely - just abandon it
+        except Exception:
+            pass
         self.server_process = None

     def _launch_server_process_colab(self, command: list, port: int) -> None:
         """Launch the server process with Colab-specific settings."""
         logger.info(f"Colab Command: {' '.join(command)}")
-        # In Colab, redirect to DEVNULL to avoid pipe blocking
-        # PIPE without reading can cause hangs
+        # In Colab, we need to be more careful about process management
         self.server_process = subprocess.Popen(
             command,
-            stdout=subprocess.DEVNULL,
-            stderr=subprocess.DEVNULL,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
             text=True,
         )
         self.server_port = port
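The `start_new_session=True` / `os.killpg()` combination from the hunks above can be exercised in isolation. A minimal POSIX-only sketch, where the `sh -c` command tree is a stand-in for an embedding server that spawns children:

```python
import os
import signal
import subprocess
import time

# Start a throwaway process tree as the leader of its own session/group (POSIX).
proc = subprocess.Popen(
    ["sh", "-c", "sleep 60 & wait"],
    start_new_session=True,  # child becomes leader of a new process group
)
time.sleep(0.2)  # give the shell a moment to spawn its background child

# Terminate the whole group so the background `sleep` dies with the shell.
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGTERM)
try:
    proc.wait(timeout=3)
except subprocess.TimeoutExpired:
    os.killpg(pgid, signal.SIGKILL)  # escalate, mirroring the diff
    proc.wait(timeout=2)

print(proc.returncode)  # negative: killed by a signal
```

Without `start_new_session=True`, `os.getpgid(proc.pid)` would return the parent's group and `killpg` would signal the caller too.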


@@ -25,61 +25,32 @@ def handle_request(request):
             "tools": [
                 {
                     "name": "leann_search",
-                    "description": """🔍 Search code using natural language - like having a coding assistant who knows your entire codebase!
-
-🎯 **Perfect for**:
-- "How does authentication work?" → finds auth-related code
-- "Error handling patterns" → locates try-catch blocks and error logic
-- "Database connection setup" → finds DB initialization code
-- "API endpoint definitions" → locates route handlers
-- "Configuration management" → finds config files and usage
-
-💡 **Pro tip**: Use this before making any changes to understand existing patterns and conventions.""",
+                    "description": "Search LEANN index",
                     "inputSchema": {
                         "type": "object",
                         "properties": {
-                            "index_name": {
-                                "type": "string",
-                                "description": "Name of the LEANN index to search. Use 'leann_list' first to see available indexes.",
-                            },
-                            "query": {
-                                "type": "string",
-                                "description": "Search query - can be natural language (e.g., 'how to handle errors') or technical terms (e.g., 'async function definition')",
-                            },
-                            "top_k": {
-                                "type": "integer",
-                                "default": 5,
-                                "minimum": 1,
-                                "maximum": 20,
-                                "description": "Number of search results to return. Use 5-10 for focused results, 15-20 for comprehensive exploration.",
-                            },
-                            "complexity": {
-                                "type": "integer",
-                                "default": 32,
-                                "minimum": 16,
-                                "maximum": 128,
-                                "description": "Search complexity level. Use 16-32 for fast searches (recommended), 64+ for higher precision when needed.",
-                            },
+                            "index_name": {"type": "string"},
+                            "query": {"type": "string"},
+                            "top_k": {"type": "integer", "default": 5},
                         },
                         "required": ["index_name", "query"],
                     },
                 },
                 {
-                    "name": "leann_status",
-                    "description": "📊 Check the health and stats of your code indexes - like a medical checkup for your codebase knowledge!",
+                    "name": "leann_ask",
+                    "description": "Ask question using LEANN RAG",
                     "inputSchema": {
                         "type": "object",
                         "properties": {
-                            "index_name": {
-                                "type": "string",
-                                "description": "Optional: Name of specific index to check. If not provided, shows status of all indexes.",
-                            }
+                            "index_name": {"type": "string"},
+                            "question": {"type": "string"},
                         },
+                        "required": ["index_name", "question"],
                     },
                 },
                 {
                     "name": "leann_list",
-                    "description": "📋 Show all your indexed codebases - your personal code library! Use this to see what's available for search.",
+                    "description": "List all LEANN indexes",
                     "inputSchema": {"type": "object", "properties": {}},
                 },
             ]
@@ -92,41 +63,19 @@ def handle_request(request):
         try:
             if tool_name == "leann_search":
-                # Validate required parameters
-                if not args.get("index_name") or not args.get("query"):
-                    return {
-                        "jsonrpc": "2.0",
-                        "id": request.get("id"),
-                        "result": {
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": "Error: Both index_name and query are required",
-                                }
-                            ]
-                        },
-                    }
-
-                # Build simplified command
                 cmd = [
                     "leann",
                     "search",
                     args["index_name"],
                     args["query"],
-                    "--recompute-embeddings",
                     f"--top-k={args.get('top_k', 5)}",
-                    f"--complexity={args.get('complexity', 32)}",
                 ]
                 result = subprocess.run(cmd, capture_output=True, text=True)
-            elif tool_name == "leann_status":
-                if args.get("index_name"):
-                    # Check specific index status - for now, we'll use leann list and filter
-                    result = subprocess.run(["leann", "list"], capture_output=True, text=True)
-                    # We could enhance this to show more detailed status per index
-                else:
-                    # Show all indexes status
-                    result = subprocess.run(["leann", "list"], capture_output=True, text=True)
+            elif tool_name == "leann_ask":
+                cmd = f'echo "{args["question"]}" | leann ask {args["index_name"]} --recompute-embeddings --llm ollama --model qwen3:8b'
+                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
             elif tool_name == "leann_list":
                 result = subprocess.run(["leann", "list"], capture_output=True, text=True)


@@ -138,9 +138,9 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
         context = zmq.Context()
         socket = context.socket(zmq.REQ)
         socket.setsockopt(zmq.LINGER, 0)  # Don't block on close
-        socket.setsockopt(zmq.RCVTIMEO, 5000)  # 5 second timeout
-        socket.setsockopt(zmq.SNDTIMEO, 5000)  # 5 second timeout
-        socket.setsockopt(zmq.IMMEDIATE, 1)  # Don't wait for connection
+        socket.setsockopt(zmq.RCVTIMEO, 300000)
+        socket.setsockopt(zmq.SNDTIMEO, 300000)
+        socket.setsockopt(zmq.IMMEDIATE, 1)
         socket.connect(f"tcp://localhost:{zmq_port}")

         # Send embedding request
@@ -162,9 +162,8 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
             raise RuntimeError(f"Failed to compute embeddings via server: {e}")
         finally:
             if socket:
-                socket.close(linger=0)
-            if context:
-                context.term()
+                socket.close()
+                # Don't call context.term() - this was causing hangs

     @abstractmethod
     def search(
@@ -199,22 +198,10 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
         pass

     def cleanup(self):
-        """Cleanup resources including embedding server and ZMQ connections."""
-        # Stop embedding server
+        """Cleanup resources including embedding server."""
         if hasattr(self, "embedding_server_manager"):
             self.embedding_server_manager.stop_server()

-        # Set ZMQ linger but don't terminate global context
-        try:
-            import zmq
-
-            # Just set linger on the global instance
-            ctx = zmq.Context.instance()
-            ctx.linger = 0
-            # NEVER call ctx.term() on the global instance
-        except Exception:
-            pass
-
     def __del__(self):
         """Ensures resources are cleaned up when the searcher is destroyed."""
         try:

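The socket options in the hunk above (`RCVTIMEO`/`SNDTIMEO`/`IMMEDIATE`) are what turn an indefinite hang into a recoverable `zmq.Again`. A minimal sketch demonstrating this against a port with no listener; the millisecond-scale timeouts and port 39477 are assumptions chosen so the demo fails fast:

```python
import zmq

# REQ socket that fails fast instead of blocking forever when the
# embedding server is unresponsive (timeouts shortened here for the demo).
ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.setsockopt(zmq.LINGER, 0)      # don't block on close
sock.setsockopt(zmq.RCVTIMEO, 200)  # recv timeout in ms
sock.setsockopt(zmq.SNDTIMEO, 200)  # send timeout in ms
sock.setsockopt(zmq.IMMEDIATE, 1)   # only queue to completed connections
sock.connect("tcp://localhost:39477")  # assumed: nothing listening here

timed_out = False
try:
    sock.send(b"ping")  # would block indefinitely without the options above
except zmq.Again:
    timed_out = True
finally:
    sock.close()
    # The diff avoids terminating shared/global contexts; terminating this
    # private one is safe because its only socket is already closed.
    ctx.term()

print(timed_out)  # True
```

Without `zmq.IMMEDIATE`, the send would be queued silently and the hang would only surface later in `recv`.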

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "leann"
-version = "0.2.7"
+version = "0.2.5"
 description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
 readme = "README.md"
 requires-python = ">=3.9"


@@ -44,16 +44,13 @@ dependencies = [
     "mlx-lm>=0.26.0; sys_platform == 'darwin'",
     "psutil>=5.8.0",
     "pybind11>=3.0.0",
-    "pathspec>=0.12.1",
-    "nbconvert>=7.16.6",
-    "gitignore-parser>=0.1.12",
 ]

 [project.optional-dependencies]
 dev = [
-    "pytest>=8.3.0",  # Minimum version for Python 3.13 support
-    "pytest-cov>=5.0",
-    "pytest-xdist>=3.5",  # For parallel test execution
+    "pytest>=7.0",
+    "pytest-cov>=4.0",
+    "pytest-xdist>=3.0",  # For parallel test execution
     "black>=23.0",
     "ruff==0.12.7",  # Fixed version to ensure consistent formatting across all environments
     "matplotlib",
@@ -62,10 +59,8 @@ dev = [
 ]

 test = [
-    "pytest>=8.3.0",  # Minimum version for Python 3.13 support
-    "pytest-timeout>=2.3",
-    "anyio>=4.0",  # For async test support (includes pytest plugin)
-    "psutil>=5.9.0",  # For process cleanup in tests
+    "pytest>=7.0",
+    "pytest-timeout>=2.0",  # Simple timeout protection for CI
     "llama-index-core>=0.12.0",
     "llama-index-readers-file>=0.4.0",
     "python-dotenv>=1.0.0",
@@ -157,8 +152,7 @@ markers = [
     "slow: marks tests as slow (deselect with '-m \"not slow\"')",
     "openai: marks tests that require OpenAI API key",
 ]
-timeout = 300  # Reduced from 600s (10min) to 300s (5min) for CI safety
-timeout_method = "thread"  # Use thread method to avoid non-daemon thread issues
+timeout = 300  # Simple timeout for CI safety (5 minutes)
 addopts = [
     "-v",
     "--tb=short",


@@ -1,103 +0,0 @@
#!/bin/bash
# Diagnostic script for debugging CI hangs
echo "========================================="
echo " CI HANG DIAGNOSTIC SCRIPT"
echo "========================================="
echo ""
echo "📅 Current time: $(date)"
echo "🖥️ Hostname: $(hostname)"
echo "👤 User: $(whoami)"
echo "📂 Working directory: $(pwd)"
echo ""
echo "=== PYTHON ENVIRONMENT ==="
python --version 2>&1 || echo "Python not found"
pip list 2>&1 | head -20 || echo "pip not available"
echo ""
echo "=== PROCESS INFORMATION ==="
echo "Current shell PID: $$"
echo "Parent PID: $PPID"
echo ""
echo "All Python processes:"
ps aux | grep -E "[p]ython" || echo "No Python processes"
echo ""
echo "All pytest processes:"
ps aux | grep -E "[p]ytest" || echo "No pytest processes"
echo ""
echo "Embedding server processes:"
ps aux | grep -E "[e]mbedding_server" || echo "No embedding server processes"
echo ""
echo "Zombie processes:"
ps aux | grep "<defunct>" || echo "No zombie processes"
echo ""
echo "=== NETWORK INFORMATION ==="
echo "Network listeners on typical embedding server ports:"
ss -ltn 2>/dev/null | grep -E ":555[0-9]|:556[0-9]" || netstat -ltn 2>/dev/null | grep -E ":555[0-9]|:556[0-9]" || echo "No listeners on embedding ports"
echo ""
echo "All network listeners:"
ss -ltn 2>/dev/null | head -20 || netstat -ltn 2>/dev/null | head -20 || echo "Cannot get network info"
echo ""
echo "=== FILE DESCRIPTORS ==="
echo "Open files for current shell:"
lsof -p $$ 2>/dev/null | head -20 || echo "lsof not available"
echo ""
if [ -d "/proc/$$" ]; then
echo "File descriptors for current shell (/proc/$$/fd):"
ls -la /proc/$$/fd 2>/dev/null | head -20 || echo "Cannot access /proc/$$/fd"
echo ""
fi
echo "=== SYSTEM RESOURCES ==="
echo "Memory usage:"
free -h 2>/dev/null || vm_stat 2>/dev/null || echo "Cannot get memory info"
echo ""
echo "Disk usage:"
df -h . 2>/dev/null || echo "Cannot get disk info"
echo ""
echo "CPU info:"
nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo "Cannot get CPU info"
echo ""
echo "=== PYTHON SPECIFIC CHECKS ==="
python -c "
import sys
import os
print(f'Python executable: {sys.executable}')
print(f'Python path: {sys.path[:3]}...')
print(f'Environment PYTHONPATH: {os.environ.get(\"PYTHONPATH\", \"Not set\")}')
print(f'Site packages: {[p for p in sys.path if \"site-packages\" in p][:2]}')
" 2>&1 || echo "Cannot run Python diagnostics"
echo ""
echo "=== ZMQ SPECIFIC CHECKS ==="
python -c "
try:
import zmq
print(f'ZMQ version: {zmq.zmq_version()}')
print(f'PyZMQ version: {zmq.pyzmq_version()}')
ctx = zmq.Context.instance()
print(f'ZMQ context instance: {ctx}')
except Exception as e:
print(f'ZMQ check failed: {e}')
" 2>&1 || echo "Cannot check ZMQ"
echo ""
echo "=== PYTEST CHECK ==="
pytest --version 2>&1 || echo "pytest not found"
echo ""
echo "=== END OF DIAGNOSTICS ==="
echo "Generated at: $(date)"


@@ -1,301 +1,41 @@
-"""Global test configuration and cleanup fixtures."""
+"""Pytest configuration and fixtures for LEANN tests."""

-import faulthandler
 import os
-import signal
-import time
-from collections.abc import Generator

 import pytest

-# Enable faulthandler to dump stack traces
-faulthandler.enable()
-
-
-@pytest.fixture(scope="session", autouse=True)
-def _ci_backtraces():
-    """Dump stack traces before CI timeout to diagnose hanging."""
-    if os.getenv("CI") == "true":
-        # Dump stack traces 10s before the 180s timeout
-        faulthandler.dump_traceback_later(170, repeat=True)
-    yield
-    faulthandler.cancel_dump_traceback_later()
-
-
-@pytest.fixture(scope="session", autouse=True)
-def global_test_cleanup() -> Generator:
-    """Global cleanup fixture that runs after all tests.
-
-    This ensures all ZMQ connections and child processes are properly cleaned up,
-    preventing the test runner from hanging on exit.
-    """
-    yield
-
-    # Cleanup after all tests
-    print("\n🧹 Running global test cleanup...")
-
-    # 1. Force cleanup of any LeannSearcher instances
-    try:
-        import gc
-
-        # Force garbage collection to trigger __del__ methods
-        gc.collect()
-        time.sleep(0.2)
-    except Exception:
-        pass
-
-    # 2. Set ZMQ linger but DON'T term Context.instance()
-    # Terminating the global instance can block if other code still has sockets
-    try:
-        import zmq
-
-        # Just set linger on the global instance, don't terminate it
-        ctx = zmq.Context.instance()
-        ctx.linger = 0
-        # Do NOT call ctx.term() or ctx.destroy() on the global instance!
-        # That would block waiting for all sockets to close
-    except Exception:
-        pass
-
-    # Kill any leftover child processes (including grandchildren)
-    try:
-        import psutil
-
-        current_process = psutil.Process()
-        # Get ALL descendants recursively
-        children = current_process.children(recursive=True)
-        if children:
-            print(f"\n⚠️ Cleaning up {len(children)} leftover child processes...")
-            # First try to terminate gracefully
-            for child in children:
-                try:
-                    print(f"  Terminating {child.pid} ({child.name()})")
-                    child.terminate()
-                except (psutil.NoSuchProcess, psutil.AccessDenied):
-                    pass
-
-            # Wait a bit for processes to terminate
-            gone, alive = psutil.wait_procs(children, timeout=2)
-
-            # Force kill any remaining processes
-            for child in alive:
-                try:
-                    print(f"  Force killing process {child.pid} ({child.name()})")
-                    child.kill()
-                except (psutil.NoSuchProcess, psutil.AccessDenied):
-                    pass
-
-            # Final wait to ensure cleanup
-            psutil.wait_procs(alive, timeout=1)
-    except ImportError:
-        # psutil not installed, try basic process cleanup
-        try:
-            # Send SIGTERM to all child processes
-            os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
-        except Exception:
-            pass
-    except Exception as e:
-        print(f"Warning: Error during process cleanup: {e}")
-
-    # List and clean up remaining threads
-    try:
-        import threading
-
-        threads = [t for t in threading.enumerate() if t is not threading.main_thread()]
-        if threads:
-            print(f"\n⚠️ {len(threads)} non-main threads still running:")
-            for t in threads:
-                print(f"  - {t.name} (daemon={t.daemon})")
-                # Force cleanup of pytest-timeout threads that block exit
-                if "pytest_timeout" in t.name and not t.daemon:
-                    print(f"  🔧 Converting pytest-timeout thread to daemon: {t.name}")
-                    try:
-                        t.daemon = True
-                        print("    ✓ Converted to daemon thread")
-                    except Exception as e:
-                        print(f"    ✗ Failed: {e}")
-
-            # Check if only daemon threads remain
-            non_daemon = [
-                t for t in threading.enumerate() if t is not threading.main_thread() and not t.daemon
-            ]
-            if non_daemon:
-                print(f"\n⚠️ {len(non_daemon)} non-daemon threads still blocking exit")
-                # Force exit in CI to prevent hanging
-                if os.environ.get("CI") == "true":
-                    print("🔨 Forcing exit in CI environment...")
-                    os._exit(0)
-    except Exception as e:
-        print(f"Thread cleanup error: {e}")
-
-
-@pytest.fixture
-def auto_cleanup_searcher():
-    """Fixture that automatically cleans up LeannSearcher instances."""
-    searchers = []
-
-    def register(searcher):
-        """Register a searcher for cleanup."""
-        searchers.append(searcher)
-        return searcher
-
-    yield register
-
-    # Cleanup all registered searchers
-    for searcher in searchers:
-        try:
-            searcher.cleanup()
-        except Exception:
-            pass
-
-    # Force garbage collection
-    import gc
-
-    gc.collect()
-    time.sleep(0.1)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def _reap_children():
-    """Reap all child processes at session end as a safety net."""
-    yield
-    # Final aggressive cleanup
-    try:
-        import psutil
-
-        me = psutil.Process()
-        kids = me.children(recursive=True)
-        for p in kids:
-            try:
-                p.terminate()
-            except Exception:
-                pass
-        _, alive = psutil.wait_procs(kids, timeout=2)
-        for p in alive:
-            try:
-                p.kill()
-            except Exception:
-                pass
-    except Exception:
-        pass
-
-
 @pytest.fixture(autouse=True)
-def cleanup_after_each_test():
-    """Cleanup after each test to prevent resource leaks."""
+def test_environment():
+    """Set up test environment variables."""
+    # Mark as test environment to skip memory-intensive operations
+    os.environ["CI"] = "true"
     yield
-    # Force garbage collection to trigger any __del__ methods
-    import gc
-
-    gc.collect()
-
-    # Give a moment for async cleanup
-    time.sleep(0.1)
-
-
-def pytest_configure(config):
-    """Configure pytest with better timeout handling."""
-    # Set default timeout method to thread if not specified
-    if not config.getoption("--timeout-method", None):
-        config.option.timeout_method = "thread"
-    # Add more logging
-    print(f"🔧 Pytest configured at {time.strftime('%Y-%m-%d %H:%M:%S')}")
-    print(f"   Python version: {os.sys.version}")
-    print(f"   Platform: {os.sys.platform}")
-
-
-def pytest_sessionstart(session):
-    """Called after the Session object has been created."""
-    print(f"🏁 Pytest session starting at {time.strftime('%Y-%m-%d %H:%M:%S')}")
-    print(f"   Session ID: {id(session)}")
-    # Show initial process state
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_session():
+    """Session-level cleanup to ensure no hanging processes."""
+    yield
+    # Basic cleanup after all tests
     try:
+        import os
         import psutil

-        current = psutil.Process()
-        print(f"   Current PID: {current.pid}")
-        print(f"   Parent PID: {current.ppid()}")
-        children = current.children(recursive=True)
-        if children:
-            print(f"   ⚠️ Already have {len(children)} child processes at start!")
+        current_process = psutil.Process(os.getpid())
+        children = current_process.children(recursive=True)
+
+        for child in children:
+            try:
+                child.terminate()
+            except psutil.NoSuchProcess:
+                pass
+
+        # Give them time to terminate gracefully
+        psutil.wait_procs(children, timeout=3)
     except Exception:
+        # Don't fail tests due to cleanup errors
        pass
-
-
-def pytest_sessionfinish(session, exitstatus):
-    """Called after whole test run finished."""
-    print(f"🏁 Pytest session finishing at {time.strftime('%Y-%m-%d %H:%M:%S')}")
-    print(f"   Exit status: {exitstatus}")
-
-    # Aggressive cleanup before pytest exits
-    print("🧹 Starting aggressive cleanup...")
-
-    # First, clean up child processes
-    try:
-        import psutil
-
-        current = psutil.Process()
-        children = current.children(recursive=True)
-        if children:
-            print(f"  Found {len(children)} child processes to clean up:")
-            for child in children:
-                try:
-                    print(f"    - PID {child.pid}: {child.name()} (status: {child.status()})")
-                    child.terminate()
-                except Exception as e:
-                    print(f"    - Failed to terminate {child.pid}: {e}")
-
-            # Wait briefly then kill
-            time.sleep(0.5)
-            _, alive = psutil.wait_procs(children, timeout=1)
-            for child in alive:
-                try:
-                    print(f"    - Force killing {child.pid}")
-                    child.kill()
-                except Exception:
-                    pass
-        else:
-            print("  No child processes found")
-    except Exception as e:
-        print(f"  Process cleanup error: {e}")
-
-    # Second, clean up problematic threads
-    try:
-        import threading
-
-        threads = [t for t in threading.enumerate() if t is not threading.main_thread()]
-        if threads:
-            print(f"  Found {len(threads)} non-main threads:")
-            for t in threads:
-                print(f"    - {t.name} (daemon={t.daemon})")
-                # Convert pytest-timeout threads to daemon so they don't block exit
-                if "pytest_timeout" in t.name and not t.daemon:
-                    try:
-                        t.daemon = True
-                        print("      ✓ Converted to daemon")
-                    except Exception:
-                        pass
-
-        # Force exit if non-daemon threads remain in CI
-        non_daemon = [
-            t for t in threading.enumerate() if t is not threading.main_thread() and not t.daemon
-        ]
-        if non_daemon and os.environ.get("CI") == "true":
-            print(f"  ⚠️ {len(non_daemon)} non-daemon threads remain, forcing exit...")
-            os._exit(exitstatus or 0)
-    except Exception as e:
-        print(f"  Thread cleanup error: {e}")
-
-    print(f"✅ Pytest exiting at {time.strftime('%Y-%m-%d %H:%M:%S')}")


@@ -7,7 +7,6 @@ import tempfile
 from pathlib import Path

 import pytest
-from test_timeout import ci_timeout


 def test_imports():
@@ -20,7 +19,6 @@ def test_imports():
     os.environ.get("CI") == "true", reason="Skip model tests in CI to avoid MPS memory issues"
 )
 @pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
-@ci_timeout(120)  # 2 minute timeout for backend tests
 def test_backend_basic(backend_name):
     """Test basic functionality for each backend."""
     from leann.api import LeannBuilder, LeannSearcher, SearchResult
@@ -70,7 +68,6 @@ def test_backend_basic(backend_name):
 @pytest.mark.skipif(
     os.environ.get("CI") == "true", reason="Skip model tests in CI to avoid MPS memory issues"
 )
-@ci_timeout(180)  # 3 minute timeout for large index test
 def test_large_index():
     """Test with larger dataset."""
     from leann.api import LeannBuilder, LeannSearcher


@@ -9,7 +9,6 @@ import tempfile
 from pathlib import Path

 import pytest
-from test_timeout import ci_timeout


 @pytest.fixture
@@ -59,10 +58,6 @@ def test_document_rag_simulated(test_data_dir):
 @pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="OpenAI API key not available")
-@pytest.mark.skipif(
-    os.environ.get("CI") == "true", reason="Skip OpenAI embedding tests in CI to avoid hanging"
-)
-@ci_timeout(60)  # 60 second timeout to avoid hanging on OpenAI API calls
 def test_document_rag_openai(test_data_dir):
     """Test document_rag with OpenAI embeddings."""
     with tempfile.TemporaryDirectory() as temp_dir:


@@ -8,11 +8,9 @@ import tempfile
 from pathlib import Path

 import pytest
-from test_timeout import ci_timeout


 @pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
-@ci_timeout(90)  # 90 second timeout for this comprehensive test
 def test_readme_basic_example(backend_name):
     """Test the basic example from README.md with both backends."""
     # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
@@ -81,7 +79,6 @@ def test_readme_imports():
     assert callable(LeannChat)


-@ci_timeout(60)  # 60 second timeout
 def test_backend_options():
     """Test different backend options mentioned in documentation."""
     # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
@@ -118,7 +115,6 @@ def test_backend_options():
 @pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
-@ci_timeout(75)  # 75 second timeout for LLM tests
 def test_llm_config_simulated(backend_name):
     """Test simulated LLM configuration option with both backends."""
     # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2


@@ -1,129 +0,0 @@
"""
Test timeout utilities for CI environments.
"""
import functools
import os
import signal
import sys
from typing import Any, Callable
def timeout_test(seconds: int = 30):
"""
Decorator to add timeout to test functions, especially useful in CI environments.
Args:
seconds: Timeout in seconds (default: 30)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Only apply timeout in CI environment
if os.environ.get("CI") != "true":
return func(*args, **kwargs)
# Set up timeout handler
def timeout_handler(signum, frame):
print(f"\n❌ Test {func.__name__} timed out after {seconds} seconds in CI!")
print("This usually indicates a hanging process or infinite loop.")
# Try to cleanup any hanging processes
try:
import subprocess
subprocess.run(
["pkill", "-f", "embedding_server"], capture_output=True, timeout=2
)
subprocess.run(
["pkill", "-f", "hnsw_embedding"], capture_output=True, timeout=2
)
except Exception:
pass
# Exit with timeout code
sys.exit(124) # Standard timeout exit code
# Set signal handler and alarm
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
signal.alarm(0) # Cancel alarm
return result
except Exception:
signal.alarm(0) # Cancel alarm on exception
raise
finally:
# Restore original handler
signal.signal(signal.SIGALRM, old_handler)
return wrapper
return decorator
def ci_timeout(seconds: int = 60):
"""
Timeout decorator specifically for CI environments.
Uses threading for more reliable timeout handling.
Args:
seconds: Timeout in seconds (default: 60)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Only apply in CI
if os.environ.get("CI") != "true":
return func(*args, **kwargs)
import threading
result = [None]
exception = [None]
finished = threading.Event()
def target():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
exception[0] = e
finally:
finished.set()
# Start function in thread
thread = threading.Thread(target=target, daemon=True)
thread.start()
# Wait for completion or timeout
if not finished.wait(timeout=seconds):
print(f"\n💥 CI TIMEOUT: Test {func.__name__} exceeded {seconds}s limit!")
print("This usually indicates hanging embedding servers or infinite loops.")
# Try to cleanup embedding servers
try:
import subprocess
subprocess.run(
["pkill", "-9", "-f", "embedding_server"], capture_output=True, timeout=2
)
subprocess.run(
["pkill", "-9", "-f", "hnsw_embedding"], capture_output=True, timeout=2
)
print("Attempted to kill hanging embedding servers.")
except Exception as e:
print(f"Cleanup failed: {e}")
# Raise TimeoutError instead of sys.exit for better pytest integration
raise TimeoutError(f"Test {func.__name__} timed out after {seconds} seconds")
if exception[0]:
raise exception[0]
return result[0]
return wrapper
return decorator
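The deleted `ci_timeout` decorator above is the thread-based pattern in full. A condensed, self-contained sketch of the same idea, stripped of the CI guard and the `pkill` cleanup so it runs anywhere:

```python
import threading
import time


def run_with_timeout(func, seconds):
    """Run func() in a daemon thread; raise TimeoutError if it overruns."""
    result = [None]
    error = [None]
    finished = threading.Event()

    def target():
        try:
            result[0] = func()
        except Exception as e:
            error[0] = e
        finally:
            finished.set()

    # Daemon thread so an abandoned (timed-out) call can't block interpreter exit.
    threading.Thread(target=target, daemon=True).start()
    if not finished.wait(timeout=seconds):
        raise TimeoutError(f"timed out after {seconds}s")
    if error[0]:
        raise error[0]
    return result[0]


print(run_with_timeout(lambda: 42, seconds=1))  # 42
```

The trade-off, as in the original: the timed-out function keeps running in its daemon thread; the caller merely stops waiting for it.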

uv.lock (generated)

File diff suppressed because it is too large.