# This class has the method by which we manage the Tasks.
# It's a printing queue, so we need to add, remove and get information on
# where the queue is.
from collections import deque
# Because actually printing and adding new print job requests happen at
# different times, the print queue is managed by its own thread.
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional

from task import TaskType, CutTask


class PrintQueue:
    """A double-ended queue to manage the printing Tasks.

    Tasks are appended on the right by ``enqueue`` and removed from the left
    by ``dequeue`` (first in, first out). Every method that mutates the queue
    or the completed-task registry does so while holding ``self._lock``.

    All positions reported by this class are 0-based: position 0 is the task
    that the next ``dequeue`` call will return.
    """

    def __init__(self, app):
        """Create an empty queue.

        Args:
            app: Application object. Only ``app.logger`` is used by this
                class.
        """
        self.app = app
        self._queue = deque()
        self._lock = threading.Lock()
        self._completed_tasks = {}  # task_id -> status record (see mark_completed)
        self._task_counter = 0  # not read by any method visible in this class
        self.app.logger.debug("Created a new PrintQueue")

    def __len__(self) -> int:
        """Return the number of tasks currently queued."""
        return len(self._queue)

    def enqueue(self, task) -> int:
        """Add a task to the right of the queue and return its position.

        Args:
            task: Object exposing at least a ``task_id`` attribute.

        Returns:
            The 0-based position of the newly added task.

        Raises:
            Exception: Any error raised while logging or appending is logged
                and re-raised unchanged.
        """
        with self._lock:
            try:
                self.app.logger.info("Add task %s to queue ", task.task_id)
                self._queue.append(task)
                # The task was just appended on the right, so it occupies the
                # last slot. Searching with deque.index() would cost O(n) and
                # would report an earlier slot if an equal task is already
                # queued.
                position = len(self._queue) - 1
                self.app.logger.debug(
                    "Added a new task %s to the queue at position %s",
                    task.task_id,
                    position,
                )
                return position
            except Exception as e:
                self.app.logger.error("Could not add a task to the queue : %s ", e)
                raise

    def dequeue(self):
        """Remove and return the next task (from the left of the queue).

        Returns:
            The oldest queued task, or ``None`` when the queue is empty.
        """
        with self._lock:
            return self._queue.popleft() if self._queue else None

    def get_position(self, task) -> Optional[int]:
        """Get the current position of a task in the queue (0-indexed).

        Args:
            task: Object exposing a ``task_id`` attribute.

        Returns:
            ``None`` if the task id is registered as completed, otherwise the
            0-based index of the task in the queue.

        Raises:
            ValueError: If the task is neither completed nor in the queue.
        """
        with self._lock:
            if task.task_id in self._completed_tasks:
                return None  # Task already completed
            # deque.index raises ValueError when the task is not queued; let
            # it propagate to the caller.
            return self._queue.index(task)

    def is_empty(self) -> bool:
        """Check whether the queue holds no tasks."""
        with self._lock:
            self.app.logger.debug("Checking if queue is empty")
            return not self._queue

    def get_queue_state(self) -> List[Dict[str, Any]]:
        """Return the id and status of every queued task, in queue order."""
        with self._lock:
            self.app.logger.debug("Return current queue state")
            return [{"task_id": t.task_id, "status": t.status} for t in self._queue]

    def get_status(self, task_id) -> Optional[Dict[str, Any]]:
        """Get full status info for a task.

        Completed tasks are looked up first; otherwise the queue is scanned
        for a task with a matching ``task_id``.

        Args:
            task_id: Identifier of the task to look up.

        Returns:
            The stored completion record, or a dict describing the queued
            task (common fields plus type-specific extras), or ``None`` if
            the id is unknown.
        """
        with self._lock:
            if task_id in self._completed_tasks:
                return self._completed_tasks[task_id]

            # Check in queue if it exists
            for index, task in enumerate(self._queue):
                if task.task_id != task_id:
                    continue

                # Fields common to every queued task. A task of a type not
                # handled below is still reported with these fields rather
                # than being treated as "not found".
                info = {
                    "task_id": task_id,
                    "status": task.status,
                    "type": task.task_type,
                    "position": index,
                    "in_queue": True,
                }
                # Depending on its type, we return more info.
                # NOTE(review): IMAGE reports `content` while TEXT reports
                # `image_path`/`process`; this mapping is kept exactly as it
                # was - confirm against the Task classes that it is not
                # swapped.
                if task.task_type == TaskType.IMAGE:
                    info["content"] = task.content
                    info["signature"] = task.signature
                elif task.task_type == TaskType.TEXT:
                    info["image_path"] = str(task.image_path)
                    info["signature"] = task.signature
                    info["process"] = str(task.process)
                return info

            return None

    def mark_completed(self, task_id, task_status) -> None:
        """Record a task as completed.

        Stores a completion record (status, ISO timestamp, ``in_queue`` set
        to ``False``) under ``task_id``. This method does not touch the queue
        itself.

        NOTE(review): presumably the task has already been removed via
        ``dequeue`` before this is called - verify against the caller.

        Args:
            task_id: Identifier of the finished task.
            task_status: Final status to report for the task.
        """
        with self._lock:
            self._completed_tasks[task_id] = {
                "task_id": task_id,
                "status": task_status,
                "position": None,
                "in_queue": False,
                "completed_at": datetime.now().isoformat(),
            }