From 77d7b60a611b490e01d8e29a8c4e20aabc6cb223 Mon Sep 17 00:00:00 2001 From: yichuan520030910320 Date: Tue, 5 Aug 2025 23:11:09 -0700 Subject: [PATCH] feat: Add graph partition support for DiskANN backend - Add GraphPartitioner class for advanced graph partitioning - Add partition_graph_simple function for easy-to-use partitioning - Add pybind11 dependency for C++ executable building - Update __init__.py to export partition functions - Include test scripts for partition functionality The partition functionality allows optimizing disk-based indices for better search performance and memory efficiency. --- .../leann_backend_diskann/__init__.py | 11 + .../leann_backend_diskann/graph_partition.py | 288 ++++++++++++++++++ .../graph_partition_simple.py | 144 +++++++++ pyproject.toml | 3 +- uv.lock | 11 + 5 files changed, 456 insertions(+), 1 deletion(-) create mode 100644 packages/leann-backend-diskann/leann_backend_diskann/graph_partition.py create mode 100644 packages/leann-backend-diskann/leann_backend_diskann/graph_partition_simple.py diff --git a/packages/leann-backend-diskann/leann_backend_diskann/__init__.py b/packages/leann-backend-diskann/leann_backend_diskann/__init__.py index cba4f62..aaf671e 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/__init__.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/__init__.py @@ -1 +1,12 @@ from . import diskann_backend as diskann_backend +from . 
#!/usr/bin/env python3
"""
Graph Partition Module for LEANN DiskANN Backend

This module provides Python bindings for the graph partition functionality
of DiskANN, allowing users to partition disk-based indices for better
performance.
"""

import os
import subprocess
import tempfile
import shutil
from pathlib import Path
from typing import Optional, Tuple


class GraphPartitioner:
    """
    A Python interface for DiskANN's graph partition functionality.

    This class provides methods to partition disk-based indices for improved
    search performance and memory efficiency. On construction it verifies
    that the required C++ executables exist and attempts to build them with
    the vendored ``build.sh`` script when they are missing.
    """

    def __init__(self, build_type: str = "release"):
        """
        Initialize the GraphPartitioner.

        Args:
            build_type: Build type for the executables ("debug" or "release")

        Raises:
            RuntimeError: If the executables are missing and cannot be built.
        """
        self.build_type = build_type
        self._ensure_executables()

    def _get_executable_path(self, name: str) -> str:
        """
        Return the absolute path to a graph partition executable.

        Args:
            name: Executable base name, e.g. "partitioner" or "index_relayout".

        Raises:
            FileNotFoundError: If the executable has not been built yet.
        """
        # The executables live in the vendored DiskANN tree that ships next
        # to this package, under build/<build_type>/graph_partition/.
        module_dir = Path(__file__).parent
        graph_partition_dir = module_dir.parent / "third_party" / "DiskANN" / "graph_partition"
        executable_path = graph_partition_dir / "build" / self.build_type / "graph_partition" / name

        if not executable_path.exists():
            raise FileNotFoundError(f"Executable {name} not found at {executable_path}")

        return str(executable_path)

    def _ensure_executables(self):
        """Verify both required executables exist; build them on demand if not."""
        try:
            self._get_executable_path("partitioner")
            self._get_executable_path("index_relayout")
        except FileNotFoundError:
            # The bound exception variable was unused; drop it and fall back
            # to an automatic build.
            print("Executables not found, attempting to build them...")
            self._build_executables()

    def _build_executables(self):
        """
        Build the partitioner and index_relayout executables via build.sh.

        Raises:
            RuntimeError: If the build script fails or the executables are
                still missing afterwards.
        """
        graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
        original_dir = os.getcwd()

        try:
            os.chdir(graph_partition_dir)

            # Clean any existing build so a previously failed configure
            # cannot poison this attempt.
            if (graph_partition_dir / "build").exists():
                shutil.rmtree(graph_partition_dir / "build")

            # Run the build script. The trailing path argument is a dummy
            # index prefix required by build.sh's CLI.
            cmd = ["./build.sh", self.build_type, "split_graph", "/tmp/dummy"]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=graph_partition_dir
            )

            # BUG FIX: the original ignored the build script's exit status,
            # so a failed build only surfaced later as a confusing
            # FileNotFoundError. Fail fast with the captured output instead.
            if result.returncode != 0:
                raise RuntimeError(
                    f"build.sh failed with return code {result.returncode}.\n"
                    f"stdout: {result.stdout}\n"
                    f"stderr: {result.stderr}"
                )

            # Check if executables were created (raises FileNotFoundError
            # if build.sh succeeded but produced nothing).
            partitioner_path = self._get_executable_path("partitioner")
            relayout_path = self._get_executable_path("index_relayout")

            print(f"✅ Built partitioner: {partitioner_path}")
            print(f"✅ Built index_relayout: {relayout_path}")

        except Exception as e:
            # Chain the cause so the underlying error is not lost.
            raise RuntimeError(f"Failed to build executables: {e}") from e
        finally:
            os.chdir(original_dir)

    def partition_graph(
        self,
        index_prefix_path: str,
        output_dir: Optional[str] = None,
        **kwargs
    ) -> Tuple[str, str]:
        """
        Partition a disk-based index for improved performance.

        Args:
            index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
            output_dir: Output directory for results (defaults to parent of index_prefix_path)
            **kwargs: Additional parameters for graph partitioning:
                - gp_times: Number of LDG partition iterations (default: 10)
                - lock_nums: Number of lock nodes (default: 10)
                - cut: Cut adjacency list degree (default: 100)
                - scale_factor: Scale factor (default: 1)
                - data_type: Data type (default: "float")
                - thread_nums: Number of threads (default: 10)

        Returns:
            Tuple of (disk_graph_index_path, partition_bin_path)

        Raises:
            RuntimeError: If the partitioning process fails
        """
        # Set default parameters; explicit kwargs win over the defaults.
        params = {
            "gp_times": 10,
            "lock_nums": 10,
            "cut": 100,
            "scale_factor": 1,
            "data_type": "float",
            "thread_nums": 10,
            **kwargs
        }

        # Determine output directory
        if output_dir is None:
            output_dir = str(Path(index_prefix_path).parent)

        # Get executable paths (raises FileNotFoundError if missing).
        partitioner_path = self._get_executable_path("partitioner")
        relayout_path = self._get_executable_path("index_relayout")

        # Create temporary directory for intermediate artifacts; it is
        # removed automatically on exit.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Change to the graph_partition directory for temporary files
            graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
            original_dir = os.getcwd()

            try:
                os.chdir(graph_partition_dir)

                # Create temporary data directory
                temp_data_dir = Path(temp_dir) / "data"
                temp_data_dir.mkdir(parents=True, exist_ok=True)

                # Set up paths for temporary files. NOTE(review): lock_nums
                # and cut only influence this directory name; they are never
                # passed to the partitioner CLI below — confirm that is
                # intended.
                graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH"
                graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}"
                graph_gp_path.mkdir(parents=True, exist_ok=True)

                # Find input index file: prefer the beam-search variant,
                # fall back to the plain disk index.
                old_index_file = f"{index_prefix_path}_disk_beam_search.index"
                if not os.path.exists(old_index_file):
                    old_index_file = f"{index_prefix_path}_disk.index"

                if not os.path.exists(old_index_file):
                    raise RuntimeError(f"Index file not found: {old_index_file}")

                # Run partitioner
                gp_file_path = graph_gp_path / "_part.bin"
                partitioner_cmd = [
                    partitioner_path,
                    "--index_file", old_index_file,
                    "--data_type", params["data_type"],
                    "--gp_file", str(gp_file_path),
                    "-T", str(params["thread_nums"]),
                    "--ldg_times", str(params["gp_times"]),
                    "--scale", str(params["scale_factor"]),
                    "--mode", "1"
                ]

                print(f"Running partitioner: {' '.join(partitioner_cmd)}")
                result = subprocess.run(
                    partitioner_cmd,
                    capture_output=True,
                    text=True,
                    cwd=graph_partition_dir
                )

                if result.returncode != 0:
                    raise RuntimeError(
                        f"Partitioner failed with return code {result.returncode}.\n"
                        f"stdout: {result.stdout}\n"
                        f"stderr: {result.stderr}"
                    )

                # Run relayout. NOTE(review): index_relayout is assumed to
                # write its output next to the gp_file as _part_tmp.index —
                # confirm against the C++ tool.
                part_tmp_index = graph_gp_path / "_part_tmp.index"
                relayout_cmd = [
                    relayout_path,
                    old_index_file,
                    str(gp_file_path),
                    params["data_type"],
                    "1"
                ]

                print(f"Running relayout: {' '.join(relayout_cmd)}")
                result = subprocess.run(
                    relayout_cmd,
                    capture_output=True,
                    text=True,
                    cwd=graph_partition_dir
                )

                if result.returncode != 0:
                    raise RuntimeError(
                        f"Relayout failed with return code {result.returncode}.\n"
                        f"stdout: {result.stdout}\n"
                        f"stderr: {result.stderr}"
                    )

                # Copy results to the (persistent) output directory before
                # the TemporaryDirectory is deleted.
                disk_graph_path = Path(output_dir) / "_disk_graph.index"
                partition_bin_path = Path(output_dir) / "_partition.bin"

                shutil.copy2(part_tmp_index, disk_graph_path)
                shutil.copy2(gp_file_path, partition_bin_path)

                print(f"Results copied to: {output_dir}")
                return str(disk_graph_path), str(partition_bin_path)

            finally:
                os.chdir(original_dir)

    def get_partition_info(self, partition_bin_path: str) -> dict:
        """
        Get information about a partition file.

        Args:
            partition_bin_path: Path to the partition binary file

        Returns:
            Dictionary with "file_size" (bytes), "file_path" and
            "modified_time" (Unix timestamp).

        Raises:
            FileNotFoundError: If the partition file does not exist.
        """
        if not os.path.exists(partition_bin_path):
            raise FileNotFoundError(f"Partition file not found: {partition_bin_path}")

        # For now, return basic file information.
        # In the future, this could parse the binary file for detailed info.
        stat = os.stat(partition_bin_path)
        return {
            "file_size": stat.st_size,
            "file_path": partition_bin_path,
            "modified_time": stat.st_mtime
        }


def partition_graph(
    index_prefix_path: str,
    output_dir: Optional[str] = None,
    build_type: str = "release",
    **kwargs
) -> Tuple[str, str]:
    """
    Convenience function to partition a graph index.

    Args:
        index_prefix_path: Path to the index prefix
        output_dir: Output directory (defaults to parent of index_prefix_path)
        build_type: Build type for executables ("debug" or "release")
        **kwargs: Additional parameters for graph partitioning

    Returns:
        Tuple of (disk_graph_index_path, partition_bin_path)
    """
    partitioner = GraphPartitioner(build_type=build_type)
    return partitioner.partition_graph(index_prefix_path, output_dir, **kwargs)


# Example usage:
if __name__ == "__main__":
    # Example: partition an index
    try:
        disk_graph_path, partition_bin_path = partition_graph(
            "/path/to/your/index_prefix",
            gp_times=10,
            lock_nums=10,
            cut=100
        )
        # BUG FIX: these were f-strings with no placeholders.
        print("Partitioning completed successfully!")
        print(f"Disk graph index: {disk_graph_path}")
        print(f"Partition binary: {partition_bin_path}")
    except Exception as e:
        print(f"Partitioning failed: {e}")
#!/usr/bin/env python3
"""
Simplified Graph Partition Module for LEANN DiskANN Backend

This module provides a simple Python interface for graph partitioning
that directly calls the existing executables.
"""

import os
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def partition_graph_simple(
    index_prefix_path: str,
    output_dir: Optional[str] = None,
    **kwargs
) -> Tuple[str, str]:
    """
    Simple function to partition a graph index by driving build.sh.

    Args:
        index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
        output_dir: Output directory (defaults to parent of index_prefix_path)
        **kwargs: Additional parameters for graph partitioning
            (gp_times, lock_nums, cut, scale_factor, data_type, thread_nums)

    Returns:
        Tuple of (disk_graph_index_path, partition_bin_path)

    Raises:
        RuntimeError: If the partition directory, the input index, the
            build script run, or the expected output files are missing.
    """
    # Set default parameters; explicit kwargs win over the defaults.
    params = {
        "gp_times": 10,
        "lock_nums": 10,
        "cut": 100,
        "scale_factor": 1,
        "data_type": "float",
        "thread_nums": 10,
        **kwargs
    }

    # Determine output directory
    if output_dir is None:
        output_dir = str(Path(index_prefix_path).parent)

    # Find the vendored graph_partition directory next to this package.
    current_file = Path(__file__)
    graph_partition_dir = current_file.parent.parent / "third_party" / "DiskANN" / "graph_partition"

    if not graph_partition_dir.exists():
        raise RuntimeError(f"Graph partition directory not found: {graph_partition_dir}")

    # Find input index file: prefer the beam-search variant, fall back to
    # the plain disk index.
    old_index_file = f"{index_prefix_path}_disk_beam_search.index"
    if not os.path.exists(old_index_file):
        old_index_file = f"{index_prefix_path}_disk.index"

    if not os.path.exists(old_index_file):
        raise RuntimeError(f"Index file not found: {old_index_file}")

    # Create temporary directory for processing.
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_data_dir = Path(temp_dir) / "data"
        temp_data_dir.mkdir(parents=True, exist_ok=True)

        # Set up paths for temporary files. NOTE(review): nothing below
        # passes these paths to build.sh, so this scaffolding appears to be
        # unused — confirm whether build.sh expects this layout.
        graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH"
        graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}"
        graph_gp_path.mkdir(parents=True, exist_ok=True)

        # Run the build script with our parameters
        cmd = [
            str(graph_partition_dir / "build.sh"),
            "release",
            "split_graph",
            index_prefix_path
        ]

        # Pass partition parameters through environment variables.
        # NOTE(review): assumes build.sh reads these variable names —
        # confirm against the script.
        env = os.environ.copy()
        env.update({
            "GP_TIMES": str(params["gp_times"]),
            "GP_LOCK_NUMS": str(params["lock_nums"]),
            "GP_CUT": str(params["cut"]),
            "GP_SCALE_F": str(params["scale_factor"]),
            "DATA_TYPE": params["data_type"],
            "GP_T": str(params["thread_nums"]),
        })

        print(f"Running graph partition with command: {' '.join(cmd)}")
        print(f"Working directory: {graph_partition_dir}")

        # Run the command
        result = subprocess.run(
            cmd,
            env=env,
            capture_output=True,
            text=True,
            cwd=graph_partition_dir
        )

        if result.returncode != 0:
            # BUG FIX: failure output was printed AND raised with identical
            # text; the exception alone carries everything the caller needs.
            raise RuntimeError(
                f"Graph partitioning failed with return code {result.returncode}.\n"
                f"stdout: {result.stdout}\n"
                f"stderr: {result.stderr}"
            )

        # Check if output files were created
        disk_graph_path = Path(output_dir) / "_disk_graph.index"
        partition_bin_path = Path(output_dir) / "_partition.bin"

        if not disk_graph_path.exists():
            raise RuntimeError(f"Expected output file not found: {disk_graph_path}")

        if not partition_bin_path.exists():
            raise RuntimeError(f"Expected output file not found: {partition_bin_path}")

        # BUG FIX: the success banner was an f-string with no placeholders.
        print("✅ Partitioning completed successfully!")
        print(f"   Disk graph index: {disk_graph_path}")
        print(f"   Partition binary: {partition_bin_path}")

        return str(disk_graph_path), str(partition_bin_path)


# Example usage
if __name__ == "__main__":
    try:
        disk_graph_path, partition_bin_path = partition_graph_simple(
            "/Users/yichuan/Desktop/release2/leann/diskannbuild/test_doc_files",
            gp_times=5,
            lock_nums=5,
            cut=50
        )
        # BUG FIX: f-string with no placeholders.
        print("Success! Output files:")
        print(f"  - {disk_graph_path}")
        print(f"  - {partition_bin_path}")
    except Exception as e:
        print(f"Error: {e}")
"https://files.pythonhosted.org/packages/10/15/6b30e77872012bbfe8265d42a01d5b3c17ef0ac0f2fae531ad91b6a6c02e/pyarrow-21.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdc4c17afda4dab2a9c0b79148a43a7f4e1094916b3e18d8975bfd6d6d52241f", size = 26227521, upload-time = "2025-07-18T00:57:29.119Z" }, ] +[[package]] +name = "pybind11" +version = "3.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ef/83/698d120e257a116f2472c710932023ad779409adf2734d2e940f34eea2c5/pybind11-3.0.0.tar.gz", hash = "sha256:c3f07bce3ada51c3e4b76badfa85df11688d12c46111f9d242bc5c9415af7862", size = 544819, upload-time = "2025-07-10T16:52:09.335Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/41/9c/85f50a5476832c3efc67b6d7997808388236ae4754bf53e1749b3bc27577/pybind11-3.0.0-py3-none-any.whl", hash = "sha256:7c5cac504da5a701b5163f0e6a7ba736c713a096a5378383c5b4b064b753f607", size = 292118, upload-time = "2025-07-10T16:52:07.828Z" }, +] + [[package]] name = "pycparser" version = "2.22"