* ci: add Mac Intel (x86_64) build support
* fix: auto-detect Homebrew path for Intel vs Apple Silicon Macs
  This fixes the hardcoded /opt/homebrew path, which only works on Apple Silicon Macs; Intel Macs use /usr/local as the Homebrew prefix.
* fix: auto-detect Homebrew paths for both DiskANN and HNSW backends
  - Fix the DiskANN CMakeLists.txt path reference
  - Add macOS environment variable detection for OpenMP_ROOT
  - Support both Intel (/usr/local) and Apple Silicon (/opt/homebrew) paths
* fix: improve macOS build reliability with proper OpenMP path detection
  - Add proper CMAKE_PREFIX_PATH and OpenMP_ROOT detection for both Intel and Apple Silicon Macs
  - Set LDFLAGS and CPPFLAGS for all Homebrew packages so CMake can find them
  - Apply CMAKE_ARGS to both the HNSW and DiskANN backends for consistent builds
  - Fix hardcoded paths that caused build failures on Intel Macs (macos-13)
* fix: add abseil library path for protobuf compilation on macOS
  - Include abseil in CMAKE_PREFIX_PATH for both Intel and Apple Silicon Macs
  - Add an explicit absl_DIR CMake variable to help protobuf find abseil
  - Fixes the 'absl/log/absl_log.h' file not found error during compilation
* fix: add abseil include path to CPPFLAGS for both Intel and Apple Silicon
  - Add -I/opt/homebrew/opt/abseil/include to CPPFLAGS for Apple Silicon
  - Add -I/usr/local/opt/abseil/include to CPPFLAGS for Intel
  - Fixes 'absl/log/absl_log.h' file not found by ensuring the abseil headers are on the compiler include path
  Root cause: CMAKE_PREFIX_PATH alone wasn't sufficient; the compiler needs explicit -I flags.
* fix: clean build system and Python 3.9 compatibility
  Build system improvements:
  - Simplify macOS environment detection using brew --prefix (sketched after this list)
  - Remove complex hardcoded paths and CMAKE_ARGS
  - Let CMake automatically find Homebrew packages via CMAKE_PREFIX_PATH
  - Clean separation between Intel (/usr/local) and Apple Silicon (/opt/homebrew)
  Python 3.9 compatibility:
  - Set the ruff target-version to py39 to match project requirements
  - Replace str | None with Union[str, None] in type annotations
  - Add Union imports where needed
  - Fix the core interface, CLI, chat, and embedding server files
* fix: type
* fix: ensure CMAKE_PREFIX_PATH is passed to backend builds
  - Add CMAKE_ARGS with CMAKE_PREFIX_PATH and OpenMP_ROOT for both the HNSW and DiskANN backends
  - This ensures CMake can find Homebrew packages on both Intel (/usr/local) and Apple Silicon (/opt/homebrew)
  - Fixes the issue where CMake was still looking at hardcoded paths instead of using the detected ones
* fix: configure CMake paths in pyproject.toml for proper Homebrew detection
  - Add CMAKE_PREFIX_PATH and OpenMP_ROOT environment variable mapping in both backends
  - Remove CMAKE_ARGS from the GitHub Actions workflow (cleaner separation)
  - Ensure scikit-build-core correctly uses environment variables for CMake configuration
  - This should fix the hardcoded /opt/homebrew paths on Intel Macs
* fix: remove hardcoded /opt/homebrew paths from DiskANN CMake
  - Auto-detect the Homebrew libomp path using the OpenMP_ROOT environment variable
  - Fall back to CMAKE_PREFIX_PATH/opt/libomp if OpenMP_ROOT is not set
  - Final fallback to brew --prefix libomp for auto-detection
  - Maintains backwards compatibility with the old hardcoded path
  - Fixes Intel Mac builds that were failing due to hardcoded Apple Silicon paths
* fix: update DiskANN submodule with macOS Intel/Apple Silicon compatibility fixes
  - Auto-detect the Homebrew libomp path using the OpenMP_ROOT environment variable
  - Exclude mkl_set_num_threads on macOS (it uses the Accelerate framework instead of MKL)
  - Fixes compilation on Intel Macs by using the correct /usr/local paths
* fix: update DiskANN submodule with SIMD function name corrections
  - Fix _mm128_loadu_ps to _mm_loadu_ps (and similarly misnamed functions)
  - This is a known issue in upstream DiskANN, where incorrect intrinsic names were used
  - Resolves compilation errors on macOS Intel builds
  References: known DiskANN issue with SIMD intrinsics naming
* fix: update DiskANN submodule with type cast fix for signed char templates
  - Add the missing (float*)a and (float*)b casts in the SSE2 version
  - This matches the existing casts in the AVX version
  - Fixes a compilation error when instantiating DistanceInnerProduct<int8_t>
  - Resolves the "cannot initialize const float* with const signed char*" error
* fix: update Faiss submodule with override keyword fix
  - Add the missing override keyword to IDSelectorModulo::is_member
  - Fixes a C++ compilation warning that was treated as an error due to the -Werror flag
  - Resolves "warning: 'is_member' overrides a member function but is not marked 'override'"
  - Improves conformance to modern C++ best practices
* fix: update DiskANN submodule with additional type cast fix
  - Add the missing type cast in the SSE2 version of DistanceFastL2::norm
  - Fixes a const float* = const signed char* compilation error
  - Ensures consistent type casting across all SIMD code paths
  - Resolves the template instantiation error for DistanceFastL2<int8_t>
* debug: simplify wheel compatibility checking
  - Fix a YAML syntax error in the debug step
  - Use a simpler approach to show platform tags and wheel names
  - This helps identify platform tag compatibility issues
* fix: use correct Python version for wheel builds
  - Replace --python python with --python ${{ matrix.python }}
  - This ensures wheels are built for the correct Python version in each matrix job
  - Fixes a Python version mismatch where cp39 wheels were used in cp311 environments
* fix: resolve wheel installation conflicts in CI matrix builds
  Fixes an issue where wheels for multiple Python versions in the same dist directory caused installation conflicts during CI testing: matrix builds for different Python versions accumulated wheels in shared directories, and uv pip install would pick up incompatible ones.
  Changes:
  - Add Python version detection using the matrix.python variable
  - Convert the Python version to wheel tag format (e.g., 3.11 -> cp311; sketched after this list)
  - Use find with version-specific pattern matching to select the correct wheels
  - Add explicit error handling if no matching wheel is found
  This ensures each CI job installs only wheels compatible with its specific Python version, preventing "A path dependency is incompatible with the current platform" errors.
* fix: ensure virtual environment uses correct Python version in CI
  Fixes an issue where uv venv created virtual environments with a different Python version than specified in the matrix, causing wheel compatibility errors: systems with multiple Python versions installed let uv venv default to an unintended one.
  Changes:
  - Add the --python ${{ matrix.python }} flag to the uv venv command
  - Ensures the virtual environment matches the matrix-specified Python version
  - Fixes "The wheel is compatible with CPython 3.X but you're using CPython 3.Y" errors
  This ensures wheel installation selects and installs the correctly built wheels that match the runtime Python version.
* fix: complete Python 3.9 type annotation compatibility fixes
  Fixes the remaining Python 3.9-incompatible type annotations throughout the leann-core package that were causing test failures in CI. The union operator (|) syntax for type hints was introduced in Python 3.10 and causes "TypeError: unsupported operand type(s) for |" errors on Python 3.9 (sketched after this list).
  Changes:
  - Convert dict[str, Any] | None to Optional[dict[str, Any]]
  - Convert int | None to Optional[int]
  - Convert subprocess.Popen | None to Optional[subprocess.Popen]
  - Convert LeannBackendFactoryInterface | None to Optional[LeannBackendFactoryInterface]
  - Add the missing Optional imports to all affected files
  This resolves all test failures related to type annotation syntax and ensures compatibility with Python 3.9 as specified in pyproject.toml.
* fix: complete Python 3.9 type annotation fixes in backend packages
  Fixes the remaining Python 3.9-incompatible type annotations in the backend packages that were causing test failures.
  Changes in leann-backend-diskann:
  - Convert zmq_port: int | None to Optional[int] in diskann_backend.py
  - Convert passages_file: str | None to Optional[str] in diskann_embedding_server.py
  - Add Optional imports to both files
  Changes in leann-backend-hnsw:
  - Convert zmq_port: int | None to Optional[int] in hnsw_backend.py
  - Add the Optional import
  This resolves the final test failures related to type annotation syntax and ensures full Python 3.9 compatibility across all packages.
* fix: remove Python 3.10+ zip strict parameter for Python 3.9 compatibility
  Removes the strict=False parameter from the zip() call in api.py: it was introduced in Python 3.10 and causes "TypeError: zip() takes no keyword arguments" on Python 3.9. The strict parameter controls whether zip() raises when the iterables have different lengths; since the code does not rely on that behavior, removing it preserves the same functionality while restoring Python 3.9 compatibility (sketched after this list).
* fix: ensure leann-core package is built on all platforms, not just Ubuntu
  This fixes CI installing leann-core from PyPI instead of using the locally built package with the Python 3.9 compatibility fixes.
* fix: build and install leann meta package on all platforms
  The leann meta package is pure Python and platform-independent, so there is no reason to restrict it to Ubuntu. This ensures all platforms use consistent local builds instead of falling back to PyPI versions.
* fix: restrict MLX dependencies to Apple Silicon Macs only
  The MLX framework only supports Apple Silicon (ARM64) Macs, not Intel x86_64. Add a platform_machine == 'arm64' condition to prevent installation failures on Intel Macs (macos-13); see the sketch after this list.
* cleanup: simplify CI configuration
  - Remove the debug step with the non-existent 'uv pip debug' command
  - Simplify the wheel installation logic and let uv handle compatibility
  - Use -e .[test] instead of manually listing all test dependencies
* fix: install backend wheels before meta packages
  Install backend wheels first so they are available when the core/meta packages are installed, preventing uv from trying to resolve backend dependencies from PyPI.
* fix: use local leann-core when building backend packages
  Add --find-links to the backend builds so they use the locally built leann-core with the fixed MLX dependencies instead of downloading from PyPI. Also bump leann-core to version 0.2.8 to ensure clean dependency resolution.
* fix: use absolute path for find-links and upgrade backend version
  - Use GITHUB_WORKSPACE for an absolute path so that find-links works
  - Upgrade leann-backend-hnsw to 0.2.8 to match the leann-core version
* fix: correct version consistency for --find-links to work properly
  - All packages now use version 0.2.7 consistently
  - Backend packages can find the exact leann-core==0.2.7 from the local build
  - This ensures --find-links works during CI builds instead of falling back to PyPI
* fix: revert all packages to consistent version 0.2.7
  - This PR should not bump versions, only fix the Intel Mac build
  - Version bumps should be done in the release_manual workflow
  - All packages now use 0.2.7 consistently so that --find-links works
* fix: use --find-links during package installation to avoid PyPI MLX conflicts
  - Backend wheels contain Requires-Dist: leann-core==0.2.7
  - Without --find-links, uv resolves this from PyPI, which pulls in MLX for all of Darwin
  - With --find-links, uv uses the local leann-core with the proper platform restrictions
  - Root cause: dependency resolution happens at install time, not just build time
  - A local test confirms this fixes the Intel Mac MLX dependency issue
* fix: restrict MLX dependencies to ARM64 Macs in workspace pyproject.toml
  - The root pyproject.toml also had MLX dependencies without a platform_machine restriction
  - This caused test dependency installation to fail on Intel Macs
  - Now consistent with the platform restrictions in packages/leann-core/pyproject.toml
* chore: cleanup unused files and fix GitHub Actions warnings
  - Remove the unused packages/leann-backend-diskann/CMakeLists.txt (DiskANN uses cmake.source-dir=third_party/DiskANN instead)
  - Replace macos-latest with macos-14 to avoid migration warnings (macos-latest migrates to macOS 15 on August 4, 2025)
  - Keep packages/leann-backend-hnsw/CMakeLists.txt (needed for the Faiss config)
* fix: properly handle Python 3.13 support with PyTorch compatibility
  - Support Python 3.13 on most platforms (Ubuntu, ARM64 Mac)
  - Exclude the Intel Mac + Python 3.13 combination due to PyTorch wheel availability: PyTorch <2.5 supports Intel Mac but not Python 3.13, and PyTorch 2.5+ supports Python 3.13 but not Intel Mac x86_64
  - Document the limitation in the CI configuration comments
  - Update the README badges with detailed Python version support and CI status
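
For reference, the Homebrew prefix detection that several of the build fixes above converge on amounts to asking Homebrew itself rather than hardcoding a path. A minimal sketch in Python (the workflow does the equivalent in shell; the function name here is illustrative):

```python
import platform
import subprocess


def detect_homebrew_prefix() -> str:
    """Illustrative: ask Homebrew for its prefix instead of hardcoding it."""
    try:
        # Prints /opt/homebrew on Apple Silicon and /usr/local on Intel Macs.
        result = subprocess.run(
            ["brew", "--prefix"], capture_output=True, text=True, check=True
        )
        return result.stdout.strip()
    except (FileNotFoundError, subprocess.CalledProcessError):
        # Fall back to the conventional default for the current architecture.
        return "/opt/homebrew" if platform.machine() == "arm64" else "/usr/local"
```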
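The wheel-selection fix boils down to converting the matrix Python version into a CPython wheel tag and filtering the shared dist directory on it. A sketch under those assumptions (the CI step itself does this with find in shell; `select_wheels` is illustrative):

```python
from pathlib import Path


def select_wheels(dist_dir: str, python_version: str) -> list[Path]:
    """Illustrative: keep only wheels whose filename carries the matching CPython tag."""
    tag = "cp" + python_version.replace(".", "")  # e.g. "3.11" -> "cp311"
    wheels = [p for p in Path(dist_dir).glob("*.whl") if tag in p.name]
    if not wheels:
        # Mirrors the explicit error handling added to the CI step.
        raise RuntimeError(f"no {tag} wheel found in {dist_dir}")
    return wheels
```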
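The Python 3.9 typing fixes all follow one mechanical rule: PEP 604's `X | Y` annotation syntax is evaluated at function-definition time and raises `TypeError` on 3.9, so every union moves to `typing.Optional`/`typing.Union`. For example (function name illustrative):

```python
from typing import Optional, Union


# Python 3.10+ only; raises "unsupported operand type(s) for |" on 3.9:
# def connect(zmq_port: int | None = None) -> str | None: ...

# Python 3.9-compatible equivalent, as applied across the packages:
def connect(zmq_port: Optional[int] = None) -> Union[str, None]:
    return None if zmq_port is None else f"tcp://localhost:{zmq_port}"
```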
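Likewise for `zip()`: the `strict=` keyword only exists on Python 3.10+, so the call site simply drops it. If the length guarantee were ever needed on 3.9, an explicit check is the portable spelling (illustrative data):

```python
ids = ["a", "b", "c"]
distances = [0.12, 0.34, 0.56]

# Python 3.10+: zip(ids, distances, strict=True) raises on length mismatch.
# Python 3.9: zip() takes no keyword arguments, so check lengths up front.
if len(ids) != len(distances):
    raise ValueError("ids and distances must have the same length")
pairs = list(zip(ids, distances))
```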
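Finally, the MLX restriction is expressed as a packaging environment marker (`platform_machine == 'arm64'`), so Intel Macs never even try to resolve MLX. The same condition as a runtime check, for illustration only (the real gate lives in pyproject.toml):

```python
import platform
import sys


def mlx_supported() -> bool:
    """Illustrative: MLX requires macOS on Apple Silicon (arm64), not Intel x86_64."""
    return sys.platform == "darwin" and platform.machine() == "arm64"
```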
659 lines · 25 KiB · Python
"""
|
|
This file contains the core API for the LEANN project, now definitively updated
|
|
with the correct, original embedding logic from the user's reference code.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import pickle
|
|
import time
|
|
import warnings
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Literal, Optional
|
|
|
|
import numpy as np
|
|
|
|
from leann.interface import LeannBackendSearcherInterface
|
|
|
|
from .chat import get_llm
|
|
from .interface import LeannBackendFactoryInterface
|
|
from .registry import BACKEND_REGISTRY
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_registered_backends() -> list[str]:
    """Get the list of registered backend names."""
    return list(BACKEND_REGISTRY.keys())


def compute_embeddings(
    chunks: list[str],
    model_name: str,
    mode: str = "sentence-transformers",
    use_server: bool = True,
    port: Optional[int] = None,
    is_build: bool = False,
) -> np.ndarray:
    """
    Compute embeddings using one of several backends.

    Args:
        chunks: List of text chunks to embed
        model_name: Name of the embedding model
        mode: Embedding backend mode. Options:
            - "sentence-transformers": Use the sentence-transformers library (default)
            - "mlx": Use the MLX backend on Apple Silicon
            - "openai": Use the OpenAI embedding API
        use_server: Whether to use the embedding server (True for search, False for build)
        port: ZMQ port of the embedding server; required when use_server is True
        is_build: Whether the embeddings are being computed for an index build

    Returns:
        numpy array of embeddings
    """
    if use_server:
        # Use the embedding server (for search/query)
        if port is None:
            raise ValueError("port is required when use_server is True")
        return compute_embeddings_via_server(chunks, model_name, port=port)
    else:
        # Use direct computation (for build_index)
        from .embedding_compute import (
            compute_embeddings as compute_embeddings_direct,
        )

        return compute_embeddings_direct(
            chunks,
            model_name,
            mode=mode,
            is_build=is_build,
        )


def compute_embeddings_via_server(chunks: list[str], model_name: str, port: int) -> np.ndarray:
    """Compute embeddings by sending chunks to a running embedding server over ZMQ.

    Args:
        chunks: List of text chunks to embed
        model_name: Name of the embedding model (used only for logging here;
            the server is already loaded with a model)
        port: ZMQ port the embedding server is listening on
    """
    logger.info(
        f"Computing embeddings for {len(chunks)} chunks using model '{model_name}' (via embedding server)..."
    )
    import msgpack
    import zmq

    # Connect to the embedding server
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    try:
        socket.connect(f"tcp://localhost:{port}")

        # Send chunks to the server for embedding computation
        socket.send(msgpack.packb(chunks))

        # Receive embeddings from the server
        response = socket.recv()
        embeddings_list = msgpack.unpackb(response)

        # Convert back to a numpy array
        embeddings = np.array(embeddings_list, dtype=np.float32)
    finally:
        socket.close()
        context.term()

    return embeddings


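# Illustrative sketch only (not part of the API): the server side that
# compute_embeddings_via_server() assumes, i.e. a msgpack REQ/REP loop that
# receives a list of strings and replies with a nested list of floats. The
# real server lives elsewhere; embed_fn and the function name are assumptions.
def _example_embedding_server_loop(port: int, embed_fn) -> None:
    import msgpack
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{port}")
    while True:
        chunks = msgpack.unpackb(socket.recv())  # list[str] from the client
        embeddings = embed_fn(chunks)  # assumed -> np.ndarray of shape (n, dim)
        socket.send(msgpack.packb(embeddings.tolist()))

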
@dataclass
class SearchResult:
    id: str
    score: float
    text: str
    metadata: dict[str, Any] = field(default_factory=dict)


class PassageManager:
    def __init__(self, passage_sources: list[dict[str, Any]]):
        self.offset_maps = {}
        self.passage_files = {}
        self.global_offset_map = {}  # Combined map for fast lookup

        for source in passage_sources:
            assert source["type"] == "jsonl", "only jsonl is supported"
            passage_file = source["path"]
            index_file = source["index_path"]  # .idx file

            # Fix path resolution for Colab and other environments
            if not Path(index_file).is_absolute():
                # If it is a relative path, resolve it properly
                index_file = str(Path(index_file).resolve())

            if not Path(index_file).exists():
                raise FileNotFoundError(f"Passage index file not found: {index_file}")

            with open(index_file, "rb") as f:
                offset_map = pickle.load(f)
            self.offset_maps[passage_file] = offset_map
            self.passage_files[passage_file] = passage_file

            # Build a global map for O(1) lookup
            for passage_id, offset in offset_map.items():
                self.global_offset_map[passage_id] = (passage_file, offset)

    def get_passage(self, passage_id: str) -> dict[str, Any]:
        if passage_id in self.global_offset_map:
            passage_file, offset = self.global_offset_map[passage_id]
            # Lazy file opening - only open when needed
            with open(passage_file, encoding="utf-8") as f:
                f.seek(offset)
                return json.loads(f.readline())
        raise KeyError(f"Passage ID not found: {passage_id}")


class LeannBuilder:
    def __init__(
        self,
        backend_name: str,
        embedding_model: str = "facebook/contriever",
        dimensions: Optional[int] = None,
        embedding_mode: str = "sentence-transformers",
        **backend_kwargs,
    ):
        self.backend_name = backend_name
        backend_factory: Optional[LeannBackendFactoryInterface] = BACKEND_REGISTRY.get(backend_name)
        if backend_factory is None:
            raise ValueError(f"Backend '{backend_name}' not found or not registered.")
        self.backend_factory = backend_factory
        self.embedding_model = embedding_model
        self.dimensions = dimensions
        self.embedding_mode = embedding_mode

        # Check whether we need to use cosine distance for normalized embeddings
        normalized_embeddings_models = {
            # OpenAI models
            ("openai", "text-embedding-ada-002"),
            ("openai", "text-embedding-3-small"),
            ("openai", "text-embedding-3-large"),
            # Voyage AI models
            ("voyage", "voyage-2"),
            ("voyage", "voyage-3"),
            ("voyage", "voyage-large-2"),
            ("voyage", "voyage-multilingual-2"),
            ("voyage", "voyage-code-2"),
            # Cohere models
            ("cohere", "embed-english-v3.0"),
            ("cohere", "embed-multilingual-v3.0"),
            ("cohere", "embed-english-light-v3.0"),
            ("cohere", "embed-multilingual-light-v3.0"),
        }

        # Also check for patterns in model names
        is_normalized = False
        current_model_lower = embedding_model.lower()
        current_mode_lower = embedding_mode.lower()

        # Check exact and substring matches against the known model list
        for mode, model in normalized_embeddings_models:
            if (current_mode_lower == mode and current_model_lower == model) or (
                mode in current_mode_lower and model in current_model_lower
            ):
                is_normalized = True
                break

        # Check patterns
        if not is_normalized:
            # OpenAI patterns
            if "openai" in current_mode_lower or "openai" in current_model_lower:
                if any(
                    pattern in current_model_lower
                    for pattern in ["text-embedding", "ada", "3-small", "3-large"]
                ):
                    is_normalized = True
            # Voyage patterns
            elif "voyage" in current_mode_lower or "voyage" in current_model_lower:
                is_normalized = True
            # Cohere patterns
            elif "cohere" in current_mode_lower or "cohere" in current_model_lower:
                if "embed" in current_model_lower:
                    is_normalized = True

        # Handle the distance metric
        if is_normalized and "distance_metric" not in backend_kwargs:
            backend_kwargs["distance_metric"] = "cosine"
            warnings.warn(
                f"Detected normalized embeddings model '{embedding_model}' with mode '{embedding_mode}'. "
                f"Automatically setting distance_metric='cosine' for optimal performance. "
                f"Normalized embeddings (L2 norm = 1) should use cosine similarity instead of MIPS.",
                UserWarning,
                stacklevel=2,
            )
        elif is_normalized and backend_kwargs.get("distance_metric", "").lower() != "cosine":
            current_metric = backend_kwargs.get("distance_metric", "mips")
            warnings.warn(
                f"Using the '{current_metric}' distance metric with normalized embeddings model "
                f"'{embedding_model}' may lead to suboptimal search results. "
                f"Consider using the 'cosine' distance metric for better performance.",
                UserWarning,
                stacklevel=2,
            )

        self.backend_kwargs = backend_kwargs
        self.chunks: list[dict[str, Any]] = []

    def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None):
        if metadata is None:
            metadata = {}
        passage_id = metadata.get("id", str(len(self.chunks)))
        chunk_data = {"id": passage_id, "text": text, "metadata": metadata}
        self.chunks.append(chunk_data)

    def build_index(self, index_path: str):
        if not self.chunks:
            raise ValueError("No chunks added.")
        if self.dimensions is None:
            self.dimensions = len(
                compute_embeddings(
                    ["dummy"],
                    self.embedding_model,
                    self.embedding_mode,
                    use_server=False,
                )[0]
            )
        path = Path(index_path)
        index_dir = path.parent
        index_name = path.name
        index_dir.mkdir(parents=True, exist_ok=True)
        passages_file = index_dir / f"{index_name}.passages.jsonl"
        offset_file = index_dir / f"{index_name}.passages.idx"
        offset_map = {}
        with open(passages_file, "w", encoding="utf-8") as f:
            try:
                from tqdm import tqdm

                chunk_iterator = tqdm(self.chunks, desc="Writing passages", unit="chunk")
            except ImportError:
                chunk_iterator = self.chunks

            for chunk in chunk_iterator:
                offset = f.tell()
                json.dump(
                    {
                        "id": chunk["id"],
                        "text": chunk["text"],
                        "metadata": chunk["metadata"],
                    },
                    f,
                    ensure_ascii=False,
                )
                f.write("\n")
                offset_map[chunk["id"]] = offset
        with open(offset_file, "wb") as f:
            pickle.dump(offset_map, f)
        texts_to_embed = [c["text"] for c in self.chunks]
        embeddings = compute_embeddings(
            texts_to_embed,
            self.embedding_model,
            self.embedding_mode,
            use_server=False,
            is_build=True,
        )
        string_ids = [chunk["id"] for chunk in self.chunks]
        current_backend_kwargs = {**self.backend_kwargs, "dimensions": self.dimensions}
        builder_instance = self.backend_factory.builder(**current_backend_kwargs)
        builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs)
        leann_meta_path = index_dir / f"{index_name}.meta.json"
        meta_data = {
            "version": "1.0",
            "backend_name": self.backend_name,
            "embedding_model": self.embedding_model,
            "dimensions": self.dimensions,
            "backend_kwargs": self.backend_kwargs,
            "embedding_mode": self.embedding_mode,
            "passage_sources": [
                {
                    "type": "jsonl",
                    "path": str(passages_file),
                    "index_path": str(offset_file),
                }
            ],
        }

        # Add storage status flags for the HNSW backend
        if self.backend_name == "hnsw":
            is_compact = self.backend_kwargs.get("is_compact", True)
            is_recompute = self.backend_kwargs.get("is_recompute", True)
            meta_data["is_compact"] = is_compact
            meta_data["is_pruned"] = (
                is_compact and is_recompute
            )  # Pruned only if compact and recompute
        with open(leann_meta_path, "w", encoding="utf-8") as f:
            json.dump(meta_data, f, indent=2)

    def build_index_from_embeddings(self, index_path: str, embeddings_file: str):
        """
        Build an index from pre-computed embeddings stored in a pickle file.

        Args:
            index_path: Path where the index will be saved
            embeddings_file: Path to a pickle file containing an (ids, embeddings) tuple
        """
        # Load pre-computed embeddings
        with open(embeddings_file, "rb") as f:
            data = pickle.load(f)

        if not isinstance(data, tuple) or len(data) != 2:
            raise ValueError(
                f"Invalid embeddings file format. Expected tuple with 2 elements, got {type(data)}"
            )

        ids, embeddings = data

        if not isinstance(embeddings, np.ndarray):
            raise ValueError(f"Expected embeddings to be a numpy array, got {type(embeddings)}")

        if len(ids) != embeddings.shape[0]:
            raise ValueError(
                f"Mismatch between number of IDs ({len(ids)}) and embeddings ({embeddings.shape[0]})"
            )

        # Validate/set dimensions
        embedding_dim = embeddings.shape[1]
        if self.dimensions is None:
            self.dimensions = embedding_dim
        elif self.dimensions != embedding_dim:
            raise ValueError(f"Dimension mismatch: expected {self.dimensions}, got {embedding_dim}")

        logger.info(
            f"Building index from precomputed embeddings: {len(ids)} items, {embedding_dim} dimensions"
        )

        # Ensure we have text data for each embedding
        if len(self.chunks) != len(ids):
            # If no text chunks were provided, create placeholder text entries
            if not self.chunks:
                logger.info("No text chunks provided, creating placeholder entries...")
                for id_val in ids:
                    self.add_text(
                        f"Document {id_val}",
                        metadata={"id": str(id_val), "from_embeddings": True},
                    )
            else:
                raise ValueError(
                    f"Number of text chunks ({len(self.chunks)}) doesn't match number of embeddings ({len(ids)})"
                )

        # Build the file structure
        path = Path(index_path)
        index_dir = path.parent
        index_name = path.name
        index_dir.mkdir(parents=True, exist_ok=True)
        passages_file = index_dir / f"{index_name}.passages.jsonl"
        offset_file = index_dir / f"{index_name}.passages.idx"

        # Write passages and create the offset map
        offset_map = {}
        with open(passages_file, "w", encoding="utf-8") as f:
            for chunk in self.chunks:
                offset = f.tell()
                json.dump(
                    {
                        "id": chunk["id"],
                        "text": chunk["text"],
                        "metadata": chunk["metadata"],
                    },
                    f,
                    ensure_ascii=False,
                )
                f.write("\n")
                offset_map[chunk["id"]] = offset

        with open(offset_file, "wb") as f:
            pickle.dump(offset_map, f)

        # Build the vector index using the precomputed embeddings
        string_ids = [str(id_val) for id_val in ids]
        current_backend_kwargs = {**self.backend_kwargs, "dimensions": self.dimensions}
        builder_instance = self.backend_factory.builder(**current_backend_kwargs)
        builder_instance.build(embeddings, string_ids, index_path)

        # Create the metadata file
        leann_meta_path = index_dir / f"{index_name}.meta.json"
        meta_data = {
            "version": "1.0",
            "backend_name": self.backend_name,
            "embedding_model": self.embedding_model,
            "dimensions": self.dimensions,
            "backend_kwargs": self.backend_kwargs,
            "embedding_mode": self.embedding_mode,
            "passage_sources": [
                {
                    "type": "jsonl",
                    "path": str(passages_file),
                    "index_path": str(offset_file),
                }
            ],
            "built_from_precomputed_embeddings": True,
            "embeddings_source": str(embeddings_file),
        }

        # Add storage status flags for the HNSW backend
        if self.backend_name == "hnsw":
            is_compact = self.backend_kwargs.get("is_compact", True)
            is_recompute = self.backend_kwargs.get("is_recompute", True)
            meta_data["is_compact"] = is_compact
            meta_data["is_pruned"] = is_compact and is_recompute

        with open(leann_meta_path, "w", encoding="utf-8") as f:
            json.dump(meta_data, f, indent=2)

        logger.info(f"Index built successfully from precomputed embeddings: {index_path}")


class LeannSearcher:
    def __init__(self, index_path: str, enable_warmup: bool = False, **backend_kwargs):
        # Fix path resolution for Colab and other environments
        if not Path(index_path).is_absolute():
            index_path = str(Path(index_path).resolve())

        self.meta_path_str = f"{index_path}.meta.json"
        if not Path(self.meta_path_str).exists():
            parent_dir = Path(index_path).parent
            # Highlight the recovery hint in red
            raise FileNotFoundError(
                f"Leann metadata file not found at {self.meta_path_str}, "
                f"\033[91myou may need to rm -rf {parent_dir}\033[0m"
            )
        with open(self.meta_path_str, encoding="utf-8") as f:
            self.meta_data = json.load(f)
        backend_name = self.meta_data["backend_name"]
        self.embedding_model = self.meta_data["embedding_model"]
        # Support both the old and new metadata formats
        self.embedding_mode = self.meta_data.get("embedding_mode", "sentence-transformers")
        self.passage_manager = PassageManager(self.meta_data.get("passage_sources", []))
        backend_factory = BACKEND_REGISTRY.get(backend_name)
        if backend_factory is None:
            raise ValueError(f"Backend '{backend_name}' not found.")
        final_kwargs = {**self.meta_data.get("backend_kwargs", {}), **backend_kwargs}
        final_kwargs["enable_warmup"] = enable_warmup
        self.backend_impl: LeannBackendSearcherInterface = backend_factory.searcher(
            index_path, **final_kwargs
        )

    def search(
        self,
        query: str,
        top_k: int = 5,
        complexity: int = 64,
        beam_width: int = 1,
        prune_ratio: float = 0.0,
        recompute_embeddings: bool = True,
        pruning_strategy: Literal["global", "local", "proportional"] = "global",
        expected_zmq_port: int = 5557,
        **kwargs,
    ) -> list[SearchResult]:
        logger.info("🔍 LeannSearcher.search() called:")
        logger.info(f"  Query: '{query}'")
        logger.info(f"  Top_k: {top_k}")
        logger.info(f"  Additional kwargs: {kwargs}")

        # Smart top_k detection and adjustment
        total_docs = len(self.passage_manager.global_offset_map)
        original_top_k = top_k
        if top_k > total_docs:
            top_k = total_docs
            logger.warning(
                f"  ⚠️ Requested top_k ({original_top_k}) exceeds total documents ({total_docs})"
            )
            logger.warning(f"  ✅ Auto-adjusted top_k to {top_k} to match available documents")

        zmq_port = None

        start_time = time.time()
        if recompute_embeddings:
            zmq_port = self.backend_impl._ensure_server_running(
                self.meta_path_str,
                port=expected_zmq_port,
                **kwargs,
            )
            del expected_zmq_port
        zmq_time = time.time() - start_time
        logger.info(f"  Launching server time: {zmq_time} seconds")

        start_time = time.time()
        query_embedding = self.backend_impl.compute_query_embedding(
            query,
            use_server_if_available=recompute_embeddings,
            zmq_port=zmq_port,
        )
        embedding_time = time.time() - start_time
        logger.debug(f"  Generated embedding shape: {query_embedding.shape}")
        logger.debug(f"  Embedding time: {embedding_time} seconds")

        start_time = time.time()
        results = self.backend_impl.search(
            query_embedding,
            top_k,
            complexity=complexity,
            beam_width=beam_width,
            prune_ratio=prune_ratio,
            recompute_embeddings=recompute_embeddings,
            pruning_strategy=pruning_strategy,
            zmq_port=zmq_port,
            **kwargs,
        )
        search_time = time.time() - start_time
        logger.debug(f"  Search time: {search_time} seconds")
        logger.info(f"  Backend returned: labels={len(results.get('labels', [[]])[0])} results")

        # Color codes for better logging (defined up front so the final log
        # line works even when no results are returned)
        GREEN = "\033[92m"
        BLUE = "\033[94m"
        YELLOW = "\033[93m"
        RED = "\033[91m"
        RESET = "\033[0m"

        enriched_results = []
        if "labels" in results and "distances" in results:
            logger.info(f"  Processing {len(results['labels'][0])} passage IDs:")
            for i, (string_id, dist) in enumerate(
                zip(results["labels"][0], results["distances"][0])
            ):
                try:
                    passage_data = self.passage_manager.get_passage(string_id)
                    enriched_results.append(
                        SearchResult(
                            id=string_id,
                            score=dist,
                            text=passage_data["text"],
                            metadata=passage_data.get("metadata", {}),
                        )
                    )

                    # Truncate text for display (first 100 chars)
                    display_text = passage_data["text"][:100]
                    logger.info(
                        f"  {GREEN}✓{RESET} {BLUE}[{i + 1:2d}]{RESET} {YELLOW}ID:{RESET} '{string_id}' {YELLOW}Score:{RESET} {dist:.4f} {YELLOW}Text:{RESET} {display_text}"
                    )
                except KeyError:
                    logger.error(
                        f"  {RED}✗{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}"
                    )

        logger.info(f"  {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
        return enriched_results


class LeannChat:
    def __init__(
        self,
        index_path: str,
        llm_config: Optional[dict[str, Any]] = None,
        enable_warmup: bool = False,
        **kwargs,
    ):
        self.searcher = LeannSearcher(index_path, enable_warmup=enable_warmup, **kwargs)
        self.llm = get_llm(llm_config)

    def ask(
        self,
        question: str,
        top_k: int = 5,
        complexity: int = 64,
        beam_width: int = 1,
        prune_ratio: float = 0.0,
        recompute_embeddings: bool = True,
        pruning_strategy: Literal["global", "local", "proportional"] = "global",
        llm_kwargs: Optional[dict[str, Any]] = None,
        expected_zmq_port: int = 5557,
        **search_kwargs,
    ):
        if llm_kwargs is None:
            llm_kwargs = {}
        search_time = time.time()
        results = self.searcher.search(
            question,
            top_k=top_k,
            complexity=complexity,
            beam_width=beam_width,
            prune_ratio=prune_ratio,
            recompute_embeddings=recompute_embeddings,
            pruning_strategy=pruning_strategy,
            expected_zmq_port=expected_zmq_port,
            **search_kwargs,
        )
        search_time = time.time() - search_time
        logger.debug(f"  Search time: {search_time} seconds")
        context = "\n\n".join([r.text for r in results])
        prompt = (
            "Here is some retrieved context that might help answer your question:\n\n"
            f"{context}\n\n"
            f"Question: {question}\n\n"
            "Please provide the best answer you can based on this context and your knowledge."
        )

        ask_time = time.time()
        ans = self.llm.ask(prompt, **llm_kwargs)
        ask_time = time.time() - ask_time
        logger.info(f"  Ask time: {ask_time} seconds")
        return ans

    def start_interactive(self):
        print("\nLeann Chat started (type 'quit' to exit)")
        while True:
            try:
                user_input = input("You: ").strip()
                if user_input.lower() in ["quit", "exit"]:
                    break
                if not user_input:
                    continue
                response = self.ask(user_input)
                print(f"Leann: {response}")
            except (KeyboardInterrupt, EOFError):
                print("\nGoodbye!")
                break
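

if __name__ == "__main__":
    # Illustrative smoke test, not part of the library API. Assumes the
    # "hnsw" backend package is installed (backends register themselves in
    # BACKEND_REGISTRY on import) and that the default embedding model,
    # facebook/contriever, can be loaded; paths below are arbitrary.
    builder = LeannBuilder("hnsw")
    builder.add_text("LEANN is a low-storage vector index.", metadata={"id": "0"})
    builder.build_index("./demo_index/demo")

    searcher = LeannSearcher("./demo_index/demo")
    for r in searcher.search("What is LEANN?", top_k=1):
        print(r.id, r.score, r.text)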