diff --git a/configuration/config.toml b/configuration/config.toml index cf299c7..31f6e5f 100644 --- a/configuration/config.toml +++ b/configuration/config.toml @@ -5,8 +5,6 @@ signature = "Anonymous" # Printer settings [printer] -vendor_id = 0x04b8 -device_id = 0x0e28 upload_folder = "src/static/uploads" # Raspberry Pi Configuration diff --git a/src/main.py b/src/main.py index e68399a..4a0645b 100644 --- a/src/main.py +++ b/src/main.py @@ -36,7 +36,6 @@ import werkzeug.exceptions from flask_socketio import SocketIO from flask_limiter import Limiter from flask_limiter.util import get_remote_address -from printer import Printer # The wrapper for the printer class from raspberry import Raspberry # The Raspberry pi control Class from web import Web # Wrapper for the web routes and API from print_queue import PrintQueue @@ -73,11 +72,7 @@ except OSError as e: app.logger.debug("Config file loaded !") -# Define the USB connections here. -vendor_id = configuration_file["printer"]["vendor_id"] -device_id = configuration_file["printer"]["device_id"] UPLOAD_FOLDER = str(configuration_file["printer"]["upload_folder"]) - try: os.mkdir(UPLOAD_FOLDER) app.logger.debug("Directory %s created successfully.", UPLOAD_FOLDER) @@ -88,49 +83,48 @@ except PermissionError: sys.exit(77) # Output the config file -if os.getenv("FLASK_DEBUG"): +if not os.getenv("FLASK_DEBUG") is None and os.getenv("FLASK_DEBUG") is True: pprint.pprint(configuration_file) # We define the app module used by Flask app.secret_key = configuration_file["secrets"]["flask_secret_key"] app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER app.config["ALLOWED_EXTENSIONS"] = ALLOWED_EXTENSIONS -app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 3Mb for a file upload +app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 10Mb for a file upload app.config["TEMPLATES_AUTO_RELOAD"] = True -# Printer connection -# Uses the class defined in the printer.py file -printer = Printer(app, 0x04B8, 0x0E28) -printer.init_printer() - -# Find out if we are running on a Raspberry Pi -rpi = Raspberry( - printer, - app, - socketio, - configuration_file["rpi"]["button_gpio_port_number"], - configuration_file["rpi"]["indicator_gpio_port_number"], - configuration_file["rpi"]["flash_gpio_port_number"], - configuration_file["rpi"]["flash"], -) - -RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi() - # Queue creation print_queue = PrintQueue(app) +# Find out if we are running on a Raspberry Pi +rpi = Raspberry( + print_queue, + app, + configuration_file +) +RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi() + + # Web & API management web = Web(app, print_queue) # Start worker thread -worker = PrintWorker(app, print_queue, printer, socketio) -worker.start() +# When created, the worker will try to find printers connected to the system +try: + worker = PrintWorker(app, print_queue) + worker.start() +except Exception as e: + app.logger.error("Could not start the worker because %s ", str(e)) + sys.exit(-1) # The rate limit limiter = Limiter( get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"] ) + +app.logger.info("🖶 Welcome to LittlePrynter !") + # General routes @app.route("/") @limiter.limit("1/second", override_defaults=False) @@ -197,6 +191,9 @@ def web_print_img(): "No signature found for this print, using default signature : %s", str(e) ) sign = configuration_file["defaults"]["signature"] + except werkzeug.exceptions.RequestEntityTooLarge as e: + flash("Whoops, image is too big: " + str(e), "error") + return redirect(url_for("index")) # check if the post request has the file part if "img" not in request.files: @@ -304,6 +301,7 @@ def api_print_image(): return "OK", 200 +# TODO: This might not depend on the Raspberry Pi @app.route("/api/camera/picture", methods=["GET"]) def camera_picture(): """Returns a picture taken by the camera on a raspberry pi""" @@ -322,6 +320,12 @@ def api_queue_status(): return jsonify(web.get_queue_state()) +@app.route("/api/queue/completed", methods=["GET"]) +def api_queue_completed(): + """API endpoint that returns the finished tasks""" + return jsonify(web.get_queue_completed()) + + @app.route("/api/worker", methods=["GET"]) def api_worker_state(): """API endpoint to get the worker state""" @@ -403,4 +407,4 @@ def camera_status(): if __name__ == "__main__": - app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc") + app.run(use_reloader=False, host="0.0.0.0", ssl_context="adhoc") diff --git a/src/print_queue.py b/src/print_queue.py index 766a322..38e3c3e 100644 --- a/src/print_queue.py +++ b/src/print_queue.py @@ -76,7 +76,16 @@ class PrintQueue: """Return current queue state""" with self._lock: self.app.logger.debug("Return current queue state") - return [{"task_id": t.task_id, "status": t.status} for t in self._queue] + return [ + {"task_id": t.task_id, "status": t.status, "type": str(t.task_type), "content": str(t.get_print_data())} + for t in self._queue + ] + + def get_queue_completed(self): + """Return completed queue elements""" + with self._lock: + self.app.logger.debug("Return completed queue elements") + return self._completed_tasks def get_status(self, task_id): """Get full status info for a task""" diff --git a/src/printer.py b/src/printer.py index 66a781f..3a886b3 100644 --- a/src/printer.py +++ b/src/printer.py @@ -1,115 +1,129 @@ """ This class manages connexion to a Printer """ -# import brother_ql -from time import sleep -import os.path +import os.path +import os +from abc import ABC, abstractmethod +from dataclasses import dataclass +import time +from enum import Enum +import uuid +import threading +import usb.core from PIL import Image, ImageEnhance import numpy as np -# Importing the module to manage the connection to the printer. +# Importing the modules needed for each supported printer Type import escpos.printer +from brother_ql.models import ModelsManager +from brother_ql.backends import backend_factory +from brother_ql.raster import BrotherQLRaster +from brother_ql.conversion import convert +from brother_ql.backends.helpers import send -class Printer(): +class PrinterType(Enum): """ - # The connection is based on the ESC/POS library - - ## Connection to the USB printer - - ## Making sure the printer is alive - - ## Making sure it has paper - - ## Define default print settings - - ## Print starting message, log time of first print, cut. - - ## Annonce readyness : return a positive pong message. + What are the capacities of a Printer ? """ - # Is the printer ready to accept a new print ? - ready = False + EPSON = "epson" + BROTHER = "brother" - def __init__(self, app, device_id, vendor_id): - super().__init__() + +# For Brother-QL Printers +@dataclass +class PrinterInfo: + """ + Brother-QL printer information + """ + identifier: str + backend: str + protocol: str + vendor_id: str + product_id: str + serial_number: str + name: str = "Brother QL Printer" + model: str = "QL-570" + status: str = "unknown" + label_type: str = "unknown" + label_size: str = "unknown" + label_width: int = 0 + label_height: int = 0 + + def __getitem__(self, item): + return getattr(self, item) + + def __setitem__(self, key, value): + setattr(self, key, value) + + +class Printer(ABC): + """ + If it outputs printed paper and speaks like a printer, then it must be a printer. + """ + + def __init__(self, app, vendor_id, device_id, printer_type: PrinterType): + """ + We initialize a Printer via it's USB connexion, and generate a unique ID + """ + self.id = uuid.uuid4() self.app = app - self.ready = False - self.printer = None - self.device_id = device_id self.vendor_id = vendor_id + self.device_id = device_id + self.ready = False + self.printer_type = printer_type + self._lock = threading.Lock() + + @abstractmethod + def _has_paper(self) -> bool: + """Check if the printer has papier""" + + @abstractmethod + def _state(self) -> bool: + """Reports the state of the Printer""" + + @abstractmethod + def print_task(self, task_type, data) -> None: + """Takes a PrintTask and executes it""" + + +class EscPosPrinter(Printer): + """ + Create a new ESC/POS based printer. + """ + + def __init__(self, app, vendor_id, device_id): + """ + Create a connexion to a ESC/POS Printer via USB, + Making sure the printer is alive, + Making sure it has paper, + Define default print settings + """ + super().__init__(app, vendor_id, device_id, printer_type=PrinterType.EPSON) + self.printer = None self.usb_args = {} - self.usb_args["idVendor"] = self.device_id - self.usb_args["idProduct"] = self.vendor_id + self.usb_args["idVendor"] = self.vendor_id + self.usb_args["idProduct"] = self.device_id - def check_paper(self) -> bool: - """ - On printers that support it, we check that the printer has paper - """ - self.app.logger.debug("Checking paper status...") - self.printer.open(self.usb_args) - status = self.printer.paper_status() - match status: - case 0: - self.app.logger.error("Printer has no more paper, aborting...") - self.printer.close() - raise RuntimeError("No more paper in the printer") - case 1: - self.app.logger.warning( - "Printer needs paper to be changed very soon ! " - ) - self.printer.close() - case 2: - self.app.logger.debug("Printer has paper, good to go") - self.printer.close() + try: + # This also calls open(), which we need to close() + # or else the device will appear as busy. + p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80") + except escpos.exceptions.DeviceNotFoundError as e: + self.app.logger.error( + "The USB device is not plugged in : %s", + str(e), + ) + except Exception as e: + self.app.logger.error("Printer could not be connected : %s ", str(e)) - def init_printer(self): - """ - Check if the printer online ? Is the communication with the printer successfull ? - """ - - # TODO: This could happen directly when creating a new Printer class - if os.getenv("FLASK_DEBUG"): - waiting_elapsed = 3 - else: - waiting_elapsed = 10 - - self.app.logger.debug("Waiting for printer to get online...") - - while not self.ready: - try: - # This also calls open(), which we need to close() - # or else the device will appear as busy. - p = escpos.printer.Usb( - self.device_id, self.vendor_id, 0, profile="TM-P80" - ) - except RuntimeError as e: - self.app.logger.error( - "The USB device is not plugged in, trying again %s : %s", - waiting_elapsed, - str(e), - ) - - try: - if p.is_online(): - self.ready = True - self.app.logger.debug("Printer online !") - except RuntimeError as e: - self.app.logger.error( - "Error while getting the printer online %s : %s", - waiting_elapsed, - str(e), - ) - - sleep(1) - waiting_elapsed -= 1 - if waiting_elapsed < 1: - self.app.logger.error( - "Printer took more than 30 seconds to get online, aborting..." - ) - waiting_elapsed = 1 # Reset the waiting time for the next print. - return False + try: + if p.is_online(): + self.app.logger.debug("Printer online !") + except Exception as e: + raise e # Setting up the printing options. p.set( @@ -130,15 +144,38 @@ class Printer(): # Beware : if we print every time the printer becomes ready, it means # we are printing before and after every print ! self.printer = p + self.printer.close() # We close the connexion to the Printer + + try: + self._has_paper() + except Exception as e: + raise e + self.ready = True - self.printer.close() - self.check_paper() - - return True - - def _print_sms(self, msg, signature="", bold=False): + def _has_paper(self): + """Check if the printer has paper left""" + self.app.logger.debug("Checking paper status...") + self.printer.open(self.usb_args) + status = self.printer.paper_status() + match status: + case 0: + self.app.logger.error("Printer has no more paper, aborting...") + self.printer.close() + raise RuntimeError("No more paper in the printer") + case 1: + self.app.logger.warning( + "Printer needs paper to be changed very soon ! " + ) + self.printer.close() + return True + case 2: + self.app.logger.debug("Printer has paper, good to go") + self.printer.close() + return True + def _print_txt(self, msg, signature="", bold=False): + self.ready = False if not isinstance(msg, str): self.app.logger.error( "It is not possible to print a " + str(type(msg)) + ", only strings." @@ -178,6 +215,7 @@ class Printer(): self.printer.textln(clean_msg) if clean_signature: self.printer.textln(clean_signature) + self.printer.textln() self.printer.close() except Exception as e: self.app.logger.error("Unable to print because : " + str(e)) @@ -186,9 +224,10 @@ class Printer(): ) from e self.app.logger.info("Printed text") - return True + self.ready = True def _print_img(self, path, signature="", center=True, process=False): + self.ready = False clean_signature = str(signature) if len(signature) > 256: @@ -214,30 +253,33 @@ class Printer(): if process: try: self.app.logger.debug("Proccessing the image") - path = _process_image(self, path) + processed_path = _process_image(self, path) except RuntimeError as e: self.app.logger.error( "Error while processing the image, aborting print : %s", str(e) ) raise e else: + processed_path = path self.app.logger.warning("Not proccessing the image") try: self.printer.open(self.usb_args) - self.printer.image(path, center=center) + self.printer.image(processed_path, center=center) self.printer.textln(signature) self.printer.close() - self.app.logger.debug("Printed an image : " + str(path)) + self.app.logger.debug("Printed an image : " + str(processed_path)) except Exception as e: self.app.logger.error(str(e)) raise RuntimeError("Could not print the picture") from e finally: try: os.remove(path) + os.remove(processed_path) except OSError as e: raise e + self.app.logger.debug("Removed image : " + str(processed_path)) self.app.logger.debug("Removed image : " + str(path)) try: @@ -249,22 +291,25 @@ class Printer(): raise RuntimeError("Could not close the printer connexion. ") from e self.app.logger.info("Printed a picture") - return True + self.ready = True def _qr(self, content): + self.ready = False try: self.printer.open(self.usb_args) self.printer.qr(content, center=True) + self.printer.textln(content) self.printer.close() except RuntimeError as e: self.printer.close() self.app.logger.error(str(e)) - return False + raise e self.app.logger.info("Printed a QR") - return True + self.ready = True def _cut(self): + self.ready = False try: self.printer.open(self.usb_args) self.printer.cut() @@ -275,34 +320,236 @@ class Printer(): raise e self.app.logger.info("Did a cut") - return True + self.ready = True + + def _state(self) -> bool: + has_paper = self._has_paper() + is_ready = self.ready + + self.app.logger.debug("Has paper : %s " , has_paper ) + self.app.logger.debug("Ready : %s " , is_ready ) + + return is_ready and has_paper # and is_online def print_task(self, task_type, data): """Execute actual print based on task type""" - match (task_type.value): - case "text": - self._print_sms(data["txt"], signature=data["sign"]) - case "image": - self._print_img( - data["img"], signature=data["sign"], process=data["process"] + + + with self._lock: + self.app.logger.debug("Acquired lock to start print") + + i_m_ready = self._state() + while not i_m_ready: + self.app.logger.debug("Waiting for the printer to become ready, current state %s ", str(i_m_ready)) + i_m_ready = self._state() + time.sleep(0.3) + + self.app.logger.debug("Checked state to start printing : %s", self._state()) + self.ready = False + try: + self.app.logger.debug("Checking task type") + match (task_type.value): + case "text": + + self._print_txt(data["txt"], signature=data["sign"]) + self.ready = True + case "image": + self._print_img( + data["img"], signature=data["sign"], process=data["process"] + ) + self.ready = True + case "cut": + self._cut() + self.ready = True + case "qr": + self._qr(data["txt"]) + self.ready = True + case _: + raise RuntimeError("This task type is not supported") + except Exception as e: + self.app.logger.debug("Exception occured while printing %s", str(e)) + self.ready = True + raise RuntimeError from e + + +class BrotherPrinter(Printer): + """ + Manages connexion and capabilities of a BrotherQL Printer + """ + + def __init__(self, app, vendor_id, device_id): + super().__init__( + app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER + ) + self.printer = None + self.usb_args = {} + self.usb_args["idVendor"] = self.device_id + self.usb_args["idProduct"] = self.vendor_id + self.model_manager = ModelsManager() + + # Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py + + backend = backend_factory("pyusb") + available_devices = backend["list_available_devices"]() + + for printer in available_devices: + self.app.logger.debug(f"Found device: {printer}") + identifier = printer["identifier"] + parts = identifier.split("/") + + if len(parts) < 4: + self.app.logger.warning( + f"Skipping device with invalid identifier format: {identifier}" ) - case "cut": - self._cut() - case _: - raise RuntimeError("This task type is not supported") + continue + + protocol = parts[0] + # device_info = parts[2] + serial_number = parts[3] + + try: + product_id_int = int(self.device_id, 16) + for m in self.model_manager.iter_elements(): + if m.product_id == product_id_int: + model = m.identifier + break + self.app.logger.debug(f"Matched printer model: {model}") + except ValueError: + self.app.logger.warning(f"Invalid product ID format: {m.product_id}") + + self.printer_info = PrinterInfo( + identifier=identifier, + backend="pyusb", + model=model, + protocol=protocol, + vendor_id=vendor_id, + product_id=self.device_id, + serial_number=serial_number, + ) + + self.ready = True + + def _has_paper(self): + raise NotImplementedError("This printer model does not support this.") + + def _state(self): + return self.ready + + def _print_img(self, data): + """ + Print a raster image via a Brother QL printer + """ + self.ready = False + label_type = "102" + rotate = 0 + dither = False + try: + # Prepare the image for printing + qlr = BrotherQLRaster(self.printer_info["model"]) + + instructions = convert( + qlr=qlr, + images=[data["img"]], + label=label_type, + rotate=rotate, + threshold=70, + dither=dither, + compress=True, + red=False, + dpi_600=False, + hq=False, + cut=True, + ) + + # Debug logging + if os.getenv("FLASK_DEBUG"): + self.app.logger.debug(f""" + Print parameters: + - Label type: {label_type} + - Rotate: {rotate} + - Dither: {dither} + - Model: {self.printer_info['model']} + - Backend: {self.printer_info['backend']} + - Identifier: {self.printer_info['identifier']} + """) + + # Try to print using Python API + # send() = status = { + # 'instructions_sent': True, # The instructions were sent to the printer. + # 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error' + # 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it. + # 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability). + # 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown. + # } + status = send( + instructions=instructions, + printer_identifier=self.printer_info["identifier"], + backend_identifier="pyusb", + ) + + if ( + not status["did_print"] + or status["outcome"] == "error" + or status["outcome"] == "unknown" + ): + raise RuntimeError("Failed to print using Python API") + + if status["printer_state"]: + self.ready = bool(status["printer_state"]) + else: + self.ready = True + + except usb.core.USBError as e: + # Treat timeout errors as successful since they often occur after print completion + if e.errno == 110: # Operation timed out + self.app.logger.debug( + "USB timeout occurred - this is normal and the print likely completed" + ) + self.app.logger.debug("Print completed (timeout is normal)") + self.ready = True + + error_msg = f"USBError encountered: {e}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + except Exception as e: + error_msg = f"Unexpected error during printing: {str(e)}" + self.app.logger.debug(error_msg) + raise RuntimeError from e + + def print_task(self, task_type, data): + """Execute actual print based on task type""" + with self._lock: + if self._state: + self._state = False + match (task_type.value): + case "image": + self._print_img(data["img"]) + self._state = True + case "cut": + # The cut happens by default on Brother QL printers. + self._state = True + case _: + raise RuntimeError("This task type is not supported") + else: + raise RuntimeError("The printer is not ready to print yet !") + + # These values are by default for now + + # raise NotImplementedError("This printer type is not implemented yet") def _process_image(self, path): brightness_factor = 1.5 # Used only if image is too dark brightness_threshold = 100 # Brightness threshold (0–255) - contrast_factor = 0.6 # Less than 1.0 = lower contrast + contrast_factor = 2 # Less than 1.0 = lower contrast max_width = 575 max_height = 1000 with Image.open(path) as original_img: # Convert to RGB if needed (JPEG doesn't support alpha) if original_img.mode in ("RGBA", "P"): - self.app.logger.debug("Converting the image to RGB from RGBA") + self.app.logger.debug("Converting the image from RGBA to RGBA") original_img = original_img.convert("RGB") # Resize while maintaining aspect ratio @@ -332,16 +579,19 @@ def _process_image(self, path): self.app.logger.debug( f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}" ) - enhancer = ImageEnhance.Brightness(original_img) - original_img = enhancer.enhance(brightness_factor) + enhancer = ImageEnhance.Brightness(grayscale) + grayscale = enhancer.enhance(brightness_factor) - # # Reduce contrast - # contrast_enhancer = ImageEnhance.Contrast(original_img) - # original_img = contrast_enhancer.enhance(contrast_factor) + # Computer current contrast of grayscale image + contrast = np.clip(np.std(np.array(grayscale)), 0, 255) + self.app.logger.debug("Standard deviation of the contrast : %s", contrast) + # # Enhance contrast + contrast_enhancer = ImageEnhance.Contrast(grayscale) + original_img = contrast_enhancer.enhance(contrast_factor) # Convert to JPEG and save jpeg_path = os.path.splitext(path)[0] + "_processed.jpg" - original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True) + grayscale.save(jpeg_path, format="JPEG", quality=95, optimize=True) self.app.logger.debug("Processed and saved image.") return jpeg_path diff --git a/src/printers.py b/src/printers.py new file mode 100644 index 0000000..7e21317 --- /dev/null +++ b/src/printers.py @@ -0,0 +1,160 @@ +""" +A collection of Printers. + +It has methods to discover printers, and provides an interface for +the methods expected from printers. +""" + +from collections.abc import Set +import usb.core +import usb.util +from printer import Printer, EscPosPrinter, BrotherPrinter + + +class Printers: + """ + Finds and creates a set of Printer that can be used by the Workers to print. + """ + + def __init__(self, app): + """ + Discover printers connected to the computer and return a Collection of Printer() + """ + self.app = app + self.printers = self._discover_printers() + + def _discover_printers(self) -> Set[Printer]: + """ + Gets connected USB printer devices using the pyusb library. + + We analyse the USB devices, get the ones that match + the printer class ( 7 ) and for Brother and EPSON printers, + try to create a Printer object that can be used by the Worker class + to execute prints. + + Returns a set of Printer + """ + + self.app.logger.debug("Discovering USB Devices connected to this system") + + printers = set() + + # Find all connected USB devices + devices = usb.core.find(find_all=True, custom_match=_FindClass(7)) + if not devices: + self.app.logger.warning( + "No USB devices of class 7 ( printers ) found or pyusb could not access the bus." + ) + raise RuntimeError( + "No USB devices of class 7 ( printers ) found or pyusb could not access the bus." + ) + + for dev in devices: + # Attempt to get the manufacturer and product strings + try: + manufacturer = usb.util.get_string(dev, dev.iManufacturer) + except Exception: + manufacturer = "Unknown" + + try: + product = usb.util.get_string(dev, dev.iProduct) + except Exception: + product = "Unknown" + self.app.logger.debug( + "Looking at %s %s (%s:%s)", + manufacturer, + product, + hex(dev.idVendor), + hex(dev.idProduct), + ) + + if manufacturer == "EPSON": + try: + # We create a new EscPosPrinter() + self.app.logger.debug("Trying to creat a new EPSON printer") + prid = dev.idProduct + vendir = dev.idVendor + escpos_printer = EscPosPrinter( + self.app, vendor_id=vendir, device_id=prid + ) + except Exception as e: + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(escpos_printer) + self.app.logger.debug("Found a %s printer", manufacturer) + + # We already found the type of printer, + # we don't need an extra comparaison. + continue + + # or a Brother Printer + if manufacturer == "Brother": + try: + # We create a new BrotherPrinter() + self.app.logger.debug("Trying to creat a new BROTHER printer") + prid = dev.idProduct + vendir = dev.idVendor + brother_printer = BrotherPrinter( + self.app, vendor_id=vendir, device_id=prid + ) + except Exception as e: + self.app.logger.error( + "Could not create a %s printer class with %s:%s" , product, + dev.idVendor, + dev.idProduct, + ) + raise e + + # If the object creation is successfull, we add it to the list of Printers + printers.add(brother_printer) + self.app.logger.debug("Found a %s printer" , manufacturer) + + self.app.logger.debug("Found %s printers" , len(printers)) + + if len(printers) < 1: + self.app.logger.warning("Not printers found ! Please plug in a Printer and restart the program.") + raise RuntimeError("No printers found") + + return printers + + def any(self) -> Printer: + """ + Return a dict key: UUID, value: Printer, with any connected printer. + """ + if len(self.printers) > 0: + for i in self.printers: + return i + else: + raise RuntimeError("No printers available") + + # def get_printer(self, printer_type): + # """ + # Return a specific printer + + # printer_type -- a printer type + # """ + # return NotImplementedError() + + +class _FindClass: + """ + Use by usb.core to modify the way USB devices are found + Taken from pyUSB documentation on Github + """ + def __init__(self, class_): + self._class = class_ + + def __call__(self, device): + # first, let's check the device + if device.bDeviceClass == self._class: + return True + # ok, transverse all devices to find an + # interface that matches our class + for cfg in device: + # find_descriptor: what's it? + intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class) + if intf is not None: + return True + + return False diff --git a/src/raspberry.py b/src/raspberry.py index a75b401..fa8553c 100644 --- a/src/raspberry.py +++ b/src/raspberry.py @@ -10,12 +10,12 @@ import io # To check if we are on a Raspberry Pi import subprocess import os from time import sleep, gmtime, strftime -from flask_socketio import SocketIO from gpiozero import Button, LED, DigitalOutputDevice from PIL import Image from task import TextTask, ImageTask, CutTask -class Raspberry(): + +class Raspberry: """ This class will manage three things : - Connecting to a USB webcam @@ -23,29 +23,22 @@ class Raspberry(): - Activating a flash ( or light ) - Flash an indicator light + # pylint: disable=too-many-instance-attributes + # dede """ def __init__( self, print_queue, app, - socketio, - button_gpio_port_number, - indicator_gpio_port_number, - flash_gpio_port_number, - is_flash_present, + configuration_file ): self.print_queue = print_queue - self.socketio = socketio self.app = app - - self.flash_gpio = flash_gpio_port_number - self.is_flash_present = is_flash_present - self.button_gpio = button_gpio_port_number - self.led_gpio = indicator_gpio_port_number + self.configuration_file = configuration_file self.image_path = self.app.config["UPLOAD_FOLDER"] + "/image.jpg" - def is_raspberry_pi(self, raise_on_errors=False): + def is_raspberry_pi(self): """ Checking if we are on a Raspberry Pi by checking information on the /proc/cpuinfo file @@ -72,9 +65,10 @@ class Raspberry(): return False if not found: - self.app.logger.error( - "Couldn't get sufficient hardware information from /proc/cpuinfo, Unable to determine if we are on a Raspberry Pi." + self.app.logger.warning( + "Couldn't get sufficient hardware information from /proc/cpuinfo" ) + self.app.logger.warning("Unable to determine if we are on a Raspberry Pi.") return False except IOError: self.app.logger.error("Unable to open `/proc/cpuinfo`.") @@ -83,28 +77,38 @@ class Raspberry(): self.app.logger.debug("It seems we are on a Raspberry Pi") try: - self.initialise_gpio() + self._initialise_gpio() except Exception as e: self.app.logger.debug("Could not init GPIO : " + str(e)) raise e return True - def initialise_gpio(self): + def _initialise_gpio(self): + """ + Set GPIO ports from configuration and activate them + to show the user it's working. + """ self.app.logger.debug("Initializing GPIO") - self.led = LED(self.led_gpio) + self.led = LED(self.configuration_file["rpi"]["indicator_gpio_port_number"]) self.app.logger.debug("Activated indicator LED") self.indicator_countdown(iters=3) - self.button = Button(self.button_gpio, pull_up=True, bounce_time=0.1) + self.button = Button( + self.configuration_file["rpi"]["button_gpio_port_number"], + pull_up=True, bounce_time=0.1 + ) self.button.when_pressed = self.on_button_pressed self.app.logger.debug("Activated button") # The "flash" is a relay-controlled device ( light bulb for example ) - self.flash = DigitalOutputDevice(self.flash_gpio) + self.flash = DigitalOutputDevice(self.configuration_file["rpi"]["flash_gpio_port_number"]) self.flash_toggle() self.app.logger.debug("Activated flash") def indicator_countdown(self, iters=10, multi=10): + """ + Activates the LED faster and faster to show a countdown + """ for i in range(iters, 0, -1): self.led.on() sleep(i / multi) @@ -112,7 +116,10 @@ class Raspberry(): sleep(i / multi) def indicator_led(self, timing=0.2, l=5): - for i in range(l): + """ + Turns on the indicator LED for a certain period of time + """ + for _ in range(l): self.app.logger.debug("LED turned on") self.led.on() sleep(timing) @@ -121,6 +128,9 @@ class Raspberry(): sleep(timing) def flash_toggle(self): + """ + Flashes the flash + """ self.app.logger.debug("Flash turned on") self.flash.on() sleep(0.3) @@ -128,6 +138,9 @@ class Raspberry(): self.app.logger.debug("Flash turned off") def take_picture(self): + """ + Takes a picture via the USB webcam + """ # Validate if the image path is valid if not os.path.isdir(os.path.dirname(self.image_path)): self.app.logger.error( @@ -173,6 +186,9 @@ class Raspberry(): position="bottom_right", margin=10, ): + """ + Takes an image and overlays it with a another picture. + """ try: image = Image.open(image_path).convert("RGBA") logo = Image.open(logo_path).convert("RGBA") @@ -199,7 +215,9 @@ class Raspberry(): y = image.height - logo.height - margin else: raise ValueError( - "Invalid position. Choose from 'bottom_right', 'top_left', 'top_right', or 'bottom_left'." + "Invalid position." + + "Choose from 'bottom_right', 'top_left', " + + " 'top_right', or 'bottom_left'." ) # Composite the logo onto the image @@ -217,6 +235,9 @@ class Raspberry(): return True def crop_to_square(self, image_path, output_path=None): + """ + Crop an image so that it becomes a square + """ try: image = Image.open(image_path) width, height = image.size @@ -244,6 +265,10 @@ class Raspberry(): return True def on_button_pressed(self): + """ + When a button press is detected, a picture is taken from the webcam + and added to the print queue. + """ self.app.logger.debug("Button has been pressed") self.led.on() self.app.logger.debug("Counting down") @@ -263,8 +288,10 @@ class Raspberry(): self.app.logger.debug("Printing picture") self.led.on() self.crop_to_square(self.image_path) - self.print_queue.enqueue(ImageTask(self.image_path,signature="",process=True)) - self.print_queue.enqueue(TextTask(content="Imprimé par LittlePrynter", signature="")) + self.print_queue.enqueue(ImageTask(self.image_path, signature="", process=True)) + self.print_queue.enqueue( + TextTask(content="Imprimé par LittlePrynter", signature="") + ) time = strftime("%Y-%m-%d %H:%M", gmtime()) self.print_queue.enqueue(TextTask(content=time, signature="")) self.print_queue.enqueue(CutTask()) diff --git a/src/task.py b/src/task.py index 2962618..5b45920 100644 --- a/src/task.py +++ b/src/task.py @@ -14,6 +14,7 @@ to print at the same time. We can also delay and store printing tasks until a printer becomes available if none is online. """ + from abc import ABC, abstractmethod ## See https://docs.python.org/3/library/abc.html to learn more about this @@ -27,9 +28,11 @@ class TaskType(Enum): """ The different tasks supported by the printers """ + TEXT = "text" IMAGE = "image" CUT = "cut" + QR = "qr" class PrintTask(ABC): @@ -37,18 +40,15 @@ class PrintTask(ABC): A print task holds information about what we are looking to print. """ - def __init__(self, task_type): + def __init__(self, task_type: TaskType): self.task_id = self._generate_id() self.task_type = task_type self.status = "pending" # pending, processing, completed, failed - print("Created a new " + str(self.task_type) + " with ID " + self.task_id) - @abstractmethod def get_print_data(self): """Return data formatted for printer""" - def _generate_id(self): # Generate unique task ID return str(uuid.uuid4()) @@ -68,6 +68,18 @@ class TextTask(PrintTask): return {"txt": self.content, "sign": self.signature} +class QRTask(TextTask): + """This task prints a QR-Code, the signature is ignore and is always the content itself""" + + def __init__(self, content): + super().__init__(content, signature="") + self.content = content + self.signature = content + + def get_print_data(self): + return {"txt": self.content, "sign": self.signature} + + class ImageTask(PrintTask): """ This tasks represents a image content ( in the form of it's path ), and it's signature. diff --git a/src/templates/index.html b/src/templates/index.html index 1eeb467..2f2b0a8 100644 --- a/src/templates/index.html +++ b/src/templates/index.html @@ -6,7 +6,8 @@