From f841cd56281e65d8a082f20cbd2c907f4655f8a2 Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:54:15 +0200 Subject: [PATCH] Add queue completion method, add Printer discovery and types --- src/print_queue.py | 8 +- src/printer.py | 319 +++++++++++++++++++++++++++++++++------------ src/printers.py | 129 ++++++++++++++++-- 3 files changed, 363 insertions(+), 93 deletions(-) diff --git a/src/print_queue.py b/src/print_queue.py index 766a322..2922143 100644 --- a/src/print_queue.py +++ b/src/print_queue.py @@ -76,7 +76,13 @@ class PrintQueue: """Return current queue state""" with self._lock: self.app.logger.debug("Return current queue state") - return [{"task_id": t.task_id, "status": t.status} for t in self._queue] + return [{"task_id": t.task_id, "status": t.status, "type": str(t.task_type) } for t in self._queue] + + def get_queue_completed(self): + """Return completed queue elements""" + with self._lock: + self.app.logger.debug("Return completed queue elements") + return self._completed_tasks def get_status(self, task_id): """Get full status info for a task""" diff --git a/src/printer.py b/src/printer.py index 93570cc..5e32dde 100644 --- a/src/printer.py +++ b/src/printer.py @@ -1,13 +1,15 @@ """ This class manages connexion to a Printer """ -# import brother_ql from time import sleep import os.path from abc import ABC, abstractmethod +from dataclasses import dataclass from enum import Enum import uuid +import usb.core +import threading from PIL import Image, ImageEnhance import numpy as np @@ -15,20 +17,49 @@ import numpy as np # Importing the modules needed for each supported printer Type import escpos.printer -import brother_ql +from brother_ql.models import ModelsManager +from brother_ql.backends import backend_factory +from brother_ql import labels +from brother_ql.raster import BrotherQLRaster +from brother_ql.conversion import convert +from brother_ql.backends.helpers import send -class PrinterCapabilities(Enum): - """ - What are the capacities of a Printer ? - """ +class PrinterType(Enum): + """ + What are the capacities of a Printer ? + """ + EPSON = "epson" + BROTHER = "brother" +# For Brother-QL Printers +@dataclass +class PrinterInfo: + identifier: str + backend: str + protocol: str + vendor_id: str + product_id: str + serial_number: str + name: str = "Brother QL Printer" + model: str = "QL-570" + status: str = "unknown" + label_type: str = "unknown" + label_size : str = "unknown" + label_width: int = 0 + label_height: int = 0 + + def __getitem__(self, item): + return getattr(self, item) + + def __setitem__(self, key, value): + setattr(self, key, value) class Printer(ABC): """ If it outputs printed paper and speaks like a printer, then it must be a printer. """ - def __init__(self, app, vendor_id, device_id): + def __init__(self, app, vendor_id, device_id, printer_type: PrinterType): """ We initialize a Printer via it's USB connexion, and generate a unique ID """ @@ -37,6 +68,8 @@ class Printer(ABC): self.vendor_id = vendor_id self.device_id = device_id self.ready = False + self.printer_type = printer_type + self._lock = threading.Lock() @abstractmethod def _has_paper(self) -> bool: @@ -53,69 +86,41 @@ class Printer(ABC): class EscPosPrinter(Printer): """ - # The connection is based on the ESC/POS library - - ## Connection to the USB printer - - ## Making sure the printer is alive - - ## Making sure it has paper - - ## Define default print settings - - ## Print starting message, log time of first print, cut. - - ## Annonce readyness : return a positive pong message. + Create a new ESC/POS based printer. """ - def __init__(self, app): - super().__init__(app, vendor_id="",device_id="") + def __init__(self, app, vendor_id, device_id): + """ + Create a connexion to a ESC/POS Printer via USB, + Making sure the printer is alive, + Making sure it has paper, + Define default print settings + """ + super().__init__(app,vendor_id,device_id, printer_type=PrinterType.EPSON) self.printer = None self.usb_args = {} - self.usb_args["idVendor"] = self.device_id - self.usb_args["idProduct"] = self.vendor_id + self.usb_args["idVendor"] = self.vendor_id + self.usb_args["idProduct"] = self.device_id - if os.getenv("FLASK_DEBUG"): - waiting_elapsed = 3 - else: - waiting_elapsed = 10 + try: + # This also calls open(), which we need to close() + # or else the device will appear as busy. + p = escpos.printer.Usb( + self.vendor_id, self.device_id, 0, profile="TM-P80" + ) + except escpos.exceptions.DeviceNotFoundError as e: + self.app.logger.error( + "The USB device is not plugged in : %s", + str(e), + ) + except Exception as e: + self.app.logger.error("Printer could not be connected : %s ", str(e)) - self.app.logger.debug("Waiting for printer to get online...") - - online = False - while not online: - try: - # This also calls open(), which we need to close() - # or else the device will appear as busy. - p = escpos.printer.Usb( - self.device_id, self.vendor_id, 0, profile="TM-P80" - ) - except escpos.exceptions.DeviceNotFoundError as e: - self.app.logger.error( - "The USB device is not plugged in, trying again %s : %s", - waiting_elapsed, - str(e), - ) - - try: - if p.is_online(): - online = True - self.app.logger.debug("Printer online !") - except escpos.exceptions.DeviceNotFoundError as e: - self.app.logger.error( - "Error while getting the printer online %s : %s", - waiting_elapsed, - str(e), - ) - - sleep(1) - waiting_elapsed -= 1 - if waiting_elapsed < 1: - self.app.logger.error( - "Printer took more than 30 seconds to get online, aborting..." - ) - waiting_elapsed = 1 # Reset the waiting time for the next print. - raise RuntimeError("Could not get Printer %s online" % self.id) + try: + if p.is_online(): + self.app.logger.debug("Printer online !") + except Exception as e: + raise e # Setting up the printing options. p.set( @@ -284,7 +289,7 @@ class EscPosPrinter(Printer): try: self.printer.open(self.usb_args) self.printer.qr(content, center=True) - self.printer.textln(content, center=True) + self.printer.textln(content) self.printer.close() except RuntimeError as e: self.printer.close() @@ -311,21 +316,28 @@ class EscPosPrinter(Printer): def print_task(self, task_type, data): """Execute actual print based on task type""" - if self._state(): - match (task_type.value): - case "text": - self._print_txt(data["txt"], signature=data["sign"]) - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] - ) - case "cut": - self._cut() - case "qr": - self._qr(data["txt"]) - case _: - raise RuntimeError("This task type is not supported") - + with self._lock: + if self._state: + self._state = False + match (task_type.value): + case "text": + self._print_txt(data["txt"], signature=data["sign"]) + self._state = True + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + self._state = True + case "cut": + self._cut() + self._state = True + case "qr": + self._qr(data["txt"]) + self._state = True + case _: + raise RuntimeError("This task type is not supported") + else: + raise RuntimeError("The printer is not ready to print yet !") class BrotherPrinter(Printer): """ @@ -333,17 +345,156 @@ class BrotherPrinter(Printer): """ def __init__(self, app, vendor_id, device_id): - super().__init__(app, vendor_id="",device_id="") + super().__init__(app, vendor_id="",device_id="", printer_type=PrinterType.BROTHER) self.printer = None self.usb_args = {} self.usb_args["idVendor"] = self.device_id self.usb_args["idProduct"] = self.vendor_id + self.model_manager = ModelsManager() - def _has_paper(): + # Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py - def _state(): + backend = backend_factory("pyusb") + available_devices = backend["list_available_devices"]() - def print_task(): + for printer in available_devices: + self.app.logger.debug(f"Found device: {printer}") + identifier = printer["identifier"] + parts = identifier.split("/") + + if len(parts) < 4: + self.app.logger.warning(f"Skipping device with invalid identifier format: {identifier}") + continue + + protocol = parts[0] + device_info = parts[2] + serial_number = parts[3] + + try: + product_id_int = int(self.device_id, 16) + for m in self.model_manager.iter_elements(): + if m.product_id == product_id_int: + model = m.identifier + break + self.app.logger.debug(f"Matched printer model: {model}") + except ValueError: + self.app.logger.warning(f"Invalid product ID format: {product_id}") + + self.printer_info = PrinterInfo( + identifier=identifier, + backend="pyusb", + model=model, + protocol=protocol, + vendor_id=vendor_id, + product_id=self.device_id, + serial_number=serial_number, + ) + + self.ready = True + + + def _has_paper(self): + raise NotImplementedError("This printer model does not support this.") + + def _state(self): + return self.ready + + def _print_img(self,data): + """ + Print a raster image via a Brother QL printer + """ + self.ready = False + label_type = "102" + rotate = 0 + dither = False + try: + # Prepare the image for printing + qlr = BrotherQLRaster(self.printer_info["model"]) + + instructions = convert( + qlr=qlr, + images=[data["img"]], + label=label_type, + rotate=rotate, + threshold=70, + dither=dither, + compress=True, + red=False, + dpi_600=False, + hq=False, + cut=True, + ) + + # Debug logging + if FLASK_DEBUG: + self.app.logger.debug(f""" + Print parameters: + - Label type: {label_type} + - Rotate: {rotate} + - Dither: {dither} + - Model: {self.printer_info['model']} + - Backend: {self.printer_info['backend']} + - Identifier: {self.printer_info['identifier']} + """) + + # Try to print using Python API + # send() = status = { + # 'instructions_sent': True, # The instructions were sent to the printer. + # 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error' + # 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it. + # 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability). + # 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown. + # } + status = send( + instructions=instructions, + printer_identifier= self.printer_info["identifier"], + backend_identifier="pyusb" + ) + + if not status["did_print"] or status["outcome"] == "error" or status["outcome"] == "unknown": + raise RuntimeError("Failed to print using Python API") + + if status["printer_state"]: + self.ready = bool(status["printer_state"]) + else: + self.ready = True + + except usb.core.USBError as e: + # Treat timeout errors as successful since they often occur after print completion + if e.errno == 110: # Operation timed out + self.app.logger.debug("USB timeout occurred - this is normal and the print likely completed") + self.app.logger.debug("Print completed (timeout is normal)") + self.ready = True + + error_msg = f"USBError encountered: {e}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + except Exception as e: + error_msg = f"Unexpected error during printing: {str(e)}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + def print_task(self, task_type, data): + """Execute actual print based on task type""" + with self._lock: + if self._state: + self._state = False + match (task_type.value): + case "image": + self._print_img(data["img"]) + self._state = True + case "cut": + # The cut happens by default on Brother QL printers. + self._state = True + case _: + raise RuntimeError("This task type is not supported") + else: + raise RuntimeError("The printer is not ready to print yet !") + + # These values are by default for now + + # raise NotImplementedError("This printer type is not implemented yet") def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark diff --git a/src/printers.py b/src/printers.py index 0f820bc..7df1d99 100644 --- a/src/printers.py +++ b/src/printers.py @@ -3,16 +3,129 @@ A collection of Printers. It has methods to discover printers, and provides an interface for the methods expected from printers. """ - -from printer import Printer +from collections.abc import Mapping, Set +import usb.core +import usb.util +from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType class Printers(): """ - A collection of Printers + Finds and creates a set of Printer that can be used by the Workers to print. """ - def __init__(self, name, age): - self.name = name - self.age = age + def __init__(self, app): + """ + Discover printers connected to the computer and return a Collection of Printer() + """ + self.app = app + self.printers = self._discover_printers() - def _discover_printers(self): - pass \ No newline at end of file + def _discover_printers(self) -> Set[Printer]: + """ + Gets connected USB printer devices using the pyusb library. + + We analyse the USB devices, get the ones that match + the printer class ( 7 ) and for Brother and EPSON printers, + try to create a Printer object that can be used by the Worker class + to execute prints. + + Returns a set of Printer + """ + + self.app.logger.debug("Discovering USB Devices connected to this system") + + printers = set() + + # Find all connected USB devices + devices = usb.core.find(find_all=True,custom_match=_FindClass(7)) + if not devices: + self.app.logger.warning("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") + raise RuntimeError("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") + + for dev in devices: + # Attempt to get the manufacturer and product strings + try: + manufacturer = usb.util.get_string(dev, dev.iManufacturer) + except Exception: + manufacturer = "Unknown" + + try: + product = usb.util.get_string(dev, dev.iProduct) + except Exception: + product = "Unknown" + self.app.logger.debug("Looking at %s %s (%s:%s)", manufacturer, product, hex(dev.idVendor), hex(dev.idProduct)) + + if manufacturer == "EPSON": + try: + # We create a new EscPosPrinter() + self.app.logger.debug("Trying to creat a new EPSON printer") + prid = dev.idProduct + vendir = dev.idVendor + escpos_printer = EscPosPrinter(self.app, vendor_id=vendir, device_id=prid) + except Exception as e: + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(escpos_printer) + self.app.logger.debug("Found a %s printer" % manufacturer ) + + # We already found the type of printer, + # we don't need an extra comparaison. + continue + + # or a Brother Printer + if manufacturer == "Brother": + try: + # We create a new BrotherPrinter() + self.app.logger.debug("Trying to creat a new BROTHER printer") + prid = dev.idProduct + vendir = dev.idVendor + brother_printer = BrotherPrinter(self.app, vendor_id=vendir,device_id=prid) + except Exception as e: + self.app.logger.error("Could not create a %s printer class with %s:%s" % product, dev.idVendor, dev.idProduct) + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(brother_printer) + self.app.logger.debug("Found a %s printer" % manufacturer ) + + self.app.logger.debug("Found %s printers" % len(printers)) + return printers + + def any(self) -> Printer: + """ + Return a dict key: UUID, value: Printer, with any connected printer. + """ + if len(self.printers) > 0: + for i in self.printers: + return i + else: + raise RuntimeError("No printers available") + + def get_printer(self, printer_type): + """ + Return a specific printer + + printer_type -- a printer type + """ + return NotImplementedError() + + +class _FindClass(): + def __init__(self, class_): + self._class = class_ + def __call__(self, device): + # first, let's check the device + if device.bDeviceClass == self._class: + return True + # ok, transverse all devices to find an + # interface that matches our class + for cfg in device: + # find_descriptor: what's it? + intf = usb.util.find_descriptor( + cfg, + bInterfaceClass=self._class + ) + if intf is not None: + return True + + return False