Spaces:
Runtime error
Runtime error
File size: 11,180 Bytes
5fbd25d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
"""
Task queue management
This module provides classes and functions for managing the task queue.
Classes:
QueueTask: A class representing a task in the queue.
TaskQueue: A class for managing the task queue.
"""
import uuid
import time
from typing import List, Tuple
import numpy as np
import requests
from fooocusapi.utils.file_utils import delete_output_file, get_file_serve_url
from fooocusapi.utils.img_utils import narray_to_base64img
from fooocusapi.utils.logger import logger
from fooocusapi.models.common.task import ImageGenerationResult, GenerationFinishReason
from fooocusapi.parameters import ImageGenerationParams
from fooocusapi.models.common.task import TaskType
class QueueTask:
"""
A class representing a task in the queue.
Attributes:
job_id (str): The unique identifier for the task, generated by uuid.
task_type (TaskType): The type of task.
is_finished (bool): Indicates whether the task has been completed.
finish_progress (int): The progress of the task completion.
in_queue_mills (int): The time the task was added to the queue, in milliseconds.
start_mills (int): The time the task started, in milliseconds.
finish_mills (int): The time the task finished, in milliseconds.
finish_with_error (bool): Indicates whether the task finished with an error.
task_status (str): The status of the task.
task_step_preview (str): A list of step previews for the task.
task_result (List[ImageGenerationResult]): The result of the task.
error_message (str): The error message, if any.
webhook_url (str): The webhook URL, if any.
"""
job_id: str
task_type: TaskType
req_param: ImageGenerationParams
is_finished: bool = False
finish_progress: int = 0
in_queue_mills: int
start_mills: int = 0
finish_mills: int = 0
finish_with_error: bool = False
task_status: str | None = None
task_step_preview: str | None = None
task_result: List[ImageGenerationResult] = None
error_message: str | None = None
webhook_url: str | None = None # attribute for individual webhook_url
def __init__(
self,
job_id: str,
task_type: TaskType,
req_param: ImageGenerationParams,
webhook_url: str | None = None,
):
self.job_id = job_id
self.task_type = task_type
self.req_param = req_param
self.in_queue_mills = int(round(time.time() * 1000))
self.webhook_url = webhook_url
def set_progress(self, progress: int, status: str | None):
"""
Set progress and status
Arguments:
progress {int} -- progress
status {str} -- status
"""
progress = min(progress, 100)
self.finish_progress = progress
self.task_status = status
def set_step_preview(self, task_step_preview: str | None):
"""set step preview
Set step preview
Arguments:
task_step_preview {str} -- step preview
"""
self.task_step_preview = task_step_preview
def set_result(
self,
task_result: List[ImageGenerationResult],
finish_with_error: bool,
error_message: str | None = None,
):
"""set result
Set task result
Arguments:
task_result {List[ImageGenerationResult]} -- task result
finish_with_error {bool} -- finish with error
error_message {str} -- error message
"""
if not finish_with_error:
self.finish_progress = 100
self.task_status = "Finished"
self.task_result = task_result
self.finish_with_error = finish_with_error
self.error_message = error_message
def __str__(self) -> str:
return f"QueueTask(job_id={self.job_id}, task_type={self.task_type},\
is_finished={self.is_finished}, finished_progress={self.finish_progress}, \
in_queue_mills={self.in_queue_mills}, start_mills={self.start_mills}, \
finish_mills={self.finish_mills}, finish_with_error={self.finish_with_error}, \
error_message={self.error_message}, task_status={self.task_status}, \
task_step_preview={self.task_step_preview}, webhook_url={self.webhook_url})"
class TaskQueue:
"""
TaskQueue is a queue of tasks that are waiting to be processed.
Attributes:
queue: List[QueueTask]
history: List[QueueTask]
last_job_id: str
webhook_url: str
persistent: bool
"""
queue: List[QueueTask] = []
history: List[QueueTask] = []
last_job_id: str = None
webhook_url: str | None = None
persistent: bool = False
def __init__(
self,
queue_size: int,
history_size: int,
webhook_url: str | None = None,
persistent: bool | None = False,
):
self.queue_size = queue_size
self.history_size = history_size
self.webhook_url = webhook_url
self.persistent = False if persistent is None else persistent
def add_task(
self,
task_type: TaskType,
req_param: ImageGenerationParams,
webhook_url: str | None = None,
) -> QueueTask | None:
"""
Create and add task to queue
:param task_type: task type
:param req_param: request parameters
:param webhook_url: webhook url
:returns: The created task's job_id, or None if reach the queue size limit
"""
if len(self.queue) >= self.queue_size:
return None
if isinstance(req_param, dict):
req_param = ImageGenerationParams(**req_param)
job_id = str(uuid.uuid4())
task = QueueTask(
job_id=job_id,
task_type=task_type,
req_param=req_param,
webhook_url=webhook_url,
)
self.queue.append(task)
self.last_job_id = job_id
return task
def get_task(self, job_id: str, include_history: bool = False) -> QueueTask | None:
"""
Get task by job_id
:param job_id: job id
:param include_history: whether to include history tasks
:returns: The task with the given job_id, or None if not found
"""
for task in self.queue:
if task.job_id == job_id:
return task
if include_history:
for task in self.history:
if task.job_id == job_id:
return task
return None
def is_task_ready_to_start(self, job_id: str) -> bool:
"""
Check if the task is ready to start
:param job_id: job id
:returns: True if the task is ready to start, False otherwise
"""
task = self.get_task(job_id)
if task is None:
return False
return self.queue[0].job_id == job_id
def is_task_finished(self, job_id: str) -> bool:
"""
Check if the task is finished
:param job_id: job id
:returns: True if the task is finished, False otherwise
"""
task = self.get_task(job_id, True)
if task is None:
return False
return task.is_finished
def start_task(self, job_id: str):
"""
Start task by job_id
:param job_id: job id
"""
task = self.get_task(job_id)
if task is not None:
task.start_mills = int(round(time.time() * 1000))
def finish_task(self, job_id: str):
"""
Finish task by job_id
:param job_id: job id
"""
task = self.get_task(job_id)
if task is not None:
task.is_finished = True
task.finish_mills = int(round(time.time() * 1000))
# Use the task's webhook_url if available, else use the default
webhook_url = task.webhook_url or self.webhook_url
data = {"job_id": task.job_id, "job_result": []}
if isinstance(task.task_result, List):
for item in task.task_result:
data["job_result"].append(
{
"url": get_file_serve_url(item.im) if item.im else None,
"seed": item.seed if item.seed else "-1",
}
)
# Send webhook
if task.is_finished and webhook_url:
try:
res = requests.post(webhook_url, json=data, timeout=15)
print(f"Call webhook response status: {res.status_code}")
except Exception as e:
print("Call webhook error:", e)
# Move task to history
self.queue.remove(task)
self.history.append(task)
# save history to database
if self.persistent:
from fooocusapi.sql_client import add_history
add_history(
params=task.req_param.to_dict(),
task_info=dict(
task_type=task.task_type.value,
task_id=task.job_id,
task_in_queue_mills=task.in_queue_mills,
task_start_mills=task.start_mills,
task_finish_mills=task.finish_mills,
),
result_url=",".join([job["url"] for job in data["job_result"]]),
finish_reason=task.task_result[0].finish_reason.value,
)
# Clean history
if len(self.history) > self.history_size != 0:
removed_task = self.history.pop(0)
if isinstance(removed_task.task_result, List):
for item in removed_task.task_result:
if (
isinstance(item, ImageGenerationResult)
and item.finish_reason == GenerationFinishReason.success
and item.im is not None
):
delete_output_file(item.im)
logger.std_info(
f"[TaskQueue] Clean task history, remove task: {removed_task.job_id}"
)
class TaskOutputs:
"""
TaskOutputs is a container for task outputs
"""
outputs = []
def __init__(self, task: QueueTask):
self.task = task
def append(self, args: List[any]):
"""
Append output to task outputs list
:param args: output arguments
"""
self.outputs.append(args)
if len(args) >= 2:
if (
args[0] == "preview"
and isinstance(args[1], Tuple)
and len(args[1]) >= 2
):
number = args[1][0]
text = args[1][1]
self.task.set_progress(number, text)
if len(args[1]) >= 3 and isinstance(args[1][2], np.ndarray):
base64_preview_img = narray_to_base64img(args[1][2])
self.task.set_step_preview(base64_preview_img)
|