Added buttons to search the web for notes and open url in new tab.

- See here for discussion: https://github.com/hayden-fr/ComfyUI-Model-Manager/issues/18
- Added python requirement.
This commit is contained in:
Christian Bastian
2024-09-07 03:28:53 -04:00
parent eeb63912f6
commit 25d52dda6d
3 changed files with 249 additions and 12 deletions

View File

@@ -8,6 +8,8 @@ import copy
import importlib
import re
import base64
import hashlib
import markdownify
from aiohttp import web
import server
@@ -187,6 +189,7 @@ def ui_rules():
Rule("model-show-add-button", True, bool),
Rule("model-show-copy-button", True, bool),
Rule("model-show-load-workflow-button", True, bool),
Rule("model-show-open-model-url-button", False, bool),
Rule("model-info-button-on-left", False, bool),
Rule("model-add-embedding-extension", False, bool),
@@ -240,6 +243,93 @@ def get_def_headers(url=""):
return def_headers
def civitai_get_model_info(sha256_hash: str):
url_api_hash = r"https://civitai.com/api/v1/model-versions/by-hash/" + sha256_hash
hash_response = requests.get(url_api_hash)
if hash_response.status_code != 200:
return {}
hash_info = hash_response.json()
if len(hash_info) == 0:
return {}
model_id = hash_info["modelId"]
url_api_model = r"https://civitai.com/api/v1/models/" + str(model_id)
model_response = requests.get(url_api_model)
if model_response.status_code != 200:
return {}
return model_response.json()
def search_web_for_model_url(sha256_hash):
model_info = civitai_get_model_info(sha256_hash)
if len(model_info) > 0:
model_id = model_info["id"]
version_id = None
for model_version in model_info["modelVersions"]:
for files in model_version["files"]:
if files["hashes"]["SHA256"].lower() == sha256_hash.lower():
version_id = model_version["id"]
break
if version_id is not None: break
return f"https://civitai.com/models/{model_id}?modelVersionId={version_id}"
# TODO: search other websites
return ""
def search_web_for_model_notes(sha256_hash):
model_info = civitai_get_model_info(sha256_hash)
if len(model_info) > 0:
model_description = model_info.get("description", "")
model_version_description = ""
model_trigger_words = []
for model_version in model_info["modelVersions"]:
for files in model_version["files"]:
if files["hashes"]["SHA256"].lower() == sha256_hash.lower():
model_version_description = model_version.get("description", "")
model_trigger_words = model_version.get("trainedWords", "")
break
if model_version_description != "": break
notes = ""
if len(model_trigger_words) > 0:
notes += "# Trigger Words\n\n"
model_trigger_words = [re.sub(",$", "", s.strip()) for s in model_trigger_words]
join_separator = ', '
for s in model_trigger_words:
if ',' in s:
join_separator = '\n'
break
if join_separator == '\n' and len(model_trigger_words) > 1:
model_trigger_words = ["* " + s for s in model_trigger_words]
notes += join_separator.join(model_trigger_words)
if model_version_description != "":
if len(notes) > 0: notes += "\n\n"
notes += "# About this version\n\n"
notes += markdownify.markdownify(model_version_description)
if model_description != "":
if len(notes) > 0: notes += "\n\n"
notes += "# " + model_info.get("name", str(model_info["id"])) + "\n\n"
notes += markdownify.markdownify(model_description)
notes = notes.strip()
return notes
# TODO: search other websites
return ""
def hash_file(path, buffer_size=1024*1024):
sha256 = hashlib.sha256()
with open(path, 'rb') as f:
while True:
data = f.read(buffer_size)
if not data: break
sha256.update(data)
return sha256.hexdigest()
@server.PromptServer.instance.routes.get("/model-manager/timestamp")
async def get_timestamp(request):
return web.json_response({ "timestamp": datetime.now().timestamp() })
@@ -1004,6 +1094,23 @@ async def get_model_info(request):
return web.json_response(result)
@server.PromptServer.instance.routes.get("/model-manager/model/info/web-url")
async def get_model_web_url(request):
model_path = request.query.get("path", None)
if model_path is None:
web.json_response({ "url": "" })
model_path = urllib.parse.unquote(model_path)
abs_path, model_type = search_path_to_system_path(model_path)
if abs_path is None:
web.json_response({ "url": "" })
sha256_hash = hash_file(abs_path)
url = search_web_for_model_url(sha256_hash)
return web.json_response({ "url": url })
@server.PromptServer.instance.routes.get("/model-manager/system-separator")
async def get_system_separator(request):
return web.json_response(os.path.sep)
@@ -1218,12 +1325,50 @@ async def set_notes(request):
except ValueError as e:
print(e, file=sys.stderr, flush=True)
result["alert"] = "Failed to save notes!\n\n" + str(e)
web.json_response(result)
return web.json_response(result)
result["success"] = True
return web.json_response(result)
@server.PromptServer.instance.routes.post("/model-manager/notes/download")
async def try_download_notes(request):
result = { "success": False }
model_path = request.query.get("path", None)
if model_path is None:
result["alert"] = "Missing model path!"
return web.json_response(result)
model_path = urllib.parse.unquote(model_path)
abs_path, model_type = search_path_to_system_path(model_path)
if abs_path is None:
result["alert"] = "Invalid model path!"
return web.json_response(result)
overwrite = request.query.get("overwrite", None)
overwrite = not (overwrite == "False" or overwrite == "false" or overwrite == None)
notes_path = os.path.splitext(abs_path)[0] + ".txt"
if not overwrite and os.path.isfile(notes_path):
result["alert"] = "Notes already exist!"
return web.json_response(result)
sha256_hash = hash_file(abs_path)
notes = search_web_for_model_notes(sha256_hash)
if not notes.isspace() and notes != "":
try:
with open(notes_path, "w", encoding="utf-8") as f:
f.write(notes)
result["success"] = True
except ValueError as e:
print(e, file=sys.stderr, flush=True)
result["alert"] = "Failed to save notes!\n\n" + str(e)
return web.json_response(result)
result["notes"] = notes
return web.json_response(result)
WEB_DIRECTORY = "web"
NODE_CLASS_MAPPINGS = {}
__all__ = ["NODE_CLASS_MAPPINGS"]