import os import pathlib import sys import copy import hashlib import importlib import re from aiohttp import web import server import urllib.parse import urllib.request import struct import json import requests requests.packages.urllib3.disable_warnings() import folder_paths config_loader_path = os.path.join(os.path.dirname(__file__), 'config_loader.py') config_loader_spec = importlib.util.spec_from_file_location('config_loader', config_loader_path) config_loader = importlib.util.module_from_spec(config_loader_spec) config_loader_spec.loader.exec_module(config_loader) comfyui_model_uri = os.path.join(os.getcwd(), "models") extension_uri = os.path.join(os.getcwd(), "custom_nodes" + os.path.sep + "ComfyUI-Model-Manager") index_uri = os.path.join(extension_uri, "index.json") #checksum_cache_uri = os.path.join(extension_uri, "checksum_cache.txt") no_preview_image = os.path.join(extension_uri, "no-preview.png") ui_settings_uri = os.path.join(extension_uri, "ui_settings.yaml") server_settings_uri = os.path.join(extension_uri, "server_settings.yaml") fallback_model_extensions = set([".bin", ".ckpt", ".onnx", ".pt", ".pth", ".safetensors"]) # TODO: magic values image_extensions = (".apng", ".gif", ".jpeg", ".jpg", ".png", ".webp") #video_extensions = (".avi", ".mp4", ".webm") # TODO: Requires ffmpeg or cv2. Cache preview frame? #hash_buffer_size = 4096 _folder_names_and_paths = None # dict[str, tuple[list[str], list[str]]] def folder_paths_folder_names_and_paths(refresh = False): global _folder_names_and_paths if refresh or _folder_names_and_paths is None: _folder_names_and_paths = {} for item_name in os.listdir(comfyui_model_uri): item_path = os.path.join(comfyui_model_uri, item_name) if not os.path.isdir(item_path): continue if item_name == "configs": continue if item_name in folder_paths.folder_names_and_paths: dir_paths, extensions = copy.deepcopy(folder_paths.folder_names_and_paths[item_name]) else: dir_paths = [item_path] extensions = copy.deepcopy(fallback_model_extensions) _folder_names_and_paths[item_name] = (dir_paths, extensions) return _folder_names_and_paths def folder_paths_get_folder_paths(folder_name, refresh = False): # API function crashes querying unknown model folder paths = folder_paths_folder_names_and_paths(refresh) if folder_name in paths: return paths[folder_name][0] maybe_path = os.path.join(comfyui_model_uri, folder_name) if os.path.exists(maybe_path): return [maybe_path] return [] def folder_paths_get_supported_pt_extensions(folder_name, refresh = False): # Missing API function paths = folder_paths_folder_names_and_paths(refresh) if folder_name in paths: return paths[folder_name][1] model_extensions = copy.deepcopy(fallback_model_extensions) return model_extensions def get_safetensor_header(path): try: with open(path, "rb") as f: length_of_header = struct.unpack(" 0: # DEPTH-FIRST dir_path, dir_index = dir_stack.pop() dir_items = os.listdir(dir_path) dir_items = sorted(dir_items, key=str.casefold) dir_child_count = 0 # TODO: sort content of directory: alphabetically # TODO: sort content of directory: files first subdirs = [] for item_name in dir_items: # BREADTH-FIRST item_path = os.path.join(dir_path, item_name) if os.path.isdir(item_path): # dir subdir_index = len(dir_list) # this must be done BEFORE `dir_list.append` subdirs.append((item_path, subdir_index)) dir_list.append({ "name": item_name, "childIndex": None, "childCount": 0 }) dir_child_count += 1 else: # file _, file_extension = os.path.splitext(item_name) if extension_whitelist is None or file_extension in extension_whitelist: dir_list.append({ "name": item_name }) dir_child_count += 1 if dir_child_count > 0: dir_list[dir_index]["childIndex"] = len(dir_list) - dir_child_count dir_list[dir_index]["childCount"] = dir_child_count subdirs.reverse() for dir_path, subdir_index in subdirs: dir_stack.append((dir_path, subdir_index)) return dir_list @server.PromptServer.instance.routes.get("/model-manager/model-directory-list") async def directory_list(request): #body = await request.json() dir_list = linear_directory_hierarchy(True) #json.dump(dir_list, sys.stdout, indent=4) return web.json_response(dir_list) def download_file(url, filename, overwrite): if not overwrite and os.path.isfile(filename): raise Exception("File already exists!") filename_temp = filename + ".download" def_headers = { "User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148", } if url.startswith("https://civitai.com/"): api_key = server_settings["civitai_api_key"] if (api_key != ""): def_headers["Authorization"] = f"Bearer {api_key}" elif url.startswith("https://huggingface.co/"): api_key = server_settings["huggingface_api_key"] if api_key != "": def_headers["Authorization"] = f"Bearer {api_key}" rh = requests.get(url=url, stream=True, verify=False, headers=def_headers, proxies=None, allow_redirects=False) if not rh.ok: raise Exception("Unable to download") downloaded_size = 0 if rh.status_code == 200 and os.path.exists(filename_temp): downloaded_size = os.path.getsize(filename_temp) headers = {"Range": "bytes=%d-" % downloaded_size} headers["User-Agent"] = def_headers["User-Agent"] r = requests.get(url=url, stream=True, verify=False, headers=headers, proxies=None, allow_redirects=False) if rh.status_code == 307 and r.status_code == 307: # Civitai redirect redirect_url = r.content.decode("utf-8") if not redirect_url.startswith("http"): # Civitai requires login (NSFW or user-required) # TODO: inform user WHY download failed raise Exception("Unable to download!") download_file(redirect_url, filename, overwrite) return if rh.status_code == 302 and r.status_code == 302: # HuggingFace redirect redirect_url = r.content.decode("utf-8") redirect_url_index = redirect_url.find("http") if redirect_url_index == -1: raise Exception("Unable to download!") download_file(redirect_url[redirect_url_index:], filename, overwrite) return elif rh.status_code == 200 and r.status_code == 206: # Civitai download link pass total_size = int(rh.headers.get("Content-Length", 0)) # TODO: pass in total size earlier print("Download file: " + filename) if total_size != 0: print("Download file size: " + str(total_size)) mode = "wb" if overwrite else "ab" with open(filename_temp, mode) as f: for chunk in r.iter_content(chunk_size=1024): if chunk is not None: downloaded_size += len(chunk) f.write(chunk) f.flush() if total_size != 0: fraction = 1 if downloaded_size == total_size else downloaded_size / total_size progress = int(50 * fraction) sys.stdout.reconfigure(encoding="utf-8") sys.stdout.write( "\r[%s%s] %d%%" % ( "-" * progress, " " * (50 - progress), 100 * fraction, ) ) sys.stdout.flush() print() if overwrite and os.path.isfile(filename): os.remove(filename) os.rename(filename_temp, filename) @server.PromptServer.instance.routes.post("/model-manager/download") async def download_model(request): body = await request.json() overwrite = body.get("overwrite", False) model_type = body.get("type") model_path_type = model_type_to_dir_name(model_type) if model_path_type is None or model_path_type == "": return web.json_response({"success": False}) model_path = body.get("path", "/0") model_path = model_path.replace("/", os.path.sep) regex_result = re.search(r'\d+', model_path) if regex_result is None: return web.json_response({"success": False}) model_path_index = int(regex_result.group()) paths = folder_paths_get_folder_paths(model_path_type) if model_path_index < 0 or model_path_index >= len(paths): return web.json_response({"success": False}) model_path_span = regex_result.span() directory = os.path.join( comfyui_model_uri, ( paths[model_path_index] + model_path[model_path_span[1]:] ) ) download_uri = body.get("download") if download_uri is None: return web.json_response({"success": False}) name = body.get("name") model_extension = None for ext in folder_paths_get_supported_pt_extensions(model_type): if name.endswith(ext): model_extension = ext break if model_extension is None: return web.json_response({"success": False}) file_name = os.path.join(directory, name) try: download_file(download_uri, file_name, overwrite) except: return web.json_response({"success": False}) image_uri = body.get("image") if image_uri is not None and image_uri != "": image_extension = None for ext in image_extensions: if image_uri.endswith(ext): image_extension = ext break if image_extension is not None: image_name = os.path.join( directory, (name[:len(name) - len(model_extension)]) + image_extension ) try: download_file(image_uri, image_name, overwrite) except Exception as e: print(e, file=sys.stderr, flush=True) return web.json_response({"success": True}) WEB_DIRECTORY = "web" NODE_CLASS_MAPPINGS = {} __all__ = ["NODE_CLASS_MAPPINGS"]