diff --git a/apps/semantic_file_search/leann-plus-temporal-search.py b/apps/semantic_file_search/leann-plus-temporal-search.py
new file mode 100644
index 0000000..167189e
--- /dev/null
+++ b/apps/semantic_file_search/leann-plus-temporal-search.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python3
+import re
+import sys
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from leann import LeannSearcher
+
+INDEX_PATH = str(Path("./").resolve() / "demo.leann")
+
+
+class TimeParser:
+    def __init__(self):
+        # Main pattern: captures optional fuzzy modifier, number, unit, and optional "ago"
+        self.pattern = r"(?:(around|about|roughly|approximately)\s+)?(\d+)\s+(hour|day|week|month|year)s?(?:\s+ago)?"
+
+        # Compile for performance
+        self.regex = re.compile(self.pattern, re.IGNORECASE)
+
+        # Stop words to remove before regex parsing
+        self.stop_words = {
+            "in",
+            "at",
+            "of",
+            "by",
+            "as",
+            "me",
+            "the",
+            "a",
+            "an",
+            "and",
+            "any",
+            "find",
+            "search",
+            "list",
+            "ago",
+            "back",
+            "past",
+            "earlier",
+        }
+
+    def clean_text(self, text):
+        """Remove stop words from text"""
+        words = text.split()
+        cleaned = " ".join(word for word in words if word.lower() not in self.stop_words)
+        return cleaned
+
+    def parse(self, text):
+        """Extract all time expressions from text"""
+        # Clean text first
+        cleaned_text = self.clean_text(text)
+
+        matches = []
+        for match in self.regex.finditer(cleaned_text):
+            fuzzy = match.group(1)  # "around", "about", etc.
+            number = int(match.group(2))
+            unit = match.group(3).lower()
+
+            matches.append(
+                {
+                    "full_match": match.group(0),
+                    "fuzzy": bool(fuzzy),
+                    "number": number,
+                    "unit": unit,
+                    "range": self.calculate_range(number, unit, bool(fuzzy)),
+                }
+            )
+
+        return matches
+
+    def calculate_range(self, number, unit, is_fuzzy):
+        """Convert to actual datetime range and return ISO format strings"""
+        units = {
+            "hour": timedelta(hours=number),
+            "day": timedelta(days=number),
+            "week": timedelta(weeks=number),
+            "month": timedelta(days=number * 30),
+            "year": timedelta(days=number * 365),
+        }
+
+        delta = units[unit]
+        now = datetime.now()
+        target = now - delta
+
+        if is_fuzzy:
+            buffer = delta * 0.2  # 20% buffer for fuzzy
+            start = (target - buffer).isoformat()
+            end = (target + buffer).isoformat()
+        else:
+            start = target.isoformat()
+            end = now.isoformat()
+
+        return (start, end)
+
+
metadata.get("creation_date") + + if date_str: + # Convert strings to datetime objects for proper comparison + try: + file_date = datetime.fromisoformat(date_str) + start_dt = datetime.fromisoformat(start_time) + end_dt = datetime.fromisoformat(end_time) + + # Compare dates properly + if start_dt <= file_date <= end_dt: + filtered_results.append(result) + except (ValueError, TypeError): + # Handle invalid date formats + print(f"Warning: Invalid date format in metadata: {date_str}") + continue + + results = filtered_results + + # Print results + print(f"\nSearch results for: '{query}'") + if time_matches: + print( + f"Time filter: {time_matches[0]['number']} {time_matches[0]['unit']}(s) {'(fuzzy)' if time_matches[0]['fuzzy'] else ''}" + ) + print( + f"Date range: {time_matches[0]['range'][0][:10]} to {time_matches[0]['range'][1][:10]}" + ) + print("-" * 80) + + for i, result in enumerate(results, 1): + print(f"\n[{i}] Score: {result.score:.4f}") + print(f"Content: {result.text}") + + # Show metadata if present + metadata = result.metadata if hasattr(result, "metadata") else None + if metadata: + if "creation_date" in metadata: + print(f"Created: {metadata['creation_date']}") + if "modification_date" in metadata: + print(f"Modified: {metadata['modification_date']}") + print("-" * 80) + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print('Usage: python search_index.py "" [top_k]') + sys.exit(1) + + query = sys.argv[1] + top_k = int(sys.argv[2]) if len(sys.argv) > 2 else 15 + + search_files(query, top_k) diff --git a/apps/semantic_file_search/leann_index_builder.py b/apps/semantic_file_search/leann_index_builder.py new file mode 100644 index 0000000..958c697 --- /dev/null +++ b/apps/semantic_file_search/leann_index_builder.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import json +import sys +from pathlib import Path + +from leann import LeannBuilder + + +def process_json_items(json_file_path): + """Load and process JSON file with metadata items""" + + with open(json_file_path, encoding="utf-8") as f: + items = json.load(f) + + # Guard against empty JSON + if not items: + print("⚠️ No items found in the JSON file. Exiting gracefully.") + return + + INDEX_PATH = str(Path("./").resolve() / "demo.leann") + builder = LeannBuilder(backend_name="hnsw", is_recompute=False) + + total_items = len(items) + items_added = 0 + print(f"Processing {total_items} items...") + + for idx, item in enumerate(items): + try: + # Create embedding text sentence + embedding_text = f"{item.get('Name', 'unknown')} located at {item.get('Path', 'unknown')} and size {item.get('Size', 'unknown')} bytes with content type {item.get('ContentType', 'unknown')} and kind {item.get('Kind', 'unknown')}" + + # Prepare metadata with dates + metadata = {} + if "CreationDate" in item: + metadata["creation_date"] = item["CreationDate"] + if "ContentChangeDate" in item: + metadata["modification_date"] = item["ContentChangeDate"] + + # Add to builder + builder.add_text(embedding_text, metadata=metadata) + items_added += 1 + + except Exception as e: + print(f"\n⚠️ Warning: Failed to process item {idx}: {e}") + continue + + # Show progress + progress = (idx + 1) / total_items * 100 + sys.stdout.write(f"\rProgress: {idx + 1}/{total_items} ({progress:.1f}%)") + sys.stdout.flush() + + print() # New line after progress + + # Guard against no successfully added items + if items_added == 0: + print("⚠️ No items were successfully added to the index. 
Exiting gracefully.") + return + + print(f"\n✅ Successfully processed {items_added}/{total_items} items") + print("Building index...") + + try: + builder.build_index(INDEX_PATH) + print(f"✓ Index saved to {INDEX_PATH}") + except ValueError as e: + if "No chunks added" in str(e): + print("⚠️ No chunks were added to the builder. Index not created.") + else: + raise + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python build_index.py ") + sys.exit(1) + + json_file = sys.argv[1] + if not Path(json_file).exists(): + print(f"Error: File {json_file} not found") + sys.exit(1) + + process_json_items(json_file) diff --git a/apps/semantic_file_search/spotlight_index_dump.py b/apps/semantic_file_search/spotlight_index_dump.py new file mode 100644 index 0000000..84a1ff6 --- /dev/null +++ b/apps/semantic_file_search/spotlight_index_dump.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +""" +Spotlight Metadata Dumper for Vector DB +Extracts only essential metadata for semantic search embeddings +Output is optimized for vector database storage with minimal fields +""" + +import json +import sys +from datetime import datetime + +# Check platform before importing macOS-specific modules +if sys.platform != "darwin": + print("This script requires macOS (uses Spotlight)") + sys.exit(1) + +from Foundation import NSDate, NSMetadataQuery, NSPredicate, NSRunLoop + +# EDIT THIS LIST: Add or remove folders to search +# Can be either: +# - Folder names relative to home directory (e.g., "Desktop", "Downloads") +# - Absolute paths (e.g., "/Applications", "/System/Library") +SEARCH_FOLDERS = [ + "Desktop", + "Downloads", + "Documents", + "Music", + "Pictures", + "Movies", + # "Library", # Uncomment to include + # "/Applications", # Absolute path example + # "Code/Projects", # Subfolder example + # Add any other folders here +] + + +def convert_to_serializable(obj): + """Convert NS objects to Python serializable types""" + if obj is None: + return None + + # Handle NSDate + if hasattr(obj, "timeIntervalSince1970"): + return datetime.fromtimestamp(obj.timeIntervalSince1970()).isoformat() + + # Handle NSArray + if hasattr(obj, "count") and hasattr(obj, "objectAtIndex_"): + return [convert_to_serializable(obj.objectAtIndex_(i)) for i in range(obj.count())] + + # Convert to string + try: + return str(obj) + except Exception: + return repr(obj) + + +def dump_spotlight_data(max_items=10, output_file="spotlight_dump.json"): + """ + Dump Spotlight data using public.item predicate + """ + # Build full paths from SEARCH_FOLDERS + import os + + home_dir = os.path.expanduser("~") + search_paths = [] + + print("Search locations:") + for folder in SEARCH_FOLDERS: + # Check if it's an absolute path or relative + if folder.startswith("/"): + full_path = folder + else: + full_path = os.path.join(home_dir, folder) + + if os.path.exists(full_path): + search_paths.append(full_path) + print(f" ✓ {full_path}") + else: + print(f" ✗ {full_path} (not found)") + + if not search_paths: + print("No valid search paths found!") + return [] + + print(f"\nDumping {max_items} items from Spotlight (public.item)...") + + # Create query with public.item predicate + query = NSMetadataQuery.alloc().init() + predicate = NSPredicate.predicateWithFormat_("kMDItemContentTypeTree CONTAINS 'public.item'") + query.setPredicate_(predicate) + + # Set search scopes to our specific folders + query.setSearchScopes_(search_paths) + + print("Starting query...") + query.startQuery() + + # Wait for gathering to complete + run_loop = 
diff --git a/apps/semantic_file_search/spotlight_index_dump.py b/apps/semantic_file_search/spotlight_index_dump.py
new file mode 100644
index 0000000..84a1ff6
--- /dev/null
+++ b/apps/semantic_file_search/spotlight_index_dump.py
@@ -0,0 +1,265 @@
+#!/usr/bin/env python3
+"""
+Spotlight Metadata Dumper for Vector DB
+Extracts only essential metadata for semantic search embeddings
+Output is optimized for vector database storage with minimal fields
+"""
+
+import json
+import sys
+from datetime import datetime
+
+# Check platform before importing macOS-specific modules
+if sys.platform != "darwin":
+    print("This script requires macOS (uses Spotlight)")
+    sys.exit(1)
+
+from Foundation import NSDate, NSMetadataQuery, NSPredicate, NSRunLoop
+
+# EDIT THIS LIST: Add or remove folders to search
+# Can be either:
+#   - Folder names relative to home directory (e.g., "Desktop", "Downloads")
+#   - Absolute paths (e.g., "/Applications", "/System/Library")
+SEARCH_FOLDERS = [
+    "Desktop",
+    "Downloads",
+    "Documents",
+    "Music",
+    "Pictures",
+    "Movies",
+    # "Library",  # Uncomment to include
+    # "/Applications",  # Absolute path example
+    # "Code/Projects",  # Subfolder example
+    # Add any other folders here
+]
+
+
+def convert_to_serializable(obj):
+    """Convert NS objects to Python serializable types"""
+    if obj is None:
+        return None
+
+    # Handle NSDate
+    if hasattr(obj, "timeIntervalSince1970"):
+        return datetime.fromtimestamp(obj.timeIntervalSince1970()).isoformat()
+
+    # Handle NSArray
+    if hasattr(obj, "count") and hasattr(obj, "objectAtIndex_"):
+        return [convert_to_serializable(obj.objectAtIndex_(i)) for i in range(obj.count())]
+
+    # Convert to string
+    try:
+        return str(obj)
+    except Exception:
+        return repr(obj)
+
+
+def dump_spotlight_data(max_items=10, output_file="spotlight_dump.json"):
+    """
+    Dump Spotlight data using public.item predicate
+    """
+    # Build full paths from SEARCH_FOLDERS
+    import os
+
+    home_dir = os.path.expanduser("~")
+    search_paths = []
+
+    print("Search locations:")
+    for folder in SEARCH_FOLDERS:
+        # Check if it's an absolute path or relative
+        if folder.startswith("/"):
+            full_path = folder
+        else:
+            full_path = os.path.join(home_dir, folder)
+
+        if os.path.exists(full_path):
+            search_paths.append(full_path)
+            print(f"  ✓ {full_path}")
+        else:
+            print(f"  ✗ {full_path} (not found)")
+
+    if not search_paths:
+        print("No valid search paths found!")
+        return []
+
+    print(f"\nDumping {max_items} items from Spotlight (public.item)...")
+
+    # Create query with public.item predicate
+    query = NSMetadataQuery.alloc().init()
+    predicate = NSPredicate.predicateWithFormat_("kMDItemContentTypeTree CONTAINS 'public.item'")
+    query.setPredicate_(predicate)
+
+    # Set search scopes to our specific folders
+    query.setSearchScopes_(search_paths)
+
+    print("Starting query...")
+    query.startQuery()
+
+    # Wait for gathering to complete
+    run_loop = NSRunLoop.currentRunLoop()
+    print("Gathering results...")
+
+    # Let it gather for a few seconds
+    for i in range(50):  # 5 seconds max
+        run_loop.runMode_beforeDate_(
+            "NSDefaultRunLoopMode", NSDate.dateWithTimeIntervalSinceNow_(0.1)
+        )
+        # Check gathering status periodically
+        if i % 10 == 0:
+            current_count = query.resultCount()
+            if current_count > 0:
+                print(f"  Found {current_count} items so far...")
+
+    # Continue while still gathering (up to 2 more seconds)
+    timeout = NSDate.dateWithTimeIntervalSinceNow_(2.0)
+    while query.isGathering() and timeout.timeIntervalSinceNow() > 0:
+        run_loop.runMode_beforeDate_(
+            "NSDefaultRunLoopMode", NSDate.dateWithTimeIntervalSinceNow_(0.1)
+        )
+
+    query.stopQuery()
+
+    total_results = query.resultCount()
+    print(f"Found {total_results} total items")
+
+    if total_results == 0:
+        print("No results found")
+        return []
+
+    # Process items
+    items_to_process = min(total_results, max_items)
+    results = []
+
+    # ONLY relevant attributes for vector embeddings
+    # These provide essential context for semantic search without bloat
+    attributes = [
+        "kMDItemPath",  # Full path for file retrieval
+        "kMDItemFSName",  # Filename for display & embedding
+        "kMDItemFSSize",  # Size for filtering/ranking
+        "kMDItemContentType",  # File type for categorization
+        "kMDItemKind",  # Human-readable type for embedding
+        "kMDItemFSCreationDate",  # Temporal context
+        "kMDItemFSContentChangeDate",  # Recency for ranking
+    ]
+
+    print(f"Processing {items_to_process} items...")
+
+    for i in range(items_to_process):
+        try:
+            item = query.resultAtIndex_(i)
+            metadata = {}
+
+            # Extract ONLY the relevant attributes
+            for attr in attributes:
+                try:
+                    value = item.valueForAttribute_(attr)
+                    if value is not None:
+                        # Keep the attribute name clean (remove kMDItem prefix for cleaner JSON)
+                        clean_key = attr.replace("kMDItem", "").replace("FS", "")
+                        metadata[clean_key] = convert_to_serializable(value)
+                except (AttributeError, ValueError, TypeError):
+                    continue
+
+            # Only add if we have at least a path
+            if metadata.get("Path"):
+                results.append(metadata)
+
+        except Exception as e:
+            print(f"Error processing item {i}: {e}")
+            continue
+
+    # Save to JSON
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(results, f, indent=2, ensure_ascii=False)
+
+    print(f"\n✓ Saved {len(results)} items to {output_file}")
+
+    # Show summary
+    print("\nSample items:")
+    import os
+
+    home_dir = os.path.expanduser("~")
+
+    for i, item in enumerate(results[:3]):
+        print(f"\n[Item {i + 1}]")
+        print(f"  Path: {item.get('Path', 'N/A')}")
+        print(f"  Name: {item.get('Name', 'N/A')}")
+        print(f"  Type: {item.get('ContentType', 'N/A')}")
+        print(f"  Kind: {item.get('Kind', 'N/A')}")
+
+        # Handle size properly
+        size = item.get("Size")
+        if size:
+            try:
+                size_int = int(size)
+                if size_int > 1024 * 1024:
+                    print(f"  Size: {size_int / (1024 * 1024):.2f} MB")
+                elif size_int > 1024:
+                    print(f"  Size: {size_int / 1024:.2f} KB")
+                else:
+                    print(f"  Size: {size_int} bytes")
+            except (ValueError, TypeError):
+                print(f"  Size: {size}")
+
+        # Show dates
+        if "CreationDate" in item:
+            print(f"  Created: {item['CreationDate']}")
+        if "ContentChangeDate" in item:
+            print(f"  Modified: {item['ContentChangeDate']}")
+
+    # Count by type
+    type_counts = {}
+    for item in results:
+        content_type = item.get("ContentType", "unknown")
+        type_counts[content_type] = type_counts.get(content_type, 0) + 1
+
+    print(f"\nTotal items saved: {len(results)}")
+
+    if type_counts:
+        print("\nTop content types:")
+        for ct, count in sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
+            print(f"  {ct}: {count} items")
+
+    # Count by folder
+    folder_counts = {}
+    for item in results:
+        path = item.get("Path", "")
+        for folder in SEARCH_FOLDERS:
+            # Build the full folder path
+            if folder.startswith("/"):
+                folder_path = folder
+            else:
+                folder_path = os.path.join(home_dir, folder)
+
+            if path.startswith(folder_path):
+                folder_counts[folder] = folder_counts.get(folder, 0) + 1
+                break
+
+    if folder_counts:
+        print("\nItems by location:")
+        for folder, count in sorted(folder_counts.items(), key=lambda x: x[1], reverse=True):
+            print(f"  {folder}: {count} items")
+
+    return results
+
+
+def main():
+    # Parse arguments
+    if len(sys.argv) > 1:
+        try:
+            max_items = int(sys.argv[1])
+        except ValueError:
+            print("Usage: python spotlight_index_dump.py [number_of_items] [output_file]")
+            print("Default: 10 items")
+            sys.exit(1)
+    else:
+        max_items = 10
+
+    output_file = sys.argv[2] if len(sys.argv) > 2 else "spotlight_dump.json"
+
+    # Run dump
+    dump_spotlight_data(max_items=max_items, output_file=output_file)
+
+
+if __name__ == "__main__":
+    main()
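Taken together, the three scripts form a small pipeline: dump Spotlight metadata to JSON, build the LEANN index, then query it with optional temporal filtering. A minimal sketch of the full flow, assuming macOS and that the scripts are run from apps/semantic_file_search/; the item counts and query text are illustrative:

    # Step-by-step (shell):
    #   python spotlight_index_dump.py 500 spotlight_dump.json   # dump Spotlight metadata
    #   python leann_index_builder.py spotlight_dump.json        # build ./demo.leann
    #   python leann-plus-temporal-search.py "pdf documents about 2 weeks ago" 10
    #
    # The same index can also be queried programmatically, mirroring the calls
    # used in the search script above:
    from leann import LeannSearcher

    searcher = LeannSearcher("./demo.leann")
    for r in searcher.search("pdf documents", top_k=5, recompute_embeddings=False):
        print(f"{r.score:.4f}  {r.text}")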