import os import pathlib import sys import copy import hashlib import importlib from aiohttp import web import server import urllib.parse import struct import json import requests requests.packages.urllib3.disable_warnings() import folder_paths config_loader_path = os.path.join(os.path.dirname(__file__), 'config_loader.py') config_loader_spec = importlib.util.spec_from_file_location('config_loader', config_loader_path) config_loader = importlib.util.module_from_spec(config_loader_spec) config_loader_spec.loader.exec_module(config_loader) comfyui_model_uri = os.path.join(os.getcwd(), "models") extension_uri = os.path.join(os.getcwd(), "custom_nodes" + os.path.sep + "ComfyUI-Model-Manager") index_uri = os.path.join(extension_uri, "index.json") #checksum_cache_uri = os.path.join(extension_uri, "checksum_cache.txt") no_preview_image = os.path.join(extension_uri, "no-preview.png") ui_settings_uri = os.path.join(extension_uri, "ui_settings.yaml") fallback_model_extensions = set([".bin", ".ckpt", ".onnx", ".pt", ".pth", ".safetensors"]) # TODO: magic values image_extensions = (".apng", ".gif", ".jpeg", ".jpg", ".png", ".webp") #video_extensions = (".avi", ".mp4", ".webm") # TODO: Requires ffmpeg or cv2. Cache preview frame? #hash_buffer_size = 4096 _folder_names_and_paths = None # dict[str, tuple[list[str], list[str]]] def folder_paths_folder_names_and_paths(refresh = False): global _folder_names_and_paths if refresh or _folder_names_and_paths is None: _folder_names_and_paths = {} for item_name in os.listdir(comfyui_model_uri): item_path = os.path.join(comfyui_model_uri, item_name) if not os.path.isdir(item_path): continue if item_name == "configs": continue if item_name in folder_paths.folder_names_and_paths: dir_paths, extensions = copy.deepcopy(folder_paths.folder_names_and_paths[item_name]) else: dir_paths = [item_path] extensions = copy.deepcopy(fallback_model_extensions) _folder_names_and_paths[item_name] = (dir_paths, extensions) return _folder_names_and_paths def folder_paths_get_folder_paths(folder_name, refresh = False): # API function crashes querying unknown model folder paths = folder_paths_folder_names_and_paths(refresh) if folder_name in paths: return paths[folder_name][0] maybe_path = os.path.join(comfyui_model_uri, folder_name) if os.path.exists(maybe_path): return [maybe_path] return [] def folder_paths_get_supported_pt_extensions(folder_name, refresh = False): # Missing API function paths = folder_paths_folder_names_and_paths(refresh) if folder_name in paths: return paths[folder_name][1] model_extensions = copy.deepcopy(fallback_model_extensions) return model_extensions def get_safetensor_header(path): try: with open(path, "rb") as f: length_of_header = struct.unpack(" 0: # DEPTH-FIRST dir_path, dir_index = dir_stack.pop() dir_items = os.listdir(dir_path) dir_items = sorted(dir_items, key=str.casefold) dir_child_count = 0 # TODO: sort content of directory: alphabetically # TODO: sort content of directory: files first subdirs = [] for item_name in dir_items: # BREADTH-FIRST item_path = os.path.join(dir_path, item_name) if os.path.isdir(item_path): # dir subdir_index = len(dir_list) # this must be done BEFORE `dir_list.append` subdirs.append((item_path, subdir_index)) dir_list.append({ "name": item_name, "childIndex": None, "childCount": 0 }) dir_child_count += 1 else: # file _, file_extension = os.path.splitext(item_name) if extension_whitelist is None or file_extension in extension_whitelist: dir_list.append({ "name": item_name }) dir_child_count += 1 if dir_child_count > 0: dir_list[dir_index]["childIndex"] = len(dir_list) - dir_child_count dir_list[dir_index]["childCount"] = dir_child_count subdirs.reverse() for dir_path, subdir_index in subdirs: dir_stack.append((dir_path, subdir_index)) return dir_list @server.PromptServer.instance.routes.get("/model-manager/model-directory-list") async def directory_list(request): #body = await request.json() dir_list = linear_directory_hierarchy(True) #json.dump(dir_list, sys.stdout, indent=4) return web.json_response(dir_list) def_headers = { "User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148" } def download_model_file(url, filename): dl_filename = filename + ".download" rh = requests.get( url=url, stream=True, verify=False, headers=def_headers, proxies=None ) print("temp file is " + dl_filename) total_size = int(rh.headers["Content-Length"]) basename, ext = os.path.splitext(filename) print("Start download {}, file size: {}".format(basename, total_size)) downloaded_size = 0 if os.path.exists(dl_filename): downloaded_size = os.path.getsize(download_file) headers = {"Range": "bytes=%d-" % downloaded_size} headers["User-Agent"] = def_headers["User-Agent"] r = requests.get(url=url, stream=True, verify=False, headers=headers, proxies=None) with open(dl_filename, "ab") as f: for chunk in r.iter_content(chunk_size=1024): if chunk is not None: downloaded_size += len(chunk) f.write(chunk) f.flush() progress = int(50 * downloaded_size / total_size) sys.stdout.reconfigure(encoding="utf-8") sys.stdout.write( "\r[%s%s] %d%%" % ( "-" * progress, " " * (50 - progress), 100 * downloaded_size / total_size, ) ) sys.stdout.flush() print() os.rename(dl_filename, filename) @server.PromptServer.instance.routes.post("/model-manager/download") async def download_file(request): body = await request.json() model_type = body.get("type") model_type_path = model_type_to_dir_name(model_type) if model_type_path is None: return web.json_response({"success": False}) download_uri = body.get("download") if download_uri is None: return web.json_response({"success": False}) model_name = body.get("name") file_name = os.path.join(comfyui_model_uri, model_type_path, model_name) download_model_file(download_uri, file_name) print("File download completed!") return web.json_response({"success": True}) WEB_DIRECTORY = "web" NODE_CLASS_MAPPINGS = {} __all__ = ["NODE_CLASS_MAPPINGS"]