Add queue completion method, add Printer discovery and types

This commit is contained in:
n07070
2026-06-03 23:54:15 +02:00
parent db7e030a1f
commit f841cd5628
3 changed files with 363 additions and 93 deletions

View File

@@ -76,7 +76,13 @@ class PrintQueue:
"""Return current queue state"""
with self._lock:
self.app.logger.debug("Return current queue state")
return [{"task_id": t.task_id, "status": t.status} for t in self._queue]
return [{"task_id": t.task_id, "status": t.status, "type": str(t.task_type) } for t in self._queue]
def get_queue_completed(self):
"""Return completed queue elements"""
with self._lock:
self.app.logger.debug("Return completed queue elements")
return self._completed_tasks
def get_status(self, task_id):
"""Get full status info for a task"""

View File

@@ -1,13 +1,15 @@
"""
This class manages connexion to a Printer
"""
# import brother_ql
from time import sleep
import os.path
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import uuid
import usb.core
import threading
from PIL import Image, ImageEnhance
import numpy as np
@@ -15,20 +17,49 @@ import numpy as np
# Importing the modules needed for each supported printer Type
import escpos.printer
import brother_ql
from brother_ql.models import ModelsManager
from brother_ql.backends import backend_factory
from brother_ql import labels
from brother_ql.raster import BrotherQLRaster
from brother_ql.conversion import convert
from brother_ql.backends.helpers import send
class PrinterCapabilities(Enum):
class PrinterType(Enum):
"""
What are the capacities of a Printer ?
"""
EPSON = "epson"
BROTHER = "brother"
# For Brother-QL Printers
@dataclass
class PrinterInfo:
identifier: str
backend: str
protocol: str
vendor_id: str
product_id: str
serial_number: str
name: str = "Brother QL Printer"
model: str = "QL-570"
status: str = "unknown"
label_type: str = "unknown"
label_size : str = "unknown"
label_width: int = 0
label_height: int = 0
def __getitem__(self, item):
return getattr(self, item)
def __setitem__(self, key, value):
setattr(self, key, value)
class Printer(ABC):
"""
If it outputs printed paper and speaks like a printer, then it must be a printer.
"""
def __init__(self, app, vendor_id, device_id):
def __init__(self, app, vendor_id, device_id, printer_type: PrinterType):
"""
We initialize a Printer via it's USB connexion, and generate a unique ID
"""
@@ -37,6 +68,8 @@ class Printer(ABC):
self.vendor_id = vendor_id
self.device_id = device_id
self.ready = False
self.printer_type = printer_type
self._lock = threading.Lock()
@abstractmethod
def _has_paper(self) -> bool:
@@ -53,69 +86,41 @@ class Printer(ABC):
class EscPosPrinter(Printer):
"""
# The connection is based on the ESC/POS library
## Connection to the USB printer
## Making sure the printer is alive
## Making sure it has paper
## Define default print settings
## Print starting message, log time of first print, cut.
## Annonce readyness : return a positive pong message.
Create a new ESC/POS based printer.
"""
def __init__(self, app):
super().__init__(app, vendor_id="",device_id="")
def __init__(self, app, vendor_id, device_id):
"""
Create a connexion to a ESC/POS Printer via USB,
Making sure the printer is alive,
Making sure it has paper,
Define default print settings
"""
super().__init__(app,vendor_id,device_id, printer_type=PrinterType.EPSON)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
self.usb_args["idProduct"] = self.vendor_id
self.usb_args["idVendor"] = self.vendor_id
self.usb_args["idProduct"] = self.device_id
if os.getenv("FLASK_DEBUG"):
waiting_elapsed = 3
else:
waiting_elapsed = 10
self.app.logger.debug("Waiting for printer to get online...")
online = False
while not online:
try:
# This also calls open(), which we need to close()
# or else the device will appear as busy.
p = escpos.printer.Usb(
self.device_id, self.vendor_id, 0, profile="TM-P80"
self.vendor_id, self.device_id, 0, profile="TM-P80"
)
except escpos.exceptions.DeviceNotFoundError as e:
self.app.logger.error(
"The USB device is not plugged in, trying again %s : %s",
waiting_elapsed,
"The USB device is not plugged in : %s",
str(e),
)
except Exception as e:
self.app.logger.error("Printer could not be connected : %s ", str(e))
try:
if p.is_online():
online = True
self.app.logger.debug("Printer online !")
except escpos.exceptions.DeviceNotFoundError as e:
self.app.logger.error(
"Error while getting the printer online %s : %s",
waiting_elapsed,
str(e),
)
sleep(1)
waiting_elapsed -= 1
if waiting_elapsed < 1:
self.app.logger.error(
"Printer took more than 30 seconds to get online, aborting..."
)
waiting_elapsed = 1 # Reset the waiting time for the next print.
raise RuntimeError("Could not get Printer %s online" % self.id)
except Exception as e:
raise e
# Setting up the printing options.
p.set(
@@ -284,7 +289,7 @@ class EscPosPrinter(Printer):
try:
self.printer.open(self.usb_args)
self.printer.qr(content, center=True)
self.printer.textln(content, center=True)
self.printer.textln(content)
self.printer.close()
except RuntimeError as e:
self.printer.close()
@@ -311,21 +316,28 @@ class EscPosPrinter(Printer):
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
if self._state():
with self._lock:
if self._state:
self._state = False
match (task_type.value):
case "text":
self._print_txt(data["txt"], signature=data["sign"])
self._state = True
case "image":
self._print_img(
data["img"], signature=data["sign"], process=data["process"]
)
self._state = True
case "cut":
self._cut()
self._state = True
case "qr":
self._qr(data["txt"])
self._state = True
case _:
raise RuntimeError("This task type is not supported")
else:
raise RuntimeError("The printer is not ready to print yet !")
class BrotherPrinter(Printer):
"""
@@ -333,17 +345,156 @@ class BrotherPrinter(Printer):
"""
def __init__(self, app, vendor_id, device_id):
super().__init__(app, vendor_id="",device_id="")
super().__init__(app, vendor_id="",device_id="", printer_type=PrinterType.BROTHER)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
self.usb_args["idProduct"] = self.vendor_id
self.model_manager = ModelsManager()
def _has_paper():
# Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py
def _state():
backend = backend_factory("pyusb")
available_devices = backend["list_available_devices"]()
def print_task():
for printer in available_devices:
self.app.logger.debug(f"Found device: {printer}")
identifier = printer["identifier"]
parts = identifier.split("/")
if len(parts) < 4:
self.app.logger.warning(f"Skipping device with invalid identifier format: {identifier}")
continue
protocol = parts[0]
device_info = parts[2]
serial_number = parts[3]
try:
product_id_int = int(self.device_id, 16)
for m in self.model_manager.iter_elements():
if m.product_id == product_id_int:
model = m.identifier
break
self.app.logger.debug(f"Matched printer model: {model}")
except ValueError:
self.app.logger.warning(f"Invalid product ID format: {product_id}")
self.printer_info = PrinterInfo(
identifier=identifier,
backend="pyusb",
model=model,
protocol=protocol,
vendor_id=vendor_id,
product_id=self.device_id,
serial_number=serial_number,
)
self.ready = True
def _has_paper(self):
raise NotImplementedError("This printer model does not support this.")
def _state(self):
return self.ready
def _print_img(self,data):
"""
Print a raster image via a Brother QL printer
"""
self.ready = False
label_type = "102"
rotate = 0
dither = False
try:
# Prepare the image for printing
qlr = BrotherQLRaster(self.printer_info["model"])
instructions = convert(
qlr=qlr,
images=[data["img"]],
label=label_type,
rotate=rotate,
threshold=70,
dither=dither,
compress=True,
red=False,
dpi_600=False,
hq=False,
cut=True,
)
# Debug logging
if FLASK_DEBUG:
self.app.logger.debug(f"""
Print parameters:
- Label type: {label_type}
- Rotate: {rotate}
- Dither: {dither}
- Model: {self.printer_info['model']}
- Backend: {self.printer_info['backend']}
- Identifier: {self.printer_info['identifier']}
""")
# Try to print using Python API
# send() = status = {
# 'instructions_sent': True, # The instructions were sent to the printer.
# 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
# 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it.
# 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
# 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
# }
status = send(
instructions=instructions,
printer_identifier= self.printer_info["identifier"],
backend_identifier="pyusb"
)
if not status["did_print"] or status["outcome"] == "error" or status["outcome"] == "unknown":
raise RuntimeError("Failed to print using Python API")
if status["printer_state"]:
self.ready = bool(status["printer_state"])
else:
self.ready = True
except usb.core.USBError as e:
# Treat timeout errors as successful since they often occur after print completion
if e.errno == 110: # Operation timed out
self.app.logger.debug("USB timeout occurred - this is normal and the print likely completed")
self.app.logger.debug("Print completed (timeout is normal)")
self.ready = True
error_msg = f"USBError encountered: {e}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
except Exception as e:
error_msg = f"Unexpected error during printing: {str(e)}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
with self._lock:
if self._state:
self._state = False
match (task_type.value):
case "image":
self._print_img(data["img"])
self._state = True
case "cut":
# The cut happens by default on Brother QL printers.
self._state = True
case _:
raise RuntimeError("This task type is not supported")
else:
raise RuntimeError("The printer is not ready to print yet !")
# These values are by default for now
# raise NotImplementedError("This printer type is not implemented yet")
def _process_image(self, path):
brightness_factor = 1.5 # Used only if image is too dark

View File

@@ -3,16 +3,129 @@ A collection of Printers.
It has methods to discover printers, and provides an interface for the methods expected from printers.
"""
from printer import Printer
from collections.abc import Mapping, Set
import usb.core
import usb.util
from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType
class Printers():
"""
A collection of Printers
Finds and creates a set of Printer that can be used by the Workers to print.
"""
def __init__(self, name, age):
self.name = name
self.age = age
def __init__(self, app):
"""
Discover printers connected to the computer and return a Collection of Printer()
"""
self.app = app
self.printers = self._discover_printers()
def _discover_printers(self):
pass
def _discover_printers(self) -> Set[Printer]:
"""
Gets connected USB printer devices using the pyusb library.
We analyse the USB devices, get the ones that match
the printer class ( 7 ) and for Brother and EPSON printers,
try to create a Printer object that can be used by the Worker class
to execute prints.
Returns a set of Printer
"""
self.app.logger.debug("Discovering USB Devices connected to this system")
printers = set()
# Find all connected USB devices
devices = usb.core.find(find_all=True,custom_match=_FindClass(7))
if not devices:
self.app.logger.warning("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.")
raise RuntimeError("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.")
for dev in devices:
# Attempt to get the manufacturer and product strings
try:
manufacturer = usb.util.get_string(dev, dev.iManufacturer)
except Exception:
manufacturer = "Unknown"
try:
product = usb.util.get_string(dev, dev.iProduct)
except Exception:
product = "Unknown"
self.app.logger.debug("Looking at %s %s (%s:%s)", manufacturer, product, hex(dev.idVendor), hex(dev.idProduct))
if manufacturer == "EPSON":
try:
# We create a new EscPosPrinter()
self.app.logger.debug("Trying to creat a new EPSON printer")
prid = dev.idProduct
vendir = dev.idVendor
escpos_printer = EscPosPrinter(self.app, vendor_id=vendir, device_id=prid)
except Exception as e:
raise e
# If the object creation is successfull, we add it to the list of Printers
printers.add(escpos_printer)
self.app.logger.debug("Found a %s printer" % manufacturer )
# We already found the type of printer,
# we don't need an extra comparaison.
continue
# or a Brother Printer
if manufacturer == "Brother":
try:
# We create a new BrotherPrinter()
self.app.logger.debug("Trying to creat a new BROTHER printer")
prid = dev.idProduct
vendir = dev.idVendor
brother_printer = BrotherPrinter(self.app, vendor_id=vendir,device_id=prid)
except Exception as e:
self.app.logger.error("Could not create a %s printer class with %s:%s" % product, dev.idVendor, dev.idProduct)
raise e
# If the object creation is successfull, we add it to the list of Printers
printers.add(brother_printer)
self.app.logger.debug("Found a %s printer" % manufacturer )
self.app.logger.debug("Found %s printers" % len(printers))
return printers
def any(self) -> Printer:
"""
Return a dict key: UUID, value: Printer, with any connected printer.
"""
if len(self.printers) > 0:
for i in self.printers:
return i
else:
raise RuntimeError("No printers available")
def get_printer(self, printer_type):
"""
Return a specific printer
printer_type -- a printer type
"""
return NotImplementedError()
class _FindClass():
def __init__(self, class_):
self._class = class_
def __call__(self, device):
# first, let's check the device
if device.bDeviceClass == self._class:
return True
# ok, transverse all devices to find an
# interface that matches our class
for cfg in device:
# find_descriptor: what's it?
intf = usb.util.find_descriptor(
cfg,
bInterfaceClass=self._class
)
if intf is not None:
return True
return False