From 4a1353761a3a17556de3aa0fbf542a7db2baa040 Mon Sep 17 00:00:00 2001 From: yichuan520030910320 Date: Tue, 5 Aug 2025 23:23:07 -0700 Subject: [PATCH] merge --- .../leann_backend_diskann/__init__.py | 7 +- .../leann_backend_diskann/graph_partition.py | 169 +++++++++--------- .../graph_partition_simple.py | 92 +++++----- 3 files changed, 124 insertions(+), 144 deletions(-) diff --git a/packages/leann-backend-diskann/leann_backend_diskann/__init__.py b/packages/leann-backend-diskann/leann_backend_diskann/__init__.py index aaf671e..d569076 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/__init__.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/__init__.py @@ -4,9 +4,4 @@ from . import graph_partition # Export main classes and functions from .graph_partition import GraphPartitioner, partition_graph -__all__ = [ - "diskann_backend", - "graph_partition", - "GraphPartitioner", - "partition_graph" -] +__all__ = ["diskann_backend", "graph_partition", "GraphPartitioner", "partition_graph"] diff --git a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py index cec3c7f..1284ee5 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py @@ -8,31 +8,30 @@ performance. """ import os +import shutil import subprocess import tempfile -import shutil from pathlib import Path -from typing import Optional, Tuple class GraphPartitioner: """ A Python interface for DiskANN's graph partition functionality. - + This class provides methods to partition disk-based indices for improved search performance and memory efficiency. """ - + def __init__(self, build_type: str = "release"): """ Initialize the GraphPartitioner. - + Args: build_type: Build type for the executables ("debug" or "release") """ self.build_type = build_type self._ensure_executables() - + def _get_executable_path(self, name: str) -> str: """Get the path to a graph partition executable.""" # Get the directory where this Python module is located @@ -40,64 +39,58 @@ class GraphPartitioner: # Navigate to the graph_partition directory graph_partition_dir = module_dir.parent / "third_party" / "DiskANN" / "graph_partition" executable_path = graph_partition_dir / "build" / self.build_type / "graph_partition" / name - + if not executable_path.exists(): raise FileNotFoundError(f"Executable {name} not found at {executable_path}") - + return str(executable_path) - + def _ensure_executables(self): """Ensure that the required executables are built.""" try: self._get_executable_path("partitioner") self._get_executable_path("index_relayout") - except FileNotFoundError as e: + except FileNotFoundError: # Try to build the executables automatically print("Executables not found, attempting to build them...") self._build_executables() - + def _build_executables(self): """Build the required executables.""" - graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" + graph_partition_dir = ( + Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" + ) original_dir = os.getcwd() - + try: os.chdir(graph_partition_dir) - + # Clean any existing build if (graph_partition_dir / "build").exists(): shutil.rmtree(graph_partition_dir / "build") - + # Run the build script cmd = ["./build.sh", self.build_type, "split_graph", "/tmp/dummy"] - result = subprocess.run( - cmd, - capture_output=True, - text=True, - cwd=graph_partition_dir - ) - + subprocess.run(cmd, capture_output=True, text=True, cwd=graph_partition_dir) + # Check if executables were created partitioner_path = self._get_executable_path("partitioner") relayout_path = self._get_executable_path("index_relayout") - + print(f"✅ Built partitioner: {partitioner_path}") print(f"✅ Built index_relayout: {relayout_path}") - + except Exception as e: raise RuntimeError(f"Failed to build executables: {e}") finally: os.chdir(original_dir) - + def partition_graph( - self, - index_prefix_path: str, - output_dir: Optional[str] = None, - **kwargs - ) -> Tuple[str, str]: + self, index_prefix_path: str, output_dir: str | None = None, **kwargs + ) -> tuple[str, str]: """ Partition a disk-based index for improved performance. - + Args: index_prefix_path: Path to the index prefix (e.g., "/path/to/index") output_dir: Output directory for results (defaults to parent of index_prefix_path) @@ -108,10 +101,10 @@ class GraphPartitioner: - scale_factor: Scale factor (default: 1) - data_type: Data type (default: "float") - thread_nums: Number of threads (default: 10) - + Returns: Tuple of (disk_graph_index_path, partition_bin_path) - + Raises: RuntimeError: If the partitioning process fails """ @@ -123,71 +116,80 @@ class GraphPartitioner: "scale_factor": 1, "data_type": "float", "thread_nums": 10, - **kwargs + **kwargs, } - + # Determine output directory if output_dir is None: output_dir = str(Path(index_prefix_path).parent) - + # Get executable paths partitioner_path = self._get_executable_path("partitioner") relayout_path = self._get_executable_path("index_relayout") - + # Create temporary directory for processing with tempfile.TemporaryDirectory() as temp_dir: # Change to the graph_partition directory for temporary files - graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" + graph_partition_dir = ( + Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" + ) original_dir = os.getcwd() - + try: os.chdir(graph_partition_dir) - + # Create temporary data directory temp_data_dir = Path(temp_dir) / "data" temp_data_dir.mkdir(parents=True, exist_ok=True) - + # Set up paths for temporary files graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH" - graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" + graph_gp_path = ( + graph_path + / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" + ) graph_gp_path.mkdir(parents=True, exist_ok=True) - + # Find input index file old_index_file = f"{index_prefix_path}_disk_beam_search.index" if not os.path.exists(old_index_file): old_index_file = f"{index_prefix_path}_disk.index" - + if not os.path.exists(old_index_file): raise RuntimeError(f"Index file not found: {old_index_file}") - + # Run partitioner gp_file_path = graph_gp_path / "_part.bin" partitioner_cmd = [ partitioner_path, - "--index_file", old_index_file, - "--data_type", params["data_type"], - "--gp_file", str(gp_file_path), - "-T", str(params["thread_nums"]), - "--ldg_times", str(params["gp_times"]), - "--scale", str(params["scale_factor"]), - "--mode", "1" + "--index_file", + old_index_file, + "--data_type", + params["data_type"], + "--gp_file", + str(gp_file_path), + "-T", + str(params["thread_nums"]), + "--ldg_times", + str(params["gp_times"]), + "--scale", + str(params["scale_factor"]), + "--mode", + "1", ] - + print(f"Running partitioner: {' '.join(partitioner_cmd)}") result = subprocess.run( - partitioner_cmd, - capture_output=True, - text=True, - cwd=graph_partition_dir + partitioner_cmd, capture_output=True, text=True, cwd=graph_partition_dir ) - + if result.returncode != 0: raise RuntimeError( f"Partitioner failed with return code {result.returncode}.\n" f"stdout: {result.stdout}\n" f"stderr: {result.stderr}" ) - + # Run relayout part_tmp_index = graph_gp_path / "_part_tmp.index" relayout_cmd = [ @@ -195,75 +197,69 @@ class GraphPartitioner: old_index_file, str(gp_file_path), params["data_type"], - "1" + "1", ] - + print(f"Running relayout: {' '.join(relayout_cmd)}") result = subprocess.run( - relayout_cmd, - capture_output=True, - text=True, - cwd=graph_partition_dir + relayout_cmd, capture_output=True, text=True, cwd=graph_partition_dir ) - + if result.returncode != 0: raise RuntimeError( f"Relayout failed with return code {result.returncode}.\n" f"stdout: {result.stdout}\n" f"stderr: {result.stderr}" ) - + # Copy results to output directory disk_graph_path = Path(output_dir) / "_disk_graph.index" partition_bin_path = Path(output_dir) / "_partition.bin" - + shutil.copy2(part_tmp_index, disk_graph_path) shutil.copy2(gp_file_path, partition_bin_path) - + print(f"Results copied to: {output_dir}") return str(disk_graph_path), str(partition_bin_path) - + finally: os.chdir(original_dir) - + def get_partition_info(self, partition_bin_path: str) -> dict: """ Get information about a partition file. - + Args: partition_bin_path: Path to the partition binary file - + Returns: Dictionary containing partition information """ if not os.path.exists(partition_bin_path): raise FileNotFoundError(f"Partition file not found: {partition_bin_path}") - + # For now, return basic file information # In the future, this could parse the binary file for detailed info stat = os.stat(partition_bin_path) return { "file_size": stat.st_size, "file_path": partition_bin_path, - "modified_time": stat.st_mtime + "modified_time": stat.st_mtime, } def partition_graph( - index_prefix_path: str, - output_dir: Optional[str] = None, - build_type: str = "release", - **kwargs -) -> Tuple[str, str]: + index_prefix_path: str, output_dir: str | None = None, build_type: str = "release", **kwargs +) -> tuple[str, str]: """ Convenience function to partition a graph index. - + Args: index_prefix_path: Path to the index prefix output_dir: Output directory (defaults to parent of index_prefix_path) build_type: Build type for executables ("debug" or "release") **kwargs: Additional parameters for graph partitioning - + Returns: Tuple of (disk_graph_index_path, partition_bin_path) """ @@ -276,13 +272,10 @@ if __name__ == "__main__": # Example: partition an index try: disk_graph_path, partition_bin_path = partition_graph( - "/path/to/your/index_prefix", - gp_times=10, - lock_nums=10, - cut=100 + "/path/to/your/index_prefix", gp_times=10, lock_nums=10, cut=100 ) - print(f"Partitioning completed successfully!") + print("Partitioning completed successfully!") print(f"Disk graph index: {disk_graph_path}") print(f"Partition binary: {partition_bin_path}") except Exception as e: - print(f"Partitioning failed: {e}") \ No newline at end of file + print(f"Partitioning failed: {e}") diff --git a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition_simple.py b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition_simple.py index d115244..0030daa 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/graph_partition_simple.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/graph_partition_simple.py @@ -9,24 +9,20 @@ that directly calls the existing executables. import os import subprocess import tempfile -import shutil from pathlib import Path -from typing import Optional, Tuple def partition_graph_simple( - index_prefix_path: str, - output_dir: Optional[str] = None, - **kwargs -) -> Tuple[str, str]: + index_prefix_path: str, output_dir: str | None = None, **kwargs +) -> tuple[str, str]: """ Simple function to partition a graph index. - + Args: index_prefix_path: Path to the index prefix (e.g., "/path/to/index") output_dir: Output directory (defaults to parent of index_prefix_path) **kwargs: Additional parameters for graph partitioning - + Returns: Tuple of (disk_graph_index_path, partition_bin_path) """ @@ -38,69 +34,65 @@ def partition_graph_simple( "scale_factor": 1, "data_type": "float", "thread_nums": 10, - **kwargs + **kwargs, } - + # Determine output directory if output_dir is None: output_dir = str(Path(index_prefix_path).parent) - + # Find the graph_partition directory current_file = Path(__file__) graph_partition_dir = current_file.parent.parent / "third_party" / "DiskANN" / "graph_partition" - + if not graph_partition_dir.exists(): raise RuntimeError(f"Graph partition directory not found: {graph_partition_dir}") - + # Find input index file old_index_file = f"{index_prefix_path}_disk_beam_search.index" if not os.path.exists(old_index_file): old_index_file = f"{index_prefix_path}_disk.index" - + if not os.path.exists(old_index_file): raise RuntimeError(f"Index file not found: {old_index_file}") - + # Create temporary directory for processing with tempfile.TemporaryDirectory() as temp_dir: temp_data_dir = Path(temp_dir) / "data" temp_data_dir.mkdir(parents=True, exist_ok=True) - + # Set up paths for temporary files graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH" - graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" + graph_gp_path = ( + graph_path + / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" + ) graph_gp_path.mkdir(parents=True, exist_ok=True) - + # Run the build script with our parameters - cmd = [ - str(graph_partition_dir / "build.sh"), - "release", - "split_graph", - index_prefix_path - ] - + cmd = [str(graph_partition_dir / "build.sh"), "release", "split_graph", index_prefix_path] + # Set environment variables for parameters env = os.environ.copy() - env.update({ - "GP_TIMES": str(params["gp_times"]), - "GP_LOCK_NUMS": str(params["lock_nums"]), - "GP_CUT": str(params["cut"]), - "GP_SCALE_F": str(params["scale_factor"]), - "DATA_TYPE": params["data_type"], - "GP_T": str(params["thread_nums"]), - }) - + env.update( + { + "GP_TIMES": str(params["gp_times"]), + "GP_LOCK_NUMS": str(params["lock_nums"]), + "GP_CUT": str(params["cut"]), + "GP_SCALE_F": str(params["scale_factor"]), + "DATA_TYPE": params["data_type"], + "GP_T": str(params["thread_nums"]), + } + ) + print(f"Running graph partition with command: {' '.join(cmd)}") print(f"Working directory: {graph_partition_dir}") - + # Run the command result = subprocess.run( - cmd, - env=env, - capture_output=True, - text=True, - cwd=graph_partition_dir + cmd, env=env, capture_output=True, text=True, cwd=graph_partition_dir ) - + if result.returncode != 0: print(f"Command failed with return code {result.returncode}") print(f"stdout: {result.stdout}") @@ -110,21 +102,21 @@ def partition_graph_simple( f"stdout: {result.stdout}\n" f"stderr: {result.stderr}" ) - + # Check if output files were created disk_graph_path = Path(output_dir) / "_disk_graph.index" partition_bin_path = Path(output_dir) / "_partition.bin" - + if not disk_graph_path.exists(): raise RuntimeError(f"Expected output file not found: {disk_graph_path}") - + if not partition_bin_path.exists(): raise RuntimeError(f"Expected output file not found: {partition_bin_path}") - - print(f"✅ Partitioning completed successfully!") + + print("✅ Partitioning completed successfully!") print(f" Disk graph index: {disk_graph_path}") print(f" Partition binary: {partition_bin_path}") - + return str(disk_graph_path), str(partition_bin_path) @@ -135,10 +127,10 @@ if __name__ == "__main__": "/Users/yichuan/Desktop/release2/leann/diskannbuild/test_doc_files", gp_times=5, lock_nums=5, - cut=50 + cut=50, ) - print(f"Success! Output files:") + print("Success! Output files:") print(f" - {disk_graph_path}") print(f" - {partition_bin_path}") except Exception as e: - print(f"Error: {e}") \ No newline at end of file + print(f"Error: {e}")