From bc621677f63fd1dbad69ff67e29b814f1faf546f Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Tue, 19 Aug 2025 10:58:16 -0700 Subject: [PATCH] perf: avoid merging offset dicts for lower mem usage --- .../diskann_embedding_server.py | 2 +- .../hnsw_embedding_server.py | 2 +- packages/leann-core/src/leann/api.py | 59 +++++++++++++------ .../src/leann/embedding_server_manager.py | 1 + 4 files changed, 44 insertions(+), 20 deletions(-) diff --git a/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py b/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py index 2b1e326..37272a9 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/diskann_embedding_server.py @@ -84,7 +84,7 @@ def create_diskann_embedding_server( logger.info(f"Loading PassageManager with metadata_file_path: {passages_file}") passages = PassageManager(meta["passage_sources"], metadata_file_path=passages_file) logger.info( - f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata" + f"Loaded PassageManager with {len(passages)} passages from metadata" ) # Import protobuf after ensuring the path is correct diff --git a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py index 7c472ad..42b5cab 100644 --- a/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py +++ b/packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_embedding_server.py @@ -91,7 +91,7 @@ def create_hnsw_embedding_server( except Exception: embedding_dim = 0 logger.info( - f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata" + f"Loaded PassageManager with {len(passages)} passages from metadata" ) # (legacy ZMQ thread removed; using shutdown-capable server only) diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index ec32569..676d867 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -119,9 +119,12 @@ class PassageManager: def __init__( self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None ): - self.offset_maps = {} - self.passage_files = {} - self.global_offset_map = {} # Combined map for fast lookup + self.offset_maps: dict[str, dict[str, int]] = {} + self.passage_files: dict[str, str] = {} + # Avoid materializing a single gigantic global map to reduce memory + # footprint on very large corpora (e.g., 60M+ passages). Instead, keep + # per-shard maps and do a lightweight per-shard lookup on demand. + self._total_count: int = 0 # Derive index base name for standard sibling fallbacks, e.g., .passages.* index_name_base = None @@ -142,12 +145,25 @@ class PassageManager: default_name: Optional[str], source_dict: dict[str, Any], ) -> list[Path]: + """ + Build an ordered list of candidate paths. For relative paths specified in + metadata, prefer resolution relative to the metadata file directory first, + then fall back to CWD-based resolution, and finally to conventional + sibling defaults (e.g., .passages.idx / .jsonl). + """ candidates: list[Path] = [] - # 1) Primary as-is (absolute or relative) + # 1) Primary path if primary: p = Path(primary) - candidates.append(p if p.is_absolute() else (Path.cwd() / p)) - # 2) metadata-relative explicit relative key + if p.is_absolute(): + candidates.append(p) + else: + # Prefer metadata-relative resolution for relative paths + if metadata_file_path: + candidates.append(Path(metadata_file_path).parent / p) + # Also consider CWD-relative as a fallback for legacy layouts + candidates.append(Path.cwd() / p) + # 2) metadata-relative explicit relative key (if present) if metadata_file_path and source_dict.get(relative_key): candidates.append(Path(metadata_file_path).parent / source_dict[relative_key]) # 3) metadata-relative standard sibling filename @@ -177,23 +193,28 @@ class PassageManager: raise FileNotFoundError(f"Passage index file not found: {index_file}") with open(index_file, "rb") as f: - offset_map = pickle.load(f) + offset_map: dict[str, int] = pickle.load(f) self.offset_maps[passage_file] = offset_map self.passage_files[passage_file] = passage_file - - # Build global map for O(1) lookup - for passage_id, offset in offset_map.items(): - self.global_offset_map[passage_id] = (passage_file, offset) + self._total_count += len(offset_map) def get_passage(self, passage_id: str) -> dict[str, Any]: - if passage_id in self.global_offset_map: - passage_file, offset = self.global_offset_map[passage_id] - # Lazy file opening - only open when needed - with open(passage_file, encoding="utf-8") as f: - f.seek(offset) - return json.loads(f.readline()) + # Fast path: check each shard map (there are typically few shards). + # This avoids building a massive combined dict while keeping lookups + # bounded by the number of shards. + for passage_file, offset_map in self.offset_maps.items(): + try: + offset = offset_map[passage_id] + with open(passage_file, encoding="utf-8") as f: + f.seek(offset) + return json.loads(f.readline()) + except KeyError: + continue raise KeyError(f"Passage ID not found: {passage_id}") + def __len__(self) -> int: + return self._total_count + class LeannBuilder: def __init__( @@ -584,7 +605,9 @@ class LeannSearcher: logger.info(f" Additional kwargs: {kwargs}") # Smart top_k detection and adjustment - total_docs = len(self.passage_manager.global_offset_map) + # Use PassageManager length (sum of shard sizes) to avoid + # depending on a massive combined map + total_docs = len(self.passage_manager) original_top_k = top_k if top_k > total_docs: top_k = total_docs diff --git a/packages/leann-core/src/leann/embedding_server_manager.py b/packages/leann-core/src/leann/embedding_server_manager.py index 05c8639..3d7c31e 100644 --- a/packages/leann-core/src/leann/embedding_server_manager.py +++ b/packages/leann-core/src/leann/embedding_server_manager.py @@ -192,6 +192,7 @@ class EmbeddingServerManager: stderr_target = None # Direct to console for visible logs # Start embedding server subprocess + logger.info(f"Starting server process with command: {' '.join(command)}") self.server_process = subprocess.Popen( command, cwd=project_root,