fix: diskann building and partitioning
This commit is contained in:
@@ -137,6 +137,71 @@ class DiskannBuilder(LeannBackendBuilderInterface):
|
|||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
self.build_params = kwargs
|
self.build_params = kwargs
|
||||||
|
|
||||||
|
def _safe_cleanup_after_partition(self, index_dir: Path, index_prefix: str):
    """
    Safely clean up the large pre-partition index files after partitioning.

    In partition mode, C++ doesn't read _disk.index content, so
    ``<prefix>_disk.index`` and ``<prefix>_disk_beam_search.index`` are
    deleted, but only when every derived file listed in ``required_files``
    is present. If any of them is missing, nothing is deleted.

    Deletion is best-effort: a file that cannot be removed is logged as a
    warning and skipped, and only files that were actually removed count
    towards the reported space savings.

    Args:
        index_dir: Directory containing the index files.
        index_prefix: File-name prefix shared by the files of this index.

    Returns:
        None. Results are reported through the module logger only.
    """
    index_dir = Path(index_dir)

    # Required files that C++ partition mode needs
    # Note: C++ generates these with _disk.index suffix
    disk_suffix = "_disk.index"
    required_files = [
        f"{index_prefix}{disk_suffix}_medoids.bin",  # Critical: assert fails if missing
        # Note: _centroids.bin is not created in single-shot build - C++ handles this automatically
        f"{index_prefix}_pq_pivots.bin",  # PQ table
        f"{index_prefix}_pq_compressed.bin",  # PQ compressed vectors
    ]

    # Refuse to delete anything unless every required file is in place.
    missing_files = [name for name in required_files if not (index_dir / name).exists()]
    if missing_files:
        logger.warning(
            f"Cannot safely delete _disk.index - missing required files: {missing_files}"
        )
        logger.info("Keeping all original files for safety")
        return

    files_to_delete = (
        index_dir / f"{index_prefix}_disk.index",
        index_dir / f"{index_prefix}_disk_beam_search.index",
    )

    # Safe to delete! Size is read inside the try so that a file vanishing
    # between the check and the removal cannot abort the build, and space is
    # only counted once the removal has really succeeded.
    space_saved = 0
    for file_to_delete in files_to_delete:
        try:
            size = file_to_delete.stat().st_size
            file_to_delete.unlink()
        except FileNotFoundError:
            # Nothing to clean up for this file.
            continue
        except OSError as e:
            logger.warning(f"Failed to delete {file_to_delete.name}: {e}")
            continue
        space_saved += size
        logger.info(f"✅ Safely deleted: {file_to_delete.name}")

    if space_saved > 0:
        space_saved_mb = space_saved / (1024 * 1024)
        logger.info(f"💾 Space saved: {space_saved_mb:.1f} MB")

    # Show what files are kept
    logger.info("📁 Kept essential files for partition mode:")
    for filename in required_files:
        try:
            size_mb = (index_dir / filename).stat().st_size / (1024 * 1024)
        except OSError:
            # File disappeared since the check above; just omit it from the report.
            continue
        logger.info(f"  - {filename} ({size_mb:.1f} MB)")
|
||||||
|
|
||||||
def build(self, data: np.ndarray, ids: list[str], index_path: str, **kwargs):
|
def build(self, data: np.ndarray, ids: list[str], index_path: str, **kwargs):
|
||||||
path = Path(index_path)
|
path = Path(index_path)
|
||||||
index_dir = path.parent
|
index_dir = path.parent
|
||||||
@@ -151,6 +216,17 @@ class DiskannBuilder(LeannBackendBuilderInterface):
|
|||||||
_write_vectors_to_bin(data, index_dir / data_filename)
|
_write_vectors_to_bin(data, index_dir / data_filename)
|
||||||
|
|
||||||
build_kwargs = {**self.build_params, **kwargs}
|
build_kwargs = {**self.build_params, **kwargs}
|
||||||
|
|
||||||
|
# Extract is_recompute from nested backend_kwargs if needed
|
||||||
|
is_recompute = build_kwargs.get("is_recompute", False)
|
||||||
|
if not is_recompute and "backend_kwargs" in build_kwargs:
|
||||||
|
is_recompute = build_kwargs["backend_kwargs"].get("is_recompute", False)
|
||||||
|
|
||||||
|
# Flatten all backend_kwargs parameters to top level for compatibility
|
||||||
|
if "backend_kwargs" in build_kwargs:
|
||||||
|
nested_params = build_kwargs.pop("backend_kwargs")
|
||||||
|
build_kwargs.update(nested_params)
|
||||||
|
|
||||||
metric_enum = _get_diskann_metrics().get(
|
metric_enum = _get_diskann_metrics().get(
|
||||||
build_kwargs.get("distance_metric", "mips").lower()
|
build_kwargs.get("distance_metric", "mips").lower()
|
||||||
)
|
)
|
||||||
@@ -185,6 +261,30 @@ class DiskannBuilder(LeannBackendBuilderInterface):
|
|||||||
build_kwargs.get("pq_disk_bytes", 0),
|
build_kwargs.get("pq_disk_bytes", 0),
|
||||||
"",
|
"",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Auto-partition if is_recompute is enabled
|
||||||
|
if build_kwargs.get("is_recompute", False):
|
||||||
|
logger.info("is_recompute=True, starting automatic graph partitioning...")
|
||||||
|
from .graph_partition import partition_graph
|
||||||
|
|
||||||
|
# Partition the index using absolute paths
|
||||||
|
# Convert to absolute paths to avoid issues with working directory changes
|
||||||
|
absolute_index_dir = Path(index_dir).resolve()
|
||||||
|
absolute_index_prefix_path = str(absolute_index_dir / index_prefix)
|
||||||
|
disk_graph_path, partition_bin_path = partition_graph(
|
||||||
|
index_prefix_path=absolute_index_prefix_path,
|
||||||
|
output_dir=str(absolute_index_dir),
|
||||||
|
partition_prefix=index_prefix,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Safe cleanup: In partition mode, C++ doesn't read _disk.index content
|
||||||
|
# but still needs the derived files (_medoids.bin, _centroids.bin, etc.)
|
||||||
|
self._safe_cleanup_after_partition(index_dir, index_prefix)
|
||||||
|
|
||||||
|
logger.info("✅ Graph partitioning completed successfully!")
|
||||||
|
logger.info(f" - Disk graph: {disk_graph_path}")
|
||||||
|
logger.info(f" - Partition file: {partition_bin_path}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
temp_data_file = index_dir / data_filename
|
temp_data_file = index_dir / data_filename
|
||||||
if temp_data_file.exists():
|
if temp_data_file.exists():
|
||||||
@@ -213,7 +313,26 @@ class DiskannSearcher(BaseSearcher):
|
|||||||
|
|
||||||
# For DiskANN, we need to reinitialize the index when zmq_port changes
|
# For DiskANN, we need to reinitialize the index when zmq_port changes
|
||||||
# Store the initialization parameters for later use
|
# Store the initialization parameters for later use
|
||||||
full_index_prefix = str(self.index_dir / self.index_path.stem)
|
# Note: C++ load method expects the BASE path (without _disk.index suffix)
|
||||||
|
# C++ internally constructs: index_prefix + "_disk.index"
|
||||||
|
index_name = self.index_path.stem # "simple_test.leann" -> "simple_test"
|
||||||
|
diskann_index_prefix = str(self.index_dir / index_name) # /path/to/simple_test
|
||||||
|
full_index_prefix = diskann_index_prefix # /path/to/simple_test (base path)
|
||||||
|
|
||||||
|
# Auto-detect partition files and set partition_prefix
|
||||||
|
partition_graph_file = self.index_dir / f"{index_name}_disk_graph.index"
|
||||||
|
partition_bin_file = self.index_dir / f"{index_name}_partition.bin"
|
||||||
|
|
||||||
|
partition_prefix = ""
|
||||||
|
if partition_graph_file.exists() and partition_bin_file.exists():
|
||||||
|
# C++ expects full path prefix, not just filename
|
||||||
|
partition_prefix = str(self.index_dir / index_name) # /path/to/simple_test
|
||||||
|
logger.info(
|
||||||
|
f"✅ Detected partition files, using partition_prefix='{partition_prefix}'"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug("No partition files detected, using standard index files")
|
||||||
|
|
||||||
self._init_params = {
|
self._init_params = {
|
||||||
"metric_enum": metric_enum,
|
"metric_enum": metric_enum,
|
||||||
"full_index_prefix": full_index_prefix,
|
"full_index_prefix": full_index_prefix,
|
||||||
@@ -221,8 +340,14 @@ class DiskannSearcher(BaseSearcher):
|
|||||||
"num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0),
|
"num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0),
|
||||||
"cache_mechanism": 1,
|
"cache_mechanism": 1,
|
||||||
"pq_prefix": "",
|
"pq_prefix": "",
|
||||||
"partition_prefix": "",
|
"partition_prefix": partition_prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Log partition configuration for debugging
|
||||||
|
if partition_prefix:
|
||||||
|
logger.info(
|
||||||
|
f"✅ Detected partition files, using partition_prefix='{partition_prefix}'"
|
||||||
|
)
|
||||||
self._diskannpy = diskannpy
|
self._diskannpy = diskannpy
|
||||||
self._current_zmq_port = None
|
self._current_zmq_port = None
|
||||||
self._index = None
|
self._index = None
|
||||||
|
|||||||
@@ -86,7 +86,11 @@ class GraphPartitioner:
|
|||||||
os.chdir(original_dir)
|
os.chdir(original_dir)
|
||||||
|
|
||||||
def partition_graph(
|
def partition_graph(
|
||||||
self, index_prefix_path: str, output_dir: str | None = None, **kwargs
|
self,
|
||||||
|
index_prefix_path: str,
|
||||||
|
output_dir: str | None = None,
|
||||||
|
partition_prefix: str | None = None,
|
||||||
|
**kwargs,
|
||||||
) -> tuple[str, str]:
|
) -> tuple[str, str]:
|
||||||
"""
|
"""
|
||||||
Partition a disk-based index for improved performance.
|
Partition a disk-based index for improved performance.
|
||||||
@@ -94,6 +98,7 @@ class GraphPartitioner:
|
|||||||
Args:
|
Args:
|
||||||
index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
|
index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
|
||||||
output_dir: Output directory for results (defaults to parent of index_prefix_path)
|
output_dir: Output directory for results (defaults to parent of index_prefix_path)
|
||||||
|
partition_prefix: Prefix for output files (defaults to basename of index_prefix_path)
|
||||||
**kwargs: Additional parameters for graph partitioning:
|
**kwargs: Additional parameters for graph partitioning:
|
||||||
- gp_times: Number of LDG partition iterations (default: 10)
|
- gp_times: Number of LDG partition iterations (default: 10)
|
||||||
- lock_nums: Number of lock nodes (default: 10)
|
- lock_nums: Number of lock nodes (default: 10)
|
||||||
@@ -126,6 +131,10 @@ class GraphPartitioner:
|
|||||||
# Create output directory if it doesn't exist
|
# Create output directory if it doesn't exist
|
||||||
Path(output_dir).mkdir(parents=True, exist_ok=True)
|
Path(output_dir).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Determine partition prefix
|
||||||
|
if partition_prefix is None:
|
||||||
|
partition_prefix = Path(index_prefix_path).name
|
||||||
|
|
||||||
# Get executable paths
|
# Get executable paths
|
||||||
partitioner_path = self._get_executable_path("partitioner")
|
partitioner_path = self._get_executable_path("partitioner")
|
||||||
relayout_path = self._get_executable_path("index_relayout")
|
relayout_path = self._get_executable_path("index_relayout")
|
||||||
@@ -216,8 +225,8 @@ class GraphPartitioner:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Copy results to output directory
|
# Copy results to output directory
|
||||||
disk_graph_path = Path(output_dir) / "_disk_graph.index"
|
disk_graph_path = Path(output_dir) / f"{partition_prefix}_disk_graph.index"
|
||||||
partition_bin_path = Path(output_dir) / "_partition.bin"
|
partition_bin_path = Path(output_dir) / f"{partition_prefix}_partition.bin"
|
||||||
|
|
||||||
shutil.copy2(part_tmp_index, disk_graph_path)
|
shutil.copy2(part_tmp_index, disk_graph_path)
|
||||||
shutil.copy2(gp_file_path, partition_bin_path)
|
shutil.copy2(gp_file_path, partition_bin_path)
|
||||||
@@ -252,7 +261,11 @@ class GraphPartitioner:
|
|||||||
|
|
||||||
|
|
||||||
def partition_graph(
|
def partition_graph(
|
||||||
index_prefix_path: str, output_dir: str | None = None, build_type: str = "release", **kwargs
|
index_prefix_path: str,
|
||||||
|
output_dir: str | None = None,
|
||||||
|
partition_prefix: str | None = None,
|
||||||
|
build_type: str = "release",
|
||||||
|
**kwargs,
|
||||||
) -> tuple[str, str]:
|
) -> tuple[str, str]:
|
||||||
"""
|
"""
|
||||||
Convenience function to partition a graph index.
|
Convenience function to partition a graph index.
|
||||||
@@ -260,6 +273,7 @@ def partition_graph(
|
|||||||
Args:
|
Args:
|
||||||
index_prefix_path: Path to the index prefix
|
index_prefix_path: Path to the index prefix
|
||||||
output_dir: Output directory (defaults to parent of index_prefix_path)
|
output_dir: Output directory (defaults to parent of index_prefix_path)
|
||||||
|
partition_prefix: Prefix for output files (defaults to basename of index_prefix_path)
|
||||||
build_type: Build type for executables ("debug" or "release")
|
build_type: Build type for executables ("debug" or "release")
|
||||||
**kwargs: Additional parameters for graph partitioning
|
**kwargs: Additional parameters for graph partitioning
|
||||||
|
|
||||||
@@ -267,7 +281,7 @@ def partition_graph(
|
|||||||
Tuple of (disk_graph_index_path, partition_bin_path)
|
Tuple of (disk_graph_index_path, partition_bin_path)
|
||||||
"""
|
"""
|
||||||
partitioner = GraphPartitioner(build_type=build_type)
|
partitioner = GraphPartitioner(build_type=build_type)
|
||||||
return partitioner.partition_graph(index_prefix_path, output_dir, **kwargs)
|
return partitioner.partition_graph(index_prefix_path, output_dir, partition_prefix, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
# Example usage:
|
# Example usage:
|
||||||
|
|||||||
Submodule packages/leann-backend-diskann/third_party/DiskANN updated: b2dc4ea2c7...c73bcec98c
Reference in New Issue
Block a user