# Server-side module of the ComfyUI Model Manager extension: imports, the
# `config_loader` bootstrap, and the path/extension constants shared by the
# functions below.
import os
import pathlib
import sys
import copy
import importlib
# `import importlib` alone does not guarantee that the `util` submodule is
# loaded, and `importlib.util` is used below, so import it explicitly.
import importlib.util
import re
from aiohttp import web
import server
import urllib.parse
import urllib.request
import struct
import json
import requests
# Disable urllib3 warnings process-wide (this module issues requests with
# `verify=False`).
requests.packages.urllib3.disable_warnings()

import folder_paths

# Load the sibling `config_loader.py` by file path rather than by a regular
# import statement.
config_loader_path = os.path.join(os.path.dirname(__file__), 'config_loader.py')
config_loader_spec = importlib.util.spec_from_file_location('config_loader', config_loader_path)
config_loader = importlib.util.module_from_spec(config_loader_spec)
config_loader_spec.loader.exec_module(config_loader)

# NOTE(review): these paths are resolved against the current working
# directory, i.e. they assume the process is started from the ComfyUI root
# and that the extension folder is named "ComfyUI-Model-Manager" -- confirm.
comfyui_model_uri = os.path.join(os.getcwd(), "models")
extension_uri = os.path.join(os.getcwd(), "custom_nodes" + os.path.sep + "ComfyUI-Model-Manager")
no_preview_image = os.path.join(extension_uri, "no-preview.png")
ui_settings_uri = os.path.join(extension_uri, "ui_settings.yaml")
server_settings_uri = os.path.join(extension_uri, "server_settings.yaml")

# Extensions accepted for model folders that `folder_paths` does not know about.
fallback_model_extensions = {".bin", ".ckpt", ".onnx", ".pt", ".pth", ".safetensors"} # TODO: magic values
# Extensions recognised as preview images.
image_extensions = (".apng", ".gif", ".jpeg", ".jpg", ".png", ".webp")
#video_extensions = (".avi", ".mp4", ".webm") # TODO: Requires ffmpeg or cv2. Cache preview frame?
# Lazily built cache: model folder name -> (search directories, allowed extensions).
_folder_names_and_paths = None # dict[str, tuple[list[str], list[str]]]


def folder_paths_folder_names_and_paths(refresh = False):
    """Return the cached mapping of model folder name to (directories, extensions).

    The mapping is (re)built when it has not been built yet or when `refresh`
    is true. Every sub-directory of `comfyui_model_uri` except "configs" gets
    an entry: a deep copy of the `folder_paths.folder_names_and_paths` entry
    when one exists, otherwise the directory itself paired with a copy of
    `fallback_model_extensions`.
    """
    global _folder_names_and_paths
    if _folder_names_and_paths is not None and not refresh:
        return _folder_names_and_paths

    _folder_names_and_paths = {}
    for entry_name in os.listdir(comfyui_model_uri):
        entry_path = os.path.join(comfyui_model_uri, entry_name)
        # Only directories are model folders; "configs" is deliberately skipped.
        if not os.path.isdir(entry_path) or entry_name == "configs":
            continue
        if entry_name in folder_paths.folder_names_and_paths:
            search_dirs, allowed_exts = copy.deepcopy(folder_paths.folder_names_and_paths[entry_name])
        else:
            search_dirs, allowed_exts = [entry_path], copy.deepcopy(fallback_model_extensions)
        _folder_names_and_paths[entry_name] = (search_dirs, allowed_exts)
    return _folder_names_and_paths


def folder_paths_get_folder_paths(folder_name, refresh = False): # API function crashes querying unknown model folder
    """Return the list of directories for `folder_name`.

    Falls back to `[<comfyui_model_uri>/<folder_name>]` when the name is not
    in the cached mapping but that path exists, and to `[]` otherwise.
    """
    entry = folder_paths_folder_names_and_paths(refresh).get(folder_name)
    if entry is not None:
        return entry[0]
    candidate = os.path.join(comfyui_model_uri, folder_name)
    return [candidate] if os.path.exists(candidate) else []


def folder_paths_get_supported_pt_extensions(folder_name, refresh = False): # Missing API function
    """Return the allowed extensions for `folder_name`.

    Unknown folder names get a fresh copy of `fallback_model_extensions`.
    """
    entry = folder_paths_folder_names_and_paths(refresh).get(folder_name)
    if entry is None:
        return copy.deepcopy(fallback_model_extensions)
    return entry[1]


# NOTE(review): the reviewed copy of this file is damaged from this point on.
# The text between `struct.unpack("` and ` 0: # DEPTH-FIRST` is missing, which
# removes the remainder of `get_safetensor_header`, the head of the directory
# walk that ends in `return dir_list` (presumably `linear_directory_hierarchy`,
# which `directory_list` calls), and everything in between. `server_settings`
# and `model_type_to_dir_name` are used further down but are not defined in
# the surviving text, so they were presumably defined in the lost span too.
# Restore this region from version control. The surviving fragments are kept
# verbatim below (as comments, because they cannot be parsed on their own);
# the last fragment continues past the end of this region.
#
# def get_safetensor_header(path): try: with open(path, "rb") as f: length_of_header = struct.unpack("
#
#  0: # DEPTH-FIRST dir_path, dir_index = dir_stack.pop() dir_items = os.listdir(dir_path)
# dir_items = sorted(dir_items, key=str.casefold) dir_child_count = 0
# # TODO: sort content of directory: alphabetically # TODO: sort content of directory: files first
# subdirs = [] for item_name in dir_items: # BREADTH-FIRST item_path = os.path.join(dir_path, item_name)
# if os.path.isdir(item_path): # dir subdir_index = len(dir_list) # this must be done BEFORE `dir_list.append`
# subdirs.append((item_path, subdir_index)) dir_list.append({
# NOTE(review): the statements below are the surviving tail of a directory-walk
# function (it ends in `return dir_list`; presumably `linear_directory_hierarchy`,
# which `directory_list` calls). Its beginning is missing from the reviewed copy
# of this file, so the tail cannot be parsed on its own and is preserved
# verbatim as comments. Restore the complete function from version control.
# `server_settings` and `model_type_to_dir_name`, used further down, are not
# defined in the reviewed text either -- presumably they belong to the same
# lost region.
#
# "name": item_name, "childIndex": None, "childCount": 0 }) dir_child_count += 1
# else: # file _, file_extension = os.path.splitext(item_name)
# if extension_whitelist is None or file_extension in extension_whitelist:
# dir_list.append({ "name": item_name }) dir_child_count += 1
# if dir_child_count > 0: dir_list[dir_index]["childIndex"] = len(dir_list) - dir_child_count
# dir_list[dir_index]["childCount"] = dir_child_count subdirs.reverse()
# for dir_path, subdir_index in subdirs: dir_stack.append((dir_path, subdir_index)) return dir_list


@server.PromptServer.instance.routes.get("/model-manager/model-directory-list")
async def directory_list(request):
    """GET handler: respond with the refreshed linear directory hierarchy as JSON."""
    #body = await request.json()
    dir_list = linear_directory_hierarchy(True)
    #json.dump(dir_list, sys.stdout, indent=4)
    return web.json_response(dir_list)


def download_file(url, filename, overwrite):
    """Download `url` to `filename`, staging the data in `filename + ".download"`.

    The URL is requested twice without following redirects: a probe (for the
    status code and total size) and a ranged request that delivers the data.
    A 307 or 302 answered to both requests is followed by calling this
    function again with the URL found in the response body.

    Args:
        url: Address to download from. civitai.com / huggingface.co URLs get a
            bearer token from `server_settings` when one is configured.
        filename: Final destination path.
        overwrite: When false, an existing `filename` is an error and an
            existing partial `.download` file is resumed. When true, the
            download starts from scratch and replaces `filename`.

    Raises:
        Exception: if the destination exists and `overwrite` is false, if the
            server answers with an error status, or if a redirect target
            cannot be determined.
    """
    if not overwrite and os.path.isfile(filename):
        raise Exception("File already exists!")

    filename_temp = filename + ".download"

    def_headers = {
        "User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
    }
    if url.startswith("https://civitai.com/"):
        api_key = server_settings["civitai_api_key"]
        if api_key != "":
            def_headers["Authorization"] = f"Bearer {api_key}"
    elif url.startswith("https://huggingface.co/"):
        api_key = server_settings["huggingface_api_key"]
        if api_key != "":
            def_headers["Authorization"] = f"Bearer {api_key}"

    # NOTE(review): `verify=False` disables TLS certificate verification on
    # requests that may carry an API key. Kept as-is to avoid changing
    # behaviour for existing setups; consider enabling verification.
    rh = requests.get(url=url, stream=True, verify=False, headers=def_headers, proxies=None, allow_redirects=False)
    try:
        if not rh.ok:
            raise Exception("Unable to download")
        probe_status = rh.status_code
        total_size = int(rh.headers.get("Content-Length", 0)) # TODO: pass in total size earlier
    finally:
        # Only the status and headers of the probe are needed; release the
        # streamed connection instead of leaking it.
        rh.close()

    # Resume only when existing data is meant to be kept. With `overwrite` the
    # staging file is rewritten from byte 0, so the requested range must start
    # at 0 as well (requesting an offset and then truncating corrupts the file).
    downloaded_size = 0
    if not overwrite and probe_status == 200 and os.path.exists(filename_temp):
        downloaded_size = os.path.getsize(filename_temp)

    # Start from the probe headers so the Authorization token (if any) is also
    # sent with the request that actually fetches the data.
    headers = dict(def_headers)
    headers["Range"] = "bytes=%d-" % downloaded_size

    r = requests.get(url=url, stream=True, verify=False, headers=headers, proxies=None, allow_redirects=False)
    try:
        if probe_status == 307 and r.status_code == 307:
            # Civitai redirect
            redirect_url = r.content.decode("utf-8")
            if not redirect_url.startswith("http"):
                # Civitai requires login (NSFW or user-required)
                # TODO: inform user WHY download failed
                raise Exception("Unable to download!")
            download_file(redirect_url, filename, overwrite)
            return
        if probe_status == 302 and r.status_code == 302:
            # HuggingFace redirect
            redirect_url = r.content.decode("utf-8")
            redirect_url_index = redirect_url.find("http")
            if redirect_url_index == -1:
                raise Exception("Unable to download!")
            download_file(redirect_url[redirect_url_index:], filename, overwrite)
            return

        if not r.ok:
            # e.g. 416 (range not satisfiable) or an auth failure: never write
            # an error body into the model file.
            raise Exception("Unable to download")
        if r.status_code != 206:
            # The server ignored the Range header and is sending the whole
            # file, so anything already on disk must be discarded.
            downloaded_size = 0

        print("Download file: " + filename)
        if total_size != 0:
            print("Download file size: " + str(total_size))
            # Needed once for the progress bar, not once per chunk.
            sys.stdout.reconfigure(encoding="utf-8")

        mode = "ab" if downloaded_size > 0 else "wb"
        with open(filename_temp, mode) as f:
            for chunk in r.iter_content(chunk_size=1024):
                if not chunk:
                    continue
                downloaded_size += len(chunk)
                f.write(chunk)
                f.flush()

                if total_size != 0:
                    fraction = 1 if downloaded_size == total_size else downloaded_size / total_size
                    progress = int(50 * fraction)
                    sys.stdout.write(
                        "\r[%s%s] %d%%"
                        % (
                            "-" * progress,
                            " " * (50 - progress),
                            100 * fraction,
                        )
                    )
                    sys.stdout.flush()
        print()
    finally:
        r.close()

    if overwrite:
        # Replaces an existing destination in a single step.
        os.replace(filename_temp, filename)
    else:
        os.rename(filename_temp, filename)


def _is_path_inside(base_dir, candidate):
    """Return True when `candidate` is `base_dir` or lexically nested beneath it.

    The comparison is purely lexical (`os.path.abspath`), so symlinked model
    folders keep working while `..` components are collapsed before checking.
    """
    base_dir = os.path.abspath(base_dir)
    candidate = os.path.abspath(candidate)
    try:
        return os.path.commonpath([base_dir, candidate]) == base_dir
    except ValueError:
        # Raised when the two paths cannot share a prefix (e.g. different drives).
        return False


@server.PromptServer.instance.routes.post("/model-manager/download")
async def download_model(request):
    """POST handler: download a model (and optionally a preview image).

    Reads `type`, `path`, `download`, `name`, and optionally `overwrite` and
    `image` from the JSON body. Responds with `{"success": bool}`; a failed
    preview-image download is only logged and does not change the result.
    """
    # NOTE(review): `download_file` is synchronous, so this coroutine does not
    # yield to the event loop while a download is running.
    body = await request.json()

    overwrite = body.get("overwrite", False)

    model_type = body.get("type")
    model_path_type = model_type_to_dir_name(model_type)
    if model_path_type is None or model_path_type == "":
        return web.json_response({"success": False})

    model_path = body.get("path", "/0")
    if not isinstance(model_path, str):
        return web.json_response({"success": False})
    model_path = model_path.replace("/", os.path.sep)
    # The first run of digits selects one of the folder's base directories;
    # whatever follows it is the sub-path inside that directory.
    regex_result = re.search(r'\d+', model_path)
    if regex_result is None:
        return web.json_response({"success": False})
    model_path_index = int(regex_result.group())
    paths = folder_paths_get_folder_paths(model_path_type)
    if model_path_index < 0 or model_path_index >= len(paths):
        return web.json_response({"success": False})
    base_directory = os.path.join(comfyui_model_uri, paths[model_path_index])
    directory = os.path.join(
        comfyui_model_uri,
        (
            paths[model_path_index] +
            model_path[regex_result.end():]
        )
    )
    # `path` comes from the client: refuse anything that leaves the selected
    # base directory (e.g. via "..").
    if not _is_path_inside(base_directory, directory):
        return web.json_response({"success": False})

    download_uri = body.get("download")
    if download_uri is None:
        return web.json_response({"success": False})

    name = body.get("name")
    if not isinstance(name, str) or name == "":
        return web.json_response({"success": False})
    # Look the extensions up by folder name, the same key used for the folder
    # lookup above (the lookup helper is keyed by folder name).
    model_extension = next(
        (ext for ext in folder_paths_get_supported_pt_extensions(model_path_type) if name.endswith(ext)),
        None,
    )
    if model_extension is None:
        return web.json_response({"success": False})
    file_name = os.path.join(directory, name)
    # `name` comes from the client as well: it must stay inside `directory`.
    if not _is_path_inside(directory, file_name):
        return web.json_response({"success": False})
    try:
        download_file(download_uri, file_name, overwrite)
    except Exception as e:
        print(e, file=sys.stderr, flush=True)
        return web.json_response({"success": False})

    image_uri = body.get("image")
    if image_uri is not None and image_uri != "":
        image_extension = next(
            (ext for ext in image_extensions if image_uri.endswith(ext)),
            None,
        )
        if image_extension is not None:
            # The preview is saved next to the model, under the model's base name.
            image_name = os.path.join(
                directory,
                (name[:len(name) - len(model_extension)]) + image_extension
            )
            try:
                download_file(image_uri, image_name, overwrite)
            except Exception as e:
                print(e, file=sys.stderr, flush=True)

    return web.json_response({"success": True})


WEB_DIRECTORY = "web"
NODE_CLASS_MAPPINGS = {}
__all__ = ["NODE_CLASS_MAPPINGS"]