feat: support task batch

POST /v2/manager/queue/batch
GET /v2/manager/queue/history_list
GET /v2/manager/queue/history?id={id}
GET /v2/manager/queue/abort_current
This commit is contained in:
Dr.Lt.Data
2025-03-24 22:49:38 +09:00
committed by bymyself
parent 34efbe9262
commit 5ea7bf3683
6 changed files with 407 additions and 174 deletions

View File

@@ -16,7 +16,7 @@ from datetime import datetime
from server import PromptServer
import logging
import asyncio
import queue
from collections import deque
from . import manager_core as core
from . import manager_util
@@ -389,16 +389,73 @@ def nickname_filter(json_obj):
return json_obj
task_queue = queue.Queue()
nodepack_result = {}
model_result = {}
class TaskBatch:
def __init__(self, batch_json, tasks, failed):
self.nodepack_result = {}
self.model_result = {}
self.batch_id = batch_json.get('batch_id') if batch_json is not None else None
self.batch_json = batch_json
self.tasks = tasks
self.current_index = 0
self.stats = {}
self.failed = failed if failed is not None else set()
self.is_aborted = False
def is_done(self):
return len(self.tasks) <= self.current_index
def get_next(self):
if self.is_done():
return None
item = self.tasks[self.current_index]
self.current_index += 1
return item
def done_count(self):
return len(self.nodepack_result) + len(self.model_result)
def total_count(self):
return len(self.tasks)
def abort(self):
self.is_aborted = True
def finalize(self):
if self.batch_id is not None:
batch_path = os.path.join(core.manager_batch_history_path, self.batch_id+".json")
json_obj = {
"batch": self.batch_json,
"nodepack_result": self.nodepack_result,
"model_result": self.model_result,
"failed": list(self.failed)
}
with open(batch_path, "w") as json_file:
json.dump(json_obj, json_file, indent=4)
temp_queue_batch = []
task_batch_queue = deque()
tasks_in_progress = set()
task_worker_lock = threading.Lock()
aborted_batch = None
def finalize_temp_queue_batch(batch_json=None, failed=None):
"""
make temp_queue_batch as a batch snapshot and add to batch_queue
"""
global temp_queue_batch
if len(temp_queue_batch):
batch = TaskBatch(batch_json, temp_queue_batch, failed)
task_batch_queue.append(batch)
temp_queue_batch = []
async def task_worker():
global task_queue
global nodepack_result
global model_result
global tasks_in_progress
async def do_install(item) -> str:
@@ -411,8 +468,7 @@ async def task_worker():
return f"Cannot resolve install target: '{node_spec_str}'"
node_name, version_spec, is_specified = node_spec
res = await core.unified_manager.install_by_id(node_name, version_spec, channel, mode, return_postinstall=skip_post_install)
# discard post install if skip_post_install mode
res = await core.unified_manager.install_by_id(node_name, version_spec, channel, mode, return_postinstall=skip_post_install) # discard post install if skip_post_install mode
if res.action not in ['skip', 'enable', 'install-git', 'install-cnr', 'switch-cnr']:
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
@@ -427,6 +483,11 @@ async def task_worker():
traceback.print_exc()
return f"Installation failed:\n{node_spec_str}"
async def do_enable(item) -> str:
ui_id, cnr_id = item
core.unified_manager.unified_enable(cnr_id)
return 'success'
async def do_update(item):
ui_id, node_name, node_ver = item
@@ -588,31 +649,45 @@ async def task_worker():
return f"Model installation error: {model_url}"
stats = {}
while True:
done_count = len(nodepack_result) + len(model_result)
total_count = done_count + task_queue.qsize()
with task_worker_lock:
if len(task_batch_queue) > 0:
cur_batch = task_batch_queue[0]
else:
logging.info(f"\n[ComfyUI-Manager] All tasks are completed.")
logging.info("\nAfter restarting ComfyUI, please refresh the browser.")
if task_queue.empty():
logging.info(f"\n[ComfyUI-Manager] Queued works are completed.\n{stats}")
res = {'status': 'all-done'}
logging.info("\nAfter restarting ComfyUI, please refresh the browser.")
PromptServer.instance.send_sync("cm-queue-status",
{'status': 'done',
'nodepack_result': nodepack_result, 'model_result': model_result,
'total_count': total_count, 'done_count': done_count})
nodepack_result = {}
task_queue = queue.Queue()
return # terminate worker thread
PromptServer.instance.send_sync("cm-queue-status", res)
return
if cur_batch.is_done():
logging.info(f"\n[ComfyUI-Manager] A tasks batch(batch_id={cur_batch.batch_id}) is completed.\nstat={cur_batch.stats}")
res = {'status': 'batch-done',
'nodepack_result': cur_batch.nodepack_result,
'model_result': cur_batch.model_result,
'total_count': cur_batch.total_count(),
'done_count': cur_batch.done_count(),
'batch_id': cur_batch.batch_id,
'remaining_batch_count': len(task_batch_queue) }
PromptServer.instance.send_sync("cm-queue-status", res)
cur_batch.finalize()
task_batch_queue.popleft()
continue
with task_worker_lock:
kind, item = task_queue.get()
kind, item = cur_batch.get_next()
tasks_in_progress.add((kind, item[0]))
try:
if kind == 'install':
msg = await do_install(item)
elif kind == 'enable':
msg = await do_enable(item)
elif kind == 'install-model':
msg = await do_install_model(item)
elif kind == 'update':
@@ -636,28 +711,128 @@ async def task_worker():
with task_worker_lock:
tasks_in_progress.remove((kind, item[0]))
ui_id = item[0]
if kind == 'install-model':
model_result[ui_id] = msg
ui_target = "model_manager"
elif kind == 'update-main':
nodepack_result[ui_id] = msg
ui_target = "main"
elif kind == 'update-comfyui':
nodepack_result['comfyui'] = msg
ui_target = "main"
elif kind == 'update':
nodepack_result[ui_id] = msg['msg']
ui_target = "nodepack_manager"
else:
nodepack_result[ui_id] = msg
ui_target = "nodepack_manager"
ui_id = item[0]
if kind == 'install-model':
cur_batch.model_result[ui_id] = msg
ui_target = "model_manager"
elif kind == 'update-main':
cur_batch.nodepack_result[ui_id] = msg
ui_target = "main"
elif kind == 'update-comfyui':
cur_batch.nodepack_result['comfyui'] = msg
ui_target = "main"
elif kind == 'update':
cur_batch.nodepack_result[ui_id] = msg['msg']
ui_target = "nodepack_manager"
else:
cur_batch.nodepack_result[ui_id] = msg
ui_target = "nodepack_manager"
stats[kind] = stats.get(kind, 0) + 1
cur_batch.stats[kind] = cur_batch.stats.get(kind, 0) + 1
PromptServer.instance.send_sync("cm-queue-status",
{'status': 'in_progress', 'target': item[0], 'ui_target': ui_target,
'total_count': total_count, 'done_count': done_count})
{'status': 'in_progress',
'target': item[0],
'batch_id': cur_batch.batch_id,
'ui_target': ui_target,
'total_count': cur_batch.total_count(),
'done_count': cur_batch.done_count()})
@routes.post("/v2/manager/queue/batch")
async def queue_batch(request):
json_data = await request.json()
failed = set()
for k, v in json_data.items():
if k == 'update_all':
await _update_all({'mode': v})
elif k == 'reinstall':
for x in v:
res = await _uninstall_custom_node(x)
if res.status != 200:
failed.add(x[0])
else:
res = await _install_custom_node(x)
if res.status != 200:
failed.add(x[0])
elif k == 'install':
for x in v:
res = await _install_custom_node(x)
if res.status != 200:
failed.add(x[0])
elif k == 'uninstall':
for x in v:
res = await _uninstall_custom_node(x)
if res.status != 200:
failed.add(x[0])
elif k == 'update':
for x in v:
res = await _update_custom_node(x)
if res.status != 200:
failed.add(x[0])
elif k == 'update_comfyui':
await update_comfyui(None)
elif k == 'disable':
for x in v:
await _disable_node(x)
elif k == 'install_model':
for x in v:
res = await _install_model(x)
if res.status != 200:
failed.add(x[0])
elif k == 'fix':
for x in v:
res = await _fix_custom_node(x)
if res.status != 200:
failed.add(x[0])
with task_worker_lock:
finalize_temp_queue_batch(json_data, failed)
_queue_start()
return web.json_response({"failed": list(failed)}, content_type='application/json')
@routes.get("/v2/manager/queue/history_list")
async def get_history_list(request):
history_path = core.manager_batch_history_path
try:
files = [os.path.join(history_path, f) for f in os.listdir(history_path) if os.path.isfile(os.path.join(history_path, f))]
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
history_ids = [os.path.basename(f)[:-5] for f in files]
return web.json_response({"ids": list(history_ids)}, content_type='application/json')
except Exception as e:
logging.error(f"[ComfyUI-Manager] /v2/manager/queue/history_list - {e}")
return web.Response(status=400)
@routes.get("/v2/manager/queue/history")
async def get_history(request):
try:
json_name = request.rel_url.query["id"]+'.json'
batch_path = os.path.join(core.manager_batch_history_path, json_name)
with open(batch_path, 'r', encoding='utf-8') as file:
json_str = file.read()
json_obj = json.loads(json_str)
return web.json_response(json_obj, content_type='application/json')
except Exception as e:
logging.error(f"[ComfyUI-Manager] /v2/manager/queue/history - {e}")
return web.Response(status=400)
@routes.get("/v2/customnode/getmappings")
@@ -725,6 +900,11 @@ async def fetch_updates(request):
@routes.get("/v2/manager/queue/update_all")
async def update_all(request):
json_data = dict(request.rel_url.query)
return await _update_all(json_data)
async def _update_all(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403)
@@ -736,13 +916,13 @@ async def update_all(request):
await core.save_snapshot_with_postfix('autosave')
if request.rel_url.query["mode"] == "local":
if json_data["mode"] == "local":
channel = 'local'
else:
channel = core.get_config()['channel_url']
await core.unified_manager.reload(request.rel_url.query["mode"])
await core.unified_manager.get_custom_nodes(channel, request.rel_url.query["mode"])
await core.unified_manager.reload(json_data["mode"])
await core.unified_manager.get_custom_nodes(channel, json_data["mode"])
for k, v in core.unified_manager.active_nodes.items():
if k == 'comfyui-manager':
@@ -751,7 +931,7 @@ async def update_all(request):
continue
update_item = k, k, v[0]
task_queue.put(("update-main", update_item))
temp_queue_batch.append(("update-main", update_item))
for k, v in core.unified_manager.unknown_active_nodes.items():
if k == 'comfyui-manager':
@@ -760,7 +940,7 @@ async def update_all(request):
continue
update_item = k, k, 'unknown'
task_queue.put(("update-main", update_item))
temp_queue_batch.append(("update-main", update_item))
return web.Response(status=200)
@@ -1097,8 +1277,27 @@ async def reinstall_custom_node(request):
@routes.get("/v2/manager/queue/reset")
async def reset_queue(request):
global task_queue
task_queue = queue.Queue()
global task_batch_queue
global temp_queue_batch
with task_worker_lock:
temp_queue_batch = []
task_batch_queue = deque()
return web.Response(status=200)
@routes.get("/v2/manager/queue/abort_current")
async def reset_queue(request):
global task_batch_queue
global temp_queue_batch
with task_worker_lock:
temp_queue_batch = []
if len(task_batch_queue) > 0:
task_batch_queue[0].abort()
task_batch_queue.popleft()
return web.Response(status=200)
@@ -1107,24 +1306,37 @@ async def queue_count(request):
global task_queue
with task_worker_lock:
done_count = len(nodepack_result) + len(model_result)
in_progress_count = len(tasks_in_progress)
total_count = done_count + in_progress_count + task_queue.qsize()
is_processing = task_worker_thread is not None and task_worker_thread.is_alive()
if len(task_batch_queue) > 0:
cur_batch = task_batch_queue[0]
done_count = cur_batch.done_count()
total_count = cur_batch.total_count()
in_progress_count = len(tasks_in_progress)
is_processing = task_worker_thread is not None and task_worker_thread.is_alive()
else:
done_count = 0
total_count = 0
in_progress_count = 0
is_processing = False
return web.json_response({
'total_count': total_count, 'done_count': done_count, 'in_progress_count': in_progress_count,
'total_count': total_count,
'done_count': done_count,
'in_progress_count': in_progress_count,
'is_processing': is_processing})
@routes.post("/v2/manager/queue/install")
async def install_custom_node(request):
json_data = await request.json()
print(f"install={json_data}")
return await _install_custom_node(json_data)
async def _install_custom_node(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
json_data = await request.json()
# non-nightly cnr is safe
risky_level = None
cnr_id = json_data.get('id')
@@ -1136,8 +1348,10 @@ async def install_custom_node(request):
if json_data['version'] != 'unknown' and selected_version != 'unknown':
if skip_post_install:
if cnr_id in core.unified_manager.nightly_inactive_nodes or cnr_id in core.unified_manager.cnr_inactive_nodes:
core.unified_manager.unified_enable(cnr_id)
enable_item = json_data.get('ui_id'), cnr_id
temp_queue_batch.append(("enable", enable_item))
return web.Response(status=200)
elif selected_version is None:
selected_version = 'latest'
@@ -1150,9 +1364,11 @@ async def install_custom_node(request):
if git_url is None:
logging.error(f"[ComfyUI-Manager] Following node pack doesn't provide `nightly` version: ${git_url}")
return web.Response(status=404, text=f"Following node pack doesn't provide `nightly` version: ${git_url}")
elif json_data['version'] != 'unknown' and selected_version == 'unknown':
logging.error(f"[ComfyUI-Manager] Invalid installation request: {json_data}")
return web.Response(status=400, text="Invalid installation request")
else:
# unknown
unknown_name = os.path.basename(json_data['files'][0])
@@ -1171,7 +1387,7 @@ async def install_custom_node(request):
return web.Response(status=404, text="A security error has occurred. Please check the terminal logs")
install_item = json_data.get('ui_id'), node_spec_str, json_data['channel'], json_data['mode'], skip_post_install
task_queue.put(("install", install_item))
temp_queue_batch.append(("install", install_item))
return web.Response(status=200)
@@ -1180,16 +1396,16 @@ task_worker_thread:threading.Thread = None
@routes.get("/v2/manager/queue/start")
async def queue_start(request):
global nodepack_result
global model_result
with task_worker_lock:
finalize_temp_queue_batch()
return _queue_start()
def _queue_start():
global task_worker_thread
if task_worker_thread is not None and task_worker_thread.is_alive():
return web.Response(status=201) # already in-progress
nodepack_result = {}
model_result = {}
task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker()))
task_worker_thread.start()
@@ -1198,12 +1414,15 @@ async def queue_start(request):
@routes.post("/v2/manager/queue/fix")
async def fix_custom_node(request):
json_data = await request.json()
return await _fix_custom_node(json_data)
async def _fix_custom_node(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_GENERAL)
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
json_data = await request.json()
node_id = json_data.get('id')
node_ver = json_data['version']
if node_ver != 'unknown':
@@ -1213,7 +1432,7 @@ async def fix_custom_node(request):
node_name = os.path.basename(json_data['files'][0])
update_item = json_data.get('ui_id'), node_name, json_data['version']
task_queue.put(("fix", update_item))
temp_queue_batch.append(("fix", update_item))
return web.Response(status=200)
@@ -1252,12 +1471,15 @@ async def install_custom_node_pip(request):
@routes.post("/v2/manager/queue/uninstall")
async def uninstall_custom_node(request):
json_data = await request.json()
return await _uninstall_custom_node(json_data)
async def _uninstall_custom_node(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
json_data = await request.json()
node_id = json_data.get('id')
if json_data['version'] != 'unknown':
is_unknown = False
@@ -1268,19 +1490,22 @@ async def uninstall_custom_node(request):
node_name = os.path.basename(json_data['files'][0])
uninstall_item = json_data.get('ui_id'), node_name, is_unknown
task_queue.put(("uninstall", uninstall_item))
temp_queue_batch.append(("uninstall", uninstall_item))
return web.Response(status=200)
@routes.post("/v2/manager/queue/update")
async def update_custom_node(request):
json_data = await request.json()
return await _update_custom_node(json_data)
async def _update_custom_node(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
json_data = await request.json()
node_id = json_data.get('id')
if json_data['version'] != 'unknown':
node_name = node_id
@@ -1289,7 +1514,7 @@ async def update_custom_node(request):
node_name = os.path.basename(json_data['files'][0])
update_item = json_data.get('ui_id'), node_name, json_data['version']
task_queue.put(("update", update_item))
temp_queue_batch.append(("update", update_item))
return web.Response(status=200)
@@ -1297,7 +1522,7 @@ async def update_custom_node(request):
@routes.get("/v2/manager/queue/update_comfyui")
async def update_comfyui(request):
is_stable = core.get_config()['update_policy'] != 'nightly-comfyui'
task_queue.put(("update-comfyui", ('comfyui', is_stable)))
temp_queue_batch.append(("update-comfyui", ('comfyui', is_stable)))
return web.Response(status=200)
@@ -1328,7 +1553,11 @@ async def comfyui_switch_version(request):
@routes.post("/v2/manager/queue/disable")
async def disable_node(request):
json_data = await request.json()
await _disable_node(json_data)
return web.Response(status=200)
async def _disable_node(json_data):
node_id = json_data.get('id')
if json_data['version'] != 'unknown':
is_unknown = False
@@ -1339,9 +1568,7 @@ async def disable_node(request):
node_name = os.path.basename(json_data['files'][0])
update_item = json_data.get('ui_id'), node_name, is_unknown
task_queue.put(("disable", update_item))
return web.Response(status=200)
temp_queue_batch.append(("disable", update_item))
async def check_whitelist_for_model(item):
@@ -1363,7 +1590,10 @@ async def check_whitelist_for_model(item):
@routes.post("/v2/manager/queue/install_model")
async def install_model(request):
json_data = await request.json()
return await _install_model(json_data)
async def _install_model(json_data):
if not is_allowed_security_level('middle'):
logging.error(SECURITY_MESSAGE_MIDDLE_OR_BELOW)
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
@@ -1387,7 +1617,7 @@ async def install_model(request):
return web.Response(status=403, text="A security error has occurred. Please check the terminal logs")
install_item = json_data.get('ui_id'), json_data
task_queue.put(("install-model", install_item))
temp_queue_batch.append(("install-model", install_item))
return web.Response(status=200)