fix: comprehensive ZMQ timeout and cleanup fixes based on detailed analysis
Based on excellent diagnostic suggestions, implemented multiple fixes:

1. Diagnostics:
   - Added faulthandler to dump stack traces 10s before the CI timeout
   - Enhanced the CI script with a trap handler to show processes/network on timeout
   - Added a diag() function to capture pstree, processes, and network listeners

2. ZMQ Socket Timeouts (critical fix):
   - Added RCVTIMEO=1000ms and SNDTIMEO=1000ms to all client sockets
   - Added IMMEDIATE=1 to avoid connection blocking
   - Reduced the searcher timeout from 30s to 5s
   - This prevents infinite blocking on recv/send operations

3. Context.instance() Fix (major issue):
   - NEVER call term() or destroy() on Context.instance()
   - This was causing blocking, as it waits for ALL sockets to close
   - Now only set linger=0 without terminating

4. Enhanced Process Cleanup:
   - Added a _reap_children fixture for aggressive session-end cleanup
   - Better recursive child process termination
   - Added a final wait to ensure cleanup completes

The 180s timeout was happening because:
- ZMQ recv() was blocking indefinitely without a timeout
- Context.instance().term() was waiting for all sockets
- Child processes weren't being fully cleaned up

These changes should prevent the hanging completely.
This commit is contained in:
26
.github/workflows/build-reusable.yml
vendored
26
.github/workflows/build-reusable.yml
vendored
@@ -263,14 +263,32 @@ jobs:
|
||||
# Activate virtual environment
|
||||
source .venv/bin/activate || source .venv/Scripts/activate
|
||||
|
||||
# Define diagnostic function for debugging hangs
|
||||
# Print a snapshot of process and network state to help debug a hung test run.
# Every probe is best-effort: a missing tool or an empty match never fails the step.
diag() {
  printf '%s\n' "===== DIAG BEGIN ====="
  date

  # Process tree rooted at this shell ($$), with arguments and PIDs.
  printf '%s\n' "# pstree (current shell group)"
  pstree -ap "$$" 2>/dev/null || true

  # Any process whose command line mentions the test stack; the second grep
  # drops the grep process itself from the listing.
  printf '%s\n' "# python/pytest processes"
  ps -ef | grep -E 'python|pytest|embedding|zmq|diskann' | grep -v grep || true

  # Listening TCP sockets; fall back to netstat where ss is unavailable.
  printf '%s\n' "# network listeners"
  ss -ltnp 2>/dev/null || netstat -ltn 2>/dev/null || true

  printf '%s\n' "# pytest PIDs"
  pgrep -fa pytest || true

  printf '%s\n' "===== DIAG END ====="
}
|
||||
|
||||
# Run all tests with timeout on Linux to prevent hanging
|
||||
if [[ "$RUNNER_OS" == "Linux" ]]; then
|
||||
echo "Running tests with timeout (Linux)..."
|
||||
timeout --signal=INT 180 pytest tests/ -v || {
|
||||
# Set trap for diagnostics
|
||||
trap diag INT TERM
|
||||
|
||||
timeout --signal=INT 180 pytest tests/ -vv --maxfail=3 || {
|
||||
EXIT_CODE=$?
|
||||
if [ $EXIT_CODE -eq 124 ]; then
|
||||
echo "⚠️ Tests timed out after 180 seconds - likely process cleanup issue"
|
||||
echo "Check for lingering ZMQ connections or child processes"
|
||||
echo "⚠️ Tests timed out after 180 seconds - dumping diagnostics..."
|
||||
diag
|
||||
# Try to clean up any leftover processes
|
||||
pkill -TERM -P $$ || true
|
||||
sleep 1
|
||||
@@ -281,7 +299,7 @@ jobs:
|
||||
else
|
||||
# For macOS/Windows, run without GNU timeout
|
||||
echo "Running tests ($RUNNER_OS)..."
|
||||
pytest tests/ -v
|
||||
pytest tests/ -vv --maxfail=3
|
||||
fi
|
||||
|
||||
- name: Run sanity checks (optional)
|
||||
|
||||
@@ -88,6 +88,9 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
|
||||
socket.setsockopt(zmq.RCVTIMEO, 1000) # 1s timeout on receive
|
||||
socket.setsockopt(zmq.SNDTIMEO, 1000) # 1s timeout on send
|
||||
socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection
|
||||
socket.connect(f"tcp://localhost:{port}")
|
||||
|
||||
try:
|
||||
@@ -623,14 +626,15 @@ class LeannSearcher:
|
||||
if hasattr(self.backend_impl, "embedding_server_manager"):
|
||||
self.backend_impl.embedding_server_manager.stop_server()
|
||||
|
||||
# Force cleanup of ZMQ connections (especially for C++ side)
|
||||
# Set ZMQ linger but don't terminate global context
|
||||
try:
|
||||
import zmq
|
||||
|
||||
# Aggressively terminate all ZMQ contexts to prevent hanging
|
||||
# Just set linger on the global instance
|
||||
ctx = zmq.Context.instance()
|
||||
ctx.linger = 0
|
||||
# Don't call destroy() here as it might affect other components
|
||||
# NEVER call ctx.term() or destroy() on the global instance
|
||||
# That would block waiting for all sockets to close
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -137,8 +137,10 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
try:
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.setsockopt(zmq.RCVTIMEO, 30000) # 30 second timeout
|
||||
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
|
||||
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 second timeout
|
||||
socket.setsockopt(zmq.SNDTIMEO, 5000) # 5 second timeout
|
||||
socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection
|
||||
socket.connect(f"tcp://localhost:{zmq_port}")
|
||||
|
||||
# Send embedding request
|
||||
@@ -202,13 +204,14 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
if hasattr(self, "embedding_server_manager"):
|
||||
self.embedding_server_manager.stop_server()
|
||||
|
||||
# Force cleanup of ZMQ connections (especially for C++ side in HNSW/DiskANN)
|
||||
# Set ZMQ linger but don't terminate global context
|
||||
try:
|
||||
import zmq
|
||||
|
||||
# Set short linger to prevent blocking
|
||||
# Just set linger on the global instance
|
||||
ctx = zmq.Context.instance()
|
||||
ctx.linger = 0
|
||||
# NEVER call ctx.term() on the global instance
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Global test configuration and cleanup fixtures."""
|
||||
|
||||
import faulthandler
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
@@ -7,6 +8,19 @@ from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
|
||||
# Enable faulthandler to dump stack traces
|
||||
faulthandler.enable()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def _ci_backtraces():
    """Arm a repeating faulthandler traceback dump for the session when in CI.

    When the ``CI`` environment variable equals ``"true"``, a traceback dump
    is scheduled every 170 seconds so that a hang leaves a trace in the log.
    The pending dump is cancelled at session teardown.
    """
    running_in_ci = os.environ.get("CI") == "true"
    if running_in_ci:
        # 170s fires 10s ahead of the 180s timeout applied by the CI job.
        faulthandler.dump_traceback_later(170, repeat=True)

    yield

    # Called unconditionally; cancels the scheduled dump if one was armed.
    faulthandler.cancel_dump_traceback_later()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def global_test_cleanup() -> Generator:
|
||||
@@ -30,33 +44,25 @@ def global_test_cleanup() -> Generator:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2. Terminate ZMQ contexts more aggressively
|
||||
# 2. Set ZMQ linger but DON'T term Context.instance()
|
||||
# Terminating the global instance can block if other code still has sockets
|
||||
try:
|
||||
import zmq
|
||||
|
||||
# Get the global instance and destroy it
|
||||
# Just set linger on the global instance, don't terminate it
|
||||
ctx = zmq.Context.instance()
|
||||
ctx.linger = 0
|
||||
|
||||
# Force termination - this is aggressive but needed for CI
|
||||
try:
|
||||
ctx.destroy(linger=0)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Also try to terminate the default context
|
||||
try:
|
||||
zmq.Context.term(zmq.Context.instance())
|
||||
except Exception:
|
||||
pass
|
||||
# Do NOT call ctx.term() or ctx.destroy() on the global instance!
|
||||
# That would block waiting for all sockets to close
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Kill any leftover child processes
|
||||
# Kill any leftover child processes (including grandchildren)
|
||||
try:
|
||||
import psutil
|
||||
|
||||
current_process = psutil.Process()
|
||||
# Get ALL descendants recursively
|
||||
children = current_process.children(recursive=True)
|
||||
|
||||
if children:
|
||||
@@ -65,6 +71,7 @@ def global_test_cleanup() -> Generator:
|
||||
# First try to terminate gracefully
|
||||
for child in children:
|
||||
try:
|
||||
print(f" Terminating {child.pid} ({child.name()})")
|
||||
child.terminate()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
@@ -79,6 +86,9 @@ def global_test_cleanup() -> Generator:
|
||||
child.kill()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
|
||||
# Final wait to ensure cleanup
|
||||
psutil.wait_procs(alive, timeout=1)
|
||||
except ImportError:
|
||||
# psutil not installed, try basic process cleanup
|
||||
try:
|
||||
@@ -128,6 +138,33 @@ def auto_cleanup_searcher():
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
def _reap_children():
    """Terminate and reap all descendant processes at session end as a safety net.

    After the session finishes, every descendant of the current process is
    asked to terminate. Those still alive after 2 seconds are killed, and the
    killed processes are then waited on briefly so they are actually reaped
    instead of being left behind as zombies.

    The whole routine is best-effort: any failure (including psutil not being
    installed) is ignored so that teardown never fails the test session.
    """
    yield

    try:
        import psutil

        descendants = psutil.Process().children(recursive=True)

        # Graceful pass: ask every descendant to exit.
        for proc in descendants:
            try:
                proc.terminate()
            except Exception:
                # Process may already be gone or not be ours to signal.
                pass

        # Forceful pass: kill whatever ignored the terminate request.
        _, survivors = psutil.wait_procs(descendants, timeout=2)
        for proc in survivors:
            try:
                proc.kill()
            except Exception:
                pass

        # Final wait so the killed processes are reaped, matching the final
        # wait performed by global_test_cleanup after its own kill pass.
        if survivors:
            psutil.wait_procs(survivors, timeout=1)
    except Exception:
        # Deliberately swallowed: cleanup must not turn into a teardown error.
        pass
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_after_each_test():
|
||||
"""Cleanup after each test to prevent resource leaks."""
|
||||
|
||||
Reference in New Issue
Block a user