This commit is contained in:
yichuan520030910320
2025-08-05 23:23:07 -07:00
parent a72090d2ab
commit 4a1353761a
3 changed files with 124 additions and 144 deletions

View File

@@ -4,9 +4,4 @@ from . import graph_partition
# Export main classes and functions # Export main classes and functions
from .graph_partition import GraphPartitioner, partition_graph from .graph_partition import GraphPartitioner, partition_graph
__all__ = [ __all__ = ["diskann_backend", "graph_partition", "GraphPartitioner", "partition_graph"]
"diskann_backend",
"graph_partition",
"GraphPartitioner",
"partition_graph"
]

View File

@@ -8,31 +8,30 @@ performance.
""" """
import os import os
import shutil
import subprocess import subprocess
import tempfile import tempfile
import shutil
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple
class GraphPartitioner: class GraphPartitioner:
""" """
A Python interface for DiskANN's graph partition functionality. A Python interface for DiskANN's graph partition functionality.
This class provides methods to partition disk-based indices for improved This class provides methods to partition disk-based indices for improved
search performance and memory efficiency. search performance and memory efficiency.
""" """
def __init__(self, build_type: str = "release"): def __init__(self, build_type: str = "release"):
""" """
Initialize the GraphPartitioner. Initialize the GraphPartitioner.
Args: Args:
build_type: Build type for the executables ("debug" or "release") build_type: Build type for the executables ("debug" or "release")
""" """
self.build_type = build_type self.build_type = build_type
self._ensure_executables() self._ensure_executables()
def _get_executable_path(self, name: str) -> str: def _get_executable_path(self, name: str) -> str:
"""Get the path to a graph partition executable.""" """Get the path to a graph partition executable."""
# Get the directory where this Python module is located # Get the directory where this Python module is located
@@ -40,64 +39,58 @@ class GraphPartitioner:
# Navigate to the graph_partition directory # Navigate to the graph_partition directory
graph_partition_dir = module_dir.parent / "third_party" / "DiskANN" / "graph_partition" graph_partition_dir = module_dir.parent / "third_party" / "DiskANN" / "graph_partition"
executable_path = graph_partition_dir / "build" / self.build_type / "graph_partition" / name executable_path = graph_partition_dir / "build" / self.build_type / "graph_partition" / name
if not executable_path.exists(): if not executable_path.exists():
raise FileNotFoundError(f"Executable {name} not found at {executable_path}") raise FileNotFoundError(f"Executable {name} not found at {executable_path}")
return str(executable_path) return str(executable_path)
def _ensure_executables(self): def _ensure_executables(self):
"""Ensure that the required executables are built.""" """Ensure that the required executables are built."""
try: try:
self._get_executable_path("partitioner") self._get_executable_path("partitioner")
self._get_executable_path("index_relayout") self._get_executable_path("index_relayout")
except FileNotFoundError as e: except FileNotFoundError:
# Try to build the executables automatically # Try to build the executables automatically
print("Executables not found, attempting to build them...") print("Executables not found, attempting to build them...")
self._build_executables() self._build_executables()
def _build_executables(self): def _build_executables(self):
"""Build the required executables.""" """Build the required executables."""
graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" graph_partition_dir = (
Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
)
original_dir = os.getcwd() original_dir = os.getcwd()
try: try:
os.chdir(graph_partition_dir) os.chdir(graph_partition_dir)
# Clean any existing build # Clean any existing build
if (graph_partition_dir / "build").exists(): if (graph_partition_dir / "build").exists():
shutil.rmtree(graph_partition_dir / "build") shutil.rmtree(graph_partition_dir / "build")
# Run the build script # Run the build script
cmd = ["./build.sh", self.build_type, "split_graph", "/tmp/dummy"] cmd = ["./build.sh", self.build_type, "split_graph", "/tmp/dummy"]
result = subprocess.run( subprocess.run(cmd, capture_output=True, text=True, cwd=graph_partition_dir)
cmd,
capture_output=True,
text=True,
cwd=graph_partition_dir
)
# Check if executables were created # Check if executables were created
partitioner_path = self._get_executable_path("partitioner") partitioner_path = self._get_executable_path("partitioner")
relayout_path = self._get_executable_path("index_relayout") relayout_path = self._get_executable_path("index_relayout")
print(f"✅ Built partitioner: {partitioner_path}") print(f"✅ Built partitioner: {partitioner_path}")
print(f"✅ Built index_relayout: {relayout_path}") print(f"✅ Built index_relayout: {relayout_path}")
except Exception as e: except Exception as e:
raise RuntimeError(f"Failed to build executables: {e}") raise RuntimeError(f"Failed to build executables: {e}")
finally: finally:
os.chdir(original_dir) os.chdir(original_dir)
def partition_graph( def partition_graph(
self, self, index_prefix_path: str, output_dir: str | None = None, **kwargs
index_prefix_path: str, ) -> tuple[str, str]:
output_dir: Optional[str] = None,
**kwargs
) -> Tuple[str, str]:
""" """
Partition a disk-based index for improved performance. Partition a disk-based index for improved performance.
Args: Args:
index_prefix_path: Path to the index prefix (e.g., "/path/to/index") index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
output_dir: Output directory for results (defaults to parent of index_prefix_path) output_dir: Output directory for results (defaults to parent of index_prefix_path)
@@ -108,10 +101,10 @@ class GraphPartitioner:
- scale_factor: Scale factor (default: 1) - scale_factor: Scale factor (default: 1)
- data_type: Data type (default: "float") - data_type: Data type (default: "float")
- thread_nums: Number of threads (default: 10) - thread_nums: Number of threads (default: 10)
Returns: Returns:
Tuple of (disk_graph_index_path, partition_bin_path) Tuple of (disk_graph_index_path, partition_bin_path)
Raises: Raises:
RuntimeError: If the partitioning process fails RuntimeError: If the partitioning process fails
""" """
@@ -123,71 +116,80 @@ class GraphPartitioner:
"scale_factor": 1, "scale_factor": 1,
"data_type": "float", "data_type": "float",
"thread_nums": 10, "thread_nums": 10,
**kwargs **kwargs,
} }
# Determine output directory # Determine output directory
if output_dir is None: if output_dir is None:
output_dir = str(Path(index_prefix_path).parent) output_dir = str(Path(index_prefix_path).parent)
# Get executable paths # Get executable paths
partitioner_path = self._get_executable_path("partitioner") partitioner_path = self._get_executable_path("partitioner")
relayout_path = self._get_executable_path("index_relayout") relayout_path = self._get_executable_path("index_relayout")
# Create temporary directory for processing # Create temporary directory for processing
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
# Change to the graph_partition directory for temporary files # Change to the graph_partition directory for temporary files
graph_partition_dir = Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition" graph_partition_dir = (
Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
)
original_dir = os.getcwd() original_dir = os.getcwd()
try: try:
os.chdir(graph_partition_dir) os.chdir(graph_partition_dir)
# Create temporary data directory # Create temporary data directory
temp_data_dir = Path(temp_dir) / "data" temp_data_dir = Path(temp_dir) / "data"
temp_data_dir.mkdir(parents=True, exist_ok=True) temp_data_dir.mkdir(parents=True, exist_ok=True)
# Set up paths for temporary files # Set up paths for temporary files
graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH" graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH"
graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" graph_gp_path = (
graph_path
/ f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}"
)
graph_gp_path.mkdir(parents=True, exist_ok=True) graph_gp_path.mkdir(parents=True, exist_ok=True)
# Find input index file # Find input index file
old_index_file = f"{index_prefix_path}_disk_beam_search.index" old_index_file = f"{index_prefix_path}_disk_beam_search.index"
if not os.path.exists(old_index_file): if not os.path.exists(old_index_file):
old_index_file = f"{index_prefix_path}_disk.index" old_index_file = f"{index_prefix_path}_disk.index"
if not os.path.exists(old_index_file): if not os.path.exists(old_index_file):
raise RuntimeError(f"Index file not found: {old_index_file}") raise RuntimeError(f"Index file not found: {old_index_file}")
# Run partitioner # Run partitioner
gp_file_path = graph_gp_path / "_part.bin" gp_file_path = graph_gp_path / "_part.bin"
partitioner_cmd = [ partitioner_cmd = [
partitioner_path, partitioner_path,
"--index_file", old_index_file, "--index_file",
"--data_type", params["data_type"], old_index_file,
"--gp_file", str(gp_file_path), "--data_type",
"-T", str(params["thread_nums"]), params["data_type"],
"--ldg_times", str(params["gp_times"]), "--gp_file",
"--scale", str(params["scale_factor"]), str(gp_file_path),
"--mode", "1" "-T",
str(params["thread_nums"]),
"--ldg_times",
str(params["gp_times"]),
"--scale",
str(params["scale_factor"]),
"--mode",
"1",
] ]
print(f"Running partitioner: {' '.join(partitioner_cmd)}") print(f"Running partitioner: {' '.join(partitioner_cmd)}")
result = subprocess.run( result = subprocess.run(
partitioner_cmd, partitioner_cmd, capture_output=True, text=True, cwd=graph_partition_dir
capture_output=True,
text=True,
cwd=graph_partition_dir
) )
if result.returncode != 0: if result.returncode != 0:
raise RuntimeError( raise RuntimeError(
f"Partitioner failed with return code {result.returncode}.\n" f"Partitioner failed with return code {result.returncode}.\n"
f"stdout: {result.stdout}\n" f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}" f"stderr: {result.stderr}"
) )
# Run relayout # Run relayout
part_tmp_index = graph_gp_path / "_part_tmp.index" part_tmp_index = graph_gp_path / "_part_tmp.index"
relayout_cmd = [ relayout_cmd = [
@@ -195,75 +197,69 @@ class GraphPartitioner:
old_index_file, old_index_file,
str(gp_file_path), str(gp_file_path),
params["data_type"], params["data_type"],
"1" "1",
] ]
print(f"Running relayout: {' '.join(relayout_cmd)}") print(f"Running relayout: {' '.join(relayout_cmd)}")
result = subprocess.run( result = subprocess.run(
relayout_cmd, relayout_cmd, capture_output=True, text=True, cwd=graph_partition_dir
capture_output=True,
text=True,
cwd=graph_partition_dir
) )
if result.returncode != 0: if result.returncode != 0:
raise RuntimeError( raise RuntimeError(
f"Relayout failed with return code {result.returncode}.\n" f"Relayout failed with return code {result.returncode}.\n"
f"stdout: {result.stdout}\n" f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}" f"stderr: {result.stderr}"
) )
# Copy results to output directory # Copy results to output directory
disk_graph_path = Path(output_dir) / "_disk_graph.index" disk_graph_path = Path(output_dir) / "_disk_graph.index"
partition_bin_path = Path(output_dir) / "_partition.bin" partition_bin_path = Path(output_dir) / "_partition.bin"
shutil.copy2(part_tmp_index, disk_graph_path) shutil.copy2(part_tmp_index, disk_graph_path)
shutil.copy2(gp_file_path, partition_bin_path) shutil.copy2(gp_file_path, partition_bin_path)
print(f"Results copied to: {output_dir}") print(f"Results copied to: {output_dir}")
return str(disk_graph_path), str(partition_bin_path) return str(disk_graph_path), str(partition_bin_path)
finally: finally:
os.chdir(original_dir) os.chdir(original_dir)
def get_partition_info(self, partition_bin_path: str) -> dict:
    """
    Get information about a partition file.

    Args:
        partition_bin_path: Path to the partition binary file

    Returns:
        Dictionary with ``file_size`` (bytes), ``file_path`` (the argument,
        unchanged) and ``modified_time`` (``st_mtime`` of the file).

    Raises:
        FileNotFoundError: If the file cannot be stat'ed.
    """
    try:
        # A single stat call both proves existence and supplies the metadata,
        # so the file cannot disappear between a check and the read.
        file_stat = os.stat(partition_bin_path)
    except (OSError, ValueError) as err:
        # Same failures os.path.exists() reports as "does not exist".
        raise FileNotFoundError(f"Partition file not found: {partition_bin_path}") from err

    # For now, return basic file information
    # In the future, this could parse the binary file for detailed info
    return {
        "file_size": file_stat.st_size,
        "file_path": partition_bin_path,
        "modified_time": file_stat.st_mtime,
    }
def partition_graph( def partition_graph(
index_prefix_path: str, index_prefix_path: str, output_dir: str | None = None, build_type: str = "release", **kwargs
output_dir: Optional[str] = None, ) -> tuple[str, str]:
build_type: str = "release",
**kwargs
) -> Tuple[str, str]:
""" """
Convenience function to partition a graph index. Convenience function to partition a graph index.
Args: Args:
index_prefix_path: Path to the index prefix index_prefix_path: Path to the index prefix
output_dir: Output directory (defaults to parent of index_prefix_path) output_dir: Output directory (defaults to parent of index_prefix_path)
build_type: Build type for executables ("debug" or "release") build_type: Build type for executables ("debug" or "release")
**kwargs: Additional parameters for graph partitioning **kwargs: Additional parameters for graph partitioning
Returns: Returns:
Tuple of (disk_graph_index_path, partition_bin_path) Tuple of (disk_graph_index_path, partition_bin_path)
""" """
@@ -276,13 +272,10 @@ if __name__ == "__main__":
# Example: partition an index # Example: partition an index
try: try:
disk_graph_path, partition_bin_path = partition_graph( disk_graph_path, partition_bin_path = partition_graph(
"/path/to/your/index_prefix", "/path/to/your/index_prefix", gp_times=10, lock_nums=10, cut=100
gp_times=10,
lock_nums=10,
cut=100
) )
print(f"Partitioning completed successfully!") print("Partitioning completed successfully!")
print(f"Disk graph index: {disk_graph_path}") print(f"Disk graph index: {disk_graph_path}")
print(f"Partition binary: {partition_bin_path}") print(f"Partition binary: {partition_bin_path}")
except Exception as e: except Exception as e:
print(f"Partitioning failed: {e}") print(f"Partitioning failed: {e}")

View File

@@ -9,24 +9,20 @@ that directly calls the existing executables.
import os import os
import subprocess import subprocess
import tempfile import tempfile
import shutil
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple
def partition_graph_simple( def partition_graph_simple(
index_prefix_path: str, index_prefix_path: str, output_dir: str | None = None, **kwargs
output_dir: Optional[str] = None, ) -> tuple[str, str]:
**kwargs
) -> Tuple[str, str]:
""" """
Simple function to partition a graph index. Simple function to partition a graph index.
Args: Args:
index_prefix_path: Path to the index prefix (e.g., "/path/to/index") index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
output_dir: Output directory (defaults to parent of index_prefix_path) output_dir: Output directory (defaults to parent of index_prefix_path)
**kwargs: Additional parameters for graph partitioning **kwargs: Additional parameters for graph partitioning
Returns: Returns:
Tuple of (disk_graph_index_path, partition_bin_path) Tuple of (disk_graph_index_path, partition_bin_path)
""" """
@@ -38,69 +34,65 @@ def partition_graph_simple(
"scale_factor": 1, "scale_factor": 1,
"data_type": "float", "data_type": "float",
"thread_nums": 10, "thread_nums": 10,
**kwargs **kwargs,
} }
# Determine output directory # Determine output directory
if output_dir is None: if output_dir is None:
output_dir = str(Path(index_prefix_path).parent) output_dir = str(Path(index_prefix_path).parent)
# Find the graph_partition directory # Find the graph_partition directory
current_file = Path(__file__) current_file = Path(__file__)
graph_partition_dir = current_file.parent.parent / "third_party" / "DiskANN" / "graph_partition" graph_partition_dir = current_file.parent.parent / "third_party" / "DiskANN" / "graph_partition"
if not graph_partition_dir.exists(): if not graph_partition_dir.exists():
raise RuntimeError(f"Graph partition directory not found: {graph_partition_dir}") raise RuntimeError(f"Graph partition directory not found: {graph_partition_dir}")
# Find input index file # Find input index file
old_index_file = f"{index_prefix_path}_disk_beam_search.index" old_index_file = f"{index_prefix_path}_disk_beam_search.index"
if not os.path.exists(old_index_file): if not os.path.exists(old_index_file):
old_index_file = f"{index_prefix_path}_disk.index" old_index_file = f"{index_prefix_path}_disk.index"
if not os.path.exists(old_index_file): if not os.path.exists(old_index_file):
raise RuntimeError(f"Index file not found: {old_index_file}") raise RuntimeError(f"Index file not found: {old_index_file}")
# Create temporary directory for processing # Create temporary directory for processing
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
temp_data_dir = Path(temp_dir) / "data" temp_data_dir = Path(temp_dir) / "data"
temp_data_dir.mkdir(parents=True, exist_ok=True) temp_data_dir.mkdir(parents=True, exist_ok=True)
# Set up paths for temporary files # Set up paths for temporary files
graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH" graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH"
graph_gp_path = graph_path / f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}" graph_gp_path = (
graph_path
/ f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}"
)
graph_gp_path.mkdir(parents=True, exist_ok=True) graph_gp_path.mkdir(parents=True, exist_ok=True)
# Run the build script with our parameters # Run the build script with our parameters
cmd = [ cmd = [str(graph_partition_dir / "build.sh"), "release", "split_graph", index_prefix_path]
str(graph_partition_dir / "build.sh"),
"release",
"split_graph",
index_prefix_path
]
# Set environment variables for parameters # Set environment variables for parameters
env = os.environ.copy() env = os.environ.copy()
env.update({ env.update(
"GP_TIMES": str(params["gp_times"]), {
"GP_LOCK_NUMS": str(params["lock_nums"]), "GP_TIMES": str(params["gp_times"]),
"GP_CUT": str(params["cut"]), "GP_LOCK_NUMS": str(params["lock_nums"]),
"GP_SCALE_F": str(params["scale_factor"]), "GP_CUT": str(params["cut"]),
"DATA_TYPE": params["data_type"], "GP_SCALE_F": str(params["scale_factor"]),
"GP_T": str(params["thread_nums"]), "DATA_TYPE": params["data_type"],
}) "GP_T": str(params["thread_nums"]),
}
)
print(f"Running graph partition with command: {' '.join(cmd)}") print(f"Running graph partition with command: {' '.join(cmd)}")
print(f"Working directory: {graph_partition_dir}") print(f"Working directory: {graph_partition_dir}")
# Run the command # Run the command
result = subprocess.run( result = subprocess.run(
cmd, cmd, env=env, capture_output=True, text=True, cwd=graph_partition_dir
env=env,
capture_output=True,
text=True,
cwd=graph_partition_dir
) )
if result.returncode != 0: if result.returncode != 0:
print(f"Command failed with return code {result.returncode}") print(f"Command failed with return code {result.returncode}")
print(f"stdout: {result.stdout}") print(f"stdout: {result.stdout}")
@@ -110,21 +102,21 @@ def partition_graph_simple(
f"stdout: {result.stdout}\n" f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}" f"stderr: {result.stderr}"
) )
# Check if output files were created # Check if output files were created
disk_graph_path = Path(output_dir) / "_disk_graph.index" disk_graph_path = Path(output_dir) / "_disk_graph.index"
partition_bin_path = Path(output_dir) / "_partition.bin" partition_bin_path = Path(output_dir) / "_partition.bin"
if not disk_graph_path.exists(): if not disk_graph_path.exists():
raise RuntimeError(f"Expected output file not found: {disk_graph_path}") raise RuntimeError(f"Expected output file not found: {disk_graph_path}")
if not partition_bin_path.exists(): if not partition_bin_path.exists():
raise RuntimeError(f"Expected output file not found: {partition_bin_path}") raise RuntimeError(f"Expected output file not found: {partition_bin_path}")
print(f"✅ Partitioning completed successfully!") print("✅ Partitioning completed successfully!")
print(f" Disk graph index: {disk_graph_path}") print(f" Disk graph index: {disk_graph_path}")
print(f" Partition binary: {partition_bin_path}") print(f" Partition binary: {partition_bin_path}")
return str(disk_graph_path), str(partition_bin_path) return str(disk_graph_path), str(partition_bin_path)
@@ -135,10 +127,10 @@ if __name__ == "__main__":
"/Users/yichuan/Desktop/release2/leann/diskannbuild/test_doc_files", "/Users/yichuan/Desktop/release2/leann/diskannbuild/test_doc_files",
gp_times=5, gp_times=5,
lock_nums=5, lock_nums=5,
cut=50 cut=50,
) )
print(f"Success! Output files:") print("Success! Output files:")
print(f" - {disk_graph_path}") print(f" - {disk_graph_path}")
print(f" - {partition_bin_path}") print(f" - {partition_bin_path}")
except Exception as e: except Exception as e:
print(f"Error: {e}") print(f"Error: {e}")