Files
littleprynter/src/printer.py
n07070 7d19098b61 Change management of state : we assume the printer is online. Otherwise,
because this is a real-time command, we might not get the good answer
and fail to print fast enough. See https://download4.epson.biz/sec_pubs/pos/reference_en/escpos/realtime_commands.html
2026-06-12 17:19:53 +02:00

598 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
This module manages the connexion to the supported printers
(ESC/POS printers and Brother QL label printers).
"""
import os.path
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
import time
from enum import Enum
import uuid
import threading
import usb.core
from PIL import Image, ImageEnhance
import numpy as np
# Importing the modules needed for each supported printer Type
import escpos.printer
from brother_ql.models import ModelsManager
from brother_ql.backends import backend_factory
from brother_ql.raster import BrotherQLRaster
from brother_ql.conversion import convert
from brother_ql.backends.helpers import send
class PrinterType(Enum):
    """
    Families of printers this module knows how to drive.

    The value of each member is the lowercase name of the family.
    """

    # Printers driven through the escpos library
    EPSON = "epson"
    # Label printers driven through the brother_ql library
    BROTHER = "brother"
# For Brother-QL Printers
@dataclass
class PrinterInfo:
    """
    Description of a Brother-QL printer.

    Besides regular attribute access, the fields can be read and written
    with the subscript syntax (``info["model"]``), like a dictionary.
    """

    # Mandatory fields: how the device is reached and identified
    identifier: str
    backend: str
    protocol: str
    vendor_id: str
    product_id: str
    serial_number: str

    # Optional fields, with the defaults used when nothing better is known
    name: str = "Brother QL Printer"
    model: str = "QL-570"
    status: str = "unknown"
    label_type: str = "unknown"
    label_size: str = "unknown"
    label_width: int = 0
    label_height: int = 0

    def __getitem__(self, item):
        """Return the attribute called ``item`` (``info[item]``)."""
        return getattr(self, item)

    def __setitem__(self, key, value):
        """Store ``value`` in the attribute called ``key`` (``info[key] = value``)."""
        setattr(self, key, value)
class Printer(ABC):
    """
    Common base of every printer driver.

    If it outputs printed paper and speaks like a printer, then it must be a printer.
    """

    def __init__(self, app, vendor_id, device_id, printer_type: PrinterType):
        """
        Remember how to reach the printer over USB and give it a unique ID.

        The printer starts as not ready; a lock is created for the
        subclasses to guard their print jobs with.
        """
        self.app = app
        self.printer_type = printer_type
        self.vendor_id = vendor_id
        self.device_id = device_id
        self.id = uuid.uuid4()
        self._lock = threading.Lock()
        self.ready = False

    @abstractmethod
    def _has_paper(self) -> bool:
        """Tell whether the printer has paper."""

    @abstractmethod
    def _state(self) -> bool:
        """Report the state of the printer."""

    @abstractmethod
    def print_task(self, task_type, data) -> None:
        """Take a print task (its type and its data) and execute it."""
class EscPosPrinter(Printer):
"""
Create a new ESC/POS based printer.
"""
def __init__(self, app, vendor_id, device_id):
"""
Create a connexion to a ESC/POS Printer via USB,
Making sure the printer is alive,
Making sure it has paper,
Define default print settings
"""
super().__init__(app, vendor_id, device_id, printer_type=PrinterType.EPSON)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.vendor_id
self.usb_args["idProduct"] = self.device_id
try:
# This also calls open(), which we need to close()
# or else the device will appear as busy.
p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80")
except escpos.exceptions.DeviceNotFoundError as e:
self.app.logger.error(
"The USB device is not plugged in : %s",
str(e),
)
except Exception as e:
self.app.logger.error("Printer could not be connected : %s ", str(e))
try:
if p.is_online():
self.app.logger.debug("Printer online !")
except Exception as e:
raise e
# Setting up the printing options.
p.set(
align="center",
font="a",
bold=False,
underline=0,
width=1,
height=1,
density=9,
invert=False,
smooth=False,
flip=False,
double_width=False,
double_height=False,
custom_size=False,
)
# Beware : if we print every time the printer becomes ready, it means
# we are printing before and after every print !
self.printer = p
self.printer.close() # We close the connexion to the Printer
try:
self._has_paper()
except Exception as e:
raise e
self.ready = True
def _has_paper(self):
"""Check if the printer has paper left"""
self.app.logger.debug("Checking paper status...")
self.printer.open(self.usb_args)
status = self.printer.paper_status()
match status:
case 0:
self.app.logger.error("Printer has no more paper, aborting...")
self.printer.close()
raise RuntimeError("No more paper in the printer")
case 1:
self.app.logger.warning(
"Printer needs paper to be changed very soon ! "
)
self.printer.close()
return True
case 2:
self.app.logger.debug("Printer has paper, good to go")
self.printer.close()
return True
def _print_txt(self, msg, signature="", bold=False):
self.ready = False
if not isinstance(msg, str):
self.app.logger.error(
"It is not possible to print a " + str(type(msg)) + ", only strings."
)
raise ValueError
# We make sure that the signature is not something too goofy
clean_msg = str(msg) + "\n"
clean_signature = str(signature)
# Make checks on the size of the message being printed
if len(clean_msg) > 4096:
self.app.logger.warning(
"Could not print message of this length: " + str(len(clean_msg))
)
raise RuntimeError(
"Could not print message of this length :"
+ str(len(clean_msg))
+ ", needs to be below 4096 caracters long."
)
if len(signature) > 256:
self.app.logger.warning(
"Could not print signature of this length: " + str(len(clean_signature))
)
raise RuntimeError(
"Could not print signature of this length :"
+ str(len(clean_signature))
+ ", needs to be below 256 caracters long."
)
# Do the actual printing
# We would pop the next element in the queue here, if it's a sms type
try:
self.printer.open(self.usb_args)
self.printer.set(align="center", font="a", bold=bold)
self.printer.textln(clean_msg)
if clean_signature:
self.printer.textln(clean_signature)
self.printer.textln()
self.printer.close()
except Exception as e:
self.app.logger.error("Unable to print because : " + str(e))
raise RuntimeError(
"Unable to print a SMS, the printer couldn't do it."
) from e
self.app.logger.info("Printed text")
self.ready = True
def _print_img(self, path, signature="", center=True, process=False):
self.ready = False
clean_signature = str(signature)
if len(signature) > 256:
self.app.logger.warning(
"Could not print signature of this length: " + str(len(clean_signature))
)
raise ValueError(
"Could not print signature of this length :"
+ str(len(clean_signature))
+ ", needs to be below 256 caracters long."
)
if not os.path.isfile(str(path)):
self.app.logger.warning("File does not exist : " + str(path))
raise OSError(
"The file path for this image :"
+ str(path)
+ " wasn't found. Please try again."
)
self.app.logger.debug("Printing file from " + str(path))
if process:
try:
self.app.logger.debug("Proccessing the image")
processed_path = _process_image(self, path)
except RuntimeError as e:
self.app.logger.error(
"Error while processing the image, aborting print : %s", str(e)
)
raise e
else:
processed_path = path
self.app.logger.warning("Not proccessing the image")
try:
self.printer.open(self.usb_args)
self.printer.image(processed_path, center=center)
self.printer.textln(signature)
self.printer.close()
self.app.logger.debug("Printed an image : " + str(processed_path))
except Exception as e:
self.app.logger.error(str(e))
raise RuntimeError("Could not print the picture") from e
finally:
try:
os.remove(path)
os.remove(processed_path)
except OSError as e:
raise e
self.app.logger.debug("Removed image : " + str(processed_path))
self.app.logger.debug("Removed image : " + str(path))
try:
self.printer.close()
except Exception as e:
self.app.logger.error(
"Could not close the printer connexion %s", str(e)
)
raise RuntimeError("Could not close the printer connexion. ") from e
self.app.logger.info("Printed a picture")
self.ready = True
def _qr(self, content):
self.ready = False
try:
self.printer.open(self.usb_args)
self.printer.qr(content, center=True)
self.printer.textln(content)
self.printer.close()
except RuntimeError as e:
self.printer.close()
self.app.logger.error(str(e))
raise e
self.app.logger.info("Printed a QR")
self.ready = True
def _cut(self):
self.ready = False
try:
self.printer.open(self.usb_args)
self.printer.cut()
self.printer.close()
except Exception as e:
self.printer.close()
self.app.logger.error(str(e))
raise e
self.app.logger.info("Did a cut")
self.ready = True
def _state(self) -> bool:
has_paper = self._has_paper()
is_ready = self.ready
self.app.logger.debug("Has paper : %s " , has_paper )
self.app.logger.debug("Ready : %s " , is_ready )
return is_ready and has_paper # and is_online
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
with self._lock:
self.app.logger.debug("Acquired lock to start print")
i_m_ready = self._state()
while not i_m_ready:
self.app.logger.debug("Waiting for the printer to become ready, current state %s ", str(i_m_ready))
i_m_ready = self._state()
time.sleep(0.3)
self.app.logger.debug("Checked state to start printing : %s", self._state())
self.ready = False
try:
self.app.logger.debug("Checking task type")
match (task_type.value):
case "text":
self._print_txt(data["txt"], signature=data["sign"])
self.ready = True
case "image":
self._print_img(
data["img"], signature=data["sign"], process=data["process"]
)
self.ready = True
case "cut":
self._cut()
self.ready = True
case "qr":
self._qr(data["txt"])
self.ready = True
case _:
raise RuntimeError("This task type is not supported")
except Exception as e:
self.app.logger.debug("Exception occured while printing %s", str(e))
self.ready = True
raise RuntimeError from e
class BrotherPrinter(Printer):
"""
Manages connexion and capabilities of a BrotherQL Printer
"""
def __init__(self, app, vendor_id, device_id):
super().__init__(
app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER
)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
self.usb_args["idProduct"] = self.vendor_id
self.model_manager = ModelsManager()
# Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py
backend = backend_factory("pyusb")
available_devices = backend["list_available_devices"]()
for printer in available_devices:
self.app.logger.debug(f"Found device: {printer}")
identifier = printer["identifier"]
parts = identifier.split("/")
if len(parts) < 4:
self.app.logger.warning(
f"Skipping device with invalid identifier format: {identifier}"
)
continue
protocol = parts[0]
# device_info = parts[2]
serial_number = parts[3]
try:
product_id_int = int(self.device_id, 16)
for m in self.model_manager.iter_elements():
if m.product_id == product_id_int:
model = m.identifier
break
self.app.logger.debug(f"Matched printer model: {model}")
except ValueError:
self.app.logger.warning(f"Invalid product ID format: {m.product_id}")
self.printer_info = PrinterInfo(
identifier=identifier,
backend="pyusb",
model=model,
protocol=protocol,
vendor_id=vendor_id,
product_id=self.device_id,
serial_number=serial_number,
)
self.ready = True
def _has_paper(self):
raise NotImplementedError("This printer model does not support this.")
def _state(self):
return self.ready
def _print_img(self, data):
"""
Print a raster image via a Brother QL printer
"""
self.ready = False
label_type = "102"
rotate = 0
dither = False
try:
# Prepare the image for printing
qlr = BrotherQLRaster(self.printer_info["model"])
instructions = convert(
qlr=qlr,
images=[data["img"]],
label=label_type,
rotate=rotate,
threshold=70,
dither=dither,
compress=True,
red=False,
dpi_600=False,
hq=False,
cut=True,
)
# Debug logging
if os.getenv("FLASK_DEBUG"):
self.app.logger.debug(f"""
Print parameters:
- Label type: {label_type}
- Rotate: {rotate}
- Dither: {dither}
- Model: {self.printer_info['model']}
- Backend: {self.printer_info['backend']}
- Identifier: {self.printer_info['identifier']}
""")
# Try to print using Python API
# send() = status = {
# 'instructions_sent': True, # The instructions were sent to the printer.
# 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
# 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it.
# 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
# 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
# }
status = send(
instructions=instructions,
printer_identifier=self.printer_info["identifier"],
backend_identifier="pyusb",
)
if (
not status["did_print"]
or status["outcome"] == "error"
or status["outcome"] == "unknown"
):
raise RuntimeError("Failed to print using Python API")
if status["printer_state"]:
self.ready = bool(status["printer_state"])
else:
self.ready = True
except usb.core.USBError as e:
# Treat timeout errors as successful since they often occur after print completion
if e.errno == 110: # Operation timed out
self.app.logger.debug(
"USB timeout occurred - this is normal and the print likely completed"
)
self.app.logger.debug("Print completed (timeout is normal)")
self.ready = True
error_msg = f"USBError encountered: {e}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
except Exception as e:
error_msg = f"Unexpected error during printing: {str(e)}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
with self._lock:
if self._state:
self._state = False
match (task_type.value):
case "image":
self._print_img(data["img"])
self._state = True
case "cut":
# The cut happens by default on Brother QL printers.
self._state = True
case _:
raise RuntimeError("This task type is not supported")
else:
raise RuntimeError("The printer is not ready to print yet !")
# These values are by default for now
# raise NotImplementedError("This printer type is not implemented yet")
def _process_image(self, path):
    """
    Prepare the image stored at path for printing and save it as a JPEG.

    The image is shrunk to fit in 575x1000 pixels (aspect ratio kept),
    converted to grayscale, brightened when its average brightness is
    below the threshold, and its contrast is increased.

    self is the printer asking for the processing, only its logger
    (self.app.logger) is used.
    Returns the path of the processed file: the original path with its
    extension replaced by "_processed.jpg". The original file is kept.
    """
    brightness_threshold = 100  # Brightness threshold (0-255)
    contrast_factor = 2  # Less than 1.0 = lower contrast
    max_width = 575
    max_height = 1000

    with Image.open(path) as original_img:
        # Convert to RGB if needed (JPEG doesn't support alpha)
        if original_img.mode in ("RGBA", "P"):
            self.app.logger.debug(
                "Converting the image from %s to RGB", original_img.mode
            )
            original_img = original_img.convert("RGB")

        # Resize while maintaining aspect ratio
        original_img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
        self.app.logger.debug("Resized the image")

        # Compute brightness of original image (grayscale average)
        grayscale = original_img.convert("L")
        avg_brightness = np.array(grayscale).mean()
        self.app.logger.debug(
            "Average brightness of the image : " + str(avg_brightness)
        )

        # Dynamically compute brightness factor if too dark
        if avg_brightness < brightness_threshold:
            brightness_factor = (
                1 + (brightness_threshold - avg_brightness) / brightness_threshold
            )
            brightness_factor = min(
                max(brightness_factor, 1.1), 2.5
            )  # Clamp between 1.1 and 2.5
            self.app.logger.debug(
                f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}"
            )
            enhancer = ImageEnhance.Brightness(grayscale)
            grayscale = enhancer.enhance(brightness_factor)

        # Compute current contrast of grayscale image
        contrast = np.clip(np.std(np.array(grayscale)), 0, 255)
        self.app.logger.debug("Standard deviation of the contrast : %s", contrast)

        # Enhance contrast; the enhanced image is the one to save,
        # otherwise this step has no effect on the printed picture.
        contrast_enhancer = ImageEnhance.Contrast(grayscale)
        enhanced = contrast_enhancer.enhance(contrast_factor)

        # Convert to JPEG and save
        jpeg_path = os.path.splitext(path)[0] + "_processed.jpg"
        enhanced.save(jpeg_path, format="JPEG", quality=95, optimize=True)
        self.app.logger.debug("Processed and saved image.")
    return jpeg_path