From 9e77e0980b080a98108d8287732ad39bf5808836 Mon Sep 17 00:00:00 2001 From: n07070 Date: Mon, 1 Jun 2026 21:40:13 +0200 Subject: [PATCH 01/25] Change the Printer class to implement multiple printer types --- src/main.py | 1 - src/printer.py | 175 +++++++++++++++++++++++++++++++----------------- src/printers.py | 18 +++++ 3 files changed, 132 insertions(+), 62 deletions(-) create mode 100644 src/printers.py diff --git a/src/main.py b/src/main.py index e68399a..619a627 100644 --- a/src/main.py +++ b/src/main.py @@ -101,7 +101,6 @@ app.config["TEMPLATES_AUTO_RELOAD"] = True # Printer connection # Uses the class defined in the printer.py file printer = Printer(app, 0x04B8, 0x0E28) -printer.init_printer() # Find out if we are running on a Raspberry Pi rpi = Raspberry( diff --git a/src/printer.py b/src/printer.py index 66a781f..93570cc 100644 --- a/src/printer.py +++ b/src/printer.py @@ -4,15 +4,54 @@ This class manages connexion to a Printer # import brother_ql from time import sleep import os.path +from abc import ABC, abstractmethod + +from enum import Enum +import uuid from PIL import Image, ImageEnhance import numpy as np -# Importing the module to manage the connection to the printer. + +# Importing the modules needed for each supported printer Type import escpos.printer +import brother_ql + +class PrinterCapabilities(Enum): + """ + What are the capacities of a Printer ? + """ -class Printer(): +class Printer(ABC): + """ + If it outputs printed paper and speaks like a printer, then it must be a printer. + """ + + def __init__(self, app, vendor_id, device_id): + """ + We initialize a Printer via it's USB connexion, and generate a unique ID + """ + self.id = uuid.uuid4() + self.app = app + self.vendor_id = vendor_id + self.device_id = device_id + self.ready = False + + @abstractmethod + def _has_paper(self) -> bool: + """Check if the printer has papier""" + + @abstractmethod + def _state(self) -> bool: + """Reports the state of the Printer""" + + @abstractmethod + def print_task(self, task_type, data)-> None: + """Takes a PrintTask and executes it""" + + +class EscPosPrinter(Printer): """ # The connection is based on the ESC/POS library @@ -29,47 +68,13 @@ class Printer(): ## Annonce readyness : return a positive pong message. """ - # Is the printer ready to accept a new print ? - ready = False - - def __init__(self, app, device_id, vendor_id): - super().__init__() - self.app = app - self.ready = False + def __init__(self, app): + super().__init__(app, vendor_id="",device_id="") self.printer = None - self.device_id = device_id - self.vendor_id = vendor_id self.usb_args = {} self.usb_args["idVendor"] = self.device_id self.usb_args["idProduct"] = self.vendor_id - def check_paper(self) -> bool: - """ - On printers that support it, we check that the printer has paper - """ - self.app.logger.debug("Checking paper status...") - self.printer.open(self.usb_args) - status = self.printer.paper_status() - match status: - case 0: - self.app.logger.error("Printer has no more paper, aborting...") - self.printer.close() - raise RuntimeError("No more paper in the printer") - case 1: - self.app.logger.warning( - "Printer needs paper to be changed very soon ! " - ) - self.printer.close() - case 2: - self.app.logger.debug("Printer has paper, good to go") - self.printer.close() - - def init_printer(self): - """ - Check if the printer online ? Is the communication with the printer successfull ? - """ - - # TODO: This could happen directly when creating a new Printer class if os.getenv("FLASK_DEBUG"): waiting_elapsed = 3 else: @@ -77,14 +82,15 @@ class Printer(): self.app.logger.debug("Waiting for printer to get online...") - while not self.ready: + online = False + while not online: try: # This also calls open(), which we need to close() # or else the device will appear as busy. p = escpos.printer.Usb( self.device_id, self.vendor_id, 0, profile="TM-P80" ) - except RuntimeError as e: + except escpos.exceptions.DeviceNotFoundError as e: self.app.logger.error( "The USB device is not plugged in, trying again %s : %s", waiting_elapsed, @@ -93,9 +99,9 @@ class Printer(): try: if p.is_online(): - self.ready = True + online = True self.app.logger.debug("Printer online !") - except RuntimeError as e: + except escpos.exceptions.DeviceNotFoundError as e: self.app.logger.error( "Error while getting the printer online %s : %s", waiting_elapsed, @@ -109,7 +115,7 @@ class Printer(): "Printer took more than 30 seconds to get online, aborting..." ) waiting_elapsed = 1 # Reset the waiting time for the next print. - return False + raise RuntimeError("Could not get Printer %s online" % self.id) # Setting up the printing options. p.set( @@ -130,14 +136,37 @@ class Printer(): # Beware : if we print every time the printer becomes ready, it means # we are printing before and after every print ! self.printer = p + self.printer.close() # We close the connexion to the Printer + + try: + self._has_paper() + except Exception as e: + raise e + self.ready = True - self.printer.close() - self.check_paper() + def _has_paper(self): + """Check if the printer has paper left""" + self.app.logger.debug("Checking paper status...") + self.printer.open(self.usb_args) + status = self.printer.paper_status() + match status: + case 0: + self.app.logger.error("Printer has no more paper, aborting...") + self.printer.close() + raise RuntimeError("No more paper in the printer") + case 1: + self.app.logger.warning( + "Printer needs paper to be changed very soon ! " + ) + self.printer.close() + return True + case 2: + self.app.logger.debug("Printer has paper, good to go") + self.printer.close() + return True - return True - - def _print_sms(self, msg, signature="", bold=False): + def _print_txt(self, msg, signature="", bold=False): if not isinstance(msg, str): self.app.logger.error( @@ -255,14 +284,14 @@ class Printer(): try: self.printer.open(self.usb_args) self.printer.qr(content, center=True) + self.printer.textln(content, center=True) self.printer.close() except RuntimeError as e: self.printer.close() self.app.logger.error(str(e)) - return False + raise e self.app.logger.info("Printed a QR") - return True def _cut(self): try: @@ -277,21 +306,45 @@ class Printer(): self.app.logger.info("Did a cut") return True + def _state(self): + return self.printer.is_online() and self.ready and self._has_paper() + def print_task(self, task_type, data): """Execute actual print based on task type""" - match (task_type.value): - case "text": - self._print_sms(data["txt"], signature=data["sign"]) - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] - ) - case "cut": - self._cut() - case _: - raise RuntimeError("This task type is not supported") + if self._state(): + match (task_type.value): + case "text": + self._print_txt(data["txt"], signature=data["sign"]) + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + case "cut": + self._cut() + case "qr": + self._qr(data["txt"]) + case _: + raise RuntimeError("This task type is not supported") +class BrotherPrinter(Printer): + """ + Manages connexion and capabilities of a BrotherQL Printer + """ + + def __init__(self, app, vendor_id, device_id): + super().__init__(app, vendor_id="",device_id="") + self.printer = None + self.usb_args = {} + self.usb_args["idVendor"] = self.device_id + self.usb_args["idProduct"] = self.vendor_id + + def _has_paper(): + + def _state(): + + def print_task(): + def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark brightness_threshold = 100 # Brightness threshold (0–255) diff --git a/src/printers.py b/src/printers.py new file mode 100644 index 0000000..0f820bc --- /dev/null +++ b/src/printers.py @@ -0,0 +1,18 @@ +""" +A collection of Printers. + +It has methods to discover printers, and provides an interface for the methods expected from printers. +""" + +from printer import Printer + +class Printers(): + """ + A collection of Printers + """ + def __init__(self, name, age): + self.name = name + self.age = age + + def _discover_printers(self): + pass \ No newline at end of file -- 2.47.3 From 9e4ec6c1a5ab91589d06c462524efc21678a17b3 Mon Sep 17 00:00:00 2001 From: n07070 Date: Mon, 1 Jun 2026 21:40:28 +0200 Subject: [PATCH 02/25] Add support for QR code task --- src/task.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/task.py b/src/task.py index 2962618..b7a01f8 100644 --- a/src/task.py +++ b/src/task.py @@ -30,6 +30,7 @@ class TaskType(Enum): TEXT = "text" IMAGE = "image" CUT = "cut" + QR = "qr" class PrintTask(ABC): @@ -67,6 +68,15 @@ class TextTask(PrintTask): def get_print_data(self): return {"txt": self.content, "sign": self.signature} +class QRTask(TextTask): + """This task prints a QR-Code, the signature is ignore and is always the content itself""" + def __init__(self, content): + super().__init__(content, signature="") + self.content = content + self.signature = content + + def get_print_data(self): + return {"txt": self.content, "sign": self.signature} class ImageTask(PrintTask): """ -- 2.47.3 From e549cdc64be6acabb3ff279e36dbe573485f7645 Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:51:13 +0200 Subject: [PATCH 03/25] Remove vendor and device ID from configuration because it's not needed anymore --- configuration/config.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/configuration/config.toml b/configuration/config.toml index cf299c7..31f6e5f 100644 --- a/configuration/config.toml +++ b/configuration/config.toml @@ -5,8 +5,6 @@ signature = "Anonymous" # Printer settings [printer] -vendor_id = 0x04b8 -device_id = 0x0e28 upload_folder = "src/static/uploads" # Raspberry Pi Configuration -- 2.47.3 From ef613b3c10e148804b28ea2ca311b4986257a08b Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:51:38 +0200 Subject: [PATCH 04/25] Remove print, add task Typing --- src/task.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/task.py b/src/task.py index b7a01f8..9fc38a9 100644 --- a/src/task.py +++ b/src/task.py @@ -38,13 +38,11 @@ class PrintTask(ABC): A print task holds information about what we are looking to print. """ - def __init__(self, task_type): + def __init__(self, task_type: TaskType): self.task_id = self._generate_id() self.task_type = task_type self.status = "pending" # pending, processing, completed, failed - print("Created a new " + str(self.task_type) + " with ID " + self.task_id) - @abstractmethod def get_print_data(self): """Return data formatted for printer""" -- 2.47.3 From 1218d3fbee7e2bcc8c42636ec5b4cdfc6d0cb8d8 Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:52:23 +0200 Subject: [PATCH 05/25] Remove unused import, add get_queue_completed method --- src/web.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/web.py b/src/web.py index 1d77974..1db14dd 100644 --- a/src/web.py +++ b/src/web.py @@ -1,7 +1,6 @@ +import os from flask import flash from werkzeug.utils import secure_filename -import time -import os from task import TextTask, ImageTask, CutTask @@ -99,11 +98,12 @@ class Web(object): + str(os.path.join(self.app.config["UPLOAD_FOLDER"], filename)) ) return True - else: - self.app.logger.error( - "Could not save file because the filename is forbidden" - ) - return False + + self.app.logger.error( + "Could not save file because the filename is forbidden" + ) + return False + else: self.app.logger.error( "Could not save file, it seems to be null ? : " + str(filename) @@ -113,3 +113,7 @@ class Web(object): def get_queue_state(self): """Return current queue state""" return self.print_queue.get_queue_state() + + def get_queue_completed(self): + """Return completed queue elements""" + return self.print_queue.get_queue_completed() \ No newline at end of file -- 2.47.3 From db7e030a1fbc62981b1be2a7dd16d746f9ed898a Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:53:02 +0200 Subject: [PATCH 06/25] Remove printer import, remove vendor&device configuration, update print_queue --- src/main.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/main.py b/src/main.py index 619a627..75b5cff 100644 --- a/src/main.py +++ b/src/main.py @@ -36,7 +36,6 @@ import werkzeug.exceptions from flask_socketio import SocketIO from flask_limiter import Limiter from flask_limiter.util import get_remote_address -from printer import Printer # The wrapper for the printer class from raspberry import Raspberry # The Raspberry pi control Class from web import Web # Wrapper for the web routes and API from print_queue import PrintQueue @@ -73,11 +72,7 @@ except OSError as e: app.logger.debug("Config file loaded !") -# Define the USB connections here. -vendor_id = configuration_file["printer"]["vendor_id"] -device_id = configuration_file["printer"]["device_id"] UPLOAD_FOLDER = str(configuration_file["printer"]["upload_folder"]) - try: os.mkdir(UPLOAD_FOLDER) app.logger.debug("Directory %s created successfully.", UPLOAD_FOLDER) @@ -98,13 +93,12 @@ app.config["ALLOWED_EXTENSIONS"] = ALLOWED_EXTENSIONS app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 3Mb for a file upload app.config["TEMPLATES_AUTO_RELOAD"] = True -# Printer connection -# Uses the class defined in the printer.py file -printer = Printer(app, 0x04B8, 0x0E28) +# Queue creation +print_queue = PrintQueue(app) # Find out if we are running on a Raspberry Pi rpi = Raspberry( - printer, + print_queue, app, socketio, configuration_file["rpi"]["button_gpio_port_number"], @@ -112,18 +106,20 @@ rpi = Raspberry( configuration_file["rpi"]["flash_gpio_port_number"], configuration_file["rpi"]["flash"], ) - RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi() -# Queue creation -print_queue = PrintQueue(app) # Web & API management web = Web(app, print_queue) # Start worker thread -worker = PrintWorker(app, print_queue, printer, socketio) -worker.start() +# When created, the worker will try to find printers connected to the system +try: + worker = PrintWorker(app, print_queue, socketio) + worker.start() +except Exception as e: + app.logger.error("Could not start the worker because %s ", str(e)) + sys.exit(-1) # The rate limit limiter = Limiter( @@ -303,6 +299,7 @@ def api_print_image(): return "OK", 200 +# TODO: This might not depend on the Raspberry Pi @app.route("/api/camera/picture", methods=["GET"]) def camera_picture(): """Returns a picture taken by the camera on a raspberry pi""" @@ -320,6 +317,10 @@ def api_queue_status(): """API endpoint for entire queue""" return jsonify(web.get_queue_state()) +@app.route("/api/queue/completed", methods=["GET"]) +def api_queue_completed(): + """API endpoint that returns the finished tasks""" + return jsonify(web.get_queue_completed()) @app.route("/api/worker", methods=["GET"]) def api_worker_state(): -- 2.47.3 From f841cd56281e65d8a082f20cbd2c907f4655f8a2 Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:54:15 +0200 Subject: [PATCH 07/25] Add queue completion method, add Printer discovery and types --- src/print_queue.py | 8 +- src/printer.py | 319 +++++++++++++++++++++++++++++++++------------ src/printers.py | 129 ++++++++++++++++-- 3 files changed, 363 insertions(+), 93 deletions(-) diff --git a/src/print_queue.py b/src/print_queue.py index 766a322..2922143 100644 --- a/src/print_queue.py +++ b/src/print_queue.py @@ -76,7 +76,13 @@ class PrintQueue: """Return current queue state""" with self._lock: self.app.logger.debug("Return current queue state") - return [{"task_id": t.task_id, "status": t.status} for t in self._queue] + return [{"task_id": t.task_id, "status": t.status, "type": str(t.task_type) } for t in self._queue] + + def get_queue_completed(self): + """Return completed queue elements""" + with self._lock: + self.app.logger.debug("Return completed queue elements") + return self._completed_tasks def get_status(self, task_id): """Get full status info for a task""" diff --git a/src/printer.py b/src/printer.py index 93570cc..5e32dde 100644 --- a/src/printer.py +++ b/src/printer.py @@ -1,13 +1,15 @@ """ This class manages connexion to a Printer """ -# import brother_ql from time import sleep import os.path from abc import ABC, abstractmethod +from dataclasses import dataclass from enum import Enum import uuid +import usb.core +import threading from PIL import Image, ImageEnhance import numpy as np @@ -15,20 +17,49 @@ import numpy as np # Importing the modules needed for each supported printer Type import escpos.printer -import brother_ql +from brother_ql.models import ModelsManager +from brother_ql.backends import backend_factory +from brother_ql import labels +from brother_ql.raster import BrotherQLRaster +from brother_ql.conversion import convert +from brother_ql.backends.helpers import send -class PrinterCapabilities(Enum): - """ - What are the capacities of a Printer ? - """ +class PrinterType(Enum): + """ + What are the capacities of a Printer ? + """ + EPSON = "epson" + BROTHER = "brother" +# For Brother-QL Printers +@dataclass +class PrinterInfo: + identifier: str + backend: str + protocol: str + vendor_id: str + product_id: str + serial_number: str + name: str = "Brother QL Printer" + model: str = "QL-570" + status: str = "unknown" + label_type: str = "unknown" + label_size : str = "unknown" + label_width: int = 0 + label_height: int = 0 + + def __getitem__(self, item): + return getattr(self, item) + + def __setitem__(self, key, value): + setattr(self, key, value) class Printer(ABC): """ If it outputs printed paper and speaks like a printer, then it must be a printer. """ - def __init__(self, app, vendor_id, device_id): + def __init__(self, app, vendor_id, device_id, printer_type: PrinterType): """ We initialize a Printer via it's USB connexion, and generate a unique ID """ @@ -37,6 +68,8 @@ class Printer(ABC): self.vendor_id = vendor_id self.device_id = device_id self.ready = False + self.printer_type = printer_type + self._lock = threading.Lock() @abstractmethod def _has_paper(self) -> bool: @@ -53,69 +86,41 @@ class Printer(ABC): class EscPosPrinter(Printer): """ - # The connection is based on the ESC/POS library - - ## Connection to the USB printer - - ## Making sure the printer is alive - - ## Making sure it has paper - - ## Define default print settings - - ## Print starting message, log time of first print, cut. - - ## Annonce readyness : return a positive pong message. + Create a new ESC/POS based printer. """ - def __init__(self, app): - super().__init__(app, vendor_id="",device_id="") + def __init__(self, app, vendor_id, device_id): + """ + Create a connexion to a ESC/POS Printer via USB, + Making sure the printer is alive, + Making sure it has paper, + Define default print settings + """ + super().__init__(app,vendor_id,device_id, printer_type=PrinterType.EPSON) self.printer = None self.usb_args = {} - self.usb_args["idVendor"] = self.device_id - self.usb_args["idProduct"] = self.vendor_id + self.usb_args["idVendor"] = self.vendor_id + self.usb_args["idProduct"] = self.device_id - if os.getenv("FLASK_DEBUG"): - waiting_elapsed = 3 - else: - waiting_elapsed = 10 + try: + # This also calls open(), which we need to close() + # or else the device will appear as busy. + p = escpos.printer.Usb( + self.vendor_id, self.device_id, 0, profile="TM-P80" + ) + except escpos.exceptions.DeviceNotFoundError as e: + self.app.logger.error( + "The USB device is not plugged in : %s", + str(e), + ) + except Exception as e: + self.app.logger.error("Printer could not be connected : %s ", str(e)) - self.app.logger.debug("Waiting for printer to get online...") - - online = False - while not online: - try: - # This also calls open(), which we need to close() - # or else the device will appear as busy. - p = escpos.printer.Usb( - self.device_id, self.vendor_id, 0, profile="TM-P80" - ) - except escpos.exceptions.DeviceNotFoundError as e: - self.app.logger.error( - "The USB device is not plugged in, trying again %s : %s", - waiting_elapsed, - str(e), - ) - - try: - if p.is_online(): - online = True - self.app.logger.debug("Printer online !") - except escpos.exceptions.DeviceNotFoundError as e: - self.app.logger.error( - "Error while getting the printer online %s : %s", - waiting_elapsed, - str(e), - ) - - sleep(1) - waiting_elapsed -= 1 - if waiting_elapsed < 1: - self.app.logger.error( - "Printer took more than 30 seconds to get online, aborting..." - ) - waiting_elapsed = 1 # Reset the waiting time for the next print. - raise RuntimeError("Could not get Printer %s online" % self.id) + try: + if p.is_online(): + self.app.logger.debug("Printer online !") + except Exception as e: + raise e # Setting up the printing options. p.set( @@ -284,7 +289,7 @@ class EscPosPrinter(Printer): try: self.printer.open(self.usb_args) self.printer.qr(content, center=True) - self.printer.textln(content, center=True) + self.printer.textln(content) self.printer.close() except RuntimeError as e: self.printer.close() @@ -311,21 +316,28 @@ class EscPosPrinter(Printer): def print_task(self, task_type, data): """Execute actual print based on task type""" - if self._state(): - match (task_type.value): - case "text": - self._print_txt(data["txt"], signature=data["sign"]) - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] - ) - case "cut": - self._cut() - case "qr": - self._qr(data["txt"]) - case _: - raise RuntimeError("This task type is not supported") - + with self._lock: + if self._state: + self._state = False + match (task_type.value): + case "text": + self._print_txt(data["txt"], signature=data["sign"]) + self._state = True + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + self._state = True + case "cut": + self._cut() + self._state = True + case "qr": + self._qr(data["txt"]) + self._state = True + case _: + raise RuntimeError("This task type is not supported") + else: + raise RuntimeError("The printer is not ready to print yet !") class BrotherPrinter(Printer): """ @@ -333,17 +345,156 @@ class BrotherPrinter(Printer): """ def __init__(self, app, vendor_id, device_id): - super().__init__(app, vendor_id="",device_id="") + super().__init__(app, vendor_id="",device_id="", printer_type=PrinterType.BROTHER) self.printer = None self.usb_args = {} self.usb_args["idVendor"] = self.device_id self.usb_args["idProduct"] = self.vendor_id + self.model_manager = ModelsManager() - def _has_paper(): + # Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py - def _state(): + backend = backend_factory("pyusb") + available_devices = backend["list_available_devices"]() - def print_task(): + for printer in available_devices: + self.app.logger.debug(f"Found device: {printer}") + identifier = printer["identifier"] + parts = identifier.split("/") + + if len(parts) < 4: + self.app.logger.warning(f"Skipping device with invalid identifier format: {identifier}") + continue + + protocol = parts[0] + device_info = parts[2] + serial_number = parts[3] + + try: + product_id_int = int(self.device_id, 16) + for m in self.model_manager.iter_elements(): + if m.product_id == product_id_int: + model = m.identifier + break + self.app.logger.debug(f"Matched printer model: {model}") + except ValueError: + self.app.logger.warning(f"Invalid product ID format: {product_id}") + + self.printer_info = PrinterInfo( + identifier=identifier, + backend="pyusb", + model=model, + protocol=protocol, + vendor_id=vendor_id, + product_id=self.device_id, + serial_number=serial_number, + ) + + self.ready = True + + + def _has_paper(self): + raise NotImplementedError("This printer model does not support this.") + + def _state(self): + return self.ready + + def _print_img(self,data): + """ + Print a raster image via a Brother QL printer + """ + self.ready = False + label_type = "102" + rotate = 0 + dither = False + try: + # Prepare the image for printing + qlr = BrotherQLRaster(self.printer_info["model"]) + + instructions = convert( + qlr=qlr, + images=[data["img"]], + label=label_type, + rotate=rotate, + threshold=70, + dither=dither, + compress=True, + red=False, + dpi_600=False, + hq=False, + cut=True, + ) + + # Debug logging + if FLASK_DEBUG: + self.app.logger.debug(f""" + Print parameters: + - Label type: {label_type} + - Rotate: {rotate} + - Dither: {dither} + - Model: {self.printer_info['model']} + - Backend: {self.printer_info['backend']} + - Identifier: {self.printer_info['identifier']} + """) + + # Try to print using Python API + # send() = status = { + # 'instructions_sent': True, # The instructions were sent to the printer. + # 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error' + # 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it. + # 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability). + # 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown. + # } + status = send( + instructions=instructions, + printer_identifier= self.printer_info["identifier"], + backend_identifier="pyusb" + ) + + if not status["did_print"] or status["outcome"] == "error" or status["outcome"] == "unknown": + raise RuntimeError("Failed to print using Python API") + + if status["printer_state"]: + self.ready = bool(status["printer_state"]) + else: + self.ready = True + + except usb.core.USBError as e: + # Treat timeout errors as successful since they often occur after print completion + if e.errno == 110: # Operation timed out + self.app.logger.debug("USB timeout occurred - this is normal and the print likely completed") + self.app.logger.debug("Print completed (timeout is normal)") + self.ready = True + + error_msg = f"USBError encountered: {e}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + except Exception as e: + error_msg = f"Unexpected error during printing: {str(e)}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + def print_task(self, task_type, data): + """Execute actual print based on task type""" + with self._lock: + if self._state: + self._state = False + match (task_type.value): + case "image": + self._print_img(data["img"]) + self._state = True + case "cut": + # The cut happens by default on Brother QL printers. + self._state = True + case _: + raise RuntimeError("This task type is not supported") + else: + raise RuntimeError("The printer is not ready to print yet !") + + # These values are by default for now + + # raise NotImplementedError("This printer type is not implemented yet") def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark diff --git a/src/printers.py b/src/printers.py index 0f820bc..7df1d99 100644 --- a/src/printers.py +++ b/src/printers.py @@ -3,16 +3,129 @@ A collection of Printers. It has methods to discover printers, and provides an interface for the methods expected from printers. """ - -from printer import Printer +from collections.abc import Mapping, Set +import usb.core +import usb.util +from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType class Printers(): """ - A collection of Printers + Finds and creates a set of Printer that can be used by the Workers to print. """ - def __init__(self, name, age): - self.name = name - self.age = age + def __init__(self, app): + """ + Discover printers connected to the computer and return a Collection of Printer() + """ + self.app = app + self.printers = self._discover_printers() - def _discover_printers(self): - pass \ No newline at end of file + def _discover_printers(self) -> Set[Printer]: + """ + Gets connected USB printer devices using the pyusb library. + + We analyse the USB devices, get the ones that match + the printer class ( 7 ) and for Brother and EPSON printers, + try to create a Printer object that can be used by the Worker class + to execute prints. + + Returns a set of Printer + """ + + self.app.logger.debug("Discovering USB Devices connected to this system") + + printers = set() + + # Find all connected USB devices + devices = usb.core.find(find_all=True,custom_match=_FindClass(7)) + if not devices: + self.app.logger.warning("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") + raise RuntimeError("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") + + for dev in devices: + # Attempt to get the manufacturer and product strings + try: + manufacturer = usb.util.get_string(dev, dev.iManufacturer) + except Exception: + manufacturer = "Unknown" + + try: + product = usb.util.get_string(dev, dev.iProduct) + except Exception: + product = "Unknown" + self.app.logger.debug("Looking at %s %s (%s:%s)", manufacturer, product, hex(dev.idVendor), hex(dev.idProduct)) + + if manufacturer == "EPSON": + try: + # We create a new EscPosPrinter() + self.app.logger.debug("Trying to creat a new EPSON printer") + prid = dev.idProduct + vendir = dev.idVendor + escpos_printer = EscPosPrinter(self.app, vendor_id=vendir, device_id=prid) + except Exception as e: + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(escpos_printer) + self.app.logger.debug("Found a %s printer" % manufacturer ) + + # We already found the type of printer, + # we don't need an extra comparaison. + continue + + # or a Brother Printer + if manufacturer == "Brother": + try: + # We create a new BrotherPrinter() + self.app.logger.debug("Trying to creat a new BROTHER printer") + prid = dev.idProduct + vendir = dev.idVendor + brother_printer = BrotherPrinter(self.app, vendor_id=vendir,device_id=prid) + except Exception as e: + self.app.logger.error("Could not create a %s printer class with %s:%s" % product, dev.idVendor, dev.idProduct) + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(brother_printer) + self.app.logger.debug("Found a %s printer" % manufacturer ) + + self.app.logger.debug("Found %s printers" % len(printers)) + return printers + + def any(self) -> Printer: + """ + Return a dict key: UUID, value: Printer, with any connected printer. + """ + if len(self.printers) > 0: + for i in self.printers: + return i + else: + raise RuntimeError("No printers available") + + def get_printer(self, printer_type): + """ + Return a specific printer + + printer_type -- a printer type + """ + return NotImplementedError() + + +class _FindClass(): + def __init__(self, class_): + self._class = class_ + def __call__(self, device): + # first, let's check the device + if device.bDeviceClass == self._class: + return True + # ok, transverse all devices to find an + # interface that matches our class + for cfg in device: + # find_descriptor: what's it? + intf = usb.util.find_descriptor( + cfg, + bInterfaceClass=self._class + ) + if intf is not None: + return True + + return False -- 2.47.3 From a2d1779e2bcbb9b89d0ea60207923a93fb71a092 Mon Sep 17 00:00:00 2001 From: n07070 Date: Wed, 3 Jun 2026 23:55:04 +0200 Subject: [PATCH 08/25] Update worker to get differents printer types --- src/worker.py | 114 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 36 deletions(-) diff --git a/src/worker.py b/src/worker.py index 5e5410f..e5a7fec 100644 --- a/src/worker.py +++ b/src/worker.py @@ -4,64 +4,105 @@ import threading import time +from printers import Printers class PrintWorker(threading.Thread): - def __init__(self, app, print_queue, printer, socketio=None): + def __init__(self, app, print_queue, socketio=None): super().__init__(daemon=True) self.app = app self.print_queue = print_queue - self.printer = printer + self.printer = None + self._lock = threading.Lock() self.socketio = socketio # Optional self.running = True self.state = "idle" # idle, printing, dead, drinking-a-beer - self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...") + try: + self.printers = Printers(self.app) + self.printers_obj = self.printers.printers + self.printers = iter(self.printers.printers) + except RuntimeError as e: + self.app.logger.warning("Could not get any Printers") + raise e + def run(self): """Background thread that processes queue items""" - self.app.logger.info("Worker started working.") + self.app.logger.debug("Worker %s started working.", threading.get_ident()) + self.app.logger.debug("Current threads : %s" % threading.active_count()) + self.app.logger.debug("Threads actives : %s " % threading.enumerate()) + while True: - if not self.running or not self.printer.ready: + + # If the printer is dead or asleep, it can't work. + if not self.running: time.sleep(0.2) continue - try: - task = self.print_queue.dequeue() - except Exception as e: - self.app.logger.error("Could not get a new task ! %s ", str(e)) - raise RuntimeError( - "We could not get a new task because " + str(e) - ) from e - - if task: + # If we have no available printer, we look at the list printers we know about, and try to find one that is available. + # When we find a printer, we acquire it + # When we are finished with a printer, we release it to the world. + while not self.printer or not self.printer.ready: + time.sleep(1) try: - self.app.logger.info("Got a new task") - self.app.logger.debug("Got task %s", task.task_id) - self.state = "printing" - task.status = "processing" - self._emit_status(task.task_id, "processing") + self.app.logger.debug("Changing printers") + self.printer = next(self.printers) + self.app.logger.debug("The worker got a %s printer and it's %s", self.printer.printer_type, "Ready" if self.printer.ready else "Not ready") + except Exception as e: + self.app.logger.error(str(e)) + self.printer = None - print_data = task.get_print_data() + if self.state != "idle": + self.app.logger("We are not idle, waiting...") + time.sleep(1) + continue + + self.state = "printing" + + with self._lock: + try: + task = self.print_queue.dequeue() + except Exception as e: + self.app.logger.error("Could not get a new task ! %s ", str(e)) + self.state = "idle" + raise RuntimeError( + "We could not get a new task because " + str(e) + ) from e + + if task: try: - self.printer.print_task(task.task_type, print_data) + self.app.logger.info("Got a new task") + self.app.logger.debug("Got task %s", task.task_id) + task.status = "processing" + self._emit_status(task.task_id, "processing") + + print_data = task.get_print_data() + + try: + self.printer.print_task(task.task_type, print_data) + except RuntimeError as e: + self.state = "idle" + self.app.logger.error("Could not print : %s", str(e)) + raise e + + task.status = "completed" + self.print_queue.mark_completed(task.task_id, "completed") + self._emit_status(task.task_id, "completed") + self.app.logger.debug("Finished printing task %s " % task.task_id) + self.state = "idle" + except RuntimeError as e: - self.app.logger.error("Could not print : %s", str(e)) - raise e + task.status = "failed" + self.state = "idle" + self.print_queue.mark_completed(task.task_id, "failed") + self._emit_status(task.task_id, "failed", error=str(e)) + self.app.logger.error("Could not print task %s because %s " % task.task_id, str(e)) - task.status = "completed" - self.print_queue.mark_completed(task.task_id, "completed") - self._emit_status(task.task_id, "completed") - - except RuntimeError as e: - task.status = "failed" - self.print_queue.mark_completed(task.task_id, "failed") - self._emit_status(task.task_id, "failed", error=str(e)) - print(f"Print task {task.task_id} failed: {e}") - else: - # When they are no new tasks to handle, we put the thread to sleep. - self.state = "idle" - time.sleep(0.1) + else: + # When they are no new tasks to handle, we put the thread to sleep. + self.state = "idle" + time.sleep(0.1) def _emit_status(self, task_id, status, error=None): """Emit status update via Socket.IO if available""" @@ -104,4 +145,5 @@ class PrintWorker(threading.Thread): "is_running": self.running, "queue_size": len(self.print_queue), "state": self.state, + "printers": len(self.printers_obj), } -- 2.47.3 From 3c490e10b4ddede0af5f5704c84bc080c8ba514a Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 00:31:04 +0200 Subject: [PATCH 09/25] Apply black formatter --- src/main.py | 3 +++ src/print_queue.py | 5 +++- src/printer.py | 60 ++++++++++++++++++++++++++++------------------ src/printers.py | 49 +++++++++++++++++++++++++------------ src/raspberry.py | 9 ++++--- src/task.py | 6 ++++- src/web.py | 4 ++-- src/worker.py | 16 +++++++++---- 8 files changed, 103 insertions(+), 49 deletions(-) diff --git a/src/main.py b/src/main.py index 75b5cff..f428402 100644 --- a/src/main.py +++ b/src/main.py @@ -126,6 +126,7 @@ limiter = Limiter( get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"] ) + # General routes @app.route("/") @limiter.limit("1/second", override_defaults=False) @@ -317,11 +318,13 @@ def api_queue_status(): """API endpoint for entire queue""" return jsonify(web.get_queue_state()) + @app.route("/api/queue/completed", methods=["GET"]) def api_queue_completed(): """API endpoint that returns the finished tasks""" return jsonify(web.get_queue_completed()) + @app.route("/api/worker", methods=["GET"]) def api_worker_state(): """API endpoint to get the worker state""" diff --git a/src/print_queue.py b/src/print_queue.py index 2922143..0dfc24f 100644 --- a/src/print_queue.py +++ b/src/print_queue.py @@ -76,7 +76,10 @@ class PrintQueue: """Return current queue state""" with self._lock: self.app.logger.debug("Return current queue state") - return [{"task_id": t.task_id, "status": t.status, "type": str(t.task_type) } for t in self._queue] + return [ + {"task_id": t.task_id, "status": t.status, "type": str(t.task_type)} + for t in self._queue + ] def get_queue_completed(self): """Return completed queue elements""" diff --git a/src/printer.py b/src/printer.py index 5e32dde..222c847 100644 --- a/src/printer.py +++ b/src/printer.py @@ -1,8 +1,10 @@ """ This class manages connexion to a Printer """ + from time import sleep import os.path +import os from abc import ABC, abstractmethod from dataclasses import dataclass @@ -14,7 +16,6 @@ import threading from PIL import Image, ImageEnhance import numpy as np - # Importing the modules needed for each supported printer Type import escpos.printer from brother_ql.models import ModelsManager @@ -24,12 +25,15 @@ from brother_ql.raster import BrotherQLRaster from brother_ql.conversion import convert from brother_ql.backends.helpers import send + class PrinterType(Enum): - """ - What are the capacities of a Printer ? - """ - EPSON = "epson" - BROTHER = "brother" + """ + What are the capacities of a Printer ? + """ + + EPSON = "epson" + BROTHER = "brother" + # For Brother-QL Printers @dataclass @@ -44,7 +48,7 @@ class PrinterInfo: model: str = "QL-570" status: str = "unknown" label_type: str = "unknown" - label_size : str = "unknown" + label_size: str = "unknown" label_width: int = 0 label_height: int = 0 @@ -54,6 +58,7 @@ class PrinterInfo: def __setitem__(self, key, value): setattr(self, key, value) + class Printer(ABC): """ If it outputs printed paper and speaks like a printer, then it must be a printer. @@ -80,7 +85,7 @@ class Printer(ABC): """Reports the state of the Printer""" @abstractmethod - def print_task(self, task_type, data)-> None: + def print_task(self, task_type, data) -> None: """Takes a PrintTask and executes it""" @@ -96,7 +101,7 @@ class EscPosPrinter(Printer): Making sure it has paper, Define default print settings """ - super().__init__(app,vendor_id,device_id, printer_type=PrinterType.EPSON) + super().__init__(app, vendor_id, device_id, printer_type=PrinterType.EPSON) self.printer = None self.usb_args = {} self.usb_args["idVendor"] = self.vendor_id @@ -105,9 +110,7 @@ class EscPosPrinter(Printer): try: # This also calls open(), which we need to close() # or else the device will appear as busy. - p = escpos.printer.Usb( - self.vendor_id, self.device_id, 0, profile="TM-P80" - ) + p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80") except escpos.exceptions.DeviceNotFoundError as e: self.app.logger.error( "The USB device is not plugged in : %s", @@ -141,7 +144,7 @@ class EscPosPrinter(Printer): # Beware : if we print every time the printer becomes ready, it means # we are printing before and after every print ! self.printer = p - self.printer.close() # We close the connexion to the Printer + self.printer.close() # We close the connexion to the Printer try: self._has_paper() @@ -339,13 +342,16 @@ class EscPosPrinter(Printer): else: raise RuntimeError("The printer is not ready to print yet !") + class BrotherPrinter(Printer): """ Manages connexion and capabilities of a BrotherQL Printer """ def __init__(self, app, vendor_id, device_id): - super().__init__(app, vendor_id="",device_id="", printer_type=PrinterType.BROTHER) + super().__init__( + app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER + ) self.printer = None self.usb_args = {} self.usb_args["idVendor"] = self.device_id @@ -363,7 +369,9 @@ class BrotherPrinter(Printer): parts = identifier.split("/") if len(parts) < 4: - self.app.logger.warning(f"Skipping device with invalid identifier format: {identifier}") + self.app.logger.warning( + f"Skipping device with invalid identifier format: {identifier}" + ) continue protocol = parts[0] @@ -378,7 +386,7 @@ class BrotherPrinter(Printer): break self.app.logger.debug(f"Matched printer model: {model}") except ValueError: - self.app.logger.warning(f"Invalid product ID format: {product_id}") + self.app.logger.warning(f"Invalid product ID format: {m.product_id}") self.printer_info = PrinterInfo( identifier=identifier, @@ -392,14 +400,13 @@ class BrotherPrinter(Printer): self.ready = True - def _has_paper(self): raise NotImplementedError("This printer model does not support this.") def _state(self): return self.ready - def _print_img(self,data): + def _print_img(self, data): """ Print a raster image via a Brother QL printer """ @@ -426,7 +433,7 @@ class BrotherPrinter(Printer): ) # Debug logging - if FLASK_DEBUG: + if os.getenv("FLASK_DEBUG"): self.app.logger.debug(f""" Print parameters: - Label type: {label_type} @@ -447,11 +454,15 @@ class BrotherPrinter(Printer): # } status = send( instructions=instructions, - printer_identifier= self.printer_info["identifier"], - backend_identifier="pyusb" + printer_identifier=self.printer_info["identifier"], + backend_identifier="pyusb", ) - if not status["did_print"] or status["outcome"] == "error" or status["outcome"] == "unknown": + if ( + not status["did_print"] + or status["outcome"] == "error" + or status["outcome"] == "unknown" + ): raise RuntimeError("Failed to print using Python API") if status["printer_state"]: @@ -462,7 +473,9 @@ class BrotherPrinter(Printer): except usb.core.USBError as e: # Treat timeout errors as successful since they often occur after print completion if e.errno == 110: # Operation timed out - self.app.logger.debug("USB timeout occurred - this is normal and the print likely completed") + self.app.logger.debug( + "USB timeout occurred - this is normal and the print likely completed" + ) self.app.logger.debug("Print completed (timeout is normal)") self.ready = True @@ -496,6 +509,7 @@ class BrotherPrinter(Printer): # raise NotImplementedError("This printer type is not implemented yet") + def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark brightness_threshold = 100 # Brightness threshold (0–255) diff --git a/src/printers.py b/src/printers.py index 7df1d99..379ce18 100644 --- a/src/printers.py +++ b/src/printers.py @@ -3,15 +3,18 @@ A collection of Printers. It has methods to discover printers, and provides an interface for the methods expected from printers. """ + from collections.abc import Mapping, Set import usb.core import usb.util from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType -class Printers(): + +class Printers: """ Finds and creates a set of Printer that can be used by the Workers to print. """ + def __init__(self, app): """ Discover printers connected to the computer and return a Collection of Printer() @@ -36,10 +39,14 @@ class Printers(): printers = set() # Find all connected USB devices - devices = usb.core.find(find_all=True,custom_match=_FindClass(7)) + devices = usb.core.find(find_all=True, custom_match=_FindClass(7)) if not devices: - self.app.logger.warning("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") - raise RuntimeError("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.") + self.app.logger.warning( + "No USB devices of class 7 ( printers ) found or pyusb could not access the bus." + ) + raise RuntimeError( + "No USB devices of class 7 ( printers ) found or pyusb could not access the bus." + ) for dev in devices: # Attempt to get the manufacturer and product strings @@ -52,7 +59,13 @@ class Printers(): product = usb.util.get_string(dev, dev.iProduct) except Exception: product = "Unknown" - self.app.logger.debug("Looking at %s %s (%s:%s)", manufacturer, product, hex(dev.idVendor), hex(dev.idProduct)) + self.app.logger.debug( + "Looking at %s %s (%s:%s)", + manufacturer, + product, + hex(dev.idVendor), + hex(dev.idProduct), + ) if manufacturer == "EPSON": try: @@ -60,13 +73,15 @@ class Printers(): self.app.logger.debug("Trying to creat a new EPSON printer") prid = dev.idProduct vendir = dev.idVendor - escpos_printer = EscPosPrinter(self.app, vendor_id=vendir, device_id=prid) + escpos_printer = EscPosPrinter( + self.app, vendor_id=vendir, device_id=prid + ) except Exception as e: raise e # If the object creation is successfull, we add it to the list of Printers printers.add(escpos_printer) - self.app.logger.debug("Found a %s printer" % manufacturer ) + self.app.logger.debug("Found a %s printer" % manufacturer) # We already found the type of printer, # we don't need an extra comparaison. @@ -79,14 +94,20 @@ class Printers(): self.app.logger.debug("Trying to creat a new BROTHER printer") prid = dev.idProduct vendir = dev.idVendor - brother_printer = BrotherPrinter(self.app, vendor_id=vendir,device_id=prid) + brother_printer = BrotherPrinter( + self.app, vendor_id=vendir, device_id=prid + ) except Exception as e: - self.app.logger.error("Could not create a %s printer class with %s:%s" % product, dev.idVendor, dev.idProduct) + self.app.logger.error( + "Could not create a %s printer class with %s:%s" % product, + dev.idVendor, + dev.idProduct, + ) raise e # If the object creation is successfull, we add it to the list of Printers printers.add(brother_printer) - self.app.logger.debug("Found a %s printer" % manufacturer ) + self.app.logger.debug("Found a %s printer" % manufacturer) self.app.logger.debug("Found %s printers" % len(printers)) return printers @@ -110,9 +131,10 @@ class Printers(): return NotImplementedError() -class _FindClass(): +class _FindClass: def __init__(self, class_): self._class = class_ + def __call__(self, device): # first, let's check the device if device.bDeviceClass == self._class: @@ -121,10 +143,7 @@ class _FindClass(): # interface that matches our class for cfg in device: # find_descriptor: what's it? - intf = usb.util.find_descriptor( - cfg, - bInterfaceClass=self._class - ) + intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class) if intf is not None: return True diff --git a/src/raspberry.py b/src/raspberry.py index a75b401..7e1841b 100644 --- a/src/raspberry.py +++ b/src/raspberry.py @@ -15,7 +15,8 @@ from gpiozero import Button, LED, DigitalOutputDevice from PIL import Image from task import TextTask, ImageTask, CutTask -class Raspberry(): + +class Raspberry: """ This class will manage three things : - Connecting to a USB webcam @@ -263,8 +264,10 @@ class Raspberry(): self.app.logger.debug("Printing picture") self.led.on() self.crop_to_square(self.image_path) - self.print_queue.enqueue(ImageTask(self.image_path,signature="",process=True)) - self.print_queue.enqueue(TextTask(content="Imprimé par LittlePrynter", signature="")) + self.print_queue.enqueue(ImageTask(self.image_path, signature="", process=True)) + self.print_queue.enqueue( + TextTask(content="Imprimé par LittlePrynter", signature="") + ) time = strftime("%Y-%m-%d %H:%M", gmtime()) self.print_queue.enqueue(TextTask(content=time, signature="")) self.print_queue.enqueue(CutTask()) diff --git a/src/task.py b/src/task.py index 9fc38a9..5b45920 100644 --- a/src/task.py +++ b/src/task.py @@ -14,6 +14,7 @@ to print at the same time. We can also delay and store printing tasks until a printer becomes available if none is online. """ + from abc import ABC, abstractmethod ## See https://docs.python.org/3/library/abc.html to learn more about this @@ -27,6 +28,7 @@ class TaskType(Enum): """ The different tasks supported by the printers """ + TEXT = "text" IMAGE = "image" CUT = "cut" @@ -47,7 +49,6 @@ class PrintTask(ABC): def get_print_data(self): """Return data formatted for printer""" - def _generate_id(self): # Generate unique task ID return str(uuid.uuid4()) @@ -66,8 +67,10 @@ class TextTask(PrintTask): def get_print_data(self): return {"txt": self.content, "sign": self.signature} + class QRTask(TextTask): """This task prints a QR-Code, the signature is ignore and is always the content itself""" + def __init__(self, content): super().__init__(content, signature="") self.content = content @@ -76,6 +79,7 @@ class QRTask(TextTask): def get_print_data(self): return {"txt": self.content, "sign": self.signature} + class ImageTask(PrintTask): """ This tasks represents a image content ( in the form of it's path ), and it's signature. diff --git a/src/web.py b/src/web.py index 1db14dd..b31cd10 100644 --- a/src/web.py +++ b/src/web.py @@ -6,7 +6,7 @@ from task import TextTask, ImageTask, CutTask class Web(object): """Web is the class that gets all of the information from web calls - ( API and Web page ) and provides checks before sending stuff to printing""" + ( API and Web page ) and provides checks before sending stuff to printing""" def __init__(self, app, print_queue): super(Web).__init__() @@ -116,4 +116,4 @@ class Web(object): def get_queue_completed(self): """Return completed queue elements""" - return self.print_queue.get_queue_completed() \ No newline at end of file + return self.print_queue.get_queue_completed() diff --git a/src/worker.py b/src/worker.py index e5a7fec..ad10220 100644 --- a/src/worker.py +++ b/src/worker.py @@ -31,7 +31,7 @@ class PrintWorker(threading.Thread): """Background thread that processes queue items""" self.app.logger.debug("Worker %s started working.", threading.get_ident()) self.app.logger.debug("Current threads : %s" % threading.active_count()) - self.app.logger.debug("Threads actives : %s " % threading.enumerate()) + self.app.logger.debug("Threads actives : %s " % threading.enumerate()) while True: @@ -48,7 +48,11 @@ class PrintWorker(threading.Thread): try: self.app.logger.debug("Changing printers") self.printer = next(self.printers) - self.app.logger.debug("The worker got a %s printer and it's %s", self.printer.printer_type, "Ready" if self.printer.ready else "Not ready") + self.app.logger.debug( + "The worker got a %s printer and it's %s", + self.printer.printer_type, + "Ready" if self.printer.ready else "Not ready", + ) except Exception as e: self.app.logger.error(str(e)) self.printer = None @@ -89,7 +93,9 @@ class PrintWorker(threading.Thread): task.status = "completed" self.print_queue.mark_completed(task.task_id, "completed") self._emit_status(task.task_id, "completed") - self.app.logger.debug("Finished printing task %s " % task.task_id) + self.app.logger.debug( + "Finished printing task %s " % task.task_id + ) self.state = "idle" except RuntimeError as e: @@ -97,7 +103,9 @@ class PrintWorker(threading.Thread): self.state = "idle" self.print_queue.mark_completed(task.task_id, "failed") self._emit_status(task.task_id, "failed", error=str(e)) - self.app.logger.error("Could not print task %s because %s " % task.task_id, str(e)) + self.app.logger.error( + "Could not print task %s because %s " % task.task_id, str(e) + ) else: # When they are no new tasks to handle, we put the thread to sleep. -- 2.47.3 From 651235a610e89b2810673048a18ef2fee1b3c291 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 00:34:49 +0200 Subject: [PATCH 10/25] Lint printer file --- src/printer.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/printer.py b/src/printer.py index 222c847..abd8d5a 100644 --- a/src/printer.py +++ b/src/printer.py @@ -2,7 +2,6 @@ This class manages connexion to a Printer """ -from time import sleep import os.path import os from abc import ABC, abstractmethod @@ -10,8 +9,8 @@ from dataclasses import dataclass from enum import Enum import uuid -import usb.core import threading +import usb.core from PIL import Image, ImageEnhance import numpy as np @@ -20,7 +19,6 @@ import numpy as np import escpos.printer from brother_ql.models import ModelsManager from brother_ql.backends import backend_factory -from brother_ql import labels from brother_ql.raster import BrotherQLRaster from brother_ql.conversion import convert from brother_ql.backends.helpers import send @@ -38,6 +36,9 @@ class PrinterType(Enum): # For Brother-QL Printers @dataclass class PrinterInfo: + """ + Brother-QL printer information + """ identifier: str backend: str protocol: str @@ -375,7 +376,7 @@ class BrotherPrinter(Printer): continue protocol = parts[0] - device_info = parts[2] + # device_info = parts[2] serial_number = parts[3] try: @@ -513,7 +514,7 @@ class BrotherPrinter(Printer): def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark brightness_threshold = 100 # Brightness threshold (0–255) - contrast_factor = 0.6 # Less than 1.0 = lower contrast + # contrast_factor = 0.6 # Less than 1.0 = lower contrast max_width = 575 max_height = 1000 -- 2.47.3 From 6d9db2d2aa117fd0bddeafca1778440ec81fbfcb Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 00:39:37 +0200 Subject: [PATCH 11/25] Apply linting to printers --- src/printers.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/printers.py b/src/printers.py index 379ce18..b3d020c 100644 --- a/src/printers.py +++ b/src/printers.py @@ -1,13 +1,14 @@ """ A collection of Printers. -It has methods to discover printers, and provides an interface for the methods expected from printers. +It has methods to discover printers, and provides an interface for +the methods expected from printers. """ -from collections.abc import Mapping, Set +from collections.abc import Set import usb.core import usb.util -from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType +from printer import Printer, EscPosPrinter, BrotherPrinter class Printers: @@ -81,7 +82,7 @@ class Printers: # If the object creation is successfull, we add it to the list of Printers printers.add(escpos_printer) - self.app.logger.debug("Found a %s printer" % manufacturer) + self.app.logger.debug("Found a %s printer", manufacturer) # We already found the type of printer, # we don't need an extra comparaison. @@ -99,7 +100,7 @@ class Printers: ) except Exception as e: self.app.logger.error( - "Could not create a %s printer class with %s:%s" % product, + "Could not create a %s printer class with %s:%s" , product, dev.idVendor, dev.idProduct, ) @@ -107,9 +108,9 @@ class Printers: # If the object creation is successfull, we add it to the list of Printers printers.add(brother_printer) - self.app.logger.debug("Found a %s printer" % manufacturer) + self.app.logger.debug("Found a %s printer" , manufacturer) - self.app.logger.debug("Found %s printers" % len(printers)) + self.app.logger.debug("Found %s printers" , len(printers)) return printers def any(self) -> Printer: @@ -122,16 +123,20 @@ class Printers: else: raise RuntimeError("No printers available") - def get_printer(self, printer_type): - """ - Return a specific printer + # def get_printer(self, printer_type): + # """ + # Return a specific printer - printer_type -- a printer type - """ - return NotImplementedError() + # printer_type -- a printer type + # """ + # return NotImplementedError() class _FindClass: + """ + Use by usb.core to modify the way USB devices are found + Taken from pyUSB documentation on Github + """ def __init__(self, class_): self._class = class_ -- 2.47.3 From adcc744e7a5c1b340f70215eb0825035aa9ff98d Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 00:59:17 +0200 Subject: [PATCH 12/25] Apply linting to the Raspberry Pi --- src/main.py | 6 +---- src/raspberry.py | 68 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/src/main.py b/src/main.py index f428402..2acd4e8 100644 --- a/src/main.py +++ b/src/main.py @@ -100,11 +100,7 @@ print_queue = PrintQueue(app) rpi = Raspberry( print_queue, app, - socketio, - configuration_file["rpi"]["button_gpio_port_number"], - configuration_file["rpi"]["indicator_gpio_port_number"], - configuration_file["rpi"]["flash_gpio_port_number"], - configuration_file["rpi"]["flash"], + configuration_file ) RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi() diff --git a/src/raspberry.py b/src/raspberry.py index 7e1841b..729e93a 100644 --- a/src/raspberry.py +++ b/src/raspberry.py @@ -10,7 +10,6 @@ import io # To check if we are on a Raspberry Pi import subprocess import os from time import sleep, gmtime, strftime -from flask_socketio import SocketIO from gpiozero import Button, LED, DigitalOutputDevice from PIL import Image from task import TextTask, ImageTask, CutTask @@ -24,29 +23,22 @@ class Raspberry: - Activating a flash ( or light ) - Flash an indicator light + # pylint: disable=too-many-instance-attributes + # dede """ def __init__( self, print_queue, app, - socketio, - button_gpio_port_number, - indicator_gpio_port_number, - flash_gpio_port_number, - is_flash_present, + configuration_file ): self.print_queue = print_queue - self.socketio = socketio self.app = app - - self.flash_gpio = flash_gpio_port_number - self.is_flash_present = is_flash_present - self.button_gpio = button_gpio_port_number - self.led_gpio = indicator_gpio_port_number + self.configuration_file = configuration_file self.image_path = self.app.config["UPLOAD_FOLDER"] + "/image.jpg" - def is_raspberry_pi(self, raise_on_errors=False): + def is_raspberry_pi(self): """ Checking if we are on a Raspberry Pi by checking information on the /proc/cpuinfo file @@ -73,9 +65,10 @@ class Raspberry: return False if not found: - self.app.logger.error( - "Couldn't get sufficient hardware information from /proc/cpuinfo, Unable to determine if we are on a Raspberry Pi." + self.app.logger.warning( + "Couldn't get sufficient hardware information from /proc/cpuinfo" ) + self.app.logger.error("Unable to determine if we are on a Raspberry Pi.") return False except IOError: self.app.logger.error("Unable to open `/proc/cpuinfo`.") @@ -84,28 +77,38 @@ class Raspberry: self.app.logger.debug("It seems we are on a Raspberry Pi") try: - self.initialise_gpio() + self._initialise_gpio() except Exception as e: self.app.logger.debug("Could not init GPIO : " + str(e)) raise e return True - def initialise_gpio(self): + def _initialise_gpio(self): + """ + Set GPIO ports from configuration and activate them + to show the user it's working. + """ self.app.logger.debug("Initializing GPIO") - self.led = LED(self.led_gpio) + self.led = LED(self.configuration_file["rpi"]["indicator_gpio_port_number"]) self.app.logger.debug("Activated indicator LED") self.indicator_countdown(iters=3) - self.button = Button(self.button_gpio, pull_up=True, bounce_time=0.1) + self.button = Button( + self.configuration_file["rpi"]["button_gpio_port_number"], + pull_up=True, bounce_time=0.1 + ) self.button.when_pressed = self.on_button_pressed self.app.logger.debug("Activated button") # The "flash" is a relay-controlled device ( light bulb for example ) - self.flash = DigitalOutputDevice(self.flash_gpio) + self.flash = DigitalOutputDevice(self.configuration_file["rpi"]["flash_gpio_port_number"]) self.flash_toggle() self.app.logger.debug("Activated flash") def indicator_countdown(self, iters=10, multi=10): + """ + Activates the LED faster and faster to show a countdown + """ for i in range(iters, 0, -1): self.led.on() sleep(i / multi) @@ -113,7 +116,10 @@ class Raspberry: sleep(i / multi) def indicator_led(self, timing=0.2, l=5): - for i in range(l): + """ + Turns on the indicator LED for a certain period of time + """ + for _ in range(l): self.app.logger.debug("LED turned on") self.led.on() sleep(timing) @@ -122,6 +128,9 @@ class Raspberry: sleep(timing) def flash_toggle(self): + """ + Flashes the flash + """ self.app.logger.debug("Flash turned on") self.flash.on() sleep(0.3) @@ -129,6 +138,9 @@ class Raspberry: self.app.logger.debug("Flash turned off") def take_picture(self): + """ + Takes a picture via the USB webcam + """ # Validate if the image path is valid if not os.path.isdir(os.path.dirname(self.image_path)): self.app.logger.error( @@ -174,6 +186,9 @@ class Raspberry: position="bottom_right", margin=10, ): + """ + Takes an image and overlays it with a another picture. + """ try: image = Image.open(image_path).convert("RGBA") logo = Image.open(logo_path).convert("RGBA") @@ -200,7 +215,9 @@ class Raspberry: y = image.height - logo.height - margin else: raise ValueError( - "Invalid position. Choose from 'bottom_right', 'top_left', 'top_right', or 'bottom_left'." + "Invalid position." + + "Choose from 'bottom_right', 'top_left', " + + " 'top_right', or 'bottom_left'." ) # Composite the logo onto the image @@ -218,6 +235,9 @@ class Raspberry: return True def crop_to_square(self, image_path, output_path=None): + """ + Crop an image so that it becomes a square + """ try: image = Image.open(image_path) width, height = image.size @@ -245,6 +265,10 @@ class Raspberry: return True def on_button_pressed(self): + """ + When a button press is detected, a picture is taken from the webcam + and added to the print queue. + """ self.app.logger.debug("Button has been pressed") self.led.on() self.app.logger.debug("Counting down") -- 2.47.3 From ad3cb6231a568e6e888ec9a0b08a9ff39a0a3d3f Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 01:11:37 +0200 Subject: [PATCH 13/25] Apply linting to web --- src/web.py | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/web.py b/src/web.py index b31cd10..8bcb92c 100644 --- a/src/web.py +++ b/src/web.py @@ -1,10 +1,13 @@ +""" +Manage all of the inputs from a web source +""" + import os -from flask import flash from werkzeug.utils import secure_filename from task import TextTask, ImageTask, CutTask -class Web(object): +class Web: """Web is the class that gets all of the information from web calls ( API and Web page ) and provides checks before sending stuff to printing""" @@ -65,16 +68,19 @@ class Web(object): return True - def login(self, username: str, password: str) -> bool: - """Not implemented""" - return + # def login(self, username: str, password: str) -> bool: + # """Not implemented""" + # return - def logout(self, username: str, password: str) -> bool: - """Not implemented""" - return + # def logout(self, username: str, password: str) -> bool: + # """Not implemented""" + # return def allowed_file(self, filename) -> bool: - self.app.logger.debug("Is the filename allowed ?") + """ + Check if the file extension is allowed + """ + self.app.logger.debug("Checking if the file extension is allowed") return ( "." in filename and filename.rsplit(".", 1)[1].lower() @@ -82,8 +88,11 @@ class Web(object): ) def upload_file(self, image) -> bool: + """ + Save the file after executing checks on it + """ self.app.logger.debug("Validating file") - if image: + if not image is None or not image == "": if self.allowed_file(image.filename): filename = secure_filename(image.filename) self.app.logger.debug("File valid") @@ -99,16 +108,10 @@ class Web(object): ) return True - self.app.logger.error( - "Could not save file because the filename is forbidden" - ) - return False - - else: - self.app.logger.error( - "Could not save file, it seems to be null ? : " + str(filename) - ) - return False + self.app.logger.error( + "Could not save file because the filename is forbidden" + ) + return False def get_queue_state(self): """Return current queue state""" -- 2.47.3 From 2262840f75bdfb3a5352875a9654521183723017 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 01:26:47 +0200 Subject: [PATCH 14/25] Apply linting to worker --- src/worker.py | 50 ++++++++++++++++++++------------------------------ 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/src/worker.py b/src/worker.py index ad10220..91b7567 100644 --- a/src/worker.py +++ b/src/worker.py @@ -1,6 +1,7 @@ -# This is the main printing thread -# As explained in the task file, this is where we command -# printing to happen. +""" +This is the main printing thread. A worker thread consums Tasks from + a PrintQueue, while trying to find available printers. +""" import threading import time @@ -8,13 +9,21 @@ from printers import Printers class PrintWorker(threading.Thread): - def __init__(self, app, print_queue, socketio=None): + """ + A thread used to consume Tasks added to a Print Queue. + + On initialisation, the worker will try to find Printers, + and on each print, choose an available Printer from a list of Printers. + + If a print fails, it's not retried, but will be added to a list of completed + tasks. + """ + def __init__(self, app, print_queue): super().__init__(daemon=True) self.app = app self.print_queue = print_queue self.printer = None self._lock = threading.Lock() - self.socketio = socketio # Optional self.running = True self.state = "idle" # idle, printing, dead, drinking-a-beer self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...") @@ -30,8 +39,8 @@ class PrintWorker(threading.Thread): def run(self): """Background thread that processes queue items""" self.app.logger.debug("Worker %s started working.", threading.get_ident()) - self.app.logger.debug("Current threads : %s" % threading.active_count()) - self.app.logger.debug("Threads actives : %s " % threading.enumerate()) + self.app.logger.debug("Current threads : %s" , threading.active_count()) + self.app.logger.debug("Threads actives : %s " , threading.enumerate()) while True: @@ -40,7 +49,8 @@ class PrintWorker(threading.Thread): time.sleep(0.2) continue - # If we have no available printer, we look at the list printers we know about, and try to find one that is available. + # If we have no available printer, we look at the list printers + # we know about, and try to find one that is available. # When we find a printer, we acquire it # When we are finished with a printer, we release it to the world. while not self.printer or not self.printer.ready: @@ -79,7 +89,6 @@ class PrintWorker(threading.Thread): self.app.logger.info("Got a new task") self.app.logger.debug("Got task %s", task.task_id) task.status = "processing" - self._emit_status(task.task_id, "processing") print_data = task.get_print_data() @@ -92,9 +101,8 @@ class PrintWorker(threading.Thread): task.status = "completed" self.print_queue.mark_completed(task.task_id, "completed") - self._emit_status(task.task_id, "completed") self.app.logger.debug( - "Finished printing task %s " % task.task_id + "Finished printing task %s " , task.task_id ) self.state = "idle" @@ -102,9 +110,8 @@ class PrintWorker(threading.Thread): task.status = "failed" self.state = "idle" self.print_queue.mark_completed(task.task_id, "failed") - self._emit_status(task.task_id, "failed", error=str(e)) self.app.logger.error( - "Could not print task %s because %s " % task.task_id, str(e) + "Could not print task %s because %s " , task.task_id, str(e) ) else: @@ -112,23 +119,6 @@ class PrintWorker(threading.Thread): self.state = "idle" time.sleep(0.1) - def _emit_status(self, task_id, status, error=None): - """Emit status update via Socket.IO if available""" - if not self.socketio: - return - - room = f"task_{task_id}" - data = { - "task_id": task_id, - "status": status, - "position": None, # Task no longer in queue - } - - if error: - data["error"] = error - - self.socketio.emit("task_status", data, room=room) - def stop_worker(self): """ Give the worker a break -- 2.47.3 From 54678175ba6ada96d15f7f11d243309c3ca0cb24 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 01:27:23 +0200 Subject: [PATCH 15/25] Remove socketio from Worker for the moment --- src/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index 2acd4e8..2ae875a 100644 --- a/src/main.py +++ b/src/main.py @@ -111,7 +111,7 @@ web = Web(app, print_queue) # Start worker thread # When created, the worker will try to find printers connected to the system try: - worker = PrintWorker(app, print_queue, socketio) + worker = PrintWorker(app, print_queue) worker.start() except Exception as e: app.logger.error("Could not start the worker because %s ", str(e)) -- 2.47.3 From 9ccd2b8bdfced4713b1bfb211f5dd4a7f805192d Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 02:31:53 +0200 Subject: [PATCH 16/25] Use textarea instead of input, easier for ASCII art --- src/templates/index.html | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/templates/index.html b/src/templates/index.html index 1eeb467..2f2b0a8 100644 --- a/src/templates/index.html +++ b/src/templates/index.html @@ -6,7 +6,8 @@

Print a short message

-
+ +

-- 2.47.3 From c57e2f91a21eafc3e2d188a8df42e46e6bd2278b Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 02:32:08 +0200 Subject: [PATCH 17/25] Update getting debug env --- src/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index 2ae875a..acc4b93 100644 --- a/src/main.py +++ b/src/main.py @@ -83,7 +83,7 @@ except PermissionError: sys.exit(77) # Output the config file -if os.getenv("FLASK_DEBUG"): +if not os.getenv("FLASK_DEBUG") is None and os.getenv("FLASK_DEBUG") is True: pprint.pprint(configuration_file) # We define the app module used by Flask @@ -402,4 +402,4 @@ def camera_status(): if __name__ == "__main__": - app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc") + app.run(use_reloader=False, host="0.0.0.0", ssl_context="adhoc") -- 2.47.3 From 3a1d9b20fbb2c6cdef40f0efef36a13054c50977 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 02:32:23 +0200 Subject: [PATCH 18/25] Add skip line and catch printing errors --- src/printer.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/printer.py b/src/printer.py index abd8d5a..62715c1 100644 --- a/src/printer.py +++ b/src/printer.py @@ -216,6 +216,7 @@ class EscPosPrinter(Printer): self.printer.textln(clean_msg) if clean_signature: self.printer.textln(clean_signature) + self.printer.textln() self.printer.close() except Exception as e: self.app.logger.error("Unable to print because : " + str(e)) @@ -323,24 +324,28 @@ class EscPosPrinter(Printer): with self._lock: if self._state: self._state = False - match (task_type.value): - case "text": - self._print_txt(data["txt"], signature=data["sign"]) - self._state = True - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] - ) - self._state = True - case "cut": - self._cut() - self._state = True - case "qr": - self._qr(data["txt"]) - self._state = True - case _: - raise RuntimeError("This task type is not supported") - else: + try: + match (task_type.value): + case "text": + self._print_txt(data["txt"], signature=data["sign"]) + self._state = True + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + self._state = True + case "cut": + self._cut() + self._state = True + case "qr": + self._qr(data["txt"]) + self._state = True + case _: + raise RuntimeError("This task type is not supported") + except Exception as e: + self._state = True + raise RuntimeError from e + raise RuntimeError("The printer is not ready to print yet !") -- 2.47.3 From 175dd3385ad609fcef4f74fce12b51ec12f64683 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 02:32:47 +0200 Subject: [PATCH 19/25] Add content in print queue method --- src/print_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/print_queue.py b/src/print_queue.py index 0dfc24f..38e3c3e 100644 --- a/src/print_queue.py +++ b/src/print_queue.py @@ -77,7 +77,7 @@ class PrintQueue: with self._lock: self.app.logger.debug("Return current queue state") return [ - {"task_id": t.task_id, "status": t.status, "type": str(t.task_type)} + {"task_id": t.task_id, "status": t.status, "type": str(t.task_type), "content": str(t.get_print_data())} for t in self._queue ] -- 2.47.3 From 53010987f49e86935765b17fd91b67df88b9ce2d Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 19:26:21 +0200 Subject: [PATCH 20/25] Update Error raising in uploads and image processing --- src/printer.py | 101 +++++++++++++++++++++++++++++-------------------- src/web.py | 6 +-- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/src/printer.py b/src/printer.py index 62715c1..27acdb9 100644 --- a/src/printer.py +++ b/src/printer.py @@ -6,12 +6,11 @@ import os.path import os from abc import ABC, abstractmethod from dataclasses import dataclass - +import time from enum import Enum import uuid import threading import usb.core - from PIL import Image, ImageEnhance import numpy as np @@ -176,7 +175,7 @@ class EscPosPrinter(Printer): return True def _print_txt(self, msg, signature="", bold=False): - + self.ready = False if not isinstance(msg, str): self.app.logger.error( "It is not possible to print a " + str(type(msg)) + ", only strings." @@ -225,9 +224,10 @@ class EscPosPrinter(Printer): ) from e self.app.logger.info("Printed text") - return True + self.ready = True def _print_img(self, path, signature="", center=True, process=False): + self.ready = False clean_signature = str(signature) if len(signature) > 256: @@ -253,30 +253,33 @@ class EscPosPrinter(Printer): if process: try: self.app.logger.debug("Proccessing the image") - path = _process_image(self, path) + processed_path = _process_image(self, path) except RuntimeError as e: self.app.logger.error( "Error while processing the image, aborting print : %s", str(e) ) raise e else: + processed_path = path self.app.logger.warning("Not proccessing the image") try: self.printer.open(self.usb_args) - self.printer.image(path, center=center) + self.printer.image(processed_path, center=center) self.printer.textln(signature) self.printer.close() - self.app.logger.debug("Printed an image : " + str(path)) + self.app.logger.debug("Printed an image : " + str(processed_path)) except Exception as e: self.app.logger.error(str(e)) raise RuntimeError("Could not print the picture") from e finally: try: os.remove(path) + os.remove(processed_path) except OSError as e: raise e + self.app.logger.debug("Removed image : " + str(processed_path)) self.app.logger.debug("Removed image : " + str(path)) try: @@ -288,9 +291,10 @@ class EscPosPrinter(Printer): raise RuntimeError("Could not close the printer connexion. ") from e self.app.logger.info("Printed a picture") - return True + self.ready = True def _qr(self, content): + self.ready = False try: self.printer.open(self.usb_args) self.printer.qr(content, center=True) @@ -302,8 +306,10 @@ class EscPosPrinter(Printer): raise e self.app.logger.info("Printed a QR") + self.ready = True def _cut(self): + self.ready = False try: self.printer.open(self.usb_args) self.printer.cut() @@ -314,39 +320,49 @@ class EscPosPrinter(Printer): raise e self.app.logger.info("Did a cut") - return True + self.ready = True def _state(self): + self.app.logger.debug("Online : %s " , self.printer.is_online()) + self.app.logger.debug("Has paper : %s " , self._has_paper()) + self.app.logger.debug("Ready : %s " , self.ready) return self.printer.is_online() and self.ready and self._has_paper() def print_task(self, task_type, data): """Execute actual print based on task type""" with self._lock: - if self._state: - self._state = False - try: - match (task_type.value): - case "text": - self._print_txt(data["txt"], signature=data["sign"]) - self._state = True - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] - ) - self._state = True - case "cut": - self._cut() - self._state = True - case "qr": - self._qr(data["txt"]) - self._state = True - case _: - raise RuntimeError("This task type is not supported") - except Exception as e: - self._state = True - raise RuntimeError from e + self.app.logger.debug("Acquired lock to start print") - raise RuntimeError("The printer is not ready to print yet !") + while not self._state(): + self.app.logger.debug("Waiting for the printer to become ready..") + time.sleep(0.3) + + self.app.logger.debug("Checked state to start printing %s", self._state()) + self.ready = False + try: + self.app.logger.debug("Checking task type") + match (task_type.value): + case "text": + + self._print_txt(data["txt"], signature=data["sign"]) + self.ready = True + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + self.ready = True + case "cut": + self._cut() + self.ready = True + case "qr": + self._qr(data["txt"]) + self.ready = True + case _: + raise RuntimeError("This task type is not supported") + except Exception as e: + self.app.logger.debug("Exception occured while printing %s", str(e)) + self.ready = True + raise RuntimeError from e class BrotherPrinter(Printer): @@ -519,14 +535,14 @@ class BrotherPrinter(Printer): def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark brightness_threshold = 100 # Brightness threshold (0–255) - # contrast_factor = 0.6 # Less than 1.0 = lower contrast + contrast_factor = 2 # Less than 1.0 = lower contrast max_width = 575 max_height = 1000 with Image.open(path) as original_img: # Convert to RGB if needed (JPEG doesn't support alpha) if original_img.mode in ("RGBA", "P"): - self.app.logger.debug("Converting the image to RGB from RGBA") + self.app.logger.debug("Converting the image from RGBA to RGBA") original_img = original_img.convert("RGB") # Resize while maintaining aspect ratio @@ -556,16 +572,19 @@ def _process_image(self, path): self.app.logger.debug( f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}" ) - enhancer = ImageEnhance.Brightness(original_img) - original_img = enhancer.enhance(brightness_factor) + enhancer = ImageEnhance.Brightness(grayscale) + grayscale = enhancer.enhance(brightness_factor) - # # Reduce contrast - # contrast_enhancer = ImageEnhance.Contrast(original_img) - # original_img = contrast_enhancer.enhance(contrast_factor) + # Computer current contrast of grayscale image + contrast = np.clip(np.std(np.array(grayscale)), 0, 255) + self.app.logger.debug("Standard deviation of the contrast : %s", contrast) + # # Enhance contrast + contrast_enhancer = ImageEnhance.Contrast(grayscale) + original_img = contrast_enhancer.enhance(contrast_factor) # Convert to JPEG and save jpeg_path = os.path.splitext(path)[0] + "_processed.jpg" - original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True) + grayscale.save(jpeg_path, format="JPEG", quality=95, optimize=True) self.app.logger.debug("Processed and saved image.") return jpeg_path diff --git a/src/web.py b/src/web.py index 8bcb92c..90f88ea 100644 --- a/src/web.py +++ b/src/web.py @@ -44,7 +44,7 @@ class Web: file_uploaded = self.upload_file(image) except Exception as e: self.app.logger.error(e) - raise RuntimeError("Could not upload file") from e + raise RuntimeError("Could not upload file : " + str(e)) from e if file_uploaded: self.app.logger.debug("File has been uploaded, printing...") @@ -100,7 +100,7 @@ class Web: image.save(os.path.join(self.app.config["UPLOAD_FOLDER"], filename)) except OSError as e: self.app.logger.error("Could not save file %s", e) - return False + raise RuntimeError("An OS error occured while uploading this file : " + str(e)) from e self.app.logger.debug( "File saved to " @@ -111,7 +111,7 @@ class Web: self.app.logger.error( "Could not save file because the filename is forbidden" ) - return False + raise RuntimeError("This file type is forbidden.") def get_queue_state(self): """Return current queue state""" -- 2.47.3 From af15ed875457f3dc979665275e54e2a55598e098 Mon Sep 17 00:00:00 2001 From: n07070 Date: Thu, 4 Jun 2026 19:45:37 +0200 Subject: [PATCH 21/25] Manage file too big exceptions --- src/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index acc4b93..ccd4efd 100644 --- a/src/main.py +++ b/src/main.py @@ -90,7 +90,7 @@ if not os.getenv("FLASK_DEBUG") is None and os.getenv("FLASK_DEBUG") is True: app.secret_key = configuration_file["secrets"]["flask_secret_key"] app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER app.config["ALLOWED_EXTENSIONS"] = ALLOWED_EXTENSIONS -app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 3Mb for a file upload +app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 10Mb for a file upload app.config["TEMPLATES_AUTO_RELOAD"] = True # Queue creation @@ -189,6 +189,9 @@ def web_print_img(): "No signature found for this print, using default signature : %s", str(e) ) sign = configuration_file["defaults"]["signature"] + except werkzeug.exceptions.RequestEntityTooLarge as e: + flash("Whoops, image is too big: " + str(e), "error") + return redirect(url_for("index")) # check if the post request has the file part if "img" not in request.files: -- 2.47.3 From 65e4a2ad9cedfaaaebac505965bbf66ab37ee601 Mon Sep 17 00:00:00 2001 From: n07070 Date: Fri, 12 Jun 2026 16:35:38 +0200 Subject: [PATCH 22/25] Manage error when no Printers are found --- src/printers.py | 5 +++++ src/worker.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/printers.py b/src/printers.py index b3d020c..7e21317 100644 --- a/src/printers.py +++ b/src/printers.py @@ -111,6 +111,11 @@ class Printers: self.app.logger.debug("Found a %s printer" , manufacturer) self.app.logger.debug("Found %s printers" , len(printers)) + + if len(printers) < 1: + self.app.logger.warning("Not printers found ! Please plug in a Printer and restart the program.") + raise RuntimeError("No printers found") + return printers def any(self) -> Printer: diff --git a/src/worker.py b/src/worker.py index 91b7567..048b304 100644 --- a/src/worker.py +++ b/src/worker.py @@ -64,7 +64,7 @@ class PrintWorker(threading.Thread): "Ready" if self.printer.ready else "Not ready", ) except Exception as e: - self.app.logger.error(str(e)) + self.app.logger.error("No printer detected" + str(e)) self.printer = None if self.state != "idle": -- 2.47.3 From 7d19098b61a1052da1777d8d781dca231ac6bb9e Mon Sep 17 00:00:00 2001 From: n07070 Date: Fri, 12 Jun 2026 17:19:53 +0200 Subject: [PATCH 23/25] Change management of state : we assume the printer is online. Otherwise, because this is a real-time command, we might not get the good answer and fail to print fast enough. See https://download4.epson.biz/sec_pubs/pos/reference_en/escpos/realtime_commands.html --- src/printer.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/printer.py b/src/printer.py index 27acdb9..3a886b3 100644 --- a/src/printer.py +++ b/src/printer.py @@ -322,22 +322,29 @@ class EscPosPrinter(Printer): self.app.logger.info("Did a cut") self.ready = True - def _state(self): - self.app.logger.debug("Online : %s " , self.printer.is_online()) - self.app.logger.debug("Has paper : %s " , self._has_paper()) - self.app.logger.debug("Ready : %s " , self.ready) - return self.printer.is_online() and self.ready and self._has_paper() + def _state(self) -> bool: + has_paper = self._has_paper() + is_ready = self.ready + + self.app.logger.debug("Has paper : %s " , has_paper ) + self.app.logger.debug("Ready : %s " , is_ready ) + + return is_ready and has_paper # and is_online def print_task(self, task_type, data): """Execute actual print based on task type""" + + with self._lock: self.app.logger.debug("Acquired lock to start print") - while not self._state(): - self.app.logger.debug("Waiting for the printer to become ready..") + i_m_ready = self._state() + while not i_m_ready: + self.app.logger.debug("Waiting for the printer to become ready, current state %s ", str(i_m_ready)) + i_m_ready = self._state() time.sleep(0.3) - self.app.logger.debug("Checked state to start printing %s", self._state()) + self.app.logger.debug("Checked state to start printing : %s", self._state()) self.ready = False try: self.app.logger.debug("Checking task type") -- 2.47.3 From 0699775d3554f883e145870ee54b775396971029 Mon Sep 17 00:00:00 2001 From: n07070 Date: Fri, 12 Jun 2026 17:20:44 +0200 Subject: [PATCH 24/25] Add welcome message --- src/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.py b/src/main.py index ccd4efd..4a0645b 100644 --- a/src/main.py +++ b/src/main.py @@ -123,6 +123,8 @@ limiter = Limiter( ) +app.logger.info("🖶 Welcome to LittlePrynter !") + # General routes @app.route("/") @limiter.limit("1/second", override_defaults=False) -- 2.47.3 From 09e588c3ff338860451ec785dd63a985600cb079 Mon Sep 17 00:00:00 2001 From: n07070 Date: Fri, 12 Jun 2026 17:20:53 +0200 Subject: [PATCH 25/25] Change Error to warning when not on a Raspberry --- src/raspberry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raspberry.py b/src/raspberry.py index 729e93a..fa8553c 100644 --- a/src/raspberry.py +++ b/src/raspberry.py @@ -68,7 +68,7 @@ class Raspberry: self.app.logger.warning( "Couldn't get sufficient hardware information from /proc/cpuinfo" ) - self.app.logger.error("Unable to determine if we are on a Raspberry Pi.") + self.app.logger.warning("Unable to determine if we are on a Raspberry Pi.") return False except IOError: self.app.logger.error("Unable to open `/proc/cpuinfo`.") -- 2.47.3