Add worker class

This commit is contained in:
n07070
2026-05-21 02:33:40 +02:00
parent e7a7c84664
commit cbd5d59445

111
src/worker.py Normal file
View File

@@ -0,0 +1,111 @@
# This is the main printing thread
# As explained in the task file, this is where we command
# printing to happen.
import threading
import time
from task import TaskType
class PrintWorker(threading.Thread):
def __init__(self, app, print_queue, printer, socketio=None):
super().__init__(daemon=True)
self.app = app
self.print_queue = print_queue
self.printer = printer
self.socketio = socketio # Optional
self.running = True
self.state = "idle" # idle, printing, dead, drinking-a-beer
self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")
def run(self):
"""Background thread that processes queue items"""
self.app.logger.info("Worker started working.")
while True:
if not self.running:
time.sleep(0.2)
continue
# TODO: This could be improved to simply no start
# the while loop as long as the printer is not ready.
# and maybe get out of it when the printer is not ready anymore ?
if not self.printer.ready:
self.app.logger.debug("Waiting for the printer to be ready...")
time.sleep(1)
continue
try:
task = self.print_queue.dequeue()
except Exception as e:
self.app.logger.error("Could not get a new task ! %s ", str(e))
raise RuntimeError("We could not get a new task because " + str(e)) from e
if task:
try:
self.app.logger.info("Got a new task")
self.app.logger.debug("Got task %s", task.task_id)
self.state = "printing"
task.status = "processing"
self._emit_status(task.task_id, "processing")
print_data = task.get_print_data()
try:
self.printer.print_task(task.task_type, print_data)
except Exception as e:
self.app.logger.error("Could not print : %s", str(e))
raise e
task.status = "completed"
self.print_queue.mark_completed(task.task_id, "completed")
self._emit_status(task.task_id, "completed")
except Exception as e:
task.status = "failed"
self.print_queue.mark_completed(task.task_id, "failed")
self._emit_status(task.task_id, "failed", error=str(e))
print(f"Print task {task.task_id} failed: {e}")
else:
# When they are no new tasks to handle, we put the thread to sleep.
self.state = "idle"
time.sleep(0.1)
def _emit_status(self, task_id, status, error=None):
"""Emit status update via Socket.IO if available"""
if not self.socketio:
return
room = f"task_{task_id}"
data = {
"task_id": task_id,
"status": status,
"position": None # Task no longer in queue
}
if error:
data["error"] = error
self.socketio.emit('task_status', data, room=room)
def stop_worker(self):
"""
Give the worker a break
"""
self.state = "drinking-a-beer"
self.running = False
def start_worker(self):
"""
Get the worker back to it
"""
self.state = "idle"
self.running = True
def current_state(self):
"""
Return the worker state
"""
return {
"is_running": self.running,
"queue_size": len(self.print_queue),
"state" : self.state
}