File size: 11,180 Bytes
5fbd25d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
"""
Task queue management

This module provides classes and functions for managing the task queue.

Classes:
    QueueTask: A class representing a task in the queue.
    TaskQueue: A class for managing the task queue.
    TaskOutputs: A container for task outputs.
"""
import uuid
import time
from typing import List, Tuple
import numpy as np
import requests

from fooocusapi.utils.file_utils import delete_output_file, get_file_serve_url
from fooocusapi.utils.img_utils import narray_to_base64img
from fooocusapi.utils.logger import logger

from fooocusapi.models.common.task import ImageGenerationResult, GenerationFinishReason
from fooocusapi.parameters import ImageGenerationParams
from fooocusapi.models.common.task import TaskType


class QueueTask:
    """
    A class representing a task in the queue.

    Attributes:
        job_id (str): The unique identifier for the task.
        task_type (TaskType): The type of task.
        req_param (ImageGenerationParams): The request parameters of the task.
        is_finished (bool): Indicates whether the task has been completed.
        finish_progress (int): The progress of the task completion (capped at 100).
        in_queue_mills (int): The time the task was created, in milliseconds.
        start_mills (int): The time the task started, in milliseconds.
        finish_mills (int): The time the task finished, in milliseconds.
        finish_with_error (bool): Indicates whether the task finished with an error.
        task_status (str | None): The status of the task.
        task_step_preview (str | None): The most recent step preview, if any.
        task_result (List[ImageGenerationResult] | None): The result of the task.
        error_message (str | None): The error message, if any.
        webhook_url (str | None): The webhook URL, if any.
    """

    job_id: str
    task_type: TaskType
    req_param: ImageGenerationParams
    is_finished: bool = False
    finish_progress: int = 0
    in_queue_mills: int
    start_mills: int = 0
    finish_mills: int = 0
    finish_with_error: bool = False
    task_status: str | None = None
    task_step_preview: str | None = None
    task_result: List[ImageGenerationResult] | None = None
    error_message: str | None = None
    webhook_url: str | None = None  # attribute for individual webhook_url

    def __init__(
        self,
        job_id: str,
        task_type: TaskType,
        req_param: ImageGenerationParams,
        webhook_url: str | None = None,
    ):
        """
        Create a task and stamp the time it was created
        Arguments:
            job_id {str} -- unique identifier of the task
            task_type {TaskType} -- type of the task
            req_param {ImageGenerationParams} -- request parameters
            webhook_url {str} -- webhook url for this task, if any
        """
        self.job_id = job_id
        self.task_type = task_type
        self.req_param = req_param
        # Creation time as integer milliseconds since the epoch.
        self.in_queue_mills = int(round(time.time() * 1000))
        self.webhook_url = webhook_url

    def set_progress(self, progress: int, status: str | None):
        """
        Set progress and status
        Arguments:
            progress {int} -- progress, values above 100 are capped at 100
            status {str} -- status
        """
        progress = min(progress, 100)
        self.finish_progress = progress
        self.task_status = status

    def set_step_preview(self, task_step_preview: str | None):
        """
        Set step preview
        Arguments:
            task_step_preview {str} -- step preview
        """
        self.task_step_preview = task_step_preview

    def set_result(
        self,
        task_result: List[ImageGenerationResult],
        finish_with_error: bool,
        error_message: str | None = None,
    ):
        """
        Set task result
        Arguments:
            task_result {List[ImageGenerationResult]} -- task result
            finish_with_error {bool} -- finish with error
            error_message {str} -- error message
        """
        # Progress and status are only forced to their final values on
        # success; on error they keep whatever was last reported.
        if not finish_with_error:
            self.finish_progress = 100
            self.task_status = "Finished"
        self.task_result = task_result
        self.finish_with_error = finish_with_error
        self.error_message = error_message

    def __str__(self) -> str:
        # Adjacent f-strings are joined at compile time, so the source
        # indentation no longer leaks into the rendered text (a backslash
        # continuation inside a single literal keeps the leading spaces).
        return (
            f"QueueTask(job_id={self.job_id}, task_type={self.task_type}, "
            f"is_finished={self.is_finished}, finished_progress={self.finish_progress}, "
            f"in_queue_mills={self.in_queue_mills}, start_mills={self.start_mills}, "
            f"finish_mills={self.finish_mills}, finish_with_error={self.finish_with_error}, "
            f"error_message={self.error_message}, task_status={self.task_status}, "
            f"task_step_preview={self.task_step_preview}, webhook_url={self.webhook_url})"
        )


class TaskQueue:
    """
    TaskQueue is a queue of tasks that are waiting to be processed.

    Attributes:
        queue: List[QueueTask], tasks not yet finished, in insertion order
        history: List[QueueTask], finished tasks, oldest first
        last_job_id: str, job_id of the most recently added task
        webhook_url: str, default webhook used when a task has none of its own
        persistent: bool, whether finished tasks are also saved via sql_client
        queue_size: int, maximum number of tasks allowed in the queue
        history_size: int, maximum number of tasks kept in history
            (0 disables history cleanup)
    """

    queue: List[QueueTask]
    history: List[QueueTask]
    last_job_id: str | None = None
    webhook_url: str | None = None
    persistent: bool = False

    def __init__(
        self,
        queue_size: int,
        history_size: int,
        webhook_url: str | None = None,
        persistent: bool | None = False,
    ):
        """
        Create an empty task queue
        :param queue_size: maximum number of tasks allowed in the queue
        :param history_size: maximum number of finished tasks kept, 0 for no limit
        :param webhook_url: default webhook url
        :param persistent: whether to save finished tasks to the database
        """
        self.queue_size = queue_size
        self.history_size = history_size
        self.webhook_url = webhook_url
        self.persistent = False if persistent is None else persistent
        # Per-instance lists: class-level list defaults would be shared by
        # every TaskQueue instance.
        self.queue = []
        self.history = []

    def add_task(
        self,
        task_type: TaskType,
        req_param: ImageGenerationParams,
        webhook_url: str | None = None,
    ) -> QueueTask | None:
        """
        Create and add task to queue
        :param task_type: task type
        :param req_param: request parameters (a dict is converted to ImageGenerationParams)
        :param webhook_url: webhook url
        :returns: The created task, or None if reach the queue size limit
        """
        if len(self.queue) >= self.queue_size:
            return None

        if isinstance(req_param, dict):
            req_param = ImageGenerationParams(**req_param)

        job_id = str(uuid.uuid4())
        task = QueueTask(
            job_id=job_id,
            task_type=task_type,
            req_param=req_param,
            webhook_url=webhook_url,
        )
        self.queue.append(task)
        self.last_job_id = job_id
        return task

    def get_task(self, job_id: str, include_history: bool = False) -> QueueTask | None:
        """
        Get task by job_id
        :param job_id: job id
        :param include_history: whether to include history tasks
        :returns: The task with the given job_id, or None if not found
        """
        for task in self.queue:
            if task.job_id == job_id:
                return task

        if include_history:
            for task in self.history:
                if task.job_id == job_id:
                    return task

        return None

    def is_task_ready_to_start(self, job_id: str) -> bool:
        """
        Check if the task is ready to start
        :param job_id: job id
        :returns: True if the task is at the head of the queue, False otherwise
        """
        task = self.get_task(job_id)
        if task is None:
            return False

        return self.queue[0].job_id == job_id

    def is_task_finished(self, job_id: str) -> bool:
        """
        Check if the task is finished
        :param job_id: job id
        :returns: True if the task is finished, False otherwise (also when not found)
        """
        task = self.get_task(job_id, True)
        if task is None:
            return False

        return task.is_finished

    def start_task(self, job_id: str):
        """
        Start task by job_id
        :param job_id: job id
        """
        task = self.get_task(job_id)
        if task is not None:
            task.start_mills = int(round(time.time() * 1000))

    def finish_task(self, job_id: str):
        """
        Finish task by job_id

        Marks the task finished, calls the webhook (if any), moves the task
        from the queue to the history, optionally saves it to the database
        and finally trims the history. Does nothing if the task is not in
        the queue.
        :param job_id: job id
        """
        task = self.get_task(job_id)
        if task is None:
            return

        task.is_finished = True
        task.finish_mills = int(round(time.time() * 1000))

        data = self._build_job_data(task)

        # Use the task's webhook_url if available, else use the default
        self._send_webhook(task.webhook_url or self.webhook_url, data)

        # Move task to history
        self.queue.remove(task)
        self.history.append(task)

        # save history to database
        if self.persistent:
            self._save_history(task, data)

        self._clean_history()

    @staticmethod
    def _build_job_data(task: QueueTask) -> dict:
        """Build the payload describing the task's results (job_id and job_result)."""
        results = task.task_result if isinstance(task.task_result, list) else []
        return {
            "job_id": task.job_id,
            "job_result": [
                {
                    "url": get_file_serve_url(item.im) if item.im else None,
                    "seed": item.seed if item.seed else "-1",
                }
                for item in results
            ],
        }

    @staticmethod
    def _send_webhook(webhook_url: str | None, data: dict):
        """POST the payload to webhook_url; failures are reported, never raised."""
        if not webhook_url:
            return
        try:
            res = requests.post(webhook_url, json=data, timeout=15)
            print(f"Call webhook response status: {res.status_code}")
        except Exception as e:
            # Best effort: a failing webhook must not prevent the task from
            # being moved to history.
            print("Call webhook error:", e)

    @staticmethod
    def _save_history(task: QueueTask, data: dict):
        """Save the finished task to the database through sql_client.add_history."""
        # Imported here, as in the original code, so the module is only
        # loaded when persistence is enabled.
        from fooocusapi.sql_client import add_history

        # Items without an image have url None; str.join would raise
        # TypeError on them, so only real urls are joined.
        urls = [job["url"] for job in data["job_result"] if job["url"]]
        add_history(
            params=task.req_param.to_dict(),
            task_info=dict(
                task_type=task.task_type.value,
                task_id=task.job_id,
                task_in_queue_mills=task.in_queue_mills,
                task_start_mills=task.start_mills,
                task_finish_mills=task.finish_mills,
            ),
            result_url=",".join(urls),
            # NOTE(review): assumes task.task_result holds at least one
            # result at this point - confirm against callers.
            finish_reason=task.task_result[0].finish_reason.value,
        )

    def _clean_history(self):
        """Drop the oldest history entry when the history exceeds history_size."""
        # history_size == 0 means the history is never trimmed.
        if self.history_size == 0 or len(self.history) <= self.history_size:
            return

        removed_task = self.history.pop(0)
        if isinstance(removed_task.task_result, list):
            for item in removed_task.task_result:
                # Only successful results with an image have an output
                # file to delete.
                if (
                    isinstance(item, ImageGenerationResult)
                    and item.finish_reason == GenerationFinishReason.success
                    and item.im is not None
                ):
                    delete_output_file(item.im)
        logger.std_info(
            f"[TaskQueue] Clean task history, remove task: {removed_task.job_id}"
        )


class TaskOutputs:
    """
    TaskOutputs is a container for task outputs

    Every appended output is kept in ``outputs``; "preview" outputs are
    additionally forwarded to the wrapped task as progress / step preview.
    """

    outputs: list

    def __init__(self, task: QueueTask):
        self.task = task
        # Per-instance list: a class-level list would be shared by every
        # TaskOutputs, mixing the outputs of different tasks and growing
        # without bound.
        self.outputs = []

    def append(self, args: list):
        """
        Append output to task outputs list

        When args has the form ["preview", (number, text[, image])], the
        task's progress is set to (number, text) and, if image is a numpy
        array, it is converted to base64 and stored as the step preview.
        :param args: output arguments
        """
        self.outputs.append(args)
        if len(args) < 2:
            return

        kind, payload = args[0], args[1]
        if kind != "preview" or not isinstance(payload, tuple) or len(payload) < 2:
            return

        number = payload[0]
        text = payload[1]
        self.task.set_progress(number, text)
        if len(payload) >= 3 and isinstance(payload[2], np.ndarray):
            base64_preview_img = narray_to_base64img(payload[2])
            self.task.set_step_preview(base64_preview_img)