Refactor code structure (#106)
* refactor: rename searcher.py to information.py * refactor: move the routes into each sub-modules * refactor: move services's func into sub-modules
This commit is contained in:
@@ -403,3 +403,86 @@ async def download_model_file(
|
||||
else:
|
||||
task_status.status = "pause"
|
||||
await utils.send_json("update_download_task", task_status.to_dict())
|
||||
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
|
||||
class ModelDownload:
|
||||
def add_routes(self, routes):
|
||||
|
||||
@routes.get("/model-manager/download/task")
|
||||
async def scan_download_tasks(request):
|
||||
"""
|
||||
Read download task list.
|
||||
"""
|
||||
try:
|
||||
result = await scan_model_download_task_list()
|
||||
return web.json_response({"success": True, "data": result})
|
||||
except Exception as e:
|
||||
error_msg = f"Read download task list failed: {e}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.put("/model-manager/download/{task_id}")
|
||||
async def resume_download_task(request):
|
||||
"""
|
||||
Toggle download task status.
|
||||
"""
|
||||
try:
|
||||
task_id = request.match_info.get("task_id", None)
|
||||
if task_id is None:
|
||||
raise web.HTTPBadRequest(reason="Invalid task id")
|
||||
json_data = await request.json()
|
||||
status = json_data.get("status", None)
|
||||
if status == "pause":
|
||||
await pause_model_download_task(task_id)
|
||||
elif status == "resume":
|
||||
await download_model(task_id, request)
|
||||
else:
|
||||
raise web.HTTPBadRequest(reason="Invalid status")
|
||||
|
||||
return web.json_response({"success": True})
|
||||
except Exception as e:
|
||||
error_msg = f"Resume download task failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.delete("/model-manager/download/{task_id}")
|
||||
async def delete_model_download_task(request):
|
||||
"""
|
||||
Delete download task.
|
||||
"""
|
||||
task_id = request.match_info.get("task_id", None)
|
||||
try:
|
||||
await delete_model_download_task(task_id)
|
||||
return web.json_response({"success": True})
|
||||
except Exception as e:
|
||||
error_msg = f"Delete download task failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.post("/model-manager/model")
|
||||
async def create_model(request):
|
||||
"""
|
||||
Create a new model.
|
||||
|
||||
request body: x-www-form-urlencoded
|
||||
- type: model type.
|
||||
- pathIndex: index of the model folders.
|
||||
- fullname: filename that relative to the model folder.
|
||||
- previewFile: preview file.
|
||||
- description: description.
|
||||
- downloadPlatform: download platform.
|
||||
- downloadUrl: download url.
|
||||
- hash: a JSON string containing the hash value of the downloaded model.
|
||||
"""
|
||||
task_data = await request.post()
|
||||
task_data = dict(task_data)
|
||||
try:
|
||||
task_id = await create_model_download_task(task_data, request)
|
||||
return web.json_response({"success": True, "data": {"taskId": task_id}})
|
||||
except Exception as e:
|
||||
error_msg = f"Create model download task failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@@ -27,9 +27,7 @@ class ModelSearcher(ABC):
|
||||
|
||||
class UnknownWebsiteSearcher(ModelSearcher):
|
||||
def search_by_url(self, url: str):
|
||||
raise RuntimeError(
|
||||
f"Unknown Website, please input a URL from huggingface.co or civitai.com."
|
||||
)
|
||||
raise RuntimeError(f"Unknown Website, please input a URL from huggingface.co or civitai.com.")
|
||||
|
||||
def search_by_hash(self, hash: str):
|
||||
raise RuntimeError(f"Unknown Website, unable to search with hash value.")
|
||||
@@ -87,29 +85,15 @@ class CivitaiModelSearcher(ModelSearcher):
|
||||
description_parts.append("")
|
||||
description_parts.append(f"# Trigger Words")
|
||||
description_parts.append("")
|
||||
description_parts.append(
|
||||
", ".join(version.get("trainedWords", ["No trigger words"]))
|
||||
)
|
||||
description_parts.append(", ".join(version.get("trainedWords", ["No trigger words"])))
|
||||
description_parts.append("")
|
||||
description_parts.append(f"# About this version")
|
||||
description_parts.append("")
|
||||
description_parts.append(
|
||||
markdownify.markdownify(
|
||||
version.get(
|
||||
"description", "<p>No description about this version</p>"
|
||||
)
|
||||
).strip()
|
||||
)
|
||||
description_parts.append(markdownify.markdownify(version.get("description", "<p>No description about this version</p>")).strip())
|
||||
description_parts.append("")
|
||||
description_parts.append(f"# {res_data.get('name')}")
|
||||
description_parts.append("")
|
||||
description_parts.append(
|
||||
markdownify.markdownify(
|
||||
res_data.get(
|
||||
"description", "<p>No description about this model</p>"
|
||||
)
|
||||
).strip()
|
||||
)
|
||||
description_parts.append(markdownify.markdownify(res_data.get("description", "<p>No description about this model</p>")).strip())
|
||||
description_parts.append("")
|
||||
|
||||
model = {
|
||||
@@ -136,18 +120,14 @@ class CivitaiModelSearcher(ModelSearcher):
|
||||
if not hash:
|
||||
raise RuntimeError(f"Hash value is empty.")
|
||||
|
||||
response = requests.get(
|
||||
f"https://civitai.com/api/v1/model-versions/by-hash/{hash}"
|
||||
)
|
||||
response = requests.get(f"https://civitai.com/api/v1/model-versions/by-hash/{hash}")
|
||||
response.raise_for_status()
|
||||
version: dict = response.json()
|
||||
|
||||
model_id = version.get("modelId")
|
||||
version_id = version.get("id")
|
||||
|
||||
model_page = (
|
||||
f"https://civitai.com/models/{model_id}?modelVersionId={version_id}"
|
||||
)
|
||||
model_page = f"https://civitai.com/models/{model_id}?modelVersionId={version_id}"
|
||||
|
||||
models = self.search_by_url(model_page)
|
||||
|
||||
@@ -186,9 +166,7 @@ class HuggingfaceModelSearcher(ModelSearcher):
|
||||
response.raise_for_status()
|
||||
res_data: dict = response.json()
|
||||
|
||||
sibling_files: list[str] = [
|
||||
x.get("rfilename") for x in res_data.get("siblings", [])
|
||||
]
|
||||
sibling_files: list[str] = [x.get("rfilename") for x in res_data.get("siblings", [])]
|
||||
|
||||
model_files = utils.filter_with(
|
||||
utils.filter_with(sibling_files, self._match_model_files()),
|
||||
@@ -199,10 +177,7 @@ class HuggingfaceModelSearcher(ModelSearcher):
|
||||
utils.filter_with(sibling_files, self._match_image_files()),
|
||||
self._match_tree_files(rest_pathname),
|
||||
)
|
||||
image_files = [
|
||||
f"https://huggingface.co/{model_id}/resolve/main/{filename}"
|
||||
for filename in image_files
|
||||
]
|
||||
image_files = [f"https://huggingface.co/{model_id}/resolve/main/{filename}" for filename in image_files]
|
||||
|
||||
models: list[dict] = []
|
||||
|
||||
@@ -315,3 +290,148 @@ def get_model_searcher_by_url(url: str) -> ModelSearcher:
|
||||
elif host_name == "huggingface.co":
|
||||
return HuggingfaceModelSearcher()
|
||||
return UnknownWebsiteSearcher()
|
||||
|
||||
|
||||
import folder_paths
|
||||
|
||||
|
||||
from . import config
|
||||
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
|
||||
class Information:
|
||||
def add_routes(self, routes):
|
||||
|
||||
@routes.get("/model-manager/model-info")
|
||||
async def fetch_model_info(request):
|
||||
"""
|
||||
Fetch model information from network with model page.
|
||||
"""
|
||||
try:
|
||||
model_page = request.query.get("model-page", None)
|
||||
result = self.fetch_model_info(model_page)
|
||||
return web.json_response({"success": True, "data": result})
|
||||
except Exception as e:
|
||||
error_msg = f"Fetch model info failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.post("/model-manager/model-info/scan")
|
||||
async def download_model_info(request):
|
||||
"""
|
||||
Create a task to download model information.
|
||||
"""
|
||||
post = await utils.get_request_body(request)
|
||||
try:
|
||||
scan_mode = post.get("scanMode", "diff")
|
||||
await self.download_model_info(scan_mode, request)
|
||||
return web.json_response({"success": True})
|
||||
except Exception as e:
|
||||
error_msg = f"Download model info failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.get("/model-manager/preview/{type}/{index}/{filename:.*}")
|
||||
async def read_model_preview(request):
|
||||
"""
|
||||
Get the file stream of the specified image.
|
||||
If the file does not exist, no-preview.png is returned.
|
||||
|
||||
:param type: The type of the model. eg.checkpoints, loras, vae, etc.
|
||||
:param index: The index of the model folders.
|
||||
:param filename: The filename of the image.
|
||||
"""
|
||||
model_type = request.match_info.get("type", None)
|
||||
index = int(request.match_info.get("index", None))
|
||||
filename = request.match_info.get("filename", None)
|
||||
|
||||
extension_uri = config.extension_uri
|
||||
|
||||
try:
|
||||
folders = folder_paths.get_folder_paths(model_type)
|
||||
base_path = folders[index]
|
||||
abs_path = utils.join_path(base_path, filename)
|
||||
except:
|
||||
abs_path = extension_uri
|
||||
|
||||
if not os.path.isfile(abs_path):
|
||||
abs_path = utils.join_path(extension_uri, "assets", "no-preview.png")
|
||||
return web.FileResponse(abs_path)
|
||||
|
||||
@routes.get("/model-manager/preview/download/{filename}")
|
||||
async def read_download_preview(request):
|
||||
filename = request.match_info.get("filename", None)
|
||||
extension_uri = config.extension_uri
|
||||
|
||||
download_path = utils.get_download_path()
|
||||
preview_path = utils.join_path(download_path, filename)
|
||||
|
||||
if not os.path.isfile(preview_path):
|
||||
preview_path = utils.join_path(extension_uri, "assets", "no-preview.png")
|
||||
|
||||
return web.FileResponse(preview_path)
|
||||
|
||||
def fetch_model_info(self, model_page: str):
|
||||
if not model_page:
|
||||
return []
|
||||
|
||||
model_searcher = get_model_searcher_by_url(model_page)
|
||||
result = model_searcher.search_by_url(model_page)
|
||||
return result
|
||||
|
||||
async def download_model_info(self, scan_mode: str, request):
|
||||
utils.print_info(f"Download model info for {scan_mode}")
|
||||
model_base_paths = utils.resolve_model_base_paths()
|
||||
for model_type in model_base_paths:
|
||||
|
||||
folders, extensions = folder_paths.folder_names_and_paths[model_type]
|
||||
for path_index, base_path in enumerate(folders):
|
||||
files = utils.recursive_search_files(base_path, request)
|
||||
|
||||
models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions)
|
||||
|
||||
for fullname in models:
|
||||
fullname = utils.normalize_path(fullname)
|
||||
basename = os.path.splitext(fullname)[0]
|
||||
|
||||
abs_model_path = utils.join_path(base_path, fullname)
|
||||
|
||||
image_name = utils.get_model_preview_name(abs_model_path)
|
||||
abs_image_path = utils.join_path(base_path, image_name)
|
||||
|
||||
has_preview = os.path.isfile(abs_image_path)
|
||||
|
||||
description_name = utils.get_model_description_name(abs_model_path)
|
||||
abs_description_path = utils.join_path(base_path, description_name) if description_name else None
|
||||
has_description = os.path.isfile(abs_description_path) if abs_description_path else False
|
||||
|
||||
try:
|
||||
|
||||
utils.print_info(f"Checking model {abs_model_path}")
|
||||
utils.print_debug(f"Scan mode: {scan_mode}")
|
||||
utils.print_debug(f"Has preview: {has_preview}")
|
||||
utils.print_debug(f"Has description: {has_description}")
|
||||
|
||||
if scan_mode != "full" and (has_preview and has_description):
|
||||
continue
|
||||
|
||||
utils.print_debug(f"Calculate sha256 for {abs_model_path}")
|
||||
hash_value = utils.calculate_sha256(abs_model_path)
|
||||
utils.print_info(f"Searching model info by hash {hash_value}")
|
||||
model_info = CivitaiModelSearcher().search_by_hash(hash_value)
|
||||
|
||||
preview_url_list = model_info.get("preview", [])
|
||||
preview_image_url = preview_url_list[0] if preview_url_list else None
|
||||
if preview_image_url:
|
||||
utils.print_debug(f"Save preview image to {abs_image_path}")
|
||||
utils.save_model_preview_image(abs_model_path, preview_image_url)
|
||||
|
||||
description = model_info.get("description", None)
|
||||
if description:
|
||||
utils.save_model_description(abs_model_path, description)
|
||||
except Exception as e:
|
||||
utils.print_error(f"Failed to download model info for {abs_model_path}: {e}")
|
||||
|
||||
utils.print_debug("Completed scan model information.")
|
||||
210
py/manager.py
Normal file
210
py/manager.py
Normal file
@@ -0,0 +1,210 @@
|
||||
import os
|
||||
import folder_paths
|
||||
from aiohttp import web
|
||||
|
||||
|
||||
from . import utils
|
||||
|
||||
|
||||
class ModelManager:
|
||||
|
||||
def add_routes(self, routes):
|
||||
|
||||
@routes.get("/model-manager/base-folders")
|
||||
@utils.deprecated(reason="Use `/model-manager/models` instead.")
|
||||
async def get_model_paths(request):
|
||||
"""
|
||||
Returns the base folders for models.
|
||||
"""
|
||||
model_base_paths = utils.resolve_model_base_paths()
|
||||
return web.json_response({"success": True, "data": model_base_paths})
|
||||
|
||||
@routes.get("/model-manager/models")
|
||||
async def get_folders(request):
|
||||
"""
|
||||
Returns the base folders for models.
|
||||
"""
|
||||
try:
|
||||
result = utils.resolve_model_base_paths()
|
||||
return web.json_response({"success": True, "data": result})
|
||||
except Exception as e:
|
||||
error_msg = f"Read models failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.get("/model-manager/models/{folder}")
|
||||
async def get_folder_models(request):
|
||||
try:
|
||||
folder = request.match_info.get("folder", None)
|
||||
results = self.scan_models(folder, request)
|
||||
return web.json_response({"success": True, "data": results})
|
||||
except Exception as e:
|
||||
error_msg = f"Read models failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.get("/model-manager/model/{type}/{index}/{filename:.*}")
|
||||
async def get_model_info(request):
|
||||
"""
|
||||
Get the information of the specified model.
|
||||
"""
|
||||
model_type = request.match_info.get("type", None)
|
||||
path_index = int(request.match_info.get("index", None))
|
||||
filename = request.match_info.get("filename", None)
|
||||
|
||||
try:
|
||||
model_path = utils.get_valid_full_path(model_type, path_index, filename)
|
||||
result = self.get_model_info(model_path)
|
||||
return web.json_response({"success": True, "data": result})
|
||||
except Exception as e:
|
||||
error_msg = f"Read model info failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.put("/model-manager/model/{type}/{index}/{filename:.*}")
|
||||
async def update_model(request):
|
||||
"""
|
||||
Update model information.
|
||||
|
||||
request body: x-www-form-urlencoded
|
||||
- previewFile: preview file.
|
||||
- description: description.
|
||||
- type: model type.
|
||||
- pathIndex: index of the model folders.
|
||||
- fullname: filename that relative to the model folder.
|
||||
All fields are optional, but type, pathIndex and fullname must appear together.
|
||||
"""
|
||||
model_type = request.match_info.get("type", None)
|
||||
path_index = int(request.match_info.get("index", None))
|
||||
filename = request.match_info.get("filename", None)
|
||||
|
||||
model_data = await request.post()
|
||||
model_data = dict(model_data)
|
||||
|
||||
try:
|
||||
model_path = utils.get_valid_full_path(model_type, path_index, filename)
|
||||
if model_path is None:
|
||||
raise RuntimeError(f"File {filename} not found")
|
||||
self.update_model(model_path, model_data)
|
||||
return web.json_response({"success": True})
|
||||
except Exception as e:
|
||||
error_msg = f"Update model failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
@routes.delete("/model-manager/model/{type}/{index}/{filename:.*}")
|
||||
async def delete_model(request):
|
||||
"""
|
||||
Delete model.
|
||||
"""
|
||||
model_type = request.match_info.get("type", None)
|
||||
path_index = int(request.match_info.get("index", None))
|
||||
filename = request.match_info.get("filename", None)
|
||||
|
||||
try:
|
||||
model_path = utils.get_valid_full_path(model_type, path_index, filename)
|
||||
if model_path is None:
|
||||
raise RuntimeError(f"File {filename} not found")
|
||||
self.remove_model(model_path)
|
||||
return web.json_response({"success": True})
|
||||
except Exception as e:
|
||||
error_msg = f"Delete model failed: {str(e)}"
|
||||
utils.print_error(error_msg)
|
||||
return web.json_response({"success": False, "error": error_msg})
|
||||
|
||||
def scan_models(self, folder: str, request):
|
||||
result = []
|
||||
|
||||
folders, extensions = folder_paths.folder_names_and_paths[folder]
|
||||
for path_index, base_path in enumerate(folders):
|
||||
files = utils.recursive_search_files(base_path, request)
|
||||
|
||||
models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions)
|
||||
|
||||
for fullname in models:
|
||||
fullname = utils.normalize_path(fullname)
|
||||
basename = os.path.splitext(fullname)[0]
|
||||
extension = os.path.splitext(fullname)[1]
|
||||
|
||||
abs_path = utils.join_path(base_path, fullname)
|
||||
file_stats = os.stat(abs_path)
|
||||
|
||||
# Resolve preview
|
||||
image_name = utils.get_model_preview_name(abs_path)
|
||||
image_name = utils.join_path(os.path.dirname(fullname), image_name)
|
||||
abs_image_path = utils.join_path(base_path, image_name)
|
||||
if os.path.isfile(abs_image_path):
|
||||
image_state = os.stat(abs_image_path)
|
||||
image_timestamp = round(image_state.st_mtime_ns / 1000000)
|
||||
image_name = f"{image_name}?ts={image_timestamp}"
|
||||
model_preview = f"/model-manager/preview/{folder}/{path_index}/{image_name}"
|
||||
|
||||
model_info = {
|
||||
"fullname": fullname,
|
||||
"basename": basename,
|
||||
"extension": extension,
|
||||
"type": folder,
|
||||
"pathIndex": path_index,
|
||||
"sizeBytes": file_stats.st_size,
|
||||
"preview": model_preview,
|
||||
"createdAt": round(file_stats.st_ctime_ns / 1000000),
|
||||
"updatedAt": round(file_stats.st_mtime_ns / 1000000),
|
||||
}
|
||||
|
||||
result.append(model_info)
|
||||
|
||||
return result
|
||||
|
||||
def get_model_info(self, model_path: str):
|
||||
directory = os.path.dirname(model_path)
|
||||
|
||||
metadata = utils.get_model_metadata(model_path)
|
||||
|
||||
description_file = utils.get_model_description_name(model_path)
|
||||
description_file = utils.join_path(directory, description_file)
|
||||
description = None
|
||||
if os.path.isfile(description_file):
|
||||
with open(description_file, "r", encoding="utf-8", newline="") as f:
|
||||
description = f.read()
|
||||
|
||||
return {
|
||||
"metadata": metadata,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
def update_model(self, model_path: str, model_data: dict):
|
||||
|
||||
if "previewFile" in model_data:
|
||||
previewFile = model_data["previewFile"]
|
||||
if type(previewFile) is str and previewFile == "undefined":
|
||||
utils.remove_model_preview_image(model_path)
|
||||
else:
|
||||
utils.save_model_preview_image(model_path, previewFile)
|
||||
|
||||
if "description" in model_data:
|
||||
description = model_data["description"]
|
||||
utils.save_model_description(model_path, description)
|
||||
|
||||
if "type" in model_data and "pathIndex" in model_data and "fullname" in model_data:
|
||||
model_type = model_data.get("type", None)
|
||||
path_index = int(model_data.get("pathIndex", None))
|
||||
fullname = model_data.get("fullname", None)
|
||||
if model_type is None or path_index is None or fullname is None:
|
||||
raise RuntimeError("Invalid type or pathIndex or fullname")
|
||||
|
||||
# get new path
|
||||
new_model_path = utils.get_full_path(model_type, path_index, fullname)
|
||||
|
||||
utils.rename_model(model_path, new_model_path)
|
||||
|
||||
def remove_model(self, model_path: str):
|
||||
model_dirname = os.path.dirname(model_path)
|
||||
os.remove(model_path)
|
||||
|
||||
model_previews = utils.get_model_all_images(model_path)
|
||||
for preview in model_previews:
|
||||
os.remove(utils.join_path(model_dirname, preview))
|
||||
|
||||
model_descriptions = utils.get_model_all_descriptions(model_path)
|
||||
for description in model_descriptions:
|
||||
os.remove(utils.join_path(model_dirname, description))
|
||||
193
py/services.py
193
py/services.py
@@ -1,193 +0,0 @@
|
||||
import os
|
||||
|
||||
import folder_paths
|
||||
|
||||
from . import utils
|
||||
from . import download
|
||||
from . import searcher
|
||||
|
||||
|
||||
def scan_models(folder: str, request):
|
||||
result = []
|
||||
|
||||
folders, extensions = folder_paths.folder_names_and_paths[folder]
|
||||
for path_index, base_path in enumerate(folders):
|
||||
files = utils.recursive_search_files(base_path, request)
|
||||
|
||||
models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions)
|
||||
|
||||
for fullname in models:
|
||||
fullname = utils.normalize_path(fullname)
|
||||
basename = os.path.splitext(fullname)[0]
|
||||
extension = os.path.splitext(fullname)[1]
|
||||
|
||||
abs_path = utils.join_path(base_path, fullname)
|
||||
file_stats = os.stat(abs_path)
|
||||
|
||||
# Resolve preview
|
||||
image_name = utils.get_model_preview_name(abs_path)
|
||||
image_name = utils.join_path(os.path.dirname(fullname), image_name)
|
||||
abs_image_path = utils.join_path(base_path, image_name)
|
||||
if os.path.isfile(abs_image_path):
|
||||
image_state = os.stat(abs_image_path)
|
||||
image_timestamp = round(image_state.st_mtime_ns / 1000000)
|
||||
image_name = f"{image_name}?ts={image_timestamp}"
|
||||
model_preview = f"/model-manager/preview/{folder}/{path_index}/{image_name}"
|
||||
|
||||
model_info = {
|
||||
"fullname": fullname,
|
||||
"basename": basename,
|
||||
"extension": extension,
|
||||
"type": folder,
|
||||
"pathIndex": path_index,
|
||||
"sizeBytes": file_stats.st_size,
|
||||
"preview": model_preview,
|
||||
"createdAt": round(file_stats.st_ctime_ns / 1000000),
|
||||
"updatedAt": round(file_stats.st_mtime_ns / 1000000),
|
||||
}
|
||||
|
||||
result.append(model_info)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_model_info(model_path: str):
|
||||
directory = os.path.dirname(model_path)
|
||||
|
||||
metadata = utils.get_model_metadata(model_path)
|
||||
|
||||
description_file = utils.get_model_description_name(model_path)
|
||||
description_file = utils.join_path(directory, description_file)
|
||||
description = None
|
||||
if os.path.isfile(description_file):
|
||||
with open(description_file, "r", encoding="utf-8", newline="") as f:
|
||||
description = f.read()
|
||||
|
||||
return {
|
||||
"metadata": metadata,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
|
||||
def update_model(model_path: str, model_data: dict):
|
||||
|
||||
if "previewFile" in model_data:
|
||||
previewFile = model_data["previewFile"]
|
||||
if type(previewFile) is str and previewFile == "undefined":
|
||||
utils.remove_model_preview_image(model_path)
|
||||
else:
|
||||
utils.save_model_preview_image(model_path, previewFile)
|
||||
|
||||
if "description" in model_data:
|
||||
description = model_data["description"]
|
||||
utils.save_model_description(model_path, description)
|
||||
|
||||
if "type" in model_data and "pathIndex" in model_data and "fullname" in model_data:
|
||||
model_type = model_data.get("type", None)
|
||||
path_index = int(model_data.get("pathIndex", None))
|
||||
fullname = model_data.get("fullname", None)
|
||||
if model_type is None or path_index is None or fullname is None:
|
||||
raise RuntimeError("Invalid type or pathIndex or fullname")
|
||||
|
||||
# get new path
|
||||
new_model_path = utils.get_full_path(model_type, path_index, fullname)
|
||||
|
||||
utils.rename_model(model_path, new_model_path)
|
||||
|
||||
|
||||
def remove_model(model_path: str):
|
||||
model_dirname = os.path.dirname(model_path)
|
||||
os.remove(model_path)
|
||||
|
||||
model_previews = utils.get_model_all_images(model_path)
|
||||
for preview in model_previews:
|
||||
os.remove(utils.join_path(model_dirname, preview))
|
||||
|
||||
model_descriptions = utils.get_model_all_descriptions(model_path)
|
||||
for description in model_descriptions:
|
||||
os.remove(utils.join_path(model_dirname, description))
|
||||
|
||||
|
||||
async def create_model_download_task(task_data, request):
|
||||
return await download.create_model_download_task(task_data, request)
|
||||
|
||||
|
||||
async def scan_model_download_task_list():
|
||||
return await download.scan_model_download_task_list()
|
||||
|
||||
|
||||
async def pause_model_download_task(task_id):
|
||||
return await download.pause_model_download_task(task_id)
|
||||
|
||||
|
||||
async def resume_model_download_task(task_id, request):
|
||||
return await download.download_model(task_id, request)
|
||||
|
||||
|
||||
async def delete_model_download_task(task_id):
|
||||
return await download.delete_model_download_task(task_id)
|
||||
|
||||
|
||||
def fetch_model_info(model_page: str):
|
||||
if not model_page:
|
||||
return []
|
||||
|
||||
model_searcher = searcher.get_model_searcher_by_url(model_page)
|
||||
result = model_searcher.search_by_url(model_page)
|
||||
return result
|
||||
|
||||
|
||||
async def download_model_info(scan_mode: str, request):
|
||||
utils.print_info(f"Download model info for {scan_mode}")
|
||||
model_base_paths = utils.resolve_model_base_paths()
|
||||
for model_type in model_base_paths:
|
||||
|
||||
folders, extensions = folder_paths.folder_names_and_paths[model_type]
|
||||
for path_index, base_path in enumerate(folders):
|
||||
files = utils.recursive_search_files(base_path, request)
|
||||
|
||||
models = folder_paths.filter_files_extensions(files, folder_paths.supported_pt_extensions)
|
||||
|
||||
for fullname in models:
|
||||
fullname = utils.normalize_path(fullname)
|
||||
basename = os.path.splitext(fullname)[0]
|
||||
|
||||
abs_model_path = utils.join_path(base_path, fullname)
|
||||
|
||||
image_name = utils.get_model_preview_name(abs_model_path)
|
||||
abs_image_path = utils.join_path(base_path, image_name)
|
||||
|
||||
has_preview = os.path.isfile(abs_image_path)
|
||||
|
||||
description_name = utils.get_model_description_name(abs_model_path)
|
||||
abs_description_path = utils.join_path(base_path, description_name) if description_name else None
|
||||
has_description = os.path.isfile(abs_description_path) if abs_description_path else False
|
||||
|
||||
try:
|
||||
|
||||
utils.print_info(f"Checking model {abs_model_path}")
|
||||
utils.print_debug(f"Scan mode: {scan_mode}")
|
||||
utils.print_debug(f"Has preview: {has_preview}")
|
||||
utils.print_debug(f"Has description: {has_description}")
|
||||
|
||||
if scan_mode != "full" and (has_preview and has_description):
|
||||
continue
|
||||
|
||||
utils.print_debug(f"Calculate sha256 for {abs_model_path}")
|
||||
hash_value = utils.calculate_sha256(abs_model_path)
|
||||
utils.print_info(f"Searching model info by hash {hash_value}")
|
||||
model_info = searcher.CivitaiModelSearcher().search_by_hash(hash_value)
|
||||
|
||||
preview_url_list = model_info.get("preview", [])
|
||||
preview_image_url = preview_url_list[0] if preview_url_list else None
|
||||
if preview_image_url:
|
||||
utils.print_debug(f"Save preview image to {abs_image_path}")
|
||||
utils.save_model_preview_image(abs_model_path, preview_image_url)
|
||||
|
||||
description = model_info.get("description", None)
|
||||
if description:
|
||||
utils.save_model_description(abs_model_path, description)
|
||||
except Exception as e:
|
||||
utils.print_error(f"Failed to download model info for {abs_model_path}: {e}")
|
||||
|
||||
utils.print_debug("Completed scan model information.")
|
||||
@@ -133,7 +133,11 @@ def download_web_distribution(version: str):
|
||||
print_error(f"An unexpected error occurred: {e}")
|
||||
|
||||
|
||||
def resolve_model_base_paths():
|
||||
def resolve_model_base_paths() -> dict[str, list[str]]:
|
||||
"""
|
||||
Resolve model base paths.
|
||||
eg. { "checkpoints": ["path/to/checkpoints"] }
|
||||
"""
|
||||
folders = list(folder_paths.folder_names_and_paths.keys())
|
||||
model_base_paths = {}
|
||||
folder_black_list = ["configs", "custom_nodes"]
|
||||
|
||||
Reference in New Issue
Block a user