From 3d79741f9cadce3fb2b208094c0f7dcc69923c0d Mon Sep 17 00:00:00 2001
From: Andy Lee
Date: Mon, 25 Aug 2025 15:46:48 -0700
Subject: [PATCH] experiments for running DiskANN & BM25 on Arch 4090

---
 benchmarks/bm25_diskann_baselines/README.md   |  23 +++
 benchmarks/bm25_diskann_baselines/run_bm25.py | 183 ++++++++++++++++++
 .../bm25_diskann_baselines/run_diskann.py     | 135 +++++++++++++
 .../leann_backend_diskann/diskann_backend.py  |   3 +-
 scripts/hf_upload.py                          | 121 ++++++++++++
 5 files changed, 464 insertions(+), 1 deletion(-)
 create mode 100644 benchmarks/bm25_diskann_baselines/README.md
 create mode 100644 benchmarks/bm25_diskann_baselines/run_bm25.py
 create mode 100644 benchmarks/bm25_diskann_baselines/run_diskann.py
 create mode 100644 scripts/hf_upload.py

diff --git a/benchmarks/bm25_diskann_baselines/README.md b/benchmarks/bm25_diskann_baselines/README.md
new file mode 100644
index 0000000..adf4736
--- /dev/null
+++ b/benchmarks/bm25_diskann_baselines/README.md
@@ -0,0 +1,23 @@
BM25 vs DiskANN Baselines

```bash
aws s3 sync s3://powerrag-diskann-rpj-wiki-20250824-224037-194d640c/bm25_rpj_wiki/index_en_only/ benchmarks/data/indices/bm25_index/
aws s3 sync s3://powerrag-diskann-rpj-wiki-20250824-224037-194d640c/diskann_rpj_wiki/ benchmarks/data/indices/diskann_rpj_wiki/
```

- Dataset: `benchmarks/data/queries/nq_open.jsonl` (Natural Questions)
- Results are machine-specific; measured locally on the Arch 4090 machine with the current repo.

DiskANN (NQ queries, search-only)
- Command: `uv run benchmarks/bm25_diskann_baselines/run_diskann.py`
- Settings: `recompute_embeddings=False`, embeddings precomputed (excluded from timing), batching off, node caching off (`cache_mechanism=2`, `num_nodes_to_cache=0`)
- Result: avg 0.019339 s/query, QPS 51.71 (p50 ~0.018936 s, p95 ~0.023573 s)

BM25
- Command: `uv run --script ./benchmarks/bm25_diskann_baselines/run_bm25.py`
- Settings: `k=10`, `k1=0.9`, `b=0.4`, queries=100
- Result: avg 0.026976 s/query, QPS 37.07 (p50 0.024729 s, p90 0.042158 s, p95 0.047099 s, p99 0.053520 s)

Notes
- DiskANN measures search-only latency on real NQ queries (embeddings computed beforehand and excluded from timing).
- Use `benchmarks/bm25_diskann_baselines/run_diskann.py` for DiskANN; `benchmarks/bm25_diskann_baselines/run_bm25.py` for BM25.

diff --git a/benchmarks/bm25_diskann_baselines/run_bm25.py b/benchmarks/bm25_diskann_baselines/run_bm25.py
new file mode 100644
index 0000000..f10ad93
--- /dev/null
+++ b/benchmarks/bm25_diskann_baselines/run_bm25.py
@@ -0,0 +1,183 @@
# /// script
# dependencies = [
#     "pyserini"
# ]
# ///
# Java setup for Pyserini/Lucene on Arch Linux (bash and fish variants shown):
# sudo pacman -S jdk21-openjdk
# export JAVA_HOME=/usr/lib/jvm/java-21-openjdk
# sudo archlinux-java status
# sudo archlinux-java set java-21-openjdk
# set -Ux JAVA_HOME /usr/lib/jvm/java-21-openjdk
# fish_add_path --global $JAVA_HOME/bin
# set -Ux LD_LIBRARY_PATH $JAVA_HOME/lib/server $LD_LIBRARY_PATH
# which javac  # Should be /usr/lib/jvm/java-21-openjdk/bin/javac

import argparse
import json
import os
import sys
import time
from statistics import mean


def load_queries(path: str, limit: int | None) -> list[str]:
    queries: list[str] = []
    # Try JSONL with a 'query' or 'text' field; fall back to plain text (one query per line)
    _, ext = os.path.splitext(path)
    if ext.lower() in {".jsonl", ".json"}:
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    # Not strict JSONL? 
treat the whole line as the query + queries.append(line) + continue + q = obj.get("query") or obj.get("text") or obj.get("question") + if q: + queries.append(str(q)) + else: + with open(path, encoding="utf-8") as f: + for line in f: + s = line.strip() + if s: + queries.append(s) + + if limit is not None and limit > 0: + queries = queries[:limit] + return queries + + +def percentile(values: list[float], p: float) -> float: + if not values: + return 0.0 + s = sorted(values) + k = (len(s) - 1) * (p / 100.0) + f = int(k) + c = min(f + 1, len(s) - 1) + if f == c: + return s[f] + return s[f] + (s[c] - s[f]) * (k - f) + + +def main(): + ap = argparse.ArgumentParser(description="Standalone BM25 latency benchmark (Pyserini)") + ap.add_argument( + "--bm25-index", + default="benchmarks/data/indices/bm25_index", + help="Path to Pyserini Lucene index directory", + ) + ap.add_argument( + "--queries", + default="benchmarks/data/queries/nq_open.jsonl", + help="Path to queries file (JSONL with 'query'/'text' or plain txt one-per-line)", + ) + ap.add_argument("--k", type=int, default=10, help="Top-k to retrieve (default: 10)") + ap.add_argument("--k1", type=float, default=0.9, help="BM25 k1 (default: 0.9)") + ap.add_argument("--b", type=float, default=0.4, help="BM25 b (default: 0.4)") + ap.add_argument("--limit", type=int, default=100, help="Max queries to run (default: 100)") + ap.add_argument( + "--warmup", type=int, default=5, help="Warmup queries not counted in latency (default: 5)" + ) + ap.add_argument( + "--fetch-docs", action="store_true", help="Also fetch doc contents (slower; default: off)" + ) + ap.add_argument("--report", type=str, default=None, help="Optional JSON report path") + args = ap.parse_args() + + try: + from pyserini.search.lucene import LuceneSearcher + except Exception: + print("Pyserini not found. 
Install with: pip install pyserini", file=sys.stderr) + raise + + if not os.path.isdir(args.bm25_index): + print(f"Index directory not found: {args.bm25_index}", file=sys.stderr) + sys.exit(1) + + queries = load_queries(args.queries, args.limit) + if not queries: + print("No queries loaded.", file=sys.stderr) + sys.exit(1) + + print(f"Loaded {len(queries)} queries from {args.queries}") + print(f"Opening BM25 index: {args.bm25_index}") + searcher = LuceneSearcher(args.bm25_index) + # Some builds of pyserini require explicit set_bm25; others ignore + try: + searcher.set_bm25(k1=args.k1, b=args.b) + except Exception: + pass + + latencies: list[float] = [] + total_searches = 0 + + # Warmup + for i in range(min(args.warmup, len(queries))): + _ = searcher.search(queries[i], k=args.k) + + t0 = time.time() + for i, q in enumerate(queries): + t1 = time.time() + hits = searcher.search(q, k=args.k) + t2 = time.time() + latencies.append(t2 - t1) + total_searches += 1 + + if args.fetch_docs: + # Optional doc fetch to include I/O time + for h in hits: + try: + _ = searcher.doc(h.docid) + except Exception: + pass + + if (i + 1) % 50 == 0: + print(f"Processed {i + 1}/{len(queries)} queries") + + t1 = time.time() + total_time = t1 - t0 + + if latencies: + avg = mean(latencies) + p50 = percentile(latencies, 50) + p90 = percentile(latencies, 90) + p95 = percentile(latencies, 95) + p99 = percentile(latencies, 99) + qps = total_searches / total_time if total_time > 0 else 0.0 + else: + avg = p50 = p90 = p95 = p99 = qps = 0.0 + + print("BM25 Latency Report") + print(f" queries: {total_searches}") + print(f" k: {args.k}, k1: {args.k1}, b: {args.b}") + print(f" avg per query: {avg:.6f} s") + print(f" p50/p90/p95/p99: {p50:.6f}/{p90:.6f}/{p95:.6f}/{p99:.6f} s") + print(f" total time: {total_time:.3f} s, qps: {qps:.2f}") + + if args.report: + payload = { + "queries": total_searches, + "k": args.k, + "k1": args.k1, + "b": args.b, + "avg_s": avg, + "p50_s": p50, + "p90_s": p90, + "p95_s": p95, + "p99_s": p99, + "total_time_s": total_time, + "qps": qps, + "index_dir": os.path.abspath(args.bm25_index), + "fetch_docs": bool(args.fetch_docs), + } + with open(args.report, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2) + print(f"Saved report to {args.report}") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/bm25_diskann_baselines/run_diskann.py b/benchmarks/bm25_diskann_baselines/run_diskann.py new file mode 100644 index 0000000..2173af3 --- /dev/null +++ b/benchmarks/bm25_diskann_baselines/run_diskann.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +""" +Run DiskANN with real NQ queries (search-only timing). 

Steps:
- Load queries from nq_open.jsonl
- Compute embeddings (facebook/contriever-msmarco) once upfront
- Search via DiskANN (no recompute, no batching), measure per-query latency

Example:
    python benchmarks/bm25_diskann_baselines/run_diskann.py \
        --index-dir benchmarks/data/indices/diskann_rpj_wiki \
        --index-prefix ann \
        --queries-file benchmarks/data/queries/nq_open.jsonl \
        --num-queries 200 --top-k 10 --complexity 120 --threads 1 --beam-width 1
"""

import argparse
import json
import time
from pathlib import Path

import numpy as np


def load_queries(path: Path, limit: int | None) -> list[str]:
    out: list[str] = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            obj = json.loads(line)
            out.append(obj["query"])
            if limit and len(out) >= limit:
                break
    return out


def main() -> None:
    ap = argparse.ArgumentParser(
        description="DiskANN baseline on real NQ queries (search-only timing)"
    )
    ap.add_argument(
        "--index-dir",
        default="benchmarks/data/indices/diskann_rpj_wiki",
        help="Directory containing DiskANN files",
    )
    ap.add_argument("--index-prefix", default="ann")
    ap.add_argument("--queries-file", default="benchmarks/data/queries/nq_open.jsonl")
    ap.add_argument("--num-queries", type=int, default=200)
    ap.add_argument("--top-k", type=int, default=10)
    ap.add_argument("--complexity", type=int, default=120)
    ap.add_argument("--threads", type=int, default=1)
    ap.add_argument("--beam-width", type=int, default=1)
    ap.add_argument("--cache-mechanism", type=int, default=2)
    ap.add_argument("--num-nodes-to-cache", type=int, default=0)
    args = ap.parse_args()

    index_dir = Path(args.index_dir).resolve()
    if not index_dir.is_dir():
        raise SystemExit(f"Index dir not found: {index_dir}")

    qpath = Path(args.queries_file).resolve()
    if not qpath.exists():
        raise SystemExit(f"Queries file not found: {qpath}")

    queries = load_queries(qpath, args.num_queries)
    print(f"Loaded {len(queries)} queries from {qpath}")

    # Compute embeddings once (excluded from timing)
    from leann.api import compute_embeddings as _compute

    embs = _compute(
        queries,
        model_name="facebook/contriever-msmarco",
        mode="sentence-transformers",
        use_server=False,
    ).astype(np.float32)
    if embs.ndim != 2:
        raise SystemExit("Embedding compute failed or returned wrong shape")

    # Build searcher
    from leann_backend_diskann.diskann_backend import DiskannSearcher as _DiskannSearcher

    index_prefix_path = str(index_dir / args.index_prefix)
    searcher = _DiskannSearcher(
        index_prefix_path,
        num_threads=int(args.threads),
        cache_mechanism=int(args.cache_mechanism),
        num_nodes_to_cache=int(args.num_nodes_to_cache),
    )

    # Warmup (not timed)
    _ = searcher.search(
        embs[0:1],
        top_k=args.top_k,
        complexity=args.complexity,
        beam_width=args.beam_width,
        prune_ratio=0.0,
        recompute_embeddings=False,
        batch_recompute=False,
        dedup_node_dis=False,
    )

    # Timed loop: one query per search call, search-only latency
    times: list[float] = []
    for i in range(embs.shape[0]):
        t0 = time.time()
        _ = searcher.search(
            embs[i : i + 1],
            top_k=args.top_k,
            complexity=args.complexity,
            beam_width=args.beam_width,
            prune_ratio=0.0,
            recompute_embeddings=False,
            batch_recompute=False,
            dedup_node_dis=False,
        )
        times.append(time.time() - t0)

    # Nearest-rank percentile estimates over per-query latencies
    times_sorted = sorted(times)
    avg = float(sum(times) / len(times))
    p50 = times_sorted[len(times) // 2]
    p95 = times_sorted[max(0, int(len(times) * 0.95) - 1)]

    print("\nDiskANN (NQ, search-only) Report")
    print(f" queries: 
{len(times)}") + print( + f" k: {args.top_k}, complexity: {args.complexity}, beam_width: {args.beam_width}, threads: {args.threads}" + ) + print(f" avg per query: {avg:.6f} s") + print(f" p50/p95: {p50:.6f}/{p95:.6f} s") + print(f" QPS: {1.0 / avg:.2f}") + + +if __name__ == "__main__": + main() diff --git a/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py b/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py index 96fb9ee..65aa9db 100644 --- a/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py +++ b/packages/leann-backend-diskann/leann_backend_diskann/diskann_backend.py @@ -343,7 +343,8 @@ class DiskannSearcher(BaseSearcher): "full_index_prefix": full_index_prefix, "num_threads": self.num_threads, "num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0), - "cache_mechanism": 1, + # 1 -> initialize cache using sample_data; 2 -> ready cache without init; others disable cache + "cache_mechanism": kwargs.get("cache_mechanism", 1), "pq_prefix": "", "partition_prefix": partition_prefix, } diff --git a/scripts/hf_upload.py b/scripts/hf_upload.py new file mode 100644 index 0000000..48ef14c --- /dev/null +++ b/scripts/hf_upload.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +""" +Upload local evaluation data to Hugging Face Hub, excluding diskann_rpj_wiki. + +Defaults: +- repo_id: LEANN-RAG/leann-rag-evaluation-data (dataset) +- folder_path: benchmarks/data +- ignore_patterns: diskann_rpj_wiki/** and .cache/** + +Requires authentication via `huggingface-cli login` or HF_TOKEN env var. +""" + +from __future__ import annotations + +import argparse +import os + +try: + from huggingface_hub import HfApi +except Exception as e: + raise SystemExit( + "huggingface_hub is required. Install with: pip install huggingface_hub hf_transfer" + ) from e + + +def _enable_transfer_accel_if_available() -> None: + """Best-effort enabling of accelerated transfers across hub versions. + + Tries the public util if present; otherwise, falls back to env flag when + hf_transfer is installed. Silently no-ops if unavailable. + """ + try: + # Newer huggingface_hub exposes this under utils + from huggingface_hub.utils import hf_hub_enable_hf_transfer # type: ignore + + hf_hub_enable_hf_transfer() + return + except Exception: + pass + + try: + # If hf_transfer is installed, set env flag recognized by the hub + import hf_transfer # noqa: F401 + + os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1") + except Exception: + # Acceleration not available; proceed without it + pass + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="Upload local data to HF, excluding diskann_rpj_wiki") + p.add_argument( + "--repo-id", + default="LEANN-RAG/leann-rag-evaluation-data", + help="Target dataset repo id (namespace/name)", + ) + p.add_argument( + "--folder-path", + default="benchmarks/data", + help="Local folder to upload (default: benchmarks/data)", + ) + p.add_argument( + "--ignore", + default=["diskann_rpj_wiki/**", ".cache/**"], + nargs="+", + help="Glob patterns to ignore (space-separated)", + ) + p.add_argument( + "--allow", + default=["**"], + nargs="+", + help="Glob patterns to allow (space-separated). 
Defaults to everything.", + ) + p.add_argument( + "--message", + default="sync local data (exclude diskann_rpj_wiki)", + help="Commit message", + ) + p.add_argument( + "--no-transfer-accel", + action="store_true", + help="Disable hf_transfer accelerated uploads", + ) + return p.parse_args() + + +def main() -> None: + args = parse_args() + + if not args.no_transfer_accel: + _enable_transfer_accel_if_available() + + if not os.path.isdir(args.folder_path): + raise SystemExit(f"Folder not found: {args.folder_path}") + + print("Uploading to Hugging Face Hub:") + print(f" repo_id: {args.repo_id}") + print(" repo_type: dataset") + print(f" folder_path: {args.folder_path}") + print(f" allow_patterns: {args.allow}") + print(f" ignore_patterns:{args.ignore}") + + api = HfApi() + + # Perform upload. This skips unchanged files by content hash. + api.upload_folder( + repo_id=args.repo_id, + repo_type="dataset", + folder_path=args.folder_path, + path_in_repo=".", + allow_patterns=args.allow, + ignore_patterns=args.ignore, + commit_message=args.message, + ) + + print("Upload completed (unchanged files were skipped by the Hub).") + + +if __name__ == "__main__": + main()
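

# Usage sketch (illustrative only; it simply spells out the argparse defaults above).
# Assumes you have already authenticated via `huggingface-cli login` or an HF_TOKEN env var:
#
#   python scripts/hf_upload.py \
#       --repo-id LEANN-RAG/leann-rag-evaluation-data \
#       --folder-path benchmarks/data \
#       --ignore "diskann_rpj_wiki/**" ".cache/**"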