feat: stop feature
feat: model-manager - support background tasking
This commit is contained in:
@@ -369,12 +369,14 @@ def nickname_filter(json_obj):
|
||||
return json_obj
|
||||
|
||||
|
||||
install_queue = queue.Queue()
|
||||
install_result = {}
|
||||
task_queue = queue.Queue()
|
||||
nodepack_result = {}
|
||||
model_result = {}
|
||||
|
||||
async def install_worker():
|
||||
global install_result
|
||||
global install_queue
|
||||
async def task_worker():
|
||||
global task_queue
|
||||
global nodepack_result
|
||||
global model_result
|
||||
|
||||
async def do_install(item):
|
||||
ui_id, node_spec_str, channel, mode, skip_post_install = item
|
||||
@@ -384,7 +386,7 @@ async def install_worker():
|
||||
|
||||
if node_spec is None:
|
||||
logging.error(f"Cannot resolve install target: '{node_spec_str}'")
|
||||
install_result[ui_id] = f"Cannot resolve install target: '{node_spec_str}'"
|
||||
nodepack_result[ui_id] = f"Cannot resolve install target: '{node_spec_str}'"
|
||||
return
|
||||
|
||||
node_name, version_spec, is_specified = node_spec
|
||||
@@ -393,18 +395,18 @@ async def install_worker():
|
||||
|
||||
if res.action not in ['skip', 'enable', 'install-git', 'install-cnr', 'switch-cnr']:
|
||||
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
|
||||
install_result[ui_id] = res.msg
|
||||
nodepack_result[ui_id] = res.msg
|
||||
return
|
||||
|
||||
elif not res.result:
|
||||
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
|
||||
install_result[ui_id] = res.msg
|
||||
nodepack_result[ui_id] = res.msg
|
||||
return
|
||||
|
||||
install_result[ui_id] = 'success'
|
||||
nodepack_result[ui_id] = 'success'
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
install_result[ui_id] = f"Installation failed:\n{node_spec_str}"
|
||||
nodepack_result[ui_id] = f"Installation failed:\n{node_spec_str}"
|
||||
|
||||
async def do_update(item):
|
||||
ui_id, node_name, node_ver = item
|
||||
@@ -415,14 +417,14 @@ async def install_worker():
|
||||
manager_util.clear_pip_cache()
|
||||
|
||||
if res.result:
|
||||
install_result[ui_id] = 'success'
|
||||
nodepack_result[ui_id] = 'success'
|
||||
return
|
||||
|
||||
logging.error(f"\nERROR: An error occurred while updating '{node_name}'.")
|
||||
install_result[ui_id] = f"An error occurred while updating '{node_name}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while updating '{node_name}'."
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
install_result[ui_id] = f"An error occurred while updating '{node_name}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while updating '{node_name}'."
|
||||
|
||||
async def do_fix(item):
|
||||
ui_id, node_name, node_ver = item
|
||||
@@ -431,16 +433,16 @@ async def install_worker():
|
||||
res = core.unified_manager.unified_fix(node_name, node_ver)
|
||||
|
||||
if res.result:
|
||||
install_result[ui_id] = 'success'
|
||||
nodepack_result[ui_id] = 'success'
|
||||
return
|
||||
else:
|
||||
logging.error(res.msg)
|
||||
|
||||
logging.error(f"\nERROR: An error occurred while fixing '{node_name}@{node_ver}'.")
|
||||
install_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
install_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."
|
||||
|
||||
async def do_uninstall(item):
|
||||
ui_id, node_name, is_unknown = item
|
||||
@@ -449,14 +451,14 @@ async def install_worker():
|
||||
res = core.unified_manager.unified_uninstall(node_name, is_unknown)
|
||||
|
||||
if res.result:
|
||||
install_result[ui_id] = 'success'
|
||||
nodepack_result[ui_id] = 'success'
|
||||
return
|
||||
|
||||
logging.error(f"\nERROR: An error occurred while uninstalling '{node_name}'.")
|
||||
install_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
install_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."
|
||||
nodepack_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."
|
||||
|
||||
async def do_disable(item):
|
||||
ui_id, node_name, is_unknown = item
|
||||
@@ -465,48 +467,96 @@ async def install_worker():
|
||||
res = core.unified_manager.unified_disable(node_name, is_unknown)
|
||||
|
||||
if res:
|
||||
install_result[ui_id] = 'success'
|
||||
nodepack_result[ui_id] = 'success'
|
||||
return
|
||||
|
||||
install_result[ui_id] = f"Failed to disable: '{node_name}'"
|
||||
nodepack_result[ui_id] = f"Failed to disable: '{node_name}'"
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
install_result[ui_id] = f"Failed to disable: '{node_name}'"
|
||||
nodepack_result[ui_id] = f"Failed to disable: '{node_name}'"
|
||||
|
||||
async def do_install_model(item):
|
||||
ui_id, json_data = item
|
||||
|
||||
model_path = get_model_path(json_data)
|
||||
model_url = json_data['url']
|
||||
|
||||
try:
|
||||
if model_path is not None:
|
||||
logging.info(f"Install model '{json_data['name']}' from '{model_url}' into '{model_path}'")
|
||||
if not core.get_config()['model_download_by_agent'] and (
|
||||
model_url.startswith('https://github.com') or model_url.startswith('https://huggingface.co') or model_url.startswith('https://heibox.uni-heidelberg.de')):
|
||||
model_dir = get_model_dir(json_data, True)
|
||||
download_url(model_url, model_dir, filename=json_data['filename'])
|
||||
if model_path.endswith('.zip'):
|
||||
res = core.unzip(model_path)
|
||||
else:
|
||||
res = True
|
||||
|
||||
if res:
|
||||
model_result[ui_id] = 'success'
|
||||
return
|
||||
else:
|
||||
res = download_url_with_agent(model_url, model_path)
|
||||
if res and model_path.endswith('.zip'):
|
||||
res = core.unzip(model_path)
|
||||
else:
|
||||
logging.error(f"Model installation error: invalid model type - {json_data['type']}")
|
||||
return
|
||||
|
||||
if res:
|
||||
model_result[ui_id] = 'success'
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"[ERROR] {e}", file=sys.stderr)
|
||||
|
||||
model_result[ui_id] = f"Model installation error: {model_url}"
|
||||
|
||||
stats = {}
|
||||
|
||||
while True:
|
||||
done_count = len(install_result)
|
||||
total_count = done_count + install_queue.qsize()
|
||||
done_count = len(nodepack_result) + len(model_result)
|
||||
total_count = done_count + task_queue.qsize()
|
||||
|
||||
if install_queue.empty():
|
||||
if task_queue.empty():
|
||||
logging.info(f"\n[ComfyUI-Manager] Queued works are completed.\n{stats}")
|
||||
|
||||
logging.info("\nAfter restarting ComfyUI, please refresh the browser.")
|
||||
PromptServer.instance.send_sync("cm-install-status",
|
||||
{'status': 'done', 'result': install_result,
|
||||
PromptServer.instance.send_sync("cm-queue-status",
|
||||
{'status': 'done',
|
||||
'nodepack_result': nodepack_result, 'model_result': model_result,
|
||||
'total_count': total_count, 'done_count': done_count})
|
||||
install_result = {}
|
||||
install_queue = queue.Queue()
|
||||
nodepack_result = {}
|
||||
task_queue = queue.Queue()
|
||||
return
|
||||
|
||||
kind, item = install_queue.get()
|
||||
kind, item = task_queue.get()
|
||||
|
||||
if kind == 'install':
|
||||
await do_install(item)
|
||||
elif kind == 'update':
|
||||
await do_update(item)
|
||||
elif kind == 'fix':
|
||||
await do_fix(item)
|
||||
elif kind == 'uninstall':
|
||||
await do_uninstall(item)
|
||||
elif kind == 'disable':
|
||||
await do_disable(item)
|
||||
try:
|
||||
if kind == 'install':
|
||||
await do_install(item)
|
||||
if kind == 'install-model':
|
||||
await do_install_model(item)
|
||||
elif kind == 'update':
|
||||
await do_update(item)
|
||||
elif kind == 'fix':
|
||||
await do_fix(item)
|
||||
elif kind == 'uninstall':
|
||||
await do_uninstall(item)
|
||||
elif kind == 'disable':
|
||||
await do_disable(item)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
||||
stats[kind] = stats.get(kind, 0) + 1
|
||||
|
||||
PromptServer.instance.send_sync("cm-install-status",
|
||||
{'status': 'in_progress', 'target': item[0],
|
||||
ui_target = "model_manager" if kind == 'install-model' else 'nodepack_manager'
|
||||
|
||||
print(f"kind: {kind} / ui_target: {ui_target}")
|
||||
|
||||
PromptServer.instance.send_sync("cm-queue-status",
|
||||
{'status': 'in_progress', 'target': item[0], 'ui_target': ui_target,
|
||||
'total_count': total_count, 'done_count': done_count})
|
||||
|
||||
|
||||
@@ -1006,30 +1056,31 @@ async def import_fail_info(request):
|
||||
return web.Response(status=400)
|
||||
|
||||
|
||||
@routes.post("/customnode/reinstall")
|
||||
@routes.post("/manager/queue/reinstall")
|
||||
async def reinstall_custom_node(request):
|
||||
await uninstall_custom_node(request)
|
||||
await install_custom_node(request)
|
||||
|
||||
|
||||
@routes.get("/customnode/queue/reset")
|
||||
@routes.get("/manager/queue/reset")
|
||||
async def reset_queue(request):
|
||||
global install_queue
|
||||
install_queue = queue.Queue()
|
||||
global task_queue
|
||||
task_queue = queue.Queue()
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
@routes.get("/customnode/queue/count")
|
||||
@routes.get("/manager/queue/status")
|
||||
async def queue_count(request):
|
||||
global install_queue
|
||||
global task_queue
|
||||
|
||||
done_count = len(install_result)
|
||||
total_count = done_count + install_queue.qsize()
|
||||
done_count = len(nodepack_result) + len(model_result)
|
||||
total_count = done_count + task_queue.qsize()
|
||||
in_progress = task_worker_thread is not None and task_worker_thread.is_alive()
|
||||
|
||||
return web.json_response({'total_count': total_count, 'done_count': done_count})
|
||||
return web.json_response({'total_count': total_count, 'done_count': done_count, 'in_progress': in_progress})
|
||||
|
||||
|
||||
@routes.post("/customnode/queue/install")
|
||||
@routes.post("/manager/queue/install")
|
||||
async def install_custom_node(request):
|
||||
if not is_allowed_security_level('middle'):
|
||||
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
|
||||
@@ -1073,22 +1124,32 @@ async def install_custom_node(request):
|
||||
return web.Response(status=404, text="A security error has occurred. Please check the terminal logs")
|
||||
|
||||
install_item = json_data.get('ui_id'), node_spec_str, json_data['channel'], json_data['mode'], skip_post_install
|
||||
install_queue.put(("install", install_item))
|
||||
task_queue.put(("install", install_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
@routes.get("/customnode/queue/start")
|
||||
task_worker_thread = None
|
||||
|
||||
@routes.get("/manager/queue/start")
|
||||
async def queue_start(request):
|
||||
global install_result
|
||||
install_result = {}
|
||||
global nodepack_result
|
||||
global model_result
|
||||
global task_worker_thread
|
||||
|
||||
threading.Thread(target=lambda: asyncio.run(install_worker())).start()
|
||||
if task_worker_thread is not None and task_worker_thread.is_alive():
|
||||
return web.Response(status=201) # already in-progress
|
||||
|
||||
nodepack_result = {}
|
||||
model_result = {}
|
||||
|
||||
task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker()))
|
||||
task_worker_thread.start()
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
@routes.post("/customnode/queue/fix")
|
||||
@routes.post("/manager/queue/fix")
|
||||
async def fix_custom_node(request):
|
||||
if not is_allowed_security_level('middle'):
|
||||
logging.error(SECURITY_MESSAGE_GENERAL)
|
||||
@@ -1105,7 +1166,7 @@ async def fix_custom_node(request):
|
||||
node_name = os.path.basename(json_data['files'][0])
|
||||
|
||||
update_item = json_data.get('ui_id'), node_name, json_data['version']
|
||||
install_queue.put(("fix", update_item))
|
||||
task_queue.put(("fix", update_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
@@ -1142,7 +1203,7 @@ async def install_custom_node_pip(request):
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
@routes.post("/customnode/queue/uninstall")
|
||||
@routes.post("/manager/queue/uninstall")
|
||||
async def uninstall_custom_node(request):
|
||||
if not is_allowed_security_level('middle'):
|
||||
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
|
||||
@@ -1160,12 +1221,12 @@ async def uninstall_custom_node(request):
|
||||
node_name = os.path.basename(json_data['files'][0])
|
||||
|
||||
uninstall_item = json_data.get('ui_id'), node_name, is_unknown
|
||||
install_queue.put(("uninstall", uninstall_item))
|
||||
task_queue.put(("uninstall", uninstall_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
@routes.post("/customnode/queue/update")
|
||||
@routes.post("/manager/queue/update")
|
||||
async def update_custom_node(request):
|
||||
if not is_allowed_security_level('middle'):
|
||||
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
|
||||
@@ -1181,7 +1242,7 @@ async def update_custom_node(request):
|
||||
node_name = os.path.basename(json_data['files'][0])
|
||||
|
||||
update_item = json_data.get('ui_id'), node_name, json_data['version']
|
||||
install_queue.put(("update", update_item))
|
||||
task_queue.put(("update", update_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
@@ -1232,7 +1293,7 @@ async def comfyui_switch_version(request):
|
||||
return web.Response(status=400)
|
||||
|
||||
|
||||
@routes.post("/customnode/queue/disable")
|
||||
@routes.post("/manager/queue/disable")
|
||||
async def disable_node(request):
|
||||
json_data = await request.json()
|
||||
|
||||
@@ -1246,7 +1307,7 @@ async def disable_node(request):
|
||||
node_name = os.path.basename(json_data['files'][0])
|
||||
|
||||
update_item = json_data.get('ui_id'), node_name, is_unknown
|
||||
install_queue.put(("disable", update_item))
|
||||
task_queue.put(("disable", update_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
@@ -1264,12 +1325,10 @@ async def need_to_migrate(request):
|
||||
return web.Response(text=str(core.need_to_migrate), status=200)
|
||||
|
||||
|
||||
@routes.post("/model/install")
|
||||
@routes.post("/manager/queue/install_model")
|
||||
async def install_model(request):
|
||||
json_data = await request.json()
|
||||
|
||||
model_path = get_model_path(json_data)
|
||||
|
||||
if not is_allowed_security_level('middle'):
|
||||
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
|
||||
return web.Response(status=403)
|
||||
@@ -1287,42 +1346,8 @@ async def install_model(request):
|
||||
logging.error(SECURITY_MESSAGE_NORMAL_MINUS)
|
||||
return web.Response(status=403)
|
||||
|
||||
def do_install():
|
||||
res = False
|
||||
|
||||
try:
|
||||
if model_path is not None:
|
||||
|
||||
model_url = json_data['url']
|
||||
logging.info(f"Install model '{json_data['name']}' from '{model_url}' into '{model_path}'")
|
||||
if not core.get_config()['model_download_by_agent'] and (
|
||||
model_url.startswith('https://github.com') or model_url.startswith('https://huggingface.co') or model_url.startswith('https://heibox.uni-heidelberg.de')):
|
||||
model_dir = get_model_dir(json_data, True)
|
||||
download_url(model_url, model_dir, filename=json_data['filename'])
|
||||
if model_path.endswith('.zip'):
|
||||
res = core.unzip(model_path)
|
||||
else:
|
||||
res = True
|
||||
|
||||
if res:
|
||||
return web.json_response({}, content_type='application/json')
|
||||
else:
|
||||
res = download_url_with_agent(model_url, model_path)
|
||||
if res and model_path.endswith('.zip'):
|
||||
res = core.unzip(model_path)
|
||||
else:
|
||||
logging.error(f"Model installation error: invalid model type - {json_data['type']}")
|
||||
|
||||
if res:
|
||||
return web.json_response({}, content_type='application/json')
|
||||
except Exception as e:
|
||||
logging.error(f"[ERROR] {e}", file=sys.stderr)
|
||||
return web.Response(status=400)
|
||||
|
||||
# Run the installation in a thread pool
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
|
||||
asyncio.get_event_loop().run_in_executor(executor, do_install)
|
||||
install_item = json_data.get('ui_id'), json_data
|
||||
task_queue.put(("install-model", install_item))
|
||||
|
||||
return web.Response(status=200)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user