Change the Printer class to implement multiple printer types
This commit is contained in:
@@ -101,7 +101,6 @@ app.config["TEMPLATES_AUTO_RELOAD"] = True
|
|||||||
# Printer connection
|
# Printer connection
|
||||||
# Uses the class defined in the printer.py file
|
# Uses the class defined in the printer.py file
|
||||||
printer = Printer(app, 0x04B8, 0x0E28)
|
printer = Printer(app, 0x04B8, 0x0E28)
|
||||||
printer.init_printer()
|
|
||||||
|
|
||||||
# Find out if we are running on a Raspberry Pi
|
# Find out if we are running on a Raspberry Pi
|
||||||
rpi = Raspberry(
|
rpi = Raspberry(
|
||||||
|
|||||||
153
src/printer.py
153
src/printer.py
@@ -4,15 +4,54 @@ This class manages connexion to a Printer
|
|||||||
# import brother_ql
|
# import brother_ql
|
||||||
from time import sleep
|
from time import sleep
|
||||||
import os.path
|
import os.path
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from enum import Enum
|
||||||
|
import uuid
|
||||||
|
|
||||||
from PIL import Image, ImageEnhance
|
from PIL import Image, ImageEnhance
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
# Importing the module to manage the connection to the printer.
|
|
||||||
|
# Importing the modules needed for each supported printer Type
|
||||||
import escpos.printer
|
import escpos.printer
|
||||||
|
import brother_ql
|
||||||
|
|
||||||
|
class PrinterCapabilities(Enum):
|
||||||
|
"""
|
||||||
|
What are the capacities of a Printer ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class Printer():
|
class Printer(ABC):
|
||||||
|
"""
|
||||||
|
If it outputs printed paper and speaks like a printer, then it must be a printer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, app, vendor_id, device_id):
|
||||||
|
"""
|
||||||
|
We initialize a Printer via it's USB connexion, and generate a unique ID
|
||||||
|
"""
|
||||||
|
self.id = uuid.uuid4()
|
||||||
|
self.app = app
|
||||||
|
self.vendor_id = vendor_id
|
||||||
|
self.device_id = device_id
|
||||||
|
self.ready = False
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _has_paper(self) -> bool:
|
||||||
|
"""Check if the printer has papier"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _state(self) -> bool:
|
||||||
|
"""Reports the state of the Printer"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def print_task(self, task_type, data)-> None:
|
||||||
|
"""Takes a PrintTask and executes it"""
|
||||||
|
|
||||||
|
|
||||||
|
class EscPosPrinter(Printer):
|
||||||
"""
|
"""
|
||||||
# The connection is based on the ESC/POS library
|
# The connection is based on the ESC/POS library
|
||||||
|
|
||||||
@@ -29,47 +68,13 @@ class Printer():
|
|||||||
## Annonce readyness : return a positive pong message.
|
## Annonce readyness : return a positive pong message.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Is the printer ready to accept a new print ?
|
def __init__(self, app):
|
||||||
ready = False
|
super().__init__(app, vendor_id="",device_id="")
|
||||||
|
|
||||||
def __init__(self, app, device_id, vendor_id):
|
|
||||||
super().__init__()
|
|
||||||
self.app = app
|
|
||||||
self.ready = False
|
|
||||||
self.printer = None
|
self.printer = None
|
||||||
self.device_id = device_id
|
|
||||||
self.vendor_id = vendor_id
|
|
||||||
self.usb_args = {}
|
self.usb_args = {}
|
||||||
self.usb_args["idVendor"] = self.device_id
|
self.usb_args["idVendor"] = self.device_id
|
||||||
self.usb_args["idProduct"] = self.vendor_id
|
self.usb_args["idProduct"] = self.vendor_id
|
||||||
|
|
||||||
def check_paper(self) -> bool:
|
|
||||||
"""
|
|
||||||
On printers that support it, we check that the printer has paper
|
|
||||||
"""
|
|
||||||
self.app.logger.debug("Checking paper status...")
|
|
||||||
self.printer.open(self.usb_args)
|
|
||||||
status = self.printer.paper_status()
|
|
||||||
match status:
|
|
||||||
case 0:
|
|
||||||
self.app.logger.error("Printer has no more paper, aborting...")
|
|
||||||
self.printer.close()
|
|
||||||
raise RuntimeError("No more paper in the printer")
|
|
||||||
case 1:
|
|
||||||
self.app.logger.warning(
|
|
||||||
"Printer needs paper to be changed very soon ! "
|
|
||||||
)
|
|
||||||
self.printer.close()
|
|
||||||
case 2:
|
|
||||||
self.app.logger.debug("Printer has paper, good to go")
|
|
||||||
self.printer.close()
|
|
||||||
|
|
||||||
def init_printer(self):
|
|
||||||
"""
|
|
||||||
Check if the printer online ? Is the communication with the printer successfull ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
# TODO: This could happen directly when creating a new Printer class
|
|
||||||
if os.getenv("FLASK_DEBUG"):
|
if os.getenv("FLASK_DEBUG"):
|
||||||
waiting_elapsed = 3
|
waiting_elapsed = 3
|
||||||
else:
|
else:
|
||||||
@@ -77,14 +82,15 @@ class Printer():
|
|||||||
|
|
||||||
self.app.logger.debug("Waiting for printer to get online...")
|
self.app.logger.debug("Waiting for printer to get online...")
|
||||||
|
|
||||||
while not self.ready:
|
online = False
|
||||||
|
while not online:
|
||||||
try:
|
try:
|
||||||
# This also calls open(), which we need to close()
|
# This also calls open(), which we need to close()
|
||||||
# or else the device will appear as busy.
|
# or else the device will appear as busy.
|
||||||
p = escpos.printer.Usb(
|
p = escpos.printer.Usb(
|
||||||
self.device_id, self.vendor_id, 0, profile="TM-P80"
|
self.device_id, self.vendor_id, 0, profile="TM-P80"
|
||||||
)
|
)
|
||||||
except RuntimeError as e:
|
except escpos.exceptions.DeviceNotFoundError as e:
|
||||||
self.app.logger.error(
|
self.app.logger.error(
|
||||||
"The USB device is not plugged in, trying again %s : %s",
|
"The USB device is not plugged in, trying again %s : %s",
|
||||||
waiting_elapsed,
|
waiting_elapsed,
|
||||||
@@ -93,9 +99,9 @@ class Printer():
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
if p.is_online():
|
if p.is_online():
|
||||||
self.ready = True
|
online = True
|
||||||
self.app.logger.debug("Printer online !")
|
self.app.logger.debug("Printer online !")
|
||||||
except RuntimeError as e:
|
except escpos.exceptions.DeviceNotFoundError as e:
|
||||||
self.app.logger.error(
|
self.app.logger.error(
|
||||||
"Error while getting the printer online %s : %s",
|
"Error while getting the printer online %s : %s",
|
||||||
waiting_elapsed,
|
waiting_elapsed,
|
||||||
@@ -109,7 +115,7 @@ class Printer():
|
|||||||
"Printer took more than 30 seconds to get online, aborting..."
|
"Printer took more than 30 seconds to get online, aborting..."
|
||||||
)
|
)
|
||||||
waiting_elapsed = 1 # Reset the waiting time for the next print.
|
waiting_elapsed = 1 # Reset the waiting time for the next print.
|
||||||
return False
|
raise RuntimeError("Could not get Printer %s online" % self.id)
|
||||||
|
|
||||||
# Setting up the printing options.
|
# Setting up the printing options.
|
||||||
p.set(
|
p.set(
|
||||||
@@ -130,14 +136,37 @@ class Printer():
|
|||||||
# Beware : if we print every time the printer becomes ready, it means
|
# Beware : if we print every time the printer becomes ready, it means
|
||||||
# we are printing before and after every print !
|
# we are printing before and after every print !
|
||||||
self.printer = p
|
self.printer = p
|
||||||
|
self.printer.close() # We close the connexion to the Printer
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._has_paper()
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
|
||||||
self.ready = True
|
self.ready = True
|
||||||
|
|
||||||
|
def _has_paper(self):
|
||||||
|
"""Check if the printer has paper left"""
|
||||||
|
self.app.logger.debug("Checking paper status...")
|
||||||
|
self.printer.open(self.usb_args)
|
||||||
|
status = self.printer.paper_status()
|
||||||
|
match status:
|
||||||
|
case 0:
|
||||||
|
self.app.logger.error("Printer has no more paper, aborting...")
|
||||||
|
self.printer.close()
|
||||||
|
raise RuntimeError("No more paper in the printer")
|
||||||
|
case 1:
|
||||||
|
self.app.logger.warning(
|
||||||
|
"Printer needs paper to be changed very soon ! "
|
||||||
|
)
|
||||||
|
self.printer.close()
|
||||||
|
return True
|
||||||
|
case 2:
|
||||||
|
self.app.logger.debug("Printer has paper, good to go")
|
||||||
self.printer.close()
|
self.printer.close()
|
||||||
|
|
||||||
self.check_paper()
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _print_sms(self, msg, signature="", bold=False):
|
def _print_txt(self, msg, signature="", bold=False):
|
||||||
|
|
||||||
if not isinstance(msg, str):
|
if not isinstance(msg, str):
|
||||||
self.app.logger.error(
|
self.app.logger.error(
|
||||||
@@ -255,14 +284,14 @@ class Printer():
|
|||||||
try:
|
try:
|
||||||
self.printer.open(self.usb_args)
|
self.printer.open(self.usb_args)
|
||||||
self.printer.qr(content, center=True)
|
self.printer.qr(content, center=True)
|
||||||
|
self.printer.textln(content, center=True)
|
||||||
self.printer.close()
|
self.printer.close()
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
self.printer.close()
|
self.printer.close()
|
||||||
self.app.logger.error(str(e))
|
self.app.logger.error(str(e))
|
||||||
return False
|
raise e
|
||||||
|
|
||||||
self.app.logger.info("Printed a QR")
|
self.app.logger.info("Printed a QR")
|
||||||
return True
|
|
||||||
|
|
||||||
def _cut(self):
|
def _cut(self):
|
||||||
try:
|
try:
|
||||||
@@ -277,21 +306,45 @@ class Printer():
|
|||||||
self.app.logger.info("Did a cut")
|
self.app.logger.info("Did a cut")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _state(self):
|
||||||
|
return self.printer.is_online() and self.ready and self._has_paper()
|
||||||
|
|
||||||
def print_task(self, task_type, data):
|
def print_task(self, task_type, data):
|
||||||
"""Execute actual print based on task type"""
|
"""Execute actual print based on task type"""
|
||||||
|
if self._state():
|
||||||
match (task_type.value):
|
match (task_type.value):
|
||||||
case "text":
|
case "text":
|
||||||
self._print_sms(data["txt"], signature=data["sign"])
|
self._print_txt(data["txt"], signature=data["sign"])
|
||||||
case "image":
|
case "image":
|
||||||
self._print_img(
|
self._print_img(
|
||||||
data["img"], signature=data["sign"], process=data["process"]
|
data["img"], signature=data["sign"], process=data["process"]
|
||||||
)
|
)
|
||||||
case "cut":
|
case "cut":
|
||||||
self._cut()
|
self._cut()
|
||||||
|
case "qr":
|
||||||
|
self._qr(data["txt"])
|
||||||
case _:
|
case _:
|
||||||
raise RuntimeError("This task type is not supported")
|
raise RuntimeError("This task type is not supported")
|
||||||
|
|
||||||
|
|
||||||
|
class BrotherPrinter(Printer):
|
||||||
|
"""
|
||||||
|
Manages connexion and capabilities of a BrotherQL Printer
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, app, vendor_id, device_id):
|
||||||
|
super().__init__(app, vendor_id="",device_id="")
|
||||||
|
self.printer = None
|
||||||
|
self.usb_args = {}
|
||||||
|
self.usb_args["idVendor"] = self.device_id
|
||||||
|
self.usb_args["idProduct"] = self.vendor_id
|
||||||
|
|
||||||
|
def _has_paper():
|
||||||
|
|
||||||
|
def _state():
|
||||||
|
|
||||||
|
def print_task():
|
||||||
|
|
||||||
def _process_image(self, path):
|
def _process_image(self, path):
|
||||||
brightness_factor = 1.5 # Used only if image is too dark
|
brightness_factor = 1.5 # Used only if image is too dark
|
||||||
brightness_threshold = 100 # Brightness threshold (0–255)
|
brightness_threshold = 100 # Brightness threshold (0–255)
|
||||||
|
|||||||
18
src/printers.py
Normal file
18
src/printers.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
"""
|
||||||
|
A collection of Printers.
|
||||||
|
|
||||||
|
It has methods to discover printers, and provides an interface for the methods expected from printers.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from printer import Printer
|
||||||
|
|
||||||
|
class Printers():
|
||||||
|
"""
|
||||||
|
A collection of Printers
|
||||||
|
"""
|
||||||
|
def __init__(self, name, age):
|
||||||
|
self.name = name
|
||||||
|
self.age = age
|
||||||
|
|
||||||
|
def _discover_printers(self):
|
||||||
|
pass
|
||||||
Reference in New Issue
Block a user