fix: restart embedding server when passages change
This commit is contained in:
137
tests/test_embedding_server_manager.py
Normal file
137
tests/test_embedding_server_manager.py
Normal file
@@ -0,0 +1,137 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from leann.embedding_server_manager import EmbeddingServerManager
|
||||
|
||||
|
||||
class DummyProcess:
|
||||
def __init__(self):
|
||||
self.pid = 12345
|
||||
self._terminated = False
|
||||
|
||||
def poll(self):
|
||||
return 0 if self._terminated else None
|
||||
|
||||
def terminate(self):
|
||||
self._terminated = True
|
||||
|
||||
def kill(self):
|
||||
self._terminated = True
|
||||
|
||||
def wait(self, timeout=None):
|
||||
self._terminated = True
|
||||
return 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def embedding_manager(monkeypatch):
|
||||
manager = EmbeddingServerManager("leann_backend_hnsw.hnsw_embedding_server")
|
||||
|
||||
def fake_get_available_port(start_port):
|
||||
return start_port
|
||||
|
||||
monkeypatch.setattr(
|
||||
"leann.embedding_server_manager._get_available_port",
|
||||
fake_get_available_port,
|
||||
)
|
||||
|
||||
start_calls = []
|
||||
|
||||
def fake_start_new_server(self, port, model_name, embedding_mode, **kwargs):
|
||||
config_signature = kwargs.get("config_signature")
|
||||
start_calls.append(config_signature)
|
||||
self.server_process = DummyProcess()
|
||||
self.server_port = port
|
||||
self._server_config = config_signature
|
||||
return True, port
|
||||
|
||||
monkeypatch.setattr(
|
||||
EmbeddingServerManager,
|
||||
"_start_new_server",
|
||||
fake_start_new_server,
|
||||
)
|
||||
|
||||
# Ensure stop_server doesn't try to operate on real subprocesses
|
||||
def fake_stop_server(self):
|
||||
self.server_process = None
|
||||
self.server_port = None
|
||||
self._server_config = None
|
||||
|
||||
monkeypatch.setattr(EmbeddingServerManager, "stop_server", fake_stop_server)
|
||||
|
||||
return manager, start_calls
|
||||
|
||||
|
||||
def _write_meta(meta_path, passages_name, index_name, total):
|
||||
meta_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"backend_name": "hnsw",
|
||||
"embedding_model": "test-model",
|
||||
"embedding_mode": "sentence-transformers",
|
||||
"dimensions": 3,
|
||||
"backend_kwargs": {},
|
||||
"passage_sources": [
|
||||
{
|
||||
"type": "jsonl",
|
||||
"path": passages_name,
|
||||
"index_path": index_name,
|
||||
}
|
||||
],
|
||||
"total_passages": total,
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
||||
def test_server_restarts_when_metadata_changes(tmp_path, embedding_manager):
|
||||
manager, start_calls = embedding_manager
|
||||
|
||||
meta_path = tmp_path / "example.meta.json"
|
||||
passages_path = tmp_path / "example.passages.jsonl"
|
||||
index_path = tmp_path / "example.passages.idx"
|
||||
|
||||
passages_path.write_text("first\n", encoding="utf-8")
|
||||
index_path.write_bytes(b"index")
|
||||
_write_meta(meta_path, passages_path.name, index_path.name, total=1)
|
||||
|
||||
# Initial start populates signature
|
||||
ok, port = manager.start_server(
|
||||
port=6000,
|
||||
model_name="test-model",
|
||||
passages_file=str(meta_path),
|
||||
)
|
||||
assert ok
|
||||
assert port == 6000
|
||||
assert len(start_calls) == 1
|
||||
|
||||
initial_signature = start_calls[0]["passages_signature"]
|
||||
|
||||
# No metadata change => reuse existing server
|
||||
ok, port_again = manager.start_server(
|
||||
port=6000,
|
||||
model_name="test-model",
|
||||
passages_file=str(meta_path),
|
||||
)
|
||||
assert ok
|
||||
assert port_again == 6000
|
||||
assert len(start_calls) == 1
|
||||
|
||||
# Modify passage data and metadata to force signature change
|
||||
time.sleep(0.01) # Ensure filesystem timestamps move forward
|
||||
passages_path.write_text("second\n", encoding="utf-8")
|
||||
_write_meta(meta_path, passages_path.name, index_path.name, total=2)
|
||||
|
||||
ok, port_third = manager.start_server(
|
||||
port=6000,
|
||||
model_name="test-model",
|
||||
passages_file=str(meta_path),
|
||||
)
|
||||
assert ok
|
||||
assert port_third == 6000
|
||||
assert len(start_calls) == 2
|
||||
|
||||
updated_signature = start_calls[1]["passages_signature"]
|
||||
assert updated_signature != initial_signature
|
||||
Reference in New Issue
Block a user