108 lines
3.5 KiB
Python
108 lines
3.5 KiB
Python
# This is the main printing thread.
# As explained in the task file, this is where printing is actually
# commanded to happen.
|
|
|
|
import threading
|
|
import time
|
|
|
|
|
|
class PrintWorker(threading.Thread):
    """Daemon thread that takes tasks off the print queue and prints them.

    The worker polls ``print_queue.dequeue()`` whenever it is running and
    ``printer.ready`` is truthy, hands each task to ``printer.print_task``,
    records the outcome with ``print_queue.mark_completed`` and, when a
    Socket.IO server was supplied, emits a ``task_status`` event for every
    status change.
    """

    def __init__(self, app, print_queue, printer, socketio=None):
        """Store the collaborators and mark the worker as idle and running.

        Args:
            app: Object exposing a ``logger`` attribute used for all logging.
            print_queue: Queue object; this class calls ``dequeue()``,
                ``mark_completed(task_id, status)`` and ``len()`` on it.
            printer: Printer object; this class reads ``ready`` and calls
                ``print_task(task_type, print_data)`` on it.
            socketio: Optional Socket.IO server used to emit status updates.
        """
        super().__init__(daemon=True)
        self.app = app
        self.print_queue = print_queue
        self.printer = printer
        self.socketio = socketio  # Optional
        self.running = True
        self.state = "idle"  # idle, printing, dead, drinking-a-beer

        self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")

    def run(self):
        """Background thread that processes queue items.

        Loops forever. While the worker is paused or the printer is not
        ready it only sleeps; otherwise it dequeues and processes one task
        per iteration, sleeping briefly when the queue has nothing to offer.

        Raises:
            RuntimeError: If the queue itself fails to hand out a task. This
                ends the thread, and ``state`` is left as ``"dead"``.
        """
        self.app.logger.info("Worker started working.")
        try:
            while True:
                if not self.running or not self.printer.ready:
                    time.sleep(0.2)
                    continue

                try:
                    task = self.print_queue.dequeue()
                except Exception as e:
                    self.app.logger.error("Could not get a new task ! %s ", str(e))
                    raise RuntimeError(
                        "We could not get a new task because " + str(e)
                    ) from e

                if task:
                    self._process_task(task)
                else:
                    # When there are no new tasks to handle, we put the
                    # thread to sleep.
                    self.state = "idle"
                    time.sleep(0.1)
        finally:
            # The loop never exits normally, so reaching this point means the
            # thread is going away; make that visible through current_state().
            self.state = "dead"

    def _process_task(self, task):
        """Print one task and record whether it completed or failed.

        Any exception raised while preparing or printing the task marks the
        task as failed and is reported, but is not propagated: one bad task
        must not take the whole worker thread down.
        """
        try:
            self.app.logger.info("Got a new task")
            self.app.logger.debug("Got task %s", task.task_id)
            self.state = "printing"
            task.status = "processing"
            self._emit_status(task.task_id, "processing")

            print_data = task.get_print_data()
            try:
                self.printer.print_task(task.task_type, print_data)
            except Exception as e:
                # Log printer failures separately so they can be told apart
                # from failures while preparing the print data.
                self.app.logger.error("Could not print : %s", str(e))
                raise

            task.status = "completed"
            self.print_queue.mark_completed(task.task_id, "completed")
            self._emit_status(task.task_id, "completed")
        except Exception as e:
            task.status = "failed"
            self.print_queue.mark_completed(task.task_id, "failed")
            self._emit_status(task.task_id, "failed", error=str(e))
            self.app.logger.error("Print task %s failed: %s", task.task_id, e)

    def _emit_status(self, task_id, status, error=None):
        """Emit status update via Socket.IO if available.

        The event is named ``task_status`` and is sent to the room
        ``task_<task_id>``. The payload carries an ``error`` entry whenever
        an error value was given, even an empty string.
        """
        if not self.socketio:
            return

        room = f"task_{task_id}"
        data = {
            "task_id": task_id,
            "status": status,
            "position": None,  # Task no longer in queue
        }

        # Compare against None: an exception without a message stringifies
        # to "" and must still be reported as an error.
        if error is not None:
            data["error"] = error

        self.socketio.emit("task_status", data, room=room)

    def stop_worker(self):
        """
        Give the worker a break
        """
        self.app.logger.debug("Giving the worker a break")
        self.state = "drinking-a-beer"
        self.running = False

    def start_worker(self):
        """
        Get the worker back to it
        """
        self.app.logger.debug("Time to work !")
        self.state = "idle"
        self.running = True

    def current_state(self):
        """
        Return the worker state

        The result is a dict with the keys ``is_running``, ``queue_size``
        (``len()`` of the print queue) and ``state``.
        """
        return {
            "is_running": self.running,
            "queue_size": len(self.print_queue),
            "state": self.state,
        }
|