diff --git a/.github/workflows/build-reusable.yml b/.github/workflows/build-reusable.yml index 8519c8d..80cfc8e 100644 --- a/.github/workflows/build-reusable.yml +++ b/.github/workflows/build-reusable.yml @@ -263,14 +263,32 @@ jobs: # Activate virtual environment source .venv/bin/activate || source .venv/Scripts/activate + # Define diagnostic function for debugging hangs + diag() { + echo "===== DIAG BEGIN =====" + date + echo "# pstree (current shell group)" + pstree -ap $$ 2>/dev/null || true + echo "# python/pytest processes" + ps -ef | grep -E 'python|pytest|embedding|zmq|diskann' | grep -v grep || true + echo "# network listeners" + ss -ltnp 2>/dev/null || netstat -ltn 2>/dev/null || true + echo "# pytest PIDs" + pgrep -fa pytest || true + echo "===== DIAG END =====" + } + # Run all tests with timeout on Linux to prevent hanging if [[ "$RUNNER_OS" == "Linux" ]]; then echo "Running tests with timeout (Linux)..." - timeout --signal=INT 180 pytest tests/ -v || { + # Set trap for diagnostics + trap diag INT TERM + + timeout --signal=INT 180 pytest tests/ -vv --maxfail=3 || { EXIT_CODE=$? if [ $EXIT_CODE -eq 124 ]; then - echo "⚠️ Tests timed out after 180 seconds - likely process cleanup issue" - echo "Check for lingering ZMQ connections or child processes" + echo "⚠️ Tests timed out after 180 seconds - dumping diagnostics..." + diag # Try to clean up any leftover processes pkill -TERM -P $$ || true sleep 1 @@ -281,7 +299,7 @@ jobs: else # For macOS/Windows, run without GNU timeout echo "Running tests ($RUNNER_OS)..." - pytest tests/ -v + pytest tests/ -vv --maxfail=3 fi - name: Run sanity checks (optional) diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index bbf2769..fc1b812 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -88,6 +88,9 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int) context = zmq.Context() socket = context.socket(zmq.REQ) socket.setsockopt(zmq.LINGER, 0) # Don't block on close + socket.setsockopt(zmq.RCVTIMEO, 1000) # 1s timeout on receive + socket.setsockopt(zmq.SNDTIMEO, 1000) # 1s timeout on send + socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection socket.connect(f"tcp://localhost:{port}") try: @@ -623,14 +626,15 @@ class LeannSearcher: if hasattr(self.backend_impl, "embedding_server_manager"): self.backend_impl.embedding_server_manager.stop_server() - # Force cleanup of ZMQ connections (especially for C++ side) + # Set ZMQ linger but don't terminate global context try: import zmq - # Aggressively terminate all ZMQ contexts to prevent hanging + # Just set linger on the global instance ctx = zmq.Context.instance() ctx.linger = 0 - # Don't call destroy() here as it might affect other components + # NEVER call ctx.term() or destroy() on the global instance + # That would block waiting for all sockets to close except Exception: pass diff --git a/packages/leann-core/src/leann/searcher_base.py b/packages/leann-core/src/leann/searcher_base.py index 3306ca6..285ab0e 100644 --- a/packages/leann-core/src/leann/searcher_base.py +++ b/packages/leann-core/src/leann/searcher_base.py @@ -137,8 +137,10 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC): try: context = zmq.Context() socket = context.socket(zmq.REQ) - socket.setsockopt(zmq.RCVTIMEO, 30000) # 30 second timeout socket.setsockopt(zmq.LINGER, 0) # Don't block on close + socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 second timeout + socket.setsockopt(zmq.SNDTIMEO, 5000) # 5 second timeout + socket.setsockopt(zmq.IMMEDIATE, 1) # Don't wait for connection socket.connect(f"tcp://localhost:{zmq_port}") # Send embedding request @@ -202,13 +204,14 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC): if hasattr(self, "embedding_server_manager"): self.embedding_server_manager.stop_server() - # Force cleanup of ZMQ connections (especially for C++ side in HNSW/DiskANN) + # Set ZMQ linger but don't terminate global context try: import zmq - # Set short linger to prevent blocking + # Just set linger on the global instance ctx = zmq.Context.instance() ctx.linger = 0 + # NEVER call ctx.term() on the global instance except Exception: pass diff --git a/tests/conftest.py b/tests/conftest.py index 8f165be..f278669 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ """Global test configuration and cleanup fixtures.""" +import faulthandler import os import signal import time @@ -7,6 +8,19 @@ from collections.abc import Generator import pytest +# Enable faulthandler to dump stack traces +faulthandler.enable() + + +@pytest.fixture(scope="session", autouse=True) +def _ci_backtraces(): + """Dump stack traces before CI timeout to diagnose hanging.""" + if os.getenv("CI") == "true": + # Dump stack traces 10s before the 180s timeout + faulthandler.dump_traceback_later(170, repeat=True) + yield + faulthandler.cancel_dump_traceback_later() + @pytest.fixture(scope="session", autouse=True) def global_test_cleanup() -> Generator: @@ -30,33 +44,25 @@ def global_test_cleanup() -> Generator: except Exception: pass - # 2. Terminate ZMQ contexts more aggressively + # 2. Set ZMQ linger but DON'T term Context.instance() + # Terminating the global instance can block if other code still has sockets try: import zmq - # Get the global instance and destroy it + # Just set linger on the global instance, don't terminate it ctx = zmq.Context.instance() ctx.linger = 0 - - # Force termination - this is aggressive but needed for CI - try: - ctx.destroy(linger=0) - except Exception: - pass - - # Also try to terminate the default context - try: - zmq.Context.term(zmq.Context.instance()) - except Exception: - pass + # Do NOT call ctx.term() or ctx.destroy() on the global instance! + # That would block waiting for all sockets to close except Exception: pass - # Kill any leftover child processes + # Kill any leftover child processes (including grandchildren) try: import psutil current_process = psutil.Process() + # Get ALL descendants recursively children = current_process.children(recursive=True) if children: @@ -65,6 +71,7 @@ def global_test_cleanup() -> Generator: # First try to terminate gracefully for child in children: try: + print(f" Terminating {child.pid} ({child.name()})") child.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): pass @@ -79,6 +86,9 @@ def global_test_cleanup() -> Generator: child.kill() except (psutil.NoSuchProcess, psutil.AccessDenied): pass + + # Final wait to ensure cleanup + psutil.wait_procs(alive, timeout=1) except ImportError: # psutil not installed, try basic process cleanup try: @@ -128,6 +138,33 @@ def auto_cleanup_searcher(): time.sleep(0.1) +@pytest.fixture(scope="session", autouse=True) +def _reap_children(): + """Reap all child processes at session end as a safety net.""" + yield + + # Final aggressive cleanup + try: + import psutil + + me = psutil.Process() + kids = me.children(recursive=True) + for p in kids: + try: + p.terminate() + except Exception: + pass + + _, alive = psutil.wait_procs(kids, timeout=2) + for p in alive: + try: + p.kill() + except Exception: + pass + except Exception: + pass + + @pytest.fixture(autouse=True) def cleanup_after_each_test(): """Cleanup after each test to prevent resource leaks."""