@@ -25,6 +25,7 @@ Designed to support desktop, mobile and multi-screen devices.
|
||||
- Press the "copy" button to copy a model to ComfyUI's clipboard or copy the embedding to the system clipboard. (Copying the embedding to the system clipboard requires a secure http connection.)
|
||||
- Press the "add" button to add the model to the ComfyUI graph or append the embedding to one or more selected nodes.
|
||||
- Press the "load workflow" button to try and load a workflow embedded in a model's preview image.
|
||||
- Press the "open model url" button to try and search the web and open a model's webpage.
|
||||
|
||||
### Download Tab
|
||||
|
||||
@@ -55,6 +56,7 @@ Designed to support desktop, mobile and multi-screen devices.
|
||||
- Read, edit and save notes. (Saved as a `.txt` file beside the model).
|
||||
- `Ctrl+s` or `⌘+S` to save a note when the textarea is in focus.
|
||||
- Autosave can be enabled in settings. (Note: Once the model info view is closed, the undo history is lost.)
|
||||
- Automatically search the web for model info and save as notes with a single button.
|
||||
- Change or remove a model's preview image.
|
||||
- View training tags and use the random tag generator to generate prompt ideas. (Inspired by the one in A1111.)
|
||||
|
||||
|
||||
633
__init__.py
633
__init__.py
@@ -8,6 +8,8 @@ import copy
|
||||
import importlib
|
||||
import re
|
||||
import base64
|
||||
import hashlib
|
||||
import markdownify
|
||||
|
||||
from aiohttp import web
|
||||
import server
|
||||
@@ -23,7 +25,7 @@ import folder_paths
|
||||
|
||||
comfyui_model_uri = folder_paths.models_dir
|
||||
|
||||
extension_uri = os.path.dirname(__file__)
|
||||
extension_uri = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
config_loader_path = os.path.join(extension_uri, 'config_loader.py')
|
||||
config_loader_spec = importlib.util.spec_from_file_location('config_loader', config_loader_path)
|
||||
@@ -58,7 +60,8 @@ preview_extensions = ( # TODO: JavaScript does not know about this (x2 states)
|
||||
image_extensions + # order matters
|
||||
stable_diffusion_webui_civitai_helper_image_extensions
|
||||
)
|
||||
model_info_extension = ".txt"
|
||||
model_notes_extension = ".txt"
|
||||
model_info_extension = ".json"
|
||||
#video_extensions = (".avi", ".mp4", ".webm") # TODO: Requires ffmpeg or cv2. Cache preview frame?
|
||||
|
||||
def split_valid_ext(s, *arg_exts):
|
||||
@@ -71,6 +74,7 @@ def split_valid_ext(s, *arg_exts):
|
||||
|
||||
_folder_names_and_paths = None # dict[str, tuple[list[str], list[str]]]
|
||||
def folder_paths_folder_names_and_paths(refresh = False):
|
||||
# TODO: "diffusers" extension whitelist is ["folder"]
|
||||
global _folder_names_and_paths
|
||||
if refresh or _folder_names_and_paths is None:
|
||||
_folder_names_and_paths = {}
|
||||
@@ -189,6 +193,7 @@ def ui_rules():
|
||||
Rule("model-show-add-button", True, bool),
|
||||
Rule("model-show-copy-button", True, bool),
|
||||
Rule("model-show-load-workflow-button", True, bool),
|
||||
Rule("model-show-open-model-url-button", False, bool),
|
||||
Rule("model-info-button-on-left", False, bool),
|
||||
Rule("model-buttons-only-on-hover", False, bool),
|
||||
|
||||
@@ -232,6 +237,7 @@ def get_def_headers(url=""):
|
||||
if url.startswith("https://civitai.com/"):
|
||||
api_key = server_settings["civitai_api_key"]
|
||||
if (api_key != ""):
|
||||
def_headers["Content-Type"] = f"application/json"
|
||||
def_headers["Authorization"] = f"Bearer {api_key}"
|
||||
url += "&" if "?" in url else "?" # not the most robust solution
|
||||
url += f"token={api_key}" # TODO: Authorization didn't work in the header
|
||||
@@ -243,6 +249,246 @@ def get_def_headers(url=""):
|
||||
return def_headers
|
||||
|
||||
|
||||
def save_web_url(path, url):
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(f"[InternetShortcut]\nURL={url}\n")
|
||||
|
||||
|
||||
def try_load_web_url(path):
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
if f.readline() != "[InternetShortcut]\n": return ""
|
||||
url = f.readline()
|
||||
if not url.startswith("URL="): return ""
|
||||
if not url.endswith("\n"): return ""
|
||||
return url[4:len(url)-1]
|
||||
|
||||
|
||||
def hash_file(path, buffer_size=1024*1024):
|
||||
sha256 = hashlib.sha256()
|
||||
with open(path, 'rb') as f:
|
||||
while True:
|
||||
data = f.read(buffer_size)
|
||||
if not data: break
|
||||
sha256.update(data)
|
||||
return sha256.hexdigest()
|
||||
|
||||
|
||||
class Civitai:
|
||||
IMAGE_URL_SUBDIRECTORY_PREFIX = "https://civitai.com/images/"
|
||||
IMAGE_URL_DOMAIN_PREFIX = "'https://image.civitai.com/"
|
||||
|
||||
@staticmethod
|
||||
def image_subdirectory_url_to_image_url(image_url):
|
||||
url_suffix = image_url[len(Civitai.IMAGE_URL_SUBDIRECTORY_PREFIX):]
|
||||
image_id = re.search(r"^\d+", url_suffix).group(0)
|
||||
image_id = str(int(image_id))
|
||||
image_info_url = f"https://civitai.com/api/v1/images?imageId={image_id}"
|
||||
def_headers = get_def_headers(image_info_url)
|
||||
response = requests.get(
|
||||
url=image_info_url,
|
||||
stream=False,
|
||||
verify=False,
|
||||
headers=def_headers,
|
||||
proxies=None,
|
||||
allow_redirects=False,
|
||||
)
|
||||
if response.ok:
|
||||
#content_type = response.headers.get("Content-Type")
|
||||
info = response.json()
|
||||
items = info["items"]
|
||||
if len(items) == 0:
|
||||
raise RuntimeError("Civitai /api/v1/images returned 0 items!")
|
||||
return items[0]["url"]
|
||||
else:
|
||||
raise RuntimeError("Bad response from api/v1/images!")
|
||||
|
||||
@staticmethod
|
||||
def image_domain_url_full_size(url, width = None):
|
||||
result = re.search("/width=(\d+)", url)
|
||||
if width is None:
|
||||
i0 = result.span()[0]
|
||||
i1 = result.span()[1]
|
||||
return url[0:i0] + url[i1:]
|
||||
else:
|
||||
w = int(result.group(1))
|
||||
return url.replace(str(w), str(width))
|
||||
|
||||
@staticmethod
|
||||
def search_by_hash(sha256_hash):
|
||||
url_api_hash = r"https://civitai.com/api/v1/model-versions/by-hash/" + sha256_hash
|
||||
hash_response = requests.get(url_api_hash)
|
||||
if hash_response.status_code != 200:
|
||||
return {}
|
||||
return hash_response.json() # model version info
|
||||
|
||||
@staticmethod
|
||||
def search_by_model_id(model_id):
|
||||
url_api_model = r"https://civitai.com/api/v1/models/" + str(model_id)
|
||||
model_response = requests.get(url_api_model)
|
||||
if model_response.status_code != 200:
|
||||
return {}
|
||||
return model_response.json() # model group info
|
||||
|
||||
@staticmethod
|
||||
def get_model_url(model_version_info):
|
||||
if len(model_version_info) == 0: return ""
|
||||
model_id = model_version_info.get("modelId")
|
||||
if model_id is None:
|
||||
# there can be incomplete model info, so don't throw just in case
|
||||
return ""
|
||||
url = f"https://civitai.com/models/{model_id}"
|
||||
version_id = model_version_info.get("id")
|
||||
if version_id is not None:
|
||||
url += f"?modelVersionId={version_id}"
|
||||
return url
|
||||
|
||||
@staticmethod
|
||||
def get_preview_urls(model_version_info, full_size=False):
|
||||
images = model_version_info.get("images", None)
|
||||
if images is None:
|
||||
return []
|
||||
preview_urls = []
|
||||
for image_info in images:
|
||||
url = image_info["url"]
|
||||
if full_size:
|
||||
url = Civitai.image_domain_url_full_size(url, image_info.get("width", None))
|
||||
preview_urls.append(url)
|
||||
return preview_urls
|
||||
|
||||
@staticmethod
|
||||
def search_notes(model_version_info):
|
||||
if len(model_version_info) == 0:
|
||||
return ""
|
||||
model_name = None
|
||||
if "modelId" in model_version_info and "id" in model_version_info:
|
||||
model_id = model_version_info.get("modelId")
|
||||
model_version_id = model_version_info.get("id")
|
||||
|
||||
model_version_description = ""
|
||||
model_trigger_words = []
|
||||
model_info = Civitai.search_by_model_id(model_id)
|
||||
if len(model_info) == 0: # can happen if model download is disabled
|
||||
print("Model Manager WARNING: Unable to find Civitai 'modelId' " + str(model_id) + ". Try deleting .json file and trying again later!")
|
||||
return ""
|
||||
model_name = model_info.get("name")
|
||||
model_description = model_info.get("description")
|
||||
for model_version in model_info["modelVersions"]:
|
||||
if model_version["id"] == model_version_id:
|
||||
model_version_description = model_version.get("description")
|
||||
model_trigger_words = model_version.get("trainedWords")
|
||||
break
|
||||
elif "description" in model_version_info and "activation text" in model_version_info and "notes" in model_version_info:
|
||||
# {'description': str, 'sd version': str, 'activation text': str, 'preferred weight': int, 'notes': str}
|
||||
model_description = model_version_info.get("description")
|
||||
activation_text = model_version_info.get("activation text")
|
||||
if activation_text != "":
|
||||
model_trigger_words = [activation_text]
|
||||
else:
|
||||
model_trigger_words = []
|
||||
model_version_description = model_version_info.get("notes")
|
||||
else:
|
||||
return ""
|
||||
model_description = model_description if model_description is not None else ""
|
||||
model_trigger_words = model_trigger_words if model_trigger_words is not None else []
|
||||
model_version_description = model_version_description if model_version_description is not None else ""
|
||||
model_name = model_name if model_name is not None else "Model Description"
|
||||
|
||||
notes = ""
|
||||
if len(model_trigger_words) > 0:
|
||||
notes += "# Trigger Words\n\n"
|
||||
model_trigger_words = [re.sub(",$", "", s.strip()) for s in model_trigger_words]
|
||||
join_separator = ', '
|
||||
for s in model_trigger_words:
|
||||
if ',' in s:
|
||||
join_separator = '\n'
|
||||
break
|
||||
if join_separator == '\n':
|
||||
model_trigger_words = ["* " + s for s in model_trigger_words]
|
||||
notes += join_separator.join(model_trigger_words)
|
||||
if model_version_description != "":
|
||||
if len(notes) > 0: notes += "\n\n"
|
||||
notes += "# About this version\n\n"
|
||||
notes += markdownify.markdownify(model_version_description)
|
||||
if model_description != "":
|
||||
if len(notes) > 0: notes += "\n\n"
|
||||
notes += "# " + model_name + "\n\n"
|
||||
notes += markdownify.markdownify(model_description)
|
||||
return notes.strip()
|
||||
|
||||
|
||||
class ModelInfo:
|
||||
@staticmethod
|
||||
def search_by_hash(sha256_hash):
|
||||
model_info = Civitai.search_by_hash(sha256_hash)
|
||||
if len(model_info) > 0: return model_info
|
||||
# TODO: search other websites
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def try_load_cached(model_path):
|
||||
model_info_path = os.path.splitext(model_path)[0] + model_info_extension
|
||||
if os.path.isfile(model_info_path):
|
||||
with open(model_info_path, "r", encoding="utf-8") as f:
|
||||
model_info = json.load(f)
|
||||
return model_info
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def get_hash(model_info):
|
||||
model_info = Civitai.get_hash(model_info)
|
||||
if len(model_info) > 0: return model_info
|
||||
# TODO: search other websites
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def search_info(model_path, cache=True, use_cached=True):
|
||||
model_info = ModelInfo.try_load_cached(model_path)
|
||||
if use_cached and len(model_info) > 0:
|
||||
return model_info
|
||||
|
||||
sha256_hash = hash_file(model_path)
|
||||
model_info = ModelInfo.search_by_hash(sha256_hash)
|
||||
if cache and len(model_info) > 0:
|
||||
model_info_path = os.path.splitext(model_path)[0] + model_info_extension
|
||||
with open(model_info_path, "w", encoding="utf-8") as f:
|
||||
json.dump(model_info, f, indent=4)
|
||||
print("Saved file: " + model_info_path)
|
||||
|
||||
return model_info
|
||||
|
||||
@staticmethod
|
||||
def get_url(model_info):
|
||||
if len(model_info) == 0:
|
||||
return ""
|
||||
model_url = Civitai.get_model_url(model_info)
|
||||
if model_url != "":
|
||||
return model_url
|
||||
# TODO: huggingface has <user>/<model> formats
|
||||
# TODO: support other websites
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def search_notes(model_path):
|
||||
assert(os.path.isfile(model_path))
|
||||
model_info = ModelInfo.search_info(model_path, cache=True, use_cached=True) # assume cached is correct; re-download elsewhere
|
||||
if len(model_info) == 0:
|
||||
return ""
|
||||
notes = Civitai.search_notes(model_info)
|
||||
if len(notes) > 0 and not notes.isspace():
|
||||
return notes
|
||||
# TODO: search other websites
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def get_web_preview_urls(model_info, full_size=False):
|
||||
if len(model_info) == 0:
|
||||
return []
|
||||
preview_urls = Civitai.get_preview_urls(model_info, full_size)
|
||||
if len(preview_urls) > 0:
|
||||
return preview_urls
|
||||
# TODO: support other websites
|
||||
return []
|
||||
|
||||
@server.PromptServer.instance.routes.get("/model-manager/timestamp")
|
||||
async def get_timestamp(request):
|
||||
return web.json_response({ "timestamp": datetime.now().timestamp() })
|
||||
@@ -327,9 +573,12 @@ def get_auto_thumbnail_format(original_format):
|
||||
return "JPEG" # default fallback
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.get("/model-manager/preview/get")
|
||||
@server.PromptServer.instance.routes.get("/model-manager/preview/get/{uri}")
|
||||
async def get_model_preview(request):
|
||||
uri = request.query.get("uri")
|
||||
uri = request.match_info["uri"]
|
||||
if uri is None: # BUG: this should never happen
|
||||
print(f"Invalid uri! Request url: {request.url}")
|
||||
uri = "no-preview"
|
||||
quality = 75
|
||||
response_image_format = request.query.get("image-format", None)
|
||||
if isinstance(response_image_format, str):
|
||||
@@ -451,42 +700,16 @@ async def get_image_extensions(request):
|
||||
return web.json_response(image_extensions)
|
||||
|
||||
|
||||
def download_model_preview(formdata):
|
||||
path = formdata.get("path", None)
|
||||
if type(path) is not str:
|
||||
def download_model_preview(path, image, overwrite):
|
||||
if not os.path.isfile(path):
|
||||
raise ValueError("Invalid path!")
|
||||
path, model_type = search_path_to_system_path(path)
|
||||
model_type_extensions = folder_paths_get_supported_pt_extensions(model_type)
|
||||
path_without_extension, _ = split_valid_ext(path, model_type_extensions)
|
||||
path_without_extension = os.path.splitext(path)[0]
|
||||
|
||||
overwrite = formdata.get("overwrite", "true").lower()
|
||||
overwrite = True if overwrite == "true" else False
|
||||
|
||||
image = formdata.get("image", None)
|
||||
if type(image) is str:
|
||||
civitai_image_url = "https://civitai.com/images/"
|
||||
if image.startswith(civitai_image_url):
|
||||
image_id = re.search(r"^\d+", image[len(civitai_image_url):]).group(0)
|
||||
image_id = str(int(image_id))
|
||||
image_info_url = f"https://civitai.com/api/v1/images?imageId={image_id}"
|
||||
def_headers = get_def_headers(image_info_url)
|
||||
response = requests.get(
|
||||
url=image_info_url,
|
||||
stream=False,
|
||||
verify=False,
|
||||
headers=def_headers,
|
||||
proxies=None,
|
||||
allow_redirects=False,
|
||||
)
|
||||
if response.ok:
|
||||
content_type = response.headers.get("Content-Type")
|
||||
info = response.json()
|
||||
items = info["items"]
|
||||
if len(items) == 0:
|
||||
raise RuntimeError("Civitai /api/v1/images returned 0 items!")
|
||||
image = items[0]["url"]
|
||||
else:
|
||||
raise RuntimeError("Bad response from api/v1/images!")
|
||||
if image.startswith(Civitai.IMAGE_URL_SUBDIRECTORY_PREFIX):
|
||||
image = Civitai.image_subdirectory_url_to_image_url(image)
|
||||
if image.startswith(Civitai.IMAGE_URL_DOMAIN_PREFIX):
|
||||
image = Civitai.image_domain_url_full_size(image)
|
||||
_, image_extension = split_valid_ext(image, image_extensions)
|
||||
if image_extension == "":
|
||||
raise ValueError("Invalid image type!")
|
||||
@@ -514,17 +737,23 @@ def download_model_preview(formdata):
|
||||
|
||||
# detect (and try to fix) wrong file extension
|
||||
image_format = None
|
||||
with Image.open(image_path) as image:
|
||||
image_format = image.format
|
||||
image_dir_and_name, image_ext = os.path.splitext(image_path)
|
||||
if not image_format_is_equal(image_format, image_ext):
|
||||
corrected_image_path = image_dir_and_name + "." + image_format.lower()
|
||||
if os.path.exists(corrected_image_path) and not overwrite:
|
||||
print("WARNING: '" + image_path + "' has wrong extension!")
|
||||
else:
|
||||
os.rename(image_path, corrected_image_path)
|
||||
print("Saved file: " + corrected_image_path)
|
||||
image_path = corrected_image_path
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
image_format = image.format
|
||||
image_dir_and_name, image_ext = os.path.splitext(image_path)
|
||||
if not image_format_is_equal(image_format, image_ext):
|
||||
corrected_image_path = image_dir_and_name + "." + image_format.lower()
|
||||
if os.path.exists(corrected_image_path) and not overwrite:
|
||||
print("WARNING: '" + image_path + "' has wrong extension!")
|
||||
else:
|
||||
os.rename(image_path, corrected_image_path)
|
||||
print("Saved file: " + corrected_image_path)
|
||||
image_path = corrected_image_path
|
||||
except Image.UnidentifiedImageError as e: #TODO: handle case where "image" is actually video
|
||||
print("WARNING: '" + image_path + "' image format was unknown!")
|
||||
os.remove(image_path)
|
||||
print("Deleted file: " + image_path)
|
||||
image_path = ""
|
||||
return image_path # return in-case need corrected path
|
||||
|
||||
|
||||
@@ -532,7 +761,15 @@ def download_model_preview(formdata):
|
||||
async def set_model_preview(request):
|
||||
formdata = await request.post()
|
||||
try:
|
||||
download_model_preview(formdata)
|
||||
search_path = formdata.get("path", None)
|
||||
model_path, model_type = search_path_to_system_path(search_path)
|
||||
|
||||
image = formdata.get("image", None)
|
||||
|
||||
overwrite = formdata.get("overwrite", "true").lower()
|
||||
overwrite = True if overwrite == "true" else False
|
||||
|
||||
download_model_preview(model_path, image, overwrite)
|
||||
return web.json_response({ "success": True })
|
||||
except ValueError as e:
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
@@ -710,8 +947,8 @@ async def get_model_list(request):
|
||||
if image is not None:
|
||||
raw_post = os.path.join(model_type, str(base_path_index), rel_path, image)
|
||||
item["preview"] = {
|
||||
"path": urllib.parse.quote_plus(raw_post),
|
||||
"dateModified": urllib.parse.quote_plus(str(image_modified)),
|
||||
"path": raw_post,
|
||||
"dateModified": str(image_modified),
|
||||
}
|
||||
model_items.append(item)
|
||||
|
||||
@@ -779,6 +1016,116 @@ async def get_directory_list(request):
|
||||
return web.json_response(dir_list)
|
||||
|
||||
|
||||
def try_download_and_save_model_info(model_file_path):
|
||||
success = (0, 0, 0) #info, notes, url
|
||||
head, _ = os.path.splitext(model_file_path)
|
||||
model_info_path = head + model_info_extension
|
||||
model_notes_path = head + model_notes_extension
|
||||
model_url_path = head + ".url"
|
||||
if os.path.exists(model_info_path) and os.path.exists(model_notes_path) and os.path.exists(model_url_path):
|
||||
return success
|
||||
print("Scanning " + model_file_path)
|
||||
|
||||
model_info = {}
|
||||
model_info = ModelInfo.search_info(model_file_path, cache=True, use_cached=True)
|
||||
if len(model_info) == 0:
|
||||
return success
|
||||
success[0] = 1
|
||||
|
||||
if not os.path.exists(model_notes_path):
|
||||
notes = ModelInfo.search_notes(model_file_path)
|
||||
if not notes.isspace() and notes != "":
|
||||
try:
|
||||
with open(model_notes_path, "w", encoding="utf-8") as f:
|
||||
f.write(notes)
|
||||
print("Saved file: " + model_notes_path)
|
||||
success[1] = 1
|
||||
except Exception as e:
|
||||
print(f"Failed to save {model_notes_path}!")
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
|
||||
if not os.path.exists(model_url_path):
|
||||
web_url = ModelInfo.get_url(model_info)
|
||||
if web_url is not None and web_url != "":
|
||||
try:
|
||||
save_web_url(model_url_path, web_url)
|
||||
print("Saved file: " + model_url_path)
|
||||
success[2] = 1
|
||||
except Exception as e:
|
||||
print(f"Failed to save {model_url_path}!")
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
return success
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.post("/model-manager/models/scan")
|
||||
async def try_scan_download(request):
|
||||
refresh = request.query.get("refresh", None) is not None
|
||||
response = {
|
||||
"success": False,
|
||||
"infoCount": 0,
|
||||
"notesCount": 0,
|
||||
"urlCount": 0,
|
||||
}
|
||||
model_paths = folder_paths_folder_names_and_paths(refresh)
|
||||
for _, (model_dirs, model_extension_whitelist) in model_paths.items():
|
||||
for root_dir in model_dirs:
|
||||
for root, dirs, files in os.walk(root_dir):
|
||||
for file in files:
|
||||
file_name, file_extension = os.path.splitext(file)
|
||||
if file_extension not in model_extension_whitelist:
|
||||
continue
|
||||
model_file_path = root + os.path.sep + file
|
||||
savedInfo, savedNotes, savedUrl = try_download_and_save_model_info(model_file_path)
|
||||
response["infoCount"] += savedInfo
|
||||
response["notesCount"] += savedNotes
|
||||
response["urlCount"] += savedUrl
|
||||
|
||||
response["success"] = True
|
||||
return web.json_response(response)
|
||||
|
||||
@server.PromptServer.instance.routes.post("/model-manager/preview/scan")
|
||||
async def try_scan_download_previews(request):
|
||||
refresh = request.query.get("refresh", None) is not None
|
||||
response = {
|
||||
"success": False,
|
||||
"count": 0,
|
||||
}
|
||||
model_paths = folder_paths_folder_names_and_paths(refresh)
|
||||
for _, (model_dirs, model_extension_whitelist) in model_paths.items():
|
||||
for root_dir in model_dirs:
|
||||
for root, dirs, files in os.walk(root_dir):
|
||||
for file in files:
|
||||
file_name, file_extension = os.path.splitext(file)
|
||||
if file_extension not in model_extension_whitelist:
|
||||
continue
|
||||
model_file_path = root + os.path.sep + file
|
||||
model_file_head = os.path.splitext(model_file_path)[0]
|
||||
|
||||
preview_exists = False
|
||||
for preview_extension in preview_extensions:
|
||||
preview_path = model_file_head + preview_extension
|
||||
if os.path.isfile(preview_path):
|
||||
preview_exists = True
|
||||
break
|
||||
if preview_exists:
|
||||
continue
|
||||
|
||||
model_info = ModelInfo.try_load_cached(model_file_path) # NOTE: model info must already be downloaded
|
||||
web_previews = ModelInfo.get_web_preview_urls(model_info, True)
|
||||
if len(web_previews) == 0:
|
||||
continue
|
||||
saved_image_path = download_model_preview(
|
||||
model_file_path,
|
||||
image=web_previews[0],
|
||||
overwrite=False,
|
||||
)
|
||||
if os.path.isfile(saved_image_path):
|
||||
response["count"] += 1
|
||||
|
||||
response["success"] = True
|
||||
return web.json_response(response)
|
||||
|
||||
|
||||
def download_file(url, filename, overwrite):
|
||||
if not overwrite and os.path.isfile(filename):
|
||||
raise ValueError("File already exists!")
|
||||
@@ -885,13 +1232,13 @@ def bytes_to_size(total_bytes):
|
||||
return "{:.2f}".format(total_bytes / (1 << (i * 10))) + " " + units[i]
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.get("/model-manager/model/info")
|
||||
async def get_model_info(request):
|
||||
@server.PromptServer.instance.routes.get("/model-manager/model/info/{path}")
|
||||
async def get_model_metadata(request):
|
||||
result = { "success": False }
|
||||
|
||||
model_path = request.query.get("path", None)
|
||||
model_path = request.match_info["path"]
|
||||
if model_path is None:
|
||||
result["alert"] = "Missing model path!"
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
model_path = urllib.parse.unquote(model_path)
|
||||
|
||||
@@ -900,16 +1247,16 @@ async def get_model_info(request):
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
|
||||
info = {}
|
||||
data = {}
|
||||
comfyui_directory, name = os.path.split(model_path)
|
||||
info["File Name"] = name
|
||||
info["File Directory"] = comfyui_directory
|
||||
info["File Size"] = bytes_to_size(os.path.getsize(abs_path))
|
||||
data["File Name"] = name
|
||||
data["File Directory"] = comfyui_directory
|
||||
data["File Size"] = bytes_to_size(os.path.getsize(abs_path))
|
||||
stats = pathlib.Path(abs_path).stat()
|
||||
date_format = "%Y-%m-%d %H:%M:%S"
|
||||
date_modified = datetime.fromtimestamp(stats.st_mtime).strftime(date_format)
|
||||
#info["Date Modified"] = date_modified
|
||||
#info["Date Created"] = datetime.fromtimestamp(stats.st_ctime).strftime(date_format)
|
||||
#data["Date Modified"] = date_modified
|
||||
#data["Date Created"] = datetime.fromtimestamp(stats.st_ctime).strftime(date_format)
|
||||
|
||||
model_extensions = folder_paths_get_supported_pt_extensions(model_type)
|
||||
abs_name , _ = split_valid_ext(abs_path, model_extensions)
|
||||
@@ -919,36 +1266,36 @@ async def get_model_info(request):
|
||||
if os.path.isfile(maybe_preview):
|
||||
preview_path, _ = split_valid_ext(model_path, model_extensions)
|
||||
preview_modified = pathlib.Path(maybe_preview).stat().st_mtime_ns
|
||||
info["Preview"] = {
|
||||
"path": urllib.parse.quote_plus(preview_path + extension),
|
||||
"dateModified": urllib.parse.quote_plus(str(preview_modified)),
|
||||
data["Preview"] = {
|
||||
"path": preview_path + extension,
|
||||
"dateModified": str(preview_modified),
|
||||
}
|
||||
break
|
||||
|
||||
header = get_safetensor_header(abs_path)
|
||||
metadata = header.get("__metadata__", None)
|
||||
|
||||
if metadata is not None and info.get("Preview", None) is None:
|
||||
if metadata is not None and data.get("Preview", None) is None:
|
||||
thumbnail = metadata.get("modelspec.thumbnail")
|
||||
if thumbnail is not None:
|
||||
i0 = thumbnail.find("/") + 1
|
||||
i1 = thumbnail.find(";", i0)
|
||||
thumbnail_extension = "." + thumbnail[i0:i1]
|
||||
if thumbnail_extension in image_extensions:
|
||||
info["Preview"] = {
|
||||
data["Preview"] = {
|
||||
"path": request.query["path"] + thumbnail_extension,
|
||||
"dateModified": date_modified,
|
||||
}
|
||||
|
||||
if metadata is not None:
|
||||
info["Base Training Model"] = metadata.get("ss_sd_model_name", "")
|
||||
info["Base Model Version"] = metadata.get("ss_base_model_version", "")
|
||||
info["Network Dimension"] = metadata.get("ss_network_dim", "")
|
||||
info["Network Alpha"] = metadata.get("ss_network_alpha", "")
|
||||
data["Base Training Model"] = metadata.get("ss_sd_model_name", "")
|
||||
data["Base Model Version"] = metadata.get("ss_base_model_version", "")
|
||||
data["Network Dimension"] = metadata.get("ss_network_dim", "")
|
||||
data["Network Alpha"] = metadata.get("ss_network_alpha", "")
|
||||
|
||||
if metadata is not None:
|
||||
training_comment = metadata.get("ss_training_comment", "")
|
||||
info["Description"] = (
|
||||
data["Description"] = (
|
||||
metadata.get("modelspec.description", "") +
|
||||
"\n\n" +
|
||||
metadata.get("modelspec.usage_hint", "") +
|
||||
@@ -956,12 +1303,17 @@ async def get_model_info(request):
|
||||
training_comment if training_comment != "None" else ""
|
||||
).strip()
|
||||
|
||||
info_text_file = abs_name + model_info_extension
|
||||
notes_file = abs_name + model_notes_extension
|
||||
notes = ""
|
||||
if os.path.isfile(info_text_file):
|
||||
with open(info_text_file, 'r', encoding="utf-8") as f:
|
||||
if os.path.isfile(notes_file):
|
||||
with open(notes_file, 'r', encoding="utf-8") as f:
|
||||
notes = f.read()
|
||||
|
||||
web_url_file = abs_name + ".url"
|
||||
web_url = ""
|
||||
if os.path.isfile(web_url_file):
|
||||
web_url = try_load_web_url(web_url_file)
|
||||
|
||||
if metadata is not None:
|
||||
img_buckets = metadata.get("ss_bucket_info", None)
|
||||
datasets = metadata.get("ss_datasets", None)
|
||||
@@ -983,7 +1335,7 @@ async def get_model_info(request):
|
||||
resolutions[str(x) + "x" + str(y)] = count
|
||||
resolutions = list(resolutions.items())
|
||||
resolutions.sort(key=lambda x: x[1], reverse=True)
|
||||
info["Bucket Resolutions"] = resolutions
|
||||
data["Bucket Resolutions"] = resolutions
|
||||
|
||||
tags = None
|
||||
if metadata is not None:
|
||||
@@ -997,21 +1349,82 @@ async def get_model_info(request):
|
||||
tags = list(tags.items())
|
||||
tags.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
model_info = ModelInfo.try_load_cached(abs_path)
|
||||
web_previews = ModelInfo.get_web_preview_urls(model_info, True)
|
||||
|
||||
result["success"] = True
|
||||
result["info"] = info
|
||||
result["info"] = data
|
||||
if metadata is not None:
|
||||
result["metadata"] = metadata
|
||||
if tags is not None:
|
||||
result["tags"] = tags
|
||||
result["notes"] = notes
|
||||
result["url"] = web_url
|
||||
result["webPreviews"] = web_previews
|
||||
return web.json_response(result)
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.get("/model-manager/model/web-url")
|
||||
async def get_model_web_url(request):
|
||||
result = { "success": False }
|
||||
|
||||
model_path = request.query.get("path", None)
|
||||
if model_path is None:
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
model_path = urllib.parse.unquote(model_path)
|
||||
|
||||
abs_path, model_type = search_path_to_system_path(model_path)
|
||||
if abs_path is None:
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
|
||||
url_path = os.path.splitext(abs_path)[0] + ".url"
|
||||
if os.path.isfile(url_path):
|
||||
web_url = try_load_web_url(url_path)
|
||||
if web_url != "":
|
||||
result["success"] = True
|
||||
return web.json_response({ "url": web_url })
|
||||
|
||||
model_info = ModelInfo.search_info(abs_path)
|
||||
if len(model_info) == 0:
|
||||
result["alert"] = "Unable to find model info!"
|
||||
return web.json_response(result)
|
||||
web_url = ModelInfo.get_url(model_info)
|
||||
if web_url != "" and web_url is not None:
|
||||
save_web_url(url_path, web_url)
|
||||
result["success"] = True
|
||||
|
||||
return web.json_response({ "url": web_url })
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.get("/model-manager/system-separator")
|
||||
async def get_system_separator(request):
|
||||
return web.json_response(os.path.sep)
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.post("/model-manager/model/download/info")
|
||||
async def download_model_info(request):
|
||||
result = { "success": False }
|
||||
|
||||
model_path = request.query.get("path", None)
|
||||
if model_path is None:
|
||||
result["alert"] = "Missing model path!"
|
||||
return web.json_response(result)
|
||||
model_path = urllib.parse.unquote(model_path)
|
||||
|
||||
abs_path, model_type = search_path_to_system_path(model_path)
|
||||
if abs_path is None:
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
|
||||
model_info = ModelInfo.search_info(abs_path, cache=True, use_cached=False)
|
||||
if len(model_info) > 0:
|
||||
result["success"] = True
|
||||
|
||||
return web.json_response(result)
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.post("/model-manager/model/download")
|
||||
async def download_model(request):
|
||||
formdata = await request.post()
|
||||
@@ -1026,6 +1439,7 @@ async def download_model(request):
|
||||
result["alert"] = "Invalid save path!"
|
||||
return web.json_response(result)
|
||||
|
||||
# download model
|
||||
download_uri = formdata.get("download")
|
||||
if download_uri is None:
|
||||
result["alert"] = "Invalid download url!"
|
||||
@@ -1049,14 +1463,24 @@ async def download_model(request):
|
||||
result["alert"] = "Failed to download model!\n\n" + str(e)
|
||||
return web.json_response(result)
|
||||
|
||||
# download model info
|
||||
model_info = ModelInfo.search_info(file_name, cache=True) # save json
|
||||
|
||||
# save url
|
||||
url_file_path = os.path.splitext(file_name)[0] + ".url"
|
||||
url = ModelInfo.get_url(model_info)
|
||||
if url != "" and url is not None:
|
||||
save_web_url(url_file_path, url)
|
||||
|
||||
# save image as model preview
|
||||
image = formdata.get("image")
|
||||
if image is not None and image != "":
|
||||
try:
|
||||
download_model_preview({
|
||||
"path": model_path + os.sep + name,
|
||||
"image": image,
|
||||
"overwrite": formdata.get("overwrite"),
|
||||
})
|
||||
download_model_preview(
|
||||
file_name,
|
||||
image,
|
||||
formdata.get("overwrite"),
|
||||
)
|
||||
except Exception as e:
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
result["alert"] = "Failed to download preview!\n\n" + str(e)
|
||||
@@ -1122,7 +1546,7 @@ async def move_model(request):
|
||||
return web.json_response(result)
|
||||
|
||||
# TODO: this could overwrite existing files in destination; do a check beforehand?
|
||||
for extension in preview_extensions + (model_info_extension,):
|
||||
for extension in preview_extensions + (model_notes_extension,) + (model_info_extension,):
|
||||
old_file = old_file_without_extension + extension
|
||||
if os.path.isfile(old_file):
|
||||
new_file = new_file_without_extension + extension
|
||||
@@ -1176,6 +1600,7 @@ async def delete_model(request):
|
||||
print("Deleted file: " + model_path)
|
||||
|
||||
delete_same_name_files(path_and_name, preview_extensions)
|
||||
delete_same_name_files(path_and_name, (model_notes_extension,))
|
||||
delete_same_name_files(path_and_name, (model_info_extension,))
|
||||
|
||||
return web.json_response(result)
|
||||
@@ -1200,7 +1625,7 @@ async def set_notes(request):
|
||||
model_path, model_type = search_path_to_system_path(model_path)
|
||||
model_extensions = folder_paths_get_supported_pt_extensions(model_type)
|
||||
file_path_without_extension, _ = split_valid_ext(model_path, model_extensions)
|
||||
filename = os.path.normpath(file_path_without_extension + model_info_extension)
|
||||
filename = os.path.normpath(file_path_without_extension + model_notes_extension)
|
||||
|
||||
if dt_epoch is not None and os.path.exists(filename) and os.path.getmtime(filename) > dt_epoch:
|
||||
# discard late save
|
||||
@@ -1221,12 +1646,52 @@ async def set_notes(request):
|
||||
except ValueError as e:
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
result["alert"] = "Failed to save notes!\n\n" + str(e)
|
||||
web.json_response(result)
|
||||
return web.json_response(result)
|
||||
|
||||
result["success"] = True
|
||||
return web.json_response(result)
|
||||
|
||||
|
||||
@server.PromptServer.instance.routes.post("/model-manager/notes/download")
|
||||
async def try_download_notes(request):
|
||||
result = { "success": False }
|
||||
|
||||
model_path = request.query.get("path", None)
|
||||
if model_path is None:
|
||||
result["alert"] = "Missing model path!"
|
||||
return web.json_response(result)
|
||||
model_path = urllib.parse.unquote(model_path)
|
||||
|
||||
abs_path, model_type = search_path_to_system_path(model_path)
|
||||
if abs_path is None:
|
||||
result["alert"] = "Invalid model path!"
|
||||
return web.json_response(result)
|
||||
|
||||
overwrite = request.query.get("overwrite", None)
|
||||
overwrite = not (overwrite == "False" or overwrite == "false" or overwrite == None)
|
||||
notes_path = os.path.splitext(abs_path)[0] + ".txt"
|
||||
if not overwrite and os.path.isfile(notes_path):
|
||||
result["alert"] = "Notes already exist!"
|
||||
return web.json_response(result)
|
||||
|
||||
notes = ModelInfo.search_notes(abs_path)
|
||||
if notes.isspace() or notes == "":
|
||||
result["alert"] = "No notes found!"
|
||||
return web.json_response(result)
|
||||
|
||||
try:
|
||||
with open(notes_path, "w", encoding="utf-8") as f:
|
||||
f.write(notes)
|
||||
result["success"] = True
|
||||
except ValueError as e:
|
||||
print(e, file=sys.stderr, flush=True)
|
||||
result["alert"] = "Failed to save notes!\n\n" + str(e)
|
||||
return web.json_response(result)
|
||||
|
||||
result["notes"] = notes
|
||||
return web.json_response(result)
|
||||
|
||||
|
||||
WEB_DIRECTORY = "web"
|
||||
NODE_CLASS_MAPPINGS = {}
|
||||
__all__ = ["NODE_CLASS_MAPPINGS"]
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 72 KiB After Width: | Height: | Size: 77 KiB |
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
markdownify
|
||||
@@ -10,7 +10,7 @@
|
||||
position: fixed;
|
||||
overflow: hidden;
|
||||
width: 100%;
|
||||
z-index: 1100;
|
||||
z-index: 1100; /*needs to be below the dialog modal element*/
|
||||
|
||||
/*override comfy-modal settings*/
|
||||
border-radius: 0;
|
||||
@@ -23,6 +23,10 @@
|
||||
touch-action: manipulation;
|
||||
}
|
||||
|
||||
.model-manager .model-manager-dialog {
|
||||
z-index: 2001; /*needs to be above the model manager element*/
|
||||
}
|
||||
|
||||
.model-manager .comfy-modal-content {
|
||||
width: 100%;
|
||||
gap: 16px;
|
||||
@@ -249,6 +253,10 @@
|
||||
user-select: none;
|
||||
}
|
||||
|
||||
.model-manager code {
|
||||
text-wrap: wrap;
|
||||
}
|
||||
|
||||
/* main content */
|
||||
.model-manager .model-manager-panel {
|
||||
color: var(--fg-color);
|
||||
@@ -409,6 +417,11 @@
|
||||
border-radius: 8px;
|
||||
}
|
||||
|
||||
.model-manager .model-info-container .item {
|
||||
width: fit-content;
|
||||
height: 50vh;
|
||||
}
|
||||
|
||||
.model-manager .item img {
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
@@ -416,15 +429,13 @@
|
||||
border-radius: 8px;
|
||||
}
|
||||
|
||||
.model-manager .model-info-container .item {
|
||||
width: auto;
|
||||
height: auto;
|
||||
}
|
||||
.model-manager .model-info-container .item img {
|
||||
.model-manager .model-info-container .item img,
|
||||
.model-manager .model-preview-full {
|
||||
height: auto;
|
||||
width: auto;
|
||||
max-width: 100%;
|
||||
max-height: 50vh;
|
||||
border-radius: 8px;
|
||||
}
|
||||
|
||||
.model-manager .model-preview-button-left,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user