Restructure main class to activate worker and use tasks, print queue,

update Printer
This commit is contained in:
n07070
2026-05-21 02:34:12 +02:00
parent b48e7072bf
commit f52d7493c8
2 changed files with 94 additions and 39 deletions

View File

@@ -23,6 +23,7 @@ import sys
import os # For VARS from the shell. import os # For VARS from the shell.
import pprint # To pretty print JSON import pprint # To pretty print JSON
import toml # Used for the config file parsing import toml # Used for the config file parsing
import threading
from flask import ( from flask import (
Flask, Flask,
request, request,
@@ -39,11 +40,15 @@ from flask_limiter.util import get_remote_address
from printer import Printer # The wrapper for the printer class from printer import Printer # The wrapper for the printer class
from raspberry import Raspberry # The Raspberry pi control Class from raspberry import Raspberry # The Raspberry pi control Class
from web import Web # Wrapper for the web routes and API from web import Web # Wrapper for the web routes and API
from print_queue import PrintQueue
from worker import PrintWorker
# Variables
# We create the main Flask object
app = Flask(__name__) app = Flask(__name__)
socketio = SocketIO(app) socketio = SocketIO(app, cors_allowed_origins="*")
# Global variables
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"} ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
# Load the configuration file # Load the configuration file
@@ -116,18 +121,21 @@ rpi = Raspberry(
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi() RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
# Queue creation
print_queue = PrintQueue(app)
# Web & API routes # Web & API routes
web = Web(app, print_queue)
web = Web(app, printer) # Start worker thread
worker = PrintWorker(app, print_queue, printer, socketio)
if __name__ == "__main__": worker.start()
app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc")
limiter = Limiter( limiter = Limiter(
get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"] get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"]
) )
# General routes
@app.route("/") @app.route("/")
@limiter.limit("1/second", override_defaults=False) @limiter.limit("1/second", override_defaults=False)
@@ -144,6 +152,8 @@ def webcam():
app.logger.debug("Loading webcam interface") app.logger.debug("Loading webcam interface")
return render_template("webcam.html") return render_template("webcam.html")
# Form treatement
@app.route("/web/print/sms", methods=["POST"]) @app.route("/web/print/sms", methods=["POST"])
@limiter.limit("6/minute", override_defaults=False) @limiter.limit("6/minute", override_defaults=False)
def web_print_sms(): def web_print_sms():
@@ -299,7 +309,7 @@ def api_print_image():
@app.route("/api/camera/picture", methods=["GET"]) @app.route("/api/camera/picture", methods=["GET"])
def camera_picture(): def camera_picture():
"""Returns a picture taken by the camera""" """Returns a picture taken by the camera on a raspberry pi"""
if RASPBERRY_PI_CONNECTED: if RASPBERRY_PI_CONNECTED:
try: try:
return rpi.camera_picture() return rpi.camera_picture()
@@ -308,6 +318,26 @@ def camera_picture():
else: else:
return jsonify({"message": "No camera present"}), 500 return jsonify({"message": "No camera present"}), 500
@app.route('/api/queue', methods=["GET"])
def api_queue_status():
"""API endpoint for entire queue"""
return jsonify(web.get_queue_state())
@app.route('/api/worker', methods=["GET"])
def api_worker_state():
"""API endpoint to get the worker state"""
return jsonify(worker.current_state())
@app.route('/api/worker/start')
def api_worker_start():
worker.start_worker()
return jsonify(worker.current_state())
@app.route('/api/worker/stop')
def api_worker_stop():
worker.stop_worker()
return jsonify(worker.current_state())
## Authentification ## Authentification
@@ -362,3 +392,6 @@ def camera_status():
socketio.emit("camera_status", True) socketio.emit("camera_status", True)
else: else:
socketio.emit("camera_status", False) socketio.emit("camera_status", False)
if __name__ == "__main__":
app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc")

View File

@@ -1,12 +1,11 @@
# Importing the module to manage the connection to the printer. # Importing the module to manage the connection to the printer.
import escpos.printer import escpos.printer
import brother_ql # import brother_ql
from time import sleep, gmtime, strftime from time import sleep, gmtime, strftime
import os.path import os.path
from PIL import Image, ImageEnhance, ImageOps from PIL import Image, ImageEnhance, ImageOps
import numpy as np import numpy as np
class Printer(object): class Printer(object):
""" """
# The connection is based on the ESC/POS library # The connection is based on the ESC/POS library
@@ -39,7 +38,9 @@ class Printer(object):
self.usb_args["idProduct"] = self.vendor_id self.usb_args["idProduct"] = self.vendor_id
def check_paper(self) -> bool: def check_paper(self) -> bool:
# Let's check paper status """
On printers that support it, we check that the printer has paper
"""
self.app.logger.debug("Checking paper status...") self.app.logger.debug("Checking paper status...")
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
status = self.printer.paper_status() status = self.printer.paper_status()
@@ -58,10 +59,13 @@ class Printer(object):
self.printer.close() self.printer.close()
def init_printer(self): def init_printer(self):
"""
Check if the printer online ? Is the communication with the printer successfull ?
"""
# Is the printer online ? Is the communication with the printer successfull ? # TODO: This could happen directly when creating a new Printer class
if os.getenv("FLASK_DEBUG"): if os.getenv("FLASK_DEBUG"):
waiting_elapsed = 1 waiting_elapsed = 15
else: else:
waiting_elapsed = 10 waiting_elapsed = 10
@@ -122,10 +126,17 @@ class Printer(object):
return True return True
def print_sms(self, msg, signature="", bold=False): def _print_sms(self, msg, signature="", bold=False):
if not isinstance(msg,str):
self.app.logger.error("It is not possible to print a " + str(type(msg)) + ", only strings.")
raise ValueError
# We make sure that the signature is not something too goofy
clean_msg = str(msg) + "\n" clean_msg = str(msg) + "\n"
clean_signature = str(signature) clean_signature = str(signature)
# Make checks on the size of the message being printed
if len(clean_msg) > 4096: if len(clean_msg) > 4096:
self.app.logger.warning( self.app.logger.warning(
"Could not print message of this length: " + str(len(clean_msg)) "Could not print message of this length: " + str(len(clean_msg))
@@ -146,6 +157,8 @@ class Printer(object):
+ ", needs to be below 256 caracters long." + ", needs to be below 256 caracters long."
) )
# Do the actual printing
# We would pop the next element in the queue here, if it's a sms type
try: try:
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
self.printer.set(align="center", font="a", bold=bold) self.printer.set(align="center", font="a", bold=bold)
@@ -160,14 +173,14 @@ class Printer(object):
self.app.logger.info("Printed text") self.app.logger.info("Printed text")
return True return True
def print_img(self, path, sign="", center=True, process=False): def _print_img(self, path, signature="", center=True, process=False):
clean_signature = str(sign) clean_signature = str(signature)
if len(sign) > 256: if len(signature) > 256:
self.app.logger.warning( self.app.logger.warning(
"Could not print signature of this length: " + str(len(clean_signature)) "Could not print signature of this length: " + str(len(clean_signature))
) )
raise Exception( raise ValueError(
"Could not print signature of this length :" "Could not print signature of this length :"
+ str(len(clean_signature)) + str(len(clean_signature))
+ ", needs to be below 256 caracters long." + ", needs to be below 256 caracters long."
@@ -175,7 +188,7 @@ class Printer(object):
if not os.path.isfile(str(path)): if not os.path.isfile(str(path)):
self.app.logger.warning("File does not exist : " + str(path)) self.app.logger.warning("File does not exist : " + str(path))
raise Exception( raise OSError(
"The file path for this image :" "The file path for this image :"
+ str(path) + str(path)
+ " wasn't found. Please try again." + " wasn't found. Please try again."
@@ -186,34 +199,40 @@ class Printer(object):
if process: if process:
try: try:
self.app.logger.debug("Proccessing the image") self.app.logger.debug("Proccessing the image")
path = process_image(self, path) path = _process_image(self, path)
except Exception as e: except RuntimeError as e:
self.app.logger.error(str(e)) self.app.logger.error("Error while processing the image, aborting print : %s",str(e))
return False raise e
else: else:
self.app.logger.warning("Not proccessing the image") self.app.logger.warning("Not proccessing the image")
try: try:
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
self.printer.image(path, center=center) self.printer.image(path, center=center)
self.printer.textln(signature)
self.printer.close() self.printer.close()
self.app.logger.debug("Printed an image : " + str(path)) self.app.logger.debug("Printed an image : " + str(path))
os.remove(path)
self.app.logger.debug("Removed image : " + str(path))
except Exception as e: except Exception as e:
self.app.logger.error(str(e)) self.app.logger.error(str(e))
raise RuntimeError("Could not print the picture") from e raise RuntimeError("Could not print the picture") from e
finally: finally:
try:
os.remove(path)
except OSError as e:
raise e
self.app.logger.debug("Removed image : " + str(path))
try: try:
self.printer.close() self.printer.close()
except Exception as e: except Exception as e:
self.app.logger.error(str(e)) self.app.logger.error("Could not close the printer connexion %s", str(e))
raise RuntimeError("Could not close the printer connexion. ") from e raise RuntimeError("Could not close the printer connexion. ") from e
self.app.logger.info("Printed a picture") self.app.logger.info("Printed a picture")
return True return True
def qr(self, content): def _qr(self, content):
try: try:
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
self.printer.qr(content, center=True) self.printer.qr(content, center=True)
@@ -226,7 +245,7 @@ class Printer(object):
self.app.logger.info("Printed a QR") self.app.logger.info("Printed a QR")
return True return True
def cut(self): def _cut(self):
try: try:
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
self.printer.cut() self.printer.cut()
@@ -239,8 +258,20 @@ class Printer(object):
self.app.logger.info("Did a cut") self.app.logger.info("Did a cut")
return True return True
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
match (task_type.value):
case ("text"):
self._print_sms(data["txt"],signature=data["sign"])
case ("image"):
self._print_img(data["img"], signature=data["sign"],process=data["process"])
case ("cut"):
self._cut()
case _:
raise RuntimeError("This task type is not supported")
def process_image(self, path):
def _process_image(self, path):
brightness_factor = 1.5 # Used only if image is too dark brightness_factor = 1.5 # Used only if image is too dark
brightness_threshold = 100 # Brightness threshold (0255) brightness_threshold = 100 # Brightness threshold (0255)
contrast_factor = 0.6 # Less than 1.0 = lower contrast contrast_factor = 0.6 # Less than 1.0 = lower contrast
@@ -254,7 +285,7 @@ def process_image(self, path):
original_img = original_img.convert("RGB") original_img = original_img.convert("RGB")
# Resize while maintaining aspect ratio # Resize while maintaining aspect ratio
original_img.thumbnail((max_width, max_height), Image.LANCZOS) original_img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
self.app.logger.debug("Resized the image") self.app.logger.debug("Resized the image")
# # Convert to grayscale for dithering # # Convert to grayscale for dithering
@@ -286,18 +317,10 @@ def process_image(self, path):
# contrast_enhancer = ImageEnhance.Contrast(original_img) # contrast_enhancer = ImageEnhance.Contrast(original_img)
# original_img = contrast_enhancer.enhance(contrast_factor) # original_img = contrast_enhancer.enhance(contrast_factor)
# Final resize check
if original_img.height > max_height:
raise ValueError("Image is too long, sorry! Keep it below 575×1000 pixels.")
self.app.logger.error(
"Image is too long, sorry! Keep it below 575×1000 pixels."
)
return False
# Convert to JPEG and save # Convert to JPEG and save
jpeg_path = os.path.splitext(path)[0] + "_processed.jpg" jpeg_path = os.path.splitext(path)[0] + "_processed.jpg"
original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True) original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True)
app.logger.debug("Processed and saved image.") self.app.logger.debug("Processed and saved image.")
return jpeg_path return jpeg_path
@@ -387,6 +410,5 @@ def find_and_parse_borther_ql_printer():
print("No Brother QL printer found") print("No Brother QL printer found")
return None return None
def fint_and_parse_epson_printer(): def fint_and_parse_epson_printer():
pass pass