diff --git a/issue_159.py b/issue_159.py
index 61dc1d7..e0c43b0 100644
--- a/issue_159.py
+++ b/issue_159.py
@@ -9,120 +9,125 @@ Configuration:
 - backend: hnsw
 """
 
-import time
 import os
+import time
 from pathlib import Path
+
 from leann.api import LeannBuilder, LeannSearcher
 
+os.environ["LEANN_LOG_LEVEL"] = "DEBUG"
+
 # Configuration matching the issue
 INDEX_PATH = "./test_issue_159.leann"
 EMBEDDING_MODEL = "BAAI/bge-large-zh-v1.5"
 BACKEND_NAME = "hnsw"
-BEAM_WIDTH = 10  # Note: beam_width is mainly for DiskANN, not HNSW
+
 
 def generate_test_data(num_chunks=90000, chunk_size=2000):
     """Generate test data similar to 180MB text (~90K chunks)"""
     # Each chunk is approximately 2000 characters
     # 90K chunks * 2000 chars ≈ 180MB
     chunks = []
-    base_text = "这是一个测试文档。LEANN是一个创新的向量数据库,通过图基选择性重计算实现97%的存储节省。"
-
+    base_text = (
+        "这是一个测试文档。LEANN是一个创新的向量数据库,通过图基选择性重计算实现97%的存储节省。"
+    )
+
     for i in range(num_chunks):
         chunk = f"{base_text} 文档编号: {i}. " * (chunk_size // len(base_text) + 1)
         chunks.append(chunk[:chunk_size])
-
+
     return chunks
 
+
 def test_search_performance():
     """Test search performance with different configurations"""
     print("=" * 80)
     print("Testing LEANN Search Performance (Issue #159)")
     print("=" * 80)
-
-    # Check if index exists - skip build if it does
-    index_path = Path(INDEX_PATH)
-    if True:
+
+    meta_path = Path(f"{INDEX_PATH}.meta.json")
+    if meta_path.exists():
         print(f"\n✓ Index already exists at {INDEX_PATH}")
         print("  Skipping build phase. Delete the index to rebuild.")
     else:
-        print(f"\n📦 Building index...")
+        print("\n📦 Building index...")
         print(f"  Backend: {BACKEND_NAME}")
         print(f"  Embedding Model: {EMBEDDING_MODEL}")
-        print(f"  Generating test data (~90K chunks, ~180MB)...")
-
+        print("  Generating test data (~90K chunks, ~180MB)...")
+
         chunks = generate_test_data(num_chunks=90000)
         print(f"  Generated {len(chunks)} chunks")
-        print(f"  Total text size: {sum(len(c) for c in chunks) / (1024*1024):.2f} MB")
-
+        print(f"  Total text size: {sum(len(c) for c in chunks) / (1024 * 1024):.2f} MB")
+
         builder = LeannBuilder(
             backend_name=BACKEND_NAME,
             embedding_model=EMBEDDING_MODEL,
         )
-
-        print(f"  Adding chunks to builder...")
+
+        print("  Adding chunks to builder...")
         start_time = time.time()
         for i, chunk in enumerate(chunks):
             builder.add_text(chunk)
             if (i + 1) % 10000 == 0:
                 print(f"    Added {i + 1}/{len(chunks)} chunks...")
-
-        print(f"  Building index...")
+
+        print("  Building index...")
        build_start = time.time()
         builder.build_index(INDEX_PATH)
         build_time = time.time() - build_start
         print(f"  ✓ Index built in {build_time:.2f} seconds")
-
+
     # Test search with different complexity values
-    print(f"\n🔍 Testing search performance...")
+    print("\n🔍 Testing search performance...")
     searcher = LeannSearcher(INDEX_PATH)
-
+
     test_query = "LEANN向量数据库存储优化"
-
+
     # Warm-up run at default complexity (first query loads the model)
-    print(f"\n  Test 1: Default complexity (64)")
+    print("\n  Warm-up: Default complexity (64)")
     print(f"  Query: '{test_query}'")
     start_time = time.time()
-    results = searcher.search(test_query, top_k=10, complexity=64, beam_width=BEAM_WIDTH)
+    results = searcher.search(test_query, top_k=10, complexity=64)
     search_time = time.time() - start_time
     print(f"  ✓ Search completed in {search_time:.2f} seconds")
     print(f"  Results: {len(results)} items")
-
+
     # Test with default complexity (64)
-    print(f"\n  Test 1: Default complexity (64)")
+    print("\n  Test 1: Default complexity (64)")
     print(f"  Query: '{test_query}'")
     start_time = time.time()
-    results = searcher.search(test_query, top_k=10, complexity=64, beam_width=BEAM_WIDTH)
+    results = searcher.search(test_query, top_k=10, complexity=64)
     search_time = time.time() - start_time
     print(f"  ✓ Search completed in {search_time:.2f} seconds")
     print(f"  Results: {len(results)} items")
-
+
     # Test with lower complexity (32)
-    print(f"\n  Test 2: Lower complexity (32)")
+    print("\n  Test 2: Lower complexity (32)")
     print(f"  Query: '{test_query}'")
     start_time = time.time()
-    results = searcher.search(test_query, top_k=10, complexity=32, beam_width=BEAM_WIDTH)
+    results = searcher.search(test_query, top_k=10, complexity=32)
     search_time = time.time() - start_time
     print(f"  ✓ Search completed in {search_time:.2f} seconds")
     print(f"  Results: {len(results)} items")
-
+
     # Test with even lower complexity (16)
-    print(f"\n  Test 3: Lower complexity (16)")
+    print("\n  Test 3: Lower complexity (16)")
     print(f"  Query: '{test_query}'")
     start_time = time.time()
-    results = searcher.search(test_query, top_k=10, complexity=16, beam_width=BEAM_WIDTH)
+    results = searcher.search(test_query, top_k=10, complexity=16)
     search_time = time.time() - start_time
     print(f"  ✓ Search completed in {search_time:.2f} seconds")
     print(f"  Results: {len(results)} items")
-
+
     # Test with minimal complexity (8)
-    print(f"\n  Test 4: Minimal complexity (8)")
+    print("\n  Test 4: Minimal complexity (8)")
     print(f"  Query: '{test_query}'")
     start_time = time.time()
-    results = searcher.search(test_query, top_k=10, complexity=8, beam_width=BEAM_WIDTH)
+    results = searcher.search(test_query, top_k=10, complexity=8)
     search_time = time.time() - start_time
     print(f"  ✓ Search completed in {search_time:.2f} seconds")
     print(f"  Results: {len(results)} items")
-
+
     print("\n" + "=" * 80)
     print("Performance Analysis:")
     print("=" * 80)
@@ -139,6 +144,6 @@ def test_search_performance():
     print("- Consider using DiskANN backend for better performance on large datasets")
     print("- Or use a smaller embedding model if speed is critical")
 
+
 if __name__ == "__main__":
     test_search_performance()
-
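Note: the four timing blocks above are near-identical copy-paste. A minimal sketch of how they could be folded into one helper; it assumes only the LeannSearcher.search(query, top_k=..., complexity=...) signature already used in this script, and the timed_search name is hypothetical.

import time

from leann.api import LeannSearcher

def timed_search(searcher: LeannSearcher, query: str, complexity: int, top_k: int = 10):
    # Time a single query at the given search complexity and report latency.
    start = time.time()
    results = searcher.search(query, top_k=top_k, complexity=complexity)
    print(f"  complexity={complexity}: {time.time() - start:.2f}s, {len(results)} results")
    return results

# Warm up once (the first query pays for model load), then sweep:
# timed_search(searcher, test_query, 64)
# for c in (64, 32, 16, 8):
#     timed_search(searcher, test_query, c)
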
diff --git a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py
index ae58223..b55fa24 100644
--- a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py
+++ b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py
@@ -191,9 +191,7 @@ def create_hnsw_embedding_server(
             )
             rep_socket.send(msgpack.packb(embeddings.tolist()))
             e2e_end = time.time()
-            logger.info(
-                f"⏱️ Direct text embedding E2E time: {e2e_end - e2e_start:.6f}s"
-            )
+            logger.info(f"⏱️ Direct text embedding E2E time: {e2e_end - e2e_start:.6f}s")
 
     def _handle_distance_request(request: list[Any]) -> None:
         nonlocal last_request_type, last_request_length
@@ -253,22 +251,14 @@ def create_hnsw_embedding_server(
             except Exception as exc:
                 logger.error(f"Distance computation error, using sentinels: {exc}")
 
-            rep_socket.send(
-                msgpack.packb([response_distances], use_single_float=True)
-            )
+            rep_socket.send(msgpack.packb([response_distances], use_single_float=True))
             e2e_end = time.time()
-            logger.info(
-                f"⏱️ Distance calculation E2E time: {e2e_end - e2e_start:.6f}s"
-            )
+            logger.info(f"⏱️ Distance calculation E2E time: {e2e_end - e2e_start:.6f}s")
 
     def _handle_embedding_by_id(request: Any) -> None:
         nonlocal last_request_type, last_request_length
-        if (
-            isinstance(request, list)
-            and len(request) == 1
-            and isinstance(request[0], list)
-        ):
+        if isinstance(request, list) and len(request) == 1 and isinstance(request[0], list):
             node_ids = request[0]
         elif isinstance(request, list):
             node_ids = request
@@ -336,11 +326,9 @@ def create_hnsw_embedding_server(
                 logger.error(f"Embedding computation error, returning zeros: {exc}")
 
             response_payload = [dims, flat_data]
-            rep_socket.send(
-                msgpack.packb(response_payload, use_single_float=True)
-            )
+            rep_socket.send(msgpack.packb(response_payload, use_single_float=True))
             e2e_end = time.time()
-            logger.info(f"⏱️ ZMQ E2E time: {e2e_end - e2e_start:.6f}s")
+            logger.info(f"⏱️ Fallback Embed by Id E2E time: {e2e_end - e2e_start:.6f}s")
 
     try:
         while not shutdown_event.is_set():
@@ -359,9 +347,7 @@ def create_hnsw_embedding_server(
                     logger.error(f"Error unpacking ZMQ message: {exc}")
                     try:
                         safe = _build_safe_fallback()
-                        rep_socket.send(
-                            msgpack.packb(safe, use_single_float=True)
-                        )
+                        rep_socket.send(msgpack.packb(safe, use_single_float=True))
                     except Exception:
                         pass
                     continue
@@ -399,9 +385,7 @@ def create_hnsw_embedding_server(
             logger.error(f"Error in ZMQ server loop: {exc}")
             try:
                 safe = _build_safe_fallback()
-                rep_socket.send(
-                    msgpack.packb(safe, use_single_float=True)
-                )
+                rep_socket.send(msgpack.packb(safe, use_single_float=True))
             except Exception:
                 pass
     finally:
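Note: the server above answers every request on its ZMQ REP socket with a msgpack payload, and substitutes a safe fallback on failure so the REQ/REP state machine never stalls. A client-side sketch of that exchange; the tcp://127.0.0.1:5555 endpoint and the [[node_id, ...]] request shape are assumptions for illustration, not a documented LEANN protocol.

import msgpack
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.connect("tcp://127.0.0.1:5555")  # hypothetical endpoint

# Request embeddings by node id; per _handle_embedding_by_id above, the
# server accepts either [id, ...] or [[id, ...]].
sock.send(msgpack.packb([[0, 1, 2]]))

# Per the diff, the reply is [dims, flat_data] packed with use_single_float.
dims, flat_data = msgpack.unpackb(sock.recv())
print(f"dims={dims}, received {len(flat_data)} floats")
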
diff --git a/packages/leann-backend-hnsw/third_party/faiss b/packages/leann-backend-hnsw/third_party/faiss
index e2d243c..301bf24 160000
--- a/packages/leann-backend-hnsw/third_party/faiss
+++ b/packages/leann-backend-hnsw/third_party/faiss
@@ -1 +1 @@
-Subproject commit e2d243c40ddc142b8c57c067c0441694f3c22121
+Subproject commit 301bf24f1467c6a09e8758cfb70aad856ce421db
diff --git a/packages/leann-core/src/leann/embedding_compute.py b/packages/leann-core/src/leann/embedding_compute.py
index 2af4469..2cc2595 100644
--- a/packages/leann-core/src/leann/embedding_compute.py
+++ b/packages/leann-core/src/leann/embedding_compute.py
@@ -215,9 +215,14 @@ def compute_embeddings(
         Normalized embeddings array, shape: (len(texts), embedding_dim)
     """
     provider_options = provider_options or {}
+    wrapper_start_time = time.time()
+    logger.debug(
+        f"[compute_embeddings] entry: mode={mode}, model='{model_name}', text_count={len(texts)}"
+    )
 
     if mode == "sentence-transformers":
-        return compute_embeddings_sentence_transformers(
+        inner_start_time = time.time()
+        result = compute_embeddings_sentence_transformers(
             texts,
             model_name,
             is_build=is_build,
@@ -226,6 +231,14 @@ def compute_embeddings(
             manual_tokenize=manual_tokenize,
             max_length=max_length,
         )
+        inner_end_time = time.time()
+        wrapper_end_time = time.time()
+        logger.debug(
+            "[compute_embeddings] sentence-transformers timings: "
+            f"inner={inner_end_time - inner_start_time:.6f}s, "
+            f"wrapper_total={wrapper_end_time - wrapper_start_time:.6f}s"
+        )
+        return result
     elif mode == "openai":
         return compute_embeddings_openai(
             texts,
@@ -271,6 +284,7 @@ def compute_embeddings_sentence_transformers(
         is_build: Whether this is a build operation (shows progress bar)
         adaptive_optimization: Whether to use adaptive optimization based on batch size
     """
+    outer_start_time = time.time()
     # Handle empty input
     if not texts:
         raise ValueError("Cannot compute embeddings for empty text list")
@@ -301,7 +315,14 @@ def compute_embeddings_sentence_transformers(
     # Create cache key
     cache_key = f"sentence_transformers_{model_name}_{device}_{use_fp16}_optimized"
 
+    pre_model_init_end_time = time.time()
+    logger.debug(
+        "compute_embeddings_sentence_transformers pre-model-init time "
+        f"(device/batch selection etc.): {pre_model_init_end_time - outer_start_time:.6f}s"
+    )
+
     # Check if model is already cached
+    start_time = time.time()
     if cache_key in _model_cache:
         logger.info(f"Using cached optimized model: {model_name}")
         model = _model_cache[cache_key]
@@ -441,10 +462,13 @@ def compute_embeddings_sentence_transformers(
         _model_cache[cache_key] = model
         logger.info(f"Model cached: {cache_key}")
 
-    # Compute embeddings with optimized inference mode
-    logger.info(
-        f"Starting embedding computation... (batch_size: {batch_size}, manual_tokenize={manual_tokenize})"
-    )
+    end_time = time.time()
+
+    # Compute embeddings with optimized inference mode
+    logger.info(
+        f"Starting embedding computation... (batch_size: {batch_size}, manual_tokenize={manual_tokenize})"
+    )
+    logger.info(f"Model init/cache lookup for {model_name} took {end_time - start_time:.6f}s")
 
     start_time = time.time()
     if not manual_tokenize:
@@ -465,32 +489,46 @@ def compute_embeddings_sentence_transformers(
         except Exception:
             pass
     else:
-        # Manual tokenization + forward pass using HF AutoTokenizer/AutoModel
+        # Manual tokenization + forward pass using HF AutoTokenizer/AutoModel.
+        # This path is reserved for an aggressively optimized FP pipeline
+        # (no quantization), mainly for experimentation.
        try:
             from transformers import AutoModel, AutoTokenizer  # type: ignore
         except Exception as e:
             raise ImportError(f"transformers is required for manual_tokenize=True: {e}")
 
-        # Cache tokenizer and model
         tok_cache_key = f"hf_tokenizer_{model_name}"
-        mdl_cache_key = f"hf_model_{model_name}_{device}_{use_fp16}"
+        mdl_cache_key = f"hf_model_{model_name}_{device}_{use_fp16}_fp"
+
         if tok_cache_key in _model_cache and mdl_cache_key in _model_cache:
             hf_tokenizer = _model_cache[tok_cache_key]
             hf_model = _model_cache[mdl_cache_key]
-            logger.info("Using cached HF tokenizer/model for manual path")
+            logger.info("Using cached HF tokenizer/model for manual FP path")
         else:
-            logger.info("Loading HF tokenizer/model for manual tokenization path")
+            logger.info("Loading HF tokenizer/model for manual FP path")
             hf_tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
+
             torch_dtype = torch.float16 if (use_fp16 and device == "cuda") else torch.float32
-            hf_model = AutoModel.from_pretrained(model_name, torch_dtype=torch_dtype)
+            hf_model = AutoModel.from_pretrained(
+                model_name,
+                torch_dtype=torch_dtype,
+            )
             hf_model.to(device)
+            hf_model.eval()
 
             # Optional compile on supported devices
             if device in ["cuda", "mps"]:
                 try:
-                    hf_model = torch.compile(hf_model, mode="reduce-overhead", dynamic=True)  # type: ignore
-                except Exception:
-                    pass
+                    hf_model = torch.compile(  # type: ignore
+                        hf_model, mode="reduce-overhead", dynamic=True
+                    )
+                    logger.info(
+                        f"Applied torch.compile to HF model for {model_name} "
+                        f"(device={device}, dtype={torch_dtype})"
+                    )
+                except Exception as exc:
+                    logger.warning(f"torch.compile optimization failed: {exc}")
+
             _model_cache[tok_cache_key] = hf_tokenizer
             _model_cache[mdl_cache_key] = hf_model
@@ -516,7 +554,6 @@ def compute_embeddings_sentence_transformers(
         for start_index in batch_iter:
             end_index = min(start_index + batch_size, len(texts))
             batch_texts = texts[start_index:end_index]
-            tokenize_start_time = time.time()
             inputs = hf_tokenizer(
                 batch_texts,
                 padding=True,
                 truncation=True,
                 max_length=max_length,
                 return_tensors="pt",
             )
-            tokenize_end_time = time.time()
-            logger.info(
-                f"Tokenize time taken: {tokenize_end_time - tokenize_start_time} seconds"
-            )
-            # Print shapes of all input tensors for debugging
-            for k, v in inputs.items():
-                print(f"inputs[{k!r}] shape: {getattr(v, 'shape', type(v))}")
-            to_device_start_time = time.time()
             inputs = {k: v.to(device) for k, v in inputs.items()}
-            to_device_end_time = time.time()
-            logger.info(
-                f"To device time taken: {to_device_end_time - to_device_start_time} seconds"
-            )
-            forward_start_time = time.time()
             outputs = hf_model(**inputs)
-            forward_end_time = time.time()
-            logger.info(f"Forward time taken: {forward_end_time - forward_start_time} seconds")
             last_hidden_state = outputs.last_hidden_state  # (B, L, H)
             attention_mask = inputs.get("attention_mask")
             if attention_mask is None:
-                # Fallback: assume all tokens are valid
                 pooled = last_hidden_state.mean(dim=1)
             else:
                 mask = attention_mask.unsqueeze(-1).to(last_hidden_state.dtype)
                 masked = last_hidden_state * mask
                 lengths = mask.sum(dim=1).clamp(min=1)
                 pooled = masked.sum(dim=1) / lengths
-            # Move to CPU float32
             batch_embeddings = pooled.detach().to("cpu").float().numpy()
             all_embeddings.append(batch_embeddings)
@@ -571,6 +591,12 @@ def compute_embeddings_sentence_transformers(
     if np.isnan(embeddings).any() or np.isinf(embeddings).any():
         raise RuntimeError(f"Detected NaN or Inf values in embeddings, model: {model_name}")
 
+    outer_end_time = time.time()
+    logger.debug(
+        "compute_embeddings_sentence_transformers total time "
+        f"(function entry -> return): {outer_end_time - outer_start_time:.6f}s"
+    )
+
     return embeddings
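Note: the instrumentation added to embedding_compute.py times each stage with hand-paired time.time() calls. An equivalent pattern, shown only as a sketch, is a small context manager emitting the same kind of DEBUG lines; the logger name here is hypothetical, not taken from the diff.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("leann.embedding_compute")  # hypothetical name

@contextmanager
def timed_stage(label: str):
    # Log how long the wrapped block took, mirroring the inline timers above.
    start = time.time()
    try:
        yield
    finally:
        logger.debug(f"{label}: {time.time() - start:.6f}s")

# Usage, mirroring the wrapper timing in compute_embeddings:
# with timed_stage("[compute_embeddings] sentence-transformers inner"):
#     result = compute_embeddings_sentence_transformers(...)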