Compare commits

..

8 Commits

Author SHA1 Message Date
yichuan520030910320
00c44e3980 [cli] fix # 81 2025-09-01 16:53:09 -07:00
yichuan520030910320
e6a542bf4b [cli] better gitignore / better leann list 2025-09-01 16:42:11 -07:00
yichuan520030910320
7e84dae02e [chore] add slack to share use case 2025-08-30 00:32:13 -07:00
yichuan520030910320
2f05ed4535 chore(submodule): bump faiss to latest storage-efficient build 2025-08-23 18:29:11 -07:00
yichuan520030910320
4e5b73ce7b fix bug introduce in #58 2025-08-22 02:35:09 -07:00
Gabriel Dehan
31b4973141 Metadata filtering feature (#75)
* Metadata filtering initial version

* Metadata filtering initial version

* Fixes linter issues

* Cleanup code

* Clean up and readme

* Fix after review

* Use UV in example

* Merge main into feature/metadata-filtering
2025-08-20 19:57:56 -07:00
Yichuan Wang
dde2221513 [EXP] Update the benchmark code (#71)
* chore(hnsw): reorder imports to satisfy ruff I001

* chore: sync changes; fix Ruff import order; update examples, benchmarks, and dependencies

- Fix import order in packages/leann-backend-hnsw/leann_backend_hnsw/hnsw_backend.py (Ruff I001)

- Update benchmarks/run_evaluation.py

- Update apps/base_rag_example.py and leann-core API usage

- Add benchmarks/data/README.md

- Update uv.lock

- Misc cleanup

- Note: added paru-bin as an embedded git repo; consider making it a submodule (git rm --cached paru-bin) if unintended

* chore: remove unintended embedded repo paru-bin and ignore it

Fix CI: avoid missing .gitmodules entry by removing gitlink and adding to .gitignore.

* ci: retrigger after removing unintended gitlink (paru-bin)

* feat(benchmarks): add --batch-size option and plumb through to HNSW search (default 0)

* feat(hnsw): add batch_size to LeannSearcher.search and LeannChat.ask; forward only for HNSW backend

* chore(logging): surface recompute and batching params; enable INFO logging in benchmark

* feat(embeddings): add optional manual tokenization path (HF tokenizer+model) with mean pooling; default remains SentenceTransformer.encode

* fix micro bench and fix pre commit

* update readme

---------

Co-authored-by: yichuan-w <yichuan-w@users.noreply.github.com>
2025-08-20 17:31:46 -07:00
Andy Lee
6d11e86e71 Run Evaluation RPJ Wiki on Arch Linux (#74)
* chore: ignore benchmark data

* perf: avoid merging offset dicts for lower mem usage

* style: format

* docs: rpj_wiki
2025-08-20 12:25:54 -07:00
20 changed files with 2849 additions and 1266 deletions

5
.gitignore vendored
View File

@@ -93,5 +93,10 @@ packages/leann-backend-diskann/third_party/DiskANN/_deps/
batchtest.py batchtest.py
tests/__pytest_cache__/ tests/__pytest_cache__/
tests/__pycache__/ tests/__pycache__/
paru-bin/
CLAUDE.md
CLAUDE.local.md
.claude/*.local.*
.claude/local/*
benchmarks/data/ benchmarks/data/

View File

@@ -13,4 +13,5 @@ repos:
rev: v0.12.7 # Fixed version to match pyproject.toml rev: v0.12.7 # Fixed version to match pyproject.toml
hooks: hooks:
- id: ruff - id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format - id: ruff-format

View File

@@ -8,6 +8,8 @@
<img src="https://img.shields.io/badge/Platform-Ubuntu%20%26%20Arch%20%26%20WSL%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform"> <img src="https://img.shields.io/badge/Platform-Ubuntu%20%26%20Arch%20%26%20WSL%20%7C%20macOS%20(ARM64%2FIntel)-lightgrey" alt="Platform">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="MIT License"> <img src="https://img.shields.io/badge/License-MIT-green.svg" alt="MIT License">
<img src="https://img.shields.io/badge/MCP-Native%20Integration-blue" alt="MCP Integration"> <img src="https://img.shields.io/badge/MCP-Native%20Integration-blue" alt="MCP Integration">
<a href="https://join.slack.com/t/leann-e2u9779/shared_invite/zt-3ckd2f6w1-OX08~NN4gkWhh10PRVBj1Q"><img src="https://img.shields.io/badge/Slack-Join-4A154B?logo=slack&logoColor=white" alt="Join Slack">
<a href="assets/wechat_user_group.JPG" title="Join WeChat group"><img src="https://img.shields.io/badge/WeChat-Join-2DC100?logo=wechat&logoColor=white" alt="Join WeChat group"></a>
</p> </p>
<h2 align="center" tabindex="-1" class="heading-element" dir="auto"> <h2 align="center" tabindex="-1" class="heading-element" dir="auto">
@@ -176,8 +178,7 @@ response = chat.ask("How much storage does LEANN save?", top_k=1)
LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, and more. LEANN supports RAG on various data sources including documents (`.pdf`, `.txt`, `.md`), Apple Mail, Google Search History, WeChat, and more.
**AST-Aware Code Chunking** - LEANN also features intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript files, providing improved code understanding compared to traditional text-based approaches.
📖 Read the [AST Chunking Guide →](docs/ast_chunking_guide.md) to learn more.
### Generation Model Setup ### Generation Model Setup
@@ -221,7 +222,8 @@ ollama pull llama3.2:1b
</details> </details>
### ⭐ Flexible Configuration
## ⭐ Flexible Configuration
LEANN provides flexible parameters for embedding models, search strategies, and data processing to fit your specific needs. LEANN provides flexible parameters for embedding models, search strategies, and data processing to fit your specific needs.
@@ -477,6 +479,15 @@ Once the index is built, you can ask questions like:
### 🚀 Claude Code Integration: Transform Your Development Workflow! ### 🚀 Claude Code Integration: Transform Your Development Workflow!
<details>
<summary><strong>NEW!! ASTAware Code Chunking</strong></summary>
LEANN features intelligent code chunking that preserves semantic boundaries (functions, classes, methods) for Python, Java, C#, and TypeScript, improving code understanding compared to text-based chunking.
📖 Read the [AST Chunking Guide →](docs/ast_chunking_guide.md)
</details>
**The future of code assistance is here.** Transform your development workflow with LEANN's native MCP integration for Claude Code. Index your entire codebase and get intelligent code assistance directly in your IDE. **The future of code assistance is here.** Transform your development workflow with LEANN's native MCP integration for Claude Code. Index your entire codebase and get intelligent code assistance directly in your IDE.
**Key features:** **Key features:**
@@ -618,6 +629,33 @@ Options:
</details> </details>
## 🚀 Advanced Features
### 🎯 Metadata Filtering
LEANN supports a simple metadata filtering system to enable sophisticated use cases like document filtering by date/type, code search by file extension, and content management based on custom criteria.
```python
# Add metadata during indexing
builder.add_text(
"def authenticate_user(token): ...",
metadata={"file_extension": ".py", "lines_of_code": 25}
)
# Search with filters
results = searcher.search(
query="authentication function",
metadata_filters={
"file_extension": {"==": ".py"},
"lines_of_code": {"<": 100}
}
)
```
**Supported operators**: `==`, `!=`, `<`, `<=`, `>`, `>=`, `in`, `not_in`, `contains`, `starts_with`, `ends_with`, `is_true`, `is_false`
📖 **[Complete Metadata filtering guide →](docs/metadata_filtering.md)**
## 🏗️ Architecture & How It Works ## 🏗️ Architecture & How It Works
<p align="center"> <p align="center">
@@ -697,6 +735,9 @@ MIT License - see [LICENSE](LICENSE) for details.
Core Contributors: [Yichuan Wang](https://yichuan-w.github.io/) & [Zhifei Li](https://github.com/andylizf). Core Contributors: [Yichuan Wang](https://yichuan-w.github.io/) & [Zhifei Li](https://github.com/andylizf).
Active Contributors: [Gabriel Dehan](https://github.com/gabriel-dehan)
We welcome more contributors! Feel free to open issues or submit PRs. We welcome more contributors! Feel free to open issues or submit PRs.
This work is done at [**Berkeley Sky Computing Lab**](https://sky.cs.berkeley.edu/). This work is done at [**Berkeley Sky Computing Lab**](https://sky.cs.berkeley.edu/).

View File

@@ -299,7 +299,6 @@ class BaseRAGExample(ABC):
chat = LeannChat( chat = LeannChat(
index_path, index_path,
llm_config=self.get_llm_config(args), llm_config=self.get_llm_config(args),
system_prompt=f"You are a helpful assistant that answers questions about {self.name} data.",
complexity=args.search_complexity, complexity=args.search_complexity,
) )

View File

@@ -10,7 +10,8 @@ from pathlib import Path
# Add parent directory to path for imports # Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks from base_rag_example import BaseRAGExample
from chunking import create_text_chunks
from .history_data.history import ChromeHistoryReader from .history_data.history import ChromeHistoryReader

View File

@@ -9,7 +9,8 @@ from pathlib import Path
# Add parent directory to path for imports # Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent))
from base_rag_example import BaseRAGExample, create_text_chunks from base_rag_example import BaseRAGExample
from chunking import create_text_chunks
from .email_data.LEANN_email_reader import EmlxReader from .email_data.LEANN_email_reader import EmlxReader

View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 152 KiB

44
benchmarks/data/README.md Executable file
View File

@@ -0,0 +1,44 @@
---
license: mit
---
# LEANN-RAG Evaluation Data
This repository contains the necessary data to run the recall evaluation scripts for the [LEANN-RAG](https://huggingface.co/LEANN-RAG) project.
## Dataset Components
This dataset is structured into three main parts:
1. **Pre-built LEANN Indices**:
* `dpr/`: A pre-built index for the DPR dataset.
* `rpj_wiki/`: A pre-built index for the RPJ-Wiki dataset.
These indices were created using the `leann-core` library and are required by the `LeannSearcher`.
2. **Ground Truth Data**:
* `ground_truth/`: Contains the ground truth files (`flat_results_nq_k3.json`) for both the DPR and RPJ-Wiki datasets. These files map queries to the original passage IDs from the Natural Questions benchmark, evaluated using the Contriever model.
3. **Queries**:
* `queries/`: Contains the `nq_open.jsonl` file with the Natural Questions queries used for the evaluation.
## Usage
To use this data, you can download it locally using the `huggingface-hub` library. First, install the library:
```bash
pip install huggingface-hub
```
Then, you can download the entire dataset to a local directory (e.g., `data/`) with the following Python script:
```python
from huggingface_hub import snapshot_download
snapshot_download(
repo_id="LEANN-RAG/leann-rag-evaluation-data",
repo_type="dataset",
local_dir="data"
)
```
This will download all the necessary files into a local `data` folder, preserving the repository structure. The evaluation scripts in the main [LEANN-RAG Space](https://huggingface.co/LEANN-RAG) are configured to work with this data structure.

View File

@@ -12,7 +12,7 @@ import time
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
from leann.api import LeannBuilder, LeannSearcher from leann.api import LeannBuilder, LeannChat, LeannSearcher
def download_data_if_needed(data_root: Path, download_embeddings: bool = False): def download_data_if_needed(data_root: Path, download_embeddings: bool = False):
@@ -197,6 +197,25 @@ def main():
parser.add_argument( parser.add_argument(
"--ef-search", type=int, default=120, help="The 'efSearch' parameter for HNSW." "--ef-search", type=int, default=120, help="The 'efSearch' parameter for HNSW."
) )
parser.add_argument(
"--batch-size",
type=int,
default=0,
help="Batch size for HNSW batched search (0 disables batching)",
)
parser.add_argument(
"--llm-type",
type=str,
choices=["ollama", "hf", "openai", "gemini", "simulated"],
default="ollama",
help="LLM backend type to optionally query during evaluation (default: ollama)",
)
parser.add_argument(
"--llm-model",
type=str,
default="qwen3:1.7b",
help="LLM model identifier for the chosen backend (default: qwen3:1.7b)",
)
args = parser.parse_args() args = parser.parse_args()
# --- Path Configuration --- # --- Path Configuration ---
@@ -318,9 +337,24 @@ def main():
for i in range(num_eval_queries): for i in range(num_eval_queries):
start_time = time.time() start_time = time.time()
new_results = searcher.search(queries[i], top_k=args.top_k, ef=args.ef_search) new_results = searcher.search(
queries[i],
top_k=args.top_k,
complexity=args.ef_search,
batch_size=args.batch_size,
)
search_times.append(time.time() - start_time) search_times.append(time.time() - start_time)
# Optional: also call the LLM with configurable backend/model (does not affect recall)
llm_config = {"type": args.llm_type, "model": args.llm_model}
chat = LeannChat(args.index_path, llm_config=llm_config, searcher=searcher)
answer = chat.ask(
queries[i],
top_k=args.top_k,
complexity=args.ef_search,
batch_size=args.batch_size,
)
print(f"Answer: {answer}")
# Correct Recall Calculation: Based on TEXT content # Correct Recall Calculation: Based on TEXT content
new_texts = {result.text for result in new_results} new_texts = {result.text for result in new_results}

View File

@@ -20,7 +20,7 @@ except ImportError:
@dataclass @dataclass
class BenchmarkConfig: class BenchmarkConfig:
model_path: str = "facebook/contriever" model_path: str = "facebook/contriever-msmarco"
batch_sizes: list[int] = None batch_sizes: list[int] = None
seq_length: int = 256 seq_length: int = 256
num_runs: int = 5 num_runs: int = 5
@@ -34,7 +34,7 @@ class BenchmarkConfig:
def __post_init__(self): def __post_init__(self):
if self.batch_sizes is None: if self.batch_sizes is None:
self.batch_sizes = [1, 2, 4, 8, 16, 32, 64] self.batch_sizes = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
class MLXBenchmark: class MLXBenchmark:
@@ -179,11 +179,14 @@ class Benchmark:
def _run_inference(self, input_ids: torch.Tensor) -> float: def _run_inference(self, input_ids: torch.Tensor) -> float:
attention_mask = torch.ones_like(input_ids) attention_mask = torch.ones_like(input_ids)
# print shape of input_ids and attention_mask
print(f"input_ids shape: {input_ids.shape}")
print(f"attention_mask shape: {attention_mask.shape}")
start_time = time.time() start_time = time.time()
with torch.no_grad(): with torch.no_grad():
self.model(input_ids=input_ids, attention_mask=attention_mask) self.model(input_ids=input_ids, attention_mask=attention_mask)
# mps sync if torch.cuda.is_available():
torch.cuda.synchronize()
if torch.backends.mps.is_available(): if torch.backends.mps.is_available():
torch.mps.synchronize() torch.mps.synchronize()
end_time = time.time() end_time = time.time()

300
docs/metadata_filtering.md Normal file
View File

@@ -0,0 +1,300 @@
# LEANN Metadata Filtering Usage Guide
## Overview
Leann possesses metadata filtering capabilities that allow you to filter search results based on arbitrary metadata fields set during chunking. This feature enables use cases like spoiler-free book search, document filtering by date/type, code search by file type, and potentially much more.
## Basic Usage
### Adding Metadata to Your Documents
When building your index, add metadata to each text chunk:
```python
from leann.api import LeannBuilder
builder = LeannBuilder("hnsw")
# Add text with metadata
builder.add_text(
text="Chapter 1: Alice falls down the rabbit hole",
metadata={
"chapter": 1,
"character": "Alice",
"themes": ["adventure", "curiosity"],
"word_count": 150
}
)
builder.build_index("alice_in_wonderland_index")
```
### Searching with Metadata Filters
Use the `metadata_filters` parameter in search calls:
```python
from leann.api import LeannSearcher
searcher = LeannSearcher("alice_in_wonderland_index")
# Search with filters
results = searcher.search(
query="What happens to Alice?",
top_k=10,
metadata_filters={
"chapter": {"<=": 5}, # Only chapters 1-5
"spoiler_level": {"!=": "high"} # No high spoilers
}
)
```
## Filter Syntax
### Basic Structure
```python
metadata_filters = {
"field_name": {"operator": value},
"another_field": {"operator": value}
}
```
### Supported Operators
#### Comparison Operators
- `"=="`: Equal to
- `"!="`: Not equal to
- `"<"`: Less than
- `"<="`: Less than or equal
- `">"`: Greater than
- `">="`: Greater than or equal
```python
# Examples
{"chapter": {"==": 1}} # Exactly chapter 1
{"page": {">": 100}} # Pages after 100
{"rating": {">=": 4.0}} # Rating 4.0 or higher
{"word_count": {"<": 500}} # Short passages
```
#### Membership Operators
- `"in"`: Value is in list
- `"not_in"`: Value is not in list
```python
# Examples
{"character": {"in": ["Alice", "Bob"]}} # Alice OR Bob
{"genre": {"not_in": ["horror", "thriller"]}} # Exclude genres
{"tags": {"in": ["fiction", "adventure"]}} # Any of these tags
```
#### String Operators
- `"contains"`: String contains substring
- `"starts_with"`: String starts with prefix
- `"ends_with"`: String ends with suffix
```python
# Examples
{"title": {"contains": "alice"}} # Title contains "alice"
{"filename": {"ends_with": ".py"}} # Python files
{"author": {"starts_with": "Dr."}} # Authors with "Dr." prefix
```
#### Boolean Operators
- `"is_true"`: Field is truthy
- `"is_false"`: Field is falsy
```python
# Examples
{"is_published": {"is_true": True}} # Published content
{"is_draft": {"is_false": False}} # Not drafts
```
### Multiple Operators on Same Field
You can apply multiple operators to the same field (AND logic):
```python
metadata_filters = {
"word_count": {
">=": 100, # At least 100 words
"<=": 500 # At most 500 words
}
}
```
### Compound Filters
Multiple fields are combined with AND logic:
```python
metadata_filters = {
"chapter": {"<=": 10}, # Up to chapter 10
"character": {"==": "Alice"}, # About Alice
"spoiler_level": {"!=": "high"} # No major spoilers
}
```
## Use Case Examples
### 1. Spoiler-Free Book Search
```python
# Reader has only read up to chapter 5
def search_spoiler_free(query, max_chapter):
return searcher.search(
query=query,
metadata_filters={
"chapter": {"<=": max_chapter},
"spoiler_level": {"in": ["none", "low"]}
}
)
results = search_spoiler_free("What happens to Alice?", max_chapter=5)
```
### 2. Document Management by Date
```python
# Find recent documents
recent_docs = searcher.search(
query="project updates",
metadata_filters={
"date": {">=": "2024-01-01"},
"document_type": {"==": "report"}
}
)
```
### 3. Code Search by File Type
```python
# Search only Python files
python_code = searcher.search(
query="authentication function",
metadata_filters={
"file_extension": {"==": ".py"},
"lines_of_code": {"<": 100}
}
)
```
### 4. Content Filtering by Audience
```python
# Age-appropriate content
family_content = searcher.search(
query="adventure stories",
metadata_filters={
"age_rating": {"in": ["G", "PG"]},
"content_warnings": {"not_in": ["violence", "adult_themes"]}
}
)
```
### 5. Multi-Book Series Management
```python
# Search across first 3 books only
early_series = searcher.search(
query="character development",
metadata_filters={
"series": {"==": "Harry Potter"},
"book_number": {"<=": 3}
}
)
```
## Running the Example
You can see metadata filtering in action with our spoiler-free book RAG example:
```bash
# Don't forget to set up the environment
uv venv
source .venv/bin/activate
# Set your OpenAI API key (required for embeddings, but you can update the example locally and use ollama instead)
export OPENAI_API_KEY="your-api-key-here"
# Run the spoiler-free book RAG example
uv run examples/spoiler_free_book_rag.py
```
This example demonstrates:
- Building an index with metadata (chapter numbers, characters, themes, locations)
- Searching with filters to avoid spoilers (e.g., only show results up to chapter 5)
- Different scenarios for readers at various points in the book
The example uses Alice's Adventures in Wonderland as sample data and shows how you can search for information without revealing plot points from later chapters.
## Advanced Patterns
### Custom Chunking with metadata
```python
def chunk_book_with_metadata(book_text, book_info):
chunks = []
for chapter_num, chapter_text in parse_chapters(book_text):
# Extract entities, themes, etc.
characters = extract_characters(chapter_text)
themes = classify_themes(chapter_text)
spoiler_level = assess_spoiler_level(chapter_text, chapter_num)
# Create chunks with rich metadata
for paragraph in split_paragraphs(chapter_text):
chunks.append({
"text": paragraph,
"metadata": {
"book_title": book_info["title"],
"chapter": chapter_num,
"characters": characters,
"themes": themes,
"spoiler_level": spoiler_level,
"word_count": len(paragraph.split()),
"reading_level": calculate_reading_level(paragraph)
}
})
return chunks
```
## Performance Considerations
### Efficient Filtering Strategies
1. **Post-search filtering**: Applies filters after vector search, which should be efficient for typical result sets (10-100 results).
2. **Metadata design**: Keep metadata fields simple and avoid deeply nested structures.
### Best Practices
1. **Consistent metadata schema**: Use consistent field names and value types across your documents.
2. **Reasonable metadata size**: Keep metadata reasonably sized to avoid storage overhead.
3. **Type consistency**: Use consistent data types for the same fields (e.g., always integers for chapter numbers).
4. **Index multiple granularities**: Consider chunking at different levels (paragraph, section, chapter) with appropriate metadata.
### Adding Metadata to Existing Indices
To add metadata filtering to existing indices, you'll need to rebuild them with metadata:
```python
# Read existing passages and add metadata
def add_metadata_to_existing_chunks(chunks):
for chunk in chunks:
# Extract or assign metadata based on content
chunk["metadata"] = extract_metadata(chunk["text"])
return chunks
# Rebuild index with metadata
enhanced_chunks = add_metadata_to_existing_chunks(existing_chunks)
builder = LeannBuilder("hnsw")
for chunk in enhanced_chunks:
builder.add_text(chunk["text"], chunk["metadata"])
builder.build_index("enhanced_index")
```

View File

@@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""
Spoiler-Free Book RAG Example using LEANN Metadata Filtering
This example demonstrates how to use LEANN's metadata filtering to create
a spoiler-free book RAG system where users can search for information
up to a specific chapter they've read.
Usage:
python spoiler_free_book_rag.py
"""
import os
import sys
from typing import Any, Optional
# Add LEANN to path (adjust path as needed)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../packages/leann-core/src"))
from leann.api import LeannBuilder, LeannSearcher
def chunk_book_with_metadata(book_title: str = "Sample Book") -> list[dict[str, Any]]:
"""
Create sample book chunks with metadata for demonstration.
In a real implementation, this would parse actual book files (epub, txt, etc.)
and extract chapter boundaries, character mentions, etc.
Args:
book_title: Title of the book
Returns:
List of chunk dictionaries with text and metadata
"""
# Sample book chunks with metadata
# In practice, you'd use proper text processing libraries
sample_chunks = [
{
"text": "Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do.",
"metadata": {
"book": book_title,
"chapter": 1,
"page": 1,
"characters": ["Alice", "Sister"],
"themes": ["boredom", "curiosity"],
"location": "riverbank",
},
},
{
"text": "So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her.",
"metadata": {
"book": book_title,
"chapter": 1,
"page": 2,
"characters": ["Alice", "White Rabbit"],
"themes": ["decision", "surprise", "magic"],
"location": "riverbank",
},
},
{
"text": "Alice found herself falling down a very deep well. Either the well was very deep, or she fell very slowly, for she had plenty of time as she fell to look about her and to wonder what was going to happen next.",
"metadata": {
"book": book_title,
"chapter": 2,
"page": 15,
"characters": ["Alice"],
"themes": ["falling", "wonder", "transformation"],
"location": "rabbit hole",
},
},
{
"text": "Alice meets the Cheshire Cat, who tells her that everyone in Wonderland is mad, including Alice herself.",
"metadata": {
"book": book_title,
"chapter": 6,
"page": 85,
"characters": ["Alice", "Cheshire Cat"],
"themes": ["madness", "philosophy", "identity"],
"location": "Duchess's house",
},
},
{
"text": "At the Queen's croquet ground, Alice witnesses the absurd trial that reveals the arbitrary nature of Wonderland's justice system.",
"metadata": {
"book": book_title,
"chapter": 8,
"page": 120,
"characters": ["Alice", "Queen of Hearts", "King of Hearts"],
"themes": ["justice", "absurdity", "authority"],
"location": "Queen's court",
},
},
{
"text": "Alice realizes that Wonderland was all a dream, even the Rabbit, as she wakes up on the riverbank next to her sister.",
"metadata": {
"book": book_title,
"chapter": 12,
"page": 180,
"characters": ["Alice", "Sister", "Rabbit"],
"themes": ["revelation", "reality", "growth"],
"location": "riverbank",
},
},
]
return sample_chunks
def build_spoiler_free_index(book_chunks: list[dict[str, Any]], index_name: str) -> str:
"""
Build a LEANN index with book chunks that include spoiler metadata.
Args:
book_chunks: List of book chunks with metadata
index_name: Name for the index
Returns:
Path to the built index
"""
print(f"📚 Building spoiler-free book index: {index_name}")
# Initialize LEANN builder
builder = LeannBuilder(
backend_name="hnsw", embedding_model="text-embedding-3-small", embedding_mode="openai"
)
# Add each chunk with its metadata
for chunk in book_chunks:
builder.add_text(text=chunk["text"], metadata=chunk["metadata"])
# Build the index
index_path = f"{index_name}_book_index"
builder.build_index(index_path)
print(f"✅ Index built successfully: {index_path}")
return index_path
def spoiler_free_search(
index_path: str,
query: str,
max_chapter: int,
character_filter: Optional[list[str]] = None,
) -> list[dict[str, Any]]:
"""
Perform a spoiler-free search on the book index.
Args:
index_path: Path to the LEANN index
query: Search query
max_chapter: Maximum chapter number to include
character_filter: Optional list of characters to focus on
Returns:
List of search results safe for the reader
"""
print(f"🔍 Searching: '{query}' (up to chapter {max_chapter})")
searcher = LeannSearcher(index_path)
metadata_filters = {"chapter": {"<=": max_chapter}}
if character_filter:
metadata_filters["characters"] = {"contains": character_filter[0]}
results = searcher.search(query=query, top_k=10, metadata_filters=metadata_filters)
return results
def demo_spoiler_free_rag():
"""
Demonstrate the spoiler-free book RAG system.
"""
print("🎭 Spoiler-Free Book RAG Demo")
print("=" * 40)
# Step 1: Prepare book data
book_title = "Alice's Adventures in Wonderland"
book_chunks = chunk_book_with_metadata(book_title)
print(f"📖 Loaded {len(book_chunks)} chunks from '{book_title}'")
# Step 2: Build the index (in practice, this would be done once)
try:
index_path = build_spoiler_free_index(book_chunks, "alice_wonderland")
except Exception as e:
print(f"❌ Failed to build index (likely missing dependencies): {e}")
print(
"💡 This demo shows the filtering logic - actual indexing requires LEANN dependencies"
)
return
# Step 3: Demonstrate various spoiler-free searches
search_scenarios = [
{
"description": "Reader who has only read Chapter 1",
"query": "What can you tell me about the rabbit?",
"max_chapter": 1,
},
{
"description": "Reader who has read up to Chapter 5",
"query": "Tell me about Alice's adventures",
"max_chapter": 5,
},
{
"description": "Reader who has read most of the book",
"query": "What does the Cheshire Cat represent?",
"max_chapter": 10,
},
{
"description": "Reader who has read the whole book",
"query": "What can you tell me about the rabbit?",
"max_chapter": 12,
},
]
for scenario in search_scenarios:
print(f"\n📚 Scenario: {scenario['description']}")
print(f" Query: {scenario['query']}")
try:
results = spoiler_free_search(
index_path=index_path,
query=scenario["query"],
max_chapter=scenario["max_chapter"],
)
print(f" 📄 Found {len(results)} results:")
for i, result in enumerate(results[:3], 1): # Show top 3
chapter = result.metadata.get("chapter", "?")
location = result.metadata.get("location", "?")
print(f" {i}. Chapter {chapter} ({location}): {result.text[:80]}...")
except Exception as e:
print(f" ❌ Search failed: {e}")
if __name__ == "__main__":
print("📚 LEANN Spoiler-Free Book RAG Example")
print("=====================================")
try:
demo_spoiler_free_rag()
except ImportError as e:
print(f"❌ Cannot run demo due to missing dependencies: {e}")
except Exception as e:
print(f"❌ Error running demo: {e}")

View File

@@ -1,6 +1,7 @@
import logging import logging
import os import os
import shutil import shutil
import time
from pathlib import Path from pathlib import Path
from typing import Any, Literal, Optional from typing import Any, Literal, Optional
@@ -236,6 +237,7 @@ class HNSWSearcher(BaseSearcher):
distances = np.empty((batch_size_query, top_k), dtype=np.float32) distances = np.empty((batch_size_query, top_k), dtype=np.float32)
labels = np.empty((batch_size_query, top_k), dtype=np.int64) labels = np.empty((batch_size_query, top_k), dtype=np.int64)
search_time = time.time()
self._index.search( self._index.search(
query.shape[0], query.shape[0],
faiss.swig_ptr(query), faiss.swig_ptr(query),
@@ -244,7 +246,8 @@ class HNSWSearcher(BaseSearcher):
faiss.swig_ptr(labels), faiss.swig_ptr(labels),
params, params,
) )
search_time = time.time() - search_time
logger.info(f" Search time in HNSWSearcher.search() backend: {search_time} seconds")
string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels] string_labels = [[str(int_label) for int_label in batch_labels] for batch_labels in labels]
return {"labels": string_labels, "distances": distances} return {"labels": string_labels, "distances": distances}

View File

@@ -10,7 +10,7 @@ import time
import warnings import warnings
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Literal, Optional from typing import Any, Literal, Optional, Union
import numpy as np import numpy as np
@@ -18,6 +18,7 @@ from leann.interface import LeannBackendSearcherInterface
from .chat import get_llm from .chat import get_llm
from .interface import LeannBackendFactoryInterface from .interface import LeannBackendFactoryInterface
from .metadata_filter import MetadataFilterEngine
from .registry import BACKEND_REGISTRY from .registry import BACKEND_REGISTRY
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -125,6 +126,7 @@ class PassageManager:
# footprint on very large corpora (e.g., 60M+ passages). Instead, keep # footprint on very large corpora (e.g., 60M+ passages). Instead, keep
# per-shard maps and do a lightweight per-shard lookup on demand. # per-shard maps and do a lightweight per-shard lookup on demand.
self._total_count: int = 0 self._total_count: int = 0
self.filter_engine = MetadataFilterEngine() # Initialize filter engine
# Derive index base name for standard sibling fallbacks, e.g., <index_name>.passages.* # Derive index base name for standard sibling fallbacks, e.g., <index_name>.passages.*
index_name_base = None index_name_base = None
@@ -212,6 +214,56 @@ class PassageManager:
continue continue
raise KeyError(f"Passage ID not found: {passage_id}") raise KeyError(f"Passage ID not found: {passage_id}")
def filter_search_results(
self,
search_results: list[SearchResult],
metadata_filters: Optional[dict[str, dict[str, Union[str, int, float, bool, list]]]],
) -> list[SearchResult]:
"""
Apply metadata filters to search results.
Args:
search_results: List of SearchResult objects
metadata_filters: Filter specifications to apply
Returns:
Filtered list of SearchResult objects
"""
if not metadata_filters:
return search_results
logger.debug(f"Applying metadata filters to {len(search_results)} results")
# Convert SearchResult objects to dictionaries for the filter engine
result_dicts = []
for result in search_results:
result_dicts.append(
{
"id": result.id,
"score": result.score,
"text": result.text,
"metadata": result.metadata,
}
)
# Apply filters using the filter engine
filtered_dicts = self.filter_engine.apply_filters(result_dicts, metadata_filters)
# Convert back to SearchResult objects
filtered_results = []
for result_dict in filtered_dicts:
filtered_results.append(
SearchResult(
id=result_dict["id"],
score=result_dict["score"],
text=result_dict["text"],
metadata=result_dict["metadata"],
)
)
logger.debug(f"Filtered results: {len(filtered_results)} remaining")
return filtered_results
def __len__(self) -> int: def __len__(self) -> int:
return self._total_count return self._total_count
@@ -578,6 +630,8 @@ class LeannSearcher:
self.passage_manager = PassageManager( self.passage_manager = PassageManager(
self.meta_data.get("passage_sources", []), metadata_file_path=self.meta_path_str self.meta_data.get("passage_sources", []), metadata_file_path=self.meta_path_str
) )
# Preserve backend name for conditional parameter forwarding
self.backend_name = backend_name
backend_factory = BACKEND_REGISTRY.get(backend_name) backend_factory = BACKEND_REGISTRY.get(backend_name)
if backend_factory is None: if backend_factory is None:
raise ValueError(f"Backend '{backend_name}' not found.") raise ValueError(f"Backend '{backend_name}' not found.")
@@ -597,11 +651,38 @@ class LeannSearcher:
recompute_embeddings: bool = True, recompute_embeddings: bool = True,
pruning_strategy: Literal["global", "local", "proportional"] = "global", pruning_strategy: Literal["global", "local", "proportional"] = "global",
expected_zmq_port: int = 5557, expected_zmq_port: int = 5557,
metadata_filters: Optional[dict[str, dict[str, Union[str, int, float, bool, list]]]] = None,
batch_size: int = 0,
**kwargs, **kwargs,
) -> list[SearchResult]: ) -> list[SearchResult]:
"""
Search for nearest neighbors with optional metadata filtering.
Args:
query: Text query to search for
top_k: Number of nearest neighbors to return
complexity: Search complexity/candidate list size, higher = more accurate but slower
beam_width: Number of parallel search paths/IO requests per iteration
prune_ratio: Ratio of neighbors to prune via approximate distance (0.0-1.0)
recompute_embeddings: Whether to fetch fresh embeddings from server vs use stored codes
pruning_strategy: Candidate selection strategy - "global" (default), "local", or "proportional"
expected_zmq_port: ZMQ port for embedding server communication
metadata_filters: Optional filters to apply to search results based on metadata.
Format: {"field_name": {"operator": value}}
Supported operators:
- Comparison: "==", "!=", "<", "<=", ">", ">="
- Membership: "in", "not_in"
- String: "contains", "starts_with", "ends_with"
Example: {"chapter": {"<=": 5}, "tags": {"in": ["fiction", "drama"]}}
**kwargs: Backend-specific parameters
Returns:
List of SearchResult objects with text, metadata, and similarity scores
"""
logger.info("🔍 LeannSearcher.search() called:") logger.info("🔍 LeannSearcher.search() called:")
logger.info(f" Query: '{query}'") logger.info(f" Query: '{query}'")
logger.info(f" Top_k: {top_k}") logger.info(f" Top_k: {top_k}")
logger.info(f" Metadata filters: {metadata_filters}")
logger.info(f" Additional kwargs: {kwargs}") logger.info(f" Additional kwargs: {kwargs}")
# Smart top_k detection and adjustment # Smart top_k detection and adjustment
@@ -636,23 +717,33 @@ class LeannSearcher:
use_server_if_available=recompute_embeddings, use_server_if_available=recompute_embeddings,
zmq_port=zmq_port, zmq_port=zmq_port,
) )
# logger.info(f" Generated embedding shape: {query_embedding.shape}") logger.info(f" Generated embedding shape: {query_embedding.shape}")
# time.time() - start_time embedding_time = time.time() - start_time
# logger.info(f" Embedding time: {embedding_time} seconds") logger.info(f" Embedding time: {embedding_time} seconds")
start_time = time.time() start_time = time.time()
backend_search_kwargs: dict[str, Any] = {
"complexity": complexity,
"beam_width": beam_width,
"prune_ratio": prune_ratio,
"recompute_embeddings": recompute_embeddings,
"pruning_strategy": pruning_strategy,
"zmq_port": zmq_port,
}
# Only HNSW supports batching; forward conditionally
if self.backend_name == "hnsw":
backend_search_kwargs["batch_size"] = batch_size
# Merge any extra kwargs last
backend_search_kwargs.update(kwargs)
results = self.backend_impl.search( results = self.backend_impl.search(
query_embedding, query_embedding,
top_k, top_k,
complexity=complexity, **backend_search_kwargs,
beam_width=beam_width,
prune_ratio=prune_ratio,
recompute_embeddings=recompute_embeddings,
pruning_strategy=pruning_strategy,
zmq_port=zmq_port,
**kwargs,
) )
# logger.info(f" Search time: {search_time} seconds") search_time = time.time() - start_time
logger.info(f" Search time in search() LEANN searcher: {search_time} seconds")
logger.info(f" Backend returned: labels={len(results.get('labels', [[]])[0])} results") logger.info(f" Backend returned: labels={len(results.get('labels', [[]])[0])} results")
enriched_results = [] enriched_results = []
@@ -691,6 +782,13 @@ class LeannSearcher:
f" {RED}{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}" f" {RED}{RESET} [{i + 1:2d}] ID: '{string_id}' -> {RED}ERROR: Passage not found!{RESET}"
) )
# Apply metadata filters if specified
if metadata_filters:
logger.info(f" 🔍 Applying metadata filters: {metadata_filters}")
enriched_results = self.passage_manager.filter_search_results(
enriched_results, metadata_filters
)
# Define color codes outside the loop for final message # Define color codes outside the loop for final message
GREEN = "\033[92m" GREEN = "\033[92m"
RESET = "\033[0m" RESET = "\033[0m"
@@ -731,9 +829,15 @@ class LeannChat:
index_path: str, index_path: str,
llm_config: Optional[dict[str, Any]] = None, llm_config: Optional[dict[str, Any]] = None,
enable_warmup: bool = False, enable_warmup: bool = False,
searcher: Optional[LeannSearcher] = None,
**kwargs, **kwargs,
): ):
self.searcher = LeannSearcher(index_path, enable_warmup=enable_warmup, **kwargs) if searcher is None:
self.searcher = LeannSearcher(index_path, enable_warmup=enable_warmup, **kwargs)
self._owns_searcher = True
else:
self.searcher = searcher
self._owns_searcher = False
self.llm = get_llm(llm_config) self.llm = get_llm(llm_config)
def ask( def ask(
@@ -747,6 +851,8 @@ class LeannChat:
pruning_strategy: Literal["global", "local", "proportional"] = "global", pruning_strategy: Literal["global", "local", "proportional"] = "global",
llm_kwargs: Optional[dict[str, Any]] = None, llm_kwargs: Optional[dict[str, Any]] = None,
expected_zmq_port: int = 5557, expected_zmq_port: int = 5557,
metadata_filters: Optional[dict[str, dict[str, Union[str, int, float, bool, list]]]] = None,
batch_size: int = 0,
**search_kwargs, **search_kwargs,
): ):
if llm_kwargs is None: if llm_kwargs is None:
@@ -761,10 +867,12 @@ class LeannChat:
recompute_embeddings=recompute_embeddings, recompute_embeddings=recompute_embeddings,
pruning_strategy=pruning_strategy, pruning_strategy=pruning_strategy,
expected_zmq_port=expected_zmq_port, expected_zmq_port=expected_zmq_port,
metadata_filters=metadata_filters,
batch_size=batch_size,
**search_kwargs, **search_kwargs,
) )
search_time = time.time() - search_time search_time = time.time() - search_time
# logger.info(f" Search time: {search_time} seconds") logger.info(f" Search time: {search_time} seconds")
context = "\n\n".join([r.text for r in results]) context = "\n\n".join([r.text for r in results])
prompt = ( prompt = (
"Here is some retrieved context that might help answer your question:\n\n" "Here is some retrieved context that might help answer your question:\n\n"
@@ -800,7 +908,9 @@ class LeannChat:
This method should be called after you're done using the chat interface, This method should be called after you're done using the chat interface,
especially in test environments or batch processing scenarios. especially in test environments or batch processing scenarios.
""" """
if hasattr(self.searcher, "cleanup"): # Only stop the embedding server if this LeannChat instance created the searcher.
# When a shared searcher is passed in, avoid shutting down the server to enable reuse.
if getattr(self, "_owns_searcher", False) and hasattr(self.searcher, "cleanup"):
self.searcher.cleanup() self.searcher.cleanup()
# Enable automatic cleanup patterns # Enable automatic cleanup patterns

View File

@@ -322,9 +322,17 @@ Examples:
return basic_matches return basic_matches
def _should_exclude_file(self, relative_path: Path, gitignore_matches) -> bool: def _should_exclude_file(self, file_path: Path, gitignore_matches) -> bool:
"""Check if a file should be excluded using gitignore parser.""" """Check if a file should be excluded using gitignore parser.
return gitignore_matches(str(relative_path))
Always match against absolute, posix-style paths for consistency with
gitignore_parser expectations.
"""
try:
absolute_path = file_path.resolve()
except Exception:
absolute_path = Path(str(file_path))
return gitignore_matches(absolute_path.as_posix())
def _is_git_submodule(self, path: Path) -> bool: def _is_git_submodule(self, path: Path) -> bool:
"""Check if a path is a git submodule.""" """Check if a path is a git submodule."""
@@ -396,7 +404,9 @@ Examples:
print(f" {current_path}") print(f" {current_path}")
print(" " + "" * 45) print(" " + "" * 45)
current_indexes = self._discover_indexes_in_project(current_path) current_indexes = self._discover_indexes_in_project(
current_path, exclude_dirs=other_projects
)
if current_indexes: if current_indexes:
for idx in current_indexes: for idx in current_indexes:
total_indexes += 1 total_indexes += 1
@@ -435,9 +445,14 @@ Examples:
print(" leann build my-docs --docs ./documents") print(" leann build my-docs --docs ./documents")
else: else:
# Count only projects that have at least one discoverable index # Count only projects that have at least one discoverable index
projects_count = sum( projects_count = 0
1 for p in valid_projects if len(self._discover_indexes_in_project(p)) > 0 for p in valid_projects:
) if p == current_path:
discovered = self._discover_indexes_in_project(p, exclude_dirs=other_projects)
else:
discovered = self._discover_indexes_in_project(p)
if len(discovered) > 0:
projects_count += 1
print(f"📊 Total: {total_indexes} indexes across {projects_count} projects") print(f"📊 Total: {total_indexes} indexes across {projects_count} projects")
if current_indexes_count > 0: if current_indexes_count > 0:
@@ -454,9 +469,22 @@ Examples:
print("\n💡 Create your first index:") print("\n💡 Create your first index:")
print(" leann build my-docs --docs ./documents") print(" leann build my-docs --docs ./documents")
def _discover_indexes_in_project(self, project_path: Path): def _discover_indexes_in_project(
"""Discover all indexes in a project directory (both CLI and apps formats)""" self, project_path: Path, exclude_dirs: Optional[list[Path]] = None
):
"""Discover all indexes in a project directory (both CLI and apps formats)
exclude_dirs: when provided, skip any APP-format index files that are
located under these directories. This prevents duplicates when the
current project is a parent directory of other registered projects.
"""
indexes = [] indexes = []
exclude_dirs = exclude_dirs or []
# normalize to resolved paths once for comparison
try:
exclude_dirs_resolved = [p.resolve() for p in exclude_dirs]
except Exception:
exclude_dirs_resolved = exclude_dirs
# 1. CLI format: .leann/indexes/index_name/ # 1. CLI format: .leann/indexes/index_name/
cli_indexes_dir = project_path / ".leann" / "indexes" cli_indexes_dir = project_path / ".leann" / "indexes"
@@ -495,6 +523,17 @@ Examples:
continue continue
except Exception: except Exception:
pass pass
# Skip meta files that live under excluded directories
try:
meta_parent_resolved = meta_file.parent.resolve()
if any(
meta_parent_resolved.is_relative_to(ex_dir)
for ex_dir in exclude_dirs_resolved
):
continue
except Exception:
# best effort; if resolve or comparison fails, do not exclude
pass
# Use the parent directory name as the app index display name # Use the parent directory name as the app index display name
display_name = meta_file.parent.name display_name = meta_file.parent.name
# Extract file base used to store files # Extract file base used to store files
@@ -1022,7 +1061,8 @@ Examples:
# Try to use better PDF parsers first, but only if PDFs are requested # Try to use better PDF parsers first, but only if PDFs are requested
documents = [] documents = []
docs_path = Path(docs_dir) # Use resolved absolute paths to avoid mismatches (symlinks, relative vs absolute)
docs_path = Path(docs_dir).resolve()
# Check if we should process PDFs # Check if we should process PDFs
should_process_pdfs = custom_file_types is None or ".pdf" in custom_file_types should_process_pdfs = custom_file_types is None or ".pdf" in custom_file_types
@@ -1031,10 +1071,15 @@ Examples:
for file_path in docs_path.rglob("*.pdf"): for file_path in docs_path.rglob("*.pdf"):
# Check if file matches any exclude pattern # Check if file matches any exclude pattern
try: try:
# Ensure both paths are resolved before computing relativity
file_path_resolved = file_path.resolve()
# Determine directory scope using the non-resolved path to avoid
# misclassifying symlinked entries as outside the docs directory
relative_path = file_path.relative_to(docs_path) relative_path = file_path.relative_to(docs_path)
if not include_hidden and _path_has_hidden_segment(relative_path): if not include_hidden and _path_has_hidden_segment(relative_path):
continue continue
if self._should_exclude_file(relative_path, gitignore_matches): # Use absolute path for gitignore matching
if self._should_exclude_file(file_path_resolved, gitignore_matches):
continue continue
except ValueError: except ValueError:
# Skip files that can't be made relative to docs_path # Skip files that can't be made relative to docs_path
@@ -1077,10 +1122,11 @@ Examples:
) -> bool: ) -> bool:
"""Return True if file should be included (not excluded)""" """Return True if file should be included (not excluded)"""
try: try:
docs_path_obj = Path(docs_dir) docs_path_obj = Path(docs_dir).resolve()
file_path_obj = Path(file_path) file_path_obj = Path(file_path).resolve()
relative_path = file_path_obj.relative_to(docs_path_obj) # Use absolute path for gitignore matching
return not self._should_exclude_file(relative_path, gitignore_matches) _ = file_path_obj.relative_to(docs_path_obj) # validate scope
return not self._should_exclude_file(file_path_obj, gitignore_matches)
except (ValueError, OSError): except (ValueError, OSError):
return True # Include files that can't be processed return True # Include files that can't be processed

View File

@@ -6,6 +6,7 @@ Preserves all optimization parameters to ensure performance
import logging import logging
import os import os
import time
from typing import Any from typing import Any
import numpy as np import numpy as np
@@ -28,6 +29,8 @@ def compute_embeddings(
is_build: bool = False, is_build: bool = False,
batch_size: int = 32, batch_size: int = 32,
adaptive_optimization: bool = True, adaptive_optimization: bool = True,
manual_tokenize: bool = False,
max_length: int = 512,
) -> np.ndarray: ) -> np.ndarray:
""" """
Unified embedding computation entry point Unified embedding computation entry point
@@ -50,6 +53,8 @@ def compute_embeddings(
is_build=is_build, is_build=is_build,
batch_size=batch_size, batch_size=batch_size,
adaptive_optimization=adaptive_optimization, adaptive_optimization=adaptive_optimization,
manual_tokenize=manual_tokenize,
max_length=max_length,
) )
elif mode == "openai": elif mode == "openai":
return compute_embeddings_openai(texts, model_name) return compute_embeddings_openai(texts, model_name)
@@ -71,6 +76,8 @@ def compute_embeddings_sentence_transformers(
batch_size: int = 32, batch_size: int = 32,
is_build: bool = False, is_build: bool = False,
adaptive_optimization: bool = True, adaptive_optimization: bool = True,
manual_tokenize: bool = False,
max_length: int = 512,
) -> np.ndarray: ) -> np.ndarray:
""" """
Compute embeddings using SentenceTransformer with model caching and adaptive optimization Compute embeddings using SentenceTransformer with model caching and adaptive optimization
@@ -214,20 +221,130 @@ def compute_embeddings_sentence_transformers(
logger.info(f"Model cached: {cache_key}") logger.info(f"Model cached: {cache_key}")
# Compute embeddings with optimized inference mode # Compute embeddings with optimized inference mode
logger.info(f"Starting embedding computation... (batch_size: {batch_size})") logger.info(
f"Starting embedding computation... (batch_size: {batch_size}, manual_tokenize={manual_tokenize})"
)
# Use torch.inference_mode for optimal performance start_time = time.time()
with torch.inference_mode(): if not manual_tokenize:
embeddings = model.encode( # Use SentenceTransformer's optimized encode path (default)
texts, with torch.inference_mode():
batch_size=batch_size, embeddings = model.encode(
show_progress_bar=is_build, # Don't show progress bar in server environment texts,
convert_to_numpy=True, batch_size=batch_size,
normalize_embeddings=False, show_progress_bar=is_build, # Don't show progress bar in server environment
device=device, convert_to_numpy=True,
) normalize_embeddings=False,
device=device,
)
# Synchronize if CUDA to measure accurate wall time
try:
if torch.cuda.is_available():
torch.cuda.synchronize()
except Exception:
pass
else:
# Manual tokenization + forward pass using HF AutoTokenizer/AutoModel
try:
from transformers import AutoModel, AutoTokenizer # type: ignore
except Exception as e:
raise ImportError(f"transformers is required for manual_tokenize=True: {e}")
# Cache tokenizer and model
tok_cache_key = f"hf_tokenizer_{model_name}"
mdl_cache_key = f"hf_model_{model_name}_{device}_{use_fp16}"
if tok_cache_key in _model_cache and mdl_cache_key in _model_cache:
hf_tokenizer = _model_cache[tok_cache_key]
hf_model = _model_cache[mdl_cache_key]
logger.info("Using cached HF tokenizer/model for manual path")
else:
logger.info("Loading HF tokenizer/model for manual tokenization path")
hf_tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
torch_dtype = torch.float16 if (use_fp16 and device == "cuda") else torch.float32
hf_model = AutoModel.from_pretrained(model_name, torch_dtype=torch_dtype)
hf_model.to(device)
hf_model.eval()
# Optional compile on supported devices
if device in ["cuda", "mps"]:
try:
hf_model = torch.compile(hf_model, mode="reduce-overhead", dynamic=True) # type: ignore
except Exception:
pass
_model_cache[tok_cache_key] = hf_tokenizer
_model_cache[mdl_cache_key] = hf_model
all_embeddings: list[np.ndarray] = []
# Progress bar when building or for large inputs
show_progress = is_build or len(texts) > 32
try:
if show_progress:
from tqdm import tqdm # type: ignore
batch_iter = tqdm(
range(0, len(texts), batch_size),
desc="Embedding (manual)",
unit="batch",
)
else:
batch_iter = range(0, len(texts), batch_size)
except Exception:
batch_iter = range(0, len(texts), batch_size)
start_time_manual = time.time()
with torch.inference_mode():
for start_index in batch_iter:
end_index = min(start_index + batch_size, len(texts))
batch_texts = texts[start_index:end_index]
tokenize_start_time = time.time()
inputs = hf_tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=max_length,
return_tensors="pt",
)
tokenize_end_time = time.time()
logger.info(
f"Tokenize time taken: {tokenize_end_time - tokenize_start_time} seconds"
)
# Print shapes of all input tensors for debugging
for k, v in inputs.items():
print(f"inputs[{k!r}] shape: {getattr(v, 'shape', type(v))}")
to_device_start_time = time.time()
inputs = {k: v.to(device) for k, v in inputs.items()}
to_device_end_time = time.time()
logger.info(
f"To device time taken: {to_device_end_time - to_device_start_time} seconds"
)
forward_start_time = time.time()
outputs = hf_model(**inputs)
forward_end_time = time.time()
logger.info(f"Forward time taken: {forward_end_time - forward_start_time} seconds")
last_hidden_state = outputs.last_hidden_state # (B, L, H)
attention_mask = inputs.get("attention_mask")
if attention_mask is None:
# Fallback: assume all tokens are valid
pooled = last_hidden_state.mean(dim=1)
else:
mask = attention_mask.unsqueeze(-1).to(last_hidden_state.dtype)
masked = last_hidden_state * mask
lengths = mask.sum(dim=1).clamp(min=1)
pooled = masked.sum(dim=1) / lengths
# Move to CPU float32
batch_embeddings = pooled.detach().to("cpu").float().numpy()
all_embeddings.append(batch_embeddings)
embeddings = np.vstack(all_embeddings).astype(np.float32, copy=False)
try:
if torch.cuda.is_available():
torch.cuda.synchronize()
except Exception:
pass
end_time = time.time()
logger.info(f"Manual tokenize time taken: {end_time - start_time_manual} seconds")
end_time = time.time()
logger.info(f"Generated {len(embeddings)} embeddings, dimension: {embeddings.shape[1]}") logger.info(f"Generated {len(embeddings)} embeddings, dimension: {embeddings.shape[1]}")
logger.info(f"Time taken: {end_time - start_time} seconds")
# Validate results # Validate results
if np.isnan(embeddings).any() or np.isinf(embeddings).any(): if np.isnan(embeddings).any() or np.isinf(embeddings).any():

View File

@@ -0,0 +1,240 @@
"""
Metadata filtering engine for LEANN search results.
This module provides generic metadata filtering capabilities that can be applied
to search results from any LEANN backend. The filtering supports various
operators for different data types including numbers, strings, booleans, and lists.
"""
import logging
from typing import Any, Union
logger = logging.getLogger(__name__)
# Type alias for filter specifications
FilterValue = Union[str, int, float, bool, list]
FilterSpec = dict[str, FilterValue]
MetadataFilters = dict[str, FilterSpec]
class MetadataFilterEngine:
"""
Engine for evaluating metadata filters against search results.
Supports various operators for filtering based on metadata fields:
- Comparison: ==, !=, <, <=, >, >=
- Membership: in, not_in
- String operations: contains, starts_with, ends_with
- Boolean operations: is_true, is_false
"""
def __init__(self):
"""Initialize the filter engine with supported operators."""
self.operators = {
"==": self._equals,
"!=": self._not_equals,
"<": self._less_than,
"<=": self._less_than_or_equal,
">": self._greater_than,
">=": self._greater_than_or_equal,
"in": self._in,
"not_in": self._not_in,
"contains": self._contains,
"starts_with": self._starts_with,
"ends_with": self._ends_with,
"is_true": self._is_true,
"is_false": self._is_false,
}
def apply_filters(
self, search_results: list[dict[str, Any]], metadata_filters: MetadataFilters
) -> list[dict[str, Any]]:
"""
Apply metadata filters to a list of search results.
Args:
search_results: List of result dictionaries, each containing 'metadata' field
metadata_filters: Dictionary of filter specifications
Format: {"field_name": {"operator": value}}
Returns:
Filtered list of search results
"""
if not metadata_filters:
return search_results
logger.debug(f"Applying filters: {metadata_filters}")
logger.debug(f"Input results count: {len(search_results)}")
filtered_results = []
for result in search_results:
if self._evaluate_filters(result, metadata_filters):
filtered_results.append(result)
logger.debug(f"Filtered results count: {len(filtered_results)}")
return filtered_results
def _evaluate_filters(self, result: dict[str, Any], filters: MetadataFilters) -> bool:
"""
Evaluate all filters against a single search result.
All filters must pass (AND logic) for the result to be included.
Args:
result: Full search result dictionary (including metadata, text, etc.)
filters: Filter specifications to evaluate
Returns:
True if all filters pass, False otherwise
"""
for field_name, filter_spec in filters.items():
if not self._evaluate_field_filter(result, field_name, filter_spec):
return False
return True
def _evaluate_field_filter(
self, result: dict[str, Any], field_name: str, filter_spec: FilterSpec
) -> bool:
"""
Evaluate a single field filter against a search result.
Args:
result: Full search result dictionary
field_name: Name of the field to filter on
filter_spec: Filter specification for this field
Returns:
True if the filter passes, False otherwise
"""
# First check top-level fields, then check metadata
field_value = result.get(field_name)
if field_value is None:
# Try to get from metadata if not found at top level
metadata = result.get("metadata", {})
field_value = metadata.get(field_name)
# Handle missing fields - they fail all filters except existence checks
if field_value is None:
logger.debug(f"Field '{field_name}' not found in result or metadata")
return False
# Evaluate each operator in the filter spec
for operator, expected_value in filter_spec.items():
if operator not in self.operators:
logger.warning(f"Unsupported operator: {operator}")
return False
try:
if not self.operators[operator](field_value, expected_value):
logger.debug(
f"Filter failed: {field_name} {operator} {expected_value} "
f"(actual: {field_value})"
)
return False
except Exception as e:
logger.warning(
f"Error evaluating filter {field_name} {operator} {expected_value}: {e}"
)
return False
return True
# Comparison operators
def _equals(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value equals expected value."""
return field_value == expected_value
def _not_equals(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value does not equal expected value."""
return field_value != expected_value
def _less_than(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is less than expected value."""
return self._numeric_compare(field_value, expected_value, lambda a, b: a < b)
def _less_than_or_equal(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is less than or equal to expected value."""
return self._numeric_compare(field_value, expected_value, lambda a, b: a <= b)
def _greater_than(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is greater than expected value."""
return self._numeric_compare(field_value, expected_value, lambda a, b: a > b)
def _greater_than_or_equal(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is greater than or equal to expected value."""
return self._numeric_compare(field_value, expected_value, lambda a, b: a >= b)
# Membership operators
def _in(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is in the expected list/collection."""
if not isinstance(expected_value, (list, tuple, set)):
raise ValueError("'in' operator requires a list, tuple, or set")
return field_value in expected_value
def _not_in(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is not in the expected list/collection."""
if not isinstance(expected_value, (list, tuple, set)):
raise ValueError("'not_in' operator requires a list, tuple, or set")
return field_value not in expected_value
# String operators
def _contains(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value contains the expected substring."""
field_str = str(field_value)
expected_str = str(expected_value)
return expected_str in field_str
def _starts_with(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value starts with the expected prefix."""
field_str = str(field_value)
expected_str = str(expected_value)
return field_str.startswith(expected_str)
def _ends_with(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value ends with the expected suffix."""
field_str = str(field_value)
expected_str = str(expected_value)
return field_str.endswith(expected_str)
# Boolean operators
def _is_true(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is truthy."""
return bool(field_value)
def _is_false(self, field_value: Any, expected_value: Any) -> bool:
"""Check if field value is falsy."""
return not bool(field_value)
# Helper methods
def _numeric_compare(self, field_value: Any, expected_value: Any, compare_func) -> bool:
"""
Helper for numeric comparisons with type coercion.
Args:
field_value: Value from metadata
expected_value: Value to compare against
compare_func: Comparison function to apply
Returns:
Result of comparison
"""
try:
# Try to convert both values to numbers for comparison
if isinstance(field_value, str) and isinstance(expected_value, str):
# String comparison if both are strings
return compare_func(field_value, expected_value)
# Numeric comparison - attempt to convert to float
field_num = (
float(field_value) if not isinstance(field_value, (int, float)) else field_value
)
expected_num = (
float(expected_value)
if not isinstance(expected_value, (int, float))
else expected_value
)
return compare_func(field_num, expected_num)
except (ValueError, TypeError):
# Fall back to string comparison if numeric conversion fails
return compare_func(str(field_value), str(expected_value))

View File

@@ -0,0 +1,365 @@
"""
Comprehensive tests for metadata filtering functionality.
This module tests the MetadataFilterEngine class and its integration
with the LEANN search system.
"""
import os
# Import the modules we're testing
import sys
from unittest.mock import Mock, patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../packages/leann-core/src"))
from leann.api import PassageManager, SearchResult
from leann.metadata_filter import MetadataFilterEngine
class TestMetadataFilterEngine:
"""Test suite for the MetadataFilterEngine class."""
def setup_method(self):
"""Setup test fixtures."""
self.engine = MetadataFilterEngine()
# Sample search results for testing
self.sample_results = [
{
"id": "doc1",
"score": 0.95,
"text": "This is chapter 1 content",
"metadata": {
"chapter": 1,
"character": "Alice",
"tags": ["adventure", "fantasy"],
"word_count": 150,
"is_published": True,
"genre": "fiction",
},
},
{
"id": "doc2",
"score": 0.87,
"text": "This is chapter 3 content",
"metadata": {
"chapter": 3,
"character": "Bob",
"tags": ["mystery", "thriller"],
"word_count": 250,
"is_published": True,
"genre": "fiction",
},
},
{
"id": "doc3",
"score": 0.82,
"text": "This is chapter 5 content",
"metadata": {
"chapter": 5,
"character": "Alice",
"tags": ["romance", "drama"],
"word_count": 300,
"is_published": False,
"genre": "non-fiction",
},
},
{
"id": "doc4",
"score": 0.78,
"text": "This is chapter 10 content",
"metadata": {
"chapter": 10,
"character": "Charlie",
"tags": ["action", "adventure"],
"word_count": 400,
"is_published": True,
"genre": "fiction",
},
},
]
def test_engine_initialization(self):
"""Test that the filter engine initializes correctly."""
assert self.engine is not None
assert len(self.engine.operators) > 0
assert "==" in self.engine.operators
assert "contains" in self.engine.operators
assert "in" in self.engine.operators
def test_direct_instantiation(self):
"""Test direct instantiation of the engine."""
engine = MetadataFilterEngine()
assert isinstance(engine, MetadataFilterEngine)
def test_no_filters_returns_all_results(self):
"""Test that passing None or empty filters returns all results."""
# Test with None
result = self.engine.apply_filters(self.sample_results, None)
assert len(result) == len(self.sample_results)
# Test with empty dict
result = self.engine.apply_filters(self.sample_results, {})
assert len(result) == len(self.sample_results)
# Test comparison operators
def test_equals_filter(self):
"""Test equals (==) filter."""
filters = {"chapter": {"==": 1}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 1
assert result[0]["id"] == "doc1"
def test_not_equals_filter(self):
"""Test not equals (!=) filter."""
filters = {"genre": {"!=": "fiction"}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 1
assert result[0]["metadata"]["genre"] == "non-fiction"
def test_less_than_filter(self):
"""Test less than (<) filter."""
filters = {"chapter": {"<": 5}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 2
chapters = [r["metadata"]["chapter"] for r in result]
assert all(ch < 5 for ch in chapters)
def test_less_than_or_equal_filter(self):
"""Test less than or equal (<=) filter."""
filters = {"chapter": {"<=": 5}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 3
chapters = [r["metadata"]["chapter"] for r in result]
assert all(ch <= 5 for ch in chapters)
def test_greater_than_filter(self):
"""Test greater than (>) filter."""
filters = {"word_count": {">": 200}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 3 # Documents with word_count 250, 300, 400
word_counts = [r["metadata"]["word_count"] for r in result]
assert all(wc > 200 for wc in word_counts)
def test_greater_than_or_equal_filter(self):
"""Test greater than or equal (>=) filter."""
filters = {"word_count": {">=": 250}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 3
word_counts = [r["metadata"]["word_count"] for r in result]
assert all(wc >= 250 for wc in word_counts)
# Test membership operators
def test_in_filter(self):
"""Test in filter."""
filters = {"character": {"in": ["Alice", "Bob"]}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 3
characters = [r["metadata"]["character"] for r in result]
assert all(ch in ["Alice", "Bob"] for ch in characters)
def test_not_in_filter(self):
"""Test not_in filter."""
filters = {"character": {"not_in": ["Alice", "Bob"]}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 1
assert result[0]["metadata"]["character"] == "Charlie"
# Test string operators
def test_contains_filter(self):
"""Test contains filter."""
filters = {"genre": {"contains": "fiction"}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 4 # Both "fiction" and "non-fiction"
def test_starts_with_filter(self):
"""Test starts_with filter."""
filters = {"genre": {"starts_with": "non"}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 1
assert result[0]["metadata"]["genre"] == "non-fiction"
def test_ends_with_filter(self):
"""Test ends_with filter."""
filters = {"text": {"ends_with": "content"}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 4 # All sample texts end with "content"
# Test boolean operators
def test_is_true_filter(self):
"""Test is_true filter."""
filters = {"is_published": {"is_true": True}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 3
assert all(r["metadata"]["is_published"] for r in result)
def test_is_false_filter(self):
"""Test is_false filter."""
filters = {"is_published": {"is_false": False}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 1
assert not result[0]["metadata"]["is_published"]
# Test compound filters (AND logic)
def test_compound_filters(self):
"""Test multiple filters applied together (AND logic)."""
filters = {"genre": {"==": "fiction"}, "chapter": {"<=": 5}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 2
for r in result:
assert r["metadata"]["genre"] == "fiction"
assert r["metadata"]["chapter"] <= 5
def test_multiple_operators_same_field(self):
"""Test multiple operators on the same field."""
filters = {"word_count": {">=": 200, "<=": 350}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 2
for r in result:
wc = r["metadata"]["word_count"]
assert 200 <= wc <= 350
# Test edge cases
def test_missing_field_fails_filter(self):
"""Test that missing metadata fields fail filters."""
filters = {"nonexistent_field": {"==": "value"}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 0
def test_invalid_operator(self):
"""Test that invalid operators are handled gracefully."""
filters = {"chapter": {"invalid_op": 1}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 0 # Should filter out all results
def test_type_coercion_numeric(self):
"""Test numeric type coercion in comparisons."""
# Add a result with string chapter number
test_results = [
*self.sample_results,
{
"id": "doc5",
"score": 0.75,
"text": "String chapter test",
"metadata": {"chapter": "2", "genre": "test"},
},
]
filters = {"chapter": {"<": 3}}
result = self.engine.apply_filters(test_results, filters)
# Should include doc1 (chapter=1) and doc5 (chapter="2")
assert len(result) == 2
ids = [r["id"] for r in result]
assert "doc1" in ids
assert "doc5" in ids
def test_list_membership_with_nested_tags(self):
"""Test membership operations with list metadata."""
# Note: This tests the metadata structure, not list field filtering
# For list field filtering, we'd need to modify the test data
filters = {"character": {"in": ["Alice"]}}
result = self.engine.apply_filters(self.sample_results, filters)
assert len(result) == 2
assert all(r["metadata"]["character"] == "Alice" for r in result)
def test_empty_results_list(self):
"""Test filtering on empty results list."""
filters = {"chapter": {"==": 1}}
result = self.engine.apply_filters([], filters)
assert len(result) == 0
class TestPassageManagerFiltering:
"""Test suite for PassageManager filtering integration."""
def setup_method(self):
"""Setup test fixtures."""
# Mock the passage manager without actual file I/O
self.passage_manager = Mock(spec=PassageManager)
self.passage_manager.filter_engine = MetadataFilterEngine()
# Sample SearchResult objects
self.search_results = [
SearchResult(
id="doc1",
score=0.95,
text="Chapter 1 content",
metadata={"chapter": 1, "character": "Alice"},
),
SearchResult(
id="doc2",
score=0.87,
text="Chapter 5 content",
metadata={"chapter": 5, "character": "Bob"},
),
SearchResult(
id="doc3",
score=0.82,
text="Chapter 10 content",
metadata={"chapter": 10, "character": "Alice"},
),
]
def test_search_result_filtering(self):
"""Test filtering SearchResult objects."""
# Create a real PassageManager instance just for the filtering method
# We'll mock the file operations
with patch("builtins.open"), patch("json.loads"), patch("pickle.load"):
pm = PassageManager([{"type": "jsonl", "path": "test.jsonl"}])
filters = {"chapter": {"<=": 5}}
result = pm.filter_search_results(self.search_results, filters)
assert len(result) == 2
chapters = [r.metadata["chapter"] for r in result]
assert all(ch <= 5 for ch in chapters)
def test_filter_search_results_no_filters(self):
"""Test that None filters return all results."""
with patch("builtins.open"), patch("json.loads"), patch("pickle.load"):
pm = PassageManager([{"type": "jsonl", "path": "test.jsonl"}])
result = pm.filter_search_results(self.search_results, None)
assert len(result) == len(self.search_results)
def test_filter_maintains_search_result_type(self):
"""Test that filtering returns SearchResult objects."""
with patch("builtins.open"), patch("json.loads"), patch("pickle.load"):
pm = PassageManager([{"type": "jsonl", "path": "test.jsonl"}])
filters = {"character": {"==": "Alice"}}
result = pm.filter_search_results(self.search_results, filters)
assert len(result) == 2
for r in result:
assert isinstance(r, SearchResult)
assert r.metadata["character"] == "Alice"
# Integration tests would go here, but they require actual LEANN backend setup
# These would test the full pipeline from LeannSearcher.search() with metadata_filters
if __name__ == "__main__":
# Run basic smoke tests
engine = MetadataFilterEngine()
sample_data = [
{
"id": "test1",
"score": 0.9,
"text": "Test content",
"metadata": {"chapter": 1, "published": True},
}
]
# Test basic filtering
result = engine.apply_filters(sample_data, {"chapter": {"==": 1}})
assert len(result) == 1
print("✅ Basic filtering test passed")
result = engine.apply_filters(sample_data, {"chapter": {"==": 2}})
assert len(result) == 0
print("✅ No match filtering test passed")
print("🎉 All smoke tests passed!")

2445
uv.lock generated
View File

File diff suppressed because it is too large Load Diff