Apply black formatter

This commit is contained in:
n07070
2026-06-04 00:31:04 +02:00
parent a2d1779e2b
commit 3c490e10b4
8 changed files with 103 additions and 49 deletions

View File

@@ -126,6 +126,7 @@ limiter = Limiter(
get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"]
)
# General routes
@app.route("/")
@limiter.limit("1/second", override_defaults=False)
@@ -317,11 +318,13 @@ def api_queue_status():
"""API endpoint for entire queue"""
return jsonify(web.get_queue_state())
@app.route("/api/queue/completed", methods=["GET"])
def api_queue_completed():
"""API endpoint that returns the finished tasks"""
return jsonify(web.get_queue_completed())
@app.route("/api/worker", methods=["GET"])
def api_worker_state():
"""API endpoint to get the worker state"""

View File

@@ -76,7 +76,10 @@ class PrintQueue:
"""Return current queue state"""
with self._lock:
self.app.logger.debug("Return current queue state")
return [{"task_id": t.task_id, "status": t.status, "type": str(t.task_type) } for t in self._queue]
return [
{"task_id": t.task_id, "status": t.status, "type": str(t.task_type)}
for t in self._queue
]
def get_queue_completed(self):
"""Return completed queue elements"""

View File

@@ -1,8 +1,10 @@
"""
This class manages connexion to a Printer
"""
from time import sleep
import os.path
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -14,7 +16,6 @@ import threading
from PIL import Image, ImageEnhance
import numpy as np
# Importing the modules needed for each supported printer Type
import escpos.printer
from brother_ql.models import ModelsManager
@@ -24,13 +25,16 @@ from brother_ql.raster import BrotherQLRaster
from brother_ql.conversion import convert
from brother_ql.backends.helpers import send
class PrinterType(Enum):
"""
What are the capacities of a Printer ?
"""
EPSON = "epson"
BROTHER = "brother"
# For Brother-QL Printers
@dataclass
class PrinterInfo:
@@ -54,6 +58,7 @@ class PrinterInfo:
def __setitem__(self, key, value):
setattr(self, key, value)
class Printer(ABC):
"""
If it outputs printed paper and speaks like a printer, then it must be a printer.
@@ -105,9 +110,7 @@ class EscPosPrinter(Printer):
try:
# This also calls open(), which we need to close()
# or else the device will appear as busy.
p = escpos.printer.Usb(
self.vendor_id, self.device_id, 0, profile="TM-P80"
)
p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80")
except escpos.exceptions.DeviceNotFoundError as e:
self.app.logger.error(
"The USB device is not plugged in : %s",
@@ -339,13 +342,16 @@ class EscPosPrinter(Printer):
else:
raise RuntimeError("The printer is not ready to print yet !")
class BrotherPrinter(Printer):
"""
Manages connexion and capabilities of a BrotherQL Printer
"""
def __init__(self, app, vendor_id, device_id):
super().__init__(app, vendor_id="",device_id="", printer_type=PrinterType.BROTHER)
super().__init__(
app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER
)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
@@ -363,7 +369,9 @@ class BrotherPrinter(Printer):
parts = identifier.split("/")
if len(parts) < 4:
self.app.logger.warning(f"Skipping device with invalid identifier format: {identifier}")
self.app.logger.warning(
f"Skipping device with invalid identifier format: {identifier}"
)
continue
protocol = parts[0]
@@ -378,7 +386,7 @@ class BrotherPrinter(Printer):
break
self.app.logger.debug(f"Matched printer model: {model}")
except ValueError:
self.app.logger.warning(f"Invalid product ID format: {product_id}")
self.app.logger.warning(f"Invalid product ID format: {m.product_id}")
self.printer_info = PrinterInfo(
identifier=identifier,
@@ -392,7 +400,6 @@ class BrotherPrinter(Printer):
self.ready = True
def _has_paper(self):
raise NotImplementedError("This printer model does not support this.")
@@ -426,7 +433,7 @@ class BrotherPrinter(Printer):
)
# Debug logging
if FLASK_DEBUG:
if os.getenv("FLASK_DEBUG"):
self.app.logger.debug(f"""
Print parameters:
- Label type: {label_type}
@@ -448,10 +455,14 @@ class BrotherPrinter(Printer):
status = send(
instructions=instructions,
printer_identifier=self.printer_info["identifier"],
backend_identifier="pyusb"
backend_identifier="pyusb",
)
if not status["did_print"] or status["outcome"] == "error" or status["outcome"] == "unknown":
if (
not status["did_print"]
or status["outcome"] == "error"
or status["outcome"] == "unknown"
):
raise RuntimeError("Failed to print using Python API")
if status["printer_state"]:
@@ -462,7 +473,9 @@ class BrotherPrinter(Printer):
except usb.core.USBError as e:
# Treat timeout errors as successful since they often occur after print completion
if e.errno == 110: # Operation timed out
self.app.logger.debug("USB timeout occurred - this is normal and the print likely completed")
self.app.logger.debug(
"USB timeout occurred - this is normal and the print likely completed"
)
self.app.logger.debug("Print completed (timeout is normal)")
self.ready = True
@@ -496,6 +509,7 @@ class BrotherPrinter(Printer):
# raise NotImplementedError("This printer type is not implemented yet")
def _process_image(self, path):
brightness_factor = 1.5 # Used only if image is too dark
brightness_threshold = 100 # Brightness threshold (0255)

View File

@@ -3,15 +3,18 @@ A collection of Printers.
It has methods to discover printers, and provides an interface for the methods expected from printers.
"""
from collections.abc import Mapping, Set
import usb.core
import usb.util
from printer import Printer, EscPosPrinter, BrotherPrinter, PrinterType
class Printers():
class Printers:
"""
Finds and creates a set of Printer that can be used by the Workers to print.
"""
def __init__(self, app):
"""
Discover printers connected to the computer and return a Collection of Printer()
@@ -38,8 +41,12 @@ class Printers():
# Find all connected USB devices
devices = usb.core.find(find_all=True, custom_match=_FindClass(7))
if not devices:
self.app.logger.warning("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.")
raise RuntimeError("No USB devices of class 7 ( printers ) found or pyusb could not access the bus.")
self.app.logger.warning(
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
)
raise RuntimeError(
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
)
for dev in devices:
# Attempt to get the manufacturer and product strings
@@ -52,7 +59,13 @@ class Printers():
product = usb.util.get_string(dev, dev.iProduct)
except Exception:
product = "Unknown"
self.app.logger.debug("Looking at %s %s (%s:%s)", manufacturer, product, hex(dev.idVendor), hex(dev.idProduct))
self.app.logger.debug(
"Looking at %s %s (%s:%s)",
manufacturer,
product,
hex(dev.idVendor),
hex(dev.idProduct),
)
if manufacturer == "EPSON":
try:
@@ -60,7 +73,9 @@ class Printers():
self.app.logger.debug("Trying to creat a new EPSON printer")
prid = dev.idProduct
vendir = dev.idVendor
escpos_printer = EscPosPrinter(self.app, vendor_id=vendir, device_id=prid)
escpos_printer = EscPosPrinter(
self.app, vendor_id=vendir, device_id=prid
)
except Exception as e:
raise e
@@ -79,9 +94,15 @@ class Printers():
self.app.logger.debug("Trying to creat a new BROTHER printer")
prid = dev.idProduct
vendir = dev.idVendor
brother_printer = BrotherPrinter(self.app, vendor_id=vendir,device_id=prid)
brother_printer = BrotherPrinter(
self.app, vendor_id=vendir, device_id=prid
)
except Exception as e:
self.app.logger.error("Could not create a %s printer class with %s:%s" % product, dev.idVendor, dev.idProduct)
self.app.logger.error(
"Could not create a %s printer class with %s:%s" % product,
dev.idVendor,
dev.idProduct,
)
raise e
# If the object creation is successfull, we add it to the list of Printers
@@ -110,9 +131,10 @@ class Printers():
return NotImplementedError()
class _FindClass():
class _FindClass:
def __init__(self, class_):
self._class = class_
def __call__(self, device):
# first, let's check the device
if device.bDeviceClass == self._class:
@@ -121,10 +143,7 @@ class _FindClass():
# interface that matches our class
for cfg in device:
# find_descriptor: what's it?
intf = usb.util.find_descriptor(
cfg,
bInterfaceClass=self._class
)
intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
if intf is not None:
return True

View File

@@ -15,7 +15,8 @@ from gpiozero import Button, LED, DigitalOutputDevice
from PIL import Image
from task import TextTask, ImageTask, CutTask
class Raspberry():
class Raspberry:
"""
This class will manage three things :
- Connecting to a USB webcam
@@ -264,7 +265,9 @@ class Raspberry():
self.led.on()
self.crop_to_square(self.image_path)
self.print_queue.enqueue(ImageTask(self.image_path, signature="", process=True))
self.print_queue.enqueue(TextTask(content="Imprimé par LittlePrynter", signature=""))
self.print_queue.enqueue(
TextTask(content="Imprimé par LittlePrynter", signature="")
)
time = strftime("%Y-%m-%d %H:%M", gmtime())
self.print_queue.enqueue(TextTask(content=time, signature=""))
self.print_queue.enqueue(CutTask())

View File

@@ -14,6 +14,7 @@ to print at the same time.
We can also delay and store printing tasks until a printer becomes
available if none is online.
"""
from abc import ABC, abstractmethod
## See https://docs.python.org/3/library/abc.html to learn more about this
@@ -27,6 +28,7 @@ class TaskType(Enum):
"""
The different tasks supported by the printers
"""
TEXT = "text"
IMAGE = "image"
CUT = "cut"
@@ -47,7 +49,6 @@ class PrintTask(ABC):
def get_print_data(self):
"""Return data formatted for printer"""
def _generate_id(self):
# Generate unique task ID
return str(uuid.uuid4())
@@ -66,8 +67,10 @@ class TextTask(PrintTask):
def get_print_data(self):
return {"txt": self.content, "sign": self.signature}
class QRTask(TextTask):
"""This task prints a QR-Code, the signature is ignore and is always the content itself"""
def __init__(self, content):
super().__init__(content, signature="")
self.content = content
@@ -76,6 +79,7 @@ class QRTask(TextTask):
def get_print_data(self):
return {"txt": self.content, "sign": self.signature}
class ImageTask(PrintTask):
"""
This tasks represents a image content ( in the form of it's path ), and it's signature.

View File

@@ -48,7 +48,11 @@ class PrintWorker(threading.Thread):
try:
self.app.logger.debug("Changing printers")
self.printer = next(self.printers)
self.app.logger.debug("The worker got a %s printer and it's %s", self.printer.printer_type, "Ready" if self.printer.ready else "Not ready")
self.app.logger.debug(
"The worker got a %s printer and it's %s",
self.printer.printer_type,
"Ready" if self.printer.ready else "Not ready",
)
except Exception as e:
self.app.logger.error(str(e))
self.printer = None
@@ -89,7 +93,9 @@ class PrintWorker(threading.Thread):
task.status = "completed"
self.print_queue.mark_completed(task.task_id, "completed")
self._emit_status(task.task_id, "completed")
self.app.logger.debug("Finished printing task %s " % task.task_id)
self.app.logger.debug(
"Finished printing task %s " % task.task_id
)
self.state = "idle"
except RuntimeError as e:
@@ -97,7 +103,9 @@ class PrintWorker(threading.Thread):
self.state = "idle"
self.print_queue.mark_completed(task.task_id, "failed")
self._emit_status(task.task_id, "failed", error=str(e))
self.app.logger.error("Could not print task %s because %s " % task.task_id, str(e))
self.app.logger.error(
"Could not print task %s because %s " % task.task_id, str(e)
)
else:
# When they are no new tasks to handle, we put the thread to sleep.