diff --git a/examples/main_cli_example.py b/examples/main_cli_example.py index d52d7ce..9b093d2 100644 --- a/examples/main_cli_example.py +++ b/examples/main_cli_example.py @@ -1,40 +1,40 @@ import argparse -from llama_index.core import SimpleDirectoryReader, Settings +from llama_index.core import SimpleDirectoryReader from llama_index.core.node_parser import SentenceSplitter import asyncio import dotenv -from leann.api import LeannBuilder, LeannSearcher, LeannChat -import shutil +from leann.api import LeannBuilder, LeannChat from pathlib import Path dotenv.load_dotenv() -node_parser = SentenceSplitter( - chunk_size=256, chunk_overlap=128, separator=" ", paragraph_separator="\n\n" -) -print("Loading documents...") -documents = SimpleDirectoryReader( - "examples/data", - recursive=True, - encoding="utf-8", - required_exts=[".pdf", ".txt", ".md"], -).load_data(show_progress=True) -print("Documents loaded.") -all_texts = [] -for doc in documents: - nodes = node_parser.get_nodes_from_documents([doc]) - for node in nodes: - all_texts.append(node.get_content()) - async def main(args): INDEX_DIR = Path(args.index_dir) INDEX_PATH = str(INDEX_DIR / "pdf_documents.leann") if not INDEX_DIR.exists(): - print(f"--- Index directory not found, building new index ---") + node_parser = SentenceSplitter( + chunk_size=256, chunk_overlap=128, separator=" ", paragraph_separator="\n\n" + ) - print(f"\n[PHASE 1] Building Leann index...") + print("Loading documents...") + documents = SimpleDirectoryReader( + "examples/data", + recursive=True, + encoding="utf-8", + required_exts=[".pdf", ".txt", ".md"], + ).load_data(show_progress=True) + print("Documents loaded.") + all_texts = [] + for doc in documents: + nodes = node_parser.get_nodes_from_documents([doc]) + for node in nodes: + all_texts.append(node.get_content()) + + print("--- Index directory not found, building new index ---") + + print("\n[PHASE 1] Building Leann index...") # Use HNSW backend for better macOS compatibility builder = LeannBuilder( diff --git a/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py b/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py index 18bcd09..6cbf7c2 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py @@ -94,7 +94,9 @@ def create_diskann_embedding_server( def zmq_server_thread(): """ZMQ server thread using REP socket for universal compatibility""" context = zmq.Context() - socket = context.socket(zmq.REP) # REP socket for both BaseSearcher and DiskANN C++ REQ clients + socket = context.socket( + zmq.REP + ) # REP socket for both BaseSearcher and DiskANN C++ REQ clients socket.bind(f"tcp://*:{zmq_port}") logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}") @@ -128,7 +130,9 @@ def create_diskann_embedding_server( node_ids = list(req_proto.node_ids) if not node_ids: - raise RuntimeError(f"PROTOBUF: Received empty node_ids! Message size: {len(message)}") + raise RuntimeError( + f"PROTOBUF: Received empty node_ids! Message size: {len(message)}" + ) logger.info( f"✅ PROTOBUF: Node ID request for {len(node_ids)} node embeddings: {node_ids[:10]}" @@ -175,9 +179,7 @@ def create_diskann_embedding_server( raise # Debug logging - logger.debug( - f"Processing {len(texts)} texts" - ) + logger.debug(f"Processing {len(texts)} texts") logger.debug( f"Text lengths: {[len(t) for t in texts[:5]]}" ) # Show first 5 @@ -220,6 +222,7 @@ def create_diskann_embedding_server( except Exception as e: logger.error(f"Error in ZMQ server loop: {e}") import traceback + traceback.print_exc() raise @@ -237,6 +240,17 @@ def create_diskann_embedding_server( if __name__ == "__main__": + import signal + import sys + + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}, shutting down gracefully...") + sys.exit(0) + + # Register signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + parser = argparse.ArgumentParser(description="DiskANN Embedding service") parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on") parser.add_argument( diff --git a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py index 07e74c4..87b89ed 100644 --- a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py +++ b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py @@ -268,6 +268,17 @@ def create_hnsw_embedding_server( if __name__ == "__main__": + import signal + import sys + + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}, shutting down gracefully...") + sys.exit(0) + + # Register signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + parser = argparse.ArgumentParser(description="HNSW Embedding service") parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on") parser.add_argument( diff --git a/packages/leann-core/src/leann/embedding_server_manager.py b/packages/leann-core/src/leann/embedding_server_manager.py index 9ef7c78..2921d18 100644 --- a/packages/leann-core/src/leann/embedding_server_manager.py +++ b/packages/leann-core/src/leann/embedding_server_manager.py @@ -13,7 +13,7 @@ import psutil LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper() logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.INFO), - format="%(asctime)s - %(levelname)s - %(message)s", + format="%(levelname)s - %(name)s - %(message)s", ) logger = logging.getLogger(__name__) @@ -339,4 +339,10 @@ class EmbeddingServerManager: ) self.server_process.kill() + # Clean up process resources to prevent resource tracker warnings + try: + self.server_process.wait() # Ensure process is fully cleaned up + except Exception: + pass + self.server_process = None