core: adopt compatible running server (record PID) and ensure stop_server() can terminate adopted processes; clear server_port on stop

This commit is contained in:
Andy Lee
2025-08-13 22:04:51 -07:00
parent 17e0d7470f
commit 6af8101977

View File

@@ -185,6 +185,7 @@ class EmbeddingServerManager:
self.backend_module_name = backend_module_name
self.server_process: Optional[subprocess.Popen] = None
self.server_port: Optional[int] = None
self._server_pid: Optional[int] = None # Adopted server PID when not launched here
self._atexit_registered = False
# Also register a weakref finalizer to ensure cleanup when manager is GC'ed
try:
@@ -221,6 +222,8 @@ class EmbeddingServerManager:
if is_compatible:
logger.info(f"Found compatible server on port {actual_port}")
# Adopt the existing server so we can clean it up on exit
self._adopt_existing_server(actual_port, model_name, passages_file)
return True, actual_port
# Start a new server
@@ -368,12 +371,36 @@ class EmbeddingServerManager:
def stop_server(self):
"""Stops the embedding server process if it's running."""
if not self.server_process:
if not self.server_process and not self._server_pid:
return
if self.server_process.poll() is not None:
# If we have an adopted PID and no Popen handle, try PID-based stop
if not self.server_process and self._server_pid:
try:
proc = psutil.Process(self._server_pid)
logger.info(f"Terminating adopted server process (PID: {self._server_pid})...")
proc.terminate()
try:
proc.wait(timeout=5)
logger.info(f"Adopted server process {self._server_pid} terminated gracefully.")
except psutil.TimeoutExpired:
logger.warning(
f"Adopted server process {self._server_pid} did not terminate within 5 seconds, force killing..."
)
proc.kill()
finally:
self._server_pid = None
self.server_port = None
except Exception as e:
logger.warning(f"Failed to terminate adopted server PID {self._server_pid}: {e}")
self._server_pid = None
self.server_port = None
return
if self.server_process and self.server_process.poll() is not None:
# Process already terminated
self.server_process = None
self.server_port = None
return
logger.info(
@@ -412,6 +439,7 @@ class EmbeddingServerManager:
logger.warning(f"Error during process cleanup: {e}")
finally:
self.server_process = None
self.server_port = None
def _finalize_process(self) -> None:
"""Best-effort cleanup used by weakref.finalize/atexit."""
@@ -420,6 +448,34 @@ class EmbeddingServerManager:
except Exception:
pass
def _adopt_existing_server(self, port: int, expected_model: str, passages_file: str) -> None:
"""Adopt an existing compatible server so that we can stop it on exit.
This scans processes to find the PID listening on the given port that matches
our expected config, then records its PID and registers atexit cleanup.
"""
try:
adopted_pid: Optional[int] = None
for proc in psutil.process_iter(["pid", "cmdline"]):
if not _is_process_listening_on_port(proc, port):
continue
cmdline = proc.info.get("cmdline") or []
if _check_cmdline_matches_config(cmdline, port, expected_model, passages_file):
adopted_pid = proc.info["pid"]
break
if adopted_pid:
self._server_pid = adopted_pid
self.server_port = port
logger.info(f"Adopted existing embedding server PID {adopted_pid} on port {port}")
if not self._atexit_registered:
atexit.register(self._finalize_process)
self._atexit_registered = True
else:
logger.warning(f"Could not find PID to adopt for server on port {port}")
except Exception as e:
logger.warning(f"Failed to adopt existing server on port {port}: {e}")
def _launch_server_process_colab(self, command: list, port: int) -> None:
"""Launch the server process with Colab-specific settings."""
logger.info(f"Colab Command: {' '.join(command)}")