438 lines
15 KiB
Python
438 lines
15 KiB
Python
|
||
# import brother_ql
|
||
from time import sleep
|
||
import os.path
|
||
|
||
from PIL import Image, ImageEnhance
|
||
import numpy as np
|
||
|
||
# Importing the module to manage the connection to the printer.
|
||
import escpos.printer
|
||
|
||
|
||
class Printer(object):
|
||
"""
|
||
# The connection is based on the ESC/POS library
|
||
|
||
## Connection to the USB printer
|
||
|
||
## Making sure the printer is alive
|
||
|
||
## Making sure it has paper
|
||
|
||
## Define default print settings
|
||
|
||
## Print starting message, log time of first print, cut.
|
||
|
||
## Annonce readyness : return a positive pong message.
|
||
"""
|
||
|
||
# Is the printer ready to accept a new print ?
|
||
ready = False
|
||
|
||
def __init__(self, app, device_id, vendor_id):
|
||
super(Printer, self).__init__()
|
||
self.app = app
|
||
self.ready = False
|
||
self.printer = None
|
||
self.device_id = device_id
|
||
self.vendor_id = vendor_id
|
||
self.usb_args = {}
|
||
self.usb_args["idVendor"] = self.device_id
|
||
self.usb_args["idProduct"] = self.vendor_id
|
||
|
||
def check_paper(self) -> bool:
|
||
"""
|
||
On printers that support it, we check that the printer has paper
|
||
"""
|
||
self.app.logger.debug("Checking paper status...")
|
||
self.printer.open(self.usb_args)
|
||
status = self.printer.paper_status()
|
||
match status:
|
||
case 0:
|
||
self.app.logger.error("Printer has no more paper, aborting...")
|
||
self.printer.close()
|
||
raise RuntimeError("No more paper in the printer")
|
||
case 1:
|
||
self.app.logger.warning(
|
||
"Printer needs paper to be changed very soon ! "
|
||
)
|
||
self.printer.close()
|
||
case 2:
|
||
self.app.logger.debug("Printer has paper, good to go")
|
||
self.printer.close()
|
||
|
||
def init_printer(self):
|
||
"""
|
||
Check if the printer online ? Is the communication with the printer successfull ?
|
||
"""
|
||
|
||
# TODO: This could happen directly when creating a new Printer class
|
||
if os.getenv("FLASK_DEBUG"):
|
||
waiting_elapsed = 15
|
||
else:
|
||
waiting_elapsed = 10
|
||
|
||
self.app.logger.debug("Waiting for printer to get online...")
|
||
|
||
while not self.ready:
|
||
try:
|
||
# This also calls open(), which we need to close()
|
||
# or else the device will appear as busy.
|
||
p = escpos.printer.Usb(
|
||
self.device_id, self.vendor_id, 0, profile="TM-P80"
|
||
)
|
||
except Exception as e:
|
||
self.app.logger.error(
|
||
"The USB device is not plugged in, trying again %s : %s",
|
||
waiting_elapsed,
|
||
str(e),
|
||
)
|
||
pass
|
||
|
||
try:
|
||
if p.is_online():
|
||
self.ready = True
|
||
self.app.logger.debug("Printer online !")
|
||
except Exception as e:
|
||
self.app.logger.error(
|
||
"Error while getting the printer online %s : %s",
|
||
waiting_elapsed,
|
||
str(e),
|
||
)
|
||
pass
|
||
|
||
sleep(1)
|
||
waiting_elapsed -= 1
|
||
if waiting_elapsed < 1:
|
||
self.app.logger.error(
|
||
"Printer took more than 30 seconds to get online, aborting..."
|
||
)
|
||
waiting_elapsed = 1 # Reset the waiting time for the next print.
|
||
return False
|
||
|
||
# Setting up the printing options.
|
||
p.set(
|
||
align="center",
|
||
font="a",
|
||
bold=False,
|
||
underline=0,
|
||
width=1,
|
||
height=1,
|
||
density=9,
|
||
invert=False,
|
||
smooth=False,
|
||
flip=False,
|
||
double_width=False,
|
||
double_height=False,
|
||
custom_size=False,
|
||
)
|
||
# Beware : if we print every time the printer becomes ready, it means
|
||
# we are printing before and after every print !
|
||
self.printer = p
|
||
self.ready = True
|
||
self.printer.close()
|
||
|
||
self.check_paper()
|
||
|
||
return True
|
||
|
||
def _print_sms(self, msg, signature="", bold=False):
|
||
|
||
if not isinstance(msg, str):
|
||
self.app.logger.error(
|
||
"It is not possible to print a " + str(type(msg)) + ", only strings."
|
||
)
|
||
raise ValueError
|
||
|
||
# We make sure that the signature is not something too goofy
|
||
clean_msg = str(msg) + "\n"
|
||
clean_signature = str(signature)
|
||
|
||
# Make checks on the size of the message being printed
|
||
if len(clean_msg) > 4096:
|
||
self.app.logger.warning(
|
||
"Could not print message of this length: " + str(len(clean_msg))
|
||
)
|
||
raise Exception(
|
||
"Could not print message of this length :"
|
||
+ str(len(clean_msg))
|
||
+ ", needs to be below 4096 caracters long."
|
||
)
|
||
|
||
if len(signature) > 256:
|
||
self.app.logger.warning(
|
||
"Could not print signature of this length: " + str(len(clean_signature))
|
||
)
|
||
raise Exception(
|
||
"Could not print signature of this length :"
|
||
+ str(len(clean_signature))
|
||
+ ", needs to be below 256 caracters long."
|
||
)
|
||
|
||
# Do the actual printing
|
||
# We would pop the next element in the queue here, if it's a sms type
|
||
try:
|
||
self.printer.open(self.usb_args)
|
||
self.printer.set(align="center", font="a", bold=bold)
|
||
self.printer.textln(clean_msg)
|
||
if clean_signature:
|
||
self.printer.textln(clean_signature)
|
||
self.printer.close()
|
||
except Exception as e:
|
||
self.app.logger.error("Unable to print because : " + str(e))
|
||
raise RuntimeError(
|
||
"Unable to print a SMS, the printer couldn't do it."
|
||
) from e
|
||
|
||
self.app.logger.info("Printed text")
|
||
return True
|
||
|
||
def _print_img(self, path, signature="", center=True, process=False):
|
||
clean_signature = str(signature)
|
||
|
||
if len(signature) > 256:
|
||
self.app.logger.warning(
|
||
"Could not print signature of this length: " + str(len(clean_signature))
|
||
)
|
||
raise ValueError(
|
||
"Could not print signature of this length :"
|
||
+ str(len(clean_signature))
|
||
+ ", needs to be below 256 caracters long."
|
||
)
|
||
|
||
if not os.path.isfile(str(path)):
|
||
self.app.logger.warning("File does not exist : " + str(path))
|
||
raise OSError(
|
||
"The file path for this image :"
|
||
+ str(path)
|
||
+ " wasn't found. Please try again."
|
||
)
|
||
else:
|
||
self.app.logger.debug("Printing file from " + str(path))
|
||
|
||
if process:
|
||
try:
|
||
self.app.logger.debug("Proccessing the image")
|
||
path = _process_image(self, path)
|
||
except RuntimeError as e:
|
||
self.app.logger.error(
|
||
"Error while processing the image, aborting print : %s", str(e)
|
||
)
|
||
raise e
|
||
else:
|
||
self.app.logger.warning("Not proccessing the image")
|
||
|
||
try:
|
||
self.printer.open(self.usb_args)
|
||
self.printer.image(path, center=center)
|
||
self.printer.textln(signature)
|
||
self.printer.close()
|
||
self.app.logger.debug("Printed an image : " + str(path))
|
||
except Exception as e:
|
||
self.app.logger.error(str(e))
|
||
raise RuntimeError("Could not print the picture") from e
|
||
finally:
|
||
try:
|
||
os.remove(path)
|
||
except OSError as e:
|
||
raise e
|
||
|
||
self.app.logger.debug("Removed image : " + str(path))
|
||
|
||
try:
|
||
self.printer.close()
|
||
except Exception as e:
|
||
self.app.logger.error(
|
||
"Could not close the printer connexion %s", str(e)
|
||
)
|
||
raise RuntimeError("Could not close the printer connexion. ") from e
|
||
|
||
self.app.logger.info("Printed a picture")
|
||
return True
|
||
|
||
def _qr(self, content):
|
||
try:
|
||
self.printer.open(self.usb_args)
|
||
self.printer.qr(content, center=True)
|
||
self.printer.close()
|
||
except Exception as e:
|
||
self.printer.close()
|
||
self.app.logger.error(str(e))
|
||
return False
|
||
|
||
self.app.logger.info("Printed a QR")
|
||
return True
|
||
|
||
def _cut(self):
|
||
try:
|
||
self.printer.open(self.usb_args)
|
||
self.printer.cut()
|
||
self.printer.close()
|
||
except Exception as e:
|
||
self.printer.close()
|
||
self.app.logger.error(str(e))
|
||
raise e
|
||
|
||
self.app.logger.info("Did a cut")
|
||
return True
|
||
|
||
def print_task(self, task_type, data):
|
||
"""Execute actual print based on task type"""
|
||
match (task_type.value):
|
||
case "text":
|
||
self._print_sms(data["txt"], signature=data["sign"])
|
||
case "image":
|
||
self._print_img(
|
||
data["img"], signature=data["sign"], process=data["process"]
|
||
)
|
||
case "cut":
|
||
self._cut()
|
||
case _:
|
||
raise RuntimeError("This task type is not supported")
|
||
|
||
|
||
def _process_image(self, path):
    """
    Prepare an image for the printer and save the result as a JPEG file.

    The image is shrunk (aspect ratio preserved) to fit within 575x1000
    pixels and brightened when its average grayscale level is below 100. The
    result is written next to the original as ``<name>_processed.jpg``.

    :param self: object exposing ``app.logger`` (this helper is called by
        ``Printer._print_img`` with the printer instance).
    :param path: path of the image file to process.
    :return: path of the processed JPEG file.
    """
    brightness_threshold = 100  # Brightness threshold (0–255)
    max_width = 575
    max_height = 1000
    # Modes Pillow's JPEG writer can encode directly. Anything else (alpha
    # channels, palettes, 32-bit integer/float pixels...) must be converted
    # first, otherwise the save at the end fails.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    with Image.open(path) as source:
        if source.mode in jpeg_modes:
            img = source
        else:
            self.app.logger.debug(
                "Converting the image to RGB from %s", source.mode
            )
            img = source.convert("RGB")

        # Resize while maintaining aspect ratio
        img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
        self.app.logger.debug("Resized the image")

        # NOTE: dithering (grayscale then 1-bit conversion) and a contrast
        # reduction were experimented with at this point and are disabled.

        # Compute brightness of original image (grayscale average)
        avg_brightness = np.array(img.convert("L")).mean()
        self.app.logger.debug(
            "Average brightness of the image : " + str(avg_brightness)
        )

        # Dynamically compute brightness factor if too dark
        if avg_brightness < brightness_threshold:
            brightness_factor = (
                1 + (brightness_threshold - avg_brightness) / brightness_threshold
            )
            # Clamp between 1.1 and 2.5
            brightness_factor = min(max(brightness_factor, 1.1), 2.5)
            self.app.logger.debug(
                f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}"
            )
            img = ImageEnhance.Brightness(img).enhance(brightness_factor)

        # Convert to JPEG and save
        jpeg_path = os.path.splitext(path)[0] + "_processed.jpg"
        img.save(jpeg_path, format="JPEG", quality=95, optimize=True)
        self.app.logger.debug("Processed and saved image.")

    return jpeg_path
|
||
|
||
|
||
def discover_printers():
    """
    Placeholder for the discovery of the printers connected to this system.

    The goal is to autodiscover every connected printer (zero or more), for
    each supported printer type. Nothing is implemented yet: the function
    does no work and returns None.

    USB vendor IDs of interest (full list: http://www.linux-usb.org/usb.ids):

    - 04b8  Seiko Epson Corp.
    - 04f9  Brother Industries, Ltd
    """
    return None
|
||
|
||
|
||
def find_and_parse_borther_ql_printer():
    """
    Return a description of the first Brother QL printer found, or None.

    The "pyusb" and "linux_kernel" backends are tried in that order. Device
    identifiers are split on "/" and must have at least four parts: the
    first is kept as the protocol, the third as "vendor:product" and the
    fourth as the serial number. The product id, read as hexadecimal, is
    matched against the known models, with "QL-570" as the fallback model.
    Devices that do not fit this format are skipped, and a backend that
    raises is skipped entirely.

    :return: dict with the keys ``identifier``, ``backend``, ``model``,
        ``protocol``, ``vendor_id``, ``product_id`` and ``serial_number``,
        or None (after printing a message) when no printer was found.
    """
    # We might be able to not use this because there is a `discover` command
    # in https://github.com/pklaus/brother_ql#usage
    # Code taken from https://framagit.org/stickoeur/diagnostickoeur/-/blob/no-masters/printit.py?ref_type=heads
    # NOTE(review): `ModelsManager` and `backend_factory` are not imported in
    # the visible part of this file (the `brother_ql` import is commented
    # out) — confirm they are in scope before relying on this function.
    known_models = ModelsManager()

    for backend_name in ("pyusb", "linux_kernel"):
        try:
            list_devices = backend_factory(backend_name)["list_available_devices"]

            for device in list_devices():
                identifier = device["identifier"]
                fields = identifier.split("/")
                if len(fields) < 4:
                    # Identifier does not have the expected format.
                    continue
                protocol, _, usb_ids, serial_number = fields[:4]

                try:
                    vendor_id, product_id = usb_ids.split(":")
                except ValueError:
                    # Not exactly one ":" separator in the device info.
                    continue

                try:
                    wanted_product = int(product_id, 16)
                    # First known model with this product id, else the default.
                    model = next(
                        (
                            candidate.identifier
                            for candidate in known_models.iter_elements()
                            if candidate.product_id == wanted_product
                        ),
                        "QL-570",
                    )
                except ValueError:
                    # Product id is not valid hexadecimal.
                    continue

                return {
                    "identifier": identifier,
                    "backend": backend_name,
                    "model": model,
                    "protocol": protocol,
                    "vendor_id": vendor_id,
                    "product_id": product_id,
                    "serial_number": serial_number,
                }

        except Exception:
            # This backend is unusable here; try the next one.
            continue

    print("No Brother QL printer found")
    return None
|
||
|
||
|
||
def fint_and_parse_epson_printer():
    """Placeholder for Epson printer discovery: does nothing, returns None."""
    return None
|