Update worker to get differents printer types
This commit is contained in:
114
src/worker.py
114
src/worker.py
@@ -4,64 +4,105 @@
|
|||||||
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from printers import Printers
|
||||||
|
|
||||||
|
|
||||||
class PrintWorker(threading.Thread):
|
class PrintWorker(threading.Thread):
|
||||||
def __init__(self, app, print_queue, printer, socketio=None):
|
def __init__(self, app, print_queue, socketio=None):
|
||||||
super().__init__(daemon=True)
|
super().__init__(daemon=True)
|
||||||
self.app = app
|
self.app = app
|
||||||
self.print_queue = print_queue
|
self.print_queue = print_queue
|
||||||
self.printer = printer
|
self.printer = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
self.socketio = socketio # Optional
|
self.socketio = socketio # Optional
|
||||||
self.running = True
|
self.running = True
|
||||||
self.state = "idle" # idle, printing, dead, drinking-a-beer
|
self.state = "idle" # idle, printing, dead, drinking-a-beer
|
||||||
|
|
||||||
self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")
|
self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.printers = Printers(self.app)
|
||||||
|
self.printers_obj = self.printers.printers
|
||||||
|
self.printers = iter(self.printers.printers)
|
||||||
|
except RuntimeError as e:
|
||||||
|
self.app.logger.warning("Could not get any Printers")
|
||||||
|
raise e
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Background thread that processes queue items"""
|
"""Background thread that processes queue items"""
|
||||||
self.app.logger.info("Worker started working.")
|
self.app.logger.debug("Worker %s started working.", threading.get_ident())
|
||||||
|
self.app.logger.debug("Current threads : %s" % threading.active_count())
|
||||||
|
self.app.logger.debug("Threads actives : %s " % threading.enumerate())
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if not self.running or not self.printer.ready:
|
|
||||||
|
# If the printer is dead or asleep, it can't work.
|
||||||
|
if not self.running:
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
# If we have no available printer, we look at the list printers we know about, and try to find one that is available.
|
||||||
task = self.print_queue.dequeue()
|
# When we find a printer, we acquire it
|
||||||
except Exception as e:
|
# When we are finished with a printer, we release it to the world.
|
||||||
self.app.logger.error("Could not get a new task ! %s ", str(e))
|
while not self.printer or not self.printer.ready:
|
||||||
raise RuntimeError(
|
time.sleep(1)
|
||||||
"We could not get a new task because " + str(e)
|
|
||||||
) from e
|
|
||||||
|
|
||||||
if task:
|
|
||||||
try:
|
try:
|
||||||
self.app.logger.info("Got a new task")
|
self.app.logger.debug("Changing printers")
|
||||||
self.app.logger.debug("Got task %s", task.task_id)
|
self.printer = next(self.printers)
|
||||||
self.state = "printing"
|
self.app.logger.debug("The worker got a %s printer and it's %s", self.printer.printer_type, "Ready" if self.printer.ready else "Not ready")
|
||||||
task.status = "processing"
|
except Exception as e:
|
||||||
self._emit_status(task.task_id, "processing")
|
self.app.logger.error(str(e))
|
||||||
|
self.printer = None
|
||||||
|
|
||||||
print_data = task.get_print_data()
|
if self.state != "idle":
|
||||||
|
self.app.logger("We are not idle, waiting...")
|
||||||
|
time.sleep(1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
self.state = "printing"
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
try:
|
||||||
|
task = self.print_queue.dequeue()
|
||||||
|
except Exception as e:
|
||||||
|
self.app.logger.error("Could not get a new task ! %s ", str(e))
|
||||||
|
self.state = "idle"
|
||||||
|
raise RuntimeError(
|
||||||
|
"We could not get a new task because " + str(e)
|
||||||
|
) from e
|
||||||
|
|
||||||
|
if task:
|
||||||
try:
|
try:
|
||||||
self.printer.print_task(task.task_type, print_data)
|
self.app.logger.info("Got a new task")
|
||||||
|
self.app.logger.debug("Got task %s", task.task_id)
|
||||||
|
task.status = "processing"
|
||||||
|
self._emit_status(task.task_id, "processing")
|
||||||
|
|
||||||
|
print_data = task.get_print_data()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.printer.print_task(task.task_type, print_data)
|
||||||
|
except RuntimeError as e:
|
||||||
|
self.state = "idle"
|
||||||
|
self.app.logger.error("Could not print : %s", str(e))
|
||||||
|
raise e
|
||||||
|
|
||||||
|
task.status = "completed"
|
||||||
|
self.print_queue.mark_completed(task.task_id, "completed")
|
||||||
|
self._emit_status(task.task_id, "completed")
|
||||||
|
self.app.logger.debug("Finished printing task %s " % task.task_id)
|
||||||
|
self.state = "idle"
|
||||||
|
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
self.app.logger.error("Could not print : %s", str(e))
|
task.status = "failed"
|
||||||
raise e
|
self.state = "idle"
|
||||||
|
self.print_queue.mark_completed(task.task_id, "failed")
|
||||||
|
self._emit_status(task.task_id, "failed", error=str(e))
|
||||||
|
self.app.logger.error("Could not print task %s because %s " % task.task_id, str(e))
|
||||||
|
|
||||||
task.status = "completed"
|
else:
|
||||||
self.print_queue.mark_completed(task.task_id, "completed")
|
# When they are no new tasks to handle, we put the thread to sleep.
|
||||||
self._emit_status(task.task_id, "completed")
|
self.state = "idle"
|
||||||
|
time.sleep(0.1)
|
||||||
except RuntimeError as e:
|
|
||||||
task.status = "failed"
|
|
||||||
self.print_queue.mark_completed(task.task_id, "failed")
|
|
||||||
self._emit_status(task.task_id, "failed", error=str(e))
|
|
||||||
print(f"Print task {task.task_id} failed: {e}")
|
|
||||||
else:
|
|
||||||
# When they are no new tasks to handle, we put the thread to sleep.
|
|
||||||
self.state = "idle"
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
def _emit_status(self, task_id, status, error=None):
|
def _emit_status(self, task_id, status, error=None):
|
||||||
"""Emit status update via Socket.IO if available"""
|
"""Emit status update via Socket.IO if available"""
|
||||||
@@ -104,4 +145,5 @@ class PrintWorker(threading.Thread):
|
|||||||
"is_running": self.running,
|
"is_running": self.running,
|
||||||
"queue_size": len(self.print_queue),
|
"queue_size": len(self.print_queue),
|
||||||
"state": self.state,
|
"state": self.state,
|
||||||
|
"printers": len(self.printers_obj),
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user