From 10bfe9c98059f7e78274eaa3e41b36e55f67bfd2 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 13 Aug 2025 23:41:44 -0700 Subject: [PATCH] core: purge dead helpers and comments from EmbeddingServerManager; keep only minimal in-process flow --- .github/workflows/build-reusable.yml | 1 - .../src/leann/embedding_server_manager.py | 274 ++++-------------- 2 files changed, 59 insertions(+), 216 deletions(-) diff --git a/.github/workflows/build-reusable.yml b/.github/workflows/build-reusable.yml index 8bec3ed..b2323a3 100644 --- a/.github/workflows/build-reusable.yml +++ b/.github/workflows/build-reusable.yml @@ -278,7 +278,6 @@ jobs: - name: Run tests with pytest env: CI: true - LEANN_SKIP_COMPAT: 1 OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} HF_HUB_DISABLE_SYMLINKS: 1 TOKENIZERS_PARALLELISM: false diff --git a/packages/leann-core/src/leann/embedding_server_manager.py b/packages/leann-core/src/leann/embedding_server_manager.py index 6922d8c..9c7b8b0 100644 --- a/packages/leann-core/src/leann/embedding_server_manager.py +++ b/packages/leann-core/src/leann/embedding_server_manager.py @@ -8,7 +8,7 @@ import time from pathlib import Path from typing import Optional -import psutil +# Lightweight, self-contained server manager with no cross-process inspection # Set up logging based on environment variable LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper() @@ -43,130 +43,7 @@ def _check_port(port: int) -> bool: return s.connect_ex(("localhost", port)) == 0 -def _check_process_matches_config( - port: int, expected_model: str, expected_passages_file: str -) -> bool: - """ - Check if the process using the port matches our expected model and passages file. - Returns True if matches, False otherwise. - """ - try: - for proc in psutil.process_iter(["pid", "cmdline"]): - if not _is_process_listening_on_port(proc, port): - continue - - cmdline = proc.info["cmdline"] - if not cmdline: - continue - - return _check_cmdline_matches_config( - cmdline, port, expected_model, expected_passages_file - ) - - logger.debug(f"No process found listening on port {port}") - return False - - except Exception as e: - logger.warning(f"Could not check process on port {port}: {e}") - return False - - -def _is_process_listening_on_port(proc, port: int) -> bool: - """Check if a process is listening on the given port.""" - try: - connections = proc.net_connections() - for conn in connections: - if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN: - return True - return False - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - return False - - -def _check_cmdline_matches_config( - cmdline: list, port: int, expected_model: str, expected_passages_file: str -) -> bool: - """Check if command line matches our expected configuration.""" - cmdline_str = " ".join(cmdline) - logger.debug(f"Found process on port {port}: {cmdline_str}") - - # Check if it's our embedding server - is_embedding_server = any( - server_type in cmdline_str - for server_type in [ - "embedding_server", - "leann_backend_diskann.embedding_server", - "leann_backend_hnsw.hnsw_embedding_server", - ] - ) - - if not is_embedding_server: - logger.debug(f"Process on port {port} is not our embedding server") - return False - - # Check model name - model_matches = _check_model_in_cmdline(cmdline, expected_model) - - # Check passages file if provided - passages_matches = _check_passages_in_cmdline(cmdline, expected_passages_file) - - result = model_matches and passages_matches - logger.debug( - f"model_matches: {model_matches}, passages_matches: {passages_matches}, overall: {result}" - ) - return result - - -def _check_model_in_cmdline(cmdline: list, expected_model: str) -> bool: - """Check if the command line contains the expected model.""" - if "--model-name" not in cmdline: - return False - - model_idx = cmdline.index("--model-name") - if model_idx + 1 >= len(cmdline): - return False - - actual_model = cmdline[model_idx + 1] - return actual_model == expected_model - - -def _check_passages_in_cmdline(cmdline: list, expected_passages_file: str) -> bool: - """Check if the command line contains the expected passages file.""" - if "--passages-file" not in cmdline: - return False # Expected but not found - - passages_idx = cmdline.index("--passages-file") - if passages_idx + 1 >= len(cmdline): - return False - - actual_passages = cmdline[passages_idx + 1] - expected_path = Path(expected_passages_file).resolve() - actual_path = Path(actual_passages).resolve() - return actual_path == expected_path - - -def _find_compatible_port_or_next_available( - start_port: int, model_name: str, passages_file: str, max_attempts: int = 100 -) -> tuple[int, bool]: - """ - Find a port that either has a compatible server or is available. - Returns (port, is_compatible) where is_compatible indicates if we found a matching server. - """ - for port in range(start_port, start_port + max_attempts): - if not _check_port(port): - # Port is available - return port, False - - # Port is in use, check if it's compatible - if _check_process_matches_config(port, model_name, passages_file): - logger.info(f"Found compatible server on port {port}") - return port, True - else: - logger.info(f"Port {port} has incompatible server, trying next port...") - - raise RuntimeError( - f"Could not find compatible or available port in range {start_port}-{start_port + max_attempts}" - ) +# Note: All cross-process scanning helpers removed for simplicity class EmbeddingServerManager: @@ -185,7 +62,8 @@ class EmbeddingServerManager: self.backend_module_name = backend_module_name self.server_process: Optional[subprocess.Popen] = None self.server_port: Optional[int] = None - self._server_pid: Optional[int] = None # Adopted server PID when not launched here + # Track last-started config for in-process reuse only + self._server_config: Optional[dict] = None self._atexit_registered = False # Also register a weakref finalizer to ensure cleanup when manager is GC'ed try: @@ -205,39 +83,22 @@ class EmbeddingServerManager: """Start the embedding server.""" passages_file = kwargs.get("passages_file") - # Check if we have a compatible server already running - if self._has_compatible_running_server(model_name, passages_file): - logger.info("Found compatible running server!") - return True, port + # Check if we have a compatible server already running in-process + if self._has_compatible_running_server(model_name, passages_file, embedding_mode): + logger.info("Reusing in-process compatible server") + return True, self.server_port or port # For Colab environment, use a different strategy if _is_colab_environment(): logger.info("Detected Colab environment, using alternative startup strategy") return self._start_server_colab(port, model_name, embedding_mode, **kwargs) - # In CI or when explicitly requested, skip compatibility scanning to avoid slow/hanging proc scans - skip_compat = ( - os.environ.get("LEANN_SKIP_COMPAT", "").lower() in {"1", "true", "yes"} - or os.environ.get("CI") == "true" - ) - if skip_compat: - try: - actual_port = _get_available_port(port) - is_compatible = False - except RuntimeError: - logger.error("No available ports found") - return False, port - else: - # Find a compatible port or next available (may scan processes) - actual_port, is_compatible = _find_compatible_port_or_next_available( - port, model_name, passages_file - ) - - if is_compatible: - logger.info(f"Found compatible server on port {actual_port}") - # Adopt the existing server so we can clean it up on exit - self._adopt_existing_server(actual_port, model_name, passages_file) - return True, actual_port + # Always pick a fresh available port + try: + actual_port = _get_available_port(port) + except RuntimeError: + logger.error("No available ports found") + return False, port # Start a new server return self._start_new_server(actual_port, model_name, embedding_mode, **kwargs) @@ -270,17 +131,20 @@ class EmbeddingServerManager: logger.error(f"Failed to start embedding server in Colab: {e}") return False, actual_port - def _has_compatible_running_server(self, model_name: str, passages_file: str) -> bool: - """Check if we have a compatible running server.""" + def _has_compatible_running_server( + self, model_name: str, passages_file: str, embedding_mode: str + ) -> bool: + """Check if current in-process server matches desired config.""" if not (self.server_process and self.server_process.poll() is None and self.server_port): return False - - if _check_process_matches_config(self.server_port, model_name, passages_file): - logger.info(f"Existing server process (PID {self.server_process.pid}) is compatible") - return True - - logger.info("Existing server process is incompatible. Should start a new server.") - return False + if not self._server_config: + return False + cfg = self._server_config + return ( + cfg.get("model_name") == model_name + and cfg.get("passages_file") == passages_file + and cfg.get("embedding_mode", "sentence-transformers") == embedding_mode + ) def _start_new_server( self, port: int, model_name: str, embedding_mode: str, **kwargs @@ -348,6 +212,25 @@ class EmbeddingServerManager: stderr=stderr_target, ) self.server_port = port + # Record config for in-process reuse + try: + self._server_config = { + "model_name": command[command.index("--model-name") + 1] + if "--model-name" in command + else "", + "passages_file": command[command.index("--passages-file") + 1] + if "--passages-file" in command + else "", + "embedding_mode": command[command.index("--embedding-mode") + 1] + if "--embedding-mode" in command + else "sentence-transformers", + } + except Exception: + self._server_config = { + "model_name": "", + "passages_file": "", + "embedding_mode": "sentence-transformers", + } logger.info(f"Server process started with PID: {self.server_process.pid}") # Register atexit callback only when we actually start a process @@ -384,36 +267,14 @@ class EmbeddingServerManager: def stop_server(self): """Stops the embedding server process if it's running.""" - if not self.server_process and not self._server_pid: - return - - # If we have an adopted PID and no Popen handle, try PID-based stop - if not self.server_process and self._server_pid: - try: - proc = psutil.Process(self._server_pid) - logger.info(f"Terminating adopted server process (PID: {self._server_pid})...") - proc.terminate() - try: - proc.wait(timeout=5) - logger.info(f"Adopted server process {self._server_pid} terminated gracefully.") - except psutil.TimeoutExpired: - logger.warning( - f"Adopted server process {self._server_pid} did not terminate within 5 seconds, force killing..." - ) - proc.kill() - finally: - self._server_pid = None - self.server_port = None - except Exception as e: - logger.warning(f"Failed to terminate adopted server PID {self._server_pid}: {e}") - self._server_pid = None - self.server_port = None + if not self.server_process: return if self.server_process and self.server_process.poll() is not None: # Process already terminated self.server_process = None self.server_port = None + self._server_config = None return logger.info( @@ -453,6 +314,7 @@ class EmbeddingServerManager: finally: self.server_process = None self.server_port = None + self._server_config = None def _finalize_process(self) -> None: """Best-effort cleanup used by weakref.finalize/atexit.""" @@ -461,33 +323,9 @@ class EmbeddingServerManager: except Exception: pass - def _adopt_existing_server(self, port: int, expected_model: str, passages_file: str) -> None: - """Adopt an existing compatible server so that we can stop it on exit. - - This scans processes to find the PID listening on the given port that matches - our expected config, then records its PID and registers atexit cleanup. - """ - try: - adopted_pid: Optional[int] = None - for proc in psutil.process_iter(["pid", "cmdline"]): - if not _is_process_listening_on_port(proc, port): - continue - cmdline = proc.info.get("cmdline") or [] - if _check_cmdline_matches_config(cmdline, port, expected_model, passages_file): - adopted_pid = proc.info["pid"] - break - - if adopted_pid: - self._server_pid = adopted_pid - self.server_port = port - logger.info(f"Adopted existing embedding server PID {adopted_pid} on port {port}") - if not self._atexit_registered: - atexit.register(self._finalize_process) - self._atexit_registered = True - else: - logger.warning(f"Could not find PID to adopt for server on port {port}") - except Exception as e: - logger.warning(f"Failed to adopt existing server on port {port}: {e}") + def _adopt_existing_server(self, *args, **kwargs) -> None: + # Removed: cross-process adoption no longer supported + return def _launch_server_process_colab(self, command: list, port: int) -> None: """Launch the server process with Colab-specific settings.""" @@ -503,10 +341,16 @@ class EmbeddingServerManager: self.server_port = port logger.info(f"Colab server process started with PID: {self.server_process.pid}") - # Register atexit callback + # Register atexit callback (unified) if not self._atexit_registered: - atexit.register(lambda: self.stop_server() if self.server_process else None) + atexit.register(self._finalize_process) self._atexit_registered = True + # Record config for in-process reuse is best-effort in Colab mode + self._server_config = { + "model_name": "", + "passages_file": "", + "embedding_mode": "sentence-transformers", + } def _wait_for_server_ready_colab(self, port: int) -> tuple[bool, int]: """Wait for the server to be ready with Colab-specific timeout."""