From 0575124d355f468e7245b76ad2bc89976628cb7e Mon Sep 17 00:00:00 2001 From: Hayden <48267247+hayden-fr@users.noreply.github.com> Date: Thu, 30 Jan 2025 21:06:24 +0800 Subject: [PATCH] Refactor code structure (#106) * refactor: rename searcher.py to information.py * refactor: move the routes into each sub-modules * refactor: move services's func into sub-modules --- __init__.py | 273 +---------------------------- py/download.py | 83 +++++++++ py/{searcher.py => information.py} | 186 ++++++++++++++++---- py/manager.py | 210 ++++++++++++++++++++++ py/services.py | 193 -------------------- py/utils.py | 6 +- 6 files changed, 460 insertions(+), 491 deletions(-) rename py/{searcher.py => information.py} (56%) create mode 100644 py/manager.py delete mode 100644 py/services.py diff --git a/__init__.py b/__init__.py index 2c4cb23..efccd62 100644 --- a/__init__.py +++ b/__init__.py @@ -1,10 +1,10 @@ import os -import folder_paths from .py import config from .py import utils extension_uri = utils.normalize_path(os.path.dirname(__file__)) +# Install requirements requirements_path = utils.join_path(extension_uri, "requirements.txt") with open(requirements_path, "r", encoding="utf-8") as f: @@ -24,276 +24,21 @@ if len(uninstalled_package) > 0: # Init config settings config.extension_uri = extension_uri +# Try to download web distribution version = utils.get_current_version() utils.download_web_distribution(version) -from aiohttp import web -from .py import services - +# Add api routes +from .py import manager +from .py import download +from .py import information routes = config.routes - -@routes.get("/model-manager/download/task") -async def scan_download_tasks(request): - """ - Read download task list. - """ - try: - result = await services.scan_model_download_task_list() - return web.json_response({"success": True, "data": result}) - except Exception as e: - error_msg = f"Read download task list failed: {e}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.put("/model-manager/download/{task_id}") -async def resume_download_task(request): - """ - Toggle download task status. - """ - try: - task_id = request.match_info.get("task_id", None) - if task_id is None: - raise web.HTTPBadRequest(reason="Invalid task id") - json_data = await request.json() - status = json_data.get("status", None) - if status == "pause": - await services.pause_model_download_task(task_id) - elif status == "resume": - await services.resume_model_download_task(task_id, request) - else: - raise web.HTTPBadRequest(reason="Invalid status") - - return web.json_response({"success": True}) - except Exception as e: - error_msg = f"Resume download task failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.delete("/model-manager/download/{task_id}") -async def delete_model_download_task(request): - """ - Delete download task. - """ - task_id = request.match_info.get("task_id", None) - try: - await services.delete_model_download_task(task_id) - return web.json_response({"success": True}) - except Exception as e: - error_msg = f"Delete download task failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/base-folders") -@utils.deprecated(reason="Use `/model-manager/models` instead.") -async def get_model_paths(request): - """ - Returns the base folders for models. - """ - model_base_paths = utils.resolve_model_base_paths() - return web.json_response({"success": True, "data": model_base_paths}) - - -@routes.post("/model-manager/model") -async def create_model(request): - """ - Create a new model. - - request body: x-www-form-urlencoded - - type: model type. - - pathIndex: index of the model folders. - - fullname: filename that relative to the model folder. - - previewFile: preview file. - - description: description. - - downloadPlatform: download platform. - - downloadUrl: download url. - - hash: a JSON string containing the hash value of the downloaded model. - """ - task_data = await request.post() - task_data = dict(task_data) - try: - task_id = await services.create_model_download_task(task_data, request) - return web.json_response({"success": True, "data": {"taskId": task_id}}) - except Exception as e: - error_msg = f"Create model download task failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/models") -async def list_model_types(request): - """ - Scan all models and read their information. - """ - try: - result = utils.resolve_model_base_paths() - return web.json_response({"success": True, "data": result}) - except Exception as e: - error_msg = f"Read models failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/models/{folder}") -async def read_models(request): - try: - folder = request.match_info.get("folder", None) - results = services.scan_models(folder, request) - return web.json_response({"success": True, "data": results}) - except Exception as e: - error_msg = f"Read models failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/model/{type}/{index}/{filename:.*}") -async def read_model_info(request): - """ - Get the information of the specified model. - """ - model_type = request.match_info.get("type", None) - index = int(request.match_info.get("index", None)) - filename = request.match_info.get("filename", None) - - try: - model_path = utils.get_valid_full_path(model_type, index, filename) - result = services.get_model_info(model_path) - return web.json_response({"success": True, "data": result}) - except Exception as e: - error_msg = f"Read model info failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.put("/model-manager/model/{type}/{index}/{filename:.*}") -async def update_model(request): - """ - Update model information. - - request body: x-www-form-urlencoded - - previewFile: preview file. - - description: description. - - type: model type. - - pathIndex: index of the model folders. - - fullname: filename that relative to the model folder. - All fields are optional, but type, pathIndex and fullname must appear together. - """ - model_type = request.match_info.get("type", None) - index = int(request.match_info.get("index", None)) - filename = request.match_info.get("filename", None) - - model_data = await request.post() - model_data = dict(model_data) - - try: - model_path = utils.get_valid_full_path(model_type, index, filename) - if model_path is None: - raise RuntimeError(f"File {filename} not found") - services.update_model(model_path, model_data) - return web.json_response({"success": True}) - except Exception as e: - error_msg = f"Update model failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.delete("/model-manager/model/{type}/{index}/{filename:.*}") -async def delete_model(request): - """ - Delete model. - """ - model_type = request.match_info.get("type", None) - index = int(request.match_info.get("index", None)) - filename = request.match_info.get("filename", None) - - try: - model_path = utils.get_valid_full_path(model_type, index, filename) - if model_path is None: - raise RuntimeError(f"File {filename} not found") - services.remove_model(model_path) - return web.json_response({"success": True}) - except Exception as e: - error_msg = f"Delete model failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/model-info") -async def fetch_model_info(request): - """ - Fetch model information from network with model page. - """ - try: - model_page = request.query.get("model-page", None) - result = services.fetch_model_info(model_page) - return web.json_response({"success": True, "data": result}) - except Exception as e: - error_msg = f"Fetch model info failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.post("/model-manager/model-info/scan") -async def download_model_info(request): - """ - Create a task to download model information. - """ - post = await utils.get_request_body(request) - try: - scan_mode = post.get("scanMode", "diff") - await services.download_model_info(scan_mode, request) - return web.json_response({"success": True}) - except Exception as e: - error_msg = f"Download model info failed: {str(e)}" - utils.print_error(error_msg) - return web.json_response({"success": False, "error": error_msg}) - - -@routes.get("/model-manager/preview/{type}/{index}/{filename:.*}") -async def read_model_preview(request): - """ - Get the file stream of the specified image. - If the file does not exist, no-preview.png is returned. - - :param type: The type of the model. eg.checkpoints, loras, vae, etc. - :param index: The index of the model folders. - :param filename: The filename of the image. - """ - model_type = request.match_info.get("type", None) - index = int(request.match_info.get("index", None)) - filename = request.match_info.get("filename", None) - - extension_uri = config.extension_uri - - try: - folders = folder_paths.get_folder_paths(model_type) - base_path = folders[index] - abs_path = utils.join_path(base_path, filename) - except: - abs_path = extension_uri - - if not os.path.isfile(abs_path): - abs_path = utils.join_path(extension_uri, "assets", "no-preview.png") - return web.FileResponse(abs_path) - - -@routes.get("/model-manager/preview/download/{filename}") -async def read_download_preview(request): - filename = request.match_info.get("filename", None) - extension_uri = config.extension_uri - - download_path = utils.get_download_path() - preview_path = utils.join_path(download_path, filename) - - if not os.path.isfile(preview_path): - preview_path = utils.join_path(extension_uri, "assets", "no-preview.png") - - return web.FileResponse(preview_path) +manager.ModelManager().add_routes(routes) +download.ModelDownload().add_routes(routes) +information.Information().add_routes(routes) WEB_DIRECTORY = "web" diff --git a/py/download.py b/py/download.py index 49c3ec6..b7f15e0 100644 --- a/py/download.py +++ b/py/download.py @@ -403,3 +403,86 @@ async def download_model_file( else: task_status.status = "pause" await utils.send_json("update_download_task", task_status.to_dict()) + + +from aiohttp import web + + +class ModelDownload: + def add_routes(self, routes): + + @routes.get("/model-manager/download/task") + async def scan_download_tasks(request): + """ + Read download task list. + """ + try: + result = await scan_model_download_task_list() + return web.json_response({"success": True, "data": result}) + except Exception as e: + error_msg = f"Read download task list failed: {e}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.put("/model-manager/download/{task_id}") + async def resume_download_task(request): + """ + Toggle download task status. + """ + try: + task_id = request.match_info.get("task_id", None) + if task_id is None: + raise web.HTTPBadRequest(reason="Invalid task id") + json_data = await request.json() + status = json_data.get("status", None) + if status == "pause": + await pause_model_download_task(task_id) + elif status == "resume": + await download_model(task_id, request) + else: + raise web.HTTPBadRequest(reason="Invalid status") + + return web.json_response({"success": True}) + except Exception as e: + error_msg = f"Resume download task failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.delete("/model-manager/download/{task_id}") + async def delete_model_download_task(request): + """ + Delete download task. + """ + task_id = request.match_info.get("task_id", None) + try: + await delete_model_download_task(task_id) + return web.json_response({"success": True}) + except Exception as e: + error_msg = f"Delete download task failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.post("/model-manager/model") + async def create_model(request): + """ + Create a new model. + + request body: x-www-form-urlencoded + - type: model type. + - pathIndex: index of the model folders. + - fullname: filename that relative to the model folder. + - previewFile: preview file. + - description: description. + - downloadPlatform: download platform. + - downloadUrl: download url. + - hash: a JSON string containing the hash value of the downloaded model. + """ + task_data = await request.post() + task_data = dict(task_data) + try: + task_id = await create_model_download_task(task_data, request) + return web.json_response({"success": True, "data": {"taskId": task_id}}) + except Exception as e: + error_msg = f"Create model download task failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) diff --git a/py/searcher.py b/py/information.py similarity index 56% rename from py/searcher.py rename to py/information.py index a2a1926..3a804a8 100644 --- a/py/searcher.py +++ b/py/information.py @@ -27,9 +27,7 @@ class ModelSearcher(ABC): class UnknownWebsiteSearcher(ModelSearcher): def search_by_url(self, url: str): - raise RuntimeError( - f"Unknown Website, please input a URL from huggingface.co or civitai.com." - ) + raise RuntimeError(f"Unknown Website, please input a URL from huggingface.co or civitai.com.") def search_by_hash(self, hash: str): raise RuntimeError(f"Unknown Website, unable to search with hash value.") @@ -87,29 +85,15 @@ class CivitaiModelSearcher(ModelSearcher): description_parts.append("") description_parts.append(f"# Trigger Words") description_parts.append("") - description_parts.append( - ", ".join(version.get("trainedWords", ["No trigger words"])) - ) + description_parts.append(", ".join(version.get("trainedWords", ["No trigger words"]))) description_parts.append("") description_parts.append(f"# About this version") description_parts.append("") - description_parts.append( - markdownify.markdownify( - version.get( - "description", "

No description about this version

" - ) - ).strip() - ) + description_parts.append(markdownify.markdownify(version.get("description", "

No description about this version

")).strip()) description_parts.append("") description_parts.append(f"# {res_data.get('name')}") description_parts.append("") - description_parts.append( - markdownify.markdownify( - res_data.get( - "description", "

No description about this model

" - ) - ).strip() - ) + description_parts.append(markdownify.markdownify(res_data.get("description", "

No description about this model

")).strip()) description_parts.append("") model = { @@ -136,18 +120,14 @@ class CivitaiModelSearcher(ModelSearcher): if not hash: raise RuntimeError(f"Hash value is empty.") - response = requests.get( - f"https://civitai.com/api/v1/model-versions/by-hash/{hash}" - ) + response = requests.get(f"https://civitai.com/api/v1/model-versions/by-hash/{hash}") response.raise_for_status() version: dict = response.json() model_id = version.get("modelId") version_id = version.get("id") - model_page = ( - f"https://civitai.com/models/{model_id}?modelVersionId={version_id}" - ) + model_page = f"https://civitai.com/models/{model_id}?modelVersionId={version_id}" models = self.search_by_url(model_page) @@ -186,9 +166,7 @@ class HuggingfaceModelSearcher(ModelSearcher): response.raise_for_status() res_data: dict = response.json() - sibling_files: list[str] = [ - x.get("rfilename") for x in res_data.get("siblings", []) - ] + sibling_files: list[str] = [x.get("rfilename") for x in res_data.get("siblings", [])] model_files = utils.filter_with( utils.filter_with(sibling_files, self._match_model_files()), @@ -199,10 +177,7 @@ class HuggingfaceModelSearcher(ModelSearcher): utils.filter_with(sibling_files, self._match_image_files()), self._match_tree_files(rest_pathname), ) - image_files = [ - f"https://huggingface.co/{model_id}/resolve/main/{filename}" - for filename in image_files - ] + image_files = [f"https://huggingface.co/{model_id}/resolve/main/{filename}" for filename in image_files] models: list[dict] = [] @@ -315,3 +290,148 @@ def get_model_searcher_by_url(url: str) -> ModelSearcher: elif host_name == "huggingface.co": return HuggingfaceModelSearcher() return UnknownWebsiteSearcher() + + +import folder_paths + + +from . import config + + +from aiohttp import web + + +class Information: + def add_routes(self, routes): + + @routes.get("/model-manager/model-info") + async def fetch_model_info(request): + """ + Fetch model information from network with model page. + """ + try: + model_page = request.query.get("model-page", None) + result = self.fetch_model_info(model_page) + return web.json_response({"success": True, "data": result}) + except Exception as e: + error_msg = f"Fetch model info failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.post("/model-manager/model-info/scan") + async def download_model_info(request): + """ + Create a task to download model information. + """ + post = await utils.get_request_body(request) + try: + scan_mode = post.get("scanMode", "diff") + await self.download_model_info(scan_mode, request) + return web.json_response({"success": True}) + except Exception as e: + error_msg = f"Download model info failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.get("/model-manager/preview/{type}/{index}/{filename:.*}") + async def read_model_preview(request): + """ + Get the file stream of the specified image. + If the file does not exist, no-preview.png is returned. + + :param type: The type of the model. eg.checkpoints, loras, vae, etc. + :param index: The index of the model folders. + :param filename: The filename of the image. + """ + model_type = request.match_info.get("type", None) + index = int(request.match_info.get("index", None)) + filename = request.match_info.get("filename", None) + + extension_uri = config.extension_uri + + try: + folders = folder_paths.get_folder_paths(model_type) + base_path = folders[index] + abs_path = utils.join_path(base_path, filename) + except: + abs_path = extension_uri + + if not os.path.isfile(abs_path): + abs_path = utils.join_path(extension_uri, "assets", "no-preview.png") + return web.FileResponse(abs_path) + + @routes.get("/model-manager/preview/download/{filename}") + async def read_download_preview(request): + filename = request.match_info.get("filename", None) + extension_uri = config.extension_uri + + download_path = utils.get_download_path() + preview_path = utils.join_path(download_path, filename) + + if not os.path.isfile(preview_path): + preview_path = utils.join_path(extension_uri, "assets", "no-preview.png") + + return web.FileResponse(preview_path) + + def fetch_model_info(self, model_page: str): + if not model_page: + return [] + + model_searcher = get_model_searcher_by_url(model_page) + result = model_searcher.search_by_url(model_page) + return result + + async def download_model_info(self, scan_mode: str, request): + utils.print_info(f"Download model info for {scan_mode}") + model_base_paths = utils.resolve_model_base_paths() + for model_type in model_base_paths: + + folders, extensions = folder_paths.folder_names_and_paths[model_type] + for path_index, base_path in enumerate(folders): + files = utils.recursive_search_files(base_path, request) + + models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions) + + for fullname in models: + fullname = utils.normalize_path(fullname) + basename = os.path.splitext(fullname)[0] + + abs_model_path = utils.join_path(base_path, fullname) + + image_name = utils.get_model_preview_name(abs_model_path) + abs_image_path = utils.join_path(base_path, image_name) + + has_preview = os.path.isfile(abs_image_path) + + description_name = utils.get_model_description_name(abs_model_path) + abs_description_path = utils.join_path(base_path, description_name) if description_name else None + has_description = os.path.isfile(abs_description_path) if abs_description_path else False + + try: + + utils.print_info(f"Checking model {abs_model_path}") + utils.print_debug(f"Scan mode: {scan_mode}") + utils.print_debug(f"Has preview: {has_preview}") + utils.print_debug(f"Has description: {has_description}") + + if scan_mode != "full" and (has_preview and has_description): + continue + + utils.print_debug(f"Calculate sha256 for {abs_model_path}") + hash_value = utils.calculate_sha256(abs_model_path) + utils.print_info(f"Searching model info by hash {hash_value}") + model_info = CivitaiModelSearcher().search_by_hash(hash_value) + + preview_url_list = model_info.get("preview", []) + preview_image_url = preview_url_list[0] if preview_url_list else None + if preview_image_url: + utils.print_debug(f"Save preview image to {abs_image_path}") + utils.save_model_preview_image(abs_model_path, preview_image_url) + + description = model_info.get("description", None) + if description: + utils.save_model_description(abs_model_path, description) + except Exception as e: + utils.print_error(f"Failed to download model info for {abs_model_path}: {e}") + + utils.print_debug("Completed scan model information.") diff --git a/py/manager.py b/py/manager.py new file mode 100644 index 0000000..0d52a0b --- /dev/null +++ b/py/manager.py @@ -0,0 +1,210 @@ +import os +import folder_paths +from aiohttp import web + + +from . import utils + + +class ModelManager: + + def add_routes(self, routes): + + @routes.get("/model-manager/base-folders") + @utils.deprecated(reason="Use `/model-manager/models` instead.") + async def get_model_paths(request): + """ + Returns the base folders for models. + """ + model_base_paths = utils.resolve_model_base_paths() + return web.json_response({"success": True, "data": model_base_paths}) + + @routes.get("/model-manager/models") + async def get_folders(request): + """ + Returns the base folders for models. + """ + try: + result = utils.resolve_model_base_paths() + return web.json_response({"success": True, "data": result}) + except Exception as e: + error_msg = f"Read models failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.get("/model-manager/models/{folder}") + async def get_folder_models(request): + try: + folder = request.match_info.get("folder", None) + results = self.scan_models(folder, request) + return web.json_response({"success": True, "data": results}) + except Exception as e: + error_msg = f"Read models failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.get("/model-manager/model/{type}/{index}/{filename:.*}") + async def get_model_info(request): + """ + Get the information of the specified model. + """ + model_type = request.match_info.get("type", None) + path_index = int(request.match_info.get("index", None)) + filename = request.match_info.get("filename", None) + + try: + model_path = utils.get_valid_full_path(model_type, path_index, filename) + result = self.get_model_info(model_path) + return web.json_response({"success": True, "data": result}) + except Exception as e: + error_msg = f"Read model info failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.put("/model-manager/model/{type}/{index}/{filename:.*}") + async def update_model(request): + """ + Update model information. + + request body: x-www-form-urlencoded + - previewFile: preview file. + - description: description. + - type: model type. + - pathIndex: index of the model folders. + - fullname: filename that relative to the model folder. + All fields are optional, but type, pathIndex and fullname must appear together. + """ + model_type = request.match_info.get("type", None) + path_index = int(request.match_info.get("index", None)) + filename = request.match_info.get("filename", None) + + model_data = await request.post() + model_data = dict(model_data) + + try: + model_path = utils.get_valid_full_path(model_type, path_index, filename) + if model_path is None: + raise RuntimeError(f"File {filename} not found") + self.update_model(model_path, model_data) + return web.json_response({"success": True}) + except Exception as e: + error_msg = f"Update model failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + @routes.delete("/model-manager/model/{type}/{index}/{filename:.*}") + async def delete_model(request): + """ + Delete model. + """ + model_type = request.match_info.get("type", None) + path_index = int(request.match_info.get("index", None)) + filename = request.match_info.get("filename", None) + + try: + model_path = utils.get_valid_full_path(model_type, path_index, filename) + if model_path is None: + raise RuntimeError(f"File {filename} not found") + self.remove_model(model_path) + return web.json_response({"success": True}) + except Exception as e: + error_msg = f"Delete model failed: {str(e)}" + utils.print_error(error_msg) + return web.json_response({"success": False, "error": error_msg}) + + def scan_models(self, folder: str, request): + result = [] + + folders, extensions = folder_paths.folder_names_and_paths[folder] + for path_index, base_path in enumerate(folders): + files = utils.recursive_search_files(base_path, request) + + models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions) + + for fullname in models: + fullname = utils.normalize_path(fullname) + basename = os.path.splitext(fullname)[0] + extension = os.path.splitext(fullname)[1] + + abs_path = utils.join_path(base_path, fullname) + file_stats = os.stat(abs_path) + + # Resolve preview + image_name = utils.get_model_preview_name(abs_path) + image_name = utils.join_path(os.path.dirname(fullname), image_name) + abs_image_path = utils.join_path(base_path, image_name) + if os.path.isfile(abs_image_path): + image_state = os.stat(abs_image_path) + image_timestamp = round(image_state.st_mtime_ns / 1000000) + image_name = f"{image_name}?ts={image_timestamp}" + model_preview = f"/model-manager/preview/{folder}/{path_index}/{image_name}" + + model_info = { + "fullname": fullname, + "basename": basename, + "extension": extension, + "type": folder, + "pathIndex": path_index, + "sizeBytes": file_stats.st_size, + "preview": model_preview, + "createdAt": round(file_stats.st_ctime_ns / 1000000), + "updatedAt": round(file_stats.st_mtime_ns / 1000000), + } + + result.append(model_info) + + return result + + def get_model_info(self, model_path: str): + directory = os.path.dirname(model_path) + + metadata = utils.get_model_metadata(model_path) + + description_file = utils.get_model_description_name(model_path) + description_file = utils.join_path(directory, description_file) + description = None + if os.path.isfile(description_file): + with open(description_file, "r", encoding="utf-8", newline="") as f: + description = f.read() + + return { + "metadata": metadata, + "description": description, + } + + def update_model(self, model_path: str, model_data: dict): + + if "previewFile" in model_data: + previewFile = model_data["previewFile"] + if type(previewFile) is str and previewFile == "undefined": + utils.remove_model_preview_image(model_path) + else: + utils.save_model_preview_image(model_path, previewFile) + + if "description" in model_data: + description = model_data["description"] + utils.save_model_description(model_path, description) + + if "type" in model_data and "pathIndex" in model_data and "fullname" in model_data: + model_type = model_data.get("type", None) + path_index = int(model_data.get("pathIndex", None)) + fullname = model_data.get("fullname", None) + if model_type is None or path_index is None or fullname is None: + raise RuntimeError("Invalid type or pathIndex or fullname") + + # get new path + new_model_path = utils.get_full_path(model_type, path_index, fullname) + + utils.rename_model(model_path, new_model_path) + + def remove_model(self, model_path: str): + model_dirname = os.path.dirname(model_path) + os.remove(model_path) + + model_previews = utils.get_model_all_images(model_path) + for preview in model_previews: + os.remove(utils.join_path(model_dirname, preview)) + + model_descriptions = utils.get_model_all_descriptions(model_path) + for description in model_descriptions: + os.remove(utils.join_path(model_dirname, description)) diff --git a/py/services.py b/py/services.py deleted file mode 100644 index 3768aaa..0000000 --- a/py/services.py +++ /dev/null @@ -1,193 +0,0 @@ -import os - -import folder_paths - -from . import utils -from . import download -from . import searcher - - -def scan_models(folder: str, request): - result = [] - - folders, extensions = folder_paths.folder_names_and_paths[folder] - for path_index, base_path in enumerate(folders): - files = utils.recursive_search_files(base_path, request) - - models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions) - - for fullname in models: - fullname = utils.normalize_path(fullname) - basename = os.path.splitext(fullname)[0] - extension = os.path.splitext(fullname)[1] - - abs_path = utils.join_path(base_path, fullname) - file_stats = os.stat(abs_path) - - # Resolve preview - image_name = utils.get_model_preview_name(abs_path) - image_name = utils.join_path(os.path.dirname(fullname), image_name) - abs_image_path = utils.join_path(base_path, image_name) - if os.path.isfile(abs_image_path): - image_state = os.stat(abs_image_path) - image_timestamp = round(image_state.st_mtime_ns / 1000000) - image_name = f"{image_name}?ts={image_timestamp}" - model_preview = f"/model-manager/preview/{folder}/{path_index}/{image_name}" - - model_info = { - "fullname": fullname, - "basename": basename, - "extension": extension, - "type": folder, - "pathIndex": path_index, - "sizeBytes": file_stats.st_size, - "preview": model_preview, - "createdAt": round(file_stats.st_ctime_ns / 1000000), - "updatedAt": round(file_stats.st_mtime_ns / 1000000), - } - - result.append(model_info) - - return result - - -def get_model_info(model_path: str): - directory = os.path.dirname(model_path) - - metadata = utils.get_model_metadata(model_path) - - description_file = utils.get_model_description_name(model_path) - description_file = utils.join_path(directory, description_file) - description = None - if os.path.isfile(description_file): - with open(description_file, "r", encoding="utf-8", newline="") as f: - description = f.read() - - return { - "metadata": metadata, - "description": description, - } - - -def update_model(model_path: str, model_data: dict): - - if "previewFile" in model_data: - previewFile = model_data["previewFile"] - if type(previewFile) is str and previewFile == "undefined": - utils.remove_model_preview_image(model_path) - else: - utils.save_model_preview_image(model_path, previewFile) - - if "description" in model_data: - description = model_data["description"] - utils.save_model_description(model_path, description) - - if "type" in model_data and "pathIndex" in model_data and "fullname" in model_data: - model_type = model_data.get("type", None) - path_index = int(model_data.get("pathIndex", None)) - fullname = model_data.get("fullname", None) - if model_type is None or path_index is None or fullname is None: - raise RuntimeError("Invalid type or pathIndex or fullname") - - # get new path - new_model_path = utils.get_full_path(model_type, path_index, fullname) - - utils.rename_model(model_path, new_model_path) - - -def remove_model(model_path: str): - model_dirname = os.path.dirname(model_path) - os.remove(model_path) - - model_previews = utils.get_model_all_images(model_path) - for preview in model_previews: - os.remove(utils.join_path(model_dirname, preview)) - - model_descriptions = utils.get_model_all_descriptions(model_path) - for description in model_descriptions: - os.remove(utils.join_path(model_dirname, description)) - - -async def create_model_download_task(task_data, request): - return await download.create_model_download_task(task_data, request) - - -async def scan_model_download_task_list(): - return await download.scan_model_download_task_list() - - -async def pause_model_download_task(task_id): - return await download.pause_model_download_task(task_id) - - -async def resume_model_download_task(task_id, request): - return await download.download_model(task_id, request) - - -async def delete_model_download_task(task_id): - return await download.delete_model_download_task(task_id) - - -def fetch_model_info(model_page: str): - if not model_page: - return [] - - model_searcher = searcher.get_model_searcher_by_url(model_page) - result = model_searcher.search_by_url(model_page) - return result - - -async def download_model_info(scan_mode: str, request): - utils.print_info(f"Download model info for {scan_mode}") - model_base_paths = utils.resolve_model_base_paths() - for model_type in model_base_paths: - - folders, extensions = folder_paths.folder_names_and_paths[model_type] - for path_index, base_path in enumerate(folders): - files = utils.recursive_search_files(base_path, request) - - models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions) - - for fullname in models: - fullname = utils.normalize_path(fullname) - basename = os.path.splitext(fullname)[0] - - abs_model_path = utils.join_path(base_path, fullname) - - image_name = utils.get_model_preview_name(abs_model_path) - abs_image_path = utils.join_path(base_path, image_name) - - has_preview = os.path.isfile(abs_image_path) - - description_name = utils.get_model_description_name(abs_model_path) - abs_description_path = utils.join_path(base_path, description_name) if description_name else None - has_description = os.path.isfile(abs_description_path) if abs_description_path else False - - try: - - utils.print_info(f"Checking model {abs_model_path}") - utils.print_debug(f"Scan mode: {scan_mode}") - utils.print_debug(f"Has preview: {has_preview}") - utils.print_debug(f"Has description: {has_description}") - - if scan_mode != "full" and (has_preview and has_description): - continue - - utils.print_debug(f"Calculate sha256 for {abs_model_path}") - hash_value = utils.calculate_sha256(abs_model_path) - utils.print_info(f"Searching model info by hash {hash_value}") - model_info = searcher.CivitaiModelSearcher().search_by_hash(hash_value) - - preview_url_list = model_info.get("preview", []) - preview_image_url = preview_url_list[0] if preview_url_list else None - if preview_image_url: - utils.print_debug(f"Save preview image to {abs_image_path}") - utils.save_model_preview_image(abs_model_path, preview_image_url) - - description = model_info.get("description", None) - if description: - utils.save_model_description(abs_model_path, description) - except Exception as e: - utils.print_error(f"Failed to download model info for {abs_model_path}: {e}") - - utils.print_debug("Completed scan model information.") diff --git a/py/utils.py b/py/utils.py index 5ae9b01..040c74f 100644 --- a/py/utils.py +++ b/py/utils.py @@ -133,7 +133,11 @@ def download_web_distribution(version: str): print_error(f"An unexpected error occurred: {e}") -def resolve_model_base_paths(): +def resolve_model_base_paths() -> dict[str, list[str]]: + """ + Resolve model base paths. + eg. { "checkpoints": ["path/to/checkpoints"] } + """ folders = list(folder_paths.folder_names_and_paths.keys()) model_base_paths = {} folder_black_list = ["configs", "custom_nodes"]