Compare commits
25 Commits
master
...
multi-prin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09e588c3ff | ||
|
|
0699775d35 | ||
|
|
7d19098b61 | ||
|
|
65e4a2ad9c | ||
|
|
af15ed8754 | ||
|
|
53010987f4 | ||
|
|
175dd3385a | ||
|
|
3a1d9b20fb | ||
|
|
c57e2f91a2 | ||
|
|
9ccd2b8bdf | ||
|
|
54678175ba | ||
|
|
2262840f75 | ||
|
|
ad3cb6231a | ||
|
|
adcc744e7a | ||
|
|
6d9db2d2aa | ||
|
|
651235a610 | ||
|
|
3c490e10b4 | ||
|
|
a2d1779e2b | ||
|
|
f841cd5628 | ||
|
|
db7e030a1f | ||
|
|
1218d3fbee | ||
|
|
ef613b3c10 | ||
|
|
e549cdc64b | ||
|
|
9e4ec6c1a5 | ||
|
|
9e77e0980b |
@@ -5,8 +5,6 @@ signature = "Anonymous"
|
||||
|
||||
# Printer settings
|
||||
[printer]
|
||||
vendor_id = 0x04b8
|
||||
device_id = 0x0e28
|
||||
upload_folder = "src/static/uploads"
|
||||
|
||||
# Raspberry Pi Configuration
|
||||
|
||||
60
src/main.py
60
src/main.py
@@ -36,7 +36,6 @@ import werkzeug.exceptions
|
||||
from flask_socketio import SocketIO
|
||||
from flask_limiter import Limiter
|
||||
from flask_limiter.util import get_remote_address
|
||||
from printer import Printer # The wrapper for the printer class
|
||||
from raspberry import Raspberry # The Raspberry pi control Class
|
||||
from web import Web # Wrapper for the web routes and API
|
||||
from print_queue import PrintQueue
|
||||
@@ -73,11 +72,7 @@ except OSError as e:
|
||||
|
||||
app.logger.debug("Config file loaded !")
|
||||
|
||||
# Define the USB connections here.
|
||||
vendor_id = configuration_file["printer"]["vendor_id"]
|
||||
device_id = configuration_file["printer"]["device_id"]
|
||||
UPLOAD_FOLDER = str(configuration_file["printer"]["upload_folder"])
|
||||
|
||||
try:
|
||||
os.mkdir(UPLOAD_FOLDER)
|
||||
app.logger.debug("Directory %s created successfully.", UPLOAD_FOLDER)
|
||||
@@ -88,49 +83,48 @@ except PermissionError:
|
||||
sys.exit(77)
|
||||
|
||||
# Output the config file
|
||||
if os.getenv("FLASK_DEBUG"):
|
||||
if not os.getenv("FLASK_DEBUG") is None and os.getenv("FLASK_DEBUG") is True:
|
||||
pprint.pprint(configuration_file)
|
||||
|
||||
# We define the app module used by Flask
|
||||
app.secret_key = configuration_file["secrets"]["flask_secret_key"]
|
||||
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
|
||||
app.config["ALLOWED_EXTENSIONS"] = ALLOWED_EXTENSIONS
|
||||
app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 3Mb for a file upload
|
||||
app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 10Mb for a file upload
|
||||
app.config["TEMPLATES_AUTO_RELOAD"] = True
|
||||
|
||||
# Printer connection
|
||||
# Uses the class defined in the printer.py file
|
||||
printer = Printer(app, 0x04B8, 0x0E28)
|
||||
printer.init_printer()
|
||||
|
||||
# Find out if we are running on a Raspberry Pi
|
||||
rpi = Raspberry(
|
||||
printer,
|
||||
app,
|
||||
socketio,
|
||||
configuration_file["rpi"]["button_gpio_port_number"],
|
||||
configuration_file["rpi"]["indicator_gpio_port_number"],
|
||||
configuration_file["rpi"]["flash_gpio_port_number"],
|
||||
configuration_file["rpi"]["flash"],
|
||||
)
|
||||
|
||||
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
|
||||
|
||||
# Queue creation
|
||||
print_queue = PrintQueue(app)
|
||||
|
||||
# Find out if we are running on a Raspberry Pi
|
||||
rpi = Raspberry(
|
||||
print_queue,
|
||||
app,
|
||||
configuration_file
|
||||
)
|
||||
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
|
||||
|
||||
|
||||
# Web & API management
|
||||
web = Web(app, print_queue)
|
||||
|
||||
# Start worker thread
|
||||
worker = PrintWorker(app, print_queue, printer, socketio)
|
||||
worker.start()
|
||||
# When created, the worker will try to find printers connected to the system
|
||||
try:
|
||||
worker = PrintWorker(app, print_queue)
|
||||
worker.start()
|
||||
except Exception as e:
|
||||
app.logger.error("Could not start the worker because %s ", str(e))
|
||||
sys.exit(-1)
|
||||
|
||||
# The rate limit
|
||||
limiter = Limiter(
|
||||
get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"]
|
||||
)
|
||||
|
||||
|
||||
app.logger.info("🖶 Welcome to LittlePrynter !")
|
||||
|
||||
# General routes
|
||||
@app.route("/")
|
||||
@limiter.limit("1/second", override_defaults=False)
|
||||
@@ -197,6 +191,9 @@ def web_print_img():
|
||||
"No signature found for this print, using default signature : %s", str(e)
|
||||
)
|
||||
sign = configuration_file["defaults"]["signature"]
|
||||
except werkzeug.exceptions.RequestEntityTooLarge as e:
|
||||
flash("Whoops, image is too big: " + str(e), "error")
|
||||
return redirect(url_for("index"))
|
||||
|
||||
# check if the post request has the file part
|
||||
if "img" not in request.files:
|
||||
@@ -304,6 +301,7 @@ def api_print_image():
|
||||
return "OK", 200
|
||||
|
||||
|
||||
# TODO: This might not depend on the Raspberry Pi
|
||||
@app.route("/api/camera/picture", methods=["GET"])
|
||||
def camera_picture():
|
||||
"""Returns a picture taken by the camera on a raspberry pi"""
|
||||
@@ -322,6 +320,12 @@ def api_queue_status():
|
||||
return jsonify(web.get_queue_state())
|
||||
|
||||
|
||||
@app.route("/api/queue/completed", methods=["GET"])
|
||||
def api_queue_completed():
|
||||
"""API endpoint that returns the finished tasks"""
|
||||
return jsonify(web.get_queue_completed())
|
||||
|
||||
|
||||
@app.route("/api/worker", methods=["GET"])
|
||||
def api_worker_state():
|
||||
"""API endpoint to get the worker state"""
|
||||
@@ -403,4 +407,4 @@ def camera_status():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc")
|
||||
app.run(use_reloader=False, host="0.0.0.0", ssl_context="adhoc")
|
||||
|
||||
@@ -76,7 +76,16 @@ class PrintQueue:
|
||||
"""Return current queue state"""
|
||||
with self._lock:
|
||||
self.app.logger.debug("Return current queue state")
|
||||
return [{"task_id": t.task_id, "status": t.status} for t in self._queue]
|
||||
return [
|
||||
{"task_id": t.task_id, "status": t.status, "type": str(t.task_type), "content": str(t.get_print_data())}
|
||||
for t in self._queue
|
||||
]
|
||||
|
||||
def get_queue_completed(self):
|
||||
"""Return completed queue elements"""
|
||||
with self._lock:
|
||||
self.app.logger.debug("Return completed queue elements")
|
||||
return self._completed_tasks
|
||||
|
||||
def get_status(self, task_id):
|
||||
"""Get full status info for a task"""
|
||||
|
||||
500
src/printer.py
500
src/printer.py
@@ -1,115 +1,129 @@
|
||||
"""
|
||||
This class manages connexion to a Printer
|
||||
"""
|
||||
# import brother_ql
|
||||
from time import sleep
|
||||
import os.path
|
||||
|
||||
import os.path
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
from enum import Enum
|
||||
import uuid
|
||||
import threading
|
||||
import usb.core
|
||||
from PIL import Image, ImageEnhance
|
||||
import numpy as np
|
||||
|
||||
# Importing the module to manage the connection to the printer.
|
||||
# Importing the modules needed for each supported printer Type
|
||||
import escpos.printer
|
||||
from brother_ql.models import ModelsManager
|
||||
from brother_ql.backends import backend_factory
|
||||
from brother_ql.raster import BrotherQLRaster
|
||||
from brother_ql.conversion import convert
|
||||
from brother_ql.backends.helpers import send
|
||||
|
||||
|
||||
class Printer():
|
||||
class PrinterType(Enum):
|
||||
"""
|
||||
# The connection is based on the ESC/POS library
|
||||
|
||||
## Connection to the USB printer
|
||||
|
||||
## Making sure the printer is alive
|
||||
|
||||
## Making sure it has paper
|
||||
|
||||
## Define default print settings
|
||||
|
||||
## Print starting message, log time of first print, cut.
|
||||
|
||||
## Annonce readyness : return a positive pong message.
|
||||
What are the capacities of a Printer ?
|
||||
"""
|
||||
|
||||
# Is the printer ready to accept a new print ?
|
||||
ready = False
|
||||
EPSON = "epson"
|
||||
BROTHER = "brother"
|
||||
|
||||
def __init__(self, app, device_id, vendor_id):
|
||||
super().__init__()
|
||||
|
||||
# For Brother-QL Printers
|
||||
@dataclass
|
||||
class PrinterInfo:
|
||||
"""
|
||||
Brother-QL printer information
|
||||
"""
|
||||
identifier: str
|
||||
backend: str
|
||||
protocol: str
|
||||
vendor_id: str
|
||||
product_id: str
|
||||
serial_number: str
|
||||
name: str = "Brother QL Printer"
|
||||
model: str = "QL-570"
|
||||
status: str = "unknown"
|
||||
label_type: str = "unknown"
|
||||
label_size: str = "unknown"
|
||||
label_width: int = 0
|
||||
label_height: int = 0
|
||||
|
||||
def __getitem__(self, item):
|
||||
return getattr(self, item)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
setattr(self, key, value)
|
||||
|
||||
|
||||
class Printer(ABC):
|
||||
"""
|
||||
If it outputs printed paper and speaks like a printer, then it must be a printer.
|
||||
"""
|
||||
|
||||
def __init__(self, app, vendor_id, device_id, printer_type: PrinterType):
|
||||
"""
|
||||
We initialize a Printer via it's USB connexion, and generate a unique ID
|
||||
"""
|
||||
self.id = uuid.uuid4()
|
||||
self.app = app
|
||||
self.ready = False
|
||||
self.printer = None
|
||||
self.device_id = device_id
|
||||
self.vendor_id = vendor_id
|
||||
self.device_id = device_id
|
||||
self.ready = False
|
||||
self.printer_type = printer_type
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@abstractmethod
|
||||
def _has_paper(self) -> bool:
|
||||
"""Check if the printer has papier"""
|
||||
|
||||
@abstractmethod
|
||||
def _state(self) -> bool:
|
||||
"""Reports the state of the Printer"""
|
||||
|
||||
@abstractmethod
|
||||
def print_task(self, task_type, data) -> None:
|
||||
"""Takes a PrintTask and executes it"""
|
||||
|
||||
|
||||
class EscPosPrinter(Printer):
|
||||
"""
|
||||
Create a new ESC/POS based printer.
|
||||
"""
|
||||
|
||||
def __init__(self, app, vendor_id, device_id):
|
||||
"""
|
||||
Create a connexion to a ESC/POS Printer via USB,
|
||||
Making sure the printer is alive,
|
||||
Making sure it has paper,
|
||||
Define default print settings
|
||||
"""
|
||||
super().__init__(app, vendor_id, device_id, printer_type=PrinterType.EPSON)
|
||||
self.printer = None
|
||||
self.usb_args = {}
|
||||
self.usb_args["idVendor"] = self.device_id
|
||||
self.usb_args["idProduct"] = self.vendor_id
|
||||
self.usb_args["idVendor"] = self.vendor_id
|
||||
self.usb_args["idProduct"] = self.device_id
|
||||
|
||||
def check_paper(self) -> bool:
|
||||
"""
|
||||
On printers that support it, we check that the printer has paper
|
||||
"""
|
||||
self.app.logger.debug("Checking paper status...")
|
||||
self.printer.open(self.usb_args)
|
||||
status = self.printer.paper_status()
|
||||
match status:
|
||||
case 0:
|
||||
self.app.logger.error("Printer has no more paper, aborting...")
|
||||
self.printer.close()
|
||||
raise RuntimeError("No more paper in the printer")
|
||||
case 1:
|
||||
self.app.logger.warning(
|
||||
"Printer needs paper to be changed very soon ! "
|
||||
)
|
||||
self.printer.close()
|
||||
case 2:
|
||||
self.app.logger.debug("Printer has paper, good to go")
|
||||
self.printer.close()
|
||||
try:
|
||||
# This also calls open(), which we need to close()
|
||||
# or else the device will appear as busy.
|
||||
p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80")
|
||||
except escpos.exceptions.DeviceNotFoundError as e:
|
||||
self.app.logger.error(
|
||||
"The USB device is not plugged in : %s",
|
||||
str(e),
|
||||
)
|
||||
except Exception as e:
|
||||
self.app.logger.error("Printer could not be connected : %s ", str(e))
|
||||
|
||||
def init_printer(self):
|
||||
"""
|
||||
Check if the printer online ? Is the communication with the printer successfull ?
|
||||
"""
|
||||
|
||||
# TODO: This could happen directly when creating a new Printer class
|
||||
if os.getenv("FLASK_DEBUG"):
|
||||
waiting_elapsed = 3
|
||||
else:
|
||||
waiting_elapsed = 10
|
||||
|
||||
self.app.logger.debug("Waiting for printer to get online...")
|
||||
|
||||
while not self.ready:
|
||||
try:
|
||||
# This also calls open(), which we need to close()
|
||||
# or else the device will appear as busy.
|
||||
p = escpos.printer.Usb(
|
||||
self.device_id, self.vendor_id, 0, profile="TM-P80"
|
||||
)
|
||||
except RuntimeError as e:
|
||||
self.app.logger.error(
|
||||
"The USB device is not plugged in, trying again %s : %s",
|
||||
waiting_elapsed,
|
||||
str(e),
|
||||
)
|
||||
|
||||
try:
|
||||
if p.is_online():
|
||||
self.ready = True
|
||||
self.app.logger.debug("Printer online !")
|
||||
except RuntimeError as e:
|
||||
self.app.logger.error(
|
||||
"Error while getting the printer online %s : %s",
|
||||
waiting_elapsed,
|
||||
str(e),
|
||||
)
|
||||
|
||||
sleep(1)
|
||||
waiting_elapsed -= 1
|
||||
if waiting_elapsed < 1:
|
||||
self.app.logger.error(
|
||||
"Printer took more than 30 seconds to get online, aborting..."
|
||||
)
|
||||
waiting_elapsed = 1 # Reset the waiting time for the next print.
|
||||
return False
|
||||
try:
|
||||
if p.is_online():
|
||||
self.app.logger.debug("Printer online !")
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
# Setting up the printing options.
|
||||
p.set(
|
||||
@@ -130,15 +144,38 @@ class Printer():
|
||||
# Beware : if we print every time the printer becomes ready, it means
|
||||
# we are printing before and after every print !
|
||||
self.printer = p
|
||||
self.printer.close() # We close the connexion to the Printer
|
||||
|
||||
try:
|
||||
self._has_paper()
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
self.ready = True
|
||||
self.printer.close()
|
||||
|
||||
self.check_paper()
|
||||
|
||||
return True
|
||||
|
||||
def _print_sms(self, msg, signature="", bold=False):
|
||||
def _has_paper(self):
|
||||
"""Check if the printer has paper left"""
|
||||
self.app.logger.debug("Checking paper status...")
|
||||
self.printer.open(self.usb_args)
|
||||
status = self.printer.paper_status()
|
||||
match status:
|
||||
case 0:
|
||||
self.app.logger.error("Printer has no more paper, aborting...")
|
||||
self.printer.close()
|
||||
raise RuntimeError("No more paper in the printer")
|
||||
case 1:
|
||||
self.app.logger.warning(
|
||||
"Printer needs paper to be changed very soon ! "
|
||||
)
|
||||
self.printer.close()
|
||||
return True
|
||||
case 2:
|
||||
self.app.logger.debug("Printer has paper, good to go")
|
||||
self.printer.close()
|
||||
return True
|
||||
|
||||
def _print_txt(self, msg, signature="", bold=False):
|
||||
self.ready = False
|
||||
if not isinstance(msg, str):
|
||||
self.app.logger.error(
|
||||
"It is not possible to print a " + str(type(msg)) + ", only strings."
|
||||
@@ -178,6 +215,7 @@ class Printer():
|
||||
self.printer.textln(clean_msg)
|
||||
if clean_signature:
|
||||
self.printer.textln(clean_signature)
|
||||
self.printer.textln()
|
||||
self.printer.close()
|
||||
except Exception as e:
|
||||
self.app.logger.error("Unable to print because : " + str(e))
|
||||
@@ -186,9 +224,10 @@ class Printer():
|
||||
) from e
|
||||
|
||||
self.app.logger.info("Printed text")
|
||||
return True
|
||||
self.ready = True
|
||||
|
||||
def _print_img(self, path, signature="", center=True, process=False):
|
||||
self.ready = False
|
||||
clean_signature = str(signature)
|
||||
|
||||
if len(signature) > 256:
|
||||
@@ -214,30 +253,33 @@ class Printer():
|
||||
if process:
|
||||
try:
|
||||
self.app.logger.debug("Proccessing the image")
|
||||
path = _process_image(self, path)
|
||||
processed_path = _process_image(self, path)
|
||||
except RuntimeError as e:
|
||||
self.app.logger.error(
|
||||
"Error while processing the image, aborting print : %s", str(e)
|
||||
)
|
||||
raise e
|
||||
else:
|
||||
processed_path = path
|
||||
self.app.logger.warning("Not proccessing the image")
|
||||
|
||||
try:
|
||||
self.printer.open(self.usb_args)
|
||||
self.printer.image(path, center=center)
|
||||
self.printer.image(processed_path, center=center)
|
||||
self.printer.textln(signature)
|
||||
self.printer.close()
|
||||
self.app.logger.debug("Printed an image : " + str(path))
|
||||
self.app.logger.debug("Printed an image : " + str(processed_path))
|
||||
except Exception as e:
|
||||
self.app.logger.error(str(e))
|
||||
raise RuntimeError("Could not print the picture") from e
|
||||
finally:
|
||||
try:
|
||||
os.remove(path)
|
||||
os.remove(processed_path)
|
||||
except OSError as e:
|
||||
raise e
|
||||
|
||||
self.app.logger.debug("Removed image : " + str(processed_path))
|
||||
self.app.logger.debug("Removed image : " + str(path))
|
||||
|
||||
try:
|
||||
@@ -249,22 +291,25 @@ class Printer():
|
||||
raise RuntimeError("Could not close the printer connexion. ") from e
|
||||
|
||||
self.app.logger.info("Printed a picture")
|
||||
return True
|
||||
self.ready = True
|
||||
|
||||
def _qr(self, content):
|
||||
self.ready = False
|
||||
try:
|
||||
self.printer.open(self.usb_args)
|
||||
self.printer.qr(content, center=True)
|
||||
self.printer.textln(content)
|
||||
self.printer.close()
|
||||
except RuntimeError as e:
|
||||
self.printer.close()
|
||||
self.app.logger.error(str(e))
|
||||
return False
|
||||
raise e
|
||||
|
||||
self.app.logger.info("Printed a QR")
|
||||
return True
|
||||
self.ready = True
|
||||
|
||||
def _cut(self):
|
||||
self.ready = False
|
||||
try:
|
||||
self.printer.open(self.usb_args)
|
||||
self.printer.cut()
|
||||
@@ -275,34 +320,236 @@ class Printer():
|
||||
raise e
|
||||
|
||||
self.app.logger.info("Did a cut")
|
||||
return True
|
||||
self.ready = True
|
||||
|
||||
def _state(self) -> bool:
|
||||
has_paper = self._has_paper()
|
||||
is_ready = self.ready
|
||||
|
||||
self.app.logger.debug("Has paper : %s " , has_paper )
|
||||
self.app.logger.debug("Ready : %s " , is_ready )
|
||||
|
||||
return is_ready and has_paper # and is_online
|
||||
|
||||
def print_task(self, task_type, data):
|
||||
"""Execute actual print based on task type"""
|
||||
match (task_type.value):
|
||||
case "text":
|
||||
self._print_sms(data["txt"], signature=data["sign"])
|
||||
case "image":
|
||||
self._print_img(
|
||||
data["img"], signature=data["sign"], process=data["process"]
|
||||
|
||||
|
||||
with self._lock:
|
||||
self.app.logger.debug("Acquired lock to start print")
|
||||
|
||||
i_m_ready = self._state()
|
||||
while not i_m_ready:
|
||||
self.app.logger.debug("Waiting for the printer to become ready, current state %s ", str(i_m_ready))
|
||||
i_m_ready = self._state()
|
||||
time.sleep(0.3)
|
||||
|
||||
self.app.logger.debug("Checked state to start printing : %s", self._state())
|
||||
self.ready = False
|
||||
try:
|
||||
self.app.logger.debug("Checking task type")
|
||||
match (task_type.value):
|
||||
case "text":
|
||||
|
||||
self._print_txt(data["txt"], signature=data["sign"])
|
||||
self.ready = True
|
||||
case "image":
|
||||
self._print_img(
|
||||
data["img"], signature=data["sign"], process=data["process"]
|
||||
)
|
||||
self.ready = True
|
||||
case "cut":
|
||||
self._cut()
|
||||
self.ready = True
|
||||
case "qr":
|
||||
self._qr(data["txt"])
|
||||
self.ready = True
|
||||
case _:
|
||||
raise RuntimeError("This task type is not supported")
|
||||
except Exception as e:
|
||||
self.app.logger.debug("Exception occured while printing %s", str(e))
|
||||
self.ready = True
|
||||
raise RuntimeError from e
|
||||
|
||||
|
||||
class BrotherPrinter(Printer):
|
||||
"""
|
||||
Manages connexion and capabilities of a BrotherQL Printer
|
||||
"""
|
||||
|
||||
def __init__(self, app, vendor_id, device_id):
|
||||
super().__init__(
|
||||
app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER
|
||||
)
|
||||
self.printer = None
|
||||
self.usb_args = {}
|
||||
self.usb_args["idVendor"] = self.device_id
|
||||
self.usb_args["idProduct"] = self.vendor_id
|
||||
self.model_manager = ModelsManager()
|
||||
|
||||
# Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py
|
||||
|
||||
backend = backend_factory("pyusb")
|
||||
available_devices = backend["list_available_devices"]()
|
||||
|
||||
for printer in available_devices:
|
||||
self.app.logger.debug(f"Found device: {printer}")
|
||||
identifier = printer["identifier"]
|
||||
parts = identifier.split("/")
|
||||
|
||||
if len(parts) < 4:
|
||||
self.app.logger.warning(
|
||||
f"Skipping device with invalid identifier format: {identifier}"
|
||||
)
|
||||
case "cut":
|
||||
self._cut()
|
||||
case _:
|
||||
raise RuntimeError("This task type is not supported")
|
||||
continue
|
||||
|
||||
protocol = parts[0]
|
||||
# device_info = parts[2]
|
||||
serial_number = parts[3]
|
||||
|
||||
try:
|
||||
product_id_int = int(self.device_id, 16)
|
||||
for m in self.model_manager.iter_elements():
|
||||
if m.product_id == product_id_int:
|
||||
model = m.identifier
|
||||
break
|
||||
self.app.logger.debug(f"Matched printer model: {model}")
|
||||
except ValueError:
|
||||
self.app.logger.warning(f"Invalid product ID format: {m.product_id}")
|
||||
|
||||
self.printer_info = PrinterInfo(
|
||||
identifier=identifier,
|
||||
backend="pyusb",
|
||||
model=model,
|
||||
protocol=protocol,
|
||||
vendor_id=vendor_id,
|
||||
product_id=self.device_id,
|
||||
serial_number=serial_number,
|
||||
)
|
||||
|
||||
self.ready = True
|
||||
|
||||
def _has_paper(self):
|
||||
raise NotImplementedError("This printer model does not support this.")
|
||||
|
||||
def _state(self):
|
||||
return self.ready
|
||||
|
||||
def _print_img(self, data):
|
||||
"""
|
||||
Print a raster image via a Brother QL printer
|
||||
"""
|
||||
self.ready = False
|
||||
label_type = "102"
|
||||
rotate = 0
|
||||
dither = False
|
||||
try:
|
||||
# Prepare the image for printing
|
||||
qlr = BrotherQLRaster(self.printer_info["model"])
|
||||
|
||||
instructions = convert(
|
||||
qlr=qlr,
|
||||
images=[data["img"]],
|
||||
label=label_type,
|
||||
rotate=rotate,
|
||||
threshold=70,
|
||||
dither=dither,
|
||||
compress=True,
|
||||
red=False,
|
||||
dpi_600=False,
|
||||
hq=False,
|
||||
cut=True,
|
||||
)
|
||||
|
||||
# Debug logging
|
||||
if os.getenv("FLASK_DEBUG"):
|
||||
self.app.logger.debug(f"""
|
||||
Print parameters:
|
||||
- Label type: {label_type}
|
||||
- Rotate: {rotate}
|
||||
- Dither: {dither}
|
||||
- Model: {self.printer_info['model']}
|
||||
- Backend: {self.printer_info['backend']}
|
||||
- Identifier: {self.printer_info['identifier']}
|
||||
""")
|
||||
|
||||
# Try to print using Python API
|
||||
# send() = status = {
|
||||
# 'instructions_sent': True, # The instructions were sent to the printer.
|
||||
# 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
|
||||
# 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it.
|
||||
# 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
|
||||
# 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
|
||||
# }
|
||||
status = send(
|
||||
instructions=instructions,
|
||||
printer_identifier=self.printer_info["identifier"],
|
||||
backend_identifier="pyusb",
|
||||
)
|
||||
|
||||
if (
|
||||
not status["did_print"]
|
||||
or status["outcome"] == "error"
|
||||
or status["outcome"] == "unknown"
|
||||
):
|
||||
raise RuntimeError("Failed to print using Python API")
|
||||
|
||||
if status["printer_state"]:
|
||||
self.ready = bool(status["printer_state"])
|
||||
else:
|
||||
self.ready = True
|
||||
|
||||
except usb.core.USBError as e:
|
||||
# Treat timeout errors as successful since they often occur after print completion
|
||||
if e.errno == 110: # Operation timed out
|
||||
self.app.logger.debug(
|
||||
"USB timeout occurred - this is normal and the print likely completed"
|
||||
)
|
||||
self.app.logger.debug("Print completed (timeout is normal)")
|
||||
self.ready = True
|
||||
|
||||
error_msg = f"USBError encountered: {e}"
|
||||
self.app.logger.debug(error_msg)
|
||||
raise RuntimeError from e
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during printing: {str(e)}"
|
||||
self.app.logger.debug(error_msg)
|
||||
raise RuntimeError from e
|
||||
|
||||
def print_task(self, task_type, data):
|
||||
"""Execute actual print based on task type"""
|
||||
with self._lock:
|
||||
if self._state:
|
||||
self._state = False
|
||||
match (task_type.value):
|
||||
case "image":
|
||||
self._print_img(data["img"])
|
||||
self._state = True
|
||||
case "cut":
|
||||
# The cut happens by default on Brother QL printers.
|
||||
self._state = True
|
||||
case _:
|
||||
raise RuntimeError("This task type is not supported")
|
||||
else:
|
||||
raise RuntimeError("The printer is not ready to print yet !")
|
||||
|
||||
# These values are by default for now
|
||||
|
||||
# raise NotImplementedError("This printer type is not implemented yet")
|
||||
|
||||
|
||||
def _process_image(self, path):
|
||||
brightness_factor = 1.5 # Used only if image is too dark
|
||||
brightness_threshold = 100 # Brightness threshold (0–255)
|
||||
contrast_factor = 0.6 # Less than 1.0 = lower contrast
|
||||
contrast_factor = 2 # Less than 1.0 = lower contrast
|
||||
max_width = 575
|
||||
max_height = 1000
|
||||
|
||||
with Image.open(path) as original_img:
|
||||
# Convert to RGB if needed (JPEG doesn't support alpha)
|
||||
if original_img.mode in ("RGBA", "P"):
|
||||
self.app.logger.debug("Converting the image to RGB from RGBA")
|
||||
self.app.logger.debug("Converting the image from RGBA to RGBA")
|
||||
original_img = original_img.convert("RGB")
|
||||
|
||||
# Resize while maintaining aspect ratio
|
||||
@@ -332,16 +579,19 @@ def _process_image(self, path):
|
||||
self.app.logger.debug(
|
||||
f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}"
|
||||
)
|
||||
enhancer = ImageEnhance.Brightness(original_img)
|
||||
original_img = enhancer.enhance(brightness_factor)
|
||||
enhancer = ImageEnhance.Brightness(grayscale)
|
||||
grayscale = enhancer.enhance(brightness_factor)
|
||||
|
||||
# # Reduce contrast
|
||||
# contrast_enhancer = ImageEnhance.Contrast(original_img)
|
||||
# original_img = contrast_enhancer.enhance(contrast_factor)
|
||||
# Computer current contrast of grayscale image
|
||||
contrast = np.clip(np.std(np.array(grayscale)), 0, 255)
|
||||
self.app.logger.debug("Standard deviation of the contrast : %s", contrast)
|
||||
# # Enhance contrast
|
||||
contrast_enhancer = ImageEnhance.Contrast(grayscale)
|
||||
original_img = contrast_enhancer.enhance(contrast_factor)
|
||||
|
||||
# Convert to JPEG and save
|
||||
jpeg_path = os.path.splitext(path)[0] + "_processed.jpg"
|
||||
original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True)
|
||||
grayscale.save(jpeg_path, format="JPEG", quality=95, optimize=True)
|
||||
self.app.logger.debug("Processed and saved image.")
|
||||
|
||||
return jpeg_path
|
||||
|
||||
160
src/printers.py
Normal file
160
src/printers.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""
|
||||
A collection of Printers.
|
||||
|
||||
It has methods to discover printers, and provides an interface for
|
||||
the methods expected from printers.
|
||||
"""
|
||||
|
||||
from collections.abc import Set
|
||||
import usb.core
|
||||
import usb.util
|
||||
from printer import Printer, EscPosPrinter, BrotherPrinter
|
||||
|
||||
|
||||
class Printers:
|
||||
"""
|
||||
Finds and creates a set of Printer that can be used by the Workers to print.
|
||||
"""
|
||||
|
||||
def __init__(self, app):
|
||||
"""
|
||||
Discover printers connected to the computer and return a Collection of Printer()
|
||||
"""
|
||||
self.app = app
|
||||
self.printers = self._discover_printers()
|
||||
|
||||
def _discover_printers(self) -> Set[Printer]:
|
||||
"""
|
||||
Gets connected USB printer devices using the pyusb library.
|
||||
|
||||
We analyse the USB devices, get the ones that match
|
||||
the printer class ( 7 ) and for Brother and EPSON printers,
|
||||
try to create a Printer object that can be used by the Worker class
|
||||
to execute prints.
|
||||
|
||||
Returns a set of Printer
|
||||
"""
|
||||
|
||||
self.app.logger.debug("Discovering USB Devices connected to this system")
|
||||
|
||||
printers = set()
|
||||
|
||||
# Find all connected USB devices
|
||||
devices = usb.core.find(find_all=True, custom_match=_FindClass(7))
|
||||
if not devices:
|
||||
self.app.logger.warning(
|
||||
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
|
||||
)
|
||||
raise RuntimeError(
|
||||
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
|
||||
)
|
||||
|
||||
for dev in devices:
|
||||
# Attempt to get the manufacturer and product strings
|
||||
try:
|
||||
manufacturer = usb.util.get_string(dev, dev.iManufacturer)
|
||||
except Exception:
|
||||
manufacturer = "Unknown"
|
||||
|
||||
try:
|
||||
product = usb.util.get_string(dev, dev.iProduct)
|
||||
except Exception:
|
||||
product = "Unknown"
|
||||
self.app.logger.debug(
|
||||
"Looking at %s %s (%s:%s)",
|
||||
manufacturer,
|
||||
product,
|
||||
hex(dev.idVendor),
|
||||
hex(dev.idProduct),
|
||||
)
|
||||
|
||||
if manufacturer == "EPSON":
|
||||
try:
|
||||
# We create a new EscPosPrinter()
|
||||
self.app.logger.debug("Trying to creat a new EPSON printer")
|
||||
prid = dev.idProduct
|
||||
vendir = dev.idVendor
|
||||
escpos_printer = EscPosPrinter(
|
||||
self.app, vendor_id=vendir, device_id=prid
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
# If the object creation is successfull, we add it to the list of Printers
|
||||
printers.add(escpos_printer)
|
||||
self.app.logger.debug("Found a %s printer", manufacturer)
|
||||
|
||||
# We already found the type of printer,
|
||||
# we don't need an extra comparaison.
|
||||
continue
|
||||
|
||||
# or a Brother Printer
|
||||
if manufacturer == "Brother":
|
||||
try:
|
||||
# We create a new BrotherPrinter()
|
||||
self.app.logger.debug("Trying to creat a new BROTHER printer")
|
||||
prid = dev.idProduct
|
||||
vendir = dev.idVendor
|
||||
brother_printer = BrotherPrinter(
|
||||
self.app, vendor_id=vendir, device_id=prid
|
||||
)
|
||||
except Exception as e:
|
||||
self.app.logger.error(
|
||||
"Could not create a %s printer class with %s:%s" , product,
|
||||
dev.idVendor,
|
||||
dev.idProduct,
|
||||
)
|
||||
raise e
|
||||
|
||||
# If the object creation is successfull, we add it to the list of Printers
|
||||
printers.add(brother_printer)
|
||||
self.app.logger.debug("Found a %s printer" , manufacturer)
|
||||
|
||||
self.app.logger.debug("Found %s printers" , len(printers))
|
||||
|
||||
if len(printers) < 1:
|
||||
self.app.logger.warning("Not printers found ! Please plug in a Printer and restart the program.")
|
||||
raise RuntimeError("No printers found")
|
||||
|
||||
return printers
|
||||
|
||||
def any(self) -> Printer:
|
||||
"""
|
||||
Return a dict key: UUID, value: Printer, with any connected printer.
|
||||
"""
|
||||
if len(self.printers) > 0:
|
||||
for i in self.printers:
|
||||
return i
|
||||
else:
|
||||
raise RuntimeError("No printers available")
|
||||
|
||||
# def get_printer(self, printer_type):
|
||||
# """
|
||||
# Return a specific printer
|
||||
|
||||
# printer_type -- a printer type
|
||||
# """
|
||||
# return NotImplementedError()
|
||||
|
||||
|
||||
class _FindClass:
|
||||
"""
|
||||
Use by usb.core to modify the way USB devices are found
|
||||
Taken from pyUSB documentation on Github
|
||||
"""
|
||||
def __init__(self, class_):
|
||||
self._class = class_
|
||||
|
||||
def __call__(self, device):
|
||||
# first, let's check the device
|
||||
if device.bDeviceClass == self._class:
|
||||
return True
|
||||
# ok, transverse all devices to find an
|
||||
# interface that matches our class
|
||||
for cfg in device:
|
||||
# find_descriptor: what's it?
|
||||
intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
|
||||
if intf is not None:
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -10,12 +10,12 @@ import io # To check if we are on a Raspberry Pi
|
||||
import subprocess
|
||||
import os
|
||||
from time import sleep, gmtime, strftime
|
||||
from flask_socketio import SocketIO
|
||||
from gpiozero import Button, LED, DigitalOutputDevice
|
||||
from PIL import Image
|
||||
from task import TextTask, ImageTask, CutTask
|
||||
|
||||
class Raspberry():
|
||||
|
||||
class Raspberry:
|
||||
"""
|
||||
This class will manage three things :
|
||||
- Connecting to a USB webcam
|
||||
@@ -23,29 +23,22 @@ class Raspberry():
|
||||
- Activating a flash ( or light )
|
||||
- Flash an indicator light
|
||||
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
# dede
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
print_queue,
|
||||
app,
|
||||
socketio,
|
||||
button_gpio_port_number,
|
||||
indicator_gpio_port_number,
|
||||
flash_gpio_port_number,
|
||||
is_flash_present,
|
||||
configuration_file
|
||||
):
|
||||
self.print_queue = print_queue
|
||||
self.socketio = socketio
|
||||
self.app = app
|
||||
|
||||
self.flash_gpio = flash_gpio_port_number
|
||||
self.is_flash_present = is_flash_present
|
||||
self.button_gpio = button_gpio_port_number
|
||||
self.led_gpio = indicator_gpio_port_number
|
||||
self.configuration_file = configuration_file
|
||||
self.image_path = self.app.config["UPLOAD_FOLDER"] + "/image.jpg"
|
||||
|
||||
def is_raspberry_pi(self, raise_on_errors=False):
|
||||
def is_raspberry_pi(self):
|
||||
"""
|
||||
Checking if we are on a Raspberry Pi by checking
|
||||
information on the /proc/cpuinfo file
|
||||
@@ -72,9 +65,10 @@ class Raspberry():
|
||||
return False
|
||||
|
||||
if not found:
|
||||
self.app.logger.error(
|
||||
"Couldn't get sufficient hardware information from /proc/cpuinfo, Unable to determine if we are on a Raspberry Pi."
|
||||
self.app.logger.warning(
|
||||
"Couldn't get sufficient hardware information from /proc/cpuinfo"
|
||||
)
|
||||
self.app.logger.warning("Unable to determine if we are on a Raspberry Pi.")
|
||||
return False
|
||||
except IOError:
|
||||
self.app.logger.error("Unable to open `/proc/cpuinfo`.")
|
||||
@@ -83,28 +77,38 @@ class Raspberry():
|
||||
self.app.logger.debug("It seems we are on a Raspberry Pi")
|
||||
|
||||
try:
|
||||
self.initialise_gpio()
|
||||
self._initialise_gpio()
|
||||
except Exception as e:
|
||||
self.app.logger.debug("Could not init GPIO : " + str(e))
|
||||
raise e
|
||||
return True
|
||||
|
||||
def initialise_gpio(self):
|
||||
def _initialise_gpio(self):
|
||||
"""
|
||||
Set GPIO ports from configuration and activate them
|
||||
to show the user it's working.
|
||||
"""
|
||||
self.app.logger.debug("Initializing GPIO")
|
||||
|
||||
self.led = LED(self.led_gpio)
|
||||
self.led = LED(self.configuration_file["rpi"]["indicator_gpio_port_number"])
|
||||
self.app.logger.debug("Activated indicator LED")
|
||||
self.indicator_countdown(iters=3)
|
||||
self.button = Button(self.button_gpio, pull_up=True, bounce_time=0.1)
|
||||
self.button = Button(
|
||||
self.configuration_file["rpi"]["button_gpio_port_number"],
|
||||
pull_up=True, bounce_time=0.1
|
||||
)
|
||||
self.button.when_pressed = self.on_button_pressed
|
||||
self.app.logger.debug("Activated button")
|
||||
|
||||
# The "flash" is a relay-controlled device ( light bulb for example )
|
||||
self.flash = DigitalOutputDevice(self.flash_gpio)
|
||||
self.flash = DigitalOutputDevice(self.configuration_file["rpi"]["flash_gpio_port_number"])
|
||||
self.flash_toggle()
|
||||
self.app.logger.debug("Activated flash")
|
||||
|
||||
def indicator_countdown(self, iters=10, multi=10):
|
||||
"""
|
||||
Activates the LED faster and faster to show a countdown
|
||||
"""
|
||||
for i in range(iters, 0, -1):
|
||||
self.led.on()
|
||||
sleep(i / multi)
|
||||
@@ -112,7 +116,10 @@ class Raspberry():
|
||||
sleep(i / multi)
|
||||
|
||||
def indicator_led(self, timing=0.2, l=5):
|
||||
for i in range(l):
|
||||
"""
|
||||
Turns on the indicator LED for a certain period of time
|
||||
"""
|
||||
for _ in range(l):
|
||||
self.app.logger.debug("LED turned on")
|
||||
self.led.on()
|
||||
sleep(timing)
|
||||
@@ -121,6 +128,9 @@ class Raspberry():
|
||||
sleep(timing)
|
||||
|
||||
def flash_toggle(self):
|
||||
"""
|
||||
Flashes the flash
|
||||
"""
|
||||
self.app.logger.debug("Flash turned on")
|
||||
self.flash.on()
|
||||
sleep(0.3)
|
||||
@@ -128,6 +138,9 @@ class Raspberry():
|
||||
self.app.logger.debug("Flash turned off")
|
||||
|
||||
def take_picture(self):
|
||||
"""
|
||||
Takes a picture via the USB webcam
|
||||
"""
|
||||
# Validate if the image path is valid
|
||||
if not os.path.isdir(os.path.dirname(self.image_path)):
|
||||
self.app.logger.error(
|
||||
@@ -173,6 +186,9 @@ class Raspberry():
|
||||
position="bottom_right",
|
||||
margin=10,
|
||||
):
|
||||
"""
|
||||
Takes an image and overlays it with a another picture.
|
||||
"""
|
||||
try:
|
||||
image = Image.open(image_path).convert("RGBA")
|
||||
logo = Image.open(logo_path).convert("RGBA")
|
||||
@@ -199,7 +215,9 @@ class Raspberry():
|
||||
y = image.height - logo.height - margin
|
||||
else:
|
||||
raise ValueError(
|
||||
"Invalid position. Choose from 'bottom_right', 'top_left', 'top_right', or 'bottom_left'."
|
||||
"Invalid position." +
|
||||
"Choose from 'bottom_right', 'top_left', " +
|
||||
" 'top_right', or 'bottom_left'."
|
||||
)
|
||||
|
||||
# Composite the logo onto the image
|
||||
@@ -217,6 +235,9 @@ class Raspberry():
|
||||
return True
|
||||
|
||||
def crop_to_square(self, image_path, output_path=None):
|
||||
"""
|
||||
Crop an image so that it becomes a square
|
||||
"""
|
||||
try:
|
||||
image = Image.open(image_path)
|
||||
width, height = image.size
|
||||
@@ -244,6 +265,10 @@ class Raspberry():
|
||||
return True
|
||||
|
||||
def on_button_pressed(self):
|
||||
"""
|
||||
When a button press is detected, a picture is taken from the webcam
|
||||
and added to the print queue.
|
||||
"""
|
||||
self.app.logger.debug("Button has been pressed")
|
||||
self.led.on()
|
||||
self.app.logger.debug("Counting down")
|
||||
@@ -263,8 +288,10 @@ class Raspberry():
|
||||
self.app.logger.debug("Printing picture")
|
||||
self.led.on()
|
||||
self.crop_to_square(self.image_path)
|
||||
self.print_queue.enqueue(ImageTask(self.image_path,signature="",process=True))
|
||||
self.print_queue.enqueue(TextTask(content="Imprimé par LittlePrynter", signature=""))
|
||||
self.print_queue.enqueue(ImageTask(self.image_path, signature="", process=True))
|
||||
self.print_queue.enqueue(
|
||||
TextTask(content="Imprimé par LittlePrynter", signature="")
|
||||
)
|
||||
time = strftime("%Y-%m-%d %H:%M", gmtime())
|
||||
self.print_queue.enqueue(TextTask(content=time, signature=""))
|
||||
self.print_queue.enqueue(CutTask())
|
||||
|
||||
20
src/task.py
20
src/task.py
@@ -14,6 +14,7 @@ to print at the same time.
|
||||
We can also delay and store printing tasks until a printer becomes
|
||||
available if none is online.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
## See https://docs.python.org/3/library/abc.html to learn more about this
|
||||
@@ -27,9 +28,11 @@ class TaskType(Enum):
|
||||
"""
|
||||
The different tasks supported by the printers
|
||||
"""
|
||||
|
||||
TEXT = "text"
|
||||
IMAGE = "image"
|
||||
CUT = "cut"
|
||||
QR = "qr"
|
||||
|
||||
|
||||
class PrintTask(ABC):
|
||||
@@ -37,18 +40,15 @@ class PrintTask(ABC):
|
||||
A print task holds information about what we are looking to print.
|
||||
"""
|
||||
|
||||
def __init__(self, task_type):
|
||||
def __init__(self, task_type: TaskType):
|
||||
self.task_id = self._generate_id()
|
||||
self.task_type = task_type
|
||||
self.status = "pending" # pending, processing, completed, failed
|
||||
|
||||
print("Created a new " + str(self.task_type) + " with ID " + self.task_id)
|
||||
|
||||
@abstractmethod
|
||||
def get_print_data(self):
|
||||
"""Return data formatted for printer"""
|
||||
|
||||
|
||||
def _generate_id(self):
|
||||
# Generate unique task ID
|
||||
return str(uuid.uuid4())
|
||||
@@ -68,6 +68,18 @@ class TextTask(PrintTask):
|
||||
return {"txt": self.content, "sign": self.signature}
|
||||
|
||||
|
||||
class QRTask(TextTask):
|
||||
"""This task prints a QR-Code, the signature is ignore and is always the content itself"""
|
||||
|
||||
def __init__(self, content):
|
||||
super().__init__(content, signature="")
|
||||
self.content = content
|
||||
self.signature = content
|
||||
|
||||
def get_print_data(self):
|
||||
return {"txt": self.content, "sign": self.signature}
|
||||
|
||||
|
||||
class ImageTask(PrintTask):
|
||||
"""
|
||||
This tasks represents a image content ( in the form of it's path ), and it's signature.
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
<h3 class="card-header">Print a short message</h3>
|
||||
<div class="card-body">
|
||||
<form class="form-group" action="/web/print/sms" method="post">
|
||||
<input class="form-control" type="text" name="txt" placeholder="200 chars or less " maxlength="200" required><br>
|
||||
<textarea class="form-control" type="text" name="txt" placeholder="4096 chars or less " maxlength="4096"></textarea>
|
||||
<br>
|
||||
<input class="form-control" type="text" name="signature" placeholder="Signature or pseudo" maxlength="200"><br>
|
||||
<input class="btn btn-primary float-right" type="submit" value="Imprimer" name="imprimer">
|
||||
</form>
|
||||
|
||||
57
src/web.py
57
src/web.py
@@ -1,13 +1,15 @@
|
||||
from flask import flash
|
||||
from werkzeug.utils import secure_filename
|
||||
import time
|
||||
"""
|
||||
Manage all of the inputs from a web source
|
||||
"""
|
||||
|
||||
import os
|
||||
from werkzeug.utils import secure_filename
|
||||
from task import TextTask, ImageTask, CutTask
|
||||
|
||||
|
||||
class Web(object):
|
||||
class Web:
|
||||
"""Web is the class that gets all of the information from web calls
|
||||
( API and Web page ) and provides checks before sending stuff to printing"""
|
||||
( API and Web page ) and provides checks before sending stuff to printing"""
|
||||
|
||||
def __init__(self, app, print_queue):
|
||||
super(Web).__init__()
|
||||
@@ -42,7 +44,7 @@ class Web(object):
|
||||
file_uploaded = self.upload_file(image)
|
||||
except Exception as e:
|
||||
self.app.logger.error(e)
|
||||
raise RuntimeError("Could not upload file") from e
|
||||
raise RuntimeError("Could not upload file : " + str(e)) from e
|
||||
|
||||
if file_uploaded:
|
||||
self.app.logger.debug("File has been uploaded, printing...")
|
||||
@@ -66,16 +68,19 @@ class Web(object):
|
||||
|
||||
return True
|
||||
|
||||
def login(self, username: str, password: str) -> bool:
|
||||
"""Not implemented"""
|
||||
return
|
||||
# def login(self, username: str, password: str) -> bool:
|
||||
# """Not implemented"""
|
||||
# return
|
||||
|
||||
def logout(self, username: str, password: str) -> bool:
|
||||
"""Not implemented"""
|
||||
return
|
||||
# def logout(self, username: str, password: str) -> bool:
|
||||
# """Not implemented"""
|
||||
# return
|
||||
|
||||
def allowed_file(self, filename) -> bool:
|
||||
self.app.logger.debug("Is the filename allowed ?")
|
||||
"""
|
||||
Check if the file extension is allowed
|
||||
"""
|
||||
self.app.logger.debug("Checking if the file extension is allowed")
|
||||
return (
|
||||
"." in filename
|
||||
and filename.rsplit(".", 1)[1].lower()
|
||||
@@ -83,8 +88,11 @@ class Web(object):
|
||||
)
|
||||
|
||||
def upload_file(self, image) -> bool:
|
||||
"""
|
||||
Save the file after executing checks on it
|
||||
"""
|
||||
self.app.logger.debug("Validating file")
|
||||
if image:
|
||||
if not image is None or not image == "":
|
||||
if self.allowed_file(image.filename):
|
||||
filename = secure_filename(image.filename)
|
||||
self.app.logger.debug("File valid")
|
||||
@@ -92,24 +100,23 @@ class Web(object):
|
||||
image.save(os.path.join(self.app.config["UPLOAD_FOLDER"], filename))
|
||||
except OSError as e:
|
||||
self.app.logger.error("Could not save file %s", e)
|
||||
return False
|
||||
raise RuntimeError("An OS error occured while uploading this file : " + str(e)) from e
|
||||
|
||||
self.app.logger.debug(
|
||||
"File saved to "
|
||||
+ str(os.path.join(self.app.config["UPLOAD_FOLDER"], filename))
|
||||
)
|
||||
return True
|
||||
else:
|
||||
self.app.logger.error(
|
||||
"Could not save file because the filename is forbidden"
|
||||
)
|
||||
return False
|
||||
else:
|
||||
self.app.logger.error(
|
||||
"Could not save file, it seems to be null ? : " + str(filename)
|
||||
)
|
||||
return False
|
||||
|
||||
self.app.logger.error(
|
||||
"Could not save file because the filename is forbidden"
|
||||
)
|
||||
raise RuntimeError("This file type is forbidden.")
|
||||
|
||||
def get_queue_state(self):
|
||||
"""Return current queue state"""
|
||||
return self.print_queue.get_queue_state()
|
||||
|
||||
def get_queue_completed(self):
|
||||
"""Return completed queue elements"""
|
||||
return self.print_queue.get_queue_completed()
|
||||
|
||||
154
src/worker.py
154
src/worker.py
@@ -1,84 +1,123 @@
|
||||
# This is the main printing thread
|
||||
# As explained in the task file, this is where we command
|
||||
# printing to happen.
|
||||
"""
|
||||
This is the main printing thread. A worker thread consums Tasks from
|
||||
a PrintQueue, while trying to find available printers.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from printers import Printers
|
||||
|
||||
|
||||
class PrintWorker(threading.Thread):
|
||||
def __init__(self, app, print_queue, printer, socketio=None):
|
||||
"""
|
||||
A thread used to consume Tasks added to a Print Queue.
|
||||
|
||||
On initialisation, the worker will try to find Printers,
|
||||
and on each print, choose an available Printer from a list of Printers.
|
||||
|
||||
If a print fails, it's not retried, but will be added to a list of completed
|
||||
tasks.
|
||||
"""
|
||||
def __init__(self, app, print_queue):
|
||||
super().__init__(daemon=True)
|
||||
self.app = app
|
||||
self.print_queue = print_queue
|
||||
self.printer = printer
|
||||
self.socketio = socketio # Optional
|
||||
self.printer = None
|
||||
self._lock = threading.Lock()
|
||||
self.running = True
|
||||
self.state = "idle" # idle, printing, dead, drinking-a-beer
|
||||
|
||||
self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")
|
||||
|
||||
try:
|
||||
self.printers = Printers(self.app)
|
||||
self.printers_obj = self.printers.printers
|
||||
self.printers = iter(self.printers.printers)
|
||||
except RuntimeError as e:
|
||||
self.app.logger.warning("Could not get any Printers")
|
||||
raise e
|
||||
|
||||
def run(self):
|
||||
"""Background thread that processes queue items"""
|
||||
self.app.logger.info("Worker started working.")
|
||||
self.app.logger.debug("Worker %s started working.", threading.get_ident())
|
||||
self.app.logger.debug("Current threads : %s" , threading.active_count())
|
||||
self.app.logger.debug("Threads actives : %s " , threading.enumerate())
|
||||
|
||||
while True:
|
||||
if not self.running or not self.printer.ready:
|
||||
|
||||
# If the printer is dead or asleep, it can't work.
|
||||
if not self.running:
|
||||
time.sleep(0.2)
|
||||
continue
|
||||
|
||||
try:
|
||||
task = self.print_queue.dequeue()
|
||||
except Exception as e:
|
||||
self.app.logger.error("Could not get a new task ! %s ", str(e))
|
||||
raise RuntimeError(
|
||||
"We could not get a new task because " + str(e)
|
||||
) from e
|
||||
|
||||
if task:
|
||||
# If we have no available printer, we look at the list printers
|
||||
# we know about, and try to find one that is available.
|
||||
# When we find a printer, we acquire it
|
||||
# When we are finished with a printer, we release it to the world.
|
||||
while not self.printer or not self.printer.ready:
|
||||
time.sleep(1)
|
||||
try:
|
||||
self.app.logger.info("Got a new task")
|
||||
self.app.logger.debug("Got task %s", task.task_id)
|
||||
self.state = "printing"
|
||||
task.status = "processing"
|
||||
self._emit_status(task.task_id, "processing")
|
||||
self.app.logger.debug("Changing printers")
|
||||
self.printer = next(self.printers)
|
||||
self.app.logger.debug(
|
||||
"The worker got a %s printer and it's %s",
|
||||
self.printer.printer_type,
|
||||
"Ready" if self.printer.ready else "Not ready",
|
||||
)
|
||||
except Exception as e:
|
||||
self.app.logger.error("No printer detected" + str(e))
|
||||
self.printer = None
|
||||
|
||||
print_data = task.get_print_data()
|
||||
if self.state != "idle":
|
||||
self.app.logger("We are not idle, waiting...")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
self.state = "printing"
|
||||
|
||||
with self._lock:
|
||||
try:
|
||||
task = self.print_queue.dequeue()
|
||||
except Exception as e:
|
||||
self.app.logger.error("Could not get a new task ! %s ", str(e))
|
||||
self.state = "idle"
|
||||
raise RuntimeError(
|
||||
"We could not get a new task because " + str(e)
|
||||
) from e
|
||||
|
||||
if task:
|
||||
try:
|
||||
self.printer.print_task(task.task_type, print_data)
|
||||
self.app.logger.info("Got a new task")
|
||||
self.app.logger.debug("Got task %s", task.task_id)
|
||||
task.status = "processing"
|
||||
|
||||
print_data = task.get_print_data()
|
||||
|
||||
try:
|
||||
self.printer.print_task(task.task_type, print_data)
|
||||
except RuntimeError as e:
|
||||
self.state = "idle"
|
||||
self.app.logger.error("Could not print : %s", str(e))
|
||||
raise e
|
||||
|
||||
task.status = "completed"
|
||||
self.print_queue.mark_completed(task.task_id, "completed")
|
||||
self.app.logger.debug(
|
||||
"Finished printing task %s " , task.task_id
|
||||
)
|
||||
self.state = "idle"
|
||||
|
||||
except RuntimeError as e:
|
||||
self.app.logger.error("Could not print : %s", str(e))
|
||||
raise e
|
||||
task.status = "failed"
|
||||
self.state = "idle"
|
||||
self.print_queue.mark_completed(task.task_id, "failed")
|
||||
self.app.logger.error(
|
||||
"Could not print task %s because %s " , task.task_id, str(e)
|
||||
)
|
||||
|
||||
task.status = "completed"
|
||||
self.print_queue.mark_completed(task.task_id, "completed")
|
||||
self._emit_status(task.task_id, "completed")
|
||||
|
||||
except RuntimeError as e:
|
||||
task.status = "failed"
|
||||
self.print_queue.mark_completed(task.task_id, "failed")
|
||||
self._emit_status(task.task_id, "failed", error=str(e))
|
||||
print(f"Print task {task.task_id} failed: {e}")
|
||||
else:
|
||||
# When they are no new tasks to handle, we put the thread to sleep.
|
||||
self.state = "idle"
|
||||
time.sleep(0.1)
|
||||
|
||||
def _emit_status(self, task_id, status, error=None):
|
||||
"""Emit status update via Socket.IO if available"""
|
||||
if not self.socketio:
|
||||
return
|
||||
|
||||
room = f"task_{task_id}"
|
||||
data = {
|
||||
"task_id": task_id,
|
||||
"status": status,
|
||||
"position": None, # Task no longer in queue
|
||||
}
|
||||
|
||||
if error:
|
||||
data["error"] = error
|
||||
|
||||
self.socketio.emit("task_status", data, room=room)
|
||||
else:
|
||||
# When they are no new tasks to handle, we put the thread to sleep.
|
||||
self.state = "idle"
|
||||
time.sleep(0.1)
|
||||
|
||||
def stop_worker(self):
|
||||
"""
|
||||
@@ -104,4 +143,5 @@ class PrintWorker(threading.Thread):
|
||||
"is_running": self.running,
|
||||
"queue_size": len(self.print_queue),
|
||||
"state": self.state,
|
||||
"printers": len(self.printers_obj),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user