Compare commits

...

4 Commits

Author     SHA1         Message                                    Date
Andy Lee   2e7b2d29dc   style                                      2025-09-17 16:42:19 -07:00
Andy Lee   bc4d02c2d3   chore: fix ruff warnings (RUF059, F401)    2025-09-17 16:18:48 -07:00
Andy Lee   b5ce34ad82   style                                      2025-09-17 16:00:37 -07:00
Andy Lee   6c15e6ad23   fix(core): package chunking utils for AST chunking; re-export in apps; CLI imports packaged utils   2025-09-17 15:57:05 -07:00
4 changed files with 59 additions and 141 deletions

View File

@@ -1,16 +1,38 @@
-"""
-Chunking utilities for LEANN RAG applications.
-Provides AST-aware and traditional text chunking functionality.
+"""Unified chunking utilities facade.
+
+This module re-exports the packaged utilities from `leann.chunking_utils` so
+that both repo apps (importing `chunking`) and installed wheels share one
+single implementation. When running from the repo without installation, it
+adds the `packages/leann-core/src` directory to `sys.path` as a fallback.
 """
 
-from .utils import (
-    CODE_EXTENSIONS,
-    create_ast_chunks,
-    create_text_chunks,
-    create_traditional_chunks,
-    detect_code_files,
-    get_language_from_extension,
-)
+import sys
+from pathlib import Path
+
+try:
+    from leann.chunking_utils import (
+        CODE_EXTENSIONS,
+        create_ast_chunks,
+        create_text_chunks,
+        create_traditional_chunks,
+        detect_code_files,
+        get_language_from_extension,
+    )
+except Exception:  # pragma: no cover - best-effort fallback for dev environment
+    repo_root = Path(__file__).resolve().parents[2]
+    leann_src = repo_root / "packages" / "leann-core" / "src"
+    if leann_src.exists():
+        sys.path.insert(0, str(leann_src))
+        from leann.chunking_utils import (
+            CODE_EXTENSIONS,
+            create_ast_chunks,
+            create_text_chunks,
+            create_traditional_chunks,
+            detect_code_files,
+            get_language_from_extension,
+        )
+    else:
+        raise
 
 __all__ = [
     "CODE_EXTENSIONS",

View File

@@ -74,7 +74,7 @@ class ChromeHistoryReader(BaseReader):
            if count >= max_count and max_count > 0:
                break
 
-            last_visit, url, title, visit_count, typed_count, hidden = row
+            last_visit, url, title, visit_count, typed_count, _hidden = row
 
            # Create document content with metadata embedded in text
            doc_content = f"""

View File

@@ -1,6 +1,6 @@
 """
 Enhanced chunking utilities with AST-aware code chunking support.
-Provides unified interface for both traditional and AST-based text chunking.
+Packaged within leann-core so installed wheels can import it reliably.
 """
 
 import logging
@@ -22,30 +22,9 @@ CODE_EXTENSIONS = {
     ".jsx": "typescript",
 }
 
-# Default chunk parameters for different content types
-DEFAULT_CHUNK_PARAMS = {
-    "code": {
-        "max_chunk_size": 512,
-        "chunk_overlap": 64,
-    },
-    "text": {
-        "chunk_size": 256,
-        "chunk_overlap": 128,
-    },
-}
-
 def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
-    """
-    Separate documents into code files and regular text files.
-
-    Args:
-        documents: List of LlamaIndex Document objects
-        code_extensions: Dict mapping file extensions to languages (defaults to CODE_EXTENSIONS)
-
-    Returns:
-        Tuple of (code_documents, text_documents)
-    """
+    """Separate documents into code files and regular text files."""
     if code_extensions is None:
         code_extensions = CODE_EXTENSIONS
@@ -53,16 +32,10 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
     text_docs = []
 
     for doc in documents:
-        # Get file path from metadata
-        file_path = doc.metadata.get("file_path", "")
-        if not file_path:
-            # Fallback to file_name
-            file_path = doc.metadata.get("file_name", "")
+        file_path = doc.metadata.get("file_path", "") or doc.metadata.get("file_name", "")
 
         if file_path:
             file_ext = Path(file_path).suffix.lower()
             if file_ext in code_extensions:
-                # Add language info to metadata
                 doc.metadata["language"] = code_extensions[file_ext]
                 doc.metadata["is_code"] = True
                 code_docs.append(doc)
@@ -70,7 +43,6 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
                 doc.metadata["is_code"] = False
                 text_docs.append(doc)
         else:
-            # If no file path, treat as text
             doc.metadata["is_code"] = False
             text_docs.append(doc)
@@ -79,7 +51,7 @@ def detect_code_files(documents, code_extensions=None) -> tuple[list, list]:
 def get_language_from_extension(file_path: str) -> Optional[str]:
-    """Get the programming language from file extension."""
+    """Return language string from a filename/extension using CODE_EXTENSIONS."""
     ext = Path(file_path).suffix.lower()
     return CODE_EXTENSIONS.get(ext)
@@ -90,40 +62,26 @@ def create_ast_chunks(
     chunk_overlap: int = 64,
     metadata_template: str = "default",
 ) -> list[str]:
-    """
-    Create AST-aware chunks from code documents using astchunk.
-
-    Args:
-        documents: List of code documents
-        max_chunk_size: Maximum characters per chunk
-        chunk_overlap: Number of AST nodes to overlap between chunks
-        metadata_template: Template for chunk metadata
-
-    Returns:
-        List of text chunks with preserved code structure
-    """
+    """Create AST-aware chunks from code documents using astchunk.
+
+    Falls back to traditional chunking if astchunk is unavailable.
+    """
     try:
-        from astchunk import ASTChunkBuilder
+        from astchunk import ASTChunkBuilder  # optional dependency
     except ImportError as e:
         logger.error(f"astchunk not available: {e}")
         logger.info("Falling back to traditional chunking for code files")
         return create_traditional_chunks(documents, max_chunk_size, chunk_overlap)
 
     all_chunks = []
 
     for doc in documents:
-        # Get language from metadata (set by detect_code_files)
         language = doc.metadata.get("language")
         if not language:
-            logger.warning(
-                "No language detected for document, falling back to traditional chunking"
-            )
-            traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
-            all_chunks.extend(traditional_chunks)
+            logger.warning("No language detected; falling back to traditional chunking")
+            all_chunks.extend(create_traditional_chunks([doc], max_chunk_size, chunk_overlap))
             continue
 
         try:
-            # Configure astchunk
             configs = {
                 "max_chunk_size": max_chunk_size,
                 "language": language,
@@ -131,7 +89,6 @@ def create_ast_chunks(
                 "chunk_overlap": chunk_overlap if chunk_overlap > 0 else 0,
             }
 
-            # Add repository-level metadata if available
             repo_metadata = {
                 "file_path": doc.metadata.get("file_path", ""),
                 "file_name": doc.metadata.get("file_name", ""),
@@ -140,17 +97,13 @@ def create_ast_chunks(
             }
             configs["repo_level_metadata"] = repo_metadata
 
-            # Create chunk builder and process
             chunk_builder = ASTChunkBuilder(**configs)
             code_content = doc.get_content()
 
             if not code_content or not code_content.strip():
                 logger.warning("Empty code content, skipping")
                 continue
 
             chunks = chunk_builder.chunkify(code_content)
 
-            # Extract text content from chunks
             for chunk in chunks:
                 if hasattr(chunk, "text"):
                     chunk_text = chunk.text
@@ -159,7 +112,6 @@ def create_ast_chunks(
                 elif isinstance(chunk, str):
                     chunk_text = chunk
                 else:
-                    # Try to convert to string
                     chunk_text = str(chunk)
 
                 if chunk_text and chunk_text.strip():
@@ -168,12 +120,10 @@ def create_ast_chunks(
             logger.info(
                 f"Created {len(chunks)} AST chunks from {language} file: {doc.metadata.get('file_name', 'unknown')}"
             )
-
         except Exception as e:
             logger.warning(f"AST chunking failed for {language} file: {e}")
             logger.info("Falling back to traditional chunking")
-            traditional_chunks = create_traditional_chunks([doc], max_chunk_size, chunk_overlap)
-            all_chunks.extend(traditional_chunks)
+            all_chunks.extend(create_traditional_chunks([doc], max_chunk_size, chunk_overlap))
 
     return all_chunks
@@ -181,23 +131,10 @@ def create_ast_chunks(
 def create_traditional_chunks(
     documents, chunk_size: int = 256, chunk_overlap: int = 128
 ) -> list[str]:
-    """
-    Create traditional text chunks using LlamaIndex SentenceSplitter.
-
-    Args:
-        documents: List of documents to chunk
-        chunk_size: Size of each chunk in characters
-        chunk_overlap: Overlap between chunks
-
-    Returns:
-        List of text chunks
-    """
-    # Handle invalid chunk_size values
+    """Create traditional text chunks using LlamaIndex SentenceSplitter."""
     if chunk_size <= 0:
         logger.warning(f"Invalid chunk_size={chunk_size}, using default value of 256")
         chunk_size = 256
 
-    # Ensure chunk_overlap is not negative and not larger than chunk_size
     if chunk_overlap < 0:
         chunk_overlap = 0
     if chunk_overlap >= chunk_size:
@@ -215,12 +152,9 @@ def create_traditional_chunks(
         try:
             nodes = node_parser.get_nodes_from_documents([doc])
             if nodes:
-                chunk_texts = [node.get_content() for node in nodes]
-                all_texts.extend(chunk_texts)
-                logger.debug(f"Created {len(chunk_texts)} traditional chunks from document")
+                all_texts.extend(node.get_content() for node in nodes)
         except Exception as e:
             logger.error(f"Traditional chunking failed for document: {e}")
-            # As last resort, add the raw content
             content = doc.get_content()
             if content and content.strip():
                 all_texts.append(content.strip())
@@ -238,32 +172,13 @@ def create_text_chunks(
     code_file_extensions: Optional[list[str]] = None,
     ast_fallback_traditional: bool = True,
 ) -> list[str]:
-    """
-    Create text chunks from documents with optional AST support for code files.
-
-    Args:
-        documents: List of LlamaIndex Document objects
-        chunk_size: Size for traditional text chunks
-        chunk_overlap: Overlap for traditional text chunks
-        use_ast_chunking: Whether to use AST chunking for code files
-        ast_chunk_size: Size for AST chunks
-        ast_chunk_overlap: Overlap for AST chunks
-        code_file_extensions: Custom list of code file extensions
-        ast_fallback_traditional: Fall back to traditional chunking on AST errors
-
-    Returns:
-        List of text chunks
-    """
+    """Create text chunks from documents with optional AST support for code files."""
     if not documents:
         logger.warning("No documents provided for chunking")
         return []
 
-    # Create a local copy of supported extensions for this function call
     local_code_extensions = CODE_EXTENSIONS.copy()
 
-    # Update supported extensions if provided
     if code_file_extensions:
-        # Map extensions to languages (simplified mapping)
         ext_mapping = {
             ".py": "python",
             ".java": "java",
@@ -273,47 +188,32 @@ def create_text_chunks(
         }
         for ext in code_file_extensions:
             if ext.lower() not in local_code_extensions:
-                # Try to guess language from extension
                 if ext.lower() in ext_mapping:
                     local_code_extensions[ext.lower()] = ext_mapping[ext.lower()]
                 else:
                     logger.warning(f"Unsupported extension {ext}, will use traditional chunking")
 
     all_chunks = []
 
     if use_ast_chunking:
-        # Separate code and text documents using local extensions
         code_docs, text_docs = detect_code_files(documents, local_code_extensions)
 
-        # Process code files with AST chunking
         if code_docs:
-            logger.info(f"Processing {len(code_docs)} code files with AST chunking")
             try:
-                ast_chunks = create_ast_chunks(
-                    code_docs, max_chunk_size=ast_chunk_size, chunk_overlap=ast_chunk_overlap
-                )
-                all_chunks.extend(ast_chunks)
-                logger.info(f"Created {len(ast_chunks)} AST chunks from code files")
+                all_chunks.extend(
+                    create_ast_chunks(
+                        code_docs, max_chunk_size=ast_chunk_size, chunk_overlap=ast_chunk_overlap
+                    )
+                )
             except Exception as e:
                 logger.error(f"AST chunking failed: {e}")
                 if ast_fallback_traditional:
-                    logger.info("Falling back to traditional chunking for code files")
-                    traditional_code_chunks = create_traditional_chunks(
-                        code_docs, chunk_size, chunk_overlap
-                    )
-                    all_chunks.extend(traditional_code_chunks)
+                    all_chunks.extend(
+                        create_traditional_chunks(code_docs, chunk_size, chunk_overlap)
+                    )
                 else:
                     raise
 
-        # Process text files with traditional chunking
         if text_docs:
-            logger.info(f"Processing {len(text_docs)} text files with traditional chunking")
-            text_chunks = create_traditional_chunks(text_docs, chunk_size, chunk_overlap)
-            all_chunks.extend(text_chunks)
-            logger.info(f"Created {len(text_chunks)} traditional chunks from text files")
+            all_chunks.extend(create_traditional_chunks(text_docs, chunk_size, chunk_overlap))
     else:
-        # Use traditional chunking for all files
-        logger.info(f"Processing {len(documents)} documents with traditional chunking")
         all_chunks = create_traditional_chunks(documents, chunk_size, chunk_overlap)
 
     logger.info(f"Total chunks created: {len(all_chunks)}")

View File

@@ -1,6 +1,5 @@
 import argparse
 import asyncio
-import sys
 from pathlib import Path
 from typing import Any, Optional, Union
@@ -1216,13 +1215,8 @@ Examples:
         if use_ast:
             print("🧠 Using AST-aware chunking for code files")
             try:
-                # Import enhanced chunking utilities
-                # Add apps directory to path to import chunking utilities
-                apps_dir = Path(__file__).parent.parent.parent.parent.parent / "apps"
-                if apps_dir.exists():
-                    sys.path.insert(0, str(apps_dir))
-                from chunking import create_text_chunks
+                # Import enhanced chunking utilities from packaged module
+                from .chunking_utils import create_text_chunks
 
                 # Use enhanced chunking with AST support
                 all_texts = create_text_chunks(
@@ -1237,7 +1231,9 @@ Examples:
             )
         except ImportError as e:
-            print(f"⚠️ AST chunking not available ({e}), falling back to traditional chunking")
+            print(
+                f"⚠️ AST chunking utilities not available in package ({e}), falling back to traditional chunking"
+            )
             use_ast = False
 
         if not use_ast:
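
Note: the point of the CLI change is that the import no longer depends on the layout of a repo checkout. A before/after sketch (assuming leann-core is installed or on the path):

    # Before: the CLI mutated sys.path to reach the repo's apps/ directory,
    # which breaks for installed wheels where apps/ does not exist:
    #
    #   apps_dir = Path(__file__).parent.parent.parent.parent.parent / "apps"
    #   sys.path.insert(0, str(apps_dir))
    #   from chunking import create_text_chunks
    #
    # After: the utilities ship inside the leann package itself, so the same
    # import works from a wheel or from the repo:
    from leann.chunking_utils import create_text_chunks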