refactor: embedding server manager

Andy Lee
2025-07-06 01:54:46 +00:00
parent 449983c937
commit a38bc0a3fc
5 changed files with 228 additions and 324 deletions
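
This commit removes the per-backend EmbeddingServerManager that used to live in diskann_backend.py (deleted below) and has the backend import a shared leann.embedding_server_manager module instead. The sketch below is a rough reconstruction of what that shared manager's interface presumably looks like, inferred only from the call sites in this diff (a backend_module_name constructor argument and a start_server(port, model_name, distance_metric) call); the actual module added by this commit is not shown here and may differ.

# Hypothetical sketch of leann/embedding_server_manager.py, inferred from the
# call sites in this diff; the real implementation may differ.
import atexit
import socket
import subprocess
import sys
import time


def _check_port(port: int) -> bool:
    # True if something is already listening on localhost:port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(("localhost", port)) == 0


class EmbeddingServerManager:
    def __init__(self, backend_module_name: str):
        # e.g. "leann_backend_diskann.embedding_server"
        self.backend_module_name = backend_module_name
        self.server_process = None
        atexit.register(self.stop_server)

    def start_server(self, port: int, model_name: str, distance_metric: str = "mips") -> bool:
        if self.server_process and self.server_process.poll() is None:
            return True  # reuse the server already started for this session
        if _check_port(port):
            return True  # assume an external server is already listening
        command = [
            sys.executable, "-m", self.backend_module_name,
            "--zmq-port", str(port),
            "--model-name", model_name,
            # how distance_metric is forwarded is an assumption; the real CLI may differ
            "--distance-metric", distance_metric,
        ]
        self.server_process = subprocess.Popen(command)
        for _ in range(60):  # wait up to ~30 s for the server to come up
            if _check_port(port):
                return True
            if self.server_process.poll() is not None:
                return False  # server died during startup
            time.sleep(0.5)
        self.stop_server()
        return False

    def stop_server(self):
        if self.server_process and self.server_process.poll() is None:
            self.server_process.terminate()
            try:
                self.server_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.server_process.kill()
            self.server_process = None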


@@ -0,0 +1 @@
from . import diskann_backend


@@ -12,6 +12,7 @@ import socket
import subprocess
import sys
from leann.embedding_server_manager import EmbeddingServerManager
from leann.registry import register_backend
from leann.interface import (
    LeannBackendFactoryInterface,
@@ -42,96 +43,6 @@ def _write_vectors_to_bin(data: np.ndarray, file_path: str):
        f.write(struct.pack('I', dim))
        f.write(data.tobytes())
def _check_port(port: int) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('localhost', port)) == 0

class EmbeddingServerManager:
    def __init__(self):
        self.server_process = None
        self.server_port = None
        atexit.register(self.stop_server)

    def start_server(self, port=5555, model_name="sentence-transformers/all-mpnet-base-v2"):
        if self.server_process and self.server_process.poll() is None:
            print(f"INFO: Reusing existing server process for this session (PID {self.server_process.pid})")
            return True
        # Check whether the port is already occupied by an unrelated process
        if _check_port(port):
            print(f"WARNING: Port {port} is already in use. Assuming an external server is running and connecting to it.")
            return True
        print(f"INFO: Starting session-level embedding server as a background process...")
        try:
            command = [
                sys.executable,
                "-m", "packages.leann-backend-diskann.leann_backend_diskann.embedding_server",
                "--zmq-port", str(port),
                "--model-name", model_name
            ]
            project_root = Path(__file__).parent.parent.parent.parent
            print(f"INFO: Running command from project root: {project_root}")
            self.server_process = subprocess.Popen(
                command,
                cwd=project_root,
                # stdout=subprocess.PIPE,
                # stderr=subprocess.PIPE,
                text=True,
                encoding='utf-8'
            )
            self.server_port = port
            print(f"INFO: Server process started with PID: {self.server_process.pid}")
            max_wait, wait_interval = 30, 0.5
            for _ in range(int(max_wait / wait_interval)):
                if _check_port(port):
                    print(f"✅ Embedding server is up and ready for this session.")
                    log_thread = threading.Thread(target=self._log_monitor, daemon=True)
                    log_thread.start()
                    return True
                if self.server_process.poll() is not None:
                    print("❌ ERROR: Server process terminated unexpectedly during startup.")
                    self._log_monitor()
                    return False
                time.sleep(wait_interval)
            print(f"❌ ERROR: Server process failed to start listening within {max_wait} seconds.")
            self.stop_server()
            return False
        except Exception as e:
            print(f"❌ ERROR: Failed to start embedding server process: {e}")
            return False

    def _log_monitor(self):
        if not self.server_process:
            return
        try:
            if self.server_process.stdout:
                for line in iter(self.server_process.stdout.readline, ''):
                    print(f"[EmbeddingServer LOG]: {line.strip()}")
                self.server_process.stdout.close()
            if self.server_process.stderr:
                for line in iter(self.server_process.stderr.readline, ''):
                    print(f"[EmbeddingServer ERROR]: {line.strip()}")
                self.server_process.stderr.close()
        except Exception as e:
            print(f"Log monitor error: {e}")

    def stop_server(self):
        if self.server_process and self.server_process.poll() is None:
            print(f"INFO: Terminating session server process (PID: {self.server_process.pid})...")
            self.server_process.terminate()
            try:
                self.server_process.wait(timeout=5)
                print("INFO: Server process terminated.")
            except subprocess.TimeoutExpired:
                print("WARNING: Server process did not terminate gracefully, killing it.")
                self.server_process.kill()
            self.server_process = None
@register_backend("diskann")
class DiskannBackend(LeannBackendFactoryInterface):
    @staticmethod
@@ -143,16 +54,13 @@ class DiskannBackend(LeannBackendFactoryInterface):
        path = Path(index_path)
        meta_path = path.parent / f"{path.name}.meta.json"
        if not meta_path.exists():
            raise FileNotFoundError(f"Leann metadata file not found at {meta_path}. Cannot infer vector dimension for searcher.")
            raise FileNotFoundError(f"Leann metadata file not found at {meta_path}.")
        with open(meta_path, 'r') as f:
            meta = json.load(f)
        dimensions = meta.get("dimensions")
        if not dimensions:
            raise ValueError("Dimensions not found in Leann metadata. Please rebuild the index with a newer version of Leann.")
        kwargs['dimensions'] = dimensions
        # Pass essential metadata to the searcher
        kwargs['meta'] = meta
        return DiskannSearcher(index_path, **kwargs)
class DiskannBuilder(LeannBackendBuilderInterface):
@@ -215,19 +123,29 @@ class DiskannBuilder(LeannBackendBuilderInterface):
class DiskannSearcher(LeannBackendSearcherInterface):
    def __init__(self, index_path: str, **kwargs):
        self.meta = kwargs.get("meta", {})
        if not self.meta:
            raise ValueError("DiskannSearcher requires metadata from .meta.json.")
        dimensions = self.meta.get("dimensions")
        if not dimensions:
            raise ValueError("Dimensions not found in Leann metadata.")
        self.distance_metric = self.meta.get("distance_metric", "mips").lower()
        metric_enum = METRIC_MAP.get(self.distance_metric)
        if metric_enum is None:
            raise ValueError(f"Unsupported distance_metric '{self.distance_metric}'.")
        self.embedding_model = self.meta.get("embedding_model")
        if not self.embedding_model:
            print("WARNING: embedding_model not found in meta.json. Recompute will fail if attempted.")
        path = Path(index_path)
        index_dir = path.parent
        index_prefix = path.stem
        metric_str = kwargs.get("distance_metric", "mips").lower()
        metric_enum = METRIC_MAP.get(metric_str)
        if metric_enum is None:
            raise ValueError(f"Unsupported distance_metric '{metric_str}'.")
        num_threads = kwargs.get("num_threads", 8)
        num_nodes_to_cache = kwargs.get("num_nodes_to_cache", 0)
        dimensions = kwargs.get("dimensions")
        if not dimensions:
            raise ValueError("Vector dimension not provided to DiskannSearcher.")
        try:
            full_index_prefix = str(index_dir / index_prefix)
@@ -235,7 +153,9 @@ class DiskannSearcher(LeannBackendSearcherInterface):
                metric_enum, full_index_prefix, num_threads, num_nodes_to_cache, 1, "", ""
            )
            self.num_threads = num_threads
            self.embedding_server_manager = EmbeddingServerManager()
            self.embedding_server_manager = EmbeddingServerManager(
                backend_module_name="leann_backend_diskann.embedding_server"
            )
            print("✅ DiskANN index loaded successfully.")
        except Exception as e:
            print(f"💥 ERROR: Failed to load DiskANN index. Exception: {e}")
@@ -255,12 +175,20 @@ class DiskannSearcher(LeannBackendSearcherInterface):
        if recompute_beighbor_embeddings:
            print(f"INFO: DiskANN ZMQ mode enabled - ensuring embedding server is running")
            zmq_port = kwargs.get("zmq_port", 6666)
            embedding_model = kwargs.get("embedding_model", "sentence-transformers/all-mpnet-base-v2")
            if not self.embedding_model:
                raise ValueError("Cannot use recompute_beighbor_embeddings without 'embedding_model' in meta.json.")
            if not self.embedding_server_manager.start_server(zmq_port, embedding_model):
            zmq_port = kwargs.get("zmq_port", 6666)
            server_started = self.embedding_server_manager.start_server(
                port=zmq_port,
                model_name=self.embedding_model,
                distance_metric=self.distance_metric
            )
            if not server_started:
                print(f"WARNING: Failed to start embedding server, falling back to PQ computation")
                kwargs['recompute_beighbor_embeddings'] = False
                recompute_beighbor_embeddings = False
        if query.dtype != np.float32:
            query = query.astype(np.float32)
@@ -292,4 +220,4 @@ class DiskannSearcher(LeannBackendSearcherInterface):
    def __del__(self):
        if hasattr(self, 'embedding_server_manager'):
            self.embedding_server_manager.stop_server()
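
After this change the searcher takes its configuration from the index's .meta.json rather than from keyword arguments. Below is a hypothetical example of the fields the refactored DiskannSearcher reads; the field names come from the meta.get() calls above, the values are illustrative, and a real metadata file written by the builder will contain more.

# Hypothetical meta.json contents, for illustration only.
example_meta = {
    "dimensions": 768,          # required; __init__ raises ValueError if missing
    "distance_metric": "mips",  # must map to an entry in METRIC_MAP
    "embedding_model": "sentence-transformers/all-mpnet-base-v2",  # required for recompute mode
}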