Compare commits

..

12 Commits

Author SHA1 Message Date
yichuan520030910320
cd343e1d7a style: apply ruff format 2025-08-15 17:44:50 -07:00
yichuan520030910320
ef980d70b3 [MCP]update MCP of claude code 2025-08-15 14:29:59 -07:00
Andy Lee
db3c63c441 Docs/Core: Low-Resource Setups, SkyPilot Option, and No-Recompute (#45)
* docs: add SkyPilot template and instructions for running embeddings/index build on cloud GPU

* docs: add low-resource note in README; point to config guide; suggest OpenAI embeddings, SkyPilot remote build, and --no-recompute

* docs: consolidate low-resource guidance into config guide; README points to it

* cli: add --no-recompute and --no-recompute-embeddings flags; docs: clarify HNSW requires --no-compact when disabling recompute

* docs: dedupe recomputation guidance; keep single Low-resource setups section

* sky: expand leann-build.yaml with configurable params and flags (backend, recompute, compact, embedding options)

* hnsw: auto-disable compact when --no-recompute is used; docs: expand SkyPilot with -e overrides and copy-back example

* docs+sky: simplify SkyPilot flow (auto-build on launch, rsync copy-back); clarify HNSW auto non-compact when no-recompute

* feat: auto compact for hnsw when recompute

* reader: non-destructive portability (relative hints + fallback); fix comments; sky: refine yaml

* cli: unify flags to --recompute/--no-recompute for build/search/ask; docs: update references

* chore: remove

* hnsw: move pruned/no-recompute assertion into backend; api: drop global assertion; docs: will adjust after benchmarking

* cli: use argparse.BooleanOptionalAction for paired flags (--recompute/--compact) across build/search/ask

* docs: a real example on recompute

* benchmarks: fix and extend HNSW+DiskANN recompute vs no-recompute; docs: add fresh numbers and DiskANN notes

* benchmarks: unify HNSW & DiskANN into one clean script; isolate groups, fixed ports, warm-up, param complexity

* docs: diskann recompute

* core: auto-cleanup for LeannSearcher/LeannChat (__enter__/__exit__/__del__); ensure server terminate/kill robustness; benchmarks: use searcher.cleanup(); docs: suggest uv run

* fix: hang on warnings

* docs: boolean flags

* docs: leann help
2025-08-15 12:03:19 -07:00
yichuan520030910320
00eeadb9dd upd pkg 2025-08-14 14:39:45 -07:00
yichuan520030910320
42c8370709 add chunk size in leann build& fix batch size in oai& docs 2025-08-14 13:14:14 -07:00
Andy Lee
fafdf8fcbe feat(core,diskann): robust embedding server (no-hang) + DiskANN fast mode (graph partition) (#29)
* feat: Add graph partition support for DiskANN backend

- Add GraphPartitioner class for advanced graph partitioning
- Add partition_graph_simple function for easy-to-use partitioning
- Add pybind11 dependency for C++ executable building
- Update __init__.py to export partition functions
- Include test scripts for partition functionality

The partition functionality allows optimizing disk-based indices
for better search performance and memory efficiency.

* chore: Update DiskANN submodule to latest with graph partition tools

- Update DiskANN submodule to commit b2dc4ea
- Includes graph partition tools and CMake integration
- Enables graph partitioning functionality in DiskANN backend

* merge

* ruff

* add a path related fix

* fix: always use relative path in metadata

* docs: tool cli install

* chore: more data

* fix: diskann building and partitioning

* tests: diskann and partition

* docs: highlight diskann readiness and add performance comparison

* docs: add ldg-times parameter for diskann graph locality optimization

* fix: update pre-commit ruff version and format compliance

* fix: format test files with latest ruff version for CI compatibility

* fix: pin ruff version to 0.12.7 across all environments

- Pin ruff==0.12.7 in pyproject.toml dev dependencies
- Update CI to use exact ruff version instead of latest
- Add comments explaining version pinning rationale
- Ensures consistent formatting across local, CI, and pre-commit

* fix: use uv tool install for ruff instead of uv pip install

- uv tool install is the correct way to install CLI tools like ruff
- uv pip install --system is for Python packages, not tools

* debug: add detailed logging for CI path resolution debugging

- Add logging in DiskANN embedding server to show metadata_file_path
- Add debug logging in PassageManager to trace path resolution
- This will help identify why CI fails to find passage files

* fix: force install local wheels in CI to prevent PyPI version conflicts

- Change from --find-links to direct wheel installation with --force-reinstall
- This ensures CI uses locally built packages with latest source code
- Prevents uv from using PyPI packages with same version number but old code
- Fixes CI test failures where old code (without metadata_file_path) was used

Root cause: CI was installing leann-backend-diskann v0.2.1 from PyPI
instead of the locally built wheel with same version number.

* debug: add more CI diagnostics for DiskANN module import issue

- Check wheel contents before and after auditwheel repair
- Verify _diskannpy module installation after pip install
- List installed package directory structure
- Add explicit platform tag for auditwheel repair

This helps diagnose why ImportError: cannot import name '_diskannpy' occurs

* fix: remove invalid --plat argument from auditwheel repair

- Remove '--plat linux_x86_64' which is not a valid platform tag
- Let auditwheel automatically determine the correct platform
- Based on CI output, it will use manylinux_2_35_x86_64

This was causing auditwheel repair to fail, preventing proper wheel repair

* fix: ensure CI installs correct Python version wheel packages

- Use --find-links with --no-index to let uv select correct wheel
- Prevents installing wrong Python version wheel (e.g., cp310 for Python 3.11)
- Fixes ImportError: _diskannpy.cpython-310-x86_64-linux-gnu.so in Python 3.11

The issue was that *.whl glob matched all Python versions, causing
uv to potentially install a cp310 wheel in a Python 3.11 environment.

* fix: ensure venv uses correct Python version from matrix

- Explicitly specify Python version when creating venv with uv
- Prevents mismatch between build Python (e.g., 3.10) and test Python
- Fixes: _diskannpy.cpython-310-x86_64-linux-gnu.so in Python 3.11 error

The issue: uv venv was defaulting to Python 3.11 regardless of matrix version

* fix: resolve dependency issues in CI package installation

- Ubuntu: Install all packages from local builds with --no-index
- macOS: Install core packages from PyPI, backends from local builds
- Remove --no-index for macOS backend installation to allow dependency resolution
- Pin versions when installing from PyPI to ensure consistency

Fixes error: 'leann-core was not found in the provided package locations'

* fix: Python 3.9 compatibility - replace Union type syntax

- Replace 'int | None' with 'Optional[int]' everywhere
- Replace 'subprocess.Popen | None' with 'Optional[subprocess.Popen]'
- Add Optional import to all affected files
- Update ruff target-version from py310 to py39
- The '|' syntax for Union types was introduced in Python 3.10 (PEP 604)

Fixes TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'

* ci: build all packages on all platforms; install from local wheels only

- Build leann-core and leann on macOS too
- Install all packages via --find-links and --no-index across platforms
- Lower macOS MACOSX_DEPLOYMENT_TARGET to 12.0 for wider compatibility

This ensures consistency and avoids PyPI drift while improving macOS compatibility.

* ci: allow resolving third-party deps from index; still prefer local wheels for our packages

- Remove --no-index so numpy/scipy/etc can be resolved on Python 3.13
- Keep --find-links to force our packages from local dist

Fixes: dependency resolution failure on Ubuntu Python 3.13 (numpy missing)

* ci(macOS): set MACOSX_DEPLOYMENT_TARGET back to 13.3

- Fix build failure: 'sgesdd_' only available on macOS 13.3+
- Keep other CI improvements (local builds, find-links installs)

* fix(py39): replace union type syntax in chat.py

- validate_model_and_suggest: str | None -> Optional[str]
- OpenAIChat.__init__: api_key: str | None -> Optional[str]
- get_llm: dict[str, Any] | None -> Optional[dict[str, Any]]

Ensures Python 3.9 compatibility for CI macOS 3.9.

* style: organize imports per ruff; finish py39 Optional changes

- Fix import ordering in embedding servers and graph_partition_simple
- Remove duplicate Optional import
- Complete Optional[...] replacements

* fix(py39): replace remaining '| None' in diskann graph_partition (module-level function)

* fix(py39): remove zip(strict=...) usage in api; Python 3.9 compatibility

* style: organize imports; fix process-group stop for embedding server

* chore: keep embedding server stdout/stderr visible; still use new session and pg-kill on stop

* fix: add timeout to final wait() in stop_server to prevent infinite hang

* fix: prevent hang in CI by flushing print statements and redirecting embedding server output

- Add flush=True to all print statements in convert_to_csr.py to prevent buffer deadlock
- Redirect embedding server stdout/stderr to DEVNULL in CI environment (CI=true)
- Fix timeout in embedding_server_manager.stop_server() final wait call

* fix: resolve CI hanging by removing problematic wait() in stop_server

* fix: remove hardcoded paths from MCP server and documentation

* feat: add CI timeout protection for tests

* fix: skip OpenAI test in CI to avoid failures and API costs

- Add CI skip for test_document_rag_openai
- Test was failing because it incorrectly used --llm simulated which isn't supported by document_rag.py

* feat: add simulated LLM option to document_rag.py

- Add 'simulated' to the LLM choices in base_rag_example.py
- Handle simulated case in get_llm_config() method
- This allows tests to use --llm simulated to avoid API costs

* feat: add comprehensive debugging capabilities with tmate integration

1. Tmate SSH Debugging:
   - Added manual workflow_dispatch trigger with debug_enabled option
   - Integrated mxschmitt/action-tmate@v3 for SSH access to CI runner
   - Can be triggered manually or by adding [debug] to commit message
   - Detached mode with 30min timeout, limited to actor only
   - Also triggers on test failure when debug is enabled

2. Enhanced Pytest Output:
   - Added --capture=no to see real-time output
   - Added --log-cli-level=DEBUG for maximum verbosity
   - Added --tb=short for cleaner tracebacks
   - Pipe output to tee for both display and logging
   - Show last 20 lines of output on completion

3. Environment Diagnostics:
   - Export PYTHONUNBUFFERED=1 for immediate output
   - Show Python/Pytest versions at start
   - Display relevant environment variables
   - Check network ports before/after tests

4. Diagnostic Script:
   - Created scripts/diagnose_hang.sh for comprehensive system checks
   - Shows processes, network, file descriptors, memory, ZMQ status
   - Automatically runs on timeout for detailed debugging info

This allows debugging CI hangs via SSH when needed while providing extensive logging by default.

* fix: add diagnostic script (force add to override .gitignore)

The diagnose_hang.sh script needs to be in git for CI to use it.
Using -f to override *.sh rule in .gitignore.

* test: investigate hanging [debug]

* fix: move tmate debug session inside pytest step to avoid hanging

The issue was that tmate was placed before pytest step, but the hang
occurs during pytest execution. Now tmate starts inside the test step
and provides connection info before running tests.

* debug: trigger tmate debug session [debug]

* fix: debug variable values and add commit message [debug] trigger

- Add debug output to show variable values
- Support both manual trigger and [debug] in commit message

* fix: force debug mode for investigation branch

- Auto-enable debug mode for debug/clean-state-investigation branch
- Add more debug info to troubleshoot trigger issues
- This ensures tmate will start regardless of trigger method

* fix: use github.head_ref for PR branch detection

For pull requests, github.ref is refs/pull/N/merge, but github.head_ref
contains the actual branch name. This should fix debug mode detection.

* fix: FORCE debug mode on - no more conditions

Just always enable debug mode on this branch.
We need tmate to work for investigation!

* fix: improve tmate connection info retrieval

- Add proper wait and retry logic for tmate initialization
- Tmate needs time to connect to servers before showing SSH info
- Try multiple times with delays to get connection details

* fix: ensure OpenMP is found during DiskANN build on macOS

- Add OpenMP environment variables directly in build step
- Should fix the libomp.dylib not found error on macOS-14

* fix: simplify macOS OpenMP configuration to match main branch

- Remove complex OpenMP environment variables
- Use simplified configuration from working main branch
- Remove redundant OpenMP setup in DiskANN build step
- Keep essential settings: OpenMP_ROOT, CMAKE_PREFIX_PATH, LDFLAGS, CPPFLAGS

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: revert DiskANN submodule to stable version

The debug branch had updated DiskANN submodule to a version with
hardcoded OpenMP paths that break macOS 13 builds. This reverts
to the stable version used in main branch.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: update faiss submodule to latest stable version

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: remove upterm/tmate debug code and clean CI workflow

- Remove all upterm/tmate SSH debugging infrastructure
- Restore clean CI workflow from main branch
- Remove diagnostic script that was only for SSH debugging
- Keep valuable DiskANN and HNSW backend improvements

This provides a clean base to add targeted pytest hang debugging
without the complexity of SSH sessions.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* debug: increase timeouts to 600s for comprehensive hang investigation

- Increase pytest timeout from 300s to 600s for thorough testing
- Increase import testing timeout from 60s to 120s
- Allow more time for C++ extension loading (faiss/diskann)
- Still provides timeout protection against infinite hangs

This gives the system more time to complete imports and tests
while still catching genuine hangs that exceed reasonable limits.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: remove debug_enabled parameter from build-and-publish workflow

- Remove debug_enabled input parameter that no longer exists in build-reusable.yml
- Keep workflow_dispatch trigger but without debug options
- Fixes workflow validation error: 'debug_enabled is not defined'

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* debug: fix YAML syntax and add post-pytest cleanup monitoring

- Fix Python code formatting in YAML (pre-commit fixed indentation issues)
- Add comprehensive post-pytest cleanup monitoring
- Monitor for hanging processes after test completion
- Focus on teardown phase based on previous hang analysis

This addresses the root cause identified: hang occurs after tests pass,
likely during cleanup/teardown of C++ extensions or embedding servers.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* debug: add external process monitoring and unbuffered output for precise hang detection

* fix

* feat: add comprehensive hang detection for pytest CI debugging

- Add Python faulthandler integration with signal-triggered stack dumps
- Implement periodic stack dumps at 5min and 10min intervals
- Add external process monitoring with SIGUSR1 signal on hang detection
- Use debug_pytest.py wrapper to capture exact hang location in C++ cleanup
- Enhance CPU stability monitoring to trigger precise stack traces

This addresses the persistent pytest hanging issue in Ubuntu 22.04 CI by
providing detailed stack traces to identify the exact code location where
the hang occurs during test cleanup phase.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* CI: move pytest hang-debug script into scripts/ci_debug_pytest.py; sort imports and apply ruff suggestion; update workflow to call the script

* fix: improve hang detection to monitor actual pytest process

* fix: implement comprehensive solution for CI pytest hangs

Key improvements:
1. Replace complex monitoring with simpler process group management
2. Add pytest conftest.py with per-test timeouts and aggressive cleanup
3. Skip problematic tests in CI that cause infinite loops
4. Enhanced cleanup at session start/end and after each test
5. Shorter timeouts (3min per test, 10min total) with better monitoring

This should resolve the hanging issues by:
- Preventing individual tests from running too long
- Automatically cleaning up hanging processes
- Skipping known problematic tests in CI
- Using process groups for more reliable cleanup

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: correct pytest_runtest_call hook parameter in conftest.py

- Change invalid 'puretest' parameter to proper pytest hooks
- Replace problematic pytest_runtest_call with pytest_runtest_setup/teardown
- This fixes PluginValidationError preventing pytest from starting
- Remove unused time import

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: prevent wrapper script from killing itself in cleanup

- Remove overly aggressive pattern 'python.*pytest' that matched wrapper itself
- Add current PID check to avoid killing wrapper process
- Add exclusion for wrapper and debug script names
- This fixes exit code 137 (SIGKILL) issue where wrapper killed itself

Root cause: cleanup function was killing the wrapper process itself,
causing immediate termination with no output in CI.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: prevent wrapper from detecting itself as remaining process

- Add PID and script name checks in post-test verification
- Avoid false positive detection of wrapper process as 'remaining'
- This prevents unnecessary cleanup calls that could cause hangs
- Root cause: wrapper was trying to clean up itself in verification phase

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: implement graceful shutdown for embedding servers

- Replace daemon threads with coordinated shutdown mechanism
- Add shutdown_event for thread synchronization
- Implement proper ZMQ resource cleanup
- Wait for threads to complete before exit
- Add ZMQ timeout to allow periodic shutdown checks
- Move signal handlers into server functions for proper scope access
- Fix protobuf class names and variable references
- Simplify resource cleanup to avoid variable scope issues

Root cause: Original servers used daemon threads + direct sys.exit(0)
which interrupted ZMQ operations and prevented proper resource cleanup,
causing hangs during process termination in CI environments.

This should resolve the core pytest hanging issue without complex wrappers.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: simplify embedding server process management

- Remove start_new_session=True to fix signal handling issues
- Simplify termination logic to use standard SIGTERM/SIGKILL
- Remove complex process group management that could cause hangs
- Add timeout-based cleanup to prevent CI hangs while ensuring proper resource cleanup
- Give graceful shutdown more time (5s) since we fixed the server shutdown logic
- Remove unused signal import

This addresses the remaining process management issues that could
cause startup failures and hanging during termination.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: increase CI test timeouts to accommodate model download

Analysis of recent CI failures shows:
- Model download takes ~12 seconds
- Embedding server startup + first search takes additional ~78 seconds
- Total time needed: ~90-100 seconds

Updated timeouts:
- test_readme_basic_example: 90s -> 180s
- test_backend_options: 60s -> 150s
- test_llm_config_simulated: 75s -> 150s

Root cause: Initial model download from huggingface.co in CI environment
is slower than local development, causing legitimate timeouts rather than
actual hanging processes.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* debug: preserve stderr in CI to debug embedding server startup failures

Previous fix revealed the real issue: embedding server fails to start within 120s,
not timeout issues. The error was hidden because both stdout and stderr were
redirected to DEVNULL in CI.

Changes:
- Keep stderr output in CI environment for debugging
- Only redirect stdout to DEVNULL to avoid buffer deadlock
- This will help us see why embedding server startup is failing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(embedding-server): ensure shutdown-capable ZMQ threads create/bind their own REP sockets and poll with timeouts; fix undefined socket causing startup crash and CI hangs on Ubuntu 22.04

* style(hnsw-server): apply ruff-format after robustness changes

* fix(hnsw-server): be lenient to nested [[ids]] for both distance and embedding requests to match client expectations; prevents missing ID lookup when wrapper nests the list

* refactor(hnsw-server): remove duplicate legacy ZMQ thread; keep single shutdown-capable server implementation to reduce surface and avoid hangs

* ci: simplify test step to run pytest uniformly across OS; drop ubuntu-22.04 wrapper special-casing

* chore(ci): remove unused pytest wrapper and debug runner

* refactor(diskann): remove redundant graph_partition_simple; keep single partition API (graph_partition)

* refactor(hnsw-convert): remove global print override; rely on default flushing in CI

* tests: drop custom ci_timeout decorator and helpers; rely on pytest defaults and simplified CI

* tests: remove conftest global timeouts/cleanup; keep test suite minimal and rely on simplified CI + robust servers

* tests: call searcher.cleanup()/chat.cleanup() to ensure background embedding servers terminate after tests

* tests: fix ruff warnings in minimal conftest

* core: add weakref.finalize and atexit-based cleanup in EmbeddingServerManager to ensure server stops on interpreter exit/GC

* tests: remove minimal conftest to validate atexit/weakref cleanup path

* core: adopt compatible running server (record PID) and ensure stop_server() can terminate adopted processes; clear server_port on stop

* ci/core: skip compatibility scanning in CI (LEANN_SKIP_COMPAT=1) to avoid slow/hanging process scans; always pick a fresh available port

* core: unify atexit to always call _finalize_process (covers both self-launched and adopted servers)

* zmq: set SNDTIMEO=1s and LINGER=0 for REP sockets to avoid send blocking during shutdown; reduces CI hang risk

* tests(ci): skip DiskANN branch of README basic example on CI to avoid core dump in constrained runners; HNSW still validated

* diskann(ci): avoid stdout/stderr FD redirection in CI to prevent aborts from low-level dup2; no-op contextmanager on CI

* core: purge dead helpers and comments from EmbeddingServerManager; keep only minimal in-process flow

* core: fix lint (remove unused passages_file); keep per-instance reuse only

* fix: keep backward-compat

---------

Co-authored-by: yichuan520030910320 <yichuan_wang@berkeley.edu>
Co-authored-by: Claude <noreply@anthropic.com>
2025-08-14 01:02:24 -07:00
yichuan520030910320
21f7d8e031 docs: update -h and config advice 2025-08-13 14:26:35 -07:00
Andy Lee
46565b9249 docs: follows #34, patch leann backends into tool environment 2025-08-12 17:56:02 -07:00
GitHub Actions
3dad76126a chore: release v0.2.9 2025-08-12 23:00:12 +00:00
Andy Lee
18e28bda32 feat: Add macOS 15 support for M4 Mac compatibility (#38)
* feat: add macOS 15 support for M4 Mac compatibility

- Add macos-15 CI builds for Python 3.9-3.13
- Update MACOSX_DEPLOYMENT_TARGET from 11.0/13.3 to 14.0 for broader compatibility
- Addresses issue #34 with Mac M4 wheel compatibility

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: ensure wheels are compatible with older macOS versions

- Set MACOSX_DEPLOYMENT_TARGET=11.0 for HNSW backend (broad compatibility)
- Set MACOSX_DEPLOYMENT_TARGET=13.0 for DiskANN backend (required for LAPACK)
- Add --require-target-macos-version to delocate-wheel commands
- This fixes CI failures on macos-13 runners while maintaining M4 Mac support

Fixes the issue where wheels built on macos-14 runners were incorrectly
tagged as macosx_14_0, preventing installation on macos-13 runners.

* fix: use macOS 13.3 for DiskANN backend as required by LAPACK

DiskANN requires macOS 13.3+ for sgesdd_ LAPACK function, so we must
use 13.3 as the deployment target, not 13.0.

* fix: match deployment target with runner OS for library compatibility

The issue is that Homebrew libraries on macOS 14 runners are built for
macOS 14 and cannot be downgraded. We must use different deployment
targets based on the runner OS:

- macOS 13 runners: Can build for macOS 11.0 (HNSW) and 13.3 (DiskANN)
- macOS 14 runners: Must build for macOS 14.0 (due to system libraries)

This ensures delocate-wheel succeeds by matching the deployment target
with the actual minimum version required by bundled libraries.

* fix: add macOS 15 support to deployment target configuration

The issue extends to macOS 15 runners where Homebrew libraries are built
for macOS 15. We must handle all runner versions explicitly:

- macOS 13 runners: Can build for macOS 11.0 (HNSW) and 13.3 (DiskANN)
- macOS 14 runners: Must build for macOS 14.0 (system libraries)
- macOS 15 runners: Must build for macOS 15.0 (system libraries)

This ensures wheels are properly tagged for their actual minimum
supported macOS version, matching the bundled libraries.

* fix: correct macOS deployment targets based on Homebrew library requirements

The key insight is that Homebrew libraries on each macOS version are
compiled for that specific version:
- macOS 13: Libraries require macOS 13.0 minimum
- macOS 14: Libraries require macOS 14.0 minimum
- macOS 15: Libraries require macOS 15.0 minimum

We cannot build wheels for older macOS versions than what the bundled
Homebrew libraries require. This means:
- macOS 13 runners: Build for macOS 13.0+ (HNSW) and 13.3+ (DiskANN)
- macOS 14 runners: Build for macOS 14.0+
- macOS 15 runners: Build for macOS 15.0+

This ensures delocate-wheel succeeds by matching deployment targets
with the actual minimum versions required by system libraries.

* fix: restore macOS 15 build matrix and correct test path

- Add back macOS 15 configurations for Python 3.9-3.13
- Fix pytest path from test/ to tests/ (correct directory name)

The macOS 15 support was accidentally missing from the matrix, and
pytest was looking for the wrong directory name.

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-12 14:01:02 -07:00
GitHub Actions
609fa62fd5 chore: release v0.2.8 2025-08-12 19:04:51 +00:00
Yichuan Wang
eab13434ef feat: support multiple input formats for --docs argument (#39) 2025-08-12 10:30:31 -07:00
38 changed files with 2605 additions and 549 deletions

View File

@@ -5,6 +5,7 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:
jobs:
build:

View File

@@ -64,6 +64,16 @@ jobs:
python: '3.12'
- os: macos-14
python: '3.13'
- os: macos-15
python: '3.9'
- os: macos-15
python: '3.10'
- os: macos-15
python: '3.11'
- os: macos-15
python: '3.12'
- os: macos-15
python: '3.13'
- os: macos-13
python: '3.9'
- os: macos-13
@@ -147,7 +157,14 @@ jobs:
# Use system clang for better compatibility
export CC=clang
export CXX=clang++
export MACOSX_DEPLOYMENT_TARGET=11.0
# Homebrew libraries on each macOS version require matching minimum version
if [[ "${{ matrix.os }}" == "macos-13" ]]; then
export MACOSX_DEPLOYMENT_TARGET=13.0
elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
export MACOSX_DEPLOYMENT_TARGET=14.0
elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
export MACOSX_DEPLOYMENT_TARGET=15.0
fi
uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
else
uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
@@ -161,7 +178,14 @@ jobs:
export CC=clang
export CXX=clang++
# DiskANN requires macOS 13.3+ for sgesdd_ LAPACK function
export MACOSX_DEPLOYMENT_TARGET=13.3
# But Homebrew libraries on each macOS version require matching minimum version
if [[ "${{ matrix.os }}" == "macos-13" ]]; then
export MACOSX_DEPLOYMENT_TARGET=13.3
elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
export MACOSX_DEPLOYMENT_TARGET=14.0
elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
export MACOSX_DEPLOYMENT_TARGET=15.0
fi
uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
else
uv build --wheel --python ${{ matrix.python }} --find-links ${GITHUB_WORKSPACE}/packages/leann-core/dist
@@ -197,10 +221,24 @@ jobs:
- name: Repair wheels (macOS)
if: runner.os == 'macOS'
run: |
# Determine deployment target based on runner OS
# Must match the Homebrew libraries for each macOS version
if [[ "${{ matrix.os }}" == "macos-13" ]]; then
HNSW_TARGET="13.0"
DISKANN_TARGET="13.3"
elif [[ "${{ matrix.os }}" == "macos-14" ]]; then
HNSW_TARGET="14.0"
DISKANN_TARGET="14.0"
elif [[ "${{ matrix.os }}" == "macos-15" ]]; then
HNSW_TARGET="15.0"
DISKANN_TARGET="15.0"
fi
# Repair HNSW wheel
cd packages/leann-backend-hnsw
if [ -d dist ]; then
delocate-wheel -w dist_repaired -v dist/*.whl
export MACOSX_DEPLOYMENT_TARGET=$HNSW_TARGET
delocate-wheel -w dist_repaired -v --require-target-macos-version $HNSW_TARGET dist/*.whl
rm -rf dist
mv dist_repaired dist
fi
@@ -209,7 +247,8 @@ jobs:
# Repair DiskANN wheel
cd packages/leann-backend-diskann
if [ -d dist ]; then
delocate-wheel -w dist_repaired -v dist/*.whl
export MACOSX_DEPLOYMENT_TARGET=$DISKANN_TARGET
delocate-wheel -w dist_repaired -v --require-target-macos-version $DISKANN_TARGET dist/*.whl
rm -rf dist
mv dist_repaired dist
fi
@@ -238,19 +277,16 @@ jobs:
- name: Run tests with pytest
env:
CI: true # Mark as CI environment to skip memory-intensive tests
CI: true
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
HF_HUB_DISABLE_SYMLINKS: 1
TOKENIZERS_PARALLELISM: false
PYTORCH_ENABLE_MPS_FALLBACK: 0 # Disable MPS on macOS CI to avoid memory issues
OMP_NUM_THREADS: 1 # Disable OpenMP parallelism to avoid libomp crashes
MKL_NUM_THREADS: 1 # Single thread for MKL operations
PYTORCH_ENABLE_MPS_FALLBACK: 0
OMP_NUM_THREADS: 1
MKL_NUM_THREADS: 1
run: |
# Activate virtual environment
source .venv/bin/activate || source .venv/Scripts/activate
# Run all tests
pytest tests/
pytest tests/ -v --tb=short
- name: Run sanity checks (optional)
run: |

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
@@ -10,7 +10,7 @@ repos:
- id: debug-statements
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.1
rev: v0.12.7 # Fixed version to match pyproject.toml
hooks:
- id: ruff
- id: ruff-format

View File

@@ -31,7 +31,7 @@ LEANN achieves this through *graph-based selective recomputation* with *high-deg
<img src="assets/effects.png" alt="LEANN vs Traditional Vector DB Storage Comparison" width="70%">
</p>
> **The numbers speak for themselves:** Index 60 million text chunks in just 6GB instead of 201GB. From emails to browser history, everything fits on your laptop. [See detailed benchmarks for different applications below ↓](#storage-comparison)
> **The numbers speak for themselves:** Index 60 million text chunks in just 6GB instead of 201GB. From emails to browser history, everything fits on your laptop. [See detailed benchmarks for different applications below ↓](#-storage-comparison)
🔒 **Privacy:** Your data never leaves your laptop. No OpenAI, no cloud, no "terms of service".
@@ -70,6 +70,8 @@ uv venv
source .venv/bin/activate
uv pip install leann
```
<!--
> Low-resource? See “Low-resource setups” in the [Configuration Guide](docs/configuration-guide.md#low-resource-setups). -->
<details>
<summary>
@@ -184,34 +186,34 @@ All RAG examples share these common parameters. **Interactive mode** is availabl
```bash
# Core Parameters (General preprocessing for all examples)
--index-dir DIR # Directory to store the index (default: current directory)
--query "YOUR QUESTION" # Single query mode. Omit for interactive chat (type 'quit' to exit), and now you can play with your index interactively
--max-items N # Limit data preprocessing (default: -1, process all data)
--force-rebuild # Force rebuild index even if it exists
--index-dir DIR # Directory to store the index (default: current directory)
--query "YOUR QUESTION" # Single query mode. Omit for interactive chat (type 'quit' to exit), and now you can play with your index interactively
--max-items N # Limit data preprocessing (default: -1, process all data)
--force-rebuild # Force rebuild index even if it exists
# Embedding Parameters
--embedding-model MODEL # e.g., facebook/contriever, text-embedding-3-small, nomic-embed-text,mlx-community/Qwen3-Embedding-0.6B-8bit or nomic-embed-text
--embedding-mode MODE # sentence-transformers, openai, mlx, or ollama
--embedding-model MODEL # e.g., facebook/contriever, text-embedding-3-small, mlx-community/Qwen3-Embedding-0.6B-8bit or nomic-embed-text
--embedding-mode MODE # sentence-transformers, openai, mlx, or ollama
# LLM Parameters (Text generation models)
--llm TYPE # LLM backend: openai, ollama, or hf (default: openai)
--llm-model MODEL # Model name (default: gpt-4o) e.g., gpt-4o-mini, llama3.2:1b, Qwen/Qwen2.5-1.5B-Instruct
--thinking-budget LEVEL # Thinking budget for reasoning models: low/medium/high (supported by o3, o3-mini, GPT-Oss:20b, and other reasoning models)
--llm TYPE # LLM backend: openai, ollama, or hf (default: openai)
--llm-model MODEL # Model name (default: gpt-4o) e.g., gpt-4o-mini, llama3.2:1b, Qwen/Qwen2.5-1.5B-Instruct
--thinking-budget LEVEL # Thinking budget for reasoning models: low/medium/high (supported by o3, o3-mini, GPT-Oss:20b, and other reasoning models)
# Search Parameters
--top-k N # Number of results to retrieve (default: 20)
--search-complexity N # Search complexity for graph traversal (default: 32)
--top-k N # Number of results to retrieve (default: 20)
--search-complexity N # Search complexity for graph traversal (default: 32)
# Chunking Parameters
--chunk-size N # Size of text chunks (default varies by source: 256 for most, 192 for WeChat)
--chunk-overlap N # Overlap between chunks (default varies: 25-128 depending on source)
--chunk-size N # Size of text chunks (default varies by source: 256 for most, 192 for WeChat)
--chunk-overlap N # Overlap between chunks (default varies: 25-128 depending on source)
# Index Building Parameters
--backend-name NAME # Backend to use: hnsw or diskann (default: hnsw)
--graph-degree N # Graph degree for index construction (default: 32)
--build-complexity N # Build complexity for index construction (default: 64)
--no-compact # Disable compact index storage (compact storage IS enabled to save storage by default)
--no-recompute # Disable embedding recomputation (recomputation IS enabled to save storage by default)
--backend-name NAME # Backend to use: hnsw or diskann (default: hnsw)
--graph-degree N # Graph degree for index construction (default: 32)
--build-complexity N # Build complexity for index construction (default: 64)
--compact / --no-compact # Use compact storage (default: true). Must be `no-compact` for `no-recompute` build.
--recompute / --no-recompute # Enable/disable embedding recomputation (default: enabled). Should not do a `no-recompute` search in a `recompute` build.
```
</details>
@@ -455,7 +457,7 @@ leann --help
**To make it globally available:**
```bash
# Install the LEANN CLI globally using uv tool
uv tool install leann
uv tool install leann-core
# Now you can use leann from anywhere without activating venv
leann --help
@@ -482,27 +484,29 @@ leann list
```
**Key CLI features:**
- Auto-detects document formats (PDF, TXT, MD, DOCX)
- Auto-detects document formats (PDF, TXT, MD, DOCX, PPTX + code files)
- Smart text chunking with overlap
- Multiple LLM providers (Ollama, OpenAI, HuggingFace)
- Organized index storage in `~/.leann/indexes/`
- Organized index storage in `.leann/indexes/` (project-local)
- Support for advanced search parameters
<details>
<summary><strong>📋 Click to expand: Complete CLI Reference</strong></summary>
You can use `leann --help`, or `leann build --help`, `leann search --help`, `leann ask --help` to get the complete CLI reference.
**Build Command:**
```bash
leann build INDEX_NAME --docs DIRECTORY [OPTIONS]
leann build INDEX_NAME --docs DIRECTORY|FILE [DIRECTORY|FILE ...] [OPTIONS]
Options:
--backend {hnsw,diskann} Backend to use (default: hnsw)
--embedding-model MODEL Embedding model (default: facebook/contriever)
--graph-degree N Graph degree (default: 32)
--complexity N Build complexity (default: 64)
--force Force rebuild existing index
--compact Use compact storage (default: true)
--recompute Enable recomputation (default: true)
--graph-degree N Graph degree (default: 32)
--complexity N Build complexity (default: 64)
--force Force rebuild existing index
--compact / --no-compact Use compact storage (default: true). Must be `no-compact` for `no-recompute` build.
--recompute / --no-recompute Enable recomputation (default: true)
```
**Search Command:**
@@ -510,9 +514,9 @@ Options:
leann search INDEX_NAME QUERY [OPTIONS]
Options:
--top-k N Number of results (default: 5)
--complexity N Search complexity (default: 64)
--recompute-embeddings Use recomputation for highest accuracy
--top-k N Number of results (default: 5)
--complexity N Search complexity (default: 64)
--recompute / --no-recompute Enable/disable embedding recomputation (default: enabled). Should not do a `no-recompute` search in a `recompute` build.
--pruning-strategy {global,local,proportional}
```
@@ -543,12 +547,16 @@ Options:
- **Dynamic batching:** Efficiently batch embedding computations for GPU utilization
- **Two-level search:** Smart graph traversal that prioritizes promising nodes
**Backends:** HNSW (default) for most use cases, with optional DiskANN support for billion-scale datasets.
**Backends:**
- **HNSW** (default): Ideal for most datasets with maximum storage savings through full recomputation
- **DiskANN**: Advanced option with superior search performance, using PQ-based graph traversal with real-time reranking for the best speed-accuracy trade-off
## Benchmarks
**[DiskANN vs HNSW Performance Comparison →](benchmarks/diskann_vs_hnsw_speed_comparison.py)** - Compare search performance between both backends
**[Simple Example: Compare LEANN vs FAISS →](benchmarks/compare_faiss_vs_leann.py)** - See storage savings in action
**[Simple Example: Compare LEANN vs FAISS →](benchmarks/compare_faiss_vs_leann.py)**
### 📊 Storage Comparison
| System | DPR (2.1M) | Wiki (60M) | Chat (400K) | Email (780K) | Browser (38K) |

View File

@@ -69,14 +69,14 @@ class BaseRAGExample(ABC):
"--embedding-model",
type=str,
default=embedding_model_default,
help=f"Embedding model to use (default: {embedding_model_default})",
help=f"Embedding model to use (default: {embedding_model_default}), we provide facebook/contriever, text-embedding-3-small,mlx-community/Qwen3-Embedding-0.6B-8bit or nomic-embed-text",
)
embedding_group.add_argument(
"--embedding-mode",
type=str,
default="sentence-transformers",
choices=["sentence-transformers", "openai", "mlx", "ollama"],
help="Embedding backend mode (default: sentence-transformers)",
help="Embedding backend mode (default: sentence-transformers), we provide sentence-transformers, openai, mlx, or ollama",
)
# LLM parameters
@@ -86,13 +86,13 @@ class BaseRAGExample(ABC):
type=str,
default="openai",
choices=["openai", "ollama", "hf", "simulated"],
help="LLM backend to use (default: openai)",
help="LLM backend: openai, ollama, or hf (default: openai)",
)
llm_group.add_argument(
"--llm-model",
type=str,
default=None,
help="LLM model name (default: gpt-4o for openai, llama3.2:1b for ollama)",
help="Model name (default: gpt-4o) e.g., gpt-4o-mini, llama3.2:1b, Qwen/Qwen2.5-1.5B-Instruct",
)
llm_group.add_argument(
"--llm-host",
@@ -178,6 +178,9 @@ class BaseRAGExample(ABC):
config["host"] = args.llm_host
elif args.llm == "hf":
config["model"] = args.llm_model or "Qwen/Qwen2.5-1.5B-Instruct"
elif args.llm == "simulated":
# Simulated LLM doesn't need additional configuration
pass
return config

View File

@@ -1,9 +1,24 @@
# 🧪 Leann Sanity Checks
# 🧪 LEANN Benchmarks & Testing
This directory contains comprehensive sanity checks for the Leann system, ensuring all components work correctly across different configurations.
This directory contains performance benchmarks and comprehensive tests for the LEANN system, including backend comparisons and sanity checks across different configurations.
## 📁 Test Files
### `diskann_vs_hnsw_speed_comparison.py`
Performance comparison between DiskANN and HNSW backends:
-**Search latency** comparison with both backends using recompute
-**Index size** and **build time** measurements
-**Score validity** testing (ensures no -inf scores)
-**Configurable dataset sizes** for different scales
```bash
# Quick comparison with 500 docs, 10 queries
python benchmarks/diskann_vs_hnsw_speed_comparison.py
# Large-scale comparison with 2000 docs, 20 queries
python benchmarks/diskann_vs_hnsw_speed_comparison.py 2000 20
```
### `test_distance_functions.py`
Tests all supported distance functions across DiskANN backend:
-**MIPS** (Maximum Inner Product Search)

View File

@@ -0,0 +1,148 @@
import argparse
import os
import time
from pathlib import Path
from leann import LeannBuilder, LeannSearcher
def _meta_exists(index_path: str) -> bool:
p = Path(index_path)
return (p.parent / f"{p.stem}.meta.json").exists()
def ensure_index(index_path: str, backend_name: str, num_docs: int, is_recompute: bool) -> None:
# if _meta_exists(index_path):
# return
kwargs = {}
if backend_name == "hnsw":
kwargs["is_compact"] = is_recompute
builder = LeannBuilder(
backend_name=backend_name,
embedding_model=os.getenv("LEANN_EMBED_MODEL", "facebook/contriever"),
embedding_mode=os.getenv("LEANN_EMBED_MODE", "sentence-transformers"),
graph_degree=32,
complexity=64,
is_recompute=is_recompute,
num_threads=4,
**kwargs,
)
for i in range(num_docs):
builder.add_text(
f"This is a test document number {i}. It contains some repeated text for benchmarking."
)
builder.build_index(index_path)
def _bench_group(
index_path: str,
recompute: bool,
query: str,
repeats: int,
complexity: int = 32,
top_k: int = 10,
) -> float:
# Independent searcher per group; fixed port when recompute
searcher = LeannSearcher(index_path=index_path)
# Warm-up once
_ = searcher.search(
query,
top_k=top_k,
complexity=complexity,
recompute_embeddings=recompute,
)
def _once() -> float:
t0 = time.time()
_ = searcher.search(
query,
top_k=top_k,
complexity=complexity,
recompute_embeddings=recompute,
)
return time.time() - t0
if repeats <= 1:
t = _once()
else:
vals = [_once() for _ in range(repeats)]
vals.sort()
t = vals[len(vals) // 2]
searcher.cleanup()
return t
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--num-docs", type=int, default=5000)
parser.add_argument("--repeats", type=int, default=3)
parser.add_argument("--complexity", type=int, default=32)
args = parser.parse_args()
base = Path.cwd() / ".leann" / "indexes" / f"bench_n{args.num_docs}"
base.parent.mkdir(parents=True, exist_ok=True)
# ---------- Build HNSW variants ----------
hnsw_r = str(base / f"hnsw_recompute_n{args.num_docs}.leann")
hnsw_nr = str(base / f"hnsw_norecompute_n{args.num_docs}.leann")
ensure_index(hnsw_r, "hnsw", args.num_docs, True)
ensure_index(hnsw_nr, "hnsw", args.num_docs, False)
# ---------- Build DiskANN variants ----------
diskann_r = str(base / "diskann_r.leann")
diskann_nr = str(base / "diskann_nr.leann")
ensure_index(diskann_r, "diskann", args.num_docs, True)
ensure_index(diskann_nr, "diskann", args.num_docs, False)
# ---------- Helpers ----------
def _size_for(prefix: str) -> int:
p = Path(prefix)
base_dir = p.parent
stem = p.stem
total = 0
for f in base_dir.iterdir():
if f.is_file() and f.name.startswith(stem):
total += f.stat().st_size
return total
# ---------- HNSW benchmark ----------
t_hnsw_r = _bench_group(
hnsw_r, True, "test document number 42", repeats=args.repeats, complexity=args.complexity
)
t_hnsw_nr = _bench_group(
hnsw_nr, False, "test document number 42", repeats=args.repeats, complexity=args.complexity
)
size_hnsw_r = _size_for(hnsw_r)
size_hnsw_nr = _size_for(hnsw_nr)
print("Benchmark results (HNSW):")
print(f" recompute=True: search_time={t_hnsw_r:.3f}s, size={size_hnsw_r / 1024 / 1024:.1f}MB")
print(
f" recompute=False: search_time={t_hnsw_nr:.3f}s, size={size_hnsw_nr / 1024 / 1024:.1f}MB"
)
print(" Expectation: no-recompute should be faster but larger on disk.")
# ---------- DiskANN benchmark ----------
t_diskann_r = _bench_group(
diskann_r, True, "DiskANN R test doc 123", repeats=args.repeats, complexity=args.complexity
)
t_diskann_nr = _bench_group(
diskann_nr,
False,
"DiskANN NR test doc 123",
repeats=args.repeats,
complexity=args.complexity,
)
size_diskann_r = _size_for(diskann_r)
size_diskann_nr = _size_for(diskann_nr)
print("\nBenchmark results (DiskANN):")
print(f" build(recompute=True, partition): size={size_diskann_r / 1024 / 1024:.1f}MB")
print(f" build(recompute=False): size={size_diskann_nr / 1024 / 1024:.1f}MB")
print(f" search recompute=True (final rerank): {t_diskann_r:.3f}s")
print(f" search recompute=False (PQ only): {t_diskann_nr:.3f}s")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
DiskANN vs HNSW Search Performance Comparison
This benchmark compares search performance between DiskANN and HNSW backends:
- DiskANN: With graph partitioning enabled (is_recompute=True)
- HNSW: With recompute enabled (is_recompute=True)
- Tests performance across different dataset sizes
- Measures search latency, recall, and index size
"""
import gc
import multiprocessing as mp
import tempfile
import time
from pathlib import Path
from typing import Any
import numpy as np
# Prefer 'fork' start method to avoid POSIX semaphore leaks on macOS
try:
mp.set_start_method("fork", force=True)
except Exception:
pass
def create_test_texts(n_docs: int) -> list[str]:
"""Create synthetic test documents for benchmarking."""
np.random.seed(42)
topics = [
"machine learning and artificial intelligence",
"natural language processing and text analysis",
"computer vision and image recognition",
"data science and statistical analysis",
"deep learning and neural networks",
"information retrieval and search engines",
"database systems and data management",
"software engineering and programming",
"cybersecurity and network protection",
"cloud computing and distributed systems",
]
texts = []
for i in range(n_docs):
topic = topics[i % len(topics)]
variation = np.random.randint(1, 100)
text = (
f"This is document {i} about {topic}. Content variation {variation}. "
f"Additional information about {topic} with details and examples. "
f"Technical discussion of {topic} including implementation aspects."
)
texts.append(text)
return texts
def benchmark_backend(
backend_name: str, texts: list[str], test_queries: list[str], backend_kwargs: dict[str, Any]
) -> dict[str, float]:
"""Benchmark a specific backend with the given configuration."""
from leann.api import LeannBuilder, LeannSearcher
print(f"\n🔧 Testing {backend_name.upper()} backend...")
with tempfile.TemporaryDirectory() as temp_dir:
index_path = str(Path(temp_dir) / f"benchmark_{backend_name}.leann")
# Build index
print(f"📦 Building {backend_name} index with {len(texts)} documents...")
start_time = time.time()
builder = LeannBuilder(
backend_name=backend_name,
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
**backend_kwargs,
)
for text in texts:
builder.add_text(text)
builder.build_index(index_path)
build_time = time.time() - start_time
# Measure index size
index_dir = Path(index_path).parent
index_files = list(index_dir.glob(f"{Path(index_path).stem}.*"))
total_size = sum(f.stat().st_size for f in index_files if f.is_file())
size_mb = total_size / (1024 * 1024)
print(f" ✅ Build completed in {build_time:.2f}s, index size: {size_mb:.1f}MB")
# Search benchmark
print("🔍 Running search benchmark...")
searcher = LeannSearcher(index_path)
search_times = []
all_results = []
for query in test_queries:
start_time = time.time()
results = searcher.search(query, top_k=5)
search_time = time.time() - start_time
search_times.append(search_time)
all_results.append(results)
avg_search_time = np.mean(search_times) * 1000 # Convert to ms
print(f" ✅ Average search time: {avg_search_time:.1f}ms")
# Check for valid scores (detect -inf issues)
all_scores = [
result.score
for results in all_results
for result in results
if result.score is not None
]
valid_scores = [
score for score in all_scores if score != float("-inf") and score != float("inf")
]
score_validity_rate = len(valid_scores) / len(all_scores) if all_scores else 0
# Clean up (ensure embedding server shutdown and object GC)
try:
if hasattr(searcher, "cleanup"):
searcher.cleanup()
del searcher
del builder
gc.collect()
except Exception as e:
print(f"⚠️ Warning: Resource cleanup error: {e}")
return {
"build_time": build_time,
"avg_search_time_ms": avg_search_time,
"index_size_mb": size_mb,
"score_validity_rate": score_validity_rate,
}
def run_comparison(n_docs: int = 500, n_queries: int = 10):
"""Run performance comparison between DiskANN and HNSW."""
print("🚀 Starting DiskANN vs HNSW Performance Comparison")
print(f"📊 Dataset: {n_docs} documents, {n_queries} test queries")
# Create test data
texts = create_test_texts(n_docs)
test_queries = [
"machine learning algorithms",
"natural language processing",
"computer vision techniques",
"data analysis methods",
"neural network architectures",
"database query optimization",
"software development practices",
"security vulnerabilities",
"cloud infrastructure",
"distributed computing",
][:n_queries]
# HNSW benchmark
hnsw_results = benchmark_backend(
backend_name="hnsw",
texts=texts,
test_queries=test_queries,
backend_kwargs={
"is_recompute": True, # Enable recompute for fair comparison
"M": 16,
"efConstruction": 200,
},
)
# DiskANN benchmark
diskann_results = benchmark_backend(
backend_name="diskann",
texts=texts,
test_queries=test_queries,
backend_kwargs={
"is_recompute": True, # Enable graph partitioning
"num_neighbors": 32,
"search_list_size": 50,
},
)
# Performance comparison
print("\n📈 Performance Comparison Results")
print(f"{'=' * 60}")
print(f"{'Metric':<25} {'HNSW':<15} {'DiskANN':<15} {'Speedup':<10}")
print(f"{'-' * 60}")
# Build time comparison
build_speedup = hnsw_results["build_time"] / diskann_results["build_time"]
print(
f"{'Build Time (s)':<25} {hnsw_results['build_time']:<15.2f} {diskann_results['build_time']:<15.2f} {build_speedup:<10.2f}x"
)
# Search time comparison
search_speedup = hnsw_results["avg_search_time_ms"] / diskann_results["avg_search_time_ms"]
print(
f"{'Search Time (ms)':<25} {hnsw_results['avg_search_time_ms']:<15.1f} {diskann_results['avg_search_time_ms']:<15.1f} {search_speedup:<10.2f}x"
)
# Index size comparison
size_ratio = diskann_results["index_size_mb"] / hnsw_results["index_size_mb"]
print(
f"{'Index Size (MB)':<25} {hnsw_results['index_size_mb']:<15.1f} {diskann_results['index_size_mb']:<15.1f} {size_ratio:<10.2f}x"
)
# Score validity
print(
f"{'Score Validity (%)':<25} {hnsw_results['score_validity_rate'] * 100:<15.1f} {diskann_results['score_validity_rate'] * 100:<15.1f}"
)
print(f"{'=' * 60}")
print("\n🎯 Summary:")
if search_speedup > 1:
print(f" DiskANN is {search_speedup:.2f}x faster than HNSW for search")
else:
print(f" HNSW is {1 / search_speedup:.2f}x faster than DiskANN for search")
if size_ratio > 1:
print(f" DiskANN uses {size_ratio:.2f}x more storage than HNSW")
else:
print(f" DiskANN uses {1 / size_ratio:.2f}x less storage than HNSW")
print(
f" Both backends achieved {min(hnsw_results['score_validity_rate'], diskann_results['score_validity_rate']) * 100:.1f}% score validity"
)
if __name__ == "__main__":
import sys
try:
# Handle help request
if len(sys.argv) > 1 and sys.argv[1] in ["-h", "--help", "help"]:
print("DiskANN vs HNSW Performance Comparison")
print("=" * 50)
print(f"Usage: python {sys.argv[0]} [n_docs] [n_queries]")
print()
print("Arguments:")
print(" n_docs Number of documents to index (default: 500)")
print(" n_queries Number of test queries to run (default: 10)")
print()
print("Examples:")
print(" python benchmarks/diskann_vs_hnsw_speed_comparison.py")
print(" python benchmarks/diskann_vs_hnsw_speed_comparison.py 1000")
print(" python benchmarks/diskann_vs_hnsw_speed_comparison.py 2000 20")
sys.exit(0)
# Parse command line arguments
n_docs = int(sys.argv[1]) if len(sys.argv) > 1 else 500
n_queries = int(sys.argv[2]) if len(sys.argv) > 2 else 10
print("DiskANN vs HNSW Performance Comparison")
print("=" * 50)
print(f"Dataset: {n_docs} documents, {n_queries} queries")
print()
run_comparison(n_docs=n_docs, n_queries=n_queries)
except KeyboardInterrupt:
print("\n⚠️ Benchmark interrupted by user")
sys.exit(130)
except Exception as e:
print(f"\n❌ Benchmark failed: {e}")
sys.exit(1)
finally:
# Ensure clean exit (forceful to prevent rare hangs from atexit/threads)
try:
gc.collect()
print("\n🧹 Cleanup completed")
# Flush stdio to ensure message is visible before hard-exit
try:
import sys as _sys
_sys.stdout.flush()
_sys.stderr.flush()
except Exception:
pass
except Exception:
pass
# Use os._exit to bypass atexit handlers that may hang in rare cases
import os as _os
_os._exit(0)

View File

@@ -52,7 +52,7 @@ Based on our experience developing LEANN, embedding models fall into three categ
### Quick Start: Cloud and Local Embedding Options
**OpenAI Embeddings (Fastest Setup)**
For immediate testing without local model downloads:
For immediate testing without local model downloads(also if you [do not have GPU](https://github.com/yichuan-w/LEANN/issues/43) and do not care that much about your document leak, you should use this, we compute the embedding and recompute using openai API):
```bash
# Set OpenAI embeddings (requires OPENAI_API_KEY)
--embedding-mode openai --embedding-model text-embedding-3-small
@@ -97,16 +97,24 @@ ollama pull nomic-embed-text
```
### DiskANN
**Best for**: Large datasets (> 10M vectors, 10GB+ index size) - **⚠️ Beta version, still in active development**
- Uses Product Quantization (PQ) for coarse filtering during graph traversal
- Novel approach: stores only PQ codes, performs rerank with exact computation in final step
- Implements a corner case of double-queue: prunes all neighbors and recomputes at the end
**Best for**: Large datasets, especially when you want `recompute=True`.
**Key advantages:**
- **Faster search** on large datasets (3x+ speedup vs HNSW in many cases)
- **Smart storage**: `recompute=True` enables automatic graph partitioning for smaller indexes
- **Better scaling**: Designed for 100k+ documents
**Recompute behavior:**
- `recompute=True` (recommended): Pure PQ traversal + final reranking - faster and enables partitioning
- `recompute=False`: PQ + partial real distances during traversal - slower but higher accuracy
```bash
# For billion-scale deployments
--backend-name diskann --graph-degree 64 --build-complexity 128
# Recommended for most use cases
--backend-name diskann --graph-degree 32 --build-complexity 64
```
**Performance Benchmark**: Run `uv run benchmarks/diskann_vs_hnsw_speed_comparison.py` to compare DiskANN and HNSW on your system.
## LLM Selection: Engine and Model Comparison
### LLM Engines
@@ -259,27 +267,118 @@ Every configuration choice involves trade-offs:
The key is finding the right balance for your specific use case. Start small and simple, measure performance, then scale up only where needed.
## Deep Dive: Critical Configuration Decisions
## Low-resource setups
### When to Disable Recomputation
If you dont have a local GPU or builds/searches are too slow, use one or more of the options below.
LEANN's recomputation feature provides exact distance calculations but can be disabled for extreme QPS requirements:
### 1) Use OpenAI embeddings (no local compute)
Fastest path with zero local GPU requirements. Set your API key and use OpenAI embeddings during build and search:
```bash
--no-recompute # Disable selective recomputation
export OPENAI_API_KEY=sk-...
# Build with OpenAI embeddings
leann build my-index \
--embedding-mode openai \
--embedding-model text-embedding-3-small
# Search with OpenAI embeddings (recompute at query time)
leann search my-index "your query" \
--recompute
```
**Trade-offs**:
- **With recomputation** (default): Exact distances, best quality, higher latency, minimal storage (only stores metadata, recomputes embeddings on-demand)
- **Without recomputation**: Must store full embeddings, significantly higher memory and storage usage (10-100x more), but faster search
### 2) Run remote builds with SkyPilot (cloud GPU)
Offload embedding generation and index building to a GPU VM using [SkyPilot](https://skypilot.readthedocs.io/en/latest/). A template is provided at `sky/leann-build.yaml`.
```bash
# One-time: install and configure SkyPilot
pip install skypilot
# Launch with defaults (L4:1) and mount ./data to ~/leann-data; the build runs automatically
sky launch -c leann-gpu sky/leann-build.yaml
# Override parameters via -e key=value (optional)
sky launch -c leann-gpu sky/leann-build.yaml \
-e index_name=my-index \
-e backend=hnsw \
-e embedding_mode=sentence-transformers \
-e embedding_model=Qwen/Qwen3-Embedding-0.6B
# Copy the built index back to your local .leann (use rsync)
rsync -Pavz leann-gpu:~/.leann/indexes/my-index ./.leann/indexes/
```
### 3) Disable recomputation to trade storage for speed
If you need lower latency and have more storage/memory, disable recomputation. This stores full embeddings and avoids recomputing at search time.
```bash
# Build without recomputation (HNSW requires non-compact in this mode)
leann build my-index --no-recompute --no-compact
# Search without recomputation
leann search my-index "your query" --no-recompute
```
When to use:
- Extreme low latency requirements (high QPS, interactive assistants)
- Read-heavy workloads where storage is cheaper than latency
- No always-available GPU
Constraints:
- HNSW: when `--no-recompute` is set, LEANN automatically disables compact mode during build
- DiskANN: supported; `--no-recompute` skips selective recompute during search
Storage impact:
- Storing N embeddings of dimension D with float32 requires approximately N × D × 4 bytes
- Example: 1,000,000 chunks × 768 dims × 4 bytes ≈ 2.86 GB (plus graph/metadata)
Converting an existing index (rebuild required):
```bash
# Rebuild in-place (ensure you still have original docs or can regenerate chunks)
leann build my-index --force --no-recompute --no-compact
```
Python API usage:
```python
from leann import LeannSearcher
searcher = LeannSearcher("/path/to/my-index.leann")
results = searcher.search("your query", top_k=10, recompute_embeddings=False)
```
Trade-offs:
- Lower latency and fewer network hops at query time
- Significantly higher storage (10100× vs selective recomputation)
- Slightly larger memory footprint during build and search
Quick benchmark results (`benchmarks/benchmark_no_recompute.py` with 5k texts, complexity=32):
- HNSW
```text
recompute=True: search_time=0.818s, size=1.1MB
recompute=False: search_time=0.012s, size=16.6MB
```
- DiskANN
```text
recompute=True: search_time=0.041s, size=5.9MB
recompute=False: search_time=0.013s, size=24.6MB
```
Conclusion:
- **HNSW**: `no-recompute` is significantly faster (no embedding recomputation) but requires much more storage (stores all embeddings)
- **DiskANN**: `no-recompute` uses PQ + partial real distances during traversal (slower but higher accuracy), while `recompute=True` uses pure PQ traversal + final reranking (faster traversal, enables build-time partitioning for smaller storage)
**Disable when**:
- You have abundant storage and memory
- Need extremely low latency (< 100ms)
- Running a read-heavy workload where storage cost is acceptable
## Further Reading
- [Lessons Learned Developing LEANN](https://yichuan-w.github.io/blog/lessons_learned_in_dev_leann/)
- [LEANN Technical Paper](https://arxiv.org/abs/2506.08276)
- [DiskANN Original Paper](https://papers.nips.cc/paper/2019/file/09853c7fb1d3f8ee67a61b6bf4a7f8e6-Paper.pdf)
- [SSD-based Graph Partitioning](https://github.com/SonglinLife/SSD_BASED_PLAN)

View File

@@ -1 +1,7 @@
from . import diskann_backend as diskann_backend
from . import graph_partition
# Export main classes and functions
from .graph_partition import GraphPartitioner, partition_graph
__all__ = ["GraphPartitioner", "diskann_backend", "graph_partition", "partition_graph"]

View File

@@ -22,6 +22,11 @@ logger = logging.getLogger(__name__)
@contextlib.contextmanager
def suppress_cpp_output_if_needed():
"""Suppress C++ stdout/stderr based on LEANN_LOG_LEVEL"""
# In CI we avoid fiddling with low-level file descriptors to prevent aborts
if os.getenv("CI") == "true":
yield
return
log_level = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
# Only suppress if log level is WARNING or higher (ERROR, CRITICAL)
@@ -137,6 +142,71 @@ class DiskannBuilder(LeannBackendBuilderInterface):
def __init__(self, **kwargs):
self.build_params = kwargs
def _safe_cleanup_after_partition(self, index_dir: Path, index_prefix: str):
"""
Safely cleanup files after partition.
In partition mode, C++ doesn't read _disk.index content,
so we can delete it if all derived files exist.
"""
disk_index_file = index_dir / f"{index_prefix}_disk.index"
beam_search_file = index_dir / f"{index_prefix}_disk_beam_search.index"
# Required files that C++ partition mode needs
# Note: C++ generates these with _disk.index suffix
disk_suffix = "_disk.index"
required_files = [
f"{index_prefix}{disk_suffix}_medoids.bin", # Critical: assert fails if missing
# Note: _centroids.bin is not created in single-shot build - C++ handles this automatically
f"{index_prefix}_pq_pivots.bin", # PQ table
f"{index_prefix}_pq_compressed.bin", # PQ compressed vectors
]
# Check if all required files exist
missing_files = []
for filename in required_files:
file_path = index_dir / filename
if not file_path.exists():
missing_files.append(filename)
if missing_files:
logger.warning(
f"Cannot safely delete _disk.index - missing required files: {missing_files}"
)
logger.info("Keeping all original files for safety")
return
# Calculate space savings
space_saved = 0
files_to_delete = []
if disk_index_file.exists():
space_saved += disk_index_file.stat().st_size
files_to_delete.append(disk_index_file)
if beam_search_file.exists():
space_saved += beam_search_file.stat().st_size
files_to_delete.append(beam_search_file)
# Safe to delete!
for file_to_delete in files_to_delete:
try:
os.remove(file_to_delete)
logger.info(f"✅ Safely deleted: {file_to_delete.name}")
except Exception as e:
logger.warning(f"Failed to delete {file_to_delete.name}: {e}")
if space_saved > 0:
space_saved_mb = space_saved / (1024 * 1024)
logger.info(f"💾 Space saved: {space_saved_mb:.1f} MB")
# Show what files are kept
logger.info("📁 Kept essential files for partition mode:")
for filename in required_files:
file_path = index_dir / filename
if file_path.exists():
size_mb = file_path.stat().st_size / (1024 * 1024)
logger.info(f" - {filename} ({size_mb:.1f} MB)")
def build(self, data: np.ndarray, ids: list[str], index_path: str, **kwargs):
path = Path(index_path)
index_dir = path.parent
@@ -151,6 +221,17 @@ class DiskannBuilder(LeannBackendBuilderInterface):
_write_vectors_to_bin(data, index_dir / data_filename)
build_kwargs = {**self.build_params, **kwargs}
# Extract is_recompute from nested backend_kwargs if needed
is_recompute = build_kwargs.get("is_recompute", False)
if not is_recompute and "backend_kwargs" in build_kwargs:
is_recompute = build_kwargs["backend_kwargs"].get("is_recompute", False)
# Flatten all backend_kwargs parameters to top level for compatibility
if "backend_kwargs" in build_kwargs:
nested_params = build_kwargs.pop("backend_kwargs")
build_kwargs.update(nested_params)
metric_enum = _get_diskann_metrics().get(
build_kwargs.get("distance_metric", "mips").lower()
)
@@ -185,6 +266,30 @@ class DiskannBuilder(LeannBackendBuilderInterface):
build_kwargs.get("pq_disk_bytes", 0),
"",
)
# Auto-partition if is_recompute is enabled
if build_kwargs.get("is_recompute", False):
logger.info("is_recompute=True, starting automatic graph partitioning...")
from .graph_partition import partition_graph
# Partition the index using absolute paths
# Convert to absolute paths to avoid issues with working directory changes
absolute_index_dir = Path(index_dir).resolve()
absolute_index_prefix_path = str(absolute_index_dir / index_prefix)
disk_graph_path, partition_bin_path = partition_graph(
index_prefix_path=absolute_index_prefix_path,
output_dir=str(absolute_index_dir),
partition_prefix=index_prefix,
)
# Safe cleanup: In partition mode, C++ doesn't read _disk.index content
# but still needs the derived files (_medoids.bin, _centroids.bin, etc.)
self._safe_cleanup_after_partition(index_dir, index_prefix)
logger.info("✅ Graph partitioning completed successfully!")
logger.info(f" - Disk graph: {disk_graph_path}")
logger.info(f" - Partition file: {partition_bin_path}")
finally:
temp_data_file = index_dir / data_filename
if temp_data_file.exists():
@@ -213,7 +318,26 @@ class DiskannSearcher(BaseSearcher):
# For DiskANN, we need to reinitialize the index when zmq_port changes
# Store the initialization parameters for later use
full_index_prefix = str(self.index_dir / self.index_path.stem)
# Note: C++ load method expects the BASE path (without _disk.index suffix)
# C++ internally constructs: index_prefix + "_disk.index"
index_name = self.index_path.stem # "simple_test.leann" -> "simple_test"
diskann_index_prefix = str(self.index_dir / index_name) # /path/to/simple_test
full_index_prefix = diskann_index_prefix # /path/to/simple_test (base path)
# Auto-detect partition files and set partition_prefix
partition_graph_file = self.index_dir / f"{index_name}_disk_graph.index"
partition_bin_file = self.index_dir / f"{index_name}_partition.bin"
partition_prefix = ""
if partition_graph_file.exists() and partition_bin_file.exists():
# C++ expects full path prefix, not just filename
partition_prefix = str(self.index_dir / index_name) # /path/to/simple_test
logger.info(
f"✅ Detected partition files, using partition_prefix='{partition_prefix}'"
)
else:
logger.debug("No partition files detected, using standard index files")
self._init_params = {
"metric_enum": metric_enum,
"full_index_prefix": full_index_prefix,
@@ -221,8 +345,14 @@ class DiskannSearcher(BaseSearcher):
"num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0),
"cache_mechanism": 1,
"pq_prefix": "",
"partition_prefix": "",
"partition_prefix": partition_prefix,
}
# Log partition configuration for debugging
if partition_prefix:
logger.info(
f"✅ Detected partition files, using partition_prefix='{partition_prefix}'"
)
self._diskannpy = diskannpy
self._current_zmq_port = None
self._index = None
@@ -311,9 +441,14 @@ class DiskannSearcher(BaseSearcher):
else: # "global"
use_global_pruning = True
# Perform search with suppressed C++ output based on log level
use_deferred_fetch = kwargs.get("USE_DEFERRED_FETCH", True)
recompute_neighors = False
# Strategy:
# - Traversal always uses PQ distances
# - If recompute_embeddings=True, do a single final rerank via deferred fetch
# (fetch embeddings for the final candidate set only)
# - Do not recompute neighbor distances along the path
use_deferred_fetch = True if recompute_embeddings else False
recompute_neighors = False # Expected typo. For backward compatibility.
with suppress_cpp_output_if_needed():
labels, distances = self._index.batch_search(
query,

View File

@@ -81,7 +81,8 @@ def create_diskann_embedding_server(
with open(passages_file) as f:
meta = json.load(f)
passages = PassageManager(meta["passage_sources"])
logger.info(f"Loading PassageManager with metadata_file_path: {passages_file}")
passages = PassageManager(meta["passage_sources"], metadata_file_path=passages_file)
logger.info(
f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata"
)
@@ -102,8 +103,9 @@ def create_diskann_embedding_server(
socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}")
socket.setsockopt(zmq.RCVTIMEO, 300000)
socket.setsockopt(zmq.SNDTIMEO, 300000)
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.SNDTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0)
while True:
try:
@@ -220,30 +222,217 @@ def create_diskann_embedding_server(
traceback.print_exc()
raise
zmq_thread = threading.Thread(target=zmq_server_thread, daemon=True)
def zmq_server_thread_with_shutdown(shutdown_event):
"""ZMQ server thread that respects shutdown signal.
This creates its own REP socket, binds to zmq_port, and periodically
checks shutdown_event using recv timeouts to exit cleanly.
"""
logger.info("DiskANN ZMQ server thread started with shutdown support")
context = zmq.Context()
rep_socket = context.socket(zmq.REP)
rep_socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"DiskANN ZMQ REP server listening on port {zmq_port}")
# Set receive timeout so we can check shutdown_event periodically
rep_socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout
rep_socket.setsockopt(zmq.SNDTIMEO, 1000)
rep_socket.setsockopt(zmq.LINGER, 0)
try:
while not shutdown_event.is_set():
try:
e2e_start = time.time()
# REP socket receives single-part messages
message = rep_socket.recv()
# Check for empty messages - REP socket requires response to every request
if not message:
logger.warning("Received empty message, sending empty response")
rep_socket.send(b"")
continue
# Try protobuf first (same logic as original)
texts = []
is_text_request = False
try:
req_proto = embedding_pb2.NodeEmbeddingRequest()
req_proto.ParseFromString(message)
node_ids = list(req_proto.node_ids)
# Look up texts by node IDs
for nid in node_ids:
try:
passage_data = passages.get_passage(str(nid))
txt = passage_data["text"]
if not txt:
raise RuntimeError(f"FATAL: Empty text for passage ID {nid}")
texts.append(txt)
except KeyError:
raise RuntimeError(f"FATAL: Passage with ID {nid} not found")
logger.info(f"ZMQ received protobuf request for {len(node_ids)} node IDs")
except Exception:
# Fallback to msgpack for text requests
try:
import msgpack
request = msgpack.unpackb(message)
if isinstance(request, list) and all(
isinstance(item, str) for item in request
):
texts = request
is_text_request = True
logger.info(
f"ZMQ received msgpack text request for {len(texts)} texts"
)
else:
raise ValueError("Not a valid msgpack text request")
except Exception:
logger.error("Both protobuf and msgpack parsing failed!")
# Send error response
resp_proto = embedding_pb2.NodeEmbeddingResponse()
rep_socket.send(resp_proto.SerializeToString())
continue
# Process the request
embeddings = compute_embeddings(texts, model_name, mode=embedding_mode)
logger.info(f"Computed embeddings shape: {embeddings.shape}")
# Validation
if np.isnan(embeddings).any() or np.isinf(embeddings).any():
logger.error("NaN or Inf detected in embeddings!")
# Send error response
if is_text_request:
import msgpack
response_data = msgpack.packb([])
else:
resp_proto = embedding_pb2.NodeEmbeddingResponse()
response_data = resp_proto.SerializeToString()
rep_socket.send(response_data)
continue
# Prepare response based on request type
if is_text_request:
# For direct text requests, return msgpack
import msgpack
response_data = msgpack.packb(embeddings.tolist())
else:
# For protobuf requests, return protobuf
resp_proto = embedding_pb2.NodeEmbeddingResponse()
hidden_contiguous = np.ascontiguousarray(embeddings, dtype=np.float32)
resp_proto.embeddings_data = hidden_contiguous.tobytes()
resp_proto.dimensions.append(hidden_contiguous.shape[0])
resp_proto.dimensions.append(hidden_contiguous.shape[1])
response_data = resp_proto.SerializeToString()
# Send response back to the client
rep_socket.send(response_data)
e2e_end = time.time()
logger.info(f"⏱️ ZMQ E2E time: {e2e_end - e2e_start:.6f}s")
except zmq.Again:
# Timeout - check shutdown_event and continue
continue
except Exception as e:
if not shutdown_event.is_set():
logger.error(f"Error in ZMQ server loop: {e}")
try:
# Send error response for REP socket
resp_proto = embedding_pb2.NodeEmbeddingResponse()
rep_socket.send(resp_proto.SerializeToString())
except Exception:
pass
else:
logger.info("Shutdown in progress, ignoring ZMQ error")
break
finally:
try:
rep_socket.close(0)
except Exception:
pass
try:
context.term()
except Exception:
pass
logger.info("DiskANN ZMQ server thread exiting gracefully")
# Add shutdown coordination
shutdown_event = threading.Event()
def shutdown_zmq_server():
"""Gracefully shutdown ZMQ server."""
logger.info("Initiating graceful shutdown...")
shutdown_event.set()
if zmq_thread.is_alive():
logger.info("Waiting for ZMQ thread to finish...")
zmq_thread.join(timeout=5)
if zmq_thread.is_alive():
logger.warning("ZMQ thread did not finish in time")
# Clean up ZMQ resources
try:
# Note: socket and context are cleaned up by thread exit
logger.info("ZMQ resources cleaned up")
except Exception as e:
logger.warning(f"Error cleaning ZMQ resources: {e}")
# Clean up other resources
try:
import gc
gc.collect()
logger.info("Additional resources cleaned up")
except Exception as e:
logger.warning(f"Error cleaning additional resources: {e}")
logger.info("Graceful shutdown completed")
sys.exit(0)
# Register signal handlers within this function scope
import signal
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
shutdown_zmq_server()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Start ZMQ thread (NOT daemon!)
zmq_thread = threading.Thread(
target=lambda: zmq_server_thread_with_shutdown(shutdown_event),
daemon=False, # Not daemon - we want to wait for it
)
zmq_thread.start()
logger.info(f"Started DiskANN ZMQ server thread on port {zmq_port}")
# Keep the main thread alive
try:
while True:
time.sleep(1)
while not shutdown_event.is_set():
time.sleep(0.1) # Check shutdown more frequently
except KeyboardInterrupt:
logger.info("DiskANN Server shutting down...")
shutdown_zmq_server()
return
# If we reach here, shutdown was triggered by signal
logger.info("Main loop exited, process should be shutting down")
if __name__ == "__main__":
import signal
import sys
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
sys.exit(0)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Signal handlers are now registered within create_diskann_embedding_server
parser = argparse.ArgumentParser(description="DiskANN Embedding service")
parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on")

View File

@@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""
Graph Partition Module for LEANN DiskANN Backend
This module provides Python bindings for the graph partition functionality
of DiskANN, allowing users to partition disk-based indices for better
performance.
"""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
class GraphPartitioner:
"""
A Python interface for DiskANN's graph partition functionality.
This class provides methods to partition disk-based indices for improved
search performance and memory efficiency.
"""
def __init__(self, build_type: str = "release"):
"""
Initialize the GraphPartitioner.
Args:
build_type: Build type for the executables ("debug" or "release")
"""
self.build_type = build_type
self._ensure_executables()
def _get_executable_path(self, name: str) -> str:
"""Get the path to a graph partition executable."""
# Get the directory where this Python module is located
module_dir = Path(__file__).parent
# Navigate to the graph_partition directory
graph_partition_dir = module_dir.parent / "third_party" / "DiskANN" / "graph_partition"
executable_path = graph_partition_dir / "build" / self.build_type / "graph_partition" / name
if not executable_path.exists():
raise FileNotFoundError(f"Executable {name} not found at {executable_path}")
return str(executable_path)
def _ensure_executables(self):
"""Ensure that the required executables are built."""
try:
self._get_executable_path("partitioner")
self._get_executable_path("index_relayout")
except FileNotFoundError:
# Try to build the executables automatically
print("Executables not found, attempting to build them...")
self._build_executables()
def _build_executables(self):
"""Build the required executables."""
graph_partition_dir = (
Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
)
original_dir = os.getcwd()
try:
os.chdir(graph_partition_dir)
# Clean any existing build
if (graph_partition_dir / "build").exists():
shutil.rmtree(graph_partition_dir / "build")
# Run the build script
cmd = ["./build.sh", self.build_type, "split_graph", "/tmp/dummy"]
subprocess.run(cmd, capture_output=True, text=True, cwd=graph_partition_dir)
# Check if executables were created
partitioner_path = self._get_executable_path("partitioner")
relayout_path = self._get_executable_path("index_relayout")
print(f"✅ Built partitioner: {partitioner_path}")
print(f"✅ Built index_relayout: {relayout_path}")
except Exception as e:
raise RuntimeError(f"Failed to build executables: {e}")
finally:
os.chdir(original_dir)
def partition_graph(
self,
index_prefix_path: str,
output_dir: Optional[str] = None,
partition_prefix: Optional[str] = None,
**kwargs,
) -> tuple[str, str]:
"""
Partition a disk-based index for improved performance.
Args:
index_prefix_path: Path to the index prefix (e.g., "/path/to/index")
output_dir: Output directory for results (defaults to parent of index_prefix_path)
partition_prefix: Prefix for output files (defaults to basename of index_prefix_path)
**kwargs: Additional parameters for graph partitioning:
- gp_times: Number of LDG partition iterations (default: 10)
- lock_nums: Number of lock nodes (default: 10)
- cut: Cut adjacency list degree (default: 100)
- scale_factor: Scale factor (default: 1)
- data_type: Data type (default: "float")
- thread_nums: Number of threads (default: 10)
Returns:
Tuple of (disk_graph_index_path, partition_bin_path)
Raises:
RuntimeError: If the partitioning process fails
"""
# Set default parameters
params = {
"gp_times": 10,
"lock_nums": 10,
"cut": 100,
"scale_factor": 1,
"data_type": "float",
"thread_nums": 10,
**kwargs,
}
# Determine output directory
if output_dir is None:
output_dir = str(Path(index_prefix_path).parent)
# Create output directory if it doesn't exist
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Determine partition prefix
if partition_prefix is None:
partition_prefix = Path(index_prefix_path).name
# Get executable paths
partitioner_path = self._get_executable_path("partitioner")
relayout_path = self._get_executable_path("index_relayout")
# Create temporary directory for processing
with tempfile.TemporaryDirectory() as temp_dir:
# Change to the graph_partition directory for temporary files
graph_partition_dir = (
Path(__file__).parent.parent / "third_party" / "DiskANN" / "graph_partition"
)
original_dir = os.getcwd()
try:
os.chdir(graph_partition_dir)
# Create temporary data directory
temp_data_dir = Path(temp_dir) / "data"
temp_data_dir.mkdir(parents=True, exist_ok=True)
# Set up paths for temporary files
graph_path = temp_data_dir / "starling" / "_M_R_L_B" / "GRAPH"
graph_gp_path = (
graph_path
/ f"GP_TIMES_{params['gp_times']}_LOCK_{params['lock_nums']}_GP_USE_FREQ0_CUT{params['cut']}_SCALE{params['scale_factor']}"
)
graph_gp_path.mkdir(parents=True, exist_ok=True)
# Find input index file
old_index_file = f"{index_prefix_path}_disk_beam_search.index"
if not os.path.exists(old_index_file):
old_index_file = f"{index_prefix_path}_disk.index"
if not os.path.exists(old_index_file):
raise RuntimeError(f"Index file not found: {old_index_file}")
# Run partitioner
gp_file_path = graph_gp_path / "_part.bin"
partitioner_cmd = [
partitioner_path,
"--index_file",
old_index_file,
"--data_type",
params["data_type"],
"--gp_file",
str(gp_file_path),
"-T",
str(params["thread_nums"]),
"--ldg_times",
str(params["gp_times"]),
"--scale",
str(params["scale_factor"]),
"--mode",
"1",
]
print(f"Running partitioner: {' '.join(partitioner_cmd)}")
result = subprocess.run(
partitioner_cmd, capture_output=True, text=True, cwd=graph_partition_dir
)
if result.returncode != 0:
raise RuntimeError(
f"Partitioner failed with return code {result.returncode}.\n"
f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}"
)
# Run relayout
part_tmp_index = graph_gp_path / "_part_tmp.index"
relayout_cmd = [
relayout_path,
old_index_file,
str(gp_file_path),
params["data_type"],
"1",
]
print(f"Running relayout: {' '.join(relayout_cmd)}")
result = subprocess.run(
relayout_cmd, capture_output=True, text=True, cwd=graph_partition_dir
)
if result.returncode != 0:
raise RuntimeError(
f"Relayout failed with return code {result.returncode}.\n"
f"stdout: {result.stdout}\n"
f"stderr: {result.stderr}"
)
# Copy results to output directory
disk_graph_path = Path(output_dir) / f"{partition_prefix}_disk_graph.index"
partition_bin_path = Path(output_dir) / f"{partition_prefix}_partition.bin"
shutil.copy2(part_tmp_index, disk_graph_path)
shutil.copy2(gp_file_path, partition_bin_path)
print(f"Results copied to: {output_dir}")
return str(disk_graph_path), str(partition_bin_path)
finally:
os.chdir(original_dir)
def get_partition_info(self, partition_bin_path: str) -> dict:
"""
Get information about a partition file.
Args:
partition_bin_path: Path to the partition binary file
Returns:
Dictionary containing partition information
"""
if not os.path.exists(partition_bin_path):
raise FileNotFoundError(f"Partition file not found: {partition_bin_path}")
# For now, return basic file information
# In the future, this could parse the binary file for detailed info
stat = os.stat(partition_bin_path)
return {
"file_size": stat.st_size,
"file_path": partition_bin_path,
"modified_time": stat.st_mtime,
}
def partition_graph(
index_prefix_path: str,
output_dir: Optional[str] = None,
partition_prefix: Optional[str] = None,
build_type: str = "release",
**kwargs,
) -> tuple[str, str]:
"""
Convenience function to partition a graph index.
Args:
index_prefix_path: Path to the index prefix
output_dir: Output directory (defaults to parent of index_prefix_path)
partition_prefix: Prefix for output files (defaults to basename of index_prefix_path)
build_type: Build type for executables ("debug" or "release")
**kwargs: Additional parameters for graph partitioning
Returns:
Tuple of (disk_graph_index_path, partition_bin_path)
"""
partitioner = GraphPartitioner(build_type=build_type)
return partitioner.partition_graph(index_prefix_path, output_dir, partition_prefix, **kwargs)
# Example usage:
if __name__ == "__main__":
# Example: partition an index
try:
disk_graph_path, partition_bin_path = partition_graph(
"/path/to/your/index_prefix", gp_times=10, lock_nums=10, cut=100
)
print("Partitioning completed successfully!")
print(f"Disk graph index: {disk_graph_path}")
print(f"Partition binary: {partition_bin_path}")
except Exception as e:
print(f"Partitioning failed: {e}")

View File

@@ -4,8 +4,8 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-diskann"
version = "0.2.7"
dependencies = ["leann-core==0.2.7", "numpy", "protobuf>=3.19.0"]
version = "0.2.9"
dependencies = ["leann-core==0.2.9", "numpy", "protobuf>=3.19.0"]
[tool.scikit-build]
# Key: simplified CMake path

View File

@@ -1,5 +1,6 @@
import argparse
import gc # Import garbage collector interface
import logging
import os
import struct
import sys
@@ -7,6 +8,12 @@ import time
import numpy as np
# Set up logging to avoid print buffer issues
logger = logging.getLogger(__name__)
LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
log_level = getattr(logging, LOG_LEVEL, logging.WARNING)
logger.setLevel(log_level)
# --- FourCCs (add more if needed) ---
INDEX_HNSW_FLAT_FOURCC = int.from_bytes(b"IHNf", "little")
# Add other HNSW fourccs if you expect different storage types inside HNSW
@@ -243,6 +250,8 @@ def convert_hnsw_graph_to_csr(input_filename, output_filename, prune_embeddings=
output_filename: Output CSR index file
prune_embeddings: Whether to prune embedding storage (write NULL storage marker)
"""
# Keep prints simple; rely on CI runner to flush output as needed
print(f"Starting conversion: {input_filename} -> {output_filename}")
start_time = time.time()
original_hnsw_data = {}

View File

@@ -54,12 +54,13 @@ class HNSWBuilder(LeannBackendBuilderInterface):
self.efConstruction = self.build_params.setdefault("efConstruction", 200)
self.distance_metric = self.build_params.setdefault("distance_metric", "mips")
self.dimensions = self.build_params.get("dimensions")
if not self.is_recompute:
if self.is_compact:
# TODO: support this case @andy
raise ValueError(
"is_recompute is False, but is_compact is True. This is not compatible now. change is compact to False and you can use the original HNSW index."
)
if not self.is_recompute and self.is_compact:
# Auto-correct: non-recompute requires non-compact storage for HNSW
logger.warning(
"is_recompute=False requires non-compact HNSW. Forcing is_compact=False."
)
self.is_compact = False
self.build_params["is_compact"] = False
def build(self, data: np.ndarray, ids: list[str], index_path: str, **kwargs):
from . import faiss # type: ignore
@@ -184,9 +185,11 @@ class HNSWSearcher(BaseSearcher):
"""
from . import faiss # type: ignore
if not recompute_embeddings:
if self.is_pruned:
raise RuntimeError("Recompute is required for pruned index.")
if not recompute_embeddings and self.is_pruned:
raise RuntimeError(
"Recompute is required for pruned/compact HNSW index. "
"Re-run search with --recompute, or rebuild with --no-recompute and --no-compact."
)
if recompute_embeddings:
if zmq_port is None:
raise ValueError("zmq_port must be provided if recompute_embeddings is True")

View File

@@ -10,7 +10,7 @@ import sys
import threading
import time
from pathlib import Path
from typing import Union
from typing import Optional
import msgpack
import numpy as np
@@ -34,7 +34,7 @@ if not logger.handlers:
def create_hnsw_embedding_server(
passages_file: Union[str, None] = None,
passages_file: Optional[str] = None,
zmq_port: int = 5555,
model_name: str = "sentence-transformers/all-mpnet-base-v2",
distance_metric: str = "mips",
@@ -82,199 +82,317 @@ def create_hnsw_embedding_server(
with open(passages_file) as f:
meta = json.load(f)
# Convert relative paths to absolute paths based on metadata file location
metadata_dir = Path(passages_file).parent.parent # Go up one level from the metadata file
passage_sources = []
for source in meta["passage_sources"]:
source_copy = source.copy()
# Convert relative paths to absolute paths
if not Path(source_copy["path"]).is_absolute():
source_copy["path"] = str(metadata_dir / source_copy["path"])
if not Path(source_copy["index_path"]).is_absolute():
source_copy["index_path"] = str(metadata_dir / source_copy["index_path"])
passage_sources.append(source_copy)
passages = PassageManager(passage_sources)
# Let PassageManager handle path resolution uniformly. It supports fallback order:
# 1) path/index_path; 2) *_relative; 3) standard siblings next to meta
passages = PassageManager(meta["passage_sources"], metadata_file_path=passages_file)
# Dimension from metadata for shaping responses
try:
embedding_dim: int = int(meta.get("dimensions", 0))
except Exception:
embedding_dim = 0
logger.info(
f"Loaded PassageManager with {len(passages.global_offset_map)} passages from metadata"
)
def zmq_server_thread():
"""ZMQ server thread"""
# (legacy ZMQ thread removed; using shutdown-capable server only)
def zmq_server_thread_with_shutdown(shutdown_event):
"""ZMQ server thread that respects shutdown signal.
Creates its own REP socket bound to zmq_port and polls with timeouts
to allow graceful shutdown.
"""
logger.info("ZMQ server thread started with shutdown support")
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"HNSW ZMQ server listening on port {zmq_port}")
rep_socket = context.socket(zmq.REP)
rep_socket.bind(f"tcp://*:{zmq_port}")
logger.info(f"HNSW ZMQ REP server listening on port {zmq_port}")
rep_socket.setsockopt(zmq.RCVTIMEO, 1000)
# Keep sends from blocking during shutdown; fail fast and drop on close
rep_socket.setsockopt(zmq.SNDTIMEO, 1000)
rep_socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.RCVTIMEO, 300000)
socket.setsockopt(zmq.SNDTIMEO, 300000)
# Track last request type/length for shape-correct fallbacks
last_request_type = "unknown" # 'text' | 'distance' | 'embedding' | 'unknown'
last_request_length = 0
while True:
try:
message_bytes = socket.recv()
logger.debug(f"Received ZMQ request of size {len(message_bytes)} bytes")
try:
while not shutdown_event.is_set():
try:
e2e_start = time.time()
logger.debug("🔍 Waiting for ZMQ message...")
request_bytes = rep_socket.recv()
e2e_start = time.time()
request_payload = msgpack.unpackb(message_bytes)
# Rest of the processing logic (same as original)
request = msgpack.unpackb(request_bytes)
# Handle direct text embedding request
if isinstance(request_payload, list) and len(request_payload) > 0:
# Check if this is a direct text request (list of strings)
if all(isinstance(item, str) for item in request_payload):
logger.info(
f"Processing direct text embedding request for {len(request_payload)} texts in {embedding_mode} mode"
)
if len(request) == 1 and request[0] == "__QUERY_MODEL__":
response_bytes = msgpack.packb([model_name])
rep_socket.send(response_bytes)
continue
# Use unified embedding computation (now with model caching)
embeddings = compute_embeddings(
request_payload, model_name, mode=embedding_mode
)
response = embeddings.tolist()
socket.send(msgpack.packb(response))
# Handle direct text embedding request
if (
isinstance(request, list)
and request
and all(isinstance(item, str) for item in request)
):
last_request_type = "text"
last_request_length = len(request)
embeddings = compute_embeddings(request, model_name, mode=embedding_mode)
rep_socket.send(msgpack.packb(embeddings.tolist()))
e2e_end = time.time()
logger.info(f"⏱️ Text embedding E2E time: {e2e_end - e2e_start:.6f}s")
continue
# Handle distance calculation requests
if (
isinstance(request_payload, list)
and len(request_payload) == 2
and isinstance(request_payload[0], list)
and isinstance(request_payload[1], list)
):
node_ids = request_payload[0]
query_vector = np.array(request_payload[1], dtype=np.float32)
# Handle distance calculation request: [[ids], [query_vector]]
if (
isinstance(request, list)
and len(request) == 2
and isinstance(request[0], list)
and isinstance(request[1], list)
):
node_ids = request[0]
# Handle nested [[ids]] shape defensively
if len(node_ids) == 1 and isinstance(node_ids[0], list):
node_ids = node_ids[0]
query_vector = np.array(request[1], dtype=np.float32)
last_request_type = "distance"
last_request_length = len(node_ids)
logger.debug("Distance calculation request received")
logger.debug(f" Node IDs: {node_ids}")
logger.debug(f" Query vector dim: {len(query_vector)}")
logger.debug("Distance calculation request received")
logger.debug(f" Node IDs: {node_ids}")
logger.debug(f" Query vector dim: {len(query_vector)}")
# Get embeddings for node IDs
texts = []
for nid in node_ids:
# Gather texts for found ids
texts: list[str] = []
found_indices: list[int] = []
for idx, nid in enumerate(node_ids):
try:
passage_data = passages.get_passage(str(nid))
txt = passage_data.get("text", "")
if isinstance(txt, str) and len(txt) > 0:
texts.append(txt)
found_indices.append(idx)
else:
logger.error(f"Empty text for passage ID {nid}")
except KeyError:
logger.error(f"Passage ID {nid} not found")
except Exception as e:
logger.error(f"Exception looking up passage ID {nid}: {e}")
# Prepare full-length response with large sentinel values
large_distance = 1e9
response_distances = [large_distance] * len(node_ids)
if texts:
try:
embeddings = compute_embeddings(
texts, model_name, mode=embedding_mode
)
logger.info(
f"Computed embeddings for {len(texts)} texts, shape: {embeddings.shape}"
)
if distance_metric == "l2":
partial = np.sum(
np.square(embeddings - query_vector.reshape(1, -1)), axis=1
)
else: # mips or cosine
partial = -np.dot(embeddings, query_vector)
for pos, dval in zip(found_indices, partial.flatten().tolist()):
response_distances[pos] = float(dval)
except Exception as e:
logger.error(f"Distance computation error, using sentinels: {e}")
# Send response in expected shape [[distances]]
rep_socket.send(msgpack.packb([response_distances], use_single_float=True))
e2e_end = time.time()
logger.info(f"⏱️ Distance calculation E2E time: {e2e_end - e2e_start:.6f}s")
continue
# Fallback: treat as embedding-by-id request
if (
isinstance(request, list)
and len(request) == 1
and isinstance(request[0], list)
):
node_ids = request[0]
elif isinstance(request, list):
node_ids = request
else:
node_ids = []
last_request_type = "embedding"
last_request_length = len(node_ids)
logger.info(f"ZMQ received {len(node_ids)} node IDs for embedding fetch")
# Preallocate zero-filled flat data for robustness
if embedding_dim <= 0:
dims = [0, 0]
flat_data: list[float] = []
else:
dims = [len(node_ids), embedding_dim]
flat_data = [0.0] * (dims[0] * dims[1])
# Collect texts for found ids
texts: list[str] = []
found_indices: list[int] = []
for idx, nid in enumerate(node_ids):
try:
passage_data = passages.get_passage(str(nid))
txt = passage_data["text"]
texts.append(txt)
txt = passage_data.get("text", "")
if isinstance(txt, str) and len(txt) > 0:
texts.append(txt)
found_indices.append(idx)
else:
logger.error(f"Empty text for passage ID {nid}")
except KeyError:
logger.error(f"Passage ID {nid} not found")
raise RuntimeError(f"FATAL: Passage with ID {nid} not found")
logger.error(f"Passage with ID {nid} not found")
except Exception as e:
logger.error(f"Exception looking up passage ID {nid}: {e}")
raise
# Process embeddings
embeddings = compute_embeddings(texts, model_name, mode=embedding_mode)
logger.info(
f"Computed embeddings for {len(texts)} texts, shape: {embeddings.shape}"
)
if texts:
try:
embeddings = compute_embeddings(texts, model_name, mode=embedding_mode)
logger.info(
f"Computed embeddings for {len(texts)} texts, shape: {embeddings.shape}"
)
# Calculate distances
if distance_metric == "l2":
distances = np.sum(
np.square(embeddings - query_vector.reshape(1, -1)), axis=1
)
else: # mips or cosine
distances = -np.dot(embeddings, query_vector)
if np.isnan(embeddings).any() or np.isinf(embeddings).any():
logger.error(
f"NaN or Inf detected in embeddings! Requested IDs: {node_ids[:5]}..."
)
dims = [0, embedding_dim]
flat_data = []
else:
emb_f32 = np.ascontiguousarray(embeddings, dtype=np.float32)
flat = emb_f32.flatten().tolist()
for j, pos in enumerate(found_indices):
start = pos * embedding_dim
end = start + embedding_dim
if end <= len(flat_data):
flat_data[start:end] = flat[
j * embedding_dim : (j + 1) * embedding_dim
]
except Exception as e:
logger.error(f"Embedding computation error, returning zeros: {e}")
response_payload = distances.flatten().tolist()
response_bytes = msgpack.packb([response_payload], use_single_float=True)
logger.debug(f"Sending distance response with {len(distances)} distances")
response_payload = [dims, flat_data]
response_bytes = msgpack.packb(response_payload, use_single_float=True)
socket.send(response_bytes)
rep_socket.send(response_bytes)
e2e_end = time.time()
logger.info(f"⏱️ Distance calculation E2E time: {e2e_end - e2e_start:.6f}s")
logger.info(f"⏱️ ZMQ E2E time: {e2e_end - e2e_start:.6f}s")
except zmq.Again:
# Timeout - check shutdown_event and continue
continue
except Exception as e:
if not shutdown_event.is_set():
logger.error(f"Error in ZMQ server loop: {e}")
# Shape-correct fallback
try:
if last_request_type == "distance":
large_distance = 1e9
fallback_len = max(0, int(last_request_length))
safe = [[large_distance] * fallback_len]
elif last_request_type == "embedding":
bsz = max(0, int(last_request_length))
dim = max(0, int(embedding_dim))
safe = (
[[bsz, dim], [0.0] * (bsz * dim)] if dim > 0 else [[0, 0], []]
)
elif last_request_type == "text":
safe = [] # direct text embeddings expectation is a flat list
else:
safe = [[0, int(embedding_dim) if embedding_dim > 0 else 0], []]
rep_socket.send(msgpack.packb(safe, use_single_float=True))
except Exception:
pass
else:
logger.info("Shutdown in progress, ignoring ZMQ error")
break
finally:
try:
rep_socket.close(0)
except Exception:
pass
try:
context.term()
except Exception:
pass
# Standard embedding request (passage ID lookup)
if (
not isinstance(request_payload, list)
or len(request_payload) != 1
or not isinstance(request_payload[0], list)
):
logger.error(
f"Invalid MessagePack request format. Expected [[ids...]] or [texts...], got: {type(request_payload)}"
)
socket.send(msgpack.packb([[], []]))
continue
logger.info("ZMQ server thread exiting gracefully")
node_ids = request_payload[0]
logger.debug(f"Request for {len(node_ids)} node embeddings")
# Add shutdown coordination
shutdown_event = threading.Event()
# Look up texts by node IDs
texts = []
for nid in node_ids:
try:
passage_data = passages.get_passage(str(nid))
txt = passage_data["text"]
if not txt:
raise RuntimeError(f"FATAL: Empty text for passage ID {nid}")
texts.append(txt)
except KeyError:
raise RuntimeError(f"FATAL: Passage with ID {nid} not found")
except Exception as e:
logger.error(f"Exception looking up passage ID {nid}: {e}")
raise
def shutdown_zmq_server():
"""Gracefully shutdown ZMQ server."""
logger.info("Initiating graceful shutdown...")
shutdown_event.set()
# Process embeddings
embeddings = compute_embeddings(texts, model_name, mode=embedding_mode)
logger.info(
f"Computed embeddings for {len(texts)} texts, shape: {embeddings.shape}"
)
if zmq_thread.is_alive():
logger.info("Waiting for ZMQ thread to finish...")
zmq_thread.join(timeout=5)
if zmq_thread.is_alive():
logger.warning("ZMQ thread did not finish in time")
# Serialization and response
if np.isnan(embeddings).any() or np.isinf(embeddings).any():
logger.error(
f"NaN or Inf detected in embeddings! Requested IDs: {node_ids[:5]}..."
)
raise AssertionError()
# Clean up ZMQ resources
try:
# Note: socket and context are cleaned up by thread exit
logger.info("ZMQ resources cleaned up")
except Exception as e:
logger.warning(f"Error cleaning ZMQ resources: {e}")
hidden_contiguous_f32 = np.ascontiguousarray(embeddings, dtype=np.float32)
response_payload = [
list(hidden_contiguous_f32.shape),
hidden_contiguous_f32.flatten().tolist(),
]
response_bytes = msgpack.packb(response_payload, use_single_float=True)
# Clean up other resources
try:
import gc
socket.send(response_bytes)
e2e_end = time.time()
logger.info(f"⏱️ ZMQ E2E time: {e2e_end - e2e_start:.6f}s")
gc.collect()
logger.info("Additional resources cleaned up")
except Exception as e:
logger.warning(f"Error cleaning additional resources: {e}")
except zmq.Again:
logger.debug("ZMQ socket timeout, continuing to listen")
continue
except Exception as e:
logger.error(f"Error in ZMQ server loop: {e}")
import traceback
logger.info("Graceful shutdown completed")
sys.exit(0)
traceback.print_exc()
socket.send(msgpack.packb([[], []]))
# Register signal handlers within this function scope
import signal
zmq_thread = threading.Thread(target=zmq_server_thread, daemon=True)
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
shutdown_zmq_server()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Pass shutdown_event to ZMQ thread
zmq_thread = threading.Thread(
target=lambda: zmq_server_thread_with_shutdown(shutdown_event),
daemon=False, # Not daemon - we want to wait for it
)
zmq_thread.start()
logger.info(f"Started HNSW ZMQ server thread on port {zmq_port}")
# Keep the main thread alive
try:
while True:
time.sleep(1)
while not shutdown_event.is_set():
time.sleep(0.1) # Check shutdown more frequently
except KeyboardInterrupt:
logger.info("HNSW Server shutting down...")
shutdown_zmq_server()
return
# If we reach here, shutdown was triggered by signal
logger.info("Main loop exited, process should be shutting down")
if __name__ == "__main__":
import signal
import sys
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, shutting down gracefully...")
sys.exit(0)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Signal handlers are now registered within create_hnsw_embedding_server
parser = argparse.ArgumentParser(description="HNSW Embedding service")
parser.add_argument("--zmq-port", type=int, default=5555, help="ZMQ port to run on")

View File

@@ -6,10 +6,10 @@ build-backend = "scikit_build_core.build"
[project]
name = "leann-backend-hnsw"
version = "0.2.7"
version = "0.2.9"
description = "Custom-built HNSW (Faiss) backend for the Leann toolkit."
dependencies = [
"leann-core==0.2.7",
"leann-core==0.2.9",
"numpy",
"pyzmq>=23.0.0",
"msgpack>=1.0.0",

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann-core"
version = "0.2.7"
version = "0.2.9"
description = "Core API and plugin system for LEANN"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -115,20 +115,62 @@ class SearchResult:
class PassageManager:
def __init__(self, passage_sources: list[dict[str, Any]]):
def __init__(
self, passage_sources: list[dict[str, Any]], metadata_file_path: Optional[str] = None
):
self.offset_maps = {}
self.passage_files = {}
self.global_offset_map = {} # Combined map for fast lookup
# Derive index base name for standard sibling fallbacks, e.g., <index_name>.passages.*
index_name_base = None
if metadata_file_path:
meta_name = Path(metadata_file_path).name
if meta_name.endswith(".meta.json"):
index_name_base = meta_name[: -len(".meta.json")]
for source in passage_sources:
assert source["type"] == "jsonl", "only jsonl is supported"
passage_file = source["path"]
index_file = source["index_path"] # .idx file
passage_file = source.get("path", "")
index_file = source.get("index_path", "") # .idx file
# Fix path resolution for Colab and other environments
if not Path(index_file).is_absolute():
# If relative path, try to resolve it properly
index_file = str(Path(index_file).resolve())
# Fix path resolution - relative paths should be relative to metadata file directory
def _resolve_candidates(
primary: str,
relative_key: str,
default_name: Optional[str],
source_dict: dict[str, Any],
) -> list[Path]:
candidates: list[Path] = []
# 1) Primary as-is (absolute or relative)
if primary:
p = Path(primary)
candidates.append(p if p.is_absolute() else (Path.cwd() / p))
# 2) metadata-relative explicit relative key
if metadata_file_path and source_dict.get(relative_key):
candidates.append(Path(metadata_file_path).parent / source_dict[relative_key])
# 3) metadata-relative standard sibling filename
if metadata_file_path and default_name:
candidates.append(Path(metadata_file_path).parent / default_name)
return candidates
# Build candidate lists and pick first existing; otherwise keep last candidate for error message
idx_default = f"{index_name_base}.passages.idx" if index_name_base else None
idx_candidates = _resolve_candidates(
index_file, "index_path_relative", idx_default, source
)
pas_default = f"{index_name_base}.passages.jsonl" if index_name_base else None
pas_candidates = _resolve_candidates(passage_file, "path_relative", pas_default, source)
def _pick_existing(cands: list[Path]) -> str:
for c in cands:
if c.exists():
return str(c.resolve())
# Fallback to last candidate (best guess) even if not exists; will error below
return str(cands[-1].resolve()) if cands else ""
index_file = _pick_existing(idx_candidates)
passage_file = _pick_existing(pas_candidates)
if not Path(index_file).exists():
raise FileNotFoundError(f"Passage index file not found: {index_file}")
@@ -162,6 +204,18 @@ class LeannBuilder:
**backend_kwargs,
):
self.backend_name = backend_name
# Normalize incompatible combinations early (for consistent metadata)
if backend_name == "hnsw":
is_recompute = backend_kwargs.get("is_recompute", True)
is_compact = backend_kwargs.get("is_compact", True)
if is_recompute is False and is_compact is True:
warnings.warn(
"HNSW with is_recompute=False requires non-compact storage. Forcing is_compact=False.",
UserWarning,
stacklevel=2,
)
backend_kwargs["is_compact"] = False
backend_factory: Optional[LeannBackendFactoryInterface] = BACKEND_REGISTRY.get(backend_name)
if backend_factory is None:
raise ValueError(f"Backend '{backend_name}' not found or not registered.")
@@ -314,8 +368,12 @@ class LeannBuilder:
"passage_sources": [
{
"type": "jsonl",
"path": str(passages_file),
"index_path": str(offset_file),
# Preserve existing relative file names (backward-compatible)
"path": passages_file.name,
"index_path": offset_file.name,
# Add optional redundant relative keys for remote build portability (non-breaking)
"path_relative": passages_file.name,
"index_path_relative": offset_file.name,
}
],
}
@@ -430,8 +488,12 @@ class LeannBuilder:
"passage_sources": [
{
"type": "jsonl",
"path": str(passages_file),
"index_path": str(offset_file),
# Preserve existing relative file names (backward-compatible)
"path": passages_file.name,
"index_path": offset_file.name,
# Add optional redundant relative keys for remote build portability (non-breaking)
"path_relative": passages_file.name,
"index_path_relative": offset_file.name,
}
],
"built_from_precomputed_embeddings": True,
@@ -473,7 +535,10 @@ class LeannSearcher:
self.embedding_model = self.meta_data["embedding_model"]
# Support both old and new format
self.embedding_mode = self.meta_data.get("embedding_mode", "sentence-transformers")
self.passage_manager = PassageManager(self.meta_data.get("passage_sources", []))
# Delegate portability handling to PassageManager
self.passage_manager = PassageManager(
self.meta_data.get("passage_sources", []), metadata_file_path=self.meta_path_str
)
backend_factory = BACKEND_REGISTRY.get(backend_name)
if backend_factory is None:
raise ValueError(f"Backend '{backend_name}' not found.")
@@ -546,13 +611,13 @@ class LeannSearcher:
zmq_port=zmq_port,
**kwargs,
)
time.time() - start_time
# logger.info(f" Search time: {search_time} seconds")
logger.info(f" Backend returned: labels={len(results.get('labels', [[]])[0])} results")
enriched_results = []
if "labels" in results and "distances" in results:
logger.info(f" Processing {len(results['labels'][0])} passage IDs:")
# Python 3.9 does not support zip(strict=...); lengths are expected to match
for i, (string_id, dist) in enumerate(
zip(results["labels"][0], results["distances"][0])
):
@@ -580,13 +645,43 @@ class LeannSearcher:
)
except KeyError:
RED = "\033[91m"
RESET = "\033[0m"
logger.error(
f" {RED}{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}"
)
# Define color codes outside the loop for final message
GREEN = "\033[92m"
RESET = "\033[0m"
logger.info(f" {GREEN}✓ Final enriched results: {len(enriched_results)} passages{RESET}")
return enriched_results
def cleanup(self):
"""Explicitly cleanup embedding server resources.
This method should be called after you're done using the searcher,
especially in test environments or batch processing scenarios.
"""
if hasattr(self.backend_impl, "embedding_server_manager"):
self.backend_impl.embedding_server_manager.stop_server()
# Enable automatic cleanup patterns
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
try:
self.cleanup()
except Exception:
pass
def __del__(self):
try:
self.cleanup()
except Exception:
# Avoid noisy errors during interpreter shutdown
pass
class LeannChat:
def __init__(
@@ -656,3 +751,28 @@ class LeannChat:
except (KeyboardInterrupt, EOFError):
print("\nGoodbye!")
break
def cleanup(self):
"""Explicitly cleanup embedding server resources.
This method should be called after you're done using the chat interface,
especially in test environments or batch processing scenarios.
"""
if hasattr(self.searcher, "cleanup"):
self.searcher.cleanup()
# Enable automatic cleanup patterns
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
try:
self.cleanup()
except Exception:
pass
def __del__(self):
try:
self.cleanup()
except Exception:
pass

View File

@@ -422,7 +422,6 @@ class LLMInterface(ABC):
top_k=10,
complexity=64,
beam_width=8,
USE_DEFERRED_FETCH=True,
skip_search_reorder=True,
recompute_beighbor_embeddings=True,
dedup_node_dis=True,
@@ -434,7 +433,6 @@ class LLMInterface(ABC):
Supported kwargs:
- complexity (int): Search complexity parameter (default: 32)
- beam_width (int): Beam width for search (default: 4)
- USE_DEFERRED_FETCH (bool): Enable deferred fetch mode (default: False)
- skip_search_reorder (bool): Skip search reorder step (default: False)
- recompute_beighbor_embeddings (bool): Enable ZMQ embedding server for neighbor recomputation (default: False)
- dedup_node_dis (bool): Deduplicate nodes by distance (default: False)

View File

@@ -72,7 +72,7 @@ class LeannCLI:
def create_parser(self) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="leann",
description="LEANN - Local Enhanced AI Navigation",
description="The smallest vector index in the world. RAG Everything with LEANN!",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
@@ -102,9 +102,18 @@ Examples:
help="Documents directories and/or files (default: current directory)",
)
build_parser.add_argument(
"--backend", type=str, default="hnsw", choices=["hnsw", "diskann"]
"--backend",
type=str,
default="hnsw",
choices=["hnsw", "diskann"],
help="Backend to use (default: hnsw)",
)
build_parser.add_argument(
"--embedding-model",
type=str,
default="facebook/contriever",
help="Embedding model (default: facebook/contriever)",
)
build_parser.add_argument("--embedding-model", type=str, default="facebook/contriever")
build_parser.add_argument(
"--embedding-mode",
type=str,
@@ -112,36 +121,88 @@ Examples:
choices=["sentence-transformers", "openai", "mlx", "ollama"],
help="Embedding backend mode (default: sentence-transformers)",
)
build_parser.add_argument("--force", "-f", action="store_true", help="Force rebuild")
build_parser.add_argument("--graph-degree", type=int, default=32)
build_parser.add_argument("--complexity", type=int, default=64)
build_parser.add_argument(
"--force", "-f", action="store_true", help="Force rebuild existing index"
)
build_parser.add_argument(
"--graph-degree", type=int, default=32, help="Graph degree (default: 32)"
)
build_parser.add_argument(
"--complexity", type=int, default=64, help="Build complexity (default: 64)"
)
build_parser.add_argument("--num-threads", type=int, default=1)
build_parser.add_argument("--compact", action="store_true", default=True)
build_parser.add_argument("--recompute", action="store_true", default=True)
build_parser.add_argument(
"--compact",
action=argparse.BooleanOptionalAction,
default=True,
help="Use compact storage (default: true). Must be `no-compact` for `no-recompute` build.",
)
build_parser.add_argument(
"--recompute",
action=argparse.BooleanOptionalAction,
default=True,
help="Enable recomputation (default: true)",
)
build_parser.add_argument(
"--file-types",
type=str,
help="Comma-separated list of file extensions to include (e.g., '.txt,.pdf,.pptx'). If not specified, uses default supported types.",
)
build_parser.add_argument(
"--include-hidden",
action=argparse.BooleanOptionalAction,
default=False,
help="Include hidden files and directories (paths starting with '.') during indexing (default: false)",
)
build_parser.add_argument(
"--doc-chunk-size",
type=int,
default=256,
help="Document chunk size in tokens/characters (default: 256)",
)
build_parser.add_argument(
"--doc-chunk-overlap",
type=int,
default=128,
help="Document chunk overlap (default: 128)",
)
build_parser.add_argument(
"--code-chunk-size",
type=int,
default=512,
help="Code chunk size in tokens/lines (default: 512)",
)
build_parser.add_argument(
"--code-chunk-overlap",
type=int,
default=50,
help="Code chunk overlap (default: 50)",
)
# Search command
search_parser = subparsers.add_parser("search", help="Search documents")
search_parser.add_argument("index_name", help="Index name")
search_parser.add_argument("query", help="Search query")
search_parser.add_argument("--top-k", type=int, default=5)
search_parser.add_argument("--complexity", type=int, default=64)
search_parser.add_argument(
"--top-k", type=int, default=5, help="Number of results (default: 5)"
)
search_parser.add_argument(
"--complexity", type=int, default=64, help="Search complexity (default: 64)"
)
search_parser.add_argument("--beam-width", type=int, default=1)
search_parser.add_argument("--prune-ratio", type=float, default=0.0)
search_parser.add_argument(
"--recompute-embeddings",
action="store_true",
"--recompute",
dest="recompute_embeddings",
action=argparse.BooleanOptionalAction,
default=True,
help="Recompute embeddings (default: True)",
help="Enable/disable embedding recomputation (default: enabled). Should not do a `no-recompute` search in a `recompute` build.",
)
search_parser.add_argument(
"--pruning-strategy",
choices=["global", "local", "proportional"],
default="global",
help="Pruning strategy (default: global)",
)
# Ask command
@@ -152,19 +213,27 @@ Examples:
type=str,
default="ollama",
choices=["simulated", "ollama", "hf", "openai"],
help="LLM provider (default: ollama)",
)
ask_parser.add_argument(
"--model", type=str, default="qwen3:8b", help="Model name (default: qwen3:8b)"
)
ask_parser.add_argument("--model", type=str, default="qwen3:8b")
ask_parser.add_argument("--host", type=str, default="http://localhost:11434")
ask_parser.add_argument("--interactive", "-i", action="store_true")
ask_parser.add_argument("--top-k", type=int, default=20)
ask_parser.add_argument(
"--interactive", "-i", action="store_true", help="Interactive chat mode"
)
ask_parser.add_argument(
"--top-k", type=int, default=20, help="Retrieval count (default: 20)"
)
ask_parser.add_argument("--complexity", type=int, default=32)
ask_parser.add_argument("--beam-width", type=int, default=1)
ask_parser.add_argument("--prune-ratio", type=float, default=0.0)
ask_parser.add_argument(
"--recompute-embeddings",
action="store_true",
"--recompute",
dest="recompute_embeddings",
action=argparse.BooleanOptionalAction,
default=True,
help="Recompute embeddings (default: True)",
help="Enable/disable embedding recomputation during ask (default: enabled)",
)
ask_parser.add_argument(
"--pruning-strategy",
@@ -348,7 +417,10 @@ Examples:
print(f" leann ask {example_name} --interactive")
def load_documents(
self, docs_paths: Union[str, list], custom_file_types: Union[str, None] = None
self,
docs_paths: Union[str, list],
custom_file_types: Union[str, None] = None,
include_hidden: bool = False,
):
# Handle both single path (string) and multiple paths (list) for backward compatibility
if isinstance(docs_paths, str):
@@ -392,6 +464,10 @@ Examples:
all_documents = []
# Helper to detect hidden path components
def _path_has_hidden_segment(p: Path) -> bool:
return any(part.startswith(".") and part not in [".", ".."] for part in p.parts)
# First, process individual files if any
if files:
print(f"\n🔄 Processing {len(files)} individual file{'s' if len(files) > 1 else ''}...")
@@ -404,8 +480,12 @@ Examples:
files_by_dir = defaultdict(list)
for file_path in files:
parent_dir = str(Path(file_path).parent)
files_by_dir[parent_dir].append(file_path)
file_path_obj = Path(file_path)
if not include_hidden and _path_has_hidden_segment(file_path_obj):
print(f" ⚠️ Skipping hidden file: {file_path}")
continue
parent_dir = str(file_path_obj.parent)
files_by_dir[parent_dir].append(str(file_path_obj))
# Load files from each parent directory
for parent_dir, file_list in files_by_dir.items():
@@ -416,6 +496,7 @@ Examples:
file_docs = SimpleDirectoryReader(
parent_dir,
input_files=file_list,
# exclude_hidden only affects directory scans; input_files are explicit
filename_as_id=True,
).load_data()
all_documents.extend(file_docs)
@@ -514,6 +595,8 @@ Examples:
# Check if file matches any exclude pattern
try:
relative_path = file_path.relative_to(docs_path)
if not include_hidden and _path_has_hidden_segment(relative_path):
continue
if self._should_exclude_file(relative_path, gitignore_matches):
continue
except ValueError:
@@ -541,6 +624,7 @@ Examples:
try:
default_docs = SimpleDirectoryReader(
str(file_path.parent),
exclude_hidden=not include_hidden,
filename_as_id=True,
required_exts=[file_path.suffix],
).load_data()
@@ -569,6 +653,7 @@ Examples:
encoding="utf-8",
required_exts=code_extensions,
file_extractor={}, # Use default extractors
exclude_hidden=not include_hidden,
filename_as_id=True,
).load_data(show_progress=True)
@@ -687,7 +772,40 @@ Examples:
print(f"Index '{index_name}' already exists. Use --force to rebuild.")
return
all_texts = self.load_documents(docs_paths, args.file_types)
# Configure chunking based on CLI args before loading documents
# Guard against invalid configurations
doc_chunk_size = max(1, int(args.doc_chunk_size))
doc_chunk_overlap = max(0, int(args.doc_chunk_overlap))
if doc_chunk_overlap >= doc_chunk_size:
print(
f"⚠️ Adjusting doc chunk overlap from {doc_chunk_overlap} to {doc_chunk_size - 1} (must be < chunk size)"
)
doc_chunk_overlap = doc_chunk_size - 1
code_chunk_size = max(1, int(args.code_chunk_size))
code_chunk_overlap = max(0, int(args.code_chunk_overlap))
if code_chunk_overlap >= code_chunk_size:
print(
f"⚠️ Adjusting code chunk overlap from {code_chunk_overlap} to {code_chunk_size - 1} (must be < chunk size)"
)
code_chunk_overlap = code_chunk_size - 1
self.node_parser = SentenceSplitter(
chunk_size=doc_chunk_size,
chunk_overlap=doc_chunk_overlap,
separator=" ",
paragraph_separator="\n\n",
)
self.code_parser = SentenceSplitter(
chunk_size=code_chunk_size,
chunk_overlap=code_chunk_overlap,
separator="\n",
paragraph_separator="\n\n",
)
all_texts = self.load_documents(
docs_paths, args.file_types, include_hidden=args.include_hidden
)
if not all_texts:
print("No documents found")
return

View File

@@ -263,8 +263,16 @@ def compute_embeddings_openai(texts: list[str], model_name: str) -> np.ndarray:
print(f"len of texts: {len(texts)}")
# OpenAI has limits on batch size and input length
max_batch_size = 1000 # Conservative batch size
max_batch_size = 800 # Conservative batch size because the token limit is 300K
all_embeddings = []
# get the avg len of texts
avg_len = sum(len(text) for text in texts) / len(texts)
print(f"avg len of texts: {avg_len}")
# if avg len is less than 1000, use the max batch size
if avg_len > 300:
max_batch_size = 500
# if avg len is less than 1000, use the max batch size
try:
from tqdm import tqdm

View File

@@ -8,7 +8,7 @@ import time
from pathlib import Path
from typing import Optional
import psutil
# Lightweight, self-contained server manager with no cross-process inspection
# Set up logging based on environment variable
LOG_LEVEL = os.getenv("LEANN_LOG_LEVEL", "WARNING").upper()
@@ -43,130 +43,7 @@ def _check_port(port: int) -> bool:
return s.connect_ex(("localhost", port)) == 0
def _check_process_matches_config(
port: int, expected_model: str, expected_passages_file: str
) -> bool:
"""
Check if the process using the port matches our expected model and passages file.
Returns True if matches, False otherwise.
"""
try:
for proc in psutil.process_iter(["pid", "cmdline"]):
if not _is_process_listening_on_port(proc, port):
continue
cmdline = proc.info["cmdline"]
if not cmdline:
continue
return _check_cmdline_matches_config(
cmdline, port, expected_model, expected_passages_file
)
logger.debug(f"No process found listening on port {port}")
return False
except Exception as e:
logger.warning(f"Could not check process on port {port}: {e}")
return False
def _is_process_listening_on_port(proc, port: int) -> bool:
"""Check if a process is listening on the given port."""
try:
connections = proc.net_connections()
for conn in connections:
if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN:
return True
return False
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return False
def _check_cmdline_matches_config(
cmdline: list, port: int, expected_model: str, expected_passages_file: str
) -> bool:
"""Check if command line matches our expected configuration."""
cmdline_str = " ".join(cmdline)
logger.debug(f"Found process on port {port}: {cmdline_str}")
# Check if it's our embedding server
is_embedding_server = any(
server_type in cmdline_str
for server_type in [
"embedding_server",
"leann_backend_diskann.embedding_server",
"leann_backend_hnsw.hnsw_embedding_server",
]
)
if not is_embedding_server:
logger.debug(f"Process on port {port} is not our embedding server")
return False
# Check model name
model_matches = _check_model_in_cmdline(cmdline, expected_model)
# Check passages file if provided
passages_matches = _check_passages_in_cmdline(cmdline, expected_passages_file)
result = model_matches and passages_matches
logger.debug(
f"model_matches: {model_matches}, passages_matches: {passages_matches}, overall: {result}"
)
return result
def _check_model_in_cmdline(cmdline: list, expected_model: str) -> bool:
"""Check if the command line contains the expected model."""
if "--model-name" not in cmdline:
return False
model_idx = cmdline.index("--model-name")
if model_idx + 1 >= len(cmdline):
return False
actual_model = cmdline[model_idx + 1]
return actual_model == expected_model
def _check_passages_in_cmdline(cmdline: list, expected_passages_file: str) -> bool:
"""Check if the command line contains the expected passages file."""
if "--passages-file" not in cmdline:
return False # Expected but not found
passages_idx = cmdline.index("--passages-file")
if passages_idx + 1 >= len(cmdline):
return False
actual_passages = cmdline[passages_idx + 1]
expected_path = Path(expected_passages_file).resolve()
actual_path = Path(actual_passages).resolve()
return actual_path == expected_path
def _find_compatible_port_or_next_available(
start_port: int, model_name: str, passages_file: str, max_attempts: int = 100
) -> tuple[int, bool]:
"""
Find a port that either has a compatible server or is available.
Returns (port, is_compatible) where is_compatible indicates if we found a matching server.
"""
for port in range(start_port, start_port + max_attempts):
if not _check_port(port):
# Port is available
return port, False
# Port is in use, check if it's compatible
if _check_process_matches_config(port, model_name, passages_file):
logger.info(f"Found compatible server on port {port}")
return port, True
else:
logger.info(f"Port {port} has incompatible server, trying next port...")
raise RuntimeError(
f"Could not find compatible or available port in range {start_port}-{start_port + max_attempts}"
)
# Note: All cross-process scanning helpers removed for simplicity
class EmbeddingServerManager:
@@ -185,7 +62,16 @@ class EmbeddingServerManager:
self.backend_module_name = backend_module_name
self.server_process: Optional[subprocess.Popen] = None
self.server_port: Optional[int] = None
# Track last-started config for in-process reuse only
self._server_config: Optional[dict] = None
self._atexit_registered = False
# Also register a weakref finalizer to ensure cleanup when manager is GC'ed
try:
import weakref
self._finalizer = weakref.finalize(self, self._finalize_process)
except Exception:
self._finalizer = None
def start_server(
self,
@@ -195,26 +81,24 @@ class EmbeddingServerManager:
**kwargs,
) -> tuple[bool, int]:
"""Start the embedding server."""
passages_file = kwargs.get("passages_file")
# passages_file may be present in kwargs for server CLI, but we don't need it here
# Check if we have a compatible server already running
if self._has_compatible_running_server(model_name, passages_file):
logger.info("Found compatible running server!")
return True, port
# If this manager already has a live server, just reuse it
if self.server_process and self.server_process.poll() is None and self.server_port:
logger.info("Reusing in-process server")
return True, self.server_port
# For Colab environment, use a different strategy
if _is_colab_environment():
logger.info("Detected Colab environment, using alternative startup strategy")
return self._start_server_colab(port, model_name, embedding_mode, **kwargs)
# Find a compatible port or next available
actual_port, is_compatible = _find_compatible_port_or_next_available(
port, model_name, passages_file
)
if is_compatible:
logger.info(f"Found compatible server on port {actual_port}")
return True, actual_port
# Always pick a fresh available port
try:
actual_port = _get_available_port(port)
except RuntimeError:
logger.error("No available ports found")
return False, port
# Start a new server
return self._start_new_server(actual_port, model_name, embedding_mode, **kwargs)
@@ -247,17 +131,7 @@ class EmbeddingServerManager:
logger.error(f"Failed to start embedding server in Colab: {e}")
return False, actual_port
def _has_compatible_running_server(self, model_name: str, passages_file: str) -> bool:
"""Check if we have a compatible running server."""
if not (self.server_process and self.server_process.poll() is None and self.server_port):
return False
if _check_process_matches_config(self.server_port, model_name, passages_file):
logger.info(f"Existing server process (PID {self.server_process.pid}) is compatible")
return True
logger.info("Existing server process is incompatible. Should start a new server.")
return False
# Note: No compatibility check needed; manager is per-searcher and configs are stable per instance
def _start_new_server(
self, port: int, model_name: str, embedding_mode: str, **kwargs
@@ -304,22 +178,61 @@ class EmbeddingServerManager:
project_root = Path(__file__).parent.parent.parent.parent.parent
logger.info(f"Command: {' '.join(command)}")
# Let server output go directly to console
# The server will respect LEANN_LOG_LEVEL environment variable
# In CI environment, redirect stdout to avoid buffer deadlock but keep stderr for debugging
# Embedding servers use many print statements that can fill stdout buffers
is_ci = os.environ.get("CI") == "true"
if is_ci:
stdout_target = subprocess.DEVNULL
stderr_target = None # Keep stderr for error debugging in CI
logger.info(
"CI environment detected, redirecting embedding server stdout to DEVNULL, keeping stderr"
)
else:
stdout_target = None # Direct to console for visible logs
stderr_target = None # Direct to console for visible logs
# Start embedding server subprocess
self.server_process = subprocess.Popen(
command,
cwd=project_root,
stdout=None, # Direct to console
stderr=None, # Direct to console
stdout=stdout_target,
stderr=stderr_target,
)
self.server_port = port
# Record config for in-process reuse
try:
self._server_config = {
"model_name": command[command.index("--model-name") + 1]
if "--model-name" in command
else "",
"passages_file": command[command.index("--passages-file") + 1]
if "--passages-file" in command
else "",
"embedding_mode": command[command.index("--embedding-mode") + 1]
if "--embedding-mode" in command
else "sentence-transformers",
}
except Exception:
self._server_config = {
"model_name": "",
"passages_file": "",
"embedding_mode": "sentence-transformers",
}
logger.info(f"Server process started with PID: {self.server_process.pid}")
# Register atexit callback only when we actually start a process
if not self._atexit_registered:
# Use a lambda to avoid issues with bound methods
atexit.register(lambda: self.stop_server() if self.server_process else None)
# Always attempt best-effort finalize at interpreter exit
atexit.register(self._finalize_process)
self._atexit_registered = True
# Touch finalizer so it knows there is a live process
if getattr(self, "_finalizer", None) is not None and not self._finalizer.alive:
try:
import weakref
self._finalizer = weakref.finalize(self, self._finalize_process)
except Exception:
pass
def _wait_for_server_ready(self, port: int) -> tuple[bool, int]:
"""Wait for the server to be ready."""
@@ -344,24 +257,35 @@ class EmbeddingServerManager:
if not self.server_process:
return
if self.server_process.poll() is not None:
if self.server_process and self.server_process.poll() is not None:
# Process already terminated
self.server_process = None
self.server_port = None
self._server_config = None
return
logger.info(
f"Terminating server process (PID: {self.server_process.pid}) for backend {self.backend_module_name}..."
)
self.server_process.terminate()
# Use simple termination first; if the server installed signal handlers,
# it will exit cleanly. Otherwise escalate to kill after a short wait.
try:
self.server_process.terminate()
except Exception:
pass
try:
self.server_process.wait(timeout=3)
logger.info(f"Server process {self.server_process.pid} terminated.")
self.server_process.wait(timeout=5) # Give more time for graceful shutdown
logger.info(f"Server process {self.server_process.pid} terminated gracefully.")
except subprocess.TimeoutExpired:
logger.warning(
f"Server process {self.server_process.pid} did not terminate gracefully within 3 seconds, killing it."
f"Server process {self.server_process.pid} did not terminate within 5 seconds, force killing..."
)
self.server_process.kill()
try:
self.server_process.kill()
except Exception:
pass
try:
self.server_process.wait(timeout=2)
logger.info(f"Server process {self.server_process.pid} killed successfully.")
@@ -369,15 +293,33 @@ class EmbeddingServerManager:
logger.error(
f"Failed to kill server process {self.server_process.pid} - it may be hung"
)
# Don't hang indefinitely
# Clean up process resources to prevent resource tracker warnings
# Clean up process resources with timeout to avoid CI hang
try:
self.server_process.wait() # Ensure process is fully cleaned up
# Use shorter timeout in CI environments
is_ci = os.environ.get("CI") == "true"
timeout = 3 if is_ci else 10
self.server_process.wait(timeout=timeout)
logger.info(f"Server process {self.server_process.pid} cleanup completed")
except subprocess.TimeoutExpired:
logger.warning(f"Process cleanup timeout after {timeout}s, proceeding anyway")
except Exception as e:
logger.warning(f"Error during process cleanup: {e}")
finally:
self.server_process = None
self.server_port = None
self._server_config = None
def _finalize_process(self) -> None:
"""Best-effort cleanup used by weakref.finalize/atexit."""
try:
self.stop_server()
except Exception:
pass
self.server_process = None
def _adopt_existing_server(self, *args, **kwargs) -> None:
# Removed: cross-process adoption no longer supported
return
def _launch_server_process_colab(self, command: list, port: int) -> None:
"""Launch the server process with Colab-specific settings."""
@@ -393,10 +335,16 @@ class EmbeddingServerManager:
self.server_port = port
logger.info(f"Colab server process started with PID: {self.server_process.pid}")
# Register atexit callback
# Register atexit callback (unified)
if not self._atexit_registered:
atexit.register(lambda: self.stop_server() if self.server_process else None)
atexit.register(self._finalize_process)
self._atexit_registered = True
# Record config for in-process reuse is best-effort in Colab mode
self._server_config = {
"model_name": "",
"passages_file": "",
"embedding_mode": "sentence-transformers",
}
def _wait_for_server_ready_colab(self, port: int) -> tuple[bool, int]:
"""Wait for the server to be ready with Colab-specific timeout."""

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Literal, Union
from typing import Any, Literal, Optional
import numpy as np
@@ -35,7 +35,7 @@ class LeannBackendSearcherInterface(ABC):
@abstractmethod
def _ensure_server_running(
self, passages_source_file: str, port: Union[int, None], **kwargs
self, passages_source_file: str, port: Optional[int], **kwargs
) -> int:
"""Ensure server is running"""
pass
@@ -50,7 +50,7 @@ class LeannBackendSearcherInterface(ABC):
prune_ratio: float = 0.0,
recompute_embeddings: bool = False,
pruning_strategy: Literal["global", "local", "proportional"] = "global",
zmq_port: Union[int, None] = None,
zmq_port: Optional[int] = None,
**kwargs,
) -> dict[str, Any]:
"""Search for nearest neighbors
@@ -76,7 +76,7 @@ class LeannBackendSearcherInterface(ABC):
self,
query: str,
use_server_if_available: bool = True,
zmq_port: Union[int, None] = None,
zmq_port: Optional[int] = None,
) -> np.ndarray:
"""Compute embedding for a query string

View File

@@ -64,19 +64,6 @@ def handle_request(request):
"required": ["index_name", "query"],
},
},
{
"name": "leann_status",
"description": "📊 Check the health and stats of your code indexes - like a medical checkup for your codebase knowledge!",
"inputSchema": {
"type": "object",
"properties": {
"index_name": {
"type": "string",
"description": "Optional: Name of specific index to check. If not provided, shows status of all indexes.",
}
},
},
},
{
"name": "leann_list",
"description": "📋 Show all your indexed codebases - your personal code library! Use this to see what's available for search.",
@@ -116,18 +103,8 @@ def handle_request(request):
f"--top-k={args.get('top_k', 5)}",
f"--complexity={args.get('complexity', 32)}",
]
result = subprocess.run(cmd, capture_output=True, text=True)
elif tool_name == "leann_status":
if args.get("index_name"):
# Check specific index status - for now, we'll use leann list and filter
result = subprocess.run(["leann", "list"], capture_output=True, text=True)
# We could enhance this to show more detailed status per index
else:
# Show all indexes status
result = subprocess.run(["leann", "list"], capture_output=True, text=True)
elif tool_name == "leann_list":
result = subprocess.run(["leann", "list"], capture_output=True, text=True)

View File

@@ -4,27 +4,29 @@ Transform your development workflow with intelligent code assistance using LEANN
## Prerequisites
**Step 1:** First, complete the basic LEANN installation following the [📦 Installation guide](../../README.md#installation) in the root README:
Install LEANN globally for MCP integration (with default backend):
```bash
uv venv
source .venv/bin/activate
uv pip install leann
uv tool install leann-core --with leann
```
**Step 2:** Install LEANN globally for MCP integration:
```bash
uv tool install leann-core
```
This makes the `leann` command available system-wide, which `leann_mcp` requires.
This installs the `leann` CLI into an isolated tool environment and includes both backends so `leann build` works out-of-the-box.
## 🚀 Quick Setup
Add the LEANN MCP server to Claude Code:
Add the LEANN MCP server to Claude Code. Choose the scope based on how widely you want it available. Below is the command to install it globally; if you prefer a local install, skip this step:
```bash
claude mcp add leann-server -- leann_mcp
# Global (recommended): available in all projects for your user
claude mcp add --scope user leann-server -- leann_mcp
```
- `leann-server`: the display name of the MCP server in Claude Code (you can change it).
- `leann_mcp`: the Python entry point installed with LEANN that starts the MCP server.
Verify it is registered globally:
```bash
claude mcp list | cat
```
## 🛠️ Available Tools
@@ -33,27 +35,36 @@ Once connected, you'll have access to these powerful semantic search tools in Cl
- **`leann_list`** - List all available indexes across your projects
- **`leann_search`** - Perform semantic searches across code and documents
- **`leann_ask`** - Ask natural language questions and get AI-powered answers from your codebase
## 🎯 Quick Start Example
```bash
# Add locally if you did not add it globally (current folder only; default if --scope is omitted)
claude mcp add leann-server -- leann_mcp
# Build an index for your project (change to your actual path)
leann build my-project --docs ./
# See the advanced examples below for more ways to configure indexing
# Set the index name (replace 'my-project' with your own)
leann build my-project --docs $(git ls-files)
# Start Claude Code
claude
```
## 🚀 Advanced Usage Examples
## 🚀 Advanced Usage Examples to build the index
### Index Entire Git Repository
```bash
# Index all tracked files in your git repository, note right now we will skip submodules, but we can add it back easily if you want
# Index all tracked files in your Git repository.
# Note: submodules are currently skipped; we can add them back if needed.
leann build my-repo --docs $(git ls-files) --embedding-mode sentence-transformers --embedding-model all-MiniLM-L6-v2 --backend hnsw
# Index only specific file types from git
# Index only tracked Python files from Git.
leann build my-python-code --docs $(git ls-files "*.py") --embedding-mode sentence-transformers --embedding-model all-MiniLM-L6-v2 --backend hnsw
# If you encounter empty requests caused by empty files (e.g., __init__.py), exclude zero-byte files. Thanks @ww2283 for pointing [that](https://github.com/yichuan-w/LEANN/issues/48) out
leann build leann-prospec-lig --docs $(find ./src -name "*.py" -not -empty) --embedding-mode openai --embedding-model text-embedding-3-small
```
### Multiple Directories and Files
@@ -90,6 +101,7 @@ Help me understand this codebase. List available indexes and search for authenti
<img src="../../assets/claude_code_leann.png" alt="LEANN in Claude Code" width="80%">
</p>
If you see a prompt asking whether to proceed with LEANN, you can now use it in your chat!
## 🧠 How It Works
@@ -125,3 +137,11 @@ To remove LEANN
```
uv pip uninstall leann leann-backend-hnsw leann-core
```
To globally remove LEANN (for version update)
```
uv tool list | cat
uv tool uninstall leann-core
command -v leann || echo "leann gone"
command -v leann_mcp || echo "leann_mcp gone"
```

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "leann"
version = "0.2.7"
version = "0.2.9"
description = "LEANN - The smallest vector index in the world. RAG Everything with LEANN!"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -0,0 +1 @@
__all__ = []

View File

@@ -136,5 +136,9 @@ def export_sqlite(
connection.commit()
if __name__ == "__main__":
def main():
app()
if __name__ == "__main__":
main()

View File

@@ -10,6 +10,7 @@ requires-python = ">=3.9"
dependencies = [
"leann-core",
"leann-backend-hnsw",
"typer>=0.12.3",
"numpy>=1.26.0",
"torch",
"tqdm",
@@ -43,6 +44,7 @@ dependencies = [
"mlx>=0.26.3; sys_platform == 'darwin' and platform_machine == 'arm64'",
"mlx-lm>=0.26.0; sys_platform == 'darwin' and platform_machine == 'arm64'",
"psutil>=5.8.0",
"pybind11>=3.0.0",
"pathspec>=0.12.1",
"nbconvert>=7.16.6",
"gitignore-parser>=0.1.12",
@@ -54,7 +56,7 @@ dev = [
"pytest-cov>=4.0",
"pytest-xdist>=3.0", # For parallel test execution
"black>=23.0",
"ruff>=0.1.0",
"ruff==0.12.7", # Fixed version to ensure consistent formatting across all environments
"matplotlib",
"huggingface-hub>=0.20.0",
"pre-commit>=3.5.0",
@@ -83,6 +85,11 @@ documents = [
[tool.setuptools]
py-modules = []
packages = ["wechat_exporter"]
package-dir = { "wechat_exporter" = "packages/wechat-exporter" }
[project.scripts]
wechat-exporter = "wechat_exporter.main:main"
[tool.uv.sources]
@@ -154,7 +161,7 @@ markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"openai: marks tests that require OpenAI API key",
]
timeout = 600
timeout = 300 # Reduced from 600s (10min) to 300s (5min) for CI safety
addopts = [
"-v",
"--tb=short",

76
sky/leann-build.yaml Normal file
View File

@@ -0,0 +1,76 @@
name: leann-build
resources:
# Choose a GPU for fast embeddings (examples: L4, A10G, A100). CPU also works but is slower.
accelerators: L4:1
# Optionally pin a cloud, otherwise SkyPilot will auto-select
# cloud: aws
disk_size: 100
envs:
# Build parameters (override with: sky launch -c leann-gpu sky/leann-build.yaml -e key=value)
index_name: my-index
docs: ./data
backend: hnsw # hnsw | diskann
complexity: 64
graph_degree: 32
num_threads: 8
# Embedding selection
embedding_mode: sentence-transformers # sentence-transformers | openai | mlx | ollama
embedding_model: facebook/contriever
# Storage/latency knobs
recompute: true # true => selective recomputation (recommended)
compact: true # for HNSW only
# Optional pass-through
extra_args: ""
# Rebuild control
force: true
# Sync local paths to the remote VM. Adjust as needed.
file_mounts:
# Example: mount your local data directory used for building
~/leann-data: ${docs}
setup: |
set -e
# Install uv (package manager)
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
# Ensure modern libstdc++ for FAISS (GLIBCXX >= 3.4.30)
sudo apt-get update -y
sudo apt-get install -y libstdc++6 libgomp1
# Also upgrade conda's libstdc++ in base env (Skypilot images include conda)
if command -v conda >/dev/null 2>&1; then
conda install -y -n base -c conda-forge libstdcxx-ng
fi
# Install LEANN CLI and backends into the user environment
uv pip install --upgrade pip
uv pip install leann-core leann-backend-hnsw leann-backend-diskann
run: |
export PATH="$HOME/.local/bin:$PATH"
# Derive flags from env
recompute_flag=""
if [ "${recompute}" = "false" ] || [ "${recompute}" = "0" ]; then
recompute_flag="--no-recompute"
fi
force_flag=""
if [ "${force}" = "true" ] || [ "${force}" = "1" ]; then
force_flag="--force"
fi
# Build command
python -m leann.cli build ${index_name} \
--docs ~/leann-data \
--backend ${backend} \
--complexity ${complexity} \
--graph-degree ${graph_degree} \
--num-threads ${num_threads} \
--embedding-mode ${embedding_mode} \
--embedding-model ${embedding_model} \
${recompute_flag} ${force_flag} ${extra_args}
# Print where the index is stored for downstream rsync
echo "INDEX_OUT_DIR=~/.leann/indexes/${index_name}"

View File

@@ -6,10 +6,11 @@ This directory contains automated tests for the LEANN project using pytest.
### `test_readme_examples.py`
Tests the examples shown in README.md:
- The basic example code that users see first
- The basic example code that users see first (parametrized for both HNSW and DiskANN backends)
- Import statements work correctly
- Different backend options (HNSW, DiskANN)
- Different LLM configuration options
- Different LLM configuration options (parametrized for both backends)
- **All main README examples are tested with both HNSW and DiskANN backends using pytest parametrization**
### `test_basic.py`
Basic functionality tests that verify:
@@ -25,6 +26,16 @@ Tests the document RAG example functionality:
- Tests error handling with invalid parameters
- Verifies that normalized embeddings are detected and cosine distance is used
### `test_diskann_partition.py`
Tests DiskANN graph partitioning functionality:
- Tests DiskANN index building without partitioning (baseline)
- Tests automatic graph partitioning with `is_recompute=True`
- Verifies that partition files are created and large files are cleaned up for storage saving
- Tests search functionality with partitioned indices
- Validates medoid and max_base_norm file generation and usage
- Includes performance comparison between DiskANN (with partition) and HNSW
- **Note**: These tests are skipped in CI due to hardware requirements and computation time
## Running Tests
### Install test dependencies:
@@ -54,15 +65,23 @@ pytest tests/ -m "not openai"
# Skip slow tests
pytest tests/ -m "not slow"
# Run DiskANN partition tests (requires local machine, not CI)
pytest tests/test_diskann_partition.py
```
### Run with specific backend:
```bash
# Test only HNSW backend
pytest tests/test_basic.py::test_backend_basic[hnsw]
pytest tests/test_readme_examples.py::test_readme_basic_example[hnsw]
# Test only DiskANN backend
pytest tests/test_basic.py::test_backend_basic[diskann]
pytest tests/test_readme_examples.py::test_readme_basic_example[diskann]
# All DiskANN tests (parametrized + specialized partition tests)
pytest tests/ -k diskann
```
## CI/CD Integration

View File

@@ -64,6 +64,9 @@ def test_backend_basic(backend_name):
assert isinstance(results[0], SearchResult)
assert "topic 2" in results[0].text or "document" in results[0].text
# Ensure cleanup to avoid hanging background servers
searcher.cleanup()
@pytest.mark.skipif(
os.environ.get("CI") == "true", reason="Skip model tests in CI to avoid MPS memory issues"
@@ -90,3 +93,5 @@ def test_large_index():
searcher = LeannSearcher(index_path)
results = searcher.search(["word10 word20"], top_k=10)
assert len(results[0]) == 10
# Cleanup
searcher.cleanup()

View File

@@ -0,0 +1,369 @@
"""
Test DiskANN graph partitioning functionality.
Tests the automatic graph partitioning feature that was implemented to save
storage space by partitioning large DiskANN indices and safely deleting
redundant files while maintaining search functionality.
"""
import os
import tempfile
from pathlib import Path
import pytest
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip DiskANN partition tests in CI - requires specific hardware and large memory",
)
def test_diskann_without_partition():
"""Test DiskANN index building without partition (baseline)."""
from leann.api import LeannBuilder, LeannSearcher
with tempfile.TemporaryDirectory() as temp_dir:
index_path = str(Path(temp_dir) / "test_no_partition.leann")
# Test data - enough to trigger index building
texts = [
f"Document {i} discusses topic {i % 10} with detailed analysis of subject {i // 10}."
for i in range(500)
]
# Build without partition (is_recompute=False)
builder = LeannBuilder(
backend_name="diskann",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
num_neighbors=32,
search_list_size=50,
is_recompute=False, # No partition
)
for text in texts:
builder.add_text(text)
builder.build_index(index_path)
# Verify index was created
index_dir = Path(index_path).parent
assert index_dir.exists()
# Check that traditional DiskANN files exist
index_prefix = Path(index_path).stem
# Core DiskANN files (beam search index may not be created for small datasets)
required_files = [
f"{index_prefix}_disk.index",
f"{index_prefix}_pq_compressed.bin",
f"{index_prefix}_pq_pivots.bin",
]
# Check all generated files first for debugging
generated_files = [f.name for f in index_dir.glob(f"{index_prefix}*")]
print(f"Generated files: {generated_files}")
for required_file in required_files:
file_path = index_dir / required_file
assert file_path.exists(), f"Required file {required_file} not found"
# Ensure no partition files exist in non-partition mode
partition_files = [f"{index_prefix}_disk_graph.index", f"{index_prefix}_partition.bin"]
for partition_file in partition_files:
file_path = index_dir / partition_file
assert not file_path.exists(), (
f"Partition file {partition_file} should not exist in non-partition mode"
)
# Test search functionality
searcher = LeannSearcher(index_path)
results = searcher.search("topic 3 analysis", top_k=3)
assert len(results) > 0
assert all(result.score is not None and result.score != float("-inf") for result in results)
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip DiskANN partition tests in CI - requires specific hardware and large memory",
)
def test_diskann_with_partition():
"""Test DiskANN index building with automatic graph partitioning."""
from leann.api import LeannBuilder
with tempfile.TemporaryDirectory() as temp_dir:
index_path = str(Path(temp_dir) / "test_with_partition.leann")
# Test data - enough to trigger partitioning
texts = [
f"Document {i} explores subject {i % 15} with comprehensive coverage of area {i // 15}."
for i in range(500)
]
# Build with partition (is_recompute=True)
builder = LeannBuilder(
backend_name="diskann",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
num_neighbors=32,
search_list_size=50,
is_recompute=True, # Enable automatic partitioning
)
for text in texts:
builder.add_text(text)
builder.build_index(index_path)
# Verify index was created
index_dir = Path(index_path).parent
assert index_dir.exists()
# Check that partition files exist
index_prefix = Path(index_path).stem
partition_files = [
f"{index_prefix}_disk_graph.index", # Partitioned graph
f"{index_prefix}_partition.bin", # Partition metadata
f"{index_prefix}_pq_compressed.bin",
f"{index_prefix}_pq_pivots.bin",
]
for partition_file in partition_files:
file_path = index_dir / partition_file
assert file_path.exists(), f"Expected partition file {partition_file} not found"
# Check that large files were cleaned up (storage saving goal)
large_files = [f"{index_prefix}_disk.index", f"{index_prefix}_disk_beam_search.index"]
for large_file in large_files:
file_path = index_dir / large_file
assert not file_path.exists(), (
f"Large file {large_file} should have been deleted for storage saving"
)
# Verify required auxiliary files for partition mode exist
required_files = [
f"{index_prefix}_disk.index_medoids.bin",
f"{index_prefix}_disk.index_max_base_norm.bin",
]
for req_file in required_files:
file_path = index_dir / req_file
assert file_path.exists(), (
f"Required auxiliary file {req_file} missing for partition mode"
)
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip DiskANN partition tests in CI - requires specific hardware and large memory",
)
def test_diskann_partition_search_functionality():
"""Test that search works correctly with partitioned indices."""
from leann.api import LeannBuilder, LeannSearcher
with tempfile.TemporaryDirectory() as temp_dir:
index_path = str(Path(temp_dir) / "test_partition_search.leann")
# Create diverse test data
texts = [
"LEANN is a storage-efficient approximate nearest neighbor search system.",
"Graph partitioning helps reduce memory usage in large scale vector search.",
"DiskANN provides high-performance disk-based approximate nearest neighbor search.",
"Vector embeddings enable semantic search over unstructured text data.",
"Approximate nearest neighbor algorithms trade accuracy for speed and storage.",
] * 100 # Repeat to get enough data
# Build with partitioning
builder = LeannBuilder(
backend_name="diskann",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
is_recompute=True, # Enable partitioning
)
for text in texts:
builder.add_text(text)
builder.build_index(index_path)
# Test search with partitioned index
searcher = LeannSearcher(index_path)
# Test various queries
test_queries = [
("vector search algorithms", 5),
("LEANN storage efficiency", 3),
("graph partitioning memory", 4),
("approximate nearest neighbor", 7),
]
for query, top_k in test_queries:
results = searcher.search(query, top_k=top_k)
# Verify search results
assert len(results) == top_k, f"Expected {top_k} results for query '{query}'"
assert all(result.score is not None for result in results), (
"All results should have scores"
)
assert all(result.score != float("-inf") for result in results), (
"No result should have -inf score"
)
assert all(result.text is not None for result in results), (
"All results should have text"
)
# Scores should be in descending order (higher similarity first)
scores = [result.score for result in results]
assert scores == sorted(scores, reverse=True), (
"Results should be sorted by score descending"
)
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip DiskANN partition tests in CI - requires specific hardware and large memory",
)
def test_diskann_medoid_and_norm_files():
"""Test that medoid and max_base_norm files are correctly generated and used."""
import struct
from leann.api import LeannBuilder, LeannSearcher
with tempfile.TemporaryDirectory() as temp_dir:
index_path = str(Path(temp_dir) / "test_medoid_norm.leann")
# Small but sufficient dataset
texts = [f"Test document {i} with content about subject {i % 10}." for i in range(200)]
builder = LeannBuilder(
backend_name="diskann",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
is_recompute=True,
)
for text in texts:
builder.add_text(text)
builder.build_index(index_path)
index_dir = Path(index_path).parent
index_prefix = Path(index_path).stem
# Test medoids file
medoids_file = index_dir / f"{index_prefix}_disk.index_medoids.bin"
assert medoids_file.exists(), "Medoids file should be generated"
# Read and validate medoids file format
with open(medoids_file, "rb") as f:
nshards = struct.unpack("<I", f.read(4))[0]
one_val = struct.unpack("<I", f.read(4))[0]
medoid_id = struct.unpack("<I", f.read(4))[0]
assert nshards == 1, "Single-shot build should have 1 shard"
assert one_val == 1, "Expected value should be 1"
assert medoid_id >= 0, "Medoid ID should be valid (not hardcoded 0)"
# Test max_base_norm file
norm_file = index_dir / f"{index_prefix}_disk.index_max_base_norm.bin"
assert norm_file.exists(), "Max base norm file should be generated"
# Read and validate norm file
with open(norm_file, "rb") as f:
npts = struct.unpack("<I", f.read(4))[0]
ndims = struct.unpack("<I", f.read(4))[0]
norm_val = struct.unpack("<f", f.read(4))[0]
assert npts == 1, "Should have 1 norm point"
assert ndims == 1, "Should have 1 dimension"
assert norm_val > 0, "Norm value should be positive"
assert norm_val != float("inf"), "Norm value should be finite"
# Test that search works with these files
searcher = LeannSearcher(index_path)
results = searcher.search("test subject", top_k=3)
# Verify that scores are not -inf (which indicates norm file was loaded correctly)
assert len(results) > 0
assert all(result.score != float("-inf") for result in results), (
"Scores should not be -inf when norm file is correct"
)
@pytest.mark.skipif(
os.environ.get("CI") == "true",
reason="Skip performance comparison in CI - requires significant compute time",
)
def test_diskann_vs_hnsw_performance():
"""Compare DiskANN (with partition) vs HNSW performance."""
import time
from leann.api import LeannBuilder, LeannSearcher
with tempfile.TemporaryDirectory() as temp_dir:
# Test data
texts = [
f"Performance test document {i} covering topic {i % 20} in detail." for i in range(1000)
]
query = "performance topic test"
# Test DiskANN with partitioning
diskann_path = str(Path(temp_dir) / "perf_diskann.leann")
diskann_builder = LeannBuilder(
backend_name="diskann",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
is_recompute=True,
)
for text in texts:
diskann_builder.add_text(text)
start_time = time.time()
diskann_builder.build_index(diskann_path)
# Test HNSW
hnsw_path = str(Path(temp_dir) / "perf_hnsw.leann")
hnsw_builder = LeannBuilder(
backend_name="hnsw",
embedding_model="facebook/contriever",
embedding_mode="sentence-transformers",
is_recompute=True,
)
for text in texts:
hnsw_builder.add_text(text)
start_time = time.time()
hnsw_builder.build_index(hnsw_path)
# Compare search performance
diskann_searcher = LeannSearcher(diskann_path)
hnsw_searcher = LeannSearcher(hnsw_path)
# Warm up searches
diskann_searcher.search(query, top_k=5)
hnsw_searcher.search(query, top_k=5)
# Timed searches
start_time = time.time()
diskann_results = diskann_searcher.search(query, top_k=10)
diskann_search_time = time.time() - start_time
start_time = time.time()
hnsw_results = hnsw_searcher.search(query, top_k=10)
hnsw_search_time = time.time() - start_time
# Basic assertions
assert len(diskann_results) == 10
assert len(hnsw_results) == 10
assert all(r.score != float("-inf") for r in diskann_results)
assert all(r.score != float("-inf") for r in hnsw_results)
# Performance ratio (informational)
if hnsw_search_time > 0:
speed_ratio = hnsw_search_time / diskann_search_time
print(f"DiskANN search time: {diskann_search_time:.4f}s")
print(f"HNSW search time: {hnsw_search_time:.4f}s")
print(f"DiskANN is {speed_ratio:.2f}x faster than HNSW")

View File

@@ -58,6 +58,9 @@ def test_document_rag_simulated(test_data_dir):
@pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="OpenAI API key not available")
@pytest.mark.skipif(
os.environ.get("CI") == "true", reason="Skip OpenAI tests in CI to avoid API costs"
)
def test_document_rag_openai(test_data_dir):
"""Test document_rag with OpenAI embeddings."""
with tempfile.TemporaryDirectory() as temp_dir:

View File

@@ -10,29 +10,33 @@ from pathlib import Path
import pytest
def test_readme_basic_example():
"""Test the basic example from README.md."""
@pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
def test_readme_basic_example(backend_name):
"""Test the basic example from README.md with both backends."""
# Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
if os.environ.get("CI") == "true" and platform.system() == "Darwin":
pytest.skip("Skipping on macOS CI due to MPS environment issues with all-MiniLM-L6-v2")
# Skip DiskANN on CI (Linux runners) due to C++ extension memory/hardware constraints
if os.environ.get("CI") == "true" and backend_name == "diskann":
pytest.skip("Skip DiskANN tests in CI due to resource constraints and instability")
# This is the exact code from README (with smaller model for CI)
from leann import LeannBuilder, LeannChat, LeannSearcher
from leann.api import SearchResult
with tempfile.TemporaryDirectory() as temp_dir:
INDEX_PATH = str(Path(temp_dir) / "demo.leann")
INDEX_PATH = str(Path(temp_dir) / f"demo_{backend_name}.leann")
# Build an index
# In CI, use a smaller model to avoid memory issues
if os.environ.get("CI") == "true":
builder = LeannBuilder(
backend_name="hnsw",
backend_name=backend_name,
embedding_model="sentence-transformers/all-MiniLM-L6-v2", # Smaller model
dimensions=384, # Smaller dimensions
)
else:
builder = LeannBuilder(backend_name="hnsw")
builder = LeannBuilder(backend_name=backend_name)
builder.add_text("LEANN saves 97% storage compared to traditional vector databases.")
builder.add_text("Tung Tung Tung Sahur called—they need their banana-crocodile hybrid back")
builder.build_index(INDEX_PATH)
@@ -52,9 +56,15 @@ def test_readme_basic_example():
# Verify search results
assert len(results) > 0
assert isinstance(results[0], SearchResult)
assert results[0].score != float("-inf"), (
f"should return valid scores, got {results[0].score}"
)
# The second text about banana-crocodile should be more relevant
assert "banana" in results[0].text or "crocodile" in results[0].text
# Ensure we cleanup background embedding server
searcher.cleanup()
# Chat with your data (using simulated LLM to avoid external dependencies)
chat = LeannChat(INDEX_PATH, llm_config={"type": "simulated"})
response = chat.ask("How much storage does LEANN save?", top_k=1)
@@ -62,6 +72,8 @@ def test_readme_basic_example():
# Verify chat works
assert isinstance(response, str)
assert len(response) > 0
# Cleanup chat resources
chat.cleanup()
def test_readme_imports():
@@ -110,26 +122,31 @@ def test_backend_options():
assert len(list(Path(diskann_path).parent.glob(f"{Path(diskann_path).stem}.*"))) > 0
def test_llm_config_simulated():
"""Test simulated LLM configuration option."""
@pytest.mark.parametrize("backend_name", ["hnsw", "diskann"])
def test_llm_config_simulated(backend_name):
"""Test simulated LLM configuration option with both backends."""
# Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2
if os.environ.get("CI") == "true" and platform.system() == "Darwin":
pytest.skip("Skipping on macOS CI due to MPS environment issues with all-MiniLM-L6-v2")
# Skip DiskANN tests in CI due to hardware requirements
if os.environ.get("CI") == "true" and backend_name == "diskann":
pytest.skip("Skip DiskANN tests in CI - requires specific hardware and large memory")
from leann import LeannBuilder, LeannChat
with tempfile.TemporaryDirectory() as temp_dir:
# Build a simple index
index_path = str(Path(temp_dir) / "test.leann")
index_path = str(Path(temp_dir) / f"test_{backend_name}.leann")
# Use smaller model in CI to avoid memory issues
if os.environ.get("CI") == "true":
builder = LeannBuilder(
backend_name="hnsw",
backend_name=backend_name,
embedding_model="sentence-transformers/all-MiniLM-L6-v2",
dimensions=384,
)
else:
builder = LeannBuilder(backend_name="hnsw")
builder = LeannBuilder(backend_name=backend_name)
builder.add_text("Test document for LLM testing")
builder.build_index(index_path)

77
uv.lock generated
View File

@@ -2223,7 +2223,7 @@ wheels = [
[[package]]
name = "leann-backend-diskann"
version = "0.2.6"
version = "0.2.9"
source = { editable = "packages/leann-backend-diskann" }
dependencies = [
{ name = "leann-core" },
@@ -2235,14 +2235,14 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "leann-core", specifier = "==0.2.6" },
{ name = "leann-core", specifier = "==0.2.9" },
{ name = "numpy" },
{ name = "protobuf", specifier = ">=3.19.0" },
]
[[package]]
name = "leann-backend-hnsw"
version = "0.2.6"
version = "0.2.9"
source = { editable = "packages/leann-backend-hnsw" }
dependencies = [
{ name = "leann-core" },
@@ -2255,7 +2255,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "leann-core", specifier = "==0.2.6" },
{ name = "leann-core", specifier = "==0.2.9" },
{ name = "msgpack", specifier = ">=1.0.0" },
{ name = "numpy" },
{ name = "pyzmq", specifier = ">=23.0.0" },
@@ -2263,7 +2263,7 @@ requires-dist = [
[[package]]
name = "leann-core"
version = "0.2.6"
version = "0.2.9"
source = { editable = "packages/leann-core" }
dependencies = [
{ name = "accelerate" },
@@ -2272,8 +2272,8 @@ dependencies = [
{ name = "llama-index-core" },
{ name = "llama-index-embeddings-huggingface" },
{ name = "llama-index-readers-file" },
{ name = "mlx", marker = "sys_platform == 'darwin'" },
{ name = "mlx-lm", marker = "sys_platform == 'darwin'" },
{ name = "mlx", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'" },
{ name = "mlx-lm", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'" },
{ name = "msgpack" },
{ name = "nbconvert" },
{ name = "numpy", version = "2.0.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
@@ -2302,8 +2302,8 @@ requires-dist = [
{ name = "llama-index-core", specifier = ">=0.12.0" },
{ name = "llama-index-embeddings-huggingface", specifier = ">=0.5.5" },
{ name = "llama-index-readers-file", specifier = ">=0.4.0" },
{ name = "mlx", marker = "sys_platform == 'darwin'", specifier = ">=0.26.3" },
{ name = "mlx-lm", marker = "sys_platform == 'darwin'", specifier = ">=0.26.0" },
{ name = "mlx", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'", specifier = ">=0.26.3" },
{ name = "mlx-lm", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'", specifier = ">=0.26.0" },
{ name = "msgpack", specifier = ">=1.0.0" },
{ name = "nbconvert", specifier = ">=7.0.0" },
{ name = "numpy", specifier = ">=1.20.0" },
@@ -2343,8 +2343,8 @@ dependencies = [
{ name = "llama-index-embeddings-huggingface" },
{ name = "llama-index-readers-file" },
{ name = "llama-index-vector-stores-faiss" },
{ name = "mlx", marker = "sys_platform == 'darwin'" },
{ name = "mlx-lm", marker = "sys_platform == 'darwin'" },
{ name = "mlx", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'" },
{ name = "mlx-lm", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'" },
{ name = "msgpack" },
{ name = "nbconvert" },
{ name = "numpy", version = "2.0.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
@@ -2356,6 +2356,7 @@ dependencies = [
{ name = "pdfplumber" },
{ name = "protobuf" },
{ name = "psutil" },
{ name = "pybind11" },
{ name = "pymupdf" },
{ name = "pypdf2" },
{ name = "pypdfium2" },
@@ -2424,8 +2425,8 @@ requires-dist = [
{ name = "llama-index-readers-file", marker = "extra == 'test'", specifier = ">=0.4.0" },
{ name = "llama-index-vector-stores-faiss", specifier = ">=0.4.0" },
{ name = "matplotlib", marker = "extra == 'dev'" },
{ name = "mlx", marker = "sys_platform == 'darwin'", specifier = ">=0.26.3" },
{ name = "mlx-lm", marker = "sys_platform == 'darwin'", specifier = ">=0.26.0" },
{ name = "mlx", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'", specifier = ">=0.26.3" },
{ name = "mlx-lm", marker = "platform_machine == 'arm64' and sys_platform == 'darwin'", specifier = ">=0.26.0" },
{ name = "msgpack", specifier = ">=1.1.1" },
{ name = "nbconvert", specifier = ">=7.16.6" },
{ name = "numpy", specifier = ">=1.26.0" },
@@ -2438,6 +2439,7 @@ requires-dist = [
{ name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.5.0" },
{ name = "protobuf", specifier = "==4.25.3" },
{ name = "psutil", specifier = ">=5.8.0" },
{ name = "pybind11", specifier = ">=3.0.0" },
{ name = "pymupdf", specifier = ">=1.26.0" },
{ name = "pypdf2", specifier = ">=3.0.0" },
{ name = "pypdfium2", specifier = ">=4.30.0" },
@@ -2449,7 +2451,7 @@ requires-dist = [
{ name = "python-docx", marker = "extra == 'documents'", specifier = ">=0.8.11" },
{ name = "python-dotenv", marker = "extra == 'test'", specifier = ">=1.0.0" },
{ name = "requests", specifier = ">=2.25.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = ">=0.1.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = "==0.12.7" },
{ name = "sentence-transformers", specifier = ">=2.2.0" },
{ name = "sentence-transformers", marker = "extra == 'test'", specifier = ">=2.2.0" },
{ name = "sglang" },
@@ -4358,6 +4360,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/10/15/6b30e77872012bbfe8265d42a01d5b3c17ef0ac0f2fae531ad91b6a6c02e/pyarrow-21.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdc4c17afda4dab2a9c0b79148a43a7f4e1094916b3e18d8975bfd6d6d52241f", size = 26227521 },
]
[[package]]
name = "pybind11"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ef/83/698d120e257a116f2472c710932023ad779409adf2734d2e940f34eea2c5/pybind11-3.0.0.tar.gz", hash = "sha256:c3f07bce3ada51c3e4b76badfa85df11688d12c46111f9d242bc5c9415af7862", size = 544819 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/41/9c/85f50a5476832c3efc67b6d7997808388236ae4754bf53e1749b3bc27577/pybind11-3.0.0-py3-none-any.whl", hash = "sha256:7c5cac504da5a701b5163f0e6a7ba736c713a096a5378383c5b4b064b753f607", size = 292118 },
]
[[package]]
name = "pycparser"
version = "2.22"
@@ -5204,27 +5215,27 @@ wheels = [
[[package]]
name = "ruff"
version = "0.12.5"
version = "0.12.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/30/cd/01015eb5034605fd98d829c5839ec2c6b4582b479707f7c1c2af861e8258/ruff-0.12.5.tar.gz", hash = "sha256:b209db6102b66f13625940b7f8c7d0f18e20039bb7f6101fbdac935c9612057e", size = 5170722 }
sdist = { url = "https://files.pythonhosted.org/packages/a1/81/0bd3594fa0f690466e41bd033bdcdf86cba8288345ac77ad4afbe5ec743a/ruff-0.12.7.tar.gz", hash = "sha256:1fc3193f238bc2d7968772c82831a4ff69252f673be371fb49663f0068b7ec71", size = 5197814 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d4/de/ad2f68f0798ff15dd8c0bcc2889558970d9a685b3249565a937cd820ad34/ruff-0.12.5-py3-none-linux_armv6l.whl", hash = "sha256:1de2c887e9dec6cb31fcb9948299de5b2db38144e66403b9660c9548a67abd92", size = 11819133 },
{ url = "https://files.pythonhosted.org/packages/f8/fc/c6b65cd0e7fbe60f17e7ad619dca796aa49fbca34bb9bea5f8faf1ec2643/ruff-0.12.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d1ab65e7d8152f519e7dea4de892317c9da7a108da1c56b6a3c1d5e7cf4c5e9a", size = 12501114 },
{ url = "https://files.pythonhosted.org/packages/c5/de/c6bec1dce5ead9f9e6a946ea15e8d698c35f19edc508289d70a577921b30/ruff-0.12.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:962775ed5b27c7aa3fdc0d8f4d4433deae7659ef99ea20f783d666e77338b8cf", size = 11716873 },
{ url = "https://files.pythonhosted.org/packages/a1/16/cf372d2ebe91e4eb5b82a2275c3acfa879e0566a7ac94d331ea37b765ac8/ruff-0.12.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:73b4cae449597e7195a49eb1cdca89fd9fbb16140c7579899e87f4c85bf82f73", size = 11958829 },
{ url = "https://files.pythonhosted.org/packages/25/bf/cd07e8f6a3a6ec746c62556b4c4b79eeb9b0328b362bb8431b7b8afd3856/ruff-0.12.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8b13489c3dc50de5e2d40110c0cce371e00186b880842e245186ca862bf9a1ac", size = 11626619 },
{ url = "https://files.pythonhosted.org/packages/d8/c9/c2ccb3b8cbb5661ffda6925f81a13edbb786e623876141b04919d1128370/ruff-0.12.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f1504fea81461cf4841778b3ef0a078757602a3b3ea4b008feb1308cb3f23e08", size = 13221894 },
{ url = "https://files.pythonhosted.org/packages/6b/58/68a5be2c8e5590ecdad922b2bcd5583af19ba648f7648f95c51c3c1eca81/ruff-0.12.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:c7da4129016ae26c32dfcbd5b671fe652b5ab7fc40095d80dcff78175e7eddd4", size = 14163909 },
{ url = "https://files.pythonhosted.org/packages/bd/d1/ef6b19622009ba8386fdb792c0743f709cf917b0b2f1400589cbe4739a33/ruff-0.12.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ca972c80f7ebcfd8af75a0f18b17c42d9f1ef203d163669150453f50ca98ab7b", size = 13583652 },
{ url = "https://files.pythonhosted.org/packages/62/e3/1c98c566fe6809a0c83751d825a03727f242cdbe0d142c9e292725585521/ruff-0.12.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:8dbbf9f25dfb501f4237ae7501d6364b76a01341c6f1b2cd6764fe449124bb2a", size = 12700451 },
{ url = "https://files.pythonhosted.org/packages/24/ff/96058f6506aac0fbc0d0fc0d60b0d0bd746240a0594657a2d94ad28033ba/ruff-0.12.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2c47dea6ae39421851685141ba9734767f960113d51e83fd7bb9958d5be8763a", size = 12937465 },
{ url = "https://files.pythonhosted.org/packages/eb/d3/68bc5e7ab96c94b3589d1789f2dd6dd4b27b263310019529ac9be1e8f31b/ruff-0.12.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:c5076aa0e61e30f848846f0265c873c249d4b558105b221be1828f9f79903dc5", size = 11771136 },
{ url = "https://files.pythonhosted.org/packages/52/75/7356af30a14584981cabfefcf6106dea98cec9a7af4acb5daaf4b114845f/ruff-0.12.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:a5a4c7830dadd3d8c39b1cc85386e2c1e62344f20766be6f173c22fb5f72f293", size = 11601644 },
{ url = "https://files.pythonhosted.org/packages/c2/67/91c71d27205871737cae11025ee2b098f512104e26ffd8656fd93d0ada0a/ruff-0.12.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:46699f73c2b5b137b9dc0fc1a190b43e35b008b398c6066ea1350cce6326adcb", size = 12478068 },
{ url = "https://files.pythonhosted.org/packages/34/04/b6b00383cf2f48e8e78e14eb258942fdf2a9bf0287fbf5cdd398b749193a/ruff-0.12.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:5a655a0a0d396f0f072faafc18ebd59adde8ca85fb848dc1b0d9f024b9c4d3bb", size = 12991537 },
{ url = "https://files.pythonhosted.org/packages/3e/b9/053d6445dc7544fb6594785056d8ece61daae7214859ada4a152ad56b6e0/ruff-0.12.5-py3-none-win32.whl", hash = "sha256:dfeb2627c459b0b78ca2bbdc38dd11cc9a0a88bf91db982058b26ce41714ffa9", size = 11751575 },
{ url = "https://files.pythonhosted.org/packages/bc/0f/ab16e8259493137598b9149734fec2e06fdeda9837e6f634f5c4e35916da/ruff-0.12.5-py3-none-win_amd64.whl", hash = "sha256:ae0d90cf5f49466c954991b9d8b953bd093c32c27608e409ae3564c63c5306a5", size = 12882273 },
{ url = "https://files.pythonhosted.org/packages/00/db/c376b0661c24cf770cb8815268190668ec1330eba8374a126ceef8c72d55/ruff-0.12.5-py3-none-win_arm64.whl", hash = "sha256:48cdbfc633de2c5c37d9f090ba3b352d1576b0015bfc3bc98eaf230275b7e805", size = 11951564 },
{ url = "https://files.pythonhosted.org/packages/e1/d2/6cb35e9c85e7a91e8d22ab32ae07ac39cc34a71f1009a6f9e4a2a019e602/ruff-0.12.7-py3-none-linux_armv6l.whl", hash = "sha256:76e4f31529899b8c434c3c1dede98c4483b89590e15fb49f2d46183801565303", size = 11852189 },
{ url = "https://files.pythonhosted.org/packages/63/5b/a4136b9921aa84638f1a6be7fb086f8cad0fde538ba76bda3682f2599a2f/ruff-0.12.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:789b7a03e72507c54fb3ba6209e4bb36517b90f1a3569ea17084e3fd295500fb", size = 12519389 },
{ url = "https://files.pythonhosted.org/packages/a8/c9/3e24a8472484269b6b1821794141f879c54645a111ded4b6f58f9ab0705f/ruff-0.12.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:2e1c2a3b8626339bb6369116e7030a4cf194ea48f49b64bb505732a7fce4f4e3", size = 11743384 },
{ url = "https://files.pythonhosted.org/packages/26/7c/458dd25deeb3452c43eaee853c0b17a1e84169f8021a26d500ead77964fd/ruff-0.12.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32dec41817623d388e645612ec70d5757a6d9c035f3744a52c7b195a57e03860", size = 11943759 },
{ url = "https://files.pythonhosted.org/packages/7f/8b/658798472ef260ca050e400ab96ef7e85c366c39cf3dfbef4d0a46a528b6/ruff-0.12.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:47ef751f722053a5df5fa48d412dbb54d41ab9b17875c6840a58ec63ff0c247c", size = 11654028 },
{ url = "https://files.pythonhosted.org/packages/a8/86/9c2336f13b2a3326d06d39178fd3448dcc7025f82514d1b15816fe42bfe8/ruff-0.12.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a828a5fc25a3efd3e1ff7b241fd392686c9386f20e5ac90aa9234a5faa12c423", size = 13225209 },
{ url = "https://files.pythonhosted.org/packages/76/69/df73f65f53d6c463b19b6b312fd2391dc36425d926ec237a7ed028a90fc1/ruff-0.12.7-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5726f59b171111fa6a69d82aef48f00b56598b03a22f0f4170664ff4d8298efb", size = 14182353 },
{ url = "https://files.pythonhosted.org/packages/58/1e/de6cda406d99fea84b66811c189b5ea139814b98125b052424b55d28a41c/ruff-0.12.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:74e6f5c04c4dd4aba223f4fe6e7104f79e0eebf7d307e4f9b18c18362124bccd", size = 13631555 },
{ url = "https://files.pythonhosted.org/packages/6f/ae/625d46d5164a6cc9261945a5e89df24457dc8262539ace3ac36c40f0b51e/ruff-0.12.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5d0bfe4e77fba61bf2ccadf8cf005d6133e3ce08793bbe870dd1c734f2699a3e", size = 12667556 },
{ url = "https://files.pythonhosted.org/packages/55/bf/9cb1ea5e3066779e42ade8d0cd3d3b0582a5720a814ae1586f85014656b6/ruff-0.12.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06bfb01e1623bf7f59ea749a841da56f8f653d641bfd046edee32ede7ff6c606", size = 12939784 },
{ url = "https://files.pythonhosted.org/packages/55/7f/7ead2663be5627c04be83754c4f3096603bf5e99ed856c7cd29618c691bd/ruff-0.12.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:e41df94a957d50083fd09b916d6e89e497246698c3f3d5c681c8b3e7b9bb4ac8", size = 11771356 },
{ url = "https://files.pythonhosted.org/packages/17/40/a95352ea16edf78cd3a938085dccc55df692a4d8ba1b3af7accbe2c806b0/ruff-0.12.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:4000623300563c709458d0ce170c3d0d788c23a058912f28bbadc6f905d67afa", size = 11612124 },
{ url = "https://files.pythonhosted.org/packages/4d/74/633b04871c669e23b8917877e812376827c06df866e1677f15abfadc95cb/ruff-0.12.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:69ffe0e5f9b2cf2b8e289a3f8945b402a1b19eff24ec389f45f23c42a3dd6fb5", size = 12479945 },
{ url = "https://files.pythonhosted.org/packages/be/34/c3ef2d7799c9778b835a76189c6f53c179d3bdebc8c65288c29032e03613/ruff-0.12.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a07a5c8ffa2611a52732bdc67bf88e243abd84fe2d7f6daef3826b59abbfeda4", size = 12998677 },
{ url = "https://files.pythonhosted.org/packages/77/ab/aca2e756ad7b09b3d662a41773f3edcbd262872a4fc81f920dc1ffa44541/ruff-0.12.7-py3-none-win32.whl", hash = "sha256:c928f1b2ec59fb77dfdf70e0419408898b63998789cc98197e15f560b9e77f77", size = 11756687 },
{ url = "https://files.pythonhosted.org/packages/b4/71/26d45a5042bc71db22ddd8252ca9d01e9ca454f230e2996bb04f16d72799/ruff-0.12.7-py3-none-win_amd64.whl", hash = "sha256:9c18f3d707ee9edf89da76131956aba1270c6348bfee8f6c647de841eac7194f", size = 12912365 },
{ url = "https://files.pythonhosted.org/packages/4c/9b/0b8aa09817b63e78d94b4977f18b1fcaead3165a5ee49251c5d5c245bb2d/ruff-0.12.7-py3-none-win_arm64.whl", hash = "sha256:dfce05101dbd11833a0776716d5d1578641b7fddb537fe7fa956ab85d1769b69", size = 11982083 },
]
[[package]]