diff --git a/packages/leann-core/src/leann/embedding_server_manager.py b/packages/leann-core/src/leann/embedding_server_manager.py index 4358b00..2432325 100644 --- a/packages/leann-core/src/leann/embedding_server_manager.py +++ b/packages/leann-core/src/leann/embedding_server_manager.py @@ -185,6 +185,7 @@ class EmbeddingServerManager: self.backend_module_name = backend_module_name self.server_process: Optional[subprocess.Popen] = None self.server_port: Optional[int] = None + self._server_pid: Optional[int] = None # Adopted server PID when not launched here self._atexit_registered = False # Also register a weakref finalizer to ensure cleanup when manager is GC'ed try: @@ -221,6 +222,8 @@ class EmbeddingServerManager: if is_compatible: logger.info(f"Found compatible server on port {actual_port}") + # Adopt the existing server so we can clean it up on exit + self._adopt_existing_server(actual_port, model_name, passages_file) return True, actual_port # Start a new server @@ -368,12 +371,36 @@ class EmbeddingServerManager: def stop_server(self): """Stops the embedding server process if it's running.""" - if not self.server_process: + if not self.server_process and not self._server_pid: return - if self.server_process.poll() is not None: + # If we have an adopted PID and no Popen handle, try PID-based stop + if not self.server_process and self._server_pid: + try: + proc = psutil.Process(self._server_pid) + logger.info(f"Terminating adopted server process (PID: {self._server_pid})...") + proc.terminate() + try: + proc.wait(timeout=5) + logger.info(f"Adopted server process {self._server_pid} terminated gracefully.") + except psutil.TimeoutExpired: + logger.warning( + f"Adopted server process {self._server_pid} did not terminate within 5 seconds, force killing..." 
def _adopt_existing_server(self, port: int, expected_model: str, passages_file: str) -> None:
    """Adopt an existing compatible server so that we can stop it on exit.

    Scans running processes for one that is listening on ``port`` and whose
    command line matches the expected configuration. On a match,
    ``self.server_port`` is set and ``_finalize_process`` is registered with
    ``atexit`` (once). The PID is stored in ``self._server_pid`` unless the
    matched process is the child already tracked by ``self.server_process``.

    This is best-effort: failures are logged as warnings and never raised.

    Args:
        port: Port on which the compatible server was found.
        expected_model: Model name forwarded to ``_check_cmdline_matches_config``.
        passages_file: Passages file forwarded to ``_check_cmdline_matches_config``.
    """
    try:
        adopted_pid: Optional[int] = None
        for proc in psutil.process_iter(["pid", "cmdline"]):
            try:
                if not _is_process_listening_on_port(proc, port):
                    continue
                cmdline = proc.info.get("cmdline") or []
                if _check_cmdline_matches_config(cmdline, port, expected_model, passages_file):
                    adopted_pid = proc.info["pid"]
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # A process that exited mid-scan or cannot be inspected must
                # only be skipped; it must not abort the whole search.
                continue

        if adopted_pid is None:
            logger.warning(f"Could not find PID to adopt for server on port {port}")
            return

        self.server_port = port

        launched = self.server_process
        if launched is not None and launched.pid == adopted_pid:
            # The match is the child we already hold a Popen handle for.
            # Recording its PID a second time would leave a stale
            # ``_server_pid`` behind after the handle-based stop, and a later
            # stop could then signal an unrelated process that reused the PID.
            logger.info(
                f"Server PID {adopted_pid} on port {port} is already managed; not adopting"
            )
        else:
            self._server_pid = adopted_pid
            logger.info(f"Adopted existing embedding server PID {adopted_pid} on port {port}")

        if not self._atexit_registered:
            atexit.register(self._finalize_process)
            self._atexit_registered = True
    except Exception as e:
        # Adoption is optional; the caller can still use the server without it.
        logger.warning(f"Failed to adopt existing server on port {port}: {e}")