diff --git a/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py b/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py index 926a26e..8c44704 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py @@ -137,6 +137,71 @@ class DiskannBuilder(LeannBackendBuilderInterface): def __init__(self, **kwargs): self.build_params = kwargs + def _safe_cleanup_after_partition(self, index_dir: Path, index_prefix: str): + """ + Safely cleanup files after partition. + In partition mode, C++ doesn't read _disk.index content, + so we can delete it if all derived files exist. + """ + disk_index_file = index_dir / f"{index_prefix}_disk.index" + beam_search_file = index_dir / f"{index_prefix}_disk_beam_search.index" + + # Required files that C++ partition mode needs + # Note: C++ generates these with _disk.index suffix + disk_suffix = "_disk.index" + required_files = [ + f"{index_prefix}{disk_suffix}_medoids.bin", # Critical: assert fails if missing + # Note: _centroids.bin is not created in single-shot build - C++ handles this automatically + f"{index_prefix}_pq_pivots.bin", # PQ table + f"{index_prefix}_pq_compressed.bin", # PQ compressed vectors + ] + + # Check if all required files exist + missing_files = [] + for filename in required_files: + file_path = index_dir / filename + if not file_path.exists(): + missing_files.append(filename) + + if missing_files: + logger.warning( + f"Cannot safely delete _disk.index - missing required files: {missing_files}" + ) + logger.info("Keeping all original files for safety") + return + + # Calculate space savings + space_saved = 0 + files_to_delete = [] + + if disk_index_file.exists(): + space_saved += disk_index_file.stat().st_size + files_to_delete.append(disk_index_file) + + if beam_search_file.exists(): + space_saved += beam_search_file.stat().st_size + files_to_delete.append(beam_search_file) + + # Safe to delete! + for file_to_delete in files_to_delete: + try: + os.remove(file_to_delete) + logger.info(f"✅ Safely deleted: {file_to_delete.name}") + except Exception as e: + logger.warning(f"Failed to delete {file_to_delete.name}: {e}") + + if space_saved > 0: + space_saved_mb = space_saved / (1024 * 1024) + logger.info(f"💾 Space saved: {space_saved_mb:.1f} MB") + + # Show what files are kept + logger.info("📁 Kept essential files for partition mode:") + for filename in required_files: + file_path = index_dir / filename + if file_path.exists(): + size_mb = file_path.stat().st_size / (1024 * 1024) + logger.info(f" - {filename} ({size_mb:.1f} MB)") + def build(self, data: np.ndarray, ids: list[str], index_path: str, **kwargs): path = Path(index_path) index_dir = path.parent @@ -151,6 +216,17 @@ class DiskannBuilder(LeannBackendBuilderInterface): _write_vectors_to_bin(data, index_dir / data_filename) build_kwargs = {**self.build_params, **kwargs} + + # Extract is_recompute from nested backend_kwargs if needed + is_recompute = build_kwargs.get("is_recompute", False) + if not is_recompute and "backend_kwargs" in build_kwargs: + is_recompute = build_kwargs["backend_kwargs"].get("is_recompute", False) + + # Flatten all backend_kwargs parameters to top level for compatibility + if "backend_kwargs" in build_kwargs: + nested_params = build_kwargs.pop("backend_kwargs") + build_kwargs.update(nested_params) + metric_enum = _get_diskann_metrics().get( build_kwargs.get("distance_metric", "mips").lower() ) @@ -185,6 +261,30 @@ class DiskannBuilder(LeannBackendBuilderInterface): build_kwargs.get("pq_disk_bytes", 0), "", ) + + # Auto-partition if is_recompute is enabled + if build_kwargs.get("is_recompute", False): + logger.info("is_recompute=True, starting automatic graph partitioning...") + from .graph_partition import partition_graph + + # Partition the index using absolute paths + # Convert to absolute paths to avoid issues with working directory changes + absolute_index_dir = Path(index_dir).resolve() + absolute_index_prefix_path = str(absolute_index_dir / index_prefix) + disk_graph_path, partition_bin_path = partition_graph( + index_prefix_path=absolute_index_prefix_path, + output_dir=str(absolute_index_dir), + partition_prefix=index_prefix, + ) + + # Safe cleanup: In partition mode, C++ doesn't read _disk.index content + # but still needs the derived files (_medoids.bin, _centroids.bin, etc.) + self._safe_cleanup_after_partition(index_dir, index_prefix) + + logger.info("✅ Graph partitioning completed successfully!") + logger.info(f" - Disk graph: {disk_graph_path}") + logger.info(f" - Partition file: {partition_bin_path}") + finally: temp_data_file = index_dir / data_filename if temp_data_file.exists(): @@ -213,7 +313,26 @@ class DiskannSearcher(BaseSearcher): # For DiskANN, we need to reinitialize the index when zmq_port changes # Store the initialization parameters for later use - full_index_prefix = str(self.index_dir / self.index_path.stem) + # Note: C++ load method expects the BASE path (without _disk.index suffix) + # C++ internally constructs: index_prefix + "_disk.index" + index_name = self.index_path.stem # "simple_test.leann" -> "simple_test" + diskann_index_prefix = str(self.index_dir / index_name) # /path/to/simple_test + full_index_prefix = diskann_index_prefix # /path/to/simple_test (base path) + + # Auto-detect partition files and set partition_prefix + partition_graph_file = self.index_dir / f"{index_name}_disk_graph.index" + partition_bin_file = self.index_dir / f"{index_name}_partition.bin" + + partition_prefix = "" + if partition_graph_file.exists() and partition_bin_file.exists(): + # C++ expects full path prefix, not just filename + partition_prefix = str(self.index_dir / index_name) # /path/to/simple_test + logger.info( + f"✅ Detected partition files, using partition_prefix='{partition_prefix}'" + ) + else: + logger.debug("No partition files detected, using standard index files") + self._init_params = { "metric_enum": metric_enum, "full_index_prefix": full_index_prefix, @@ -221,8 +340,14 @@ class DiskannSearcher(BaseSearcher): "num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0), "cache_mechanism": 1, "pq_prefix": "", - "partition_prefix": "", + "partition_prefix": partition_prefix, } + + # Log partition configuration for debugging + if partition_prefix: + logger.info( + f"✅ Detected partition files, using partition_prefix='{partition_prefix}'" + ) self._diskannpy = diskannpy self._current_zmq_port = None self._index = None diff --git a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py index 18dd350..1a608bc 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py @@ -86,7 +86,11 @@ class GraphPartitioner: os.chdir(original_dir) def partition_graph( - self, index_prefix_path: str, output_dir: str | None = None, **kwargs + self, + index_prefix_path: str, + output_dir: str | None = None, + partition_prefix: str | None = None, + **kwargs, ) -> tuple[str, str]: """ Partition a disk-based index for improved performance. @@ -94,6 +98,7 @@ class GraphPartitioner: Args: index_prefix_path: Path to the index prefix (e.g., "/path/to/index") output_dir: Output directory for results (defaults to parent of index_prefix_path) + partition_prefix: Prefix for output files (defaults to basename of index_prefix_path) **kwargs: Additional parameters for graph partitioning: - gp_times: Number of LDG partition iterations (default: 10) - lock_nums: Number of lock nodes (default: 10) @@ -126,6 +131,10 @@ class GraphPartitioner: # Create output directory if it doesn't exist Path(output_dir).mkdir(parents=True, exist_ok=True) + # Determine partition prefix + if partition_prefix is None: + partition_prefix = Path(index_prefix_path).name + # Get executable paths partitioner_path = self._get_executable_path("partitioner") relayout_path = self._get_executable_path("index_relayout") @@ -216,8 +225,8 @@ class GraphPartitioner: ) # Copy results to output directory - disk_graph_path = Path(output_dir) / "_disk_graph.index" - partition_bin_path = Path(output_dir) / "_partition.bin" + disk_graph_path = Path(output_dir) / f"{partition_prefix}_disk_graph.index" + partition_bin_path = Path(output_dir) / f"{partition_prefix}_partition.bin" shutil.copy2(part_tmp_index, disk_graph_path) shutil.copy2(gp_file_path, partition_bin_path) @@ -252,7 +261,11 @@ class GraphPartitioner: def partition_graph( - index_prefix_path: str, output_dir: str | None = None, build_type: str = "release", **kwargs + index_prefix_path: str, + output_dir: str | None = None, + partition_prefix: str | None = None, + build_type: str = "release", + **kwargs, ) -> tuple[str, str]: """ Convenience function to partition a graph index. @@ -260,6 +273,7 @@ def partition_graph( Args: index_prefix_path: Path to the index prefix output_dir: Output directory (defaults to parent of index_prefix_path) + partition_prefix: Prefix for output files (defaults to basename of index_prefix_path) build_type: Build type for executables ("debug" or "release") **kwargs: Additional parameters for graph partitioning @@ -267,7 +281,7 @@ def partition_graph( Tuple of (disk_graph_index_path, partition_bin_path) """ partitioner = GraphPartitioner(build_type=build_type) - return partitioner.partition_graph(index_prefix_path, output_dir, **kwargs) + return partitioner.partition_graph(index_prefix_path, output_dir, partition_prefix, **kwargs) # Example usage: diff --git a/packages/leann-backend-diskann/third_party/DiskANN b/packages/leann-backend-diskann/third_party/DiskANN index b2dc4ea..c73bcec 160000 --- a/packages/leann-backend-diskann/third_party/DiskANN +++ b/packages/leann-backend-diskann/third_party/DiskANN @@ -1 +1 @@ -Subproject commit b2dc4ea2c7e52e8a6481d3ba10003e192192a7b7 +Subproject commit c73bcec98c956181602f42b94f00d6a24388c1f2