experiments for running DiskANN & BM25 on Arch 4090
benchmarks/bm25_diskann_baselines/README.md (new file, 23 lines)
@@ -0,0 +1,23 @@
# BM25 vs DiskANN Baselines

Download the prebuilt indices from S3:
```bash
aws s3 sync s3://powerrag-diskann-rpj-wiki-20250824-224037-194d640c/bm25_rpj_wiki/index_en_only/ benchmarks/data/indices/bm25_index/
aws s3 sync s3://powerrag-diskann-rpj-wiki-20250824-224037-194d640c/diskann_rpj_wiki/ benchmarks/data/indices/diskann_rpj_wiki/
```
- Dataset: `benchmarks/data/queries/nq_open.jsonl` (Natural Questions)
- Results are machine-specific; measured locally with the current repo.

## DiskANN (NQ queries, search-only)
- Command: `uv run benchmarks/bm25_diskann_baselines/run_diskann.py` (see the example invocation below)
- Settings: `recompute_embeddings=False`, embeddings precomputed (excluded from timing), batching off, caching off (`cache_mechanism=2`, `num_nodes_to_cache=0`)
- Result: avg 0.019339 s/query, QPS 51.71 (p50 ~0.018936 s, p95 ~0.023573 s)
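
A representative invocation as a sketch: the flags are the ones defined in `run_diskann.py`, and the values are the script defaults plus the caching-off settings above, not necessarily the exact run behind the reported numbers:

```bash
uv run benchmarks/bm25_diskann_baselines/run_diskann.py \
  --index-dir benchmarks/data/indices/diskann_rpj_wiki \
  --index-prefix ann \
  --queries-file benchmarks/data/queries/nq_open.jsonl \
  --num-queries 200 --top-k 10 --complexity 120 --threads 1 --beam-width 1 \
  --cache-mechanism 2 --num-nodes-to-cache 0
```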

## BM25
- Command: `uv run --script ./benchmarks/run_bm25.py` (see the example invocation below)
- Settings: `k=10`, `k1=0.9`, `b=0.4`, queries=100
- Result: avg 0.026976 s/query, QPS 37.07 (p50 0.024729 s, p90 0.042158 s, p95 0.047099 s, p99 0.053520 s)
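
A sketch of an equivalent invocation: the flags are the ones defined by `run_bm25.py` in this commit, with values mirroring the settings listed above:

```bash
uv run --script ./benchmarks/run_bm25.py \
  --bm25-index benchmarks/data/indices/bm25_index \
  --queries benchmarks/data/queries/nq_open.jsonl \
  --k 10 --k1 0.9 --b 0.4 --limit 100
```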

## Notes
- DiskANN measures search-only latency on real NQ queries (embeddings computed beforehand and excluded from timing).
- Use `benchmarks/bm25_diskann_baselines/run_diskann.py` for DiskANN; `benchmarks/run_bm25.py` for BM25.

benchmarks/bm25_diskann_baselines/run_bm25.py (new file, 183 lines)
@@ -0,0 +1,183 @@
# /// script
# dependencies = [
# "pyserini"
# ]
# ///
# sudo pacman -S jdk21-openjdk
# export JAVA_HOME=/usr/lib/jvm/java-21-openjdk
# sudo archlinux-java status
# sudo archlinux-java set java-21-openjdk
# set -Ux JAVA_HOME /usr/lib/jvm/java-21-openjdk
# fish_add_path --global $JAVA_HOME/bin
# set -Ux LD_LIBRARY_PATH $JAVA_HOME/lib/server $LD_LIBRARY_PATH
# which javac # Should be /usr/lib/jvm/java-21-openjdk/bin/javac

import argparse
import json
import os
import sys
import time
from statistics import mean


def load_queries(path: str, limit: int | None) -> list[str]:
    queries: list[str] = []
    # Try JSONL with a 'query' or 'text' field; fallback to plain text (one query per line)
    _, ext = os.path.splitext(path)
    if ext.lower() in {".jsonl", ".json"}:
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    # Not strict JSONL? treat the whole line as the query
                    queries.append(line)
                    continue
                q = obj.get("query") or obj.get("text") or obj.get("question")
                if q:
                    queries.append(str(q))
    else:
        with open(path, encoding="utf-8") as f:
            for line in f:
                s = line.strip()
                if s:
                    queries.append(s)

    if limit is not None and limit > 0:
        queries = queries[:limit]
    return queries


def percentile(values: list[float], p: float) -> float:
    if not values:
        return 0.0
    s = sorted(values)
    k = (len(s) - 1) * (p / 100.0)
    f = int(k)
    c = min(f + 1, len(s) - 1)
    if f == c:
        return s[f]
    return s[f] + (s[c] - s[f]) * (k - f)


def main():
    ap = argparse.ArgumentParser(description="Standalone BM25 latency benchmark (Pyserini)")
    ap.add_argument(
        "--bm25-index",
        default="benchmarks/data/indices/bm25_index",
        help="Path to Pyserini Lucene index directory",
    )
    ap.add_argument(
        "--queries",
        default="benchmarks/data/queries/nq_open.jsonl",
        help="Path to queries file (JSONL with 'query'/'text' or plain txt one-per-line)",
    )
    ap.add_argument("--k", type=int, default=10, help="Top-k to retrieve (default: 10)")
    ap.add_argument("--k1", type=float, default=0.9, help="BM25 k1 (default: 0.9)")
    ap.add_argument("--b", type=float, default=0.4, help="BM25 b (default: 0.4)")
    ap.add_argument("--limit", type=int, default=100, help="Max queries to run (default: 100)")
    ap.add_argument(
        "--warmup", type=int, default=5, help="Warmup queries not counted in latency (default: 5)"
    )
    ap.add_argument(
        "--fetch-docs", action="store_true", help="Also fetch doc contents (slower; default: off)"
    )
    ap.add_argument("--report", type=str, default=None, help="Optional JSON report path")
    args = ap.parse_args()

    try:
        from pyserini.search.lucene import LuceneSearcher
    except Exception:
        print("Pyserini not found. Install with: pip install pyserini", file=sys.stderr)
        raise

    if not os.path.isdir(args.bm25_index):
        print(f"Index directory not found: {args.bm25_index}", file=sys.stderr)
        sys.exit(1)

    queries = load_queries(args.queries, args.limit)
    if not queries:
        print("No queries loaded.", file=sys.stderr)
        sys.exit(1)

    print(f"Loaded {len(queries)} queries from {args.queries}")
    print(f"Opening BM25 index: {args.bm25_index}")
    searcher = LuceneSearcher(args.bm25_index)
    # Some builds of pyserini require explicit set_bm25; others ignore
    try:
        searcher.set_bm25(k1=args.k1, b=args.b)
    except Exception:
        pass

    latencies: list[float] = []
    total_searches = 0

    # Warmup
    for i in range(min(args.warmup, len(queries))):
        _ = searcher.search(queries[i], k=args.k)

    t0 = time.time()
    for i, q in enumerate(queries):
        t1 = time.time()
        hits = searcher.search(q, k=args.k)
        t2 = time.time()
        latencies.append(t2 - t1)
        total_searches += 1

        if args.fetch_docs:
            # Optional doc fetch to include I/O time
            for h in hits:
                try:
                    _ = searcher.doc(h.docid)
                except Exception:
                    pass

        if (i + 1) % 50 == 0:
            print(f"Processed {i + 1}/{len(queries)} queries")

    t1 = time.time()
    total_time = t1 - t0

    if latencies:
        avg = mean(latencies)
        p50 = percentile(latencies, 50)
        p90 = percentile(latencies, 90)
        p95 = percentile(latencies, 95)
        p99 = percentile(latencies, 99)
        qps = total_searches / total_time if total_time > 0 else 0.0
    else:
        avg = p50 = p90 = p95 = p99 = qps = 0.0

    print("BM25 Latency Report")
    print(f" queries: {total_searches}")
    print(f" k: {args.k}, k1: {args.k1}, b: {args.b}")
    print(f" avg per query: {avg:.6f} s")
    print(f" p50/p90/p95/p99: {p50:.6f}/{p90:.6f}/{p95:.6f}/{p99:.6f} s")
    print(f" total time: {total_time:.3f} s, qps: {qps:.2f}")

    if args.report:
        payload = {
            "queries": total_searches,
            "k": args.k,
            "k1": args.k1,
            "b": args.b,
            "avg_s": avg,
            "p50_s": p50,
            "p90_s": p90,
            "p95_s": p95,
            "p99_s": p99,
            "total_time_s": total_time,
            "qps": qps,
            "index_dir": os.path.abspath(args.bm25_index),
            "fetch_docs": bool(args.fetch_docs),
        }
        with open(args.report, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        print(f"Saved report to {args.report}")


if __name__ == "__main__":
    main()

benchmarks/bm25_diskann_baselines/run_diskann.py (new file, 135 lines)
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
Run DiskANN with real NQ queries (search-only timing).

Steps:
- Load queries from nq_open.jsonl
- Compute embeddings (facebook/contriever-msmarco) once upfront
- Search via DiskANN (no recompute, no batching), measure per-query latency

Example:
  python benchmarks/bm25_diskann_baselines/run_diskann.py \
    --index-dir benchmarks/data/indices/diskann_rpj_wiki \
    --index-prefix ann \
    --queries-file benchmarks/data/queries/nq_open.jsonl \
    --num-queries 200 --top-k 10 --complexity 120 --threads 1 --beam-width 1
"""

import argparse
import json
import time
from pathlib import Path

import numpy as np


def load_queries(path: Path, limit: int | None) -> list[str]:
    out: list[str] = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            obj = json.loads(line)
            out.append(obj["query"])
            if limit and len(out) >= limit:
                break
    return out


def main() -> None:
    ap = argparse.ArgumentParser(
        description="DiskANN baseline on real NQ queries (search-only timing)"
    )
    ap.add_argument(
        "--index-dir",
        default="benchmarks/data/indices/diskann_rpj_wiki",
        help="Directory containing DiskANN files",
    )
    ap.add_argument("--index-prefix", default="ann")
    ap.add_argument("--queries-file", default="benchmarks/data/queries/nq_open.jsonl")
    ap.add_argument("--num-queries", type=int, default=200)
    ap.add_argument("--top-k", type=int, default=10)
    ap.add_argument("--complexity", type=int, default=120)
    ap.add_argument("--threads", type=int, default=1)
    ap.add_argument("--beam-width", type=int, default=1)
    ap.add_argument("--cache-mechanism", type=int, default=2)
    ap.add_argument("--num-nodes-to-cache", type=int, default=0)
    args = ap.parse_args()

    index_dir = Path(args.index_dir).resolve()
    if not index_dir.is_dir():
        raise SystemExit(f"Index dir not found: {index_dir}")

    qpath = Path(args.queries_file).resolve()
    if not qpath.exists():
        raise SystemExit(f"Queries file not found: {qpath}")

    queries = load_queries(qpath, args.num_queries)
    print(f"Loaded {len(queries)} queries from {qpath}")

    # Compute embeddings once (exclude from timing)
    from leann.api import compute_embeddings as _compute

    embs = _compute(
        queries,
        model_name="facebook/contriever-msmarco",
        mode="sentence-transformers",
        use_server=False,
    ).astype(np.float32)
    if embs.ndim != 2:
        raise SystemExit("Embedding compute failed or returned wrong shape")

    # Build searcher
    from leann_backend_diskann.diskann_backend import DiskannSearcher as _DiskannSearcher

    index_prefix_path = str(index_dir / args.index_prefix)
    searcher = _DiskannSearcher(
        index_prefix_path,
        num_threads=int(args.threads),
        cache_mechanism=int(args.cache_mechanism),
        num_nodes_to_cache=int(args.num_nodes_to_cache),
    )

    # Warmup (not timed)
    _ = searcher.search(
        embs[0:1],
        top_k=args.top_k,
        complexity=args.complexity,
        beam_width=args.beam_width,
        prune_ratio=0.0,
        recompute_embeddings=False,
        batch_recompute=False,
        dedup_node_dis=False,
    )

    # Timed loop
    times: list[float] = []
    for i in range(embs.shape[0]):
        t0 = time.time()
        _ = searcher.search(
            embs[i : i + 1],
            top_k=args.top_k,
            complexity=args.complexity,
            beam_width=args.beam_width,
            prune_ratio=0.0,
            recompute_embeddings=False,
            batch_recompute=False,
            dedup_node_dis=False,
        )
        times.append(time.time() - t0)

    times_sorted = sorted(times)
    avg = float(sum(times) / len(times))
    p50 = times_sorted[len(times) // 2]
    p95 = times_sorted[max(0, int(len(times) * 0.95) - 1)]

    print("\nDiskANN (NQ, search-only) Report")
    print(f" queries: {len(times)}")
    print(
        f" k: {args.top_k}, complexity: {args.complexity}, beam_width: {args.beam_width}, threads: {args.threads}"
    )
    print(f" avg per query: {avg:.6f} s")
    print(f" p50/p95: {p50:.6f}/{p95:.6f} s")
    print(f" QPS: {1.0 / avg:.2f}")


if __name__ == "__main__":
    main()

@@ -343,7 +343,8 @@ class DiskannSearcher(BaseSearcher):
             "full_index_prefix": full_index_prefix,
             "num_threads": self.num_threads,
             "num_nodes_to_cache": kwargs.get("num_nodes_to_cache", 0),
-            "cache_mechanism": 1,
+            # 1 -> initialize cache using sample_data; 2 -> ready cache without init; others disable cache
+            "cache_mechanism": kwargs.get("cache_mechanism", 1),
             "pq_prefix": "",
             "partition_prefix": partition_prefix,
         }

scripts/hf_upload.py (new file, 121 lines)
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Upload local evaluation data to Hugging Face Hub, excluding diskann_rpj_wiki.

Defaults:
- repo_id: LEANN-RAG/leann-rag-evaluation-data (dataset)
- folder_path: benchmarks/data
- ignore_patterns: diskann_rpj_wiki/** and .cache/**

Requires authentication via `huggingface-cli login` or HF_TOKEN env var.
"""

from __future__ import annotations

import argparse
import os

try:
    from huggingface_hub import HfApi
except Exception as e:
    raise SystemExit(
        "huggingface_hub is required. Install with: pip install huggingface_hub hf_transfer"
    ) from e


def _enable_transfer_accel_if_available() -> None:
    """Best-effort enabling of accelerated transfers across hub versions.

    Tries the public util if present; otherwise falls back to the env flag when
    hf_transfer is installed. Silently no-ops if unavailable.
    """
    try:
        # Newer huggingface_hub exposes this under utils
        from huggingface_hub.utils import hf_hub_enable_hf_transfer  # type: ignore

        hf_hub_enable_hf_transfer()
        return
    except Exception:
        pass

    try:
        # If hf_transfer is installed, set env flag recognized by the hub
        import hf_transfer  # noqa: F401

        os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
    except Exception:
        # Acceleration not available; proceed without it
        pass


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Upload local data to HF, excluding diskann_rpj_wiki")
    p.add_argument(
        "--repo-id",
        default="LEANN-RAG/leann-rag-evaluation-data",
        help="Target dataset repo id (namespace/name)",
    )
    p.add_argument(
        "--folder-path",
        default="benchmarks/data",
        help="Local folder to upload (default: benchmarks/data)",
    )
    p.add_argument(
        "--ignore",
        default=["diskann_rpj_wiki/**", ".cache/**"],
        nargs="+",
        help="Glob patterns to ignore (space-separated)",
    )
    p.add_argument(
        "--allow",
        default=["**"],
        nargs="+",
        help="Glob patterns to allow (space-separated). Defaults to everything.",
    )
    p.add_argument(
        "--message",
        default="sync local data (exclude diskann_rpj_wiki)",
        help="Commit message",
    )
    p.add_argument(
        "--no-transfer-accel",
        action="store_true",
        help="Disable hf_transfer accelerated uploads",
    )
    return p.parse_args()


def main() -> None:
    args = parse_args()

    if not args.no_transfer_accel:
        _enable_transfer_accel_if_available()

    if not os.path.isdir(args.folder_path):
        raise SystemExit(f"Folder not found: {args.folder_path}")

    print("Uploading to Hugging Face Hub:")
    print(f" repo_id: {args.repo_id}")
    print(" repo_type: dataset")
    print(f" folder_path: {args.folder_path}")
    print(f" allow_patterns: {args.allow}")
    print(f" ignore_patterns: {args.ignore}")

    api = HfApi()

    # Perform upload. This skips unchanged files by content hash.
    api.upload_folder(
        repo_id=args.repo_id,
        repo_type="dataset",
        folder_path=args.folder_path,
        path_in_repo=".",
        allow_patterns=args.allow,
        ignore_patterns=args.ignore,
        commit_message=args.message,
    )

    print("Upload completed (unchanged files were skipped by the Hub).")


if __name__ == "__main__":
    main()
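
For reference, a sketch of how this uploader might be invoked; the flags are those defined in `parse_args` above, and the values shown simply make the script defaults explicit rather than reflecting a recorded run:

```bash
python scripts/hf_upload.py \
  --repo-id LEANN-RAG/leann-rag-evaluation-data \
  --folder-path benchmarks/data \
  --ignore "diskann_rpj_wiki/**" ".cache/**" \
  --message "sync local data (exclude diskann_rpj_wiki)"
```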