fix: supress resources leak logs

This commit is contained in:
Andy Lee
2025-07-22 19:53:45 -07:00
parent d3f85678ec
commit 43155d2811
4 changed files with 59 additions and 28 deletions

View File

@@ -1,40 +1,40 @@
import argparse import argparse
from llama_index.core import SimpleDirectoryReader, Settings from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter from llama_index.core.node_parser import SentenceSplitter
import asyncio import asyncio
import dotenv import dotenv
from leann.api import LeannBuilder, LeannSearcher, LeannChat from leann.api import LeannBuilder, LeannChat
import shutil
from pathlib import Path from pathlib import Path
dotenv.load_dotenv() dotenv.load_dotenv()
node_parser = SentenceSplitter(
chunk_size=256, chunk_overlap=128, separator=" ", paragraph_separator="\n\n"
)
print("Loading documents...")
documents = SimpleDirectoryReader(
"examples/data",
recursive=True,
encoding="utf-8",
required_exts=[".pdf", ".txt", ".md"],
).load_data(show_progress=True)
print("Documents loaded.")
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
for node in nodes:
all_texts.append(node.get_content())
async def main(args): async def main(args):
INDEX_DIR = Path(args.index_dir) INDEX_DIR = Path(args.index_dir)
INDEX_PATH = str(INDEX_DIR / "pdf_documents.leann") INDEX_PATH = str(INDEX_DIR / "pdf_documents.leann")
if not INDEX_DIR.exists(): if not INDEX_DIR.exists():
print(f"--- Index directory not found, building new index ---") node_parser = SentenceSplitter(
chunk_size=256, chunk_overlap=128, separator=" ", paragraph_separator="\n\n"
)
print(f"\n[PHASE 1] Building Leann index...") print("Loading documents...")
documents = SimpleDirectoryReader(
"examples/data",
recursive=True,
encoding="utf-8",
required_exts=[".pdf", ".txt", ".md"],
).load_data(show_progress=True)
print("Documents loaded.")
all_texts = []
for doc in documents:
nodes = node_parser.get_nodes_from_documents([doc])
for node in nodes:
all_texts.append(node.get_content())
print("--- Index directory not found, building new index ---")
print("\n[PHASE 1] Building Leann index...")
# Use HNSW backend for better macOS compatibility # Use HNSW backend for better macOS compatibility
builder = LeannBuilder( builder = LeannBuilder(

View File

@@ -94,7 +94,9 @@ def create_diskann_embedding_server(
def zmq_server_thread(): def zmq_server_thread():
"""ZMQ server thread using REP socket for universal compatibility""" """ZMQ server thread using REP socket for universal compatibility"""
context = zmq.Context() context = zmq.Context()
socket = context.socket(zmq.REP) # REP socket for both BaseSearcher and DiskANN C++ REQ clients socket = context.socket(
zmq.REP
) # REP socket for both BaseSearcher and DiskANN C++ REQ clients
socket.bind(f"tcp://*:{zmq_port}") socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}") logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}")
@@ -128,7 +130,9 @@ def create_diskann_embedding_server(
node_ids = list(req_proto.node_ids) node_ids = list(req_proto.node_ids)
if not node_ids: if not node_ids:
raise RuntimeError(f"PROTOBUF: Received empty node_ids! Message size: {len(message)}") raise RuntimeError(
f"PROTOBUF: Received empty node_ids! Message size: {len(message)}"
)
logger.info( logger.info(
f"✅ PROTOBUF: Node ID request for {len(node_ids)} node embeddings: {node_ids[:10]}" f"✅ PROTOBUF: Node ID request for {len(node_ids)} node embeddings: {node_ids[:10]}"
@@ -175,9 +179,7 @@ def create_diskann_embedding_server(
raise raise
# Debug logging # Debug logging
logger.debug( logger.debug(f"Processing {len(texts)} texts")
f"Processing {len(texts)} texts"
)
logger.debug( logger.debug(
f"Text lengths: {[len(t) for t in texts[:5]]}" f"Text lengths: {[len(t) for t in texts[:5]]}"
) # Show first 5 ) # Show first 5
@@ -220,6 +222,7 @@ def create_diskann_embedding_server(
except Exception as e: except Exception as e:
logger.error(f"Error in ZMQ server loop: {e}") logger.error(f"Error in ZMQ server loop: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise raise
@@ -237,6 +240,17 @@ def create_diskann_embedding_server(
if __name__ == "__main__": if __name__ == "__main__":
import signal
import sys
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
sys.exit(0)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
parser = argparse.ArgumentParser(description="DiskANN Embedding service") parser = argparse.ArgumentParser(description="DiskANN Embedding service")
parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on") parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on")
parser.add_argument( parser.add_argument(

View File

@@ -268,6 +268,17 @@ def create_hnsw_embedding_server(
if __name__ == "__main__": if __name__ == "__main__":
import signal
import sys
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
sys.exit(0)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
parser = argparse.ArgumentParser(description="HNSW Embedding service") parser = argparse.ArgumentParser(description="HNSW Embedding service")
parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on") parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on")
parser.add_argument( parser.add_argument(

View File

@@ -13,7 +13,7 @@ import psutil
LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper() LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
logging.basicConfig( logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO), level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s - %(levelname)s - %(message)s", format="%(levelname)s - %(name)s - %(message)s",
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -339,4 +339,10 @@ class EmbeddingServerManager:
) )
self.server_process.kill() self.server_process.kill()
# Clean up process resources to prevent resource tracker warnings
try:
self.server_process.wait() # Ensure process is fully cleaned up
except Exception:
pass
self.server_process = None self.server_process = None