● feat: Draft pip package policy management system (not yet integrated)

Add comprehensive pip dependency conflict resolution framework as draft implementation. This is self-contained and does not affect existing
ComfyUI Manager functionality.

Key components:
- pip_util.py with PipBatch class for policy-driven package management
- Lazy-loaded policy system supporting base + user overrides
- Multi-stage policy execution (uninstall → apply_first_match → apply_all_matches → restore)
- Conditional policies based on platform, installed packages, and ComfyUI version
- Comprehensive test suite covering edge cases, workflows, and platform scenarios
- Design and implementation documentation

Policy capabilities (draft):
- Package replacement (e.g., PIL → Pillow, opencv-python → opencv-contrib-python)
- Version pinning to prevent dependency conflicts
- Dependency protection during installations
- Platform-specific handling (Linux/Windows, GPU detection)
- Pre-removal and post-restoration workflows

Testing infrastructure:
- Pytest-based test suite with isolated environments
- Dependency analysis tools for conflict detection
- Coverage for policy priority, edge cases, and environment recovery

Status: Draft implementation complete, integration with manager workflows pending.
This commit is contained in:
Dr.Lt.Data
2025-10-04 08:55:59 +09:00
parent 1ab2b1aeb3
commit 2866193baf
29 changed files with 10024 additions and 1 deletions

View File

@@ -0,0 +1,713 @@
# Design Document for pip_util.py Implementation
This is designed to minimize breaking existing installed dependencies.
## List of Functions to Implement
## Global Policy Management
### Global Variables
```python
_pip_policy_cache = None # Policy cache (program-wide, loaded once)
```
### Global Functions
* get_pip_policy(): Returns policy for resolving pip dependency conflicts (lazy loading)
- **Call timing**: Called whenever needed (automatically loads only once on first call)
- **Purpose**: Returns policy cache, automatically loads if cache is empty
- **Execution flow**:
1. Declare global _pip_policy_cache
2. If _pip_policy_cache is already loaded, return immediately (prevent duplicate loading)
3. Read base policy file:
- Path: {manager_util.comfyui_manager_path}/pip-policy.json
- Use empty dictionary if file doesn't exist
- Log error and use empty dictionary if JSON parsing fails
4. Read user policy file:
- Path: {context.manager_files_path}/pip-policy.user.json
- Create empty JSON file if doesn't exist ({"_comment": "User-specific pip policy overrides"})
- Log warning and use empty dictionary if JSON parsing fails
5. Apply merge rules (merge by package name):
- Start with base policy as base
- For each package in user policy:
* Package only in user policy: add to base
* Package only in base policy: keep in base
* Package in both: completely replace with user policy (entire package replacement, not section-level)
6. Store merged policy in _pip_policy_cache
7. Log policy load success (include number of loaded package policies)
8. Return _pip_policy_cache
- **Return value**: Dict (merged policy dictionary)
- **Exception handling**:
- File read failure: Log warning and treat file as empty dictionary
- JSON parsing failure: Log error and treat file as empty dictionary
- **Notes**:
- Lazy loading pattern automatically loads on first call
- Not thread-safe, caution needed in multi-threaded environments
- Policy file structure should support the following scenarios:
- Dictionary structure of {dependency name -> policy object}
- Policy object has four policy sections:
- **uninstall**: Package removal policy (pre-processing, condition optional)
- **apply_first_match**: Evaluate top-to-bottom and execute only the first policy that satisfies condition (exclusive)
- **apply_all_matches**: Execute all policies that satisfy conditions (cumulative)
- **restore**: Package restoration policy (post-processing, condition optional)
- Condition types:
- installed: Check version condition of already installed dependencies
- spec is optional
- package field: Specify package to check (optional, defaults to self)
- Explicit: Reference another package (e.g., numba checks numpy version)
- Omitted: Check own version (e.g., critical-package checks its own version)
- platform: Platform conditions (os, has_gpu, comfyui_version, etc.)
- If condition is absent, always considered satisfied
- uninstall policy (pre-removal policy):
- Removal policy list (condition is optional, evaluate top-to-bottom and execute only first match)
- When condition satisfied (or always if no condition): remove target package and abort installation
- If this policy is applied, all subsequent steps are ignored
- target field specifies package to remove
- Example: Unconditionally remove if specific package is installed
- Actions available in apply_first_match (determine installation method, exclusive):
- skip: Block installation of specific dependency
- force_version: Force change to specific version during installation
- extra_index_url field can specify custom package repository (optional)
- replace: Replace with different dependency
- extra_index_url field can specify custom package repository (optional)
- Actions available in apply_all_matches (installation options, cumulative):
- pin_dependencies: Pin currently installed versions of other dependencies
- pinned_packages field specifies package list
- Example: `pip install requests urllib3==1.26.15 certifi==2023.7.22 charset-normalizer==3.2.0`
- Real use case: Prevent urllib3 from upgrading to 2.x when installing requests
- on_failure: "fail" or "retry_without_pin"
- install_with: Specify additional dependencies to install together
- warn: Record warning message in log
- restore policy (post-restoration policy):
- Restoration policy list (condition is optional, evaluate top-to-bottom and execute only first match)
- Executed after package installation completes (post-processing)
- When condition satisfied (or always if no condition): force install target package to specific version
- target field specifies package to restore (can be different package)
- version field specifies version to install
- extra_index_url field can specify custom package repository (optional)
- Example: Reinstall/change version if specific package is deleted or wrong version
- Execution order:
1. uninstall evaluation: If condition satisfied, remove package and **terminate** (ignore subsequent steps)
2. apply_first_match evaluation:
- Execute first policy that satisfies condition among skip/force_version/replace
- If no matching policy, proceed with default installation of originally requested package
3. apply_all_matches evaluation: Apply all pin_dependencies, install_with, warn that satisfy conditions
4. Execute actual package installation (pip install or uv pip install)
5. restore evaluation: If condition satisfied, restore target package (post-processing)
## Batch Unit Class (PipBatch)
### Class Structure
```python
class PipBatch:
"""
pip package installation batch unit manager
Maintains pip freeze cache during batch operations for performance optimization
Usage pattern:
# Batch operations (policy auto-loaded)
with PipBatch() as batch:
batch.ensure_not_installed()
batch.install("numpy>=1.20")
batch.install("pandas>=2.0")
batch.install("scipy>=1.7")
batch.ensure_installed()
"""
def __init__(self):
self._installed_cache = None # Installed packages cache (batch-level)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._installed_cache = None
```
### Private Methods
* PipBatch._refresh_installed_cache():
- **Purpose**: Read currently installed package information and refresh cache
- **Execution flow**:
1. Generate command using manager_util.make_pip_cmd(["freeze"])
2. Execute pip freeze via subprocess
3. Parse output:
- Each line is in "package_name==version" format
- Parse "package_name==version" to create dictionary
- Ignore editable packages (starting with -e)
- Ignore comments (starting with #)
4. Store parsed dictionary in self._installed_cache
- **Return value**: None
- **Exception handling**:
- pip freeze failure: Set cache to empty dictionary and log warning
- Parse failure: Ignore line and continue
* PipBatch._get_installed_packages():
- **Purpose**: Return cached installed package information (refresh if cache is None)
- **Execution flow**:
1. If self._installed_cache is None, call _refresh_installed_cache()
2. Return self._installed_cache
- **Return value**: {package_name: version} dictionary
* PipBatch._invalidate_cache():
- **Purpose**: Invalidate cache after package install/uninstall
- **Execution flow**:
1. Set self._installed_cache = None
- **Return value**: None
- **Call timing**: After install(), ensure_not_installed(), ensure_installed()
* PipBatch._parse_package_spec(package_info):
- **Purpose**: Split package spec string into package name and version spec
- **Parameters**:
- package_info: "numpy", "numpy==1.26.0", "numpy>=1.20.0", "numpy~=1.20", etc.
- **Execution flow**:
1. Use regex to split package name and version spec
2. Pattern: `^([a-zA-Z0-9_-]+)([><=!~]+.*)?$`
- **Return value**: (package_name, version_spec) tuple
- Examples: ("numpy", "==1.26.0"), ("pandas", ">=2.0.0"), ("scipy", None)
- **Exception handling**:
- Parse failure: Raise ValueError
* PipBatch._evaluate_condition(condition, package_name, installed_packages):
- **Purpose**: Evaluate policy condition and return whether satisfied
- **Parameters**:
- condition: Policy condition object (dictionary)
- package_name: Name of package currently being processed
- installed_packages: {package_name: version} dictionary
- **Execution flow**:
1. If condition is None, return True (always satisfied)
2. Branch based on condition["type"]:
a. "installed" type:
- target_package = condition.get("package", package_name)
- Check current version with installed_packages.get(target_package)
- If not installed (None), return False
- If spec exists, compare version using packaging.specifiers.SpecifierSet
- If no spec, only check installation status (True)
b. "platform" type:
- If condition["os"] exists, compare with platform.system()
- If condition["has_gpu"] exists, check GPU presence (torch.cuda.is_available(), etc.)
- If condition["comfyui_version"] exists, compare ComfyUI version
- Return True if all conditions satisfied
3. Return True if all conditions satisfied, False if any unsatisfied
- **Return value**: bool
- **Exception handling**:
- Version comparison failure: Log warning and return False
- Unknown condition type: Log warning and return False
### Public Methods
* PipBatch.install(package_info, extra_index_url=None, override_policy=False):
- **Purpose**: Perform policy-based pip package installation (individual package basis)
- **Parameters**:
- package_info: Package name and version spec (e.g., "numpy", "numpy==1.26.0", "numpy>=1.20.0")
- extra_index_url: Additional package repository URL (optional)
- override_policy: If True, skip policy application and install directly (default: False)
- **Execution flow**:
1. Call get_pip_policy() to get policy (lazy loading)
2. Use self._parse_package_spec() to split package_info into package name and version spec
3. Call self._get_installed_packages() to get cached installed package information
4. If override_policy=True → Jump directly to step 10 (skip policy)
5. Get policy for package name from policy dictionary
6. If no policy → Jump to step 10 (default installation)
7. **apply_first_match policy evaluation** (exclusive - only first match):
- Iterate through policy list top-to-bottom
- Evaluate each policy's condition with self._evaluate_condition()
- When first condition-satisfying policy found:
* type="skip": Log reason and return False (don't install)
* type="force_version": Change package_info version to policy's version
* type="replace": Completely replace package_info with policy's replacement package
- If no matching policy, keep original package_info
8. **apply_all_matches policy evaluation** (cumulative - all matches):
- Iterate through policy list top-to-bottom
- Evaluate each policy's condition with self._evaluate_condition()
- For all condition-satisfying policies:
* type="pin_dependencies":
- For each package in pinned_packages, query current version with self._installed_cache.get(pkg)
- Pin to installed version in "package==version" format
- Add to installation package list
* type="install_with":
- Add additional_packages to installation package list
* type="warn":
- Output message as warning log
- If allow_continue=false, wait for user confirmation (optional)
9. Compose final installation package list:
- Main package (modified/replaced package_info)
- Packages pinned by pin_dependencies
- Packages added by install_with
10. Handle extra_index_url:
- Parameter-passed extra_index_url takes priority
- Otherwise use extra_index_url defined in policy
11. Generate pip/uv command using manager_util.make_pip_cmd():
- Basic format: ["pip", "install"] + package list
- If extra_index_url exists: add ["--extra-index-url", url]
12. Execute command via subprocess
13. Handle installation failure:
- If pin_dependencies's on_failure="retry_without_pin":
* Retry with only main package excluding pinned packages
- If on_failure="fail":
* Raise exception and abort installation
- Otherwise: Log warning and continue
14. On successful installation:
- Call self._invalidate_cache() (invalidate cache)
- Log info if reason exists
- Return True
- **Return value**: Installation success status (bool)
- **Exception handling**:
- Policy parsing failure: Log warning and proceed with default installation
- Installation failure: Log error and raise exception (depends on on_failure setting)
- **Notes**:
- restore policy not handled in this method (batch-processed in ensure_installed())
- uninstall policy not handled in this method (batch-processed in ensure_not_installed())
* PipBatch.ensure_not_installed():
- **Purpose**: Iterate through all policies and remove all packages satisfying uninstall conditions (batch processing)
- **Parameters**: None
- **Execution flow**:
1. Call get_pip_policy() to get policy (lazy loading)
2. Call self._get_installed_packages() to get cached installed package information
3. Iterate through all package policies in policy dictionary:
a. Check if each package has uninstall policy
b. If uninstall policy exists:
- Iterate through uninstall policy list top-to-bottom
- Evaluate each policy's condition with self._evaluate_condition()
- When first condition-satisfying policy found:
* Check if target package exists in self._installed_cache
* If installed:
- Generate command with manager_util.make_pip_cmd(["uninstall", "-y", target])
- Execute pip uninstall via subprocess
- Log reason in info log
- Add to removed package list
- Remove package from self._installed_cache
* Move to next package (only first match per package)
4. Complete iteration through all package policies
- **Return value**: List of removed package names (list of str)
- **Exception handling**:
- Individual package removal failure: Log warning only and continue to next package
- **Call timing**:
- Called at batch operation start to pre-remove conflicting packages
- Called before multiple package installations to clean installation environment
* PipBatch.ensure_installed():
- **Purpose**: Iterate through all policies and restore all packages satisfying restore conditions (batch processing)
- **Parameters**: None
- **Execution flow**:
1. Call get_pip_policy() to get policy (lazy loading)
2. Call self._get_installed_packages() to get cached installed package information
3. Iterate through all package policies in policy dictionary:
a. Check if each package has restore policy
b. If restore policy exists:
- Iterate through restore policy list top-to-bottom
- Evaluate each policy's condition with self._evaluate_condition()
- When first condition-satisfying policy found:
* Get target package name (policy's "target" field)
* Get version specified in version field
* Check current version with self._installed_cache.get(target)
* If current version is None or different from specified version:
- Compose as package_spec = f"{target}=={version}" format
- Generate command with manager_util.make_pip_cmd(["install", package_spec])
- If extra_index_url exists, add ["--extra-index-url", url]
- Execute pip install via subprocess
- Log reason in info log
- Add to restored package list
- Update cache: self._installed_cache[target] = version
* Move to next package (only first match per package)
4. Complete iteration through all package policies
- **Return value**: List of restored package names (list of str)
- **Exception handling**:
- Individual package installation failure: Log warning only and continue to next package
- **Call timing**:
- Called at batch operation end to restore essential package versions
- Called for environment verification after multiple package installations
## pip-policy.json Examples
### Base Policy File ({manager_util.comfyui_manager_path}/pip-policy.json)
```json
{
"torch": {
"apply_first_match": [
{
"type": "skip",
"reason": "PyTorch installation should be managed manually due to CUDA compatibility"
}
]
},
"opencv-python": {
"apply_first_match": [
{
"type": "replace",
"replacement": "opencv-contrib-python",
"version": ">=4.8.0",
"reason": "opencv-contrib-python includes all opencv-python features plus extras"
}
]
},
"PIL": {
"apply_first_match": [
{
"type": "replace",
"replacement": "Pillow",
"reason": "PIL is deprecated, use Pillow instead"
}
]
},
"click": {
"apply_first_match": [
{
"condition": {
"type": "installed",
"package": "colorama",
"spec": "<0.5.0"
},
"type": "force_version",
"version": "8.1.3",
"reason": "click 8.1.3 compatible with colorama <0.5"
}
],
"apply_all_matches": [
{
"type": "pin_dependencies",
"pinned_packages": ["colorama"],
"reason": "Prevent colorama upgrade that may break compatibility"
}
]
},
"requests": {
"apply_all_matches": [
{
"type": "pin_dependencies",
"pinned_packages": ["urllib3", "certifi", "charset-normalizer"],
"on_failure": "retry_without_pin",
"reason": "Prevent urllib3 from upgrading to 2.x which has breaking changes"
}
]
},
"six": {
"restore": [
{
"target": "six",
"version": "1.16.0",
"reason": "six must be maintained at 1.16.0 for compatibility"
}
]
},
"urllib3": {
"restore": [
{
"condition": {
"type": "installed",
"spec": "!=1.26.15"
},
"target": "urllib3",
"version": "1.26.15",
"reason": "urllib3 must be 1.26.15 for compatibility with legacy code"
}
]
},
"onnxruntime": {
"apply_first_match": [
{
"condition": {
"type": "platform",
"os": "linux",
"has_gpu": true
},
"type": "replace",
"replacement": "onnxruntime-gpu",
"reason": "Use GPU version on Linux with CUDA"
}
]
},
"legacy-custom-node-package": {
"apply_first_match": [
{
"condition": {
"type": "platform",
"comfyui_version": "<1.0.0"
},
"type": "force_version",
"version": "0.9.0",
"reason": "legacy-custom-node-package 0.9.0 is compatible with ComfyUI <1.0.0"
},
{
"condition": {
"type": "platform",
"comfyui_version": ">=1.0.0"
},
"type": "force_version",
"version": "1.5.0",
"reason": "legacy-custom-node-package 1.5.0 is required for ComfyUI >=1.0.0"
}
]
},
"tensorflow": {
"apply_all_matches": [
{
"condition": {
"type": "installed",
"package": "torch"
},
"type": "warn",
"message": "Installing TensorFlow alongside PyTorch may cause CUDA conflicts",
"allow_continue": true
}
]
},
"some-package": {
"uninstall": [
{
"condition": {
"type": "installed",
"package": "conflicting-package",
"spec": ">=2.0.0"
},
"target": "conflicting-package",
"reason": "conflicting-package >=2.0.0 conflicts with some-package"
}
]
},
"banned-malicious-package": {
"uninstall": [
{
"target": "banned-malicious-package",
"reason": "Security vulnerability CVE-2024-XXXXX, always remove if attempting to install"
}
]
},
"critical-package": {
"restore": [
{
"condition": {
"type": "installed",
"package": "critical-package",
"spec": "!=1.2.3"
},
"target": "critical-package",
"version": "1.2.3",
"extra_index_url": "https://custom-repo.example.com/simple",
"reason": "critical-package must be version 1.2.3, restore if different or missing"
}
]
},
"stable-package": {
"apply_first_match": [
{
"condition": {
"type": "installed",
"package": "critical-dependency",
"spec": ">=2.0.0"
},
"type": "force_version",
"version": "1.5.0",
"extra_index_url": "https://custom-repo.example.com/simple",
"reason": "stable-package 1.5.0 is required when critical-dependency >=2.0.0 is installed"
}
]
},
"new-experimental-package": {
"apply_all_matches": [
{
"type": "pin_dependencies",
"pinned_packages": ["numpy", "pandas", "scipy"],
"on_failure": "retry_without_pin",
"reason": "new-experimental-package may upgrade numpy/pandas/scipy, pin them to prevent breakage"
}
]
},
"pytorch-addon": {
"apply_all_matches": [
{
"condition": {
"type": "installed",
"package": "torch",
"spec": ">=2.0.0"
},
"type": "pin_dependencies",
"pinned_packages": ["torch", "torchvision", "torchaudio"],
"on_failure": "fail",
"reason": "pytorch-addon must not change PyTorch ecosystem versions"
}
]
}
}
```
### Policy Structure Schema
```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"patternProperties": {
"^.*$": {
"type": "object",
"properties": {
"uninstall": {
"type": "array",
"description": "When condition satisfied (or always if no condition), remove package and terminate",
"items": {
"type": "object",
"required": ["target"],
"properties": {
"condition": {
"type": "object",
"description": "Optional: always remove if absent",
"required": ["type"],
"properties": {
"type": {"enum": ["installed", "platform"]},
"package": {"type": "string", "description": "Optional: defaults to self"},
"spec": {"type": "string", "description": "Optional: version condition"},
"os": {"type": "string"},
"has_gpu": {"type": "boolean"},
"comfyui_version": {"type": "string"}
}
},
"target": {
"type": "string",
"description": "Package name to remove"
},
"reason": {"type": "string"}
}
}
},
"restore": {
"type": "array",
"description": "When condition satisfied (or always if no condition), restore package and terminate",
"items": {
"type": "object",
"required": ["target", "version"],
"properties": {
"condition": {
"type": "object",
"description": "Optional: always restore if absent",
"required": ["type"],
"properties": {
"type": {"enum": ["installed", "platform"]},
"package": {"type": "string", "description": "Optional: defaults to self"},
"spec": {"type": "string", "description": "Optional: version condition"},
"os": {"type": "string"},
"has_gpu": {"type": "boolean"},
"comfyui_version": {"type": "string"}
}
},
"target": {
"type": "string",
"description": "Package name to restore"
},
"version": {
"type": "string",
"description": "Version to restore"
},
"extra_index_url": {"type": "string"},
"reason": {"type": "string"}
}
}
},
"apply_first_match": {
"type": "array",
"description": "Execute only first condition-satisfying policy (exclusive)",
"items": {
"type": "object",
"required": ["type"],
"properties": {
"condition": {
"type": "object",
"description": "Optional: always apply if absent",
"required": ["type"],
"properties": {
"type": {"enum": ["installed", "platform"]},
"package": {"type": "string", "description": "Optional: defaults to self"},
"spec": {"type": "string", "description": "Optional: version condition"},
"os": {"type": "string"},
"has_gpu": {"type": "boolean"},
"comfyui_version": {"type": "string"}
}
},
"type": {
"enum": ["skip", "force_version", "replace"],
"description": "Exclusive action: determines installation method"
},
"version": {"type": "string"},
"replacement": {"type": "string"},
"extra_index_url": {"type": "string"},
"reason": {"type": "string"}
}
}
},
"apply_all_matches": {
"type": "array",
"description": "Execute all condition-satisfying policies (cumulative)",
"items": {
"type": "object",
"required": ["type"],
"properties": {
"condition": {
"type": "object",
"description": "Optional: always apply if absent",
"required": ["type"],
"properties": {
"type": {"enum": ["installed", "platform"]},
"package": {"type": "string", "description": "Optional: defaults to self"},
"spec": {"type": "string", "description": "Optional: version condition"},
"os": {"type": "string"},
"has_gpu": {"type": "boolean"},
"comfyui_version": {"type": "string"}
}
},
"type": {
"enum": ["pin_dependencies", "install_with", "warn"],
"description": "Cumulative action: adds installation options"
},
"pinned_packages": {
"type": "array",
"items": {"type": "string"}
},
"on_failure": {"enum": ["fail", "retry_without_pin"]},
"additional_packages": {"type": "array"},
"message": {"type": "string"},
"allow_continue": {"type": "boolean"},
"reason": {"type": "string"}
}
}
}
}
}
}
}
```
## Error Handling
* Default behavior when errors occur during policy execution:
- Log error and continue
- Only treat as installation failure when pin_dependencies's on_failure="fail"
- For other cases, leave warning and attempt originally requested installation
* pip_install: Performs pip package installation
- Use manager_util.make_pip_cmd to generate commands for selective application of uv and pip
- Provide functionality to skip policy application through override_policy flag

View File

@@ -0,0 +1,614 @@
# pip_util.py Implementation Plan Document
## 1. Project Overview
### Purpose
Implement a policy-based pip package management system that minimizes breaking existing installed dependencies
### Core Features
- JSON-based policy file loading and merging (lazy loading)
- Per-package installation policy evaluation and application
- Performance optimization through batch-level pip freeze caching
- Automated conditional package removal/restoration
### Technology Stack
- Python 3.x
- packaging library (version comparison)
- subprocess (pip command execution)
- json (policy file parsing)
---
## 2. Architecture Design
### 2.1 Global Policy Management (Lazy Loading Pattern)
```
┌─────────────────────────────────────┐
│ get_pip_policy() │
│ - Auto-loads policy files on │
│ first call via lazy loading │
│ - Returns cache on subsequent calls│
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ _pip_policy_cache (global) │
│ - Merged policy dictionary │
│ - {package_name: policy_object} │
└─────────────────────────────────────┘
```
### 2.2 Batch Operation Class (PipBatch)
```
┌─────────────────────────────────────┐
│ PipBatch (Context Manager) │
│ ┌───────────────────────────────┐ │
│ │ _installed_cache │ │
│ │ - Caches pip freeze results │ │
│ │ - {package: version} │ │
│ └───────────────────────────────┘ │
│ │
│ Public Methods: │
│ ├─ install() │
│ ├─ ensure_not_installed() │
│ └─ ensure_installed() │
│ │
│ Private Methods: │
│ ├─ _get_installed_packages() │
│ ├─ _refresh_installed_cache() │
│ ├─ _invalidate_cache() │
│ ├─ _parse_package_spec() │
│ └─ _evaluate_condition() │
└─────────────────────────────────────┘
```
### 2.3 Policy Evaluation Flow
```
install("numpy>=1.20") called
get_pip_policy() → Load policy (lazy)
Parse package name: "numpy"
Look up "numpy" policy in policy dictionary
├─ Evaluate apply_first_match (exclusive)
│ ├─ skip → Return False (don't install)
│ ├─ force_version → Change version
│ └─ replace → Replace package
├─ Evaluate apply_all_matches (cumulative)
│ ├─ pin_dependencies → Pin dependencies
│ ├─ install_with → Additional packages
│ └─ warn → Warning log
Execute pip install
Invalidate cache (_invalidate_cache)
```
---
## 3. Phase-by-Phase Implementation Plan
### Phase 1: Core Infrastructure Setup (2-3 hours)
#### Task 1.1: Project Structure and Dependency Setup (30 min)
**Implementation**:
- Create `pip_util.py` file
- Add necessary import statements
```python
import json
import logging
import platform
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from . import manager_util, context
```
- Set up logging
```python
logger = logging.getLogger(__name__)
```
**Validation**:
- Module loads without import errors
- Logger works correctly
#### Task 1.2: Global Variable and get_pip_policy() Implementation (1 hour)
**Implementation**:
- Declare global variable
```python
_pip_policy_cache: Optional[Dict] = None
```
- Implement `get_pip_policy()` function
- Check cache and early return
- Read base policy file (`{manager_util.comfyui_manager_path}/pip-policy.json`)
- Read user policy file (`{context.manager_files_path}/pip-policy.user.json`)
- Create file if doesn't exist (for user policy)
- Merge policies (complete package-level replacement)
- Save to cache and return
**Exception Handling**:
- `FileNotFoundError`: File not found → Use empty dictionary
- `json.JSONDecodeError`: JSON parse failure → Warning log + empty dictionary
- General exception: Warning log + empty dictionary
**Validation**:
- Returns empty dictionary when policy files don't exist
- Returns correct merged result when policy files exist
- Confirms cache usage on second call (load log appears only once)
#### Task 1.3: PipBatch Class Basic Structure (30 min)
**Implementation**:
- Class definition and `__init__`
```python
class PipBatch:
def __init__(self):
self._installed_cache: Optional[Dict[str, str]] = None
```
- Context manager methods (`__enter__`, `__exit__`)
```python
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._installed_cache = None
return False
```
**Validation**:
- `with PipBatch() as batch:` syntax works correctly
- Cache cleared on `__exit__` call
---
### Phase 2: Caching and Utility Methods (2-3 hours)
#### Task 2.1: pip freeze Caching Methods (1 hour)
**Implementation**:
- Implement `_refresh_installed_cache()`
- Call `manager_util.make_pip_cmd(["freeze"])`
- Execute command via subprocess
- Parse output (package==version format)
- Exclude editable packages (-e) and comments (#)
- Convert to dictionary and store in `self._installed_cache`
- Implement `_get_installed_packages()`
- Call `_refresh_installed_cache()` if cache is None
- Return cache
- Implement `_invalidate_cache()`
- Set `self._installed_cache = None`
**Exception Handling**:
- `subprocess.CalledProcessError`: pip freeze failure → Empty dictionary
- Parse error: Ignore line + warning log
**Validation**:
- pip freeze results correctly parsed into dictionary
- New load occurs after cache invalidation and re-query
#### Task 2.2: Package Spec Parsing (30 min)
**Implementation**:
- Implement `_parse_package_spec(package_info)`
- Regex pattern: `^([a-zA-Z0-9_-]+)([><=!~]+.*)?$`
- Split package name and version spec
- Return tuple: `(package_name, version_spec)`
**Exception Handling**:
- Parse failure: Raise `ValueError`
**Validation**:
- "numpy" → ("numpy", None)
- "numpy==1.26.0" → ("numpy", "==1.26.0")
- "pandas>=2.0.0" → ("pandas", ">=2.0.0")
- Invalid format → ValueError
#### Task 2.3: Condition Evaluation Method (1.5 hours)
**Implementation**:
- Implement `_evaluate_condition(condition, package_name, installed_packages)`
**Handling by Condition Type**:
1. **condition is None**: Always return True
2. **"installed" type**:
- `target_package = condition.get("package", package_name)`
- Check version with `installed_packages.get(target_package)`
- If spec exists, compare using `packaging.specifiers.SpecifierSet`
- If no spec, only check installation status
3. **"platform" type**:
- `os` condition: Compare with `platform.system()`
- `has_gpu` condition: Check `torch.cuda.is_available()` (False if torch unavailable)
- `comfyui_version` condition: TODO (currently warning)
**Exception Handling**:
- Version comparison failure: Warning log + return False
- Unknown condition type: Warning log + return False
**Validation**:
- Write test cases for each condition type
- Verify edge case handling (torch not installed, invalid version format, etc.)
---
### Phase 3: Core Installation Logic Implementation (4-5 hours)
#### Task 3.1: install() Method - Basic Flow (2 hours)
**Implementation**:
1. Parse package spec (`_parse_package_spec`)
2. Query installed package cache (`_get_installed_packages`)
3. If `override_policy=True`, install directly and return
4. Call `get_pip_policy()` to load policy
5. Default installation if no policy exists
**Validation**:
- Verify policy ignored when override_policy=True
- Verify default installation for packages without policy
#### Task 3.2: install() Method - apply_first_match Policy (1 hour)
**Implementation**:
- Iterate through policy list top-to-bottom
- Evaluate each policy's condition (`_evaluate_condition`)
- When condition satisfied:
- **skip**: Log reason and return False
- **force_version**: Force version change
- **replace**: Replace package
- Apply only first match (break)
**Validation**:
- Verify installation blocked by skip policy
- Verify version changed by force_version
- Verify package replaced by replace
#### Task 3.3: install() Method - apply_all_matches Policy (1 hour)
**Implementation**:
- Iterate through policy list top-to-bottom
- Evaluate each policy's condition
- Apply all condition-satisfying policies:
- **pin_dependencies**: Pin to installed version
- **install_with**: Add to additional package list
- **warn**: Output warning log
**Validation**:
- Verify multiple policies applied simultaneously
- Verify version pinning by pin_dependencies
- Verify additional package installation by install_with
#### Task 3.4: install() Method - Installation Execution and Retry Logic (1 hour)
**Implementation**:
1. Compose final package list
2. Generate command using `manager_util.make_pip_cmd()`
3. Handle `extra_index_url`
4. Execute installation via subprocess
5. Handle failure based on on_failure setting:
- `retry_without_pin`: Retry without pins
- `fail`: Raise exception
- Other: Warning log
6. Invalidate cache on success
**Validation**:
- Verify normal installation
- Verify retry logic on pin failure
- Verify error handling
---
### Phase 4: Batch Operation Methods Implementation (2-3 hours)
#### Task 4.1: ensure_not_installed() Implementation (1.5 hours)
**Implementation**:
1. Call `get_pip_policy()`
2. Iterate through all package policies
3. Check each package's uninstall policy
4. When condition satisfied:
- Check if target package is installed
- If installed, execute `pip uninstall -y {target}`
- Remove from cache
- Add to removal list
5. Execute only first match (per package)
6. Return list of removed packages
**Exception Handling**:
- Individual package removal failure: Warning log + continue
**Validation**:
- Verify package removal by uninstall policy
- Verify batch removal of multiple packages
- Verify continued processing of other packages even on removal failure
#### Task 4.2: ensure_installed() Implementation (1.5 hours)
**Implementation**:
1. Call `get_pip_policy()`
2. Iterate through all package policies
3. Check each package's restore policy
4. When condition satisfied:
- Check target package's current version
- If absent or different version:
- Execute `pip install {target}=={version}`
- Add extra_index_url if present
- Update cache
- Add to restoration list
5. Execute only first match (per package)
6. Return list of restored packages
**Exception Handling**:
- Individual package installation failure: Warning log + continue
**Validation**:
- Verify package restoration by restore policy
- Verify reinstallation on version mismatch
- Verify continued processing of other packages even on restoration failure
---
## 4. Testing Strategy
### 4.1 Unit Tests
#### Policy Loading Tests
```python
def test_get_pip_policy_empty():
"""Returns empty dictionary when policy files don't exist"""
def test_get_pip_policy_merge():
"""Correctly merges base and user policies"""
def test_get_pip_policy_cache():
"""Uses cache on second call"""
```
#### Package Parsing Tests
```python
def test_parse_package_spec_simple():
"""'numpy' → ('numpy', None)"""
def test_parse_package_spec_version():
"""'numpy==1.26.0' → ('numpy', '==1.26.0')"""
def test_parse_package_spec_range():
"""'pandas>=2.0.0' → ('pandas', '>=2.0.0')"""
def test_parse_package_spec_invalid():
"""Invalid format → ValueError"""
```
#### Condition Evaluation Tests
```python
def test_evaluate_condition_none():
"""None condition → True"""
def test_evaluate_condition_installed():
"""Evaluates installed package condition"""
def test_evaluate_condition_platform():
"""Evaluates platform condition"""
```
### 4.2 Integration Tests
#### Installation Policy Tests
```python
def test_install_with_skip_policy():
"""Blocks installation with skip policy"""
def test_install_with_force_version():
"""Changes version with force_version policy"""
def test_install_with_replace():
"""Replaces package with replace policy"""
def test_install_with_pin_dependencies():
"""Pins versions with pin_dependencies"""
```
#### Batch Operation Tests
```python
def test_ensure_not_installed():
"""Removes packages with uninstall policy"""
def test_ensure_installed():
"""Restores packages with restore policy"""
def test_batch_workflow():
"""Tests complete batch workflow"""
```
### 4.3 Edge Case Tests
```python
def test_install_without_policy():
"""Default installation for packages without policy"""
def test_install_override_policy():
"""Ignores policy with override_policy=True"""
def test_pip_freeze_failure():
"""Handles empty cache on pip freeze failure"""
def test_json_parse_error():
"""Handles malformed JSON files"""
def test_subprocess_failure():
"""Exception handling when pip command fails"""
```
---
## 5. Error Handling Strategy
### 5.1 Policy Loading Errors
- **File not found**: Warning log + empty dictionary
- **JSON parse failure**: Error log + empty dictionary
- **No read permission**: Warning log + empty dictionary
### 5.2 Package Installation Errors
- **pip command failure**: Depends on on_failure setting
- `retry_without_pin`: Retry
- `fail`: Raise exception
- Other: Warning log
- **Invalid package spec**: Raise ValueError
### 5.3 Batch Operation Errors
- **Individual package failure**: Warning log + continue to next package
- **pip freeze failure**: Empty dictionary + warning log
---
## 6. Performance Optimization
### 6.1 Caching Strategy
- **Policy cache**: Reused program-wide via global variable
- **pip freeze cache**: Reused per batch, invalidated after install/remove
- **lazy loading**: Load only when needed
### 6.2 Parallel Processing Considerations
- Current implementation is not thread-safe
- Consider adding threading.Lock if needed
- Batch operations execute sequentially only
---
## 7. Documentation Requirements
### 7.1 Code Documentation
- Docstrings required for all public methods
- Specify parameters, return values, and exceptions
- Include usage examples
### 7.2 User Guide
- Explain `pip-policy.json` structure
- Policy writing examples
- Usage pattern examples
### 7.3 Developer Guide
- Architecture explanation
- Extension methods
- Test execution methods
---
## 8. Deployment Checklist
### 8.1 Code Quality
- [ ] All unit tests pass
- [ ] All integration tests pass
- [ ] Code coverage ≥80%
- [ ] No linting errors (flake8, pylint)
- [ ] Type hints complete (mypy passes)
### 8.2 Documentation
- [ ] README.md written
- [ ] API documentation generated
- [ ] Example policy files written
- [ ] Usage guide written
### 8.3 Performance Verification
- [ ] Policy loading performance measured (<100ms)
- [ ] pip freeze caching effectiveness verified (≥50% speed improvement)
- [ ] Memory usage confirmed (<10MB)
### 8.4 Security Verification
- [ ] Input validation complete
- [ ] Path traversal prevention
- [ ] Command injection prevention
- [ ] JSON parsing safety confirmed
---
## 9. Future Improvements
### 9.1 Short-term (1-2 weeks)
- Implement ComfyUI version check
- Implement user confirmation prompt (allow_continue=false)
- Thread-safe improvements (add Lock)
### 9.2 Mid-term (1-2 months)
- Add policy validation tools
- Policy migration tools
- More detailed logging and debugging options
### 9.3 Long-term (3-6 months)
- Web UI for policy management
- Provide policy templates
- Community policy sharing system
---
## 10. Risks and Mitigation Strategies
### Risk 1: Policy Conflicts
**Description**: Policies for different packages may conflict
**Mitigation**: Develop policy validation tools, conflict detection algorithm
### Risk 2: pip Version Compatibility
**Description**: Must work across various pip versions
**Mitigation**: Test on multiple pip versions, version-specific branching
### Risk 3: Performance Degradation
**Description**: Installation speed may decrease due to policy evaluation
**Mitigation**: Optimize caching, minimize condition evaluation
### Risk 4: Policy Misconfiguration
**Description**: Users may write incorrect policies
**Mitigation**: JSON schema validation, provide examples and guides
---
## 11. Timeline
### Week 1
- Phase 1: Core Infrastructure Setup (Day 1-2)
- Phase 2: Caching and Utility Methods (Day 3-4)
- Write unit tests (Day 5)
### Week 2
- Phase 3: Core Installation Logic Implementation (Day 1-3)
- Phase 4: Batch Operation Methods Implementation (Day 4-5)
### Week 3
- Integration and edge case testing (Day 1-2)
- Documentation (Day 3)
- Code review and refactoring (Day 4-5)
### Week 4
- Performance optimization (Day 1-2)
- Security verification (Day 3)
- Final testing and deployment preparation (Day 4-5)
---
## 12. Success Criteria
### Feature Completeness
- ✅ All policy types (uninstall, apply_first_match, apply_all_matches, restore) work correctly
- ✅ Policy merge logic works correctly
- ✅ Batch operations perform normally
### Quality Metrics
- ✅ Test coverage ≥80%
- ✅ All tests pass
- ✅ 0 linting errors
- ✅ 100% type hint completion
### Performance Metrics
- ✅ Policy loading <100ms
- ✅ ≥50% performance improvement with pip freeze caching
- ✅ Memory usage <10MB
### Usability
- ✅ Clear error messages
- ✅ Sufficient documentation
- ✅ Verified in real-world use cases

View File

@@ -0,0 +1,629 @@
"""
pip_util - Policy-based pip package management system
This module provides a policy-based approach to pip package installation
to minimize dependency conflicts and protect existing installed packages.
Usage:
# Batch operations (policy auto-loaded)
with PipBatch() as batch:
batch.ensure_not_installed()
batch.install("numpy>=1.20")
batch.install("pandas>=2.0")
batch.install("scipy>=1.7")
batch.ensure_installed()
"""
import json
import logging
import platform
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from . import manager_util, context
logger = logging.getLogger(__name__)
# Global policy cache (lazy loaded on first access)
_pip_policy_cache: Optional[Dict] = None
def get_pip_policy() -> Dict:
"""
Get pip policy with lazy loading.
Returns the cached policy if available, otherwise loads it from files.
This function automatically loads the policy on first access.
Thread safety: This function is NOT thread-safe.
Ensure single-threaded access during initialization.
Returns:
Dictionary of merged pip policies
Example:
>>> policy = get_pip_policy()
>>> numpy_policy = policy.get("numpy", {})
"""
global _pip_policy_cache
# Return cached policy if already loaded
if _pip_policy_cache is not None:
logger.debug("Returning cached pip policy")
return _pip_policy_cache
logger.info("Loading pip policies...")
# Load base policy
base_policy = {}
base_policy_path = Path(manager_util.comfyui_manager_path) / "pip-policy.json"
try:
if base_policy_path.exists():
with open(base_policy_path, 'r', encoding='utf-8') as f:
base_policy = json.load(f)
logger.debug(f"Loaded base policy from {base_policy_path}")
else:
logger.warning(f"Base policy file not found: {base_policy_path}")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse base policy JSON: {e}")
base_policy = {}
except Exception as e:
logger.warning(f"Failed to read base policy file: {e}")
base_policy = {}
# Load user policy
user_policy = {}
user_policy_path = Path(context.manager_files_path) / "pip-policy.user.json"
try:
if user_policy_path.exists():
with open(user_policy_path, 'r', encoding='utf-8') as f:
user_policy = json.load(f)
logger.debug(f"Loaded user policy from {user_policy_path}")
else:
# Create empty user policy file
user_policy_path.parent.mkdir(parents=True, exist_ok=True)
with open(user_policy_path, 'w', encoding='utf-8') as f:
json.dump({"_comment": "User-specific pip policy overrides"}, f, indent=2)
logger.info(f"Created empty user policy file: {user_policy_path}")
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse user policy JSON: {e}")
user_policy = {}
except Exception as e:
logger.warning(f"Failed to read user policy file: {e}")
user_policy = {}
# Merge policies (package-level override: user completely replaces base per package)
merged_policy = base_policy.copy()
for package_name, package_policy in user_policy.items():
if package_name.startswith("_"): # Skip metadata fields like _comment
continue
merged_policy[package_name] = package_policy # Complete package replacement
# Store in global cache
_pip_policy_cache = merged_policy
logger.info(f"Policy loaded successfully: {len(_pip_policy_cache)} package policies")
return _pip_policy_cache
class PipBatch:
"""
Pip package installation batch manager.
Maintains pip freeze cache during a batch of operations for performance optimization.
Usage pattern:
# Batch operations (policy auto-loaded)
with PipBatch() as batch:
batch.ensure_not_installed()
batch.install("numpy>=1.20")
batch.install("pandas>=2.0")
batch.install("scipy>=1.7")
batch.ensure_installed()
Attributes:
_installed_cache: Cache of installed packages from pip freeze
"""
def __init__(self):
"""Initialize PipBatch with empty cache."""
self._installed_cache: Optional[Dict[str, str]] = None
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and clear cache."""
self._installed_cache = None
return False
def _refresh_installed_cache(self) -> None:
"""
Refresh the installed packages cache by executing pip freeze.
Parses pip freeze output into a dictionary of {package_name: version}.
Ignores editable packages and comments.
Raises:
No exceptions raised - failures result in empty cache with warning log
"""
try:
cmd = manager_util.make_pip_cmd(["freeze"])
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
packages = {}
for line in result.stdout.strip().split('\n'):
line = line.strip()
# Skip empty lines
if not line:
continue
# Skip editable packages (-e /path/to/package or -e git+https://...)
# Editable packages don't have version info and are typically development-only
if line.startswith('-e '):
continue
# Skip comments (defensive: pip freeze typically doesn't output comments,
# but this handles manually edited requirements.txt or future pip changes)
if line.startswith('#'):
continue
# Parse package==version
if '==' in line:
try:
package_name, version = line.split('==', 1)
packages[package_name.strip()] = version.strip()
except ValueError:
logger.warning(f"Failed to parse pip freeze line: {line}")
continue
self._installed_cache = packages
logger.debug(f"Refreshed installed packages cache: {len(packages)} packages")
except subprocess.CalledProcessError as e:
logger.warning(f"pip freeze failed: {e}")
self._installed_cache = {}
except Exception as e:
logger.warning(f"Failed to refresh installed packages cache: {e}")
self._installed_cache = {}
def _get_installed_packages(self) -> Dict[str, str]:
"""
Get cached installed packages, refresh if cache is None.
Returns:
Dictionary of {package_name: version}
"""
if self._installed_cache is None:
self._refresh_installed_cache()
return self._installed_cache
def _invalidate_cache(self) -> None:
"""
Invalidate the installed packages cache.
Should be called after install/uninstall operations.
"""
self._installed_cache = None
def _parse_package_spec(self, package_info: str) -> Tuple[str, Optional[str]]:
"""
Parse package spec string into package name and version spec using PEP 508.
Uses the packaging library to properly parse package specifications according to
PEP 508 standard, which handles complex cases like extras and multiple version
constraints that simple regex cannot handle correctly.
Args:
package_info: Package specification like "numpy", "numpy==1.26.0", "numpy>=1.20.0",
or complex specs like "package[extra]>=1.0,<2.0"
Returns:
Tuple of (package_name, version_spec)
Examples: ("numpy", "==1.26.0"), ("pandas", ">=2.0.0"), ("scipy", None)
Package names are normalized (e.g., "NumPy" -> "numpy")
Raises:
ValueError: If package_info cannot be parsed according to PEP 508
Example:
>>> batch._parse_package_spec("numpy>=1.20")
("numpy", ">=1.20")
>>> batch._parse_package_spec("requests[security]>=2.0,<3.0")
("requests", ">=2.0,<3.0")
"""
try:
req = Requirement(package_info)
package_name = req.name # Normalized package name
version_spec = str(req.specifier) if req.specifier else None
return package_name, version_spec
except Exception as e:
raise ValueError(f"Invalid package spec: {package_info}") from e
def _evaluate_condition(self, condition: Optional[Dict], package_name: str,
installed_packages: Dict[str, str]) -> bool:
"""
Evaluate policy condition and return whether it's satisfied.
Args:
condition: Policy condition object (dict) or None
package_name: Current package being processed
installed_packages: Dictionary of {package_name: version}
Returns:
True if condition is satisfied, False otherwise
None condition always returns True
Example:
>>> condition = {"type": "installed", "package": "numpy", "spec": ">=1.20"}
>>> batch._evaluate_condition(condition, "numba", {"numpy": "1.26.0"})
True
"""
# No condition means always satisfied
if condition is None:
return True
condition_type = condition.get("type")
if condition_type == "installed":
# Check if a package is installed with optional version spec
target_package = condition.get("package", package_name)
installed_version = installed_packages.get(target_package)
# Package not installed
if installed_version is None:
return False
# Check version spec if provided
spec = condition.get("spec")
if spec:
try:
specifier = SpecifierSet(spec)
return Version(installed_version) in specifier
except Exception as e:
logger.warning(f"Failed to compare version {installed_version} with spec {spec}: {e}")
return False
# Package is installed (no spec check)
return True
elif condition_type == "platform":
# Check platform conditions (os, has_gpu, comfyui_version)
conditions_met = True
# Check OS
if "os" in condition:
expected_os = condition["os"].lower()
actual_os = platform.system().lower()
if expected_os not in actual_os and actual_os not in expected_os:
conditions_met = False
# Check GPU availability
if "has_gpu" in condition:
expected_gpu = condition["has_gpu"]
try:
import torch
has_gpu = torch.cuda.is_available()
except ImportError:
has_gpu = False
if expected_gpu != has_gpu:
conditions_met = False
# Check ComfyUI version
if "comfyui_version" in condition:
# TODO: Implement ComfyUI version check
logger.warning("ComfyUI version condition not yet implemented")
return conditions_met
else:
logger.warning(f"Unknown condition type: {condition_type}")
return False
def install(self, package_info: str, extra_index_url: Optional[str] = None,
override_policy: bool = False) -> bool:
"""
Install a pip package with policy-based modifications.
Args:
package_info: Package specification (e.g., "numpy", "numpy==1.26.0", "numpy>=1.20.0")
extra_index_url: Additional package repository URL (optional)
override_policy: If True, skip policy application and install directly (default: False)
Returns:
True if installation succeeded, False if skipped by policy
Raises:
ValueError: If package_info cannot be parsed
subprocess.CalledProcessError: If installation fails (depending on policy on_failure settings)
Example:
>>> with PipBatch() as batch:
... batch.install("numpy>=1.20")
... batch.install("torch", override_policy=True)
"""
# Parse package spec
try:
package_name, version_spec = self._parse_package_spec(package_info)
except ValueError as e:
logger.error(f"Invalid package spec: {e}")
raise
# Get installed packages cache
installed_packages = self._get_installed_packages()
# Override policy - skip to direct installation
if override_policy:
logger.info(f"Installing {package_info} (policy override)")
cmd = manager_util.make_pip_cmd(["install", package_info])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {package_info}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install {package_info}: {e}")
raise
# Get policy (lazy loading)
pip_policy = get_pip_policy()
policy = pip_policy.get(package_name, {})
# If no policy, proceed with default installation
if not policy:
logger.debug(f"No policy found for {package_name}, proceeding with default installation")
cmd = manager_util.make_pip_cmd(["install", package_info])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {package_info}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install {package_info}: {e}")
raise
# Apply apply_first_match policies (exclusive - first match only)
final_package_info = package_info
final_extra_index_url = extra_index_url
policy_reason = None
apply_first_match = policy.get("apply_first_match", [])
for policy_item in apply_first_match:
condition = policy_item.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
policy_type = policy_item.get("type")
if policy_type == "skip":
reason = policy_item.get("reason", "No reason provided")
logger.info(f"Skipping installation of {package_name}: {reason}")
return False
elif policy_type == "force_version":
forced_version = policy_item.get("version")
final_package_info = f"{package_name}=={forced_version}"
policy_reason = policy_item.get("reason")
if "extra_index_url" in policy_item:
final_extra_index_url = policy_item["extra_index_url"]
logger.info(f"Force version for {package_name}: {forced_version} ({policy_reason})")
break # First match only
elif policy_type == "replace":
replacement = policy_item.get("replacement")
replacement_version = policy_item.get("version", "")
if replacement_version:
final_package_info = f"{replacement}{replacement_version}"
else:
final_package_info = replacement
policy_reason = policy_item.get("reason")
if "extra_index_url" in policy_item:
final_extra_index_url = policy_item["extra_index_url"]
logger.info(f"Replacing {package_name} with {final_package_info}: {policy_reason}")
break # First match only
# Apply apply_all_matches policies (cumulative - all matches)
additional_packages = []
pinned_packages = []
pin_on_failure = "fail"
apply_all_matches = policy.get("apply_all_matches", [])
for policy_item in apply_all_matches:
condition = policy_item.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
policy_type = policy_item.get("type")
if policy_type == "pin_dependencies":
pin_list = policy_item.get("pinned_packages", [])
for pkg in pin_list:
installed_version = installed_packages.get(pkg)
if installed_version:
pinned_packages.append(f"{pkg}=={installed_version}")
else:
logger.warning(f"Cannot pin {pkg}: not currently installed")
pin_on_failure = policy_item.get("on_failure", "fail")
reason = policy_item.get("reason", "")
logger.info(f"Pinning dependencies: {pinned_packages} ({reason})")
elif policy_type == "install_with":
additional = policy_item.get("additional_packages", [])
additional_packages.extend(additional)
reason = policy_item.get("reason", "")
logger.info(f"Installing additional packages: {additional} ({reason})")
elif policy_type == "warn":
message = policy_item.get("message", "")
allow_continue = policy_item.get("allow_continue", True)
logger.warning(f"Policy warning for {package_name}: {message}")
if not allow_continue:
# TODO: Implement user confirmation
logger.info("User confirmation required (not implemented, continuing)")
# Build final package list
packages_to_install = [final_package_info] + pinned_packages + additional_packages
# Execute installation
cmd = manager_util.make_pip_cmd(["install"] + packages_to_install)
if final_extra_index_url:
cmd.extend(["--extra-index-url", final_extra_index_url])
try:
subprocess.run(cmd, check=True)
self._invalidate_cache()
if policy_reason:
logger.info(f"Successfully installed {final_package_info}: {policy_reason}")
else:
logger.info(f"Successfully installed {final_package_info}")
return True
except subprocess.CalledProcessError as e:
# Handle installation failure
if pinned_packages and pin_on_failure == "retry_without_pin":
logger.warning(f"Installation failed with pinned dependencies, retrying without pins")
retry_cmd = manager_util.make_pip_cmd(["install", final_package_info])
if final_extra_index_url:
retry_cmd.extend(["--extra-index-url", final_extra_index_url])
try:
subprocess.run(retry_cmd, check=True)
self._invalidate_cache()
logger.info(f"Successfully installed {final_package_info} (without pins)")
return True
except subprocess.CalledProcessError as retry_error:
logger.error(f"Retry installation also failed: {retry_error}")
raise
elif pin_on_failure == "fail":
logger.error(f"Installation failed: {e}")
raise
else:
logger.warning(f"Installation failed, but continuing: {e}")
return False
def ensure_not_installed(self) -> List[str]:
"""
Remove all packages matching uninstall policies (batch processing).
Iterates through all package policies and executes uninstall actions
where conditions are satisfied.
Returns:
List of removed package names
Example:
>>> with PipBatch() as batch:
... removed = batch.ensure_not_installed()
... print(f"Removed: {removed}")
"""
# Get policy (lazy loading)
pip_policy = get_pip_policy()
installed_packages = self._get_installed_packages()
removed_packages = []
for package_name, policy in pip_policy.items():
uninstall_policies = policy.get("uninstall", [])
for uninstall_policy in uninstall_policies:
condition = uninstall_policy.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
target = uninstall_policy.get("target")
reason = uninstall_policy.get("reason", "No reason provided")
# Check if target is installed
if target in installed_packages:
try:
cmd = manager_util.make_pip_cmd(["uninstall", "-y", target])
subprocess.run(cmd, check=True)
logger.info(f"Uninstalled {target}: {reason}")
removed_packages.append(target)
# Remove from cache
del installed_packages[target]
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to uninstall {target}: {e}")
# First match only per package
break
return removed_packages
def ensure_installed(self) -> List[str]:
"""
Restore all packages matching restore policies (batch processing).
Iterates through all package policies and executes restore actions
where conditions are satisfied.
Returns:
List of restored package names
Example:
>>> with PipBatch() as batch:
... batch.install("numpy>=1.20")
... restored = batch.ensure_installed()
... print(f"Restored: {restored}")
"""
# Get policy (lazy loading)
pip_policy = get_pip_policy()
installed_packages = self._get_installed_packages()
restored_packages = []
for package_name, policy in pip_policy.items():
restore_policies = policy.get("restore", [])
for restore_policy in restore_policies:
condition = restore_policy.get("condition")
if self._evaluate_condition(condition, package_name, installed_packages):
target = restore_policy.get("target")
version = restore_policy.get("version")
reason = restore_policy.get("reason", "No reason provided")
extra_index_url = restore_policy.get("extra_index_url")
# Check if target needs restoration
current_version = installed_packages.get(target)
if current_version is None or current_version != version:
try:
package_spec = f"{target}=={version}"
cmd = manager_util.make_pip_cmd(["install", package_spec])
if extra_index_url:
cmd.extend(["--extra-index-url", extra_index_url])
subprocess.run(cmd, check=True)
logger.info(f"Restored {package_spec}: {reason}")
restored_packages.append(target)
# Update cache
installed_packages[target] = version
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to restore {target}: {e}")
# First match only per package
break
return restored_packages

View File

File diff suppressed because it is too large Load Diff