Fix update. Should launch embedding server first (#130)
* fix: set ntotal for storage as well * fix: launch embedding server before adding
This commit is contained in:
@@ -5,6 +5,7 @@ with the correct, original embedding logic from the user's reference code.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import re
|
||||
import subprocess
|
||||
@@ -20,6 +21,7 @@ from leann_backend_hnsw.convert_to_csr import prune_hnsw_embeddings_inplace
|
||||
from leann.interface import LeannBackendSearcherInterface
|
||||
|
||||
from .chat import get_llm
|
||||
from .embedding_server_manager import EmbeddingServerManager
|
||||
from .interface import LeannBackendFactoryInterface
|
||||
from .metadata_filter import MetadataFilterEngine
|
||||
from .registry import BACKEND_REGISTRY
|
||||
@@ -755,40 +757,88 @@ class LeannBuilder:
|
||||
f"Existing index dimension ({index.d}) does not match new embeddings ({embedding_dim})."
|
||||
)
|
||||
|
||||
passage_meta_mode = meta.get("embedding_mode", self.embedding_mode)
|
||||
passage_provider_options = meta.get("embedding_options", self.embedding_options)
|
||||
|
||||
base_id = index.ntotal
|
||||
for offset, chunk in enumerate(valid_chunks):
|
||||
new_id = str(base_id + offset)
|
||||
chunk.setdefault("metadata", {})["id"] = new_id
|
||||
chunk["id"] = new_id
|
||||
|
||||
if needs_recompute:
|
||||
# sequengtially add embeddings
|
||||
for i in range(embeddings.shape[0]):
|
||||
print(f"add {i} embeddings")
|
||||
index.add(1, faiss.swig_ptr(embeddings[i : i + 1]))
|
||||
else:
|
||||
index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
|
||||
# Append passages/offsets before we attempt index.add so the ZMQ server
|
||||
# can resolve newly assigned IDs during recompute. Keep rollback hooks
|
||||
# so we can restore files if the update fails mid-way.
|
||||
rollback_passages_size = passages_file.stat().st_size if passages_file.exists() else 0
|
||||
offset_map_backup = offset_map.copy()
|
||||
|
||||
# index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
|
||||
faiss.write_index(index, str(index_file))
|
||||
try:
|
||||
with open(passages_file, "a", encoding="utf-8") as f:
|
||||
for chunk in valid_chunks:
|
||||
offset = f.tell()
|
||||
json.dump(
|
||||
{
|
||||
"id": chunk["id"],
|
||||
"text": chunk["text"],
|
||||
"metadata": chunk.get("metadata", {}),
|
||||
},
|
||||
f,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
f.write("\n")
|
||||
offset_map[chunk["id"]] = offset
|
||||
|
||||
with open(passages_file, "a", encoding="utf-8") as f:
|
||||
for chunk in valid_chunks:
|
||||
offset = f.tell()
|
||||
json.dump(
|
||||
{
|
||||
"id": chunk["id"],
|
||||
"text": chunk["text"],
|
||||
"metadata": chunk.get("metadata", {}),
|
||||
},
|
||||
f,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
f.write("\n")
|
||||
offset_map[chunk["id"]] = offset
|
||||
with open(offset_file, "wb") as f:
|
||||
pickle.dump(offset_map, f)
|
||||
|
||||
with open(offset_file, "wb") as f:
|
||||
pickle.dump(offset_map, f)
|
||||
server_manager: Optional[EmbeddingServerManager] = None
|
||||
server_started = False
|
||||
requested_zmq_port = int(os.getenv("LEANN_UPDATE_ZMQ_PORT", "5557"))
|
||||
|
||||
try:
|
||||
if needs_recompute:
|
||||
server_manager = EmbeddingServerManager(
|
||||
backend_module_name="leann_backend_hnsw.hnsw_embedding_server"
|
||||
)
|
||||
server_started, actual_port = server_manager.start_server(
|
||||
port=requested_zmq_port,
|
||||
model_name=self.embedding_model,
|
||||
embedding_mode=passage_meta_mode,
|
||||
passages_file=str(meta_path),
|
||||
distance_metric=distance_metric,
|
||||
provider_options=passage_provider_options,
|
||||
)
|
||||
if not server_started:
|
||||
raise RuntimeError(
|
||||
"Failed to start HNSW embedding server for recompute update."
|
||||
)
|
||||
if actual_port != requested_zmq_port:
|
||||
server_manager.stop_server()
|
||||
raise RuntimeError(
|
||||
"Embedding server started on unexpected port "
|
||||
f"{actual_port}; expected {requested_zmq_port}. Make sure the desired ZMQ port is free."
|
||||
)
|
||||
|
||||
if needs_recompute:
|
||||
for i in range(embeddings.shape[0]):
|
||||
print(f"add {i} embeddings")
|
||||
index.add(1, faiss.swig_ptr(embeddings[i : i + 1]))
|
||||
else:
|
||||
index.add(embeddings.shape[0], faiss.swig_ptr(embeddings))
|
||||
faiss.write_index(index, str(index_file))
|
||||
finally:
|
||||
if server_started and server_manager is not None:
|
||||
server_manager.stop_server()
|
||||
|
||||
except Exception:
|
||||
# Roll back appended passages/offset map to keep files consistent.
|
||||
if passages_file.exists():
|
||||
with open(passages_file, "rb+") as f:
|
||||
f.truncate(rollback_passages_size)
|
||||
offset_map = offset_map_backup
|
||||
with open(offset_file, "wb") as f:
|
||||
pickle.dump(offset_map, f)
|
||||
raise
|
||||
|
||||
meta["total_passages"] = len(offset_map)
|
||||
with open(meta_path, "w", encoding="utf-8") as f:
|
||||
|
||||
Reference in New Issue
Block a user