fix: clean and simple hang prevention solution
This commit provides a minimal, focused fix for CI hanging issues by addressing the root causes:

**Key Changes:**

1. **ZMQ Resource Management:**
   - Remove `context.term()` calls that were causing hangs
   - Add `socket.setsockopt(zmq.LINGER, 0)` to prevent blocking on close
   - Keep socket operations simple with default timeouts (no artificial limits)

2. **Process Cleanup:**
   - Add timeout (1s) to final `process.wait()` in embedding server manager
   - Prevent infinite waiting that was causing CI hangs

3. **Resource Cleanup Methods:**
   - Add simple `cleanup()` methods to searchers and API classes
   - Focus on C++ object destruction for DiskANN backend
   - Avoid complex cleanup logic that could introduce new issues

4. **Basic Test Safety:**
   - Simple pytest-timeout configuration (300s)
   - Basic test session cleanup using psutil
   - Minimal conftest.py without complex logic

**Philosophy:**

This solution avoids the complex multi-layered fixes from the previous PR chain. Instead, it targets the specific root causes:

- ZMQ context termination blocking
- Process `wait()` without timeout
- C++ resource leaks in backends

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -459,3 +459,24 @@ class DiskannSearcher(BaseSearcher):
|
||||
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
|
||||
|
||||
return {"labels": string_labels, "distances": distances}
|
||||
|
||||
def cleanup(self):
    """Release DiskANN-specific resources, including the C++ index.

    The parent class ``cleanup()`` runs first. Afterwards this searcher's
    reference to ``_index`` is dropped and a garbage-collection pass is
    requested so the underlying object can be destroyed once nothing else
    refers to it.

    The index release lives in a ``finally`` clause: it still happens when
    the parent cleanup raises, and that exception then propagates to the
    caller unchanged.
    """
    try:
        # Parent cleanup first, same ordering as before.
        super().cleanup()
    finally:
        # Best effort only: an exception raised here would mask one coming
        # from the parent cleanup, so failures are deliberately swallowed.
        try:
            if getattr(self, "_index", None) is not None:
                # Rebinding drops this object's reference to the index.
                self._index = None
                self._current_zmq_port = None
        except Exception:
            pass

        # Request a collection pass so unreachable objects are finalized
        # now. The import is kept inside the guard so that a failing import
        # cannot abort cleanup.
        try:
            import gc

            gc.collect()
        except Exception:
            pass
|
||||
|
||||
@@ -87,21 +87,23 @@ def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int)
|
||||
# Connect to embedding server
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
|
||||
socket.connect(f"tcp://localhost:{port}")
|
||||
|
||||
# Send chunks to server for embedding computation
|
||||
request = chunks
|
||||
socket.send(msgpack.packb(request))
|
||||
try:
|
||||
# Send chunks to server for embedding computation
|
||||
request = chunks
|
||||
socket.send(msgpack.packb(request))
|
||||
|
||||
# Receive embeddings from server
|
||||
response = socket.recv()
|
||||
embeddings_list = msgpack.unpackb(response)
|
||||
# Receive embeddings from server
|
||||
response = socket.recv()
|
||||
embeddings_list = msgpack.unpackb(response)
|
||||
|
||||
# Convert back to numpy array
|
||||
embeddings = np.array(embeddings_list, dtype=np.float32)
|
||||
|
||||
socket.close()
|
||||
context.term()
|
||||
# Convert back to numpy array
|
||||
embeddings = np.array(embeddings_list, dtype=np.float32)
|
||||
finally:
|
||||
socket.close()
|
||||
# Don't call context.term() - this was causing hangs
|
||||
|
||||
return embeddings
|
||||
|
||||
@@ -606,6 +608,11 @@ class LeannSearcher:
|
||||
logger.info(f" {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
|
||||
return enriched_results
|
||||
|
||||
def cleanup(self):
    """Release resources held by the search backend.

    Delegates to ``backend_impl.cleanup()`` when the backend exposes a
    callable ``cleanup``. It is a no-op when ``backend_impl`` was never
    assigned, is ``None``, or has no ``cleanup`` method, so it is safe to
    call on a partially constructed instance.
    """
    backend = getattr(self, "backend_impl", None)
    backend_cleanup = getattr(backend, "cleanup", None)
    if callable(backend_cleanup):
        backend_cleanup()
|
||||
|
||||
|
||||
class LeannChat:
|
||||
def __init__(
|
||||
|
||||
@@ -373,7 +373,12 @@ class EmbeddingServerManager:
|
||||
|
||||
# Clean up process resources to prevent resource tracker warnings
|
||||
try:
|
||||
self.server_process.wait() # Ensure process is fully cleaned up
|
||||
self.server_process.wait(timeout=1) # Give it one final chance with timeout
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(
|
||||
f"Process {self.server_process.pid} still hanging after all kill attempts"
|
||||
)
|
||||
# Don't wait indefinitely - just abandon it
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -132,10 +132,12 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
import msgpack
|
||||
import zmq
|
||||
|
||||
context = None
|
||||
socket = None
|
||||
try:
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.setsockopt(zmq.RCVTIMEO, 30000) # 30 second timeout
|
||||
socket.setsockopt(zmq.LINGER, 0) # Don't block on close
|
||||
socket.connect(f"tcp://localhost:{zmq_port}")
|
||||
|
||||
# Send embedding request
|
||||
@@ -147,9 +149,6 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
response_bytes = socket.recv()
|
||||
response = msgpack.unpackb(response_bytes)
|
||||
|
||||
socket.close()
|
||||
context.term()
|
||||
|
||||
# Convert response to numpy array
|
||||
if isinstance(response, list) and len(response) > 0:
|
||||
return np.array(response, dtype=np.float32)
|
||||
@@ -158,6 +157,10 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to compute embeddings via server: {e}")
|
||||
finally:
|
||||
if socket:
|
||||
socket.close()
|
||||
# Don't call context.term() - this was causing hangs
|
||||
|
||||
@abstractmethod
|
||||
def search(
|
||||
@@ -191,7 +194,15 @@ class BaseSearcher(LeannBackendSearcherInterface, ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def __del__(self):
|
||||
"""Ensures the embedding server is stopped when the searcher is destroyed."""
|
||||
def cleanup(self):
    """Stop the embedding server owned by this searcher, if there is one.

    Calls ``stop_server()`` on ``embedding_server_manager``. When that
    attribute is missing or ``None`` the call is a no-op, so the method can
    be invoked on a partially constructed instance.
    """
    manager = getattr(self, "embedding_server_manager", None)
    if manager is not None:
        manager.stop_server()
|
||||
|
||||
def __del__(self):
    """Finalizer: attempt ``cleanup()`` when the searcher is destroyed.

    Any ``Exception`` raised by ``cleanup()`` is suppressed so that
    destruction never surfaces an error.
    """
    try:
        self.cleanup()
    except Exception:
        # Best effort only; errors during destruction are ignored.
        return
|
||||
|
||||
Reference in New Issue
Block a user