reproduce docvqa results

yichuan-w
2025-11-14 10:22:42 +00:00
parent ae3b8af3df
commit 07afe546ea
3 changed files with 666 additions and 237 deletions


@@ -223,17 +223,13 @@ def _embed_queries(model, processor, queries: list[str]) -> list[Any]:
     model.eval()
     # Match MTEB's exact query processing from ColPaliEngineWrapper.get_text_embeddings:
-    # 1. MTEB receives batch["text"] which may already include instruction/prompt
+    # 1. MTEB receives batch["text"] which already includes instruction/prompt (from _combine_queries_with_instruction_text)
     # 2. Manually adds: query_prefix + text + query_augmentation_token * 10
     # 3. Calls processor.process_queries(batch) where batch is now a list of strings
     # 4. process_queries adds: query_prefix + text + suffix (suffix = query_augmentation_token * 10)
     #
-    # However, MTEB's approach results in duplicate addition (20 tokens total).
-    # Since we're already adding the prompt in search_queries, let's try:
-    # Option 1: Just call process_queries (let it handle all additions) - avoids duplicate
-    # Option 2: Manual add + process_texts (to avoid duplicate)
-    #
-    # Testing shows Option 1 works better - just call process_queries without manual addition
+    # This results in duplicate addition: query_prefix is added twice, query_augmentation_token * 20 total
+    # We need to match this exactly to reproduce MTEB results
     all_embeds = []
     batch_size = 32  # Match MTEB's default batch_size
@@ -242,9 +238,15 @@ def _embed_queries(model, processor, queries: list[str]) -> list[Any]:
     for i in tqdm(range(0, len(queries), batch_size), desc="Embedding queries"):
         batch_queries = queries[i:i + batch_size]
-        # Just call process_queries - it will add query_prefix + text + 10 tokens
-        # This avoids duplicate addition that happens in MTEB's approach
-        inputs = processor.process_queries(batch_queries)
+        # Match MTEB: manually add query_prefix + text + query_augmentation_token * 10
+        # Then process_queries will add them again (resulting in 20 augmentation tokens total)
+        batch = [
+            processor.query_prefix
+            + t
+            + processor.query_augmentation_token * 10
+            for t in batch_queries
+        ]
+        inputs = processor.process_queries(batch)
         inputs = {k: v.to(model.device) for k, v in inputs.items()}
         if model.device.type == "cuda":
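To make the doubled augmentation concrete, here is a minimal stand-alone sketch of the string that ends up reaching the model under this scheme. The QUERY_PREFIX and AUG_TOKEN literals are illustrative placeholders (the real values come from processor.query_prefix and processor.query_augmentation_token), and process_queries_like only mimics what the comments above say process_queries adds.

# Illustrative sketch of the duplicated query construction described above.
QUERY_PREFIX = "Query: "   # placeholder for processor.query_prefix
AUG_TOKEN = "<aug>"        # placeholder for processor.query_augmentation_token

def manual_augment(text: str) -> str:
    # The explicit addition done in the loop above (matching MTEB's wrapper).
    return QUERY_PREFIX + text + AUG_TOKEN * 10

def process_queries_like(text: str) -> str:
    # Stand-in for the additions process_queries itself applies to its input.
    return QUERY_PREFIX + text + AUG_TOKEN * 10

query = "what was the total revenue in 2020?"
final = process_queries_like(manual_augment(query))
assert final.count(QUERY_PREFIX) == 2   # query_prefix is added twice
assert final.count(AUG_TOKEN) == 20     # 20 augmentation tokens in total
print(final)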
@@ -1044,3 +1046,249 @@ class LeannMultiVector:
"image_path": meta.get("image_path", ""),
}
return None
class ViDoReBenchmarkEvaluator:
    """
    A reusable class for evaluating ViDoRe benchmarks (v1 and v2).

    This class encapsulates common functionality for building indexes, searching, and evaluating.
    """

    def __init__(
        self,
        model_name: str,
        use_fast_plaid: bool = False,
        top_k: int = 100,
        first_stage_k: int = 500,
        k_values: Optional[list[int]] = None,
    ):
        """
        Initialize the evaluator.

        Args:
            model_name: Model name ("colqwen2" or "colpali")
            use_fast_plaid: Whether to use Fast-Plaid instead of LEANN
            top_k: Top-k results to retrieve
            first_stage_k: First stage k for LEANN search
            k_values: List of k values for evaluation metrics
        """
        self.model_name = model_name
        self.use_fast_plaid = use_fast_plaid
        self.top_k = top_k
        self.first_stage_k = first_stage_k
        self.k_values = k_values if k_values is not None else [1, 3, 5, 10, 100]
        # Load the model lazily and only once (it can be reused across tasks)
        self._model = None
        self._processor = None
        self._model_name_actual = None

    def _load_model_if_needed(self):
        """Lazily load the model on first use."""
        if self._model is None:
            print(f"\nLoading model: {self.model_name}")
            self._model_name_actual, self._model, self._processor, _, _, _ = _load_colvision(self.model_name)
            print(f"Model loaded: {self._model_name_actual}")
    def build_index_from_corpus(
        self,
        corpus: dict[str, Image.Image],
        index_path: str,
        rebuild: bool = False,
    ) -> tuple[Any, list[str]]:
        """
        Build index from corpus images.

        Args:
            corpus: dict mapping corpus_id to PIL Image
            index_path: Path to save/load the index
            rebuild: Whether to rebuild even if index exists

        Returns:
            tuple: (retriever or fast_plaid_index object, list of corpus_ids in order)
        """
        self._load_model_if_needed()
        # Ensure consistent ordering
        corpus_ids = sorted(corpus.keys())
        images = [corpus[cid] for cid in corpus_ids]
        if self.use_fast_plaid:
            # Check if a Fast-Plaid index exists (load it once and reuse it)
            if not rebuild:
                fast_plaid_index = _load_fast_plaid_index_if_exists(index_path)
                if fast_plaid_index is not None:
                    print(f"Fast-Plaid index already exists at {index_path}")
                    return fast_plaid_index, corpus_ids
            print(f"Building Fast-Plaid index at {index_path}...")
            print("Embedding images...")
            doc_vecs = _embed_images(self._model, self._processor, images)
            fast_plaid_index, build_time = _build_fast_plaid_index(
                index_path, doc_vecs, corpus_ids, images
            )
            print(f"Fast-Plaid index built in {build_time:.2f}s")
            return fast_plaid_index, corpus_ids
        else:
            # Check if a LEANN index exists
            if not rebuild:
                retriever = _load_retriever_if_index_exists(index_path)
                if retriever is not None:
                    print(f"LEANN index already exists at {index_path}")
                    return retriever, corpus_ids
            print(f"Building LEANN index at {index_path}...")
            print("Embedding images...")
            doc_vecs = _embed_images(self._model, self._processor, images)
            retriever = _build_index(index_path, doc_vecs, corpus_ids, images)
            print("LEANN index built")
            return retriever, corpus_ids
    def search_queries(
        self,
        queries: dict[str, str],
        corpus_ids: list[str],
        index_or_retriever: Any,
        fast_plaid_index_path: Optional[str] = None,
        task_prompt: Optional[dict[str, str]] = None,
    ) -> dict[str, dict[str, float]]:
        """
        Search queries against the index.

        Args:
            queries: dict mapping query_id to query text
            corpus_ids: list of corpus_ids in the same order as the index
            index_or_retriever: index or retriever object
            fast_plaid_index_path: path to Fast-Plaid index (for metadata)
            task_prompt: Optional dict with prompt for query (e.g., {"query": "..."})

        Returns:
            results: dict mapping query_id to dict of {corpus_id: score}
        """
        self._load_model_if_needed()
        print(f"Searching {len(queries)} queries (top_k={self.top_k})...")
        query_ids = list(queries.keys())
        query_texts = [queries[qid] for qid in query_ids]
        # Note: ColPaliEngineWrapper does NOT use the task prompt from metadata.
        # It uses query_prefix + text + query_augmentation_token (handled in _embed_queries),
        # so we don't append task_prompt here, to match MTEB behavior.
        # Embed queries
        print("Embedding queries...")
        query_vecs = _embed_queries(self._model, self._processor, query_texts)
        results = {}
        for query_id, query_vec in zip(tqdm(query_ids, desc="Searching"), query_vecs):
            if self.use_fast_plaid:
                # Fast-Plaid search
                search_results, _ = _search_fast_plaid(index_or_retriever, query_vec, self.top_k)
                query_results = {}
                for score, doc_id in search_results:
                    if doc_id < len(corpus_ids):
                        corpus_id = corpus_ids[doc_id]
                        query_results[corpus_id] = float(score)
            else:
                # LEANN search
                import torch

                query_np = query_vec.float().numpy() if isinstance(query_vec, torch.Tensor) else query_vec
                search_results = index_or_retriever.search_exact_all(query_np, topk=self.top_k)
                query_results = {}
                for score, doc_id in search_results:
                    if doc_id < len(corpus_ids):
                        corpus_id = corpus_ids[doc_id]
                        query_results[corpus_id] = float(score)
            results[query_id] = query_results
        return results
    @staticmethod
    def evaluate_results(
        results: dict[str, dict[str, float]],
        qrels: dict[str, dict[str, int]],
        k_values: Optional[list[int]] = None,
    ) -> dict[str, float]:
        """
        Evaluate retrieval results using NDCG and other metrics.

        Args:
            results: dict mapping query_id to dict of {corpus_id: score}
            qrels: dict mapping query_id to dict of {corpus_id: relevance_score}
            k_values: List of k values for evaluation metrics

        Returns:
            Dictionary of metric scores
        """
        try:
            import pytrec_eval
            from mteb._evaluators.retrieval_metrics import (
                calculate_retrieval_scores,
                make_score_dict,
            )
        except ImportError:
            raise ImportError(
                "pytrec_eval and mteb are required for evaluation. Install with: pip install pytrec-eval mteb"
            )
        if k_values is None:
            k_values = [1, 3, 5, 10, 100]
        # Check if we have any queries to evaluate
        if len(results) == 0:
            print("Warning: No queries to evaluate. Returning zero scores.")
            scores = {}
            for k in k_values:
                scores[f"ndcg_at_{k}"] = 0.0
                scores[f"map_at_{k}"] = 0.0
                scores[f"recall_at_{k}"] = 0.0
                scores[f"precision_at_{k}"] = 0.0
                scores[f"mrr_at_{k}"] = 0.0
            return scores
        print(f"Evaluating results with k_values={k_values}...")
        print(f"Before filtering: {len(results)} results, {len(qrels)} qrels")
        # Filter qrels and results down to the same query set.
        # This matches MTEB behavior: only evaluate queries that exist in both.
        # pytrec_eval only scores queries present in qrels, so drop queries
        # that appear on one side but not the other.
        results_filtered = {qid: res for qid, res in results.items() if qid in qrels}
        qrels_filtered = {qid: rel_docs for qid, rel_docs in qrels.items() if qid in results_filtered}
        print(f"After filtering: {len(results_filtered)} results, {len(qrels_filtered)} qrels")
        if len(results_filtered) != len(qrels_filtered):
            print(f"Warning: Mismatch between results ({len(results_filtered)}) and qrels ({len(qrels_filtered)}) queries")
            missing_in_results = set(qrels.keys()) - set(results.keys())
            if missing_in_results:
                print(f"Queries in qrels but not in results: {len(missing_in_results)} queries")
                print(f"First 5 missing queries: {list(missing_in_results)[:5]}")
        # Convert qrels to pytrec_eval format
        qrels_pytrec = {}
        for qid, rel_docs in qrels_filtered.items():
            qrels_pytrec[qid] = {did: score for did, score in rel_docs.items()}
        # Evaluate
        eval_result = calculate_retrieval_scores(
            results=results_filtered,
            qrels=qrels_pytrec,
            k_values=k_values,
        )
        # Format scores
        scores = make_score_dict(
            ndcg=eval_result.ndcg,
            _map=eval_result.map,
            recall=eval_result.recall,
            precision=eval_result.precision,
            mrr=eval_result.mrr,
            naucs=eval_result.naucs,
            naucs_mrr=eval_result.naucs_mrr,
            cv_recall=eval_result.cv_recall,
            task_scores={},
        )
        return scores
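A minimal end-to-end sketch of how the class above is meant to be used. The corpus, queries, and qrels literals are toy stand-ins for a real ViDoRe task split, and the index path is arbitrary; in practice the model download and index build require a GPU and the benchmark data.

from PIL import Image

# Toy stand-ins for a ViDoRe task split (normally loaded from the benchmark datasets).
corpus = {"doc_0": Image.new("RGB", (224, 224)), "doc_1": Image.new("RGB", (224, 224))}
queries = {"q_0": "what is the total revenue reported in 2020?"}
qrels = {"q_0": {"doc_0": 1}}

evaluator = ViDoReBenchmarkEvaluator(model_name="colqwen2", use_fast_plaid=False, top_k=100)

# 1. Embed the page images and build (or load) the index; corpus_ids are returned in index order.
index, corpus_ids = evaluator.build_index_from_corpus(corpus, index_path="./indexes/toy_task")

# 2. Late-interaction search: returns {query_id: {corpus_id: score}}.
results = evaluator.search_queries(queries, corpus_ids, index)

# 3. Score with the same retrieval metrics MTEB reports (nDCG, MAP, recall, precision, MRR).
scores = ViDoReBenchmarkEvaluator.evaluate_results(results, qrels, k_values=[1, 3, 5, 10])
print(scores.get("ndcg_at_5"))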