core: purge dead helpers and comments from EmbeddingServerManager; keep only minimal in-process flow

This commit is contained in:
Andy Lee
2025-08-13 23:41:44 -07:00
parent a4346ef701
commit 10bfe9c980
2 changed files with 59 additions and 216 deletions

View File

@@ -278,7 +278,6 @@ jobs:
- name: Run tests with pytest
env:
CI: true
LEANN_SKIP_COMPAT: 1
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
HF_HUB_DISABLE_SYMLINKS: 1
TOKENIZERS_PARALLELISM: false

View File

@@ -8,7 +8,7 @@ import time
from pathlib import Path
from typing import Optional
import psutil
# Lightweight, self-contained server manager with no cross-process inspection
# Set up logging based on environment variable
LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
@@ -43,130 +43,7 @@ def _check_port(port: int) -> bool:
return s.connect_ex(("localhost", port)) == 0
def _check_process_matches_config(
port: int, expected_model: str, expected_passages_file: str
) -> bool:
"""
Check if the process using the port matches our expected model and passages file.
Returns True if matches, False otherwise.
"""
try:
for proc in psutil.process_iter(["pid", "cmdline"]):
if not _is_process_listening_on_port(proc, port):
continue
cmdline = proc.info["cmdline"]
if not cmdline:
continue
return _check_cmdline_matches_config(
cmdline, port, expected_model, expected_passages_file
)
logger.debug(f"No process found listening on port {port}")
return False
except Exception as e:
logger.warning(f"Could not check process on port {port}: {e}")
return False
def _is_process_listening_on_port(proc, port: int) -> bool:
"""Check if a process is listening on the given port."""
try:
connections = proc.net_connections()
for conn in connections:
if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN:
return True
return False
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return False
def _check_cmdline_matches_config(
cmdline: list, port: int, expected_model: str, expected_passages_file: str
) -> bool:
"""Check if command line matches our expected configuration."""
cmdline_str = " ".join(cmdline)
logger.debug(f"Found process on port {port}: {cmdline_str}")
# Check if it's our embedding server
is_embedding_server = any(
server_type in cmdline_str
for server_type in [
"embedding_server",
"leann_backend_diskann.embedding_server",
"leann_backend_hnsw.hnsw_embedding_server",
]
)
if not is_embedding_server:
logger.debug(f"Process on port {port} is not our embedding server")
return False
# Check model name
model_matches = _check_model_in_cmdline(cmdline, expected_model)
# Check passages file if provided
passages_matches = _check_passages_in_cmdline(cmdline, expected_passages_file)
result = model_matches and passages_matches
logger.debug(
f"model_matches: {model_matches}, passages_matches: {passages_matches}, overall: {result}"
)
return result
def _check_model_in_cmdline(cmdline: list, expected_model: str) -> bool:
"""Check if the command line contains the expected model."""
if "--model-name" not in cmdline:
return False
model_idx = cmdline.index("--model-name")
if model_idx + 1 >= len(cmdline):
return False
actual_model = cmdline[model_idx + 1]
return actual_model == expected_model
def _check_passages_in_cmdline(cmdline: list, expected_passages_file: str) -> bool:
"""Check if the command line contains the expected passages file."""
if "--passages-file" not in cmdline:
return False # Expected but not found
passages_idx = cmdline.index("--passages-file")
if passages_idx + 1 >= len(cmdline):
return False
actual_passages = cmdline[passages_idx + 1]
expected_path = Path(expected_passages_file).resolve()
actual_path = Path(actual_passages).resolve()
return actual_path == expected_path
def _find_compatible_port_or_next_available(
start_port: int, model_name: str, passages_file: str, max_attempts: int = 100
) -> tuple[int, bool]:
"""
Find a port that either has a compatible server or is available.
Returns (port, is_compatible) where is_compatible indicates if we found a matching server.
"""
for port in range(start_port, start_port + max_attempts):
if not _check_port(port):
# Port is available
return port, False
# Port is in use, check if it's compatible
if _check_process_matches_config(port, model_name, passages_file):
logger.info(f"Found compatible server on port {port}")
return port, True
else:
logger.info(f"Port {port} has incompatible server, trying next port...")
raise RuntimeError(
f"Could not find compatible or available port in range {start_port}-{start_port + max_attempts}"
)
# Note: All cross-process scanning helpers removed for simplicity
class EmbeddingServerManager:
@@ -185,7 +62,8 @@ class EmbeddingServerManager:
self.backend_module_name = backend_module_name
self.server_process: Optional[subprocess.Popen] = None
self.server_port: Optional[int] = None
self._server_pid: Optional[int] = None # Adopted server PID when not launched here
# Track last-started config for in-process reuse only
self._server_config: Optional[dict] = None
self._atexit_registered = False
# Also register a weakref finalizer to ensure cleanup when manager is GC'ed
try:
@@ -205,39 +83,22 @@ class EmbeddingServerManager:
"""Start the embedding server."""
passages_file = kwargs.get("passages_file")
# Check if we have a compatible server already running
if self._has_compatible_running_server(model_name, passages_file):
logger.info("Found compatible running server!")
return True, port
# Check if we have a compatible server already running in-process
if self._has_compatible_running_server(model_name, passages_file, embedding_mode):
logger.info("Reusing in-process compatible server")
return True, self.server_port or port
# For Colab environment, use a different strategy
if _is_colab_environment():
logger.info("Detected Colab environment, using alternative startup strategy")
return self._start_server_colab(port, model_name, embedding_mode, **kwargs)
# In CI or when explicitly requested, skip compatibility scanning to avoid slow/hanging proc scans
skip_compat = (
os.environ.get("LEANN_SKIP_COMPAT", "").lower() in {"1", "true", "yes"}
or os.environ.get("CI") == "true"
)
if skip_compat:
try:
actual_port = _get_available_port(port)
is_compatible = False
except RuntimeError:
logger.error("No available ports found")
return False, port
else:
# Find a compatible port or next available (may scan processes)
actual_port, is_compatible = _find_compatible_port_or_next_available(
port, model_name, passages_file
)
if is_compatible:
logger.info(f"Found compatible server on port {actual_port}")
# Adopt the existing server so we can clean it up on exit
self._adopt_existing_server(actual_port, model_name, passages_file)
return True, actual_port
# Always pick a fresh available port
try:
actual_port = _get_available_port(port)
except RuntimeError:
logger.error("No available ports found")
return False, port
# Start a new server
return self._start_new_server(actual_port, model_name, embedding_mode, **kwargs)
@@ -270,17 +131,20 @@ class EmbeddingServerManager:
logger.error(f"Failed to start embedding server in Colab: {e}")
return False, actual_port
def _has_compatible_running_server(self, model_name: str, passages_file: str) -> bool:
"""Check if we have a compatible running server."""
def _has_compatible_running_server(
self, model_name: str, passages_file: str, embedding_mode: str
) -> bool:
"""Check if current in-process server matches desired config."""
if not (self.server_process and self.server_process.poll() is None and self.server_port):
return False
if _check_process_matches_config(self.server_port, model_name, passages_file):
logger.info(f"Existing server process (PID {self.server_process.pid}) is compatible")
return True
logger.info("Existing server process is incompatible. Should start a new server.")
return False
if not self._server_config:
return False
cfg = self._server_config
return (
cfg.get("model_name") == model_name
and cfg.get("passages_file") == passages_file
and cfg.get("embedding_mode", "sentence-transformers") == embedding_mode
)
def _start_new_server(
self, port: int, model_name: str, embedding_mode: str, **kwargs
@@ -348,6 +212,25 @@ class EmbeddingServerManager:
stderr=stderr_target,
)
self.server_port = port
# Record config for in-process reuse
try:
self._server_config = {
"model_name": command[command.index("--model-name") + 1]
if "--model-name" in command
else "",
"passages_file": command[command.index("--passages-file") + 1]
if "--passages-file" in command
else "",
"embedding_mode": command[command.index("--embedding-mode") + 1]
if "--embedding-mode" in command
else "sentence-transformers",
}
except Exception:
self._server_config = {
"model_name": "",
"passages_file": "",
"embedding_mode": "sentence-transformers",
}
logger.info(f"Server process started with PID: {self.server_process.pid}")
# Register atexit callback only when we actually start a process
@@ -384,36 +267,14 @@ class EmbeddingServerManager:
def stop_server(self):
"""Stops the embedding server process if it's running."""
if not self.server_process and not self._server_pid:
return
# If we have an adopted PID and no Popen handle, try PID-based stop
if not self.server_process and self._server_pid:
try:
proc = psutil.Process(self._server_pid)
logger.info(f"Terminating adopted server process (PID: {self._server_pid})...")
proc.terminate()
try:
proc.wait(timeout=5)
logger.info(f"Adopted server process {self._server_pid} terminated gracefully.")
except psutil.TimeoutExpired:
logger.warning(
f"Adopted server process {self._server_pid} did not terminate within 5 seconds, force killing..."
)
proc.kill()
finally:
self._server_pid = None
self.server_port = None
except Exception as e:
logger.warning(f"Failed to terminate adopted server PID {self._server_pid}: {e}")
self._server_pid = None
self.server_port = None
if not self.server_process:
return
if self.server_process and self.server_process.poll() is not None:
# Process already terminated
self.server_process = None
self.server_port = None
self._server_config = None
return
logger.info(
@@ -453,6 +314,7 @@ class EmbeddingServerManager:
finally:
self.server_process = None
self.server_port = None
self._server_config = None
def _finalize_process(self) -> None:
"""Best-effort cleanup used by weakref.finalize/atexit."""
@@ -461,33 +323,9 @@ class EmbeddingServerManager:
except Exception:
pass
def _adopt_existing_server(self, port: int, expected_model: str, passages_file: str) -> None:
"""Adopt an existing compatible server so that we can stop it on exit.
This scans processes to find the PID listening on the given port that matches
our expected config, then records its PID and registers atexit cleanup.
"""
try:
adopted_pid: Optional[int] = None
for proc in psutil.process_iter(["pid", "cmdline"]):
if not _is_process_listening_on_port(proc, port):
continue
cmdline = proc.info.get("cmdline") or []
if _check_cmdline_matches_config(cmdline, port, expected_model, passages_file):
adopted_pid = proc.info["pid"]
break
if adopted_pid:
self._server_pid = adopted_pid
self.server_port = port
logger.info(f"Adopted existing embedding server PID {adopted_pid} on port {port}")
if not self._atexit_registered:
atexit.register(self._finalize_process)
self._atexit_registered = True
else:
logger.warning(f"Could not find PID to adopt for server on port {port}")
except Exception as e:
logger.warning(f"Failed to adopt existing server on port {port}: {e}")
def _adopt_existing_server(self, *args, **kwargs) -> None:
# Removed: cross-process adoption no longer supported
return
def _launch_server_process_colab(self, command: list, port: int) -> None:
"""Launch the server process with Colab-specific settings."""
@@ -503,10 +341,16 @@ class EmbeddingServerManager:
self.server_port = port
logger.info(f"Colab server process started with PID: {self.server_process.pid}")
# Register atexit callback
# Register atexit callback (unified)
if not self._atexit_registered:
atexit.register(lambda: self.stop_server() if self.server_process else None)
atexit.register(self._finalize_process)
self._atexit_registered = True
# Record config for in-process reuse is best-effort in Colab mode
self._server_config = {
"model_name": "",
"passages_file": "",
"embedding_mode": "sentence-transformers",
}
def _wait_for_server_ready_colab(self, port: int) -> tuple[bool, int]:
"""Wait for the server to be ready with Colab-specific timeout."""